Coding style.

Using asyncio coroutines for workers instead of threads.
This commit is contained in:
Andrew Hamilton 2016-03-09 10:47:09 +00:00
parent b2e087a9db
commit 4fa5b524d4
8 changed files with 103 additions and 101 deletions

3
BUGS
View file

@ -9,6 +9,9 @@ Current
the timed out status when un-paused. the timed out status when un-paused.
- sandbox_fs_test.py is failing when run by the vigil that was started in - sandbox_fs_test.py is failing when run by the vigil that was started in
my startup script, but passing otherwise. my startup script, but passing otherwise.
- When shutting down "Exception ignored in:" noise appears. Four messages
for every worker running. This started with the move to asyncio for the
workers.
Current (tool related) Current (tool related)

View file

@ -79,14 +79,6 @@ class SandboxFs:
mount.umount() mount.umount()
self.overlay_mounts = [] self.overlay_mounts = []
def Popen(self, command, env=None): def command(self, command, env=None):
full_command = _in_chroot(self.mount_point, return _in_chroot(self.mount_point,
_in_directory(os.getcwd(), command)) _in_directory(os.getcwd(), command))
return subprocess.Popen(full_command, stdin=subprocess.PIPE,
stdout=subprocess.PIPE, stderr=subprocess.PIPE,
env=env)
def run_command(self, command, env=None):
process = self.Popen(command, env)
stdout, stderr = process.communicate()
return stdout, stderr, process.returncode

View file

@ -35,7 +35,7 @@ class SandboxFilesystemTestCase(unittest.TestCase):
self.assertTrue(os.path.exists(home_directory)) self.assertTrue(os.path.exists(home_directory))
def test_run_a_command_in_the_sandbox(self): def test_run_a_command_in_the_sandbox(self):
stdout, stderr, returncode = self.sandbox.run_command(["pwd"]) stdout = subprocess.check_output(self.sandbox.command(["pwd"]))
self.assertEqual(stdout.strip().decode("utf-8"), os.environ["PWD"]) self.assertEqual(stdout.strip().decode("utf-8"), os.environ["PWD"])

View file

@ -4,6 +4,7 @@
# Licensed under the Artistic License 2.0. # Licensed under the Artistic License 2.0.
import ast import ast
import asyncio
import contextlib import contextlib
import dis import dis
import enum import enum
@ -654,7 +655,8 @@ class Result:
self.status = status self.status = status
self.entry.appearance_cache = None self.entry.appearance_cache = None
def run(self, log, appearance_changed_event, worker, runner): @asyncio.coroutine
def run(self, log, appearance_changed_event, runner):
self.is_placeholder = False self.is_placeholder = False
tool_name = tool_name_colored(self.tool, self.path) tool_name = tool_name_colored(self.tool, self.path)
path = path_colored(self.path) path = path_colored(self.path)
@ -665,7 +667,7 @@ class Result:
runner.pause() runner.pause()
appearance_changed_event.set() appearance_changed_event.set()
start_time = time.time() start_time = time.time()
new_status = worker.run_tool(self.path, self.tool) new_status = yield from runner.run_tool(self.path, self.tool)
Result.result.fget.evict(self) Result.result.fget.evict(self)
end_time = time.time() end_time = time.time()
self.set_status(new_status) self.set_status(new_status)

111
vigil
View file

@ -70,6 +70,7 @@ import time
import traceback import traceback
import docopt import docopt
import psutil
import pyinotify import pyinotify
import urwid import urwid
import urwid.raw_display import urwid.raw_display
@ -710,7 +711,7 @@ class Screen:
runner.continue_() runner.continue_()
def quit_(self): def quit_(self):
raise KeyboardInterrupt os.kill(os.getpid(), signal.SIGINT)
def refresh(self): def refresh(self):
self._summary.refresh(self._log) self._summary.refresh(self._log)
@ -834,18 +835,53 @@ def _regulate_temperature(log):
log.log_message("The computer has cooled down. Continuing...") log.log_message("The computer has cooled down. Continuing...")
def _make_process_nicest(pid):
process = psutil.Process(pid)
process.nice(19)
process.ionice(psutil.IOPRIO_CLASS_IDLE)
class Runner: class Runner:
def __init__(self, sandbox, is_already_paused, is_being_tested): def __init__(self, sandbox, is_already_paused, is_being_tested):
self.result = None self.sandbox = sandbox
self.worker = worker.Worker(sandbox)
self.is_already_paused = is_already_paused self.is_already_paused = is_already_paused
self.is_being_tested = is_being_tested self.is_being_tested = is_being_tested
self.result = None
self.process = None
self.child_pid = None
@asyncio.coroutine
def create_process(self):
if self.sandbox is None:
command = [worker.__file__]
else:
cache_path = os.path.join(os.getcwd(), tools.CACHE_PATH)
cache_mount = self.sandbox.mount_point + cache_path
subprocess.check_call(["sudo", "mount", "--bind", cache_path,
cache_mount])
command = self.sandbox.command([worker.__file__])
create = asyncio.create_subprocess_exec(
*command, stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE)
self.process = yield from create
pid_line = yield from self.process.stdout.readline()
self.child_pid = int(pid_line.strip())
@asyncio.coroutine
def run_tool(self, path, tool):
self.process.stdin.write(("%s\n%s\n" %
(tool.__qualname__, path)).encode("utf-8"))
data = yield from self.process.stdout.readline()
return tools.Status(int(data))
@asyncio.coroutine
def job_runner(self, summary, log, jobs_added_event, def job_runner(self, summary, log, jobs_added_event,
appearance_changed_event): appearance_changed_event):
yield from self.create_process()
_make_process_nicest(self.child_pid)
while True: while True:
jobs_added_event.wait() yield from jobs_added_event.wait()
while True: while True:
# _regulate_temperature(log) # My fan is broken # _regulate_temperature(log) # My fan is broken
try: try:
@ -857,8 +893,7 @@ class Runner:
if self.is_being_tested: if self.is_being_tested:
os.kill(os.getpid(), signal.SIGINT) os.kill(os.getpid(), signal.SIGINT)
break break
with contextlib.suppress(ValueError): # Process was terminated yield from self.result.run(log, appearance_changed_event,
self.result.run(log, appearance_changed_event, self.worker,
self) self)
summary.completed_total += 1 summary.completed_total += 1
jobs_added_event.clear() jobs_added_event.clear()
@ -866,14 +901,14 @@ class Runner:
def pause(self): def pause(self):
if self.result is not None and \ if self.result is not None and \
self.result.status == tools.Status.running: self.result.status == tools.Status.running:
self.worker.pause() os.kill(self.child_pid, signal.SIGSTOP)
self.result.set_status(tools.Status.paused) self.result.set_status(tools.Status.paused)
def continue_(self): def continue_(self):
if self.result is not None and \ if self.result is not None and \
self.result.status == tools.Status.paused: self.result.status == tools.Status.paused:
self.result.set_status(tools.Status.running) self.result.set_status(tools.Status.running)
self.worker.continue_() os.kill(self.child_pid, signal.SIGCONT)
def _add_watch_manager_to_mainloop(root_path, mainloop, on_filesystem_change, def _add_watch_manager_to_mainloop(root_path, mainloop, on_filesystem_change,
@ -920,13 +955,12 @@ def _update_screen(main_widget, appearance_changed_event):
fill3.patch_screen(main_widget) fill3.patch_screen(main_widget)
def main(root_path, worker_count=None, is_sandboxed=True, editor_command=None, def main(root_path, loop, worker_count=None, is_sandboxed=True,
is_being_tested=False): editor_command=None, is_being_tested=False):
if worker_count is None: if worker_count is None:
worker_count = multiprocessing.cpu_count()*2 worker_count = multiprocessing.cpu_count()*2
global _UPDATE_THREAD_STOPPED global _UPDATE_THREAD_STOPPED
loop = asyncio.get_event_loop() jobs_added_event = asyncio.Event()
jobs_added_event = threading.Event()
appearance_changed_event = threading.Event() appearance_changed_event = threading.Event()
is_first_run = True is_first_run = True
try: try:
@ -965,54 +999,55 @@ def main(root_path, worker_count=None, is_sandboxed=True, editor_command=None,
sandbox = sandbox_fs.SandboxFs(sandbox_temp_dir) sandbox = sandbox_fs.SandboxFs(sandbox_temp_dir)
else: else:
sandbox = None sandbox = None
try:
def start_runners():
if is_sandboxed: if is_sandboxed:
log.log_message("Making filesystem sandbox...") log.log_message("Making filesystem sandbox...")
sandbox.mount() sandbox.mount()
log.log_message("Sandbox made.") log.log_message("Sandbox made.")
else: else:
log.log_message("Running without the filesystem sandbox...") log.log_message("Running without the filesystem sandbox...")
log.log_message("Starting workers...") log.log_message("Starting workers (%s) ..." % worker_count)
for index in range(worker_count): for index in range(worker_count):
runners.append(Runner(sandbox, screen._is_paused, is_being_tested)) runner = Runner(sandbox, screen._is_paused, is_being_tested)
log.log_message("Workers started. (%s)" % worker_count) runners.append(runner)
for runner in runners: future = runner.job_runner(
args = (summary, log, jobs_added_event, appearance_changed_event) summary, log, jobs_added_event, appearance_changed_event)
threading.Thread(target=runner.job_runner, args=args, runner.future = asyncio.ensure_future(future, loop=loop)
daemon=True).start()
try:
threading.Thread(target=start_runners, daemon=True).start()
def on_input(urwid_screen): def on_window_resize():
for event in urwid_screen.get_input():
screen.on_input_event(event)
def on_window_resize(n, frame):
appearance_changed_event.set() appearance_changed_event.set()
appearance_changed_event.set() appearance_changed_event.set()
update_display_thread = threading.Thread( update_display_thread = threading.Thread(
target=_update_screen, args=(screen, appearance_changed_event), target=_update_screen, args=(screen, appearance_changed_event),
daemon=True) daemon=True)
def exit_loop():
log.log_command("Exiting...")
time.sleep(0.05)
for runner in runners:
runner.pause()
runner.future.cancel()
if runner.result is not None:
runner.result.reset()
loop.stop()
loop.add_signal_handler(signal.SIGWINCH, on_window_resize)
loop.add_signal_handler(signal.SIGINT, exit_loop)
loop.add_signal_handler(signal.SIGTERM, exit_loop)
with terminal.hidden_cursor(): with terminal.hidden_cursor():
with _urwid_screen() as urwid_screen: with _urwid_screen() as urwid_screen:
def on_input(urwid_screen):
for event in urwid_screen.get_input():
screen.on_input_event(event)
loop.add_reader(sys.stdin, on_input, urwid_screen) loop.add_reader(sys.stdin, on_input, urwid_screen)
update_display_thread.start() update_display_thread.start()
try: try:
signal.signal(signal.SIGWINCH, on_window_resize)
with contextlib.suppress(KeyboardInterrupt):
loop.run_forever() loop.run_forever()
log.log_command("Exiting...")
time.sleep(0.05)
finally: finally:
_UPDATE_THREAD_STOPPED = True _UPDATE_THREAD_STOPPED = True
appearance_changed_event.set() appearance_changed_event.set()
update_display_thread.join() update_display_thread.join()
log.log_message("Program stopped.") log.log_message("Program stopped.")
for runner in runners:
runner.pause()
if runner.result is not None:
runner.result.reset()
# Cannot pickle generators, locks, sockets or events. # Cannot pickle generators, locks, sockets or events.
(summary.closest_placeholder_generator, summary._lock, (summary.closest_placeholder_generator, summary._lock,
summary._jobs_added_event, screen._appearance_changed_event, summary._jobs_added_event, screen._appearance_changed_event,
@ -1088,4 +1123,6 @@ if __name__ == "__main__":
with terminal.console_title("vigil: " + os.path.basename(root_path)): with terminal.console_title("vigil: " + os.path.basename(root_path)):
_manage_cache(root_path) _manage_cache(root_path)
with _chdir(root_path): # FIX: Don't change directory if possible. with _chdir(root_path): # FIX: Don't change directory if possible.
main(root_path, worker_count, is_sandboxed, editor_command) loop = asyncio.get_event_loop()
main(root_path, loop, worker_count, is_sandboxed, editor_command)
loop.close()

View file

@ -3,6 +3,7 @@
# Copyright (C) 2015-2016 Andrew Hamilton. All rights reserved. # Copyright (C) 2015-2016 Andrew Hamilton. All rights reserved.
# Licensed under the Artistic License 2.0. # Licensed under the Artistic License 2.0.
import asyncio
import contextlib import contextlib
import io import io
import os import os
@ -218,7 +219,7 @@ def _all_processes():
class MainTestCase(unittest.TestCase): class MainTestCase(unittest.TestCase):
def test_main_and_restart_and_no_leaks_and_is_relocatable(self): def test_main_and_restart_and_no_leaks_and_is_relocatable(self):
def test_run(root_path): def test_run(root_path, loop):
mount_total = _mount_total() mount_total = _mount_total()
tmp_total = _tmp_total() tmp_total = _tmp_total()
# processes = _all_processes() # processes = _all_processes()
@ -227,8 +228,8 @@ class MainTestCase(unittest.TestCase):
vigil._manage_cache(root_path) vigil._manage_cache(root_path)
with vigil._chdir(root_path): with vigil._chdir(root_path):
with contextlib.redirect_stdout(io.StringIO()): with contextlib.redirect_stdout(io.StringIO()):
vigil.main(root_path, worker_count=2, is_sandboxed=True, vigil.main(root_path, loop, worker_count=2,
is_being_tested=True) is_sandboxed=True, is_being_tested=True)
for file_name in ["summary.pickle", "creation_time", "log", for file_name in ["summary.pickle", "creation_time", "log",
"foo-metadata", "foo-contents"]: "foo-metadata", "foo-contents"]:
self.assertTrue(os.path.exists(".vigil/" + file_name)) self.assertTrue(os.path.exists(".vigil/" + file_name))
@ -237,12 +238,14 @@ class MainTestCase(unittest.TestCase):
# self.assertEqual(_all_processes(), processes) # Fix # self.assertEqual(_all_processes(), processes) # Fix
temp_dir = tempfile.mkdtemp() temp_dir = tempfile.mkdtemp()
try: try:
loop = asyncio.get_event_loop()
first_dir = os.path.join(temp_dir, "first") first_dir = os.path.join(temp_dir, "first")
os.mkdir(first_dir) os.mkdir(first_dir)
test_run(first_dir) test_run(first_dir, loop)
second_dir = os.path.join(temp_dir, "second") second_dir = os.path.join(temp_dir, "second")
os.rename(first_dir, second_dir) os.rename(first_dir, second_dir)
test_run(second_dir) test_run(second_dir, loop)
loop.close()
finally: finally:
shutil.rmtree(temp_dir) shutil.rmtree(temp_dir)

View file

@ -4,50 +4,10 @@
# Licensed under the Artistic License 2.0. # Licensed under the Artistic License 2.0.
import os import os
import signal
import subprocess
import psutil
import tools import tools
def _make_process_nicest(pid):
process = psutil.Process(pid)
process.nice(19)
process.ionice(psutil.IOPRIO_CLASS_IDLE)
class Worker:
def __init__(self, sandbox):
self.sandbox = sandbox
if sandbox is None:
self.process = subprocess.Popen(
[__file__], stdin=subprocess.PIPE, stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
else:
cache_path = os.path.join(os.getcwd(), tools.CACHE_PATH)
self.cache_mount = sandbox.mount_point + cache_path
subprocess.check_call(["sudo", "mount", "--bind", cache_path,
self.cache_mount])
self.process = sandbox.Popen([__file__])
self.child_pid = int(self.process.stdout.readline())
_make_process_nicest(self.child_pid)
def run_tool(self, path, tool):
self.process.stdin.write(("%s\n%s\n" %
(tool.__qualname__, path)).encode("utf-8"))
self.process.stdin.flush()
return tools.Status(int(self.process.stdout.readline()))
def pause(self):
os.kill(self.child_pid, signal.SIGSTOP)
def continue_(self):
os.kill(self.child_pid, signal.SIGCONT)
def main(): def main():
print(os.getpid(), flush=True) print(os.getpid(), flush=True)
while True: while True:

View file

@ -4,6 +4,7 @@
# Licensed under the Artistic License 2.0. # Licensed under the Artistic License 2.0.
import asyncio
import os import os
import shutil import shutil
import tempfile import tempfile
@ -11,7 +12,7 @@ import unittest
import sandbox_fs import sandbox_fs
import tools import tools
import worker import vigil
class WorkerTestCase(unittest.TestCase): class WorkerTestCase(unittest.TestCase):
@ -28,7 +29,11 @@ class WorkerTestCase(unittest.TestCase):
os.chdir(self.original_working_dir) os.chdir(self.original_working_dir)
def _test_worker(self, sandbox): def _test_worker(self, sandbox):
status = worker.Worker(sandbox).run_tool("foo", tools.metadata) loop = asyncio.get_event_loop()
worker_ = vigil.Runner(sandbox, False, False)
loop.run_until_complete(worker_.create_process())
future = worker_.run_tool("foo", tools.metadata)
status = loop.run_until_complete(future)
self.assertEqual(status, tools.Status.normal) self.assertEqual(status, tools.Status.normal)
result_path = os.path.join(tools.CACHE_PATH, "foo-metadata") result_path = os.path.join(tools.CACHE_PATH, "foo-metadata")
self.assertTrue(os.path.exists(result_path)) self.assertTrue(os.path.exists(result_path))