diff --git a/BUGS b/BUGS index 2c1783b..0866669 100644 --- a/BUGS +++ b/BUGS @@ -9,6 +9,9 @@ Current the timed out status when un-paused. - sandbox_fs_test.py is failing when run by the vigil that was started in my startup script, but passing otherwise. +- When shutting down "Exception ignored in:" noise appears. Four messages + for every worker running. This started with the move to asyncio for the + workers. Current (tool related) diff --git a/sandbox_fs.py b/sandbox_fs.py index 42a1a02..ddeaa01 100644 --- a/sandbox_fs.py +++ b/sandbox_fs.py @@ -79,14 +79,6 @@ class SandboxFs: mount.umount() self.overlay_mounts = [] - def Popen(self, command, env=None): - full_command = _in_chroot(self.mount_point, - _in_directory(os.getcwd(), command)) - return subprocess.Popen(full_command, stdin=subprocess.PIPE, - stdout=subprocess.PIPE, stderr=subprocess.PIPE, - env=env) - - def run_command(self, command, env=None): - process = self.Popen(command, env) - stdout, stderr = process.communicate() - return stdout, stderr, process.returncode + def command(self, command, env=None): + return _in_chroot(self.mount_point, + _in_directory(os.getcwd(), command)) diff --git a/sandbox_fs_test.py b/sandbox_fs_test.py index d56d71c..b354bcf 100755 --- a/sandbox_fs_test.py +++ b/sandbox_fs_test.py @@ -35,7 +35,7 @@ class SandboxFilesystemTestCase(unittest.TestCase): self.assertTrue(os.path.exists(home_directory)) def test_run_a_command_in_the_sandbox(self): - stdout, stderr, returncode = self.sandbox.run_command(["pwd"]) + stdout = subprocess.check_output(self.sandbox.command(["pwd"])) self.assertEqual(stdout.strip().decode("utf-8"), os.environ["PWD"]) diff --git a/tools.py b/tools.py index 6436ec2..4bc48ec 100644 --- a/tools.py +++ b/tools.py @@ -4,6 +4,7 @@ # Licensed under the Artistic License 2.0. import ast +import asyncio import contextlib import dis import enum @@ -654,7 +655,8 @@ class Result: self.status = status self.entry.appearance_cache = None - def run(self, log, appearance_changed_event, worker, runner): + @asyncio.coroutine + def run(self, log, appearance_changed_event, runner): self.is_placeholder = False tool_name = tool_name_colored(self.tool, self.path) path = path_colored(self.path) @@ -665,7 +667,7 @@ class Result: runner.pause() appearance_changed_event.set() start_time = time.time() - new_status = worker.run_tool(self.path, self.tool) + new_status = yield from runner.run_tool(self.path, self.tool) Result.result.fget.evict(self) end_time = time.time() self.set_status(new_status) diff --git a/vigil b/vigil index 6620e68..eb46083 100755 --- a/vigil +++ b/vigil @@ -70,6 +70,7 @@ import time import traceback import docopt +import psutil import pyinotify import urwid import urwid.raw_display @@ -710,7 +711,7 @@ class Screen: runner.continue_() def quit_(self): - raise KeyboardInterrupt + os.kill(os.getpid(), signal.SIGINT) def refresh(self): self._summary.refresh(self._log) @@ -834,18 +835,53 @@ def _regulate_temperature(log): log.log_message("The computer has cooled down. Continuing...") +def _make_process_nicest(pid): + process = psutil.Process(pid) + process.nice(19) + process.ionice(psutil.IOPRIO_CLASS_IDLE) + + class Runner: def __init__(self, sandbox, is_already_paused, is_being_tested): - self.result = None - self.worker = worker.Worker(sandbox) + self.sandbox = sandbox self.is_already_paused = is_already_paused self.is_being_tested = is_being_tested + self.result = None + self.process = None + self.child_pid = None + @asyncio.coroutine + def create_process(self): + if self.sandbox is None: + command = [worker.__file__] + else: + cache_path = os.path.join(os.getcwd(), tools.CACHE_PATH) + cache_mount = self.sandbox.mount_point + cache_path + subprocess.check_call(["sudo", "mount", "--bind", cache_path, + cache_mount]) + command = self.sandbox.command([worker.__file__]) + create = asyncio.create_subprocess_exec( + *command, stdin=asyncio.subprocess.PIPE, + stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE) + self.process = yield from create + pid_line = yield from self.process.stdout.readline() + self.child_pid = int(pid_line.strip()) + + @asyncio.coroutine + def run_tool(self, path, tool): + self.process.stdin.write(("%s\n%s\n" % + (tool.__qualname__, path)).encode("utf-8")) + data = yield from self.process.stdout.readline() + return tools.Status(int(data)) + + @asyncio.coroutine def job_runner(self, summary, log, jobs_added_event, appearance_changed_event): + yield from self.create_process() + _make_process_nicest(self.child_pid) while True: - jobs_added_event.wait() + yield from jobs_added_event.wait() while True: # _regulate_temperature(log) # My fan is broken try: @@ -857,23 +893,22 @@ class Runner: if self.is_being_tested: os.kill(os.getpid(), signal.SIGINT) break - with contextlib.suppress(ValueError): # Process was terminated - self.result.run(log, appearance_changed_event, self.worker, - self) - summary.completed_total += 1 + yield from self.result.run(log, appearance_changed_event, + self) + summary.completed_total += 1 jobs_added_event.clear() def pause(self): if self.result is not None and \ self.result.status == tools.Status.running: - self.worker.pause() + os.kill(self.child_pid, signal.SIGSTOP) self.result.set_status(tools.Status.paused) def continue_(self): if self.result is not None and \ self.result.status == tools.Status.paused: self.result.set_status(tools.Status.running) - self.worker.continue_() + os.kill(self.child_pid, signal.SIGCONT) def _add_watch_manager_to_mainloop(root_path, mainloop, on_filesystem_change, @@ -920,13 +955,12 @@ def _update_screen(main_widget, appearance_changed_event): fill3.patch_screen(main_widget) -def main(root_path, worker_count=None, is_sandboxed=True, editor_command=None, - is_being_tested=False): +def main(root_path, loop, worker_count=None, is_sandboxed=True, + editor_command=None, is_being_tested=False): if worker_count is None: worker_count = multiprocessing.cpu_count()*2 global _UPDATE_THREAD_STOPPED - loop = asyncio.get_event_loop() - jobs_added_event = threading.Event() + jobs_added_event = asyncio.Event() appearance_changed_event = threading.Event() is_first_run = True try: @@ -965,54 +999,55 @@ def main(root_path, worker_count=None, is_sandboxed=True, editor_command=None, sandbox = sandbox_fs.SandboxFs(sandbox_temp_dir) else: sandbox = None - - def start_runners(): + try: if is_sandboxed: log.log_message("Making filesystem sandbox...") sandbox.mount() log.log_message("Sandbox made.") else: log.log_message("Running without the filesystem sandbox...") - log.log_message("Starting workers...") + log.log_message("Starting workers (%s) ..." % worker_count) for index in range(worker_count): - runners.append(Runner(sandbox, screen._is_paused, is_being_tested)) - log.log_message("Workers started. (%s)" % worker_count) - for runner in runners: - args = (summary, log, jobs_added_event, appearance_changed_event) - threading.Thread(target=runner.job_runner, args=args, - daemon=True).start() - try: - threading.Thread(target=start_runners, daemon=True).start() + runner = Runner(sandbox, screen._is_paused, is_being_tested) + runners.append(runner) + future = runner.job_runner( + summary, log, jobs_added_event, appearance_changed_event) + runner.future = asyncio.ensure_future(future, loop=loop) - def on_input(urwid_screen): - for event in urwid_screen.get_input(): - screen.on_input_event(event) - - def on_window_resize(n, frame): + def on_window_resize(): appearance_changed_event.set() appearance_changed_event.set() update_display_thread = threading.Thread( target=_update_screen, args=(screen, appearance_changed_event), daemon=True) + + def exit_loop(): + log.log_command("Exiting...") + time.sleep(0.05) + for runner in runners: + runner.pause() + runner.future.cancel() + if runner.result is not None: + runner.result.reset() + loop.stop() + loop.add_signal_handler(signal.SIGWINCH, on_window_resize) + loop.add_signal_handler(signal.SIGINT, exit_loop) + loop.add_signal_handler(signal.SIGTERM, exit_loop) with terminal.hidden_cursor(): with _urwid_screen() as urwid_screen: + + def on_input(urwid_screen): + for event in urwid_screen.get_input(): + screen.on_input_event(event) loop.add_reader(sys.stdin, on_input, urwid_screen) update_display_thread.start() try: - signal.signal(signal.SIGWINCH, on_window_resize) - with contextlib.suppress(KeyboardInterrupt): - loop.run_forever() - log.log_command("Exiting...") - time.sleep(0.05) + loop.run_forever() finally: _UPDATE_THREAD_STOPPED = True appearance_changed_event.set() update_display_thread.join() log.log_message("Program stopped.") - for runner in runners: - runner.pause() - if runner.result is not None: - runner.result.reset() # Cannot pickle generators, locks, sockets or events. (summary.closest_placeholder_generator, summary._lock, summary._jobs_added_event, screen._appearance_changed_event, @@ -1088,4 +1123,6 @@ if __name__ == "__main__": with terminal.console_title("vigil: " + os.path.basename(root_path)): _manage_cache(root_path) with _chdir(root_path): # FIX: Don't change directory if possible. - main(root_path, worker_count, is_sandboxed, editor_command) + loop = asyncio.get_event_loop() + main(root_path, loop, worker_count, is_sandboxed, editor_command) + loop.close() diff --git a/vigil_test.py b/vigil_test.py index 83fe217..cea8c69 100755 --- a/vigil_test.py +++ b/vigil_test.py @@ -3,6 +3,7 @@ # Copyright (C) 2015-2016 Andrew Hamilton. All rights reserved. # Licensed under the Artistic License 2.0. +import asyncio import contextlib import io import os @@ -218,7 +219,7 @@ def _all_processes(): class MainTestCase(unittest.TestCase): def test_main_and_restart_and_no_leaks_and_is_relocatable(self): - def test_run(root_path): + def test_run(root_path, loop): mount_total = _mount_total() tmp_total = _tmp_total() # processes = _all_processes() @@ -227,8 +228,8 @@ class MainTestCase(unittest.TestCase): vigil._manage_cache(root_path) with vigil._chdir(root_path): with contextlib.redirect_stdout(io.StringIO()): - vigil.main(root_path, worker_count=2, is_sandboxed=True, - is_being_tested=True) + vigil.main(root_path, loop, worker_count=2, + is_sandboxed=True, is_being_tested=True) for file_name in ["summary.pickle", "creation_time", "log", "foo-metadata", "foo-contents"]: self.assertTrue(os.path.exists(".vigil/" + file_name)) @@ -237,12 +238,14 @@ class MainTestCase(unittest.TestCase): # self.assertEqual(_all_processes(), processes) # Fix temp_dir = tempfile.mkdtemp() try: + loop = asyncio.get_event_loop() first_dir = os.path.join(temp_dir, "first") os.mkdir(first_dir) - test_run(first_dir) + test_run(first_dir, loop) second_dir = os.path.join(temp_dir, "second") os.rename(first_dir, second_dir) - test_run(second_dir) + test_run(second_dir, loop) + loop.close() finally: shutil.rmtree(temp_dir) diff --git a/worker.py b/worker.py index a738c3e..cd59d57 100755 --- a/worker.py +++ b/worker.py @@ -4,50 +4,10 @@ # Licensed under the Artistic License 2.0. import os -import signal -import subprocess - -import psutil import tools -def _make_process_nicest(pid): - process = psutil.Process(pid) - process.nice(19) - process.ionice(psutil.IOPRIO_CLASS_IDLE) - - -class Worker: - - def __init__(self, sandbox): - self.sandbox = sandbox - if sandbox is None: - self.process = subprocess.Popen( - [__file__], stdin=subprocess.PIPE, stdout=subprocess.PIPE, - stderr=subprocess.PIPE) - else: - cache_path = os.path.join(os.getcwd(), tools.CACHE_PATH) - self.cache_mount = sandbox.mount_point + cache_path - subprocess.check_call(["sudo", "mount", "--bind", cache_path, - self.cache_mount]) - self.process = sandbox.Popen([__file__]) - self.child_pid = int(self.process.stdout.readline()) - _make_process_nicest(self.child_pid) - - def run_tool(self, path, tool): - self.process.stdin.write(("%s\n%s\n" % - (tool.__qualname__, path)).encode("utf-8")) - self.process.stdin.flush() - return tools.Status(int(self.process.stdout.readline())) - - def pause(self): - os.kill(self.child_pid, signal.SIGSTOP) - - def continue_(self): - os.kill(self.child_pid, signal.SIGCONT) - - def main(): print(os.getpid(), flush=True) while True: diff --git a/worker_test.py b/worker_test.py index e3a4240..5970309 100755 --- a/worker_test.py +++ b/worker_test.py @@ -4,6 +4,7 @@ # Licensed under the Artistic License 2.0. +import asyncio import os import shutil import tempfile @@ -11,7 +12,7 @@ import unittest import sandbox_fs import tools -import worker +import vigil class WorkerTestCase(unittest.TestCase): @@ -28,7 +29,11 @@ class WorkerTestCase(unittest.TestCase): os.chdir(self.original_working_dir) def _test_worker(self, sandbox): - status = worker.Worker(sandbox).run_tool("foo", tools.metadata) + loop = asyncio.get_event_loop() + worker_ = vigil.Runner(sandbox, False, False) + loop.run_until_complete(worker_.create_process()) + future = worker_.run_tool("foo", tools.metadata) + status = loop.run_until_complete(future) self.assertEqual(status, tools.Status.normal) result_path = os.path.join(tools.CACHE_PATH, "foo-metadata") self.assertTrue(os.path.exists(result_path))