eris/vigil

1037 lines
37 KiB
Python
Executable file

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Copyright (C) 2015-2016 Andrew Hamilton. All rights reserved.
# Licensed under the Artistic License 2.0.
"""\
Produces a set of reports for every file in a directory tree.
The reports are produced by many existing command-line tools.
The state of each report is also summarised by a status indicator.
The possible states are listed below.
A report is viewed by selecting its status indicator with the cursor.
Reports are recalculated whenever files are changed, added, or deleted, and so
are kept up to date. (optional)
The reports are cached in a directory named ".vigil" under the target
directory.
Usage: vigil <root_path>
e.g. # vigil my_project
Keys:
*h - Show the help screen. (toggle)
*d, *c, *j, *k - Move the cursor up, down, left and right.
*D, *C, *J, *K - Scroll the result pane up, down, left and right.
*t - Turn the result pane to portrait or landscape orientation. (toggle)
*l - Show the activity log. (toggle)
*n - Move to the next issue.
*N - Move to the next issue of the current tool.
*o - Order files by type, or by directory location. (toggle)
*p - Pause work. (toggle)
*w - Watch the filesystem for changes. (toggle)
*s - Change the appearance of result statuses. (toggle)
*q - Quit.
"""
import asyncio
import collections
import functools
import gc
import gzip
import importlib
import multiprocessing
import os
import pickle
import shutil
import signal
import subprocess
import sys
import threading
import time
import traceback
import psutil
import pyinotify
import fill3
import terminal
import termstr
import tools
_LOG_PATH = os.path.join(os.getcwd(), "vigil.log")


def _log_error(message=None):
    """Append an error report to the log file at _LOG_PATH.

    When no message is given, the formatted traceback of the exception
    currently being handled is written.  An explicit message is written
    followed by a newline.
    """
    if message is None:
        text = traceback.format_exc()
    else:
        text = message + "\n"
    with open(_LOG_PATH, "a") as log_file:
        log_file.write(text)


_CACHE_PATH = ".vigil"
def lru_cache_with_eviction(maxsize=128, typed=False):
    """Return an lru_cache-style decorator that supports per-key eviction.

    Every set of call arguments has a version number that is passed to the
    underlying functools.lru_cache as an extra leading argument.  Calling
    the decorated function's 'evict' attribute with the same arguments bumps
    that version, so the next call misses the cache and recomputes.

    The decorated function also exposes 'versions' (the key -> version
    dict) and 'cache_info' (from the underlying lru_cache).
    """
    versions = {}
    make_key = functools._make_key

    def evict(*args, **kwds):
        # Only keys that have been looked up before have a version to bump.
        try:
            versions[make_key(args, kwds, typed)] += 1
        except KeyError:
            pass

    def decorating_function(user_function):
        @functools.lru_cache(maxsize=maxsize, typed=typed)
        def call_ignoring_version(*args, **kwds):
            # args[0] is the version number; it only exists to vary the
            # cache key and is not passed on.
            return user_function(*args[1:], **kwds)

        def call_with_version(*args, **kwds):
            version = versions.setdefault(make_key(args, kwds, typed), 0)
            return call_ignoring_version(*((version,) + args), **kwds)

        call_with_version.versions = versions
        call_with_version.cache_info = call_ignoring_version.cache_info
        call_with_version.evict = evict
        return functools.update_wrapper(call_with_version, user_function)

    return decorating_function
def dump_pickle_safe(object_, path, protocol=pickle.HIGHEST_PROTOCOL,
                     open=open):
    """Pickle object_ to path without ever leaving a partial file at path.

    The pickle is first written to path + ".tmp" and only renamed over path
    once it has been written completely.

    object_  -- the object to pickle.
    path     -- the destination file.
    protocol -- the pickle protocol to use.
    open     -- the function used to open the temporary file for writing,
                e.g. gzip.open.

    As before, an OSError or KeyboardInterrupt during writing is swallowed
    and path is left untouched.  Any other exception is re-raised.  In every
    failure case the temporary file is removed if it exists.
    """
    tmp_path = path + ".tmp"
    try:
        with open(tmp_path, "wb") as file_:
            pickle.dump(object_, file_, protocol=protocol)
    except BaseException as error:
        # The temporary file may not exist if open() itself failed, so a
        # missing file must not raise a second error here.
        try:
            os.remove(tmp_path)
        except FileNotFoundError:
            pass
        if not isinstance(error, (OSError, KeyboardInterrupt)):
            raise
    else:
        os.rename(tmp_path, path)
def status_to_str(status, is_status_simple):
    """Return the displayable form of a status.

    An int status is looked up in one of the two status tables of the tools
    module, chosen by is_status_simple.  Any other value is returned as is.
    """
    if not isinstance(status, int):
        return status
    # From here on the status is a status enumeration.
    if is_status_simple:
        table = tools._STATUS_TO_TERMSTR_SIMPLE
    else:
        table = tools._STATUS_TO_TERMSTR
    return table[status]
class Result:
    """The outcome of running one tool on one file.

    The report is stored as a pickle under the cache directory and is read
    back on demand through the 'result' property.  The status, scroll
    position and completion flags are kept on the instance.
    """

    def __init__(self, path, tool, is_stored_compressed=True):
        self.path = path
        self.tool = tool
        if is_stored_compressed:
            self._open_func = gzip.open
        else:
            self._open_func = open
        pickle_name = path + "-" + tool.__name__
        self.pickle_path = os.path.join(_CACHE_PATH, pickle_name)
        self.scroll_position = (0, 0)
        self.is_completed = False
        self.reset()

    def __del__(self):
        """Remove the stored report, if there is one."""
        try:
            os.remove(self.pickle_path)
        except FileNotFoundError:
            pass

    @property
    @lru_cache_with_eviction(maxsize=50)
    def result(self):
        """The stored report, or a "?" label when none is available."""
        unknown_label = fill3.Text("?")
        if not self.is_placeholder:
            try:
                with self._open_func(self.pickle_path, "rb") as pickle_file:
                    return pickle.load(pickle_file)
            except FileNotFoundError:
                pass
        return unknown_label

    @result.setter
    def result(self, value):
        cache_directory = os.path.dirname(self.pickle_path)
        os.makedirs(cache_directory, exist_ok=True)
        dump_pickle_safe(value, self.pickle_path, open=self._open_func)
        # Forget the cached copy of the previous report.
        Result.result.fget.evict(self)

    def set_status(self, status, appearance_changed_event):
        """Store the status and flag that the display needs redrawing."""
        self.status = status
        appearance_changed_event.set()
        self.entry.appearance_cache = None

    def run(self, log, appearance_changed_event, worker):
        """Run the tool on the file with the worker, logging start and end."""
        self.is_placeholder = False
        tool_name = tools._tool_name_colored(self.tool, self.path)
        path_colored = tools._path_colored(self.path)
        log.log_message(["Running ", tool_name, " on ", path_colored, "."])
        self.set_status(tools.Status.running, appearance_changed_event)
        start_time = time.time()
        new_status = worker.run_tool(self.path, self.tool)
        # The worker has written a new report, so drop the cached one.
        Result.result.fget.evict(self)
        end_time = time.time()
        self.set_status(new_status, appearance_changed_event)
        self.is_completed = True
        status_text = status_to_str(new_status,
                                    self.entry.summary.is_status_simple)
        elapsed = round(end_time - start_time, 2)
        log.log_message(
            ["Finished running ", tool_name, " on ", path_colored, ". ",
             status_text, " %s secs" % elapsed])

    def reset(self):
        """Turn the result back into an empty placeholder."""
        self.is_placeholder = True
        self.status = tools.Status.empty

    def appearance_min(self):
        """Return the status indicator as a one line appearance."""
        is_simple = self.entry.summary.is_status_simple
        return [status_to_str(self.status, is_simple)]
def reverse_style(style):
    """Return a copy of style with its first two colors exchanged."""
    first_color = style.bg_color
    second_color = style.fg_color
    return termstr.CharStyle(first_color, second_color, style.is_bold,
                             style.is_underlined)
class Entry(collections.UserList):
    """One row of the summary: a path followed by its results."""

    def __init__(self, path, results, summary, highlighted=None,
                 set_results=True):
        super().__init__(results)
        self.path = path
        self.summary = summary
        self.highlighted = highlighted
        self.widgets = self.data
        if set_results:
            # FIX: this is missed for entries appended later
            for result in results:
                result.entry = self
        self.widget = fill3.Row(results)
        self.appearance_cache = None

    def appearance_min(self):
        """Return the row's appearance, computing and caching it if needed."""
        # The cache is read into a local exactly once because
        # appearance_cache can become None at any time.
        cached = self.appearance_cache
        if cached is not None:
            return cached
        if self.highlighted is not None:
            if self.summary.is_status_simple:
                cursor = fill3.Text("")
            else:
                cursor = fill3.Style(self.widget[self.highlighted],
                                     reverse_style)
            self.widget[self.highlighted] = cursor
        rendered = self.widget.appearance_min()
        colored_path = tools._path_colored(self.path)
        pad_width = self.summary._max_path_length - len(colored_path) + 1
        rendered[0] = colored_path + " " * pad_width + rendered[0]
        self.appearance_cache = rendered
        return rendered
def is_filename_excluded(filename):
    """Return True for hidden names, i.e. those starting with a dot."""
    is_hidden = filename.startswith(".")
    return is_hidden
def codebase_files(path, skip_hidden_directories=True):
    """Yield the path of every non-excluded file in the tree under path.

    When skip_hidden_directories is true, excluded directories are pruned
    from the walk so nothing beneath them is visited.
    """
    for directory, subdirectories, names in os.walk(path):
        if skip_hidden_directories:
            # Assigning to the slice prunes os.walk's own list in place.
            subdirectories[:] = [name for name in subdirectories
                                 if not is_filename_excluded(name)]
        yield from (os.path.join(directory, name) for name in names
                    if not is_filename_excluded(name))
def fix_paths(root_path, paths):
    """Return the paths rewritten relative to root_path, prefixed by "."."""
    relative_paths = (os.path.relpath(path, root_path) for path in paths)
    return [os.path.join(".", relative) for relative in relative_paths]
def change_background(str_, new_background):
    """Return str_ as a TermStr with black backgrounds recolored.

    Only characters whose background is termstr.Color.black are given
    new_background; other backgrounds are kept.
    """
    def recolor(style):
        if style.bg_color == termstr.Color.black:
            background = new_background
        else:
            background = style.bg_color
        return termstr.CharStyle(style.fg_color, background, style.is_bold,
                                 style.is_underlined)

    return termstr.TermStr(str_).transform_style(recolor)
class Summary:
    """The grid of results: one row (Entry) per file, one Result per tool.

    The summary also tracks the cursor, the sort order, and hands out the
    placeholder results that still need to be run.
    """

    def __init__(self, root_path, jobs_added_event):
        self._root_path = root_path
        self._jobs_added_event = jobs_added_event
        self._view_widget = fill3.View.from_widget(self)
        self.__cursor_position = (0, 0)
        self.closest_placeholder_generator = None
        self._lock = threading.Lock()
        self._cache = {}
        self.is_status_simple = False
        self.is_directory_sort = True
        self._max_width = None
        self._max_path_length = None
        self.sync_with_filesystem()

    @property
    def _cursor_position(self):
        return self.__cursor_position

    @_cursor_position.setter
    def _cursor_position(self, new_position):
        if new_position != self.__cursor_position:
            self.__cursor_position = new_position
            # The spiral search starts at the cursor, so it must restart.
            self.closest_placeholder_generator = None

    def sync_with_filesystem(self, sync_tools=True, sync_paths=True):
        """Rebuild the rows from the tools and the files on disk.

        sync_tools -- reload the tools module first.
        sync_paths -- rescan the directory tree for files; otherwise the
                      previously found paths are reused.

        Results whose file and tool are unchanged are reused from the cache.
        The jobs-added event is set if any new result was created.
        """
        if sync_tools:
            importlib.reload(tools)
        x, _ = self._cursor_position
        try:
            old_path = self.get_selection().path
        except AttributeError:
            # No column exists yet when called from __init__.
            old_path = None
        new_column = fill3.Column([])
        new_cache = {}
        if sync_paths:
            paths = fix_paths(self._root_path,
                              codebase_files(self._root_path))
            self._paths = paths
            # Sort in place only.  Calling self.sort() here would recurse
            # into this method and perform the whole rebuild twice.
            self._sort_paths(self.is_directory_sort)
        else:
            paths = self._paths
        jobs_added = False
        new_cursor_position = (0, 0)
        row_index = 0
        result_total, completed_total = 0, 0
        for path in paths:
            full_path = os.path.join(self._root_path, path)
            try:
                key = (path, os.stat(full_path).st_ctime)
            except FileNotFoundError:
                continue
            if path == old_path:
                # Keep the cursor on the file it was on before the sync.
                new_cursor_position = (x, row_index)
            row = []
            for tool in tools.tools_for_path(path):
                cache_key = (key, tool.__name__, tool.__code__.co_code)
                if cache_key in self._cache:
                    result = self._cache[cache_key]
                    result.tool = tool
                else:
                    result = Result(path, tool)
                    jobs_added = True
                if result.is_completed:
                    completed_total += 1
                new_cache[cache_key] = result
                row.append(result)
            new_column.append(Entry(path, row, self))
            row_index += 1
            result_total += len(row)
        max_width = max(len(row) for row in new_column)
        max_path_length = max(len(path) for path in paths) - len("./")
        self._column, self._cache, self._cursor_position, self.result_total, \
            self.completed_total, self._max_width, self._max_path_length, \
            self.closest_placeholder_generator = (
                new_column, new_cache, new_cursor_position, result_total,
                completed_total, max_width, max_path_length, None)
        if jobs_added:
            self._jobs_added_event.set()
        # Delete the stale results from the disk now, to avoid accidentally
        # deleting a future result with the same filename. See Result.__del__.
        gc.collect()

    def placeholder_spiral(self):
        """Yield placeholder results, spiralling outwards from the cursor."""
        x, y = self.cursor_position()
        result = self._column[y][x]
        if result.is_placeholder:
            yield result
        for lap in range(max(len(self._column), self._max_width)):
            y -= 1
            for dx, dy in [(1, 1), (-1, 1), (-1, -1), (1, -1)]:
                for move in range(lap + 1):
                    x += dx
                    y += dy
                    try:
                        result = self._column[y][x]
                    except IndexError:
                        continue
                    if result.is_placeholder:
                        yield result

    def get_closest_placeholder(self):
        """Return the next placeholder result from the spiral search.

        Raises StopIteration when the current search is exhausted.
        """
        with self._lock:
            # Work on a local reference: the attribute is reset to None
            # whenever the cursor moves.
            generator = self.closest_placeholder_generator
            if generator is None:
                generator = self.placeholder_spiral()
                self.closest_placeholder_generator = generator
            return next(generator)

    def appearance_dimensions(self):
        status_width = 1 if self.is_status_simple else 2
        width = self._max_path_length + 1 + status_width * self._max_width
        return width, len(self._column)

    def appearance_interval(self, interval):
        x, y = self.cursor_position()
        rows = fill3.Column(self._column.widgets)
        # Swap in a copy of the cursor's row that highlights the selection.
        rows[y] = Entry(rows[y].path, rows[y].widgets, self, highlighted=x,
                        set_results=False)
        return rows.appearance_interval(interval)

    def appearance(self, dimensions):
        width, height = dimensions
        x, y = self.cursor_position()
        status_width = 1 if self.is_status_simple else 2
        screen_x, screen_y = self._max_path_length + 1 + x * status_width, y
        width, height = width - 1, height - 1  # Minus one for the scrollbars
        scroll_y = (screen_y // height) * height
        self._view_widget.position = ((screen_x // width) * width, scroll_y)
        appearance = self._view_widget.appearance(dimensions)
        appearance[screen_y - scroll_y] = change_background(
            appearance[screen_y - scroll_y], termstr.Color.grey_50)
        return appearance

    def cursor_position(self):
        """Return the cursor as (x, y), with x clamped to the row's length."""
        x, y = self._cursor_position
        return min(x, len(self._column[y])-1), y

    def get_selection(self):
        """Return the result under the cursor."""
        x, y = self.cursor_position()
        return self._column[y][x]

    def _move_cursor(self, dx, dy):
        """Move the cursor horizontally or vertically, wrapping around."""
        if dy == 0:
            x, y = self.cursor_position()
            self._cursor_position = ((x + dx) % len(self._column[y]), y)
        elif dx == 0:
            # The unclamped x is kept so the column is remembered when
            # passing through shorter rows.
            x, y = self._cursor_position
            self._cursor_position = (x, (y + dy) % len(self._column))
        else:
            raise ValueError

    def cursor_right(self):
        self._move_cursor(1, 0)

    def cursor_left(self):
        self._move_cursor(-1, 0)

    def cursor_up(self):
        self._move_cursor(0, -1)

    def cursor_down(self):
        self._move_cursor(0, 1)

    def _issue_generator(self):
        """Yield (result, position) for failures found after the cursor.

        The search wraps around and visits the cursor's row a second time at
        the end, so that failures at or before the cursor are found last.
        """
        x, y = self.cursor_position()
        for index in range(len(self._column) + 1):
            row_index = (index + y) % len(self._column)
            row = self._column[row_index]
            for index_x, result in enumerate(row):
                if (result.status == tools.Status.failure and
                    not (row_index == y and index_x <= x and
                         index != len(self._column))):
                    yield result, (index_x, row_index)

    def move_to_next_issue(self):
        try:
            _, self._cursor_position = next(self._issue_generator())
        except StopIteration:
            pass

    def move_to_next_issue_of_tool(self):
        current_tool = self.get_selection().tool
        for issue, position in self._issue_generator():
            if issue.tool == current_tool:
                self._cursor_position = position
                return

    def toggle_status_style(self):
        self.is_status_simple = not self.is_status_simple
        self.sync_with_filesystem(sync_tools=False, sync_paths=False)

    def _sort_paths(self, is_directory_sort):
        """Sort self._paths in place and record the chosen ordering."""
        def directory_sort(path):
            return (os.path.dirname(path), tools.splitext(path)[1],
                    os.path.basename(path))

        def type_sort(path):
            return (tools.splitext(path)[1], os.path.dirname(path),
                    os.path.basename(path))

        key_func = directory_sort if is_directory_sort else type_sort
        self._paths.sort(key=key_func)
        self.is_directory_sort = is_directory_sort

    def sort(self, is_directory_sort):
        """Order the files by directory or by type, then rebuild the rows."""
        self._sort_paths(is_directory_sort)
        self.sync_with_filesystem(sync_tools=False, sync_paths=False)
class Log:
    """The activity log: a column of timestamped messages."""

    GREY_BOLD_STYLE = termstr.CharStyle(termstr.Color.grey_100, is_bold=True)
    GREEN_STYLE = termstr.CharStyle(termstr.Color.green)

    def __init__(self, appearance_changed_event):
        self._appearance_changed_event = appearance_changed_event
        self.widget = fill3.Column([])
        self.portal = fill3.Portal(self.widget)
        self._appearance_cache = None

    def log_message(self, message, timestamp=None, char_style=None):
        """Append a message to the log and request a redraw.

        message    -- a string, or a list of parts that are joined together.
                      For a part that is a tuple, its second item is used.
        timestamp  -- the text shown before the message; defaults to the
                      current local time as HH:MM:SS.
        char_style -- an optional style applied to the whole message.
        """
        if isinstance(message, list):
            message = [part[1] if isinstance(part, tuple) else part
                       for part in message]
            message = fill3.join("", message)
        if char_style is not None:
            message = termstr.TermStr(message, char_style)
        timestamp = (time.strftime("%H:%M:%S", time.localtime())
                     if timestamp is None else timestamp)
        label = fill3.Text(termstr.TermStr(timestamp, Log.GREY_BOLD_STYLE) +
                           " " + message)
        self.widget.append(label)
        # Only the 200 most recent messages are kept.
        self.widget.widgets = self.widget[-200:]
        self._appearance_cache = None
        self._appearance_changed_event.set()

    def log_command(self, message, timestamp=None):
        """Log a message in the style used for user commands."""
        # The timestamp used to be accepted here but never passed on.
        self.log_message(message, timestamp=timestamp,
                         char_style=Log.GREEN_STYLE)

    def appearance_min(self):
        appearance = self._appearance_cache
        if appearance is None:
            self._appearance_cache = appearance = self.widget.appearance_min()
        return appearance

    def appearance(self, dimensions):
        width, height = dimensions
        full_appearance = self.appearance_min()
        # Scroll so that the newest messages are the ones visible.
        self.portal.position = (0, max(0, len(full_appearance) - height))
        return self.portal.appearance(dimensions)
def _highlight_chars(str_, style, marker="*"):
    """Remove the markers from str_, styling the character after each one."""
    head, *marked_segments = str_.split(marker)
    pieces = [head]
    for segment in marked_segments:
        # An empty segment means the marker had no character after it.
        if segment != "":
            pieces.append(termstr.TermStr(segment[0], style) + segment[1:])
    return fill3.join("", pieces)
class Help:
    """The help screen: the module's usage text and the status meanings."""

    def __init__(self, summary, screen):
        self.summary = summary
        self.screen = screen
        self.body = fill3.Placeholder()
        self.view = fill3.View.from_widget(self.body)
        self.widget = fill3.Border(self.view, title="Help")
        self.usage = _highlight_chars(__doc__, Log.GREEN_STYLE)
        portal = self.view.portal
        self.key_map = {
            "h": self.exit_help,
            "d": portal.scroll_up,
            "c": portal.scroll_down,
            "j": portal.scroll_left,
            "k": portal.scroll_right,
            "q": self.exit_help,
        }

    def exit_help(self):
        """Hide the help screen."""
        self.screen._is_help_visible = False

    def on_keypressed(self):
        """Read one character from stdin and run the action bound to it."""
        key = sys.stdin.read(1)
        action = self.key_map.get(key)
        if action is not None:
            action()

    def appearance(self, dimensions):
        is_simple = self.summary.is_status_simple
        status_lines = [
            " " + status_to_str(status, is_simple) + " " + meaning
            for status, meaning in tools.STATUS_MEANINGS]
        text = fill3.join("\n", [self.usage, "Statuses:"] + status_lines)
        self.body.widget = fill3.Text(text)
        return self.widget.appearance(dimensions)
class Listing:
    """Wraps a view, remembering the dimensions it was last drawn with."""

    def __init__(self, view):
        self.last_dimensions = None
        self.view = view

    def appearance(self, dimensions):
        """Record dimensions, then return the view's appearance for them."""
        self.last_dimensions = dimensions
        rendered = self.view.appearance(dimensions)
        return rendered
def add_watch_manager_to_mainloop(watch_manager, mainloop):
    """Have mainloop process the watch manager's events as they arrive.

    A reader is registered on the watch manager's file descriptor; each
    time it fires, the pending inotify events are read and processed.
    """
    notifier = pyinotify.Notifier(watch_manager)

    def handle_pending_events():
        notifier.read_events()
        notifier.process_events()

    watch_fd = watch_manager.get_fd()
    mainloop.add_reader(watch_fd, handle_pending_events)
def is_path_excluded(path):
    """Return True if any component of path starts with a dot."""
    for part in path.split(os.path.sep):
        if part.startswith("."):
            return True
    return False
class Screen:
    """The top level widget: lays out the panes and handles user input."""

    def __init__(self, summary, log, appearance_changed_event, main_loop):
        self._summary = summary
        self._log = log
        self._appearance_changed_event = appearance_changed_event
        self._main_loop = main_loop
        self._is_listing_portrait = True
        self._is_log_visible = True
        self._is_help_visible = False
        self._is_watching_filesystem = False
        self._is_paused = False
        # Watching starts off, so this toggle turns it on.
        self.toggle_watch_filesystem()
        self._make_widgets()
        self._make_keymap()

    def make_watch_manager(self):
        """Watch the root directory and the tools module for changes."""
        def on_filesystem_change(event):
            self._log.log_message("Filesystem changed.")
            self._summary.sync_with_filesystem(sync_tools=False)
            self._appearance_changed_event.set()

        def on_tools_change(event):
            self._log.log_message("Tools changed.")
            self._summary.sync_with_filesystem(sync_paths=False)
            self._appearance_changed_event.set()

        watch_manager = pyinotify.WatchManager()
        event_mask = (pyinotify.IN_CREATE | pyinotify.IN_DELETE |
                      pyinotify.IN_CLOSE_WRITE | pyinotify.IN_ATTRIB)
        watch_manager.add_watch(
            self._summary._root_path, event_mask, rec=True, auto_add=True,
            proc_fun=on_filesystem_change, exclude_filter=is_path_excluded)
        watch_manager.add_watch(tools.__file__, event_mask,
                                proc_fun=on_tools_change)
        self._watch_manager = watch_manager
        add_watch_manager_to_mainloop(self._watch_manager, self._main_loop)

    def _partition(self, widgets, height):
        """Split height in two, with the smaller part last."""
        smaller_height = max(height // 4, 10)
        return [height - smaller_height, smaller_height]

    def _partition_2(self, widgets, height):
        """Split height in two, with the smaller part first."""
        smaller_height = max(height // 4, 10)
        return [smaller_height, height - smaller_height]

    def _make_widgets(self):
        self._help_widget = Help(self._summary, self)
        root_path = os.path.basename(self._summary._root_path)
        summary = fill3.Border(self._summary, title="Summary of " + root_path)
        selected_widget = self._summary.get_selection()
        self._view = fill3.View.from_widget(selected_widget.result)
        self._listing = fill3.Border(Listing(self._view))
        log = fill3.Border(self._log, title="Log")
        port_log = fill3.Row([fill3.Column([summary, log], self._partition),
                              self._listing])
        land_log = fill3.Column([fill3.Row([summary, log]), self._listing],
                                self._partition_2)
        port_no_log = fill3.Row([summary, self._listing])
        land_no_log = fill3.Column([summary, self._listing], self._partition_2)
        # Indexed by [is log visible][is listing portrait].
        self._layouts = [[land_no_log, port_no_log], [land_log, port_log]]

    def _make_keymap(self):
        self._key_map = {key: action for keys, action in self._KEY_DATA
                         for key in keys}

    def toggle_help(self):
        self._is_help_visible = not self._is_help_visible

    def toggle_log(self):
        self._is_log_visible = not self._is_log_visible

    def toggle_window_orientation(self):
        self._is_listing_portrait = not self._is_listing_portrait

    def cursor_up(self):
        self._summary.cursor_up()

    def cursor_down(self):
        self._summary.cursor_down()

    def cursor_right(self):
        self._summary.cursor_right()

    def cursor_left(self):
        self._summary.cursor_left()

    def _move_listing(self, dx, dy):
        """Scroll the selected result by half a listing in a direction."""
        listing_width, listing_height = self._listing.widget.last_dimensions
        selected_widget = self._summary.get_selection()
        x, y = selected_widget.scroll_position
        selected_widget.scroll_position = \
            (max(x + dx * (listing_width // 2), 0),
             max(y + dy * (listing_height // 2), 0))

    def listing_up(self):
        self._move_listing(0, -1)

    def listing_down(self):
        self._move_listing(0, 1)

    def listing_right(self):
        self._move_listing(1, 0)

    def listing_left(self):
        self._move_listing(-1, 0)

    def move_to_next_issue(self):
        self._summary.move_to_next_issue()

    def move_to_next_issue_of_tool(self):
        self._summary.move_to_next_issue_of_tool()

    def edit_file(self):
        """Open the selected file with emacsclient."""
        path = self._summary.get_selection().path
        path_colored = tools._path_colored(path)
        self._log.log_message("Editing " + path_colored + " in emacs.")
        # The output is never read, so it is discarded rather than piped:
        # an unread pipe can fill up and block the child.
        subprocess.Popen(["emacsclient", path],
                         stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)

    def toggle_status_style(self):
        self._summary.toggle_status_style()

    def toggle_sort(self):
        new_sort = not self._summary.is_directory_sort
        sort_order = ("directory then type" if new_sort
                      else "type then directory")
        self._log.log_command("Ordering files by %s." % sort_order)
        self._summary.sort(new_sort)

    def toggle_watch_filesystem(self):
        self._is_watching_filesystem = not self._is_watching_filesystem
        self._log.log_command("Watching the filesystem for changes."
                              if self._is_watching_filesystem else
                              "Stopped watching the filesystem.")
        if self._is_watching_filesystem:
            self._summary.sync_with_filesystem()
            self.make_watch_manager()
        else:
            self._main_loop.remove_reader(self._watch_manager.get_fd())
            self._watch_manager = None

    def toggle_pause(self):
        self._is_paused = not self._is_paused
        self._log.log_command("Paused work." if self._is_paused else
                              "Continuing work.")
        if self._is_paused:
            for runner in self.runners:
                runner.pause()
        else:
            for runner in self.runners:
                runner.continue_()

    def quit_(self):
        raise KeyboardInterrupt

    def on_mouse_event(self, event):
        """Scroll the listing on wheel events, or move the cursor on a click
        or drag inside the summary."""
        if event[0] not in ["mouse press", "mouse drag"]:
            return
        if event[1] == 4:  # Mouse wheel up
            self.listing_up()
            self._appearance_changed_event.set()
            return
        if event[1] == 5:  # Mouse wheel down
            self.listing_down()
            self._appearance_changed_event.set()
            return
        x, y = event[2:4]
        border_width = 1
        view_width, view_height = \
            self._summary._view_widget.portal.last_dimensions
        if x < border_width or y < border_width or x > view_width or \
           y > view_height:
            return
        status_width = 1 if self._summary.is_status_simple else 2
        view_x, view_y = self._summary._view_widget.portal.position
        spacer = 1
        column_index = (x - self._summary._max_path_length - spacer -
                        border_width + view_x) // status_width
        row_index = y - border_width + view_y
        if row_index >= len(self._summary._column):
            return
        row = self._summary._column[row_index]
        if column_index < 0 or column_index >= len(row):
            return
        new_position = column_index, row_index
        if new_position != self._summary._cursor_position:
            self._summary._cursor_position = new_position
            self._appearance_changed_event.set()

    def on_keypressed(self, urwid_screen):
        """Dispatch the pending input events to their actions."""
        if self._is_help_visible:
            self._help_widget.on_keypressed()
            self._appearance_changed_event.set()
            return
        events = urwid_screen.get_input()
        for event in events:
            # Mouse events arrive as tuples, key presses as strings.
            if isinstance(event, tuple):
                self.on_mouse_event(event)
                continue
            try:
                action = self._key_map[event]
            except KeyError:
                pass
            else:
                action(self)
        self._appearance_changed_event.set()

    _STATUS_BAR = _highlight_chars(
        " *help *quit *d,*c,*j,*k:navigate *turn *log *edit *next *pause"
        " *watch *order *statuses", Log.GREEN_STYLE)

    @functools.lru_cache(maxsize=2)
    def _get_status_bar_appearance(self, width, is_directory_sort,
                                   is_watching_filesystem, is_paused,
                                   progress_bar_size):
        """Return the one line status bar for the given state.

        The first progress_bar_size characters are underlined to act as a
        progress bar.
        """
        ordering_text = "directory" if is_directory_sort else "type "
        watching_text = "watching" if is_watching_filesystem else "--------"
        paused_text = "paused" if is_paused else "------"
        indicators = " %s %s order:%s " % (paused_text, watching_text,
                                            ordering_text)
        spacing = " " * (width - len(self._STATUS_BAR) - len(indicators))
        bar = (self._STATUS_BAR[:width - len(indicators)] + spacing +
               indicators)[:width]
        return [bar[:progress_bar_size].underline() + bar[progress_bar_size:]]

    def appearance(self, dimensions):
        width, height = dimensions
        if self._is_help_visible:
            return self._help_widget.appearance(dimensions)
        widget = self._summary.get_selection()
        view = self._listing.widget.view
        view.position = widget.scroll_position
        view.widget = widget.result
        tool_name = tools._tool_name_colored(widget.tool, widget.path)
        self._listing.title = (
            tools._path_colored(widget.path) + " ─── " + tool_name + " " +
            status_to_str(widget.status, self._summary.is_status_simple))
        result_total = self._summary.result_total
        incomplete = result_total - self._summary.completed_total
        # With no results at all there is nothing left to do, and the
        # division below would fail.
        progress_bar_size = (max(0, width * incomplete // result_total)
                             if result_total else 0)
        status_bar_appearance = self._get_status_bar_appearance(
            width, self._summary.is_directory_sort,
            self._is_watching_filesystem, self._is_paused, progress_bar_size)
        return (self._layouts[self._is_log_visible][self._is_listing_portrait]
                .appearance((width, height-len(status_bar_appearance))) +
                status_bar_appearance)

    # Each action is the plain function, so it is called with the screen
    # passed explicitly.  See on_keypressed.
    _KEY_DATA = [
        ({"t"}, toggle_window_orientation), ({"l"}, toggle_log),
        ({"h"}, toggle_help), ({"d", "up"}, cursor_up),
        ({"c", "down"}, cursor_down), ({"j", "left"}, cursor_left),
        ({"k", "right"}, cursor_right), ({"D", "page up"}, listing_up),
        ({"C", "page down"}, listing_down), ({"J", "home"}, listing_left),
        ({"K", "end"}, listing_right), ({"o"}, toggle_sort),
        ({"n"}, move_to_next_issue), ({"N"}, move_to_next_issue_of_tool),
        ({"e"}, edit_file), ({"s"}, toggle_status_style),
        ({"w"}, toggle_watch_filesystem), ({"q"}, quit_),
        ({"p"}, toggle_pause)]
def get_cpu_temperature(path="/sys/class/thermal/thermal_zone0/temp"):
    """Return the temperature read from path as a whole number of degrees.

    path -- the file to read; defaults to the first sysfs thermal zone.

    The file is assumed to hold an integer in thousandths of a degree (the
    previous implementation dropped the last three digits and the newline).
    The value is parsed as a number, so short values and a missing trailing
    newline are handled, and the result is truncated towards zero.
    """
    with open(path, "r") as temp_file:
        thousandths = int(temp_file.read().strip())
    return int(thousandths / 1000)
def regulate_temperature(log):
    """Block while the computer is too hot, logging the wait.

    Waiting starts once the temperature reaches 72 and ends when it is no
    longer above 66, polling once a second.
    """
    if get_cpu_temperature() < 72:
        return
    log.log_message("The computer is too hot. Waiting to cool down...")
    while True:
        if get_cpu_temperature() <= 66:
            break
        time.sleep(1)
    log.log_message("The computer has cooled down. Continuing...")
def make_process_nicest(pid):
    """Give the process the niceness 19 and the idle I/O priority class."""
    target = psutil.Process(pid)
    niceness = 19
    target.nice(niceness)
    target.ionice(psutil.IOPRIO_CLASS_IDLE)
class _Result(Result):
    """A Result, as used by work_loop, that never removes its pickle file."""

    def __del__(self):
        """Do nothing, in place of Result.__del__ removing the file."""
def work_loop(parent_connection):
    """Serve tool requests arriving on the connection, forever.

    Each request is a (tool, path) pair.  The tool's report is stored
    through a _Result and the status is sent back on the connection.
    """
    while True:
        request = parent_connection.recv()
        tool, path = request
        holder = _Result(path, tool)
        status, report = tools.run_tool_no_error(path, tool)
        # Assigning the report writes it to the result's pickle file.
        holder.result = report
        parent_connection.send(status)
class Worker:
    """A child process that runs tools on request, at the lowest priority."""

    def __init__(self):
        self.child_connection, parent_connection = multiprocessing.Pipe()
        self.process = multiprocessing.Process(
            target=work_loop, args=(parent_connection,), daemon=True)
        self.process.start()
        # The pid is None until the process has been started, and a pid of
        # None makes psutil act on the current process, so the priority
        # must be lowered after starting.
        make_process_nicest(self.process.pid)

    def run_tool(self, path, tool):
        """Run the tool on the path in the child and return its status.

        Blocks until the child replies.
        """
        self.child_connection.send([tool, path])
        return self.child_connection.recv()

    def pause(self):
        os.kill(self.process.pid, signal.SIGSTOP)

    def continue_(self):
        os.kill(self.process.pid, signal.SIGCONT)

    def stop(self):
        os.kill(self.process.pid, signal.SIGKILL)
class Runner:
    """Feeds placeholder results from the summary to its own worker."""

    def __init__(self):
        self.result = None
        self.is_running = True
        self.worker = Worker()

    def job_runner(self, summary, log, jobs_added_event,
                   appearance_changed_event):
        """Run jobs forever, sleeping until the jobs-added event is set."""
        while True:
            jobs_added_event.wait()
            self._run_available_jobs(summary, log, appearance_changed_event)
            jobs_added_event.clear()

    def _run_available_jobs(self, summary, log, appearance_changed_event):
        """Run placeholders until none are left or the runner is stopped."""
        while self.is_running:
            # regulate_temperature(log)  # My fan is broken
            try:
                self.result = summary.get_closest_placeholder()
            except StopIteration:
                log.log_message("All results are up to date.")
                return
            try:
                self.result.run(log, appearance_changed_event, self.worker)
            except EOFError:  # Occurs if the process is terminated
                continue
            else:
                summary.completed_total += 1

    def pause(self):
        self.worker.pause()

    def continue_(self):
        self.worker.continue_()
_UPDATE_THREAD_STOPPED = False


def update_screen(main_widget, appearance_changed_event):
    """Redraw main_widget each time the appearance-changed event is set.

    Returns once _UPDATE_THREAD_STOPPED is found to be true after a wake up.
    """
    while True:
        appearance_changed_event.wait()
        appearance_changed_event.clear()
        if _UPDATE_THREAD_STOPPED:
            return
        fill3.patch_screen(main_widget)
def main(root_path):
    """Run the user interface on root_path until the user quits.

    The previous session is restored from the summary pickle in the cache
    directory when there is one.  On quitting, the workers are stopped and
    the session is saved back to that pickle.
    """
    global _UPDATE_THREAD_STOPPED
    os.chdir(root_path)  # FIX: Don't change directory if possible.
    # Create the loop explicitly: asyncio.get_event_loop() is deprecated
    # when no event loop is running.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    jobs_added_event = threading.Event()
    appearance_changed_event = threading.Event()
    try:
        pickle_path = os.path.join(_CACHE_PATH, ".summary.pickle")
        # NOTE: unpickling executes whatever the file describes, so the
        # cache directory must only ever hold files written by this program.
        with gzip.open(pickle_path, "rb") as file_:
            screen = pickle.load(file_)
    except FileNotFoundError:
        summary = Summary(root_path, jobs_added_event)
        log = Log(appearance_changed_event)
        screen = Screen(summary, log, appearance_changed_event, loop)
    else:
        # Restore the members that could not be pickled.
        screen._appearance_changed_event = appearance_changed_event
        screen._main_loop = loop
        if screen._is_watching_filesystem:
            screen.make_watch_manager()
        summary = screen._summary
        summary._lock = threading.Lock()
        summary._jobs_added_event = jobs_added_event
        log = screen._log
        log._appearance_changed_event = appearance_changed_event
        if screen._is_watching_filesystem:
            summary.sync_with_filesystem()
    log.log_message("Program started.")
    jobs_added_event.set()
    runners = [Runner() for index in range(multiprocessing.cpu_count() * 2)]
    screen.runners = runners
    for runner in runners:
        args = (summary, log, jobs_added_event, appearance_changed_event)
        threading.Thread(target=runner.job_runner, args=args,
                         daemon=True).start()
    if screen._is_paused:
        for runner in runners:
            runner.pause()

    def on_window_resize(n, frame):
        appearance_changed_event.set()

    signal.signal(signal.SIGWINCH, on_window_resize)
    appearance_changed_event.set()
    update_display_thread = threading.Thread(
        target=update_screen, args=(screen, appearance_changed_event),
        daemon=True)
    with terminal.hidden_cursor():
        with terminal.urwid_screen() as urwid_screen:
            update_display_thread.start()
            loop.add_reader(sys.stdin, screen.on_keypressed, urwid_screen)
            try:
                loop.run_forever()
            except KeyboardInterrupt:
                log.log_message("Program stopped.")
                _UPDATE_THREAD_STOPPED = True
                appearance_changed_event.set()
                update_display_thread.join()
    for runner in runners:
        runner.worker.stop()
        runner.is_running = False
    for runner in runners:
        # A runner that never received a job has no result to reset.
        if runner.result is not None:
            runner.result.reset()
    # Cannot pickle generators, locks, sockets or events.
    (summary.closest_placeholder_generator, summary._lock,
     summary._jobs_added_event, screen._appearance_changed_event,
     screen._main_loop, screen._watch_manager, screen.runners,
     log._appearance_changed_event) = [None] * 8
    open_compressed = functools.partial(gzip.open, compresslevel=1)
    dump_pickle_safe(screen, pickle_path, open=open_compressed)
def manage_cache(root_path):
    """Make sure root_path has a cache directory that is valid to use.

    The cache is cleared if this program's file is newer than the cache's
    ".creation-time" file, or if that file is missing.  A missing cache
    directory is created along with a new timestamp file.
    """
    cache_path = os.path.join(root_path, _CACHE_PATH)
    timestamp_path = os.path.join(cache_path, ".creation-time")
    if os.path.exists(cache_path):
        try:
            cache_time = os.stat(timestamp_path).st_mtime
        except FileNotFoundError:
            # The age of a cache without a timestamp is unknown, so it
            # cannot be trusted.
            is_stale = True
        else:
            is_stale = os.stat(__file__).st_mtime > cache_time
            if is_stale:
                print("Vigil has been updated, so clearing the cache and"
                      " recalculating all results...")
        if is_stale:
            shutil.rmtree(cache_path)
    if not os.path.exists(cache_path):
        os.mkdir(cache_path)
        with open(timestamp_path, "w"):
            pass
if __name__ == "__main__":
    # Exactly one argument, the root path, is expected.  Anything else
    # prints the usage text with the key highlight markers removed.
    if len(sys.argv) != 2:
        print(__doc__.replace("*", ""))
    else:
        root_path = os.path.abspath(sys.argv[1])
        title = "vigil: " + os.path.basename(root_path)
        with terminal.console_title(title):
            manage_cache(root_path)
            main(root_path)