eris/eris/__main__.py

1256 lines
48 KiB
Python
Executable file

#!/usr/bin/env python3.11
# -*- coding: utf-8 -*-
"""Eris Codebase Monitor
Eris maintains an up-to-date set of reports for every file in a codebase.
A status indicator summarises the state of each report, and a report is viewed
by selecting this status indicator with the cursor.
The reports are cached in the codebase's root directory in a ".eris"
directory.
"""
import asyncio
import contextlib
import functools
import gzip
import hashlib
import importlib
import importlib.resources
import itertools
import math
import multiprocessing
import os
import pickle
import shutil
import signal
import subprocess
import sys
import time
import docopt
import fill3
import fill3.terminal as terminal
import lscolors
import pygments.styles
import pyinotify
import termstr
import eris
import eris.worker as worker
import eris.paged_list as paged_list
import eris.sorted_collection as sorted_collection
# Command line synopsis and option reference.  docopt is imported by this
# module and presumably parses this text (the call is not in this part of the
# file -- TODO confirm), so the wording is part of the program's interface.
USAGE = """
Usage:
eris [options] <directory>
eris -h | --help
eris -i | --info
eris -a | --install-all-tools
eris --version
Example:
# eris my_project
Options:
-h, --help Show the full help.
-i, --info Show information about the available tools.
-a, --install-all-tools Install all tools.
-w COUNT, --workers=COUNT The number of processes working in parallel.
By default it is the number of cpus minus 1.
-e "COMMAND", --editor="COMMAND" The command used to start the editor, in
the *edit command. It may contain options.
-t THEME, --theme=THEME The pygment theme used for syntax
highlighting. Defaults to "native".
-c TYPE, --compression=TYPE The type of compression used in the cache:
gzip, lzma, bz2, or none. Defaults to gzip.
--config=FILE_NAME Use a custom config file instead of tools.toml.
"""
# Key binding reference.  It is joined with the module docstring and the
# status legend to build the text of the help screen (see Help.__init__).
KEYS_DOC = """Keys:
arrow keys, page up/down, mouse - Move the cursor or scroll the result pane.
tab - Change the focus between summary and result pane.
q, esc - Quit.
h - Show the help screen. (toggle)
t - Turn the result pane to portrait or landscape. (toggle)
l - Show the activity log. (toggle)
e - Edit the current file with an editor defined by -e, $EDITOR or $VISUAL.
n - Move to the next issue.
N - Move to the next issue of the current tool.
s - Sort files by type, or by directory location. (toggle)
r - Refresh the currently selected report.
R - Refresh all reports of the current tool.
f - Resize the focused pane to the full screen. (toggle)
o - Open the current file with xdg-open.
"""
class Entry:
    """One row of the summary: a file path and the results of its tools.

    Entries compare equal when their paths are equal.  This lets a
    placeholder such as ``Entry(path, [], None)`` be used to look up the real
    entry for a path in a collection.
    """

    # Largest number of results held by any entry.  Rows are padded up to this
    # width so that the path column lines up.
    MAX_WIDTH = 0

    def __init__(self, path, results, change_time, highlighted=None, set_results=True):
        """Create an entry for *path*.

        path: path of the file this row describes.
        results: sequence of result objects, one per tool.
        change_time: the file's change time when the entry was created.
        highlighted: index of the result to draw highlighted, or None.
        set_results: when true, point each result's ``entry`` back at this
            entry.
        """
        self.path = path
        self.change_time = change_time
        self.highlighted = highlighted
        self.results = results
        if set_results:
            # FIX: this is missed for entries appended later
            for result in results:
                result.entry = self
        self.widget = fill3.Row(results)
        self.appearance_cache = None
        self.last_width = None

    def __eq__(self, other):
        """Compare by path; defer to the other operand if it has no path."""
        try:
            return self.path == other.path
        except AttributeError:
            # Comparing against None or an unrelated object must not raise.
            return NotImplemented

    def __len__(self):
        """Return the number of results in this row."""
        return len(self.results)

    def __getitem__(self, index):
        """Return the result at *index* (slices are passed straight through)."""
        return self.results[index]

    def appearance(self):
        """Return the one-line appearance of the row: statuses, padding, path.

        The rendering is cached and rebuilt only when there is no cache or
        when Entry.MAX_WIDTH has changed since the cache was made.
        """
        if self.appearance_cache is None or self.last_width != Entry.MAX_WIDTH:
            self.last_width = Entry.MAX_WIDTH
            highlighted_result = (None if self.highlighted is None
                                  else self.results[self.highlighted])
            if highlighted_result is not None:
                highlighted_result.is_highlighted = True
            try:
                row_appearance = self.widget.appearance()
                path = lscolors.path_colored(self.path)
                padding = " " * (self.last_width - len(self.results) + 1)
                self.appearance_cache = [row_appearance[0] + padding + path]
            finally:
                # The flag is only meant to be set while the row is rendered,
                # so clear it even if rendering fails.
                if highlighted_result is not None:
                    highlighted_result.is_highlighted = False
        return self.appearance_cache

    def as_html(self):
        """Return ``(html, styles)`` for the row.

        The html of every result is concatenated and followed by the padded,
        colored path; *styles* is the union of all the style sets involved.
        """
        html_parts = []
        styles = set()
        for result in self.widget.widgets:
            result_html, result_styles = result.as_html()
            html_parts.append(result_html)
            styles.update(result_styles)
        path = lscolors.path_colored(self.path)
        padding = " " * (Entry.MAX_WIDTH - len(self.widget.widgets) + 1)
        path_html, path_styles = termstr.TermStr(padding + path).as_html()
        return "".join(html_parts) + path_html, styles.union(path_styles)
def is_path_excluded(path):
    """Return True when any component of *path* begins with a dot."""
    for component in path.split(os.path.sep):
        if component.startswith("."):
            return True
    return False
def codebase_files(path, skip_hidden_directories=True):
    """Yield the path of every non-excluded file found below *path*.

    Files whose name is excluded (see is_path_excluded) are always skipped.
    When *skip_hidden_directories* is true, excluded directories are pruned
    from the walk as well, so nothing beneath them is visited.
    """
    for directory, subdirectories, file_names in os.walk(path):
        if skip_hidden_directories:
            # Assigning through the slice prunes os.walk's own list in place.
            subdirectories[:] = [name for name in subdirectories
                                 if not is_path_excluded(name)]
        yield from (os.path.join(directory, name) for name in file_names
                    if not is_path_excluded(name))
def fix_paths(root_path, paths):
    """Return a generator of *paths* rewritten as "./"-prefixed paths
    relative to *root_path*."""

    def relative_to_root(path):
        return os.path.join(".", os.path.relpath(path, root_path))

    return (relative_to_root(path) for path in paths)
def highlight_str(line, highlight_color, transparency):
    """Return *line* as a TermStr with every style blended with a color.

    Both the foreground and the background color of each character style are
    blended with *highlight_color* by *transparency*; the bold, italic and
    underline flags are carried over unchanged.
    """

    # The cache lives for one call only: it avoids re-blending the same style
    # for every character of the line.
    @functools.lru_cache(maxsize=500)
    def blend_style(style):
        foreground = termstr.blend_color(style.fg_rgb_color, highlight_color, transparency)
        background = termstr.blend_color(style.bg_rgb_color, highlight_color, transparency)
        return termstr.CharStyle(foreground, background, is_bold=style.is_bold,
                                 is_italic=style.is_italic,
                                 is_underlined=style.is_underlined)

    term_line = termstr.TermStr(line)
    return term_line.transform_style(blend_style)
def in_green(str_):
    """Return *str_* as a TermStr styled with the lime color."""
    lime_style = termstr.CharStyle(termstr.Color.lime)
    return termstr.TermStr(str_, lime_style)
# Unit movement vectors as (dx, dy) pairs; y grows downwards, so "up" is -1.
_UP, _DOWN, _LEFT, _RIGHT = (0, -1), (0, 1), (-1, 0), (1, 0)
def directory_sort(entry):
    """Sort key that orders entries by directory, then extension, then name."""
    directory, file_name = os.path.split(entry.path)
    extension = tools.splitext(entry.path)[1]
    return (directory, extension, file_name)
def type_sort(entry):
    """Sort key that orders entries by extension, then directory, then name."""
    directory, file_name = os.path.split(entry.path)
    extension = tools.splitext(entry.path)[1]
    return (extension, directory, file_name)
class Summary:
    """The sorted table of entries (files) and their results, with a cursor.

    The summary keeps running totals of results and completed results,
    reacts to files being added, deleted and modified, and hands out pending
    results, closest to the cursor first, through get_closest_placeholder.
    """

    def __init__(self, root_path, jobs_added_event):
        """root_path: directory of the codebase being monitored.
        jobs_added_event: event that is set whenever there may be new work.
        """
        self._root_path = root_path
        self._jobs_added_event = jobs_added_event
        self._view_widget = fill3.View.from_widget(self)
        self.is_directory_sort = True
        self._old_entries = []
        self.__cursor_position = (0, 0)
        self.reset()

    def reset(self):
        """Empty the summary and zero its totals, keeping the sort order."""
        Entry.MAX_WIDTH = 0
        self._max_path_length = 0
        self.result_total = 0
        self.completed_total = 0
        self.is_loaded = False
        self.closest_placeholder_generator = None
        sort_func = directory_sort if self.is_directory_sort else type_sort
        self._entries = sorted_collection.SortedCollection([], key=sort_func)

    def __getstate__(self):
        """Return the picklable state.

        The entries are moved into a paged list stored under the cache
        directory, with the entry under the cursor first, and the cursor row
        is reset to 0 so that it points at that entry when the list is loaded
        again.
        """
        state = self.__dict__.copy()
        state["closest_placeholder_generator"] = None
        state["_jobs_added_event"] = None
        summary_path = os.path.join(tools.CACHE_PATH, "summary_dir")
        open_compressed = functools.partial(gzip.open, compresslevel=1)
        x, y = self.cursor_position()
        # Only an empty summary has nothing to save.  A cursor on the first
        # row (y == 0) must still save every entry.
        if len(self._entries) > 0:
            entries = itertools.chain([self._entries[y]], itertools.islice(self._entries, y),
                                      itertools.islice(self._entries, y+1, None))
        else:
            entries = []
        state["_old_entries"] = paged_list.PagedList(entries, summary_path, 2000, 1,
                                                     exist_ok=True, open_func=open_compressed)
        state["_entries"] = None
        # self.__cursor_position is name-mangled, but string keys are not, so
        # the mangled name has to be spelled out here.
        state["_Summary__cursor_position"] = (x, 0)
        return state

    def __setstate__(self, state):
        """Restore the pickled state and start with an empty, unloaded summary."""
        self.__dict__ = state
        self.reset()

    @property
    def _cursor_position(self):
        """The raw (x, y) cursor position; x is not clamped to the row."""
        return self.__cursor_position

    @_cursor_position.setter
    def _cursor_position(self, new_position):
        if new_position != self.__cursor_position:
            self.__cursor_position = new_position
            # The sweep for pending results starts at the cursor.
            self.closest_placeholder_generator = None

    def sort_entries(self):
        """Re-sort the entries according to the current sort order."""
        key_func = directory_sort if self.is_directory_sort else type_sort
        self._entries = sorted_collection.SortedCollection(self._entries, key=key_func)
        self.closest_placeholder_generator = None

    def add_entry(self, entry):
        """Insert *entry*, unless an entry with the same path is present."""
        if entry in self._entries:
            return
        for result in entry:
            self.result_total += 1
            if result.is_completed:
                self.completed_total += 1
        Entry.MAX_WIDTH = max(len(entry), Entry.MAX_WIDTH)
        self._max_path_length = max(len(entry.path) - len("./"), self._max_path_length)
        entry_index = self._entries.insert(entry)
        x, y = self._cursor_position
        if entry_index <= y:
            # Inserted at or above the cursor: follow the selected row down.
            self.scroll(0, -1)
        self._jobs_added_event.set()
        if self.is_loaded:
            self.closest_placeholder_generator = None

    def on_file_added(self, path):
        """Add an entry for *path* with a fresh result for each of its tools.

        Does nothing if the file cannot be stat'ed.
        """
        full_path = os.path.join(self._root_path, path)
        try:
            change_time = os.stat(full_path).st_ctime
        except OSError:
            return
        row = [tools.Result(path, tool) for tool in tools.tools_for_path(path)]
        entry = Entry(path, row, change_time)
        self.add_entry(entry)

    def on_file_deleted(self, path):
        """Remove the entry for *path*, deleting its results.

        Does nothing if the file still exists or has no entry.
        """
        if os.path.exists(os.path.join(self._root_path, path)):
            return
        entry = Entry(path, [], None)
        try:
            index = self._entries.index(entry)
        except ValueError:
            return
        x, y = self._cursor_position
        if index < y:
            # Removed above the cursor: follow the selected row up.
            self.scroll(0, 1)
        for result in self._entries[index]:
            if result.is_completed:
                self.completed_total -= 1
            self.result_total -= 1
            result.delete()
        row = self._entries[index]
        del self._entries._keys[index]
        del self._entries._items[index]
        if len(row) == Entry.MAX_WIDTH:
            Entry.MAX_WIDTH = max((len(entry) for entry in self._entries), default=0)
        if (len(path) - 2) == self._max_path_length:
            self._max_path_length = max(((len(entry.path) - 2) for entry in self._entries),
                                        default=0)
        x, y = self._cursor_position
        if y == len(self._entries):
            self._cursor_position = x, y - 1
        self.closest_placeholder_generator = None

    def on_file_modified(self, path):
        """Reset every result of *path* so that it is recalculated.

        Returns the entry, or None if there is no entry for *path*.
        """
        entry = Entry(path, [], None)
        try:
            entry_index = self._entries.index(entry)
        except ValueError:
            return
        entry = self._entries[entry_index]
        for result in entry:
            self.refresh_result(result, only_completed=False)
        self.closest_placeholder_generator = None
        return entry

    @contextlib.contextmanager
    def keep_selection(self):
        """Context manager that keeps the cursor on the same file.

        After the body has run, the cursor row is moved to the entry that had
        the cursor before.  If that entry is gone, the row is only clamped to
        the end of the list.
        """
        try:
            cursor_path = self.get_selection().path
        except (AttributeError, IndexError):
            # Nothing is selected (e.g. the summary is empty).
            yield
            return
        x, y = self._cursor_position
        yield
        for index, row in enumerate(self._entries):
            if row.path == cursor_path:
                self._cursor_position = (x, index)
                return
        if y >= len(self._entries):
            self._cursor_position = (x, len(self._entries) - 1)

    async def sync_with_filesystem(self, log=None):
        """Load the saved entries, then reconcile them with the files on disk.

        Files that changed since they were saved are refreshed, new files are
        added and vanished files are removed.  Progress is reported through
        *log* when one is given.
        """
        log_message = (lambda message: None) if log is None else log.log_message
        start_time = time.time()
        cache = {}
        log_message("Started loading summary…")
        for index, entry in enumerate(self._old_entries):
            if index != 0 and index % 5000 == 0:
                log_message(f"Loaded {index} files…")
            await asyncio.sleep(0)
            self.add_entry(entry)
            if index % 1000 == 0:
                fill3.APPEARANCE_CHANGED_EVENT.set()
            cache[entry.path] = entry.change_time
        duration = time.time() - start_time
        log_message(f"Finished loading summary. {round(duration, 2)} secs")
        self.is_loaded = True
        self.closest_placeholder_generator = None
        log_message("Started sync with filesystem…")
        start_time = time.time()
        all_paths = set()
        for path in fix_paths(self._root_path, codebase_files(self._root_path)):
            await asyncio.sleep(0)
            all_paths.add(path)
            if path in cache:
                full_path = os.path.join(self._root_path, path)
                try:
                    change_time = os.stat(full_path).st_ctime
                except OSError:
                    # The file vanished after the walk listed it; leave it to
                    # the deletion pass below.
                    all_paths.discard(path)
                    continue
                if change_time != cache[path]:
                    cache[path] = change_time
                    entry = self.on_file_modified(path)
                    # The entry may have been removed while this task yielded.
                    if entry is not None:
                        entry.change_time = change_time
            else:
                self.on_file_added(path)
        fill3.APPEARANCE_CHANGED_EVENT.set()
        for path in cache.keys() - all_paths:
            await asyncio.sleep(0)
            self.on_file_deleted(path)
        duration = time.time() - start_time
        log_message(f"Finished sync with filesystem. {round(duration, 2)} secs")

    def _sweep_up(self, x, y):
        """Yield results endlessly, going backwards from just before (x, y)."""
        yield from reversed(self._entries[y][:x])
        while True:
            y = (y - 1) % len(self._entries)
            yield from reversed(self._entries[y])

    def _sweep_down(self, x, y):
        """Yield results endlessly, going forwards from (x, y) inclusive."""
        yield from self._entries[y][x:]
        while True:
            y = (y + 1) % len(self._entries)
            yield from self._entries[y]

    def _sweep_combined(self, x, y):
        """Alternate between the downward and the upward sweep."""
        for up_result, down_result in zip(self._sweep_up(x, y), self._sweep_down(x, y)):
            yield down_result
            yield up_result

    def _placeholder_sweep(self):
        """Yield pending results, nearest to the cursor first.

        The sweeps are endless, so stop once more results have been looked at
        than the summary holds.
        """
        x, y = self.cursor_position()
        for index, result in enumerate(self._sweep_combined(x, y)):
            if index > self.result_total:
                break
            if result.status == tools.Status.pending:
                yield result

    async def get_closest_placeholder(self):
        """Return the next pending result closest to the cursor.

        Raises StopAsyncIteration when the current sweep is exhausted.
        """
        if self.closest_placeholder_generator is None:
            self.closest_placeholder_generator = self._placeholder_sweep()
        try:
            return self.closest_placeholder_generator.send(None)
        except StopIteration:
            raise StopAsyncIteration

    def appearance_dimensions(self):
        """Return the (width, height) of the whole table."""
        return self._max_path_length + 1 + Entry.MAX_WIDTH, len(self._entries)

    def appearance_interval(self, interval):
        """Return the appearance of the rows in *interval*.

        The result under the cursor is highlighted only while rendering.
        """
        start_y, end_y = interval
        x, y = self.cursor_position()
        self._entries[y].highlighted = x
        self._entries[y].appearance_cache = None
        appearance = fill3.Column(self._entries).appearance_interval(interval)
        self._entries[y].highlighted = None
        self._entries[y].appearance_cache = None
        return appearance

    def _set_scroll_position(self, cursor_x, cursor_y, summary_height):
        """Scroll the view vertically so that the cursor row is visible."""
        scroll_x, scroll_y = new_scroll_x, new_scroll_y = self._view_widget.position
        if cursor_y < scroll_y:
            new_scroll_y = max(cursor_y - summary_height + 1, 0)
        if (scroll_y + summary_height - 1) < cursor_y:
            new_scroll_y = cursor_y
        self._view_widget.position = new_scroll_x, new_scroll_y

    def _highlight_cursor_row(self, appearance, cursor_y):
        """Blend the cursor's row with white, leaving its last character."""
        scroll_x, scroll_y = self._view_widget.position
        highlighted_y = cursor_y - scroll_y
        appearance[highlighted_y] = (highlight_str(appearance[highlighted_y][:-1],
                                                   termstr.Color.white, 0.8)
                                     + appearance[highlighted_y][-1])
        return appearance

    def appearance_for(self, dimensions):
        """Return the appearance of the summary for a pane of *dimensions*."""
        width, height = dimensions
        if len(self._entries) == 0:
            return [" " * width for row in range(height)]
        cursor_x, cursor_y = self.cursor_position()
        width, height = width - 1, height - 1  # Minus one for the scrollbars
        self._set_scroll_position(cursor_x, cursor_y, height)
        return self._highlight_cursor_row(self._view_widget.appearance_for(dimensions), cursor_y)

    def scroll(self, dx, dy):
        """Scroll the view vertically by *dy* rows, moving the cursor with it.

        A positive *dy* scrolls towards the top and is limited to the rows
        that are scrolled off; *dx* is ignored.
        """
        scroll_x, scroll_y = self._view_widget.position
        dy = min(dy, scroll_y)
        self._view_widget.position = scroll_x, scroll_y - dy
        self._move_cursor((0, -dy))

    def cursor_position(self):
        """Return the cursor as (x, y), with x clamped to its row.

        Returns (0, 0) when the cursor row does not exist.
        """
        x, y = self._cursor_position
        try:
            return min(x, len(self._entries[y])-1), y
        except IndexError:
            return 0, 0

    def get_selection(self):
        """Return the result under the cursor.

        Raises IndexError when there is nothing under the cursor.
        """
        x, y = self.cursor_position()
        return self._entries[y][x]

    def _move_cursor(self, vector):
        """Move the cursor by an axis-aligned (dx, dy), wrapping at the edges.

        Raises ValueError for a diagonal vector.
        """
        # There is nowhere to move to in an empty summary, and the modulo
        # below would divide by zero.
        if vector == (0, 0) or len(self._entries) == 0:
            return
        dx, dy = vector
        if dy == 0:
            x, y = self.cursor_position()
            self._cursor_position = ((x + dx) % len(self._entries[y]), y)
        elif dx == 0:
            x, y = self._cursor_position
            self._cursor_position = (x, (y + dy) % len(self._entries))
        else:
            raise ValueError

    def cursor_right(self):
        self._move_cursor(_RIGHT)

    def cursor_left(self):
        self._move_cursor(_LEFT)

    def cursor_up(self):
        self._move_cursor(_UP)

    def cursor_down(self):
        self._move_cursor(_DOWN)

    def cursor_page_up(self):
        view_width, view_height = self._view_widget.portal.last_dimensions
        self.scroll(0, view_height)

    def cursor_page_down(self):
        view_width, view_height = self._view_widget.portal.last_dimensions
        self.scroll(0, -view_height)

    def cursor_home(self):
        x, y = self._cursor_position
        self._cursor_position = x, 0

    def cursor_end(self):
        x, y = self._cursor_position
        self._cursor_position = x, len(self._entries) - 1

    def _issue_generator(self):
        """Yield ``(result, (x, y))`` for each problem after the cursor.

        The search wraps around to the top and finally revisits the part of
        the cursor's row at or before the cursor.
        """
        if len(self._entries) == 0:
            return
        x, y = self.cursor_position()
        for index in range(len(self._entries) + 1):
            row_index = (index + y) % len(self._entries)
            row = self._entries[row_index]
            for index_x, result in enumerate(row):
                if (result.status == tools.Status.problem and
                        not (row_index == y and index_x <= x and index != len(self._entries))):
                    yield result, (index_x, row_index)

    def move_to_next_issue(self):
        """Move the cursor to the next problem, if there is one."""
        with contextlib.suppress(StopIteration):
            issue, self._cursor_position = self._issue_generator().send(None)

    def move_to_next_issue_of_tool(self):
        """Move the cursor to the next problem reported by the selected tool."""
        if len(self._entries) == 0:
            return
        current_tool = self.get_selection().tool
        for issue, position in self._issue_generator():
            if issue.tool == current_tool:
                self._cursor_position = position
                return

    def refresh_result(self, result, only_completed=True):
        """Reset *result* and delete its stored data so that it is redone.

        With *only_completed* true, results that are not completed are left
        alone.
        """
        if result.is_completed or not only_completed:
            if result.is_completed:
                self.completed_total -= 1
            result.reset()
            result.delete()
            self.closest_placeholder_generator = None
            self._jobs_added_event.set()

    def refresh_tool(self, tool):
        """Refresh every completed result that was produced by *tool*."""
        for row in self._entries:
            for result in row:
                if result.tool == tool:
                    self.refresh_result(result)

    def clear_running(self):
        """Ask for a refresh of every result whose status is still running."""
        for row in self._entries:
            for result in row:
                if result.status == tools.Status.running:
                    self.refresh_result(result)

    def as_html(self):
        """Return ``(html, styles)`` for the whole table, one row per line."""
        html_parts = []
        styles = set()
        for row in self._entries:
            html_row, styles_row = row.as_html()
            html_parts.append(html_row)
            styles.update(styles_row)
        return ("<style>a { text-decoration:none; }</style><pre>" +
                "<br>".join(html_parts) + "</pre>"), styles
class Log:
    """The activity log: a list of timestamped lines shown in the log pane."""

    _GREY_BOLD_STYLE = termstr.CharStyle(termstr.Color.grey_100, is_bold=True)
    _GREEN_STYLE = termstr.CharStyle(termstr.Color.lime)

    def __init__(self):
        self.lines = []
        # Cached rendering of the lines; None when it has to be rebuilt.
        self._appearance = None

    def log_message(self, message, timestamp=None, char_style=None):
        """Append *message* to the log, prefixed with a timestamp.

        message: a string, or a list of parts that are joined together; for
            a part that is a tuple only its second item is used.
        timestamp: text to use instead of the current local time.
        char_style: style applied to the whole message, if given.

        When stdout is not a terminal the line is printed instead of stored.
        """
        if isinstance(message, list):
            message = [part[1] if isinstance(part, tuple) else part for part in message]
            message = termstr.join("", message)
        if char_style is not None:
            message = termstr.TermStr(message, char_style)
        timestamp = time.strftime("%H:%M:%S", time.localtime()) if timestamp is None else timestamp
        line = termstr.TermStr(timestamp, Log._GREY_BOLD_STYLE) + " " + message
        if not sys.stdout.isatty():
            print(line, flush=True)
            return
        self.lines.append(line)
        self._appearance = None
        fill3.APPEARANCE_CHANGED_EVENT.set()

    def log_command(self, message, timestamp=None):
        """Log *message* in the green style used for user commands."""
        # Forward the timestamp instead of silently dropping it.
        self.log_message(message, timestamp=timestamp, char_style=Log._GREEN_STYLE)

    def appearance_for(self, dimensions):
        """Return the log rendered to *dimensions*.

        Only the most recent lines that fit the height are kept; older lines
        are discarded.
        """
        if self._appearance is None or fill3.appearance_dimensions(self._appearance) != dimensions:
            width, height = dimensions
            del self.lines[:-height]
            self._appearance = fill3.appearance_resize(self.lines, dimensions)
        return self._appearance
def highlight_chars(str_, style, marker="*"):
    """Return *str_* with the markers removed and the character that followed
    each marker drawn in *style*."""
    head, *marked_chunks = str_.split(marker)
    pieces = [head]
    for chunk in marked_chunks:
        if chunk == "":
            # Nothing follows this marker, so there is nothing to highlight.
            continue
        pieces.append(termstr.TermStr(chunk[0], style) + chunk[1:])
    return termstr.join("", pieces)
def get_status_help():
    """Return the legend of statuses: one line per status and its meaning."""
    lines = ["Statuses:"]
    for status, meaning in tools.STATUS_MEANINGS:
        lines.append(" " + tools.STATUS_TO_TERMSTR[status] + " " + meaning)
    return termstr.join("\n", lines)
class Help:
    """The scrollable help screen, shown inside a border titled "Help"."""

    def __init__(self, summary, screen):
        self.summary = summary
        self.screen = screen
        # The text is the module docstring, the key list and the status legend.
        sections = [__doc__, KEYS_DOC, get_status_help()]
        self.view = fill3.View.from_widget(fill3.Text(termstr.join("\n", sections)))
        self.widget = fill3.Border(self.view, title="Help")
        portal = self.view.portal
        self.key_map = {
            "h": self._exit_help,
            "q": self._exit_help,
            terminal.ESC: self._exit_help,
            terminal.UP: portal.scroll_up,
            terminal.DOWN: portal.scroll_down,
            terminal.LEFT: portal.scroll_left,
            terminal.RIGHT: portal.scroll_right,
        }

    def _exit_help(self):
        """Hide the help screen again."""
        self.screen._is_help_visible = False

    def on_mouse_input(self, term_code):
        """Scroll the help text with the mouse wheel; ignore other events."""
        button = terminal.decode_mouse_input(term_code)[1]
        if button == terminal.MOUSE_WHEEL_UP:
            self.view.portal.scroll_up()
            fill3.APPEARANCE_CHANGED_EVENT.set()
        elif button == terminal.MOUSE_WHEEL_DOWN:
            self.view.portal.scroll_down()
            fill3.APPEARANCE_CHANGED_EVENT.set()

    def on_keyboard_input(self, term_code):
        """Run the action bound to the key, trying its lowercase form too."""
        action = self.key_map.get(term_code)
        if not action:
            action = self.key_map.get(term_code.lower())
        if action is None:
            return
        action()
        fill3.APPEARANCE_CHANGED_EVENT.set()

    def appearance_for(self, dimensions):
        """Return the appearance of the bordered help text."""
        return self.widget.appearance_for(dimensions)
class Listing:
    """Wrap a view and remember the dimensions it was last rendered at."""

    def __init__(self, view):
        self.view = view
        self.last_dimensions = None

    def appearance_for(self, dimensions):
        """Record *dimensions*, then return the view's appearance for them."""
        self.last_dimensions = dimensions
        appearance = self.view.appearance_for(dimensions)
        return appearance
class Screen:
    """The top level widget: summary, result listing, log, help and status bar.

    It also owns the workers and translates keyboard and mouse input into
    actions on the summary and the listing.
    """

    def __init__(self, summary, log):
        self._summary = summary
        self._log = log
        self._is_summary_focused = True
        self.workers = None
        self._is_listing_portrait = True
        self._is_log_visible = True
        self._is_help_visible = False
        self._is_fullscreen = False
        self._make_widgets()
        self._last_mouse_position = 0, 0

    def __getstate__(self):
        """Return the picklable state; the workers are not saved."""
        state = self.__dict__.copy()
        state["workers"] = None
        return state

    def make_workers(self, worker_count, compression):
        """Create *worker_count* workers and their job runner coroutines.

        Each runner is stored on its worker as ``future``; scheduling it is
        left to the caller.
        """
        workers = []
        for index in range(worker_count):
            worker_ = worker.Worker(compression)
            workers.append(worker_)
            future = worker_.job_runner(self, self._summary, self._log,
                                        self._summary._jobs_added_event)
            worker_.future = future
        self.workers = workers

    def stop_workers(self):
        """Kill every worker, first resetting the result it was working on."""
        for worker_ in self.workers:
            if worker_.result is not None:
                worker_.result.reset()
            worker_.kill()

    @staticmethod
    def _partition(percentage, widgets, length):
        """Split *length* in two; the first part is *percentage* percent of
        it, but never less than 10."""
        smaller_length = max(int(length * (percentage / 100)), 10)
        return [smaller_length, length - smaller_length]

    def _make_widgets(self):
        """Build the widgets and the four layouts (log x orientation)."""
        self._help_widget = Help(self._summary, self)
        root_path = os.path.basename(self._summary._root_path)
        summary = fill3.Border(self._summary, title="Summary of " + root_path)
        self._summary_border = summary
        try:
            selected_widget = self._summary.get_selection()
            result_widget = selected_widget.result
        except IndexError:
            result_widget = fill3.Text("Nothing selected")
        self._view = fill3.View.from_widget(result_widget)
        self._listing = fill3.Border(Listing(self._view))
        log = fill3.Border(self._log, title="Log", characters=Screen._DIMMED_BORDER)
        quarter_cut = functools.partial(self._partition, 25)
        golden_cut = functools.partial(self._partition, 38.198)
        three_quarter_cut = functools.partial(self._partition, 75)
        port_log = fill3.Row([fill3.Column([summary, log], three_quarter_cut), self._listing],
                             golden_cut)
        land_log = fill3.Column([fill3.Row([summary, log]), self._listing], quarter_cut)
        port_no_log = fill3.Row([summary, self._listing], golden_cut)
        land_no_log = fill3.Column([summary, self._listing], quarter_cut)
        # Indexed as self._layouts[is_log_visible][is_listing_portrait].
        self._layouts = [[land_no_log, port_no_log], [land_log, port_log]]
        self._set_focus()

    def toggle_help(self):
        self._is_help_visible = not self._is_help_visible

    def toggle_log(self):
        self._is_log_visible = not self._is_log_visible

    def toggle_window_orientation(self):
        self._is_listing_portrait = not self._is_listing_portrait

    def _move_listing(self, vector):
        """Scroll the selected result by (dx, dy), clamped to its contents."""
        dx, dy = vector
        selected_widget = self._summary.get_selection()
        x, y = selected_widget.scroll_position
        widget_width, widget_height = fill3.appearance_dimensions(
            selected_widget.result.appearance())
        listing_width, listing_height = self._listing.widget.last_dimensions
        listing_width -= 1  # scrollbars
        listing_height -= 1
        x = min(x + dx, max(widget_width - listing_width, 0))
        y = min(y + dy, max(widget_height - listing_height, 0))
        x = max(0, x)
        y = max(0, y)
        selected_widget.scroll_position = x, y

    def cursor_up(self):
        self._summary.cursor_up() if self._is_summary_focused else self._move_listing(_UP)

    def cursor_down(self):
        self._summary.cursor_down() if self._is_summary_focused else self._move_listing(_DOWN)

    def cursor_right(self):
        self._summary.cursor_right() if self._is_summary_focused else self._move_listing(_RIGHT)

    def cursor_left(self):
        self._summary.cursor_left() if self._is_summary_focused else self._move_listing(_LEFT)

    def cursor_page_up(self):
        self._summary.cursor_page_up() if self._is_summary_focused else self.listing_page_up()

    def cursor_page_down(self):
        self._summary.cursor_page_down() if self._is_summary_focused else self.listing_page_down()

    def cursor_end(self):
        self._summary.cursor_end() if self._is_summary_focused else self._page_listing(_RIGHT)

    def cursor_home(self):
        self._summary.cursor_home() if self._is_summary_focused else self._page_listing(_LEFT)

    def _page_listing(self, vector):
        """Scroll the listing by half its size in the direction of *vector*."""
        dx, dy = vector
        listing_width, listing_height = self._listing.widget.last_dimensions
        self._move_listing((dx * (listing_width // 2), dy * (listing_height // 2)))

    def listing_page_up(self):
        self._page_listing(_UP)

    def listing_page_down(self):
        self._page_listing(_DOWN)

    def move_to_next_issue(self):
        self._summary.move_to_next_issue()

    def move_to_next_issue_of_tool(self):
        self._summary.move_to_next_issue_of_tool()

    def edit_file(self):
        """Open the selected file in the editor, at the listing's top line."""
        import shlex

        if self.editor_command is None:
            self._log.log_message("An editor has not been defined. "
                                  "See option -e.")
        else:
            path = self._summary.get_selection().path
            path_colored = lscolors.path_colored(path)
            line_num = self._summary.get_selection().entry[0].scroll_position[1] + 1
            self._log.log_message([in_green("Editing "), path_colored,
                                   in_green(f" at line {line_num}")])
            # The editor command may contain options, so the shell splits it.
            # The path is quoted so that spaces or shell metacharacters in a
            # file name cannot break or alter the command.
            command = f"{self.editor_command} +{line_num} {shlex.quote(path)}"
            subprocess.Popen(command, shell=True,
                             stdout=subprocess.PIPE, stderr=subprocess.PIPE)

    def toggle_status_style(self):
        self._summary.toggle_status_style(self._log)

    def toggle_order(self):
        """Switch between the two sort orders, keeping the selected file."""
        self._summary.is_directory_sort = not self._summary.is_directory_sort
        sort_order = ("directory then type" if self._summary.is_directory_sort
                      else "type then directory")
        self._log.log_command(f"Sorting files by {sort_order}.")
        with self._summary.keep_selection():
            self._summary.sort_entries()

    def quit_(self):
        fill3.SHUTDOWN_EVENT.set()

    def refresh(self):
        """Log and request the recalculation of the selected result."""
        selection = self._summary.get_selection()
        tool_name = tools.tool_name_colored(selection.tool, selection.path)
        path_colored = lscolors.path_colored(selection.path)
        self._log.log_message([in_green("Refreshing "), tool_name, in_green(" result of "),
                               path_colored, in_green("")])
        self._summary.refresh_result(selection)

    def refresh_tool(self):
        """Log and request the recalculation of the selected tool's results."""
        selection = self._summary.get_selection()
        tool_name = tools.tool_name_colored(selection.tool, selection.path)
        self._log.log_message([in_green("Refreshing all results of "), tool_name, in_green("")])
        self._summary.refresh_tool(selection.tool)

    # Thin border drawn in grey; used for the log and for the unfocused pane.
    _DIMMED_BORDER = [termstr.TermStr(part, fg_color=termstr.Color.grey_100)
                      for part in fill3.Border.THIN]

    def _set_focus(self):
        """Draw the focused pane with a thick border, the other one dimmed."""
        focused, unfocused = fill3.Border.THICK, Screen._DIMMED_BORDER
        self._summary_border.set_style(focused if self._is_summary_focused else unfocused)
        self._listing.set_style(unfocused if self._is_summary_focused else focused)

    def toggle_focus(self):
        self._is_summary_focused = not self._is_summary_focused
        self._set_focus()

    def toggle_fullscreen(self):
        self._is_fullscreen = not self._is_fullscreen

    def xdg_open(self):
        """Open the selected file with xdg-open."""
        path = self._summary.get_selection().path
        path_colored = lscolors.path_colored(path)
        self._log.log_message([in_green("Opening "), path_colored, in_green("")])
        subprocess.Popen(["xdg-open", path], stdout=subprocess.PIPE, stderr=subprocess.PIPE)

    def save(self):
        """Pickle the whole screen, gzip-compressed, into the cache directory."""
        worker.Worker.unsaved_jobs_total = 0
        pickle_path = os.path.join(tools.CACHE_PATH, "summary.pickle")
        open_compressed = functools.partial(gzip.open, compresslevel=1)
        tools.dump_pickle_safe(self, pickle_path, open=open_compressed)

    def _select_entry_at_position(self, x, y, view_width, view_height):
        """Move the cursor to the result drawn at screen position (x, y).

        Positions on the border, outside the view or past the end of a row
        are ignored.
        """
        border_width = 1
        if x < border_width or y < border_width or x > view_width or y > view_height:
            return
        view_x, view_y = self._summary._view_widget.portal.position
        column_index = x - border_width + view_x
        row_index = y - border_width + view_y
        if row_index >= len(self._summary._entries):
            return
        row = self._summary._entries[row_index]
        if column_index < 0 or column_index >= len(row):
            return
        self._summary._cursor_position = column_index, row_index

    def _is_switching_focus(self, x, y, view_width, view_height):
        """Return True if a click at (x, y) is in the pane without focus.

        In portrait the panes are side by side, so x decides; in landscape
        they are stacked, so y decides.  Never true in fullscreen.
        """
        if self._is_fullscreen:
            return False
        if self._is_listing_portrait:
            is_beyond_summary = x > view_width
        else:
            is_beyond_summary = y > view_height
        # Beyond the summary with the summary focused, or within it with the
        # listing focused, means the click landed on the other pane.
        return is_beyond_summary == self._is_summary_focused

    def on_mouse_input(self, term_code):
        """Handle a mouse press, wheel or drag event."""
        if self._is_help_visible:
            self._help_widget.on_mouse_input(term_code)
            return
        event = terminal.decode_mouse_input(term_code)
        if event[0] not in [terminal.MOUSE_PRESS, terminal.MOUSE_DRAG]:
            return
        x, y = event[2:4]
        if event[0] == terminal.MOUSE_DRAG:
            last_x, last_y = self._last_mouse_position
            dx, dy = x - last_x, y - last_y
            if self._is_summary_focused:
                self._summary.scroll(dx, dy)
            else:
                self._move_listing((-dx, -dy))
        else:  # Mouse press
            if event[1] == terminal.MOUSE_WHEEL_UP:
                self.listing_page_up()
            elif event[1] == terminal.MOUSE_WHEEL_DOWN:
                self.listing_page_down()
            else:
                view_width, view_height = self._summary._view_widget.portal.last_dimensions
                if self._is_switching_focus(x, y, view_width, view_height):
                    self.toggle_focus()
                else:
                    self._select_entry_at_position(
                        x, y, view_width, view_height)
        self._last_mouse_position = x, y
        fill3.APPEARANCE_CHANGED_EVENT.set()

    def on_keyboard_input(self, term_code):
        """Run the action bound to the key, trying its lowercase form too."""
        if self._is_help_visible:
            self._help_widget.on_keyboard_input(term_code)
            return
        action = Screen._KEY_MAP.get(term_code) or Screen._KEY_MAP.get(term_code.lower())
        if action is not None:
            action(self)
            fill3.APPEARANCE_CHANGED_EVENT.set()

    def _fix_listing(self):
        """Point the listing at the selected result and update its title."""
        widget = self._summary.get_selection()
        view = self._listing.widget.view
        view.position = widget.scroll_position
        x, y = view.position
        view.widget = widget.result
        tool_name = tools.tool_name_colored(widget.tool, widget.path)
        divider = " " + self._listing.top * 2 + " "
        self._listing.title = (lscolors.path_colored(widget.path) + divider + tool_name + " " +
                               tools.STATUS_TO_TERMSTR[widget.status] + divider + "line " + str(y+1))

    # Key hints of the status bar; the character after each "*" is highlighted.
    _STATUS_BAR = highlight_chars(" *help *quit *t*a*b:focus *turn *log *edit *next *sort"
                                  " *refresh *fullscreen *open", Log._GREEN_STYLE)

    @functools.cache
    def _get_partial_bar_chars(self, bar_transparency):
        """Return the partial-block characters colored for the progress bar."""
        bar_color = termstr.blend_color(termstr.Color.black, termstr.Color.white, bar_transparency)
        return [termstr.TermStr(char, fg_color=bar_color, bg_color=termstr.Color.black)
                for char in fill3.ScrollBar._PARTIAL_CHARS[1]]

    def _get_status_bar_appearance(self, width, progress_bar_size):
        """Return the status bar with its first *progress_bar_size* columns
        highlighted as a progress bar.

        When the bar ends on a space, the fractional part is drawn with a
        partial-block character; otherwise the size is rounded.
        """
        bar_transparency = 0.7
        bar = self._STATUS_BAR.center(width)[:width]
        fraction, whole = math.modf(progress_bar_size)
        whole = int(whole)
        if whole < len(bar) and bar[whole].data == " ":
            left_part = bar[:whole]
            right_part = (self._get_partial_bar_chars(bar_transparency)[int(fraction * 8)]
                          + bar[whole+1:])
        else:
            progress_bar_size = round(progress_bar_size)
            left_part = bar[:progress_bar_size]
            right_part = bar[progress_bar_size:]
        return [highlight_str(left_part, termstr.Color.white, bar_transparency) + right_part]

    def _get_status_bar(self, width):
        """Return the status bar, sized by the share of incomplete results."""
        incomplete = self._summary.result_total - self._summary.completed_total
        progress_bar_size = (width if self._summary.result_total == 0 else
                             max(0, width * incomplete / self._summary.result_total))
        return self._get_status_bar_appearance(width, progress_bar_size)

    def appearance_for(self, dimensions):
        """Return the appearance of the whole screen for *dimensions*.

        The screen is laid out at no less than 10x20 and resized afterwards
        if the terminal is smaller than that.
        """
        if len(self._summary._entries) > 0:
            self._fix_listing()
        if self._is_help_visible:
            body = self._help_widget
        elif self._is_fullscreen:
            body = self._summary_border if self._is_summary_focused else self._listing
        else:
            body = self._layouts[self._is_log_visible][self._is_listing_portrait]
        width, height = max(dimensions[0], 10), max(dimensions[1], 20)
        result = body.appearance_for((width, height-1)) + self._get_status_bar(width)
        return (result if (width, height) == dimensions
                else fill3.appearance_resize(result, dimensions))

    # Maps key codes to the (unbound) methods above, so this has to stay at
    # the end of the class body.
    _KEY_MAP = {"t": toggle_window_orientation, "l": toggle_log, "h": toggle_help,
                terminal.UP: cursor_up, terminal.DOWN: cursor_down, terminal.LEFT: cursor_left,
                terminal.RIGHT: cursor_right, terminal.PAGE_DOWN: cursor_page_down,
                terminal.PAGE_UP: cursor_page_up, "s": toggle_order, terminal.HOME: cursor_home,
                terminal.END: cursor_end, "n": move_to_next_issue, "N": move_to_next_issue_of_tool,
                "e": edit_file, "q": quit_, terminal.ESC: quit_, terminal.CTRL_C: quit_,
                "r": refresh, "R": refresh_tool, "\t": toggle_focus, "f": toggle_fullscreen,
                "o": xdg_open}
def setup_inotify(root_path, loop, on_filesystem_event, exclude_filter):
    """Watch *root_path* recursively and return the asyncio notifier.

    *on_filesystem_event* is called for creations, deletions, completed
    writes, attribute changes and moves; paths matching *exclude_filter* are
    not watched.  Directories created later are watched automatically.
    """
    manager = pyinotify.WatchManager()
    watched_events = (pyinotify.IN_CREATE | pyinotify.IN_DELETE | pyinotify.IN_CLOSE_WRITE |
                      pyinotify.IN_ATTRIB | pyinotify.IN_MOVED_FROM | pyinotify.IN_MOVED_TO)
    manager.add_watch(root_path, watched_events, rec=True, auto_add=True,
                      proc_fun=on_filesystem_event, exclude_filter=exclude_filter,
                      quiet=False)

    def ignore_notifier(notifier):
        return None

    return pyinotify.AsyncioNotifier(manager, loop, callback=ignore_notifier)
def load_state(pickle_path, jobs_added_event, root_path):
    """Return ``(summary, screen, log, is_first_run)``.

    The screen is unpickled from *pickle_path* when possible.  If the file is
    missing, or unpickling raises AttributeError, a fresh summary, log and
    screen are created and *is_first_run* is True.
    """
    try:
        # NOTE(review): pickle is only safe on trusted data; this reads the
        # program's own cache file.
        with gzip.open(pickle_path, "rb") as file_:
            screen = pickle.load(file_)
    except (FileNotFoundError, AttributeError):
        summary = Summary(root_path, jobs_added_event)
        log = Log()
        return summary, Screen(summary, log), log, True
    summary = screen._summary
    # These two are not carried over from the saved state.
    summary._jobs_added_event = jobs_added_event
    summary._root_path = root_path
    summary.clear_running()
    return summary, screen, screen._log, False
def on_filesystem_event(event, summary, root_path):
    """Forward an inotify *event* to the matching handler of *summary*.

    Events on excluded paths, and events whose mask is not exactly one of the
    handled kinds, are ignored.  If the handler fails, the error is logged
    and KeyboardInterrupt is raised.
    """
    (path,) = fix_paths(root_path, [event.pathname])
    # Skip the leading "./", which would otherwise look like a hidden part.
    if is_path_excluded(path[2:]):
        return
    handler_for_mask = {
        **dict.fromkeys((pyinotify.IN_CREATE, pyinotify.IN_MOVED_TO),
                        summary.on_file_added),
        **dict.fromkeys((pyinotify.IN_DELETE, pyinotify.IN_MOVED_FROM),
                        summary.on_file_deleted),
        **dict.fromkeys((pyinotify.IN_ATTRIB, pyinotify.IN_CLOSE_WRITE),
                        summary.on_file_modified),
    }
    handler = handler_for_mask.get(event.mask)
    if handler is None:
        return
    try:
        handler(path)
    except Exception:
        tools.log_error()
        raise KeyboardInterrupt
    fill3.APPEARANCE_CHANGED_EVENT.set()
async def main(title, root_path, worker_count=None, editor_command=None, theme=None,
               compression=None):
    """Run eris on root_path until the user quits or a signal arrives.

    Args:
        title: Title passed to fill3.tui.
        root_path: Root directory of the codebase to monitor.
        worker_count: Number of workers; defaults to the cpu count minus one,
            with a minimum of one.
        editor_command: Stored on the screen as screen.editor_command.
        theme: Pygments style name; defaults to "native".
        compression: Cache compression passed to make_workers; defaults to
            "gzip".
    """
    loop = asyncio.get_running_loop()
    if worker_count is None:
        worker_count = max(multiprocessing.cpu_count() - 1, 1)
    if theme is None:
        theme = "native"
    if compression is None:
        compression = "gzip"
    # The theme is published through the environment.
    os.environ["PYGMENT_STYLE"] = theme
    # This path is not joined with root_path; entry_point changes the working
    # directory to root_path before running main.
    pickle_path = os.path.join(tools.CACHE_PATH, "summary.pickle")
    jobs_added_event = asyncio.Event()
    # is_first_run is returned by load_state but not used below.
    summary, screen, log, is_first_run = load_state(pickle_path, jobs_added_event, root_path)
    screen.editor_command = editor_command
    fill3.APPEARANCE_CHANGED_EVENT = asyncio.Event()
    log.log_message("Program started.")
    jobs_added_event.set()

    def callback(event):
        # Bind summary and root_path so only the event has to be passed in.
        on_filesystem_event(event, summary, root_path)
    notifier = setup_inotify(root_path, loop, callback, is_path_excluded)
    try:
        log.log_message(f"Starting workers ({worker_count}) …")
        screen.make_workers(worker_count, compression)
        loop.create_task(summary.sync_with_filesystem(log))
        for worker_ in screen.workers:
            loop.create_task(worker_.future)
        try:
            # Run the interactive UI on a terminal (or under unittest);
            # otherwise run headless until SIGINT or SIGTERM.
            if sys.stdout.isatty() or "unittest" in sys.modules:
                await fill3.tui(title, screen)
                log.log_message("Program stopped.")
            else:
                shutdown_event = asyncio.Event()
                with (fill3.signal_handler(loop, signal.SIGINT, shutdown_event.set),
                      fill3.signal_handler(loop, signal.SIGTERM, shutdown_event.set)):
                    await shutdown_event.wait()
        finally:
            # Workers are stopped before the notifier, whatever happened.
            screen.stop_workers()
    finally:
        notifier.stop()
    # Only save state once the summary reports that it has been loaded.
    if summary.is_loaded:
        screen.save()
@contextlib.contextmanager
def chdir(path):
    """Context manager: run the body with path as the working directory.

    The previous working directory is restored on exit, whether the body
    finished normally or raised.
    """
    original_directory = os.getcwd()
    os.chdir(path)
    with contextlib.ExitStack() as cleanup:
        # Registered only after the chdir succeeded, so a failed chdir does
        # not trigger a restore.
        cleanup.callback(os.chdir, original_directory)
        yield
def source_checksum():
    """Return a SHA-256 hex digest of eris's own sources and tool config.

    The digest covers the package resources "__main__.py" and "tools.py",
    followed by the tool configuration: the file named by the ERIS_CONFIG
    environment variable when it is set, otherwise the packaged
    "tools.toml".

    Returns:
        The hexadecimal digest string.
    """
    sha256 = hashlib.sha256()
    resources = ["__main__.py", "tools.py"]
    custom_config_path = os.environ.get("ERIS_CONFIG")
    if custom_config_path is None:
        resources.append("tools.toml")
    # Read each resource through the traversable API. A path obtained from
    # importlib.resources.path() is only guaranteed to exist inside its
    # "with" block, so it must not be opened after the block has exited.
    package_files = importlib.resources.files(eris)
    for resource in resources:
        sha256.update(package_files.joinpath(resource).read_bytes())
    # The custom config is hashed last, matching the original ordering, so
    # existing cache checksums stay valid.
    if custom_config_path is not None:
        with open(custom_config_path, "rb") as config_file:
            sha256.update(config_file.read())
    return sha256.hexdigest()
def manage_cache(root_path):
    """Make sure root_path has a cache directory matching the current eris.

    An existing cache whose stored checksum is missing or differs from
    source_checksum() is deleted. A missing cache directory is then
    created, with a "source_checksum" file and a ".gitignore" containing
    "*".
    """
    cache_path = os.path.join(root_path, tools.CACHE_PATH)
    checksum_path = os.path.join(cache_path, "source_checksum")
    if os.path.exists(cache_path):
        cache_checksum = None
        with contextlib.suppress(FileNotFoundError):
            with open(checksum_path, "r") as stored_file:
                cache_checksum = stored_file.read()
        if cache_checksum != source_checksum():
            print("Eris has changed, recalculating all results…")
            shutil.rmtree(cache_path)
    if os.path.exists(cache_path):
        # The cache is present and up to date; nothing to create.
        return
    os.mkdir(cache_path)
    with open(checksum_path, "w") as checksum_file:
        checksum_file.write(source_checksum())
    with open(os.path.join(cache_path, ".gitignore"), "w") as gitignore_file:
        gitignore_file.write("*")
@contextlib.contextmanager
def print_to_pager(pager_command=None):
    """Context manager: send everything printed in the body through a pager.

    Args:
        pager_command: The pager's argument list. Defaults to
            ["less", "-RFEX"].

    If the pager executable cannot be found the body still runs, printing
    to the unchanged stdout. Exceptions raised by the body propagate to the
    caller unchanged.
    """
    if pager_command is None:
        # Built per call rather than shared as a mutable default argument.
        pager_command = ["less", "-RFEX"]
    # Only starting the pager is guarded. Guarding the yield as well would
    # catch a FileNotFoundError raised by the caller's body and yield a
    # second time, which contextlib reports as a RuntimeError.
    try:
        pager_process = subprocess.Popen(pager_command, stdin=subprocess.PIPE, text=True)
    except FileNotFoundError:
        yield
        return
    # Leaving the Popen context closes the pager's stdin and waits for it.
    with pager_process:
        with contextlib.redirect_stdout(pager_process.stdin):
            yield
def print_tool_info():
    """Print the name, command or function, url and extensions of every tool.

    The output goes through print_to_pager(). Tools that are not available
    are shown in red with " (not available)" appended. A tool with no
    entry in tools.TOOLS_FOR_EXTENSIONS is listed with the extension "*".
    """
    extensions_for_tool = {}
    for extensions, tools_ in tools.TOOLS_FOR_EXTENSIONS:
        for extension in extensions:
            for tool in tools_:
                extensions_for_tool.setdefault(tool, set()).add(extension)
    with print_to_pager():
        for tool in sorted(tools.tools_all(), key=lambda t: t.__name__):
            if tools.is_tool_available(tool):
                heading = termstr.TermStr(tool.__name__, is_bold=True)
            else:
                heading = (termstr.TermStr(tool.__name__, fg_color=termstr.Color.red)
                           + " (not available)")
            print(heading)
            # Work out this tool's extensions before the example command
            # uses them. Previously the example read a stale "extensions"
            # left over from the loop above or from the previous tool.
            # Sorting keeps the output stable from run to run.
            tool_extensions = sorted(extensions_for_tool.get(tool, {"*"}))
            if hasattr(tool, "command"):
                print(f"command: {tool.command} foo.{tool_extensions[0]}")
            else:
                print("function:", "eris.tools." + tool.__name__)
            print("url:", tool.url)
            print("extensions:", ", ".join(tool_extensions))
            print("")
def install_all_tools():
    """Install every tool dependency with the distribution's package manager.

    The distribution is detected from the "ID=" line of /etc/os-release.
    Fedora, Arch and Alpine each drop some packages and rename others; any
    other distribution uses the apt command (debian / ubuntu) with the
    names unchanged. The install command is run through sudo.

    Raises:
        subprocess.CalledProcessError: If the install command fails.
    """
    tools_ = tools.dependencies()
    # Close the file promptly instead of leaving it to the garbage collector.
    with open("/etc/os-release") as os_release_file:
        os_release_lines = os_release_file.readlines()
    install_command = ["apt", "-y", "install"]  # debian / ubuntu
    if "ID=fedora\n" in os_release_lines:
        tools_.remove("lua-check")
        renames = {"lua5.3": "lua", "python3-bandit": "bandit", "ruby3.1": "ruby",
                   "xz-utils": "xz"}
        tools_ = [renames.get(tool, tool) for tool in tools_]
        install_command = ["dnf", "-y", "install"]
    elif "ID=arch\n" in os_release_lines:
        tools_.remove("rakudo")
        tools_.remove("python3-pdfminer")  # pdf2txt is not in arch
        tools_.remove("perl-doc")  # perldoc is in perl but not in the path
        tools_.remove("7zip")
        renames = {"genisoimage": "cdrkit", "lua5.3": "lua", "ruby3.1": "ruby", "xz-utils": "xz",
                   "g++": "gcc", "golang-go": "go", "lua-check": "luacheck", "php-cli": "php",
                   "pylint": "python-pylint", "python3-bandit": "bandit", "python3-mypy": "mypy"}
        tools_ = [renames.get(tool, tool) for tool in tools_]
        # Remaining "python3-" packages use the "python-" prefix here.
        tools_ = ["python-" + tool[len("python3-"):] if tool.startswith("python3-") else tool
                  for tool in tools_]
        install_command = ["pacman", "--noconfirm", "-S"]
    elif "ID=alpine\n" in os_release_lines:
        tools_.remove("python3-bandit")
        tools_.remove("wabt")
        renames = {"genisoimage": "cdrkit", "lua5.3": "lua", "ruby3.1": "ruby", "xz-utils": "xz",
                   "g++": "gcc", "golang-go": "go", "lua-check": "luacheck", "php-cli": "php",
                   "pylint": "py3-pylint", "tidy": "tidyhtml"}
        tools_ = [renames.get(tool, tool) for tool in tools_]
        # Remaining "python3-" packages use the "py3-" prefix here.
        tools_ = ["py3-" + tool[len("python3-"):] if tool.startswith("python3-") else tool
                  for tool in tools_]
        install_command = ["apk", "add"]
    subprocess.run(["sudo"] + install_command + sorted(tools_), check=True)
def check_arguments():
    """Parse and validate the command line.

    Handles the one-shot options (--help, --version, --install-all-tools,
    --info) by doing their work and exiting with status 0. Invalid
    arguments print a message and exit with status 1.

    As a side effect, --config sets the ERIS_CONFIG environment variable,
    and eris.tools is then imported and bound to the module-level name
    "tools".

    Returns:
        A (root_path, worker_count, editor_command, theme, compression)
        tuple. worker_count, theme and compression are None when not given
        on the command line. editor_command falls back to the EDITOR and
        then the VISUAL environment variables.
    """
    global tools
    cmdline_help = __doc__ + USAGE.replace("*", "")
    arguments = docopt.docopt(cmdline_help, default_help=False)
    if arguments["--help"]:
        print(cmdline_help)
        sys.exit(0)
    if arguments["--version"]:
        print(eris.__version__)
        sys.exit(0)
    if arguments["--config"] is not None:
        config_path = arguments["--config"]
        if not os.path.exists(config_path):
            print("File does not exist:", config_path)
            sys.exit(1)
        os.environ["ERIS_CONFIG"] = config_path
    # Imported only after ERIS_CONFIG may have been set above.
    import eris.tools as tools
    if arguments["--install-all-tools"]:
        print("Installing all tools… (requires sudo)")
        install_all_tools()
        print("Done.")
        sys.exit(0)
    if arguments["--info"]:
        print_tool_info()
        sys.exit(0)
    worker_count = None
    if arguments["--workers"] is not None:
        try:
            worker_count = int(arguments["--workers"])
        except ValueError:
            print("--workers requires a number.")
            sys.exit(1)
        # Reject negative counts as well as zero; previously only zero was
        # caught.
        if worker_count < 1:
            print("There must be at least one worker.")
            sys.exit(1)
    root_path = os.path.abspath(arguments["<directory>"])
    if not os.path.exists(root_path):
        print("File does not exist:", root_path)
        sys.exit(1)
    if not os.path.isdir(root_path):
        print("File is not a directory:", root_path)
        sys.exit(1)
    if arguments["--theme"] is not None:
        themes = list(pygments.styles.get_all_styles())
        if arguments["--theme"] not in themes:
            print("--theme must be one of:", " ".join(themes))
            sys.exit(1)
    if arguments["--compression"] is not None:
        compressions = ["gzip", "lzma", "bz2", "none"]
        if arguments["--compression"] not in compressions:
            print("--compression must be one of:", " ".join(compressions))
            sys.exit(1)
    editor_command = (arguments["--editor"] or os.environ.get("EDITOR", None)
                      or os.environ.get("VISUAL", None))
    return root_path, worker_count, editor_command, arguments["--theme"], arguments["--compression"]
def inotify_watches_exceeded():
    """Print advice on raising the kernel's inotify watch limit."""
    advice = (
        "Error: This codebase has too many directories to be monitored.",
        " Fix by increasing the kernel parameter user.max_inotify_watches "
        "to exceed the number of directories.",
        " e.g. 'sudo sysctl user.max_inotify_watches=200000'",
    )
    for line in advice:
        print(line)
def entry_point():
    """Command-line entry point: check arguments, prepare the cache, run main.

    A pyinotify.WatchManagerError raised while running is reported with
    inotify_watches_exceeded() instead of a traceback.
    """
    checked = check_arguments()
    root_path, worker_count, editor_command, theme, compression = checked
    manage_cache(root_path)
    title = f"eris: {os.path.basename(root_path)}"
    with chdir(root_path):  # FIX: Don't change directory if possible.
        try:
            asyncio.run(main(title, root_path, worker_count, editor_command, theme,
                             compression))
        except pyinotify.WatchManagerError:
            inotify_watches_exceeded()
# Run the application when this file is executed as a script.
if __name__ == "__main__":
    entry_point()
# check_arguments binds "tools" itself when eris runs as a script; this
# import also binds it when the module is only imported by another program.
import eris.tools as tools  # Webserver needs this to unpickle the summary.