Initial commit

This commit is contained in:
Andrew Hamilton 2015-12-14 18:03:11 +00:00
parent 23e2f8e676
commit fc4b2ced58
25 changed files with 4130 additions and 0 deletions

216
BUGS Normal file
View file

@ -0,0 +1,216 @@
Current
- Changing the status style with 'watching' off can result in recalculations.
- In stterm the previous console title isn't being restored.
Seems to be a bug in stterm, because its okay in gnome-terminal.
- Some jobs always are recalculated when restarting vigil.
e.g. vigil.py pylint, BUGS metadata, BUGS _pygments
- Sometimes when quitting: "close failed in object destructor",
"sys.excepthook is missing"
- If a pending file is deleted, while not watching the filesystem,
then tools fail when they can't find the file. Also, even if watching, there
would be a race. Do what?
- The scrollbars in the help screen don't work with the arrow keys.
Current (tool related)
- disassemble.py is not always found
- disassemble doesn't work for python3
- gut had an error with utf-8. A traceback printed directly on the screen,
garbling the interface.
- Some file's display is garbled. e.g. perldoc of FieldHash.pm
- Scrolling right on a result from disassemble_pyc (or pydoc run on termstr.py)
causes screen corruption.
Fixed
- The interface is hanging during large tool computations.
- Your cwd relative to the code base path should not be important. Currently it
causes the cached paths not to be found.
- Clicking the mouse button causes a traceback.
(Started when the keyhandler was added)
- The cursor disappears when on the placeholders "."
- The filenames are only showing their basenames.
- After moving with the mouse, the new position is not remembered. So that
moving with keys moves from the position you last had when moving with keys.
- Not working correctly: Let rows in the table have different numbers of
columns. If navigating between rows, from one row, to another with less
columns, change the column of the cursor.
- Sometimes when moving with the mouse the wrong result appears. The column
doesn't change.
- on_file_saved is responding to everything including when the cache is saved
- It seems that on_file_saved won't work properly with subdirectories. The event
path is always a basename.
- Tabs are appearing as question marks in the source code.
- When changing a file and the results are updated... The updates don't show on
the screen, you need to move the cursor over them to force them to be shown.
And in this time the CPU is 100%.
- An exception shows when the cache changes while it is saving.
No more autosave.
- Fix problems caused when displaying long path names.
<- So far only fixed by truncating the file name.
- Bug in _closes_result when using min().
- urwid is not coping with large files well.
- The result does not change to "?", only the statuses. (urwid implementation)
- The screen does not change from the placeholders if you never move the
cursor. (urwid implementation)
- Have seen a hang. It was running the 'file' program on 'table3.py' at the
time. Still responded to 'q'. On next run all the results were in the cache.
Maybe the hang was only in the display. (urwid implementation)
- Quiting out sometimes hangs. (urwid implementation)
- Often the program crashes early on. (urwid implementation)
- One time the program was slow after running for a long time, and after
restarting it was definately quicker. (urwid implementation)
- Views crash when scrolled too far.
- Ensure the terminal is in a good state if the process is interrupted.
- The listing's title is missing when the program starts.
- Its possible to be calculating the same result more than once at the same time.
- The scrollbar disappears on long summaries, when moving to the next page down.
<- urwid related
- Sometimes the cache isn't being saved
<- changed cache implementation
- signal_process_tree is signalling the root process when it shouldn't.
<- not using signal_process_tree at the moment.
- "./table.py t *.py" shouldn't show the py files in the parent directory of t.
<- Not doing globbing at the moment.
- Running out of space while saving the cache leaves a partially written .tmp file.
<- File is deleted
- If a file becomes executable this is not detected and it will have the wrong
color.
- When quitting out any currently running jobs are forgotten, and forever left in
the running state after restart.
- Quiting out is hanging.
- Quiting out is not producing an error code of 0. i.e. $? != 0 If all the
results are calculated it does give 0 when quiting out.
- If a tool is interrupted its result will be incorrect, and worse cached.
- The interface is very slow and almost unresponsive on larger projects.
<- The whole summary was mistakenly being rendered.
<- Changes to the cache weren't excluded from events caught by pyinotify..
- Sometimes when quiting out the screen is not restored
- Sometimes when starting up the whole screen is not drawn, only the first diff.
<- The screen update thread was being started before the blessings fullscreen
context manager.
- Currently any issue thats on the current row, to the left of the cursor, won't
be found, even if its the last one.
- The metadata result isn't formatted correctly.
- The log is truncated, and is too short on large displays.
- The "All results are up to date" message doesn't show immediately.
<- The most recent n messages weren't being shown.
Also it ideally wouldn't be repeated by each worker thread.
<- still being repeated
- Sometimes two results are yellow simultaneously, even though theres only
one worker thread.
- Caused when the pep8 column for some reason is the same as its neighbour
pflask!
<- The key in sync_with_filesystem was sometimes the same for different tools.
Fixed by including the tool name in the key.
- All rows in the filename column need to change width together, or always stay
the same size.
<- For now its always a fixed arbitrary size.
- Shouldn't reuse "." to mean not applicable. Its hard to tell which jobs are
really pending.
<- Changed to a grey block
- The scrollbar is invisible on very large pages.
<- Now its always at least one character in size.
- While the metadata is being calculated the width of the metadata column changes.
- If a file has many tools the statuses are truncated on the right. Probably
should scroll over with the cursor. Or possibly wrap the statuses onto the
next line?
<- Fixed by scrolling over. Haven't tried wrapping yet.
- The cursor can dissappear behind the scrollbar when scrolling to the right in
the summary view.
<- Off by one... Forgot the extra space added at the end of the filename.
- lib/jquery.min.js (from luakit) takes a long time to display (by pygments) and
the output is corrupted.
<- Not a bug with vigil, and I can't see the corruption.
- Files which are broken symbolic links only show errors.
<- Decided to not list broken symlinks in the summary view.
- The filenames are all white!
<- ljust on a termstr returns a str. Fixed by avoiding ljust, but should
add ljust to termstr.
- The cursor sometimes moves down one line while programs are being run.
Probably a problem in sync_with_filesystem. Happened while leaving the
cursor on pyc files.
- Its not working in the linux console. The terminal codes are producing
garbled output there. Its probably the 24 bit color terminal codes.
<- It wasn't about colour.
<- The encoding was latin, it should have been utf-8.
- Jobs are still being started during shutdown
- Sometimes jobs hang. Started after the switch to multiprocessing with pausing.
Accidently pausing?
<- Fixed by removing the join which can deadlock and is unnecessary.
- Sometimes quiting hangs. At least with the q key. Test ctrl-c also. Started
after the join was removed when running jobs.
<- Fixed by making the multiprocess Process a deemon thread. But is the reset
method really terminating the process?
- Switching status style doesn't work sometimes after restart.
- next_test.py passes in the console but fails when piped through 'less'.
<- Fixed when changed to terminal_codes.
- Old results aren't being deleted from the cache.
- When the terminal window is resized the contents aren't immediately resized.
- The length of the grey highlight bar isn't always the full width of the
view window. Its only as long as the widest row currently showing.
- sync_with_filesystem is being run when changes occur in ".vigil" directories
in sub-projects.
- Filenames are always colored white in gnome-terminal. This used to work, and
was probablty broken when blessings was replaced.
<- False alarm. Went away!
- When sorting by ext then path, the filesystem is always changing for some
reason.
<- The filesystem change was the log changing. Moved the log elsewhere.
- When a sync_from_filesystem occurs the files are always sorted one way.
Which could be a change.
- While the watch manager is disconnected from the mainloop, events are being
queued up, and are all played out when the watch manager is reconnected.
- First character in help is green
- Switching 'watching' on, is slow. At least update the indicator text quickly.
- Something in ~/repos/pygame breaks convert_lscolor_code_to_charstyle
<- Files with ".conf" extensions were breaking convert_lscolor_code_to_charstyle
- When syncing, why does the tool name in the title of the result pane briefly
change colour?
<- Can't see this any more because you can't manually sync. Also doesn't show
when toggling the 'watching' switch.
- terminal.italic is None in the linux console. Then code_for_term fails.
- Vigil won't clear the cache if the previous vigil was still running and making
changes after the new vigil was installed.
<- Created a ".creation-time" timestamp for when the cache was created.
- Have all jobs calculated, and job runners waiting, then find new jobs with
syncing-with-fs... The job runners aren't then being started to calculate these
jobs.
- Sometimes the job runners stop and say all results are done, but I can see
outstanding pending results. A restarted vigil processes them.
- Getting screen corruption after using curses for input. The corruption occurs
when viewing particular reports. Must be in a different mode so characters
are interpreted differently.
<- This went away, for now...
- The mouse sometimes stops working. At this time you can also highlight parts
of the screen. Maybe we've left raw mode?
<- This also went away.
- Theres corruption the first time you type a key or press the mouse button.
The screen gets cleared and only the diff lines are shown. Eventually with
more diffs the whole screen is showing.
- If can't fix properly... a hack could be to programatically type a key early
on.
<- This was some problem with my use of curses. Fixed by using urwid's raw_display
instead.
- When the screen changes the cursor can be seen flying around quickly. It also
ends up on the right hand side of the screen.
<- I forgot I began relying on curses to hide the cursor. Put back the
'hidden cursor' contextmanager.
On hold, run-tool related
- Sometimes there is a blank line at the end of the result in run-tool, and
probably in table.py. Sometimes not.
- In run-tool, if the tool fails, the returncode is always 0.
- "python ./run-tool gut table.py" results in "The file must be within the
codebase.", but it is in.
Won't fix
- If the summary window is narrower then max_path_length the paths are never
visible.
<- Only a problem for very narrow windows.
- There is no color in lxterminal, only shades of grey.
<- Only testing gnome-terminal & stterm at the moment.
- Sometimes a lot (or all?) of the results are "?" even with correct statuses.
<- Happens when the cache is deleted from underneath a running vigil.

394
TODO Normal file
View file

@ -0,0 +1,394 @@
Todo
- Have at least one golden test for every tool.
- Maybe could re-use linguist's example test files for many tools.
- Seperate tool specific code from infrastructure in the tools module.
- Add command line options, e.g. -h
- Docstrings
- Boilerplate: Readme, usage, man page?, docs?, setup.py?, wheel?, __pkginfo__.py?
- Publish. pypi and github will do.
- Maybe also use cxfreeze, py2exe, pyrun or zipapp? Is zipapp the standard way?
- Use cookiecutter?
- Need to use conventional version numbers for pypi. See pep0440.
- Add ESC as an alternative to 'q' for quit. If looking at Help, ESC should just
exit the help screen.
- Add means to pause and unpause all current jobs.
- Have a sandbox for unsafe (or all) tools.
- Determine if ".py" files are python2 or python3 to run the right tool.
- Statuses' pretty names and variable names don't match.
- Report on python doctests. (also coverage of)
- Check if class Entry is really working correctly as a collections.UserList.
- Don't let the user scroll indefinately below the end of the page.
Done
- Use inotify to keep the results up to date.
- Show a table of statuses.
- Let a cursor move over the table.
- Have a result pane, which always shows the result corresponding the
current position in the table.
- Use color in the status.
- Use syntax highlighting when showing source code.
- Don't show raw binary. Prefer at least a hex dump.
- When the program is first run, calculate all the results in the background.
Have the statuses and results update while the you can still navigate around
the table.
- If a traceback occurs when running a tool, catch it and show the traceback as
the tool's result.
- syntax highlight the traceback
- Add color to files based on LS_COLORS.
- Have a key for toggling the split screen between vertical and horizontal.
- Results need to be classified into at least success or failure.
- Have a tool summarizing the file metadata, including checksums.
- If a python script has a shebang line rely on that when running it.
Let it fail if the script isn't executable.
- Change to python3.
+ python2 is still working
+ dropped python2 for now
- Add a command to just run a tool on a file.
- Write some tests for run-tool.
- Use memoization.
- Make the input filename, tool, and contents (or hash of contents) the key.
- Therefore different versions of the same file can be in the cache at
the same time, and switching between the versions requires no recalculation.
+ Undone: only one version is stored
- Have a toggleable activity log.
- Add a job queue.
D Run the jobs closest to the cursor first.
D Make the maximum number of running jobs be the number of processes.
S Run all jobs at a lower priority then the viewer.
- Make sure unittests start running with very little latency, and as quickly
as possible.
- Let tools be prioritised. <- The result with focus is always run first.
- Maybe should run an extra concurrent job if other jobs are already running?
- Don't show the whole path. Show one relative to the base directory.
- Use tools' paths relative to the codebase root, so that the codebase root
can be moved without invalidating the cache.
- Colourise the tool names.
- If the tool is used on one type of file, use the same color.
- If the tool is associated with many file types, show the colour of the file
currently being worked on.
- If the tool is generic use a particular color. White bold.
- Be able to focus on a list of files within the codebase.
+ Undone: shows whole directory
- Have a help screen
- Add header lines to some panes in the interface.
- A line above the result pane containing the path, tool, and status.
- A line above the activity log. Containing just "Activity log".
- Stop the log from getting focus.
- When a file is changed and all the results are recalculated the tool currently
in focus should have priority.
- Highlight the row the cursor is in so it's easier to find.
- Rename Columns widget to Row and Rows widget to Column.
- Optionally hide scrollbars when full size.
- Can the interface latency be improved? Atm its slugish when jobs are running.
Could some (or all) jobs be momentarily paused when a key is pressed to free
a cpu? Does changing their priority help?
- Use WidgetPlaceHolder in result widgets so that they can be changed in place,
and so that the check in run_tool-after_run is not needed.
- When running tools on a file in a subdirectory cd to the subdirectory first.
<- Should avoid this.
- Have means to jump to the next failure.
- Watch for new or deleted files
- If dump_pickle_safe fails then delete the tmp file.
- Use multiprocessing module, or concurrent.futures, or asyncio?
+ Used asyncio and concurrent.futures. Is faster.
+ Used multiprocessing
- Store all the statuses together in one pickle. It should never be too big.
This could greatly speedup startup. Would probably help to still have each
status stored redundantly with its result.
+ Didn't store statuses redundantly. Each result was stored in its own gzipped
pickle.
- When looking for the next issue wrap around and find ones above when there
are none below.
- Use the entire stat of a file to decide if the file has changed.
- Existing status indicators don't change when viewed in other terminal types.
They should all change together depending on the terminal.
+ You can toggle between two status styles.
- Add a legend to explain the meaning of the different colored
status indicators.
- Be able to move to the next issue of the tool currently selected.
- Take the part of blessings being used.
- Cache os.get_terminal_size if necessary.
<- Didn't cache, seems to be fast enough.
- Color the dirname of paths with the directory color.
- Rename to vigil
<- Also created a symlink vigil.py for vigil_test.py to import.
- Let the entries be sorted by directory then type, and type then directory.
- Change sort order to show all files in a directory before any of the files of the
subdirectories. Is currently interleaved.
- Be able to control whether the filesystem is being watched.
- Add a status bar showing state of switches, and maybe combine with a progress
bar. e.g.
h:help q:quit w:Watching p:Working s:sync n:next r:rotate
l:log -:statuses d,c,j,k:navigate
- Add color to the help text
- Name it 'vigil' ?
- Have a progress bar for the work queue.
- Have a progress bar showing the ratio of calculated to uncalculated results.
<- Did this type of progress bar
- Pare down fill3 and rename it
<- Didn't rename it yet
- Scroll speed is arbitrary and small. Scroll jumps should be the length of
the page or half the length of the page.
<- Is half the length of the page
- Force recalculation of vigil's whole cache when vigil.py changes.
- For now assume ".py" extension means python3.
- So only use python3 tools
- common.py is not needed anymore without run-tool, merge its contents back
into vigil and termstr.
- Add navigation keys (d,c,j,k) to help screen.
- Removed FIX code involving arbitrary numbers.
- Get python3-coverage tool working.
- Make sure you can always see the cursor with simple statuses.
- Make the indent of filenames the same for all files in the same directory.
- Give the help page scroll bars.
+ Also made portals, and views scrollable.
- Make arrow keys work for navigation.
- Make 'page up', 'page down', 'home' and 'end' work for navigation.
- Let the mouse select statuses.
A-syntax, B-tests, C-auto docs, D-lint, E-coverage, F-profile, G-tidy, H-import deps
A B C D E F G H
python s s s s s s s s
perl s s s
html s - - - s
css -
c s s
c++ s
php s l l
java .class s
java .java s
javascript l
c#
objective-c
ruby
lua
julia
legend: s = started, l = looked, - = not applicable
Ideas
- See gucharmap ✔ ✘ ● ◯ ▐▌ 🀆  ◆ ■  □ ▒
- Some tool ideas:
- code tests
- coverage
D linting: pylint, pyflakes, pychecker, pycharm?
- python profile: cprofile, line-profiler?, yappi,
pyinstrument, /usr/lib/python3.4/trace.py, cprofilev
- Also maybe trace alloc?
- And my tools: gut, spanish translations
- git diff, annotate
- C++ tools: CppCat?, PVS-Studio, Cppcheck, Visual Studio
- Let a test function define a file type. Not just the filename extension.
Use the 'file' tool's type.
(see hgviewlib)
- Let run-tool optionally use the cache.
- Have run-tool also optionally show the status of the result.
- Have run-tool optionally show colour.
- chdir shouldn't be used with multi-threading. openat and fstatat can help.
- Show all binary files with a binary viewer tool? Like a hex editor?
- Use jp2a which turns jpegs into ascii
- Ignore other input while help screen is showing.
- See http://cynic.cc/blog//posts/2015-05-16_coursera-dl_activities/
for current practices of installation and testing.
- See https://www.atlassian.com/git/tutorials/comparing-workflows git tutorial.
- Other languages: lisp, bash shell, sql, lua, haskell, cobol, dart, julia, go, rust, D
- Other file types: core dumps, subtitles, pictures, elf binaries, shared libraries,
library archives, metadata for databases, metadata for audio/video,
contents of compressed archives, pdf2text, doc2text, html2text, ebook2text,
csv file?
- Check these tools: astyle, indent, uncrustify, xmlindent, csstidy, flake8,
frosted, pep257, pyroma, dodgy, jedi, pep8-naming, graphite, propector, mypy, vmprof
- eslint for javascript?
- epydoc for python
- readelf
- pinfer from mypy
- for c, c++: libasan, liblsan, libtsan, libubsan, coverty?
- for ruby: flog
- for po, pot files: dennis
- for go: "go report card"
- markdown -> text
- for bash or shell scripts: ShellCheck
- ffprobe for detecting media types
- Can these job queue systems help? celery, gearman, joblib, pathos,
parallel python, dask, spark
- Need rpc? pyro4, protocol buffers, thrift
- Maybe use psutil python module instead of subprocess
- Make graphical output possible
- Use an existing web browser.
- Use gnome with an embedded browser.
- The left hand panel could be a vte widget, instead of a gnome table.
- Serve as a web app.
- Find image-to-ascii to make a best effort with images.
- Install tools and their dependencies, on demand, in the background.
- Ignore emacs backup files? i.e. files ending in ~
- Only import tools if needed? And only apt-get install deps if necessary?
- When a status changes from a fail to success show a smiley (or vice versa) in
the activity log message.
- Optionally make a sound based on the success or failure of a run.
- Only make a success or fail sound when the status changes
(instead of every time)
- Let the focus move diagonally as well.
- Have an option to turn off all automatic work.
- Have a command so a tool can be re-run at any time.
i.e. for when not trusting the cache
- Somehow make python test modules another type of file, so they are grouped
together, and seperately from the other python files. (A subtype?)
- Internationalization
- Let results depend on other results? Could depend on .pyc for example?
- Make the tools configurable.
- Make editing found problems easy:
i.e. Editing at spots found by code checkers or linters.
- Integrate editor?
- Open file in editor at the spot?
- Use linguist as a generic tool.
- POLA, only put in the sandbox what the tool needs. e.g. only the file.
Make a chroot with one file?
- Be able to add new tools within the interface, and have a
github 'send pull request' button, to suggest the new tool.
- Store extra metadata about results: e.g. tool version, start time, finish time,
CPU used (sys, user).
- If not installing all dependencies at once, install them grouped by file type, e.g.
programming language. Also in that case its probably best to have the tool
functions in a package with the tools divided into one module per file type.
- Have a Cache widget with an update_appearance_min method
- termstr should fail when called with methods str has that it doesn't
- Pause jobs during screen updates?
- Don't pause jobs during all screen updates. Only pause during screen updates
that came from key presses.
- Colourise all directory listings. e.g. file listings of archives
- Kill all subprocesses if the process is interrupted.
- Make it possible for a git user to switch branches without needing to
recalculate all the results. i.e. cache old results
- Use biglist to store the summary table to speed up start & quit times for
very big projects.
- Have a progress bar when loading all results. Should only be seen on results
large enough to require more time to load.
- Make a terminal widget, so that editors can be embedded.
- looked at:
- libvterm
- gate one's terminal.py <- mixed with html
- pyte <- problems with color
- stterm's st.c <- mixed with X
- urwid's terminal widget
- Make sure theres no problems if its run twice concurrently on the same
directory.
- Make a read-only virtual fuse filesytem containing the results.
- Have a way to show a diff between two chosen results.
- Make a code widget that lets you change the color theme.
- Maybe use importlib.util.LazyLoader for a faster startup
- Have an estimated time till completion with the progress bar.
- Try to drop the dependency on pyinotify.
- Try to make the saving of the result to disk occur in the multiprocessing
process.
- Use appearance_interval on Text widgets to speed up display of large files,
by only loading the necessary part for the interval.
- Somehow process a whole directory of projects seperately.
- Have a headless option so you can script calculating all the results for
many projects.
- python-guacamole deals with 24bit color & conversions
- Let the status bar keys summary be used as a list of buttons on tablets.
- Try doing autosave again.
- It musn't save if there are no changes.
- Use Gnuroot (debian) for Android. See: ("http://www.techrepublic.com/article"
"/use-gnuroot-to-install-a-gnulinux-distribution-on-your-android-device/")
- Let the mouse move the scrollbars?
- Let the mouse click on the actions in the status bar?
- Let the mouse click the filenames. Try to show a result from the same type
of tool.
Shelved
- Have a way to concatenate tool's results together
- Could be used to join all the metadata of a file together
<- I just made a specific metadata tool that shows all the info.
- Is colorlog helpful?
<- Made a colorful log before I found out.
- Possibly show the size of the work queue.
<- No obvious place to put it. The user has a sense of outstanding work
from the percentage of unknown statuses(?) visible.
<- This could be a progress bar.
- Optionally let the focus wrap around when moving off the table.
<- I don't know how to make urwid allow that, or to force it.
- Also have h,j,k,l as arrow keys.
<- 'l' is for toggling the log. Normal arrow keys should be enough.
- See all the LS_COLORS even if you don't have them in your environment.
<- Its better for the colors to be consistent for people, than to possibly
have more.
<- Should mention LS_COLORS for people.
- Include directories in the file listing? Directories are files too.
- Don't waste space between the status pane and the result pane. Have a minimum
width of the result pane, but let it grow wider if the the status pane is
narrow.
<- Was complicated by the addition on the activity log, which needs more
width.
- Use libmagic directly from python instead of 'file'
<- Easier and simpler to stay with file
- Have optional summary view of subprocesses. (Or jobs) - OS does it well enough
- Undo hack of urwid.display_common.
- The hack worked around a possible bug which limited colors to 88 instead of
256. See display_common.py:824
- Retain a fs cache for a fast startup. Use file timestamps.
D Add autosave
- Ignore out of date results on startup.
- Schedule jobs for new or changed files.
- Have a seperate cache for each file's results. Only save the cache once
when all the results have been calculated. Only load the cache once when
the file changes.
<- Something similar was done instead: A gzipped file for every result, and
one pickled file of all the results' statuses together, including some
application state.
- Use sqlite for the cache. Use a seperate sqlite db for each file?
- Use a key-value store for the cache. Use a seperate store for each file?
- Let columns be sorted.
- For the filename have at least: sort by file type then directory and
sort by directory then type.
<- Sorting columns doesn't really make sense since a column can contain
results from different tools. Instead let all rows be sorted in different
ways.
- Make sure doctests work
<- huh, where were the doctests?
- Have a 'raw' tool that shows the contents of the file but doesn't store it
in the cache?
- python help summary
- Have run_tool_cached cache exceptions too. And raise them the next time.
<- run_tool_cached isn't being used now
- Watch for a change to tools.py and reload and recalculate.
- Run untrusted tools in a sandbox: Docker? aufs? SECOMP? pflask? lxc? AppArmor?
SELinux? systemd-nspawn? chroot? sandstorm? firejail? lxd?
- Be able to choose the sandbox, or none.
<- At the moment all tools are in ubuntu so are trusted
<- But some tools like unittest run scripts. That needs sandboxing.
- Seperate tools into fast and slow. Prioritise all fast ones before slow ones.
- Maybe proceed in rounds based on distance. So close distance fast, close
distance slow, medium distance fast, medium distance slow, further distance
fast, further distance slow.
- Have one class or function for the storage layer, with switchable
implementation.
- Have a reference implementation with the simplest implementation,
and performance unimportant.
- Have a good implementation.
- Have tests which are run on both.
- Also make the visibility of the result and summary panes toggleable.
- When running tools minimize side effects. e.g. use "python -B" to supress
generation of .pyc files.
- Have a cache of the appearance of the entire screen as you move around.
Only works if the log isn't showing, and the results are all calculated.
- Allow negative position coordinates in Portal.
<- I don't need it
- Use multiple cores to update the screen faster. Currently one is used.
<- Don't have multiple cores to test this
- Show help info about the current tool in the help page.
- Show the code of the current tool in the help page.
<- Any info about the tool won't be on the help page.
- Ensure the model and view is seperated.
- And make another view use the same model.
<- Not seperated. Well it is seperated in the sense that the view code only exists
in 'appearance' methods, otherwise think of everything as models.
This worked great.
- Try to minimize the width of the summary pane to avoid wasting space.
The maximum width would still be half the screen.
<- Unfortunately that would make the width of the log window too small. Maybe its
still worth it though.

470
fill3.py Normal file
View file

@ -0,0 +1,470 @@
# -*- coding: utf-8 -*-
# Copyright (C) 2015 Andrew Hamilton. All rights reserved.
# Licensed under the Artistic License 2.0.
import collections
import itertools
import os
import terminal
import termstr
def appearance_is_valid(appearance):
"""An appearance is a list of strings of equal length.
An empty list is valid. Empty strings are not allowed."""
return (all(isinstance(line, (str, termstr.TermStr)) and len(line) > 0
for line in appearance) and
len(set(len(line) for line in appearance)) < 2)
def appearance_resize(appearance, dimensions, pad_char=" "):
width, height = dimensions
result = [line[:width].ljust(width, pad_char)
for line in appearance[:height]]
if len(result) < height:
result.extend([pad_char * width] * (height - len(result)))
return result
def appearance_dimensions(appearance):
try:
return len(appearance[0]), len(appearance)
except IndexError:
return 0, 0
def join(seperator, parts):
"""Returns a string if all the parts and the seperator are plain strings.
In other words it returns a TermStr if anything is a TermStr."""
if parts == []:
return ""
try:
return seperator.join(parts)
except TypeError:
return termstr.TermStr(seperator).join(parts)
def join_horizontal(appearances):
heights = set(len(appearance) for appearance in appearances)
assert len(heights) == 1, heights
return [join("", parts) for parts in zip(*appearances)]
def even_widths(column_widgets, width):
column_count = len(column_widgets)
widths = []
for index, column_widget in enumerate(column_widgets):
start_pos = int(round(float(width) / column_count * index))
end_pos = int(round(float(width) / column_count * (index+1)))
widths.append(end_pos - start_pos)
return widths
class Row(collections.UserList):
def __init__(self, widgets, widths_func=even_widths):
collections.UserList.__init__(self, widgets)
self.widgets = self.data
self.widths_func = widths_func
def appearance(self, dimensions):
width, height = dimensions
widths = self.widths_func(self.widgets, width)
assert sum(widths) == width, (sum(widths), width)
return join_horizontal([column_widget.appearance((item_width, height))
for column_widget, item_width
in zip(self.widgets, widths)])
def appearance_min(self):
appearances = [column_widget.appearance_min()
for column_widget in self.widgets]
dimensions = [appearance_dimensions(appearance)
for appearance in appearances]
max_height = max(height for width, height in dimensions)
return join_horizontal([
appearance_resize(appearance, (width, max_height))
for appearance, (width, height) in zip(appearances, dimensions)])
def even_partition(row_widgets, height):
row_count = len(row_widgets)
heights = []
for index, row_widget in enumerate(row_widgets):
start_pos = int(round(float(height) / row_count * index))
end_pos = int(round(float(height) / row_count * (index+1)))
heights.append(end_pos - start_pos)
return heights
def join_vertical(appearances):
result = []
for appearance in appearances:
result.extend(appearance)
return result
class Column(collections.UserList):
def __init__(self, widgets, partition_func=even_partition,
background_char=" "):
collections.UserList.__init__(self, widgets)
self.widgets = self.data
self.partition_func = partition_func
self.background_char = background_char
def appearance(self, dimensions):
width, height = dimensions
if len(self.widgets) == 0: # FIX: Really allow zero widgets?
return [self.background_char * width] * height
heights = self.partition_func(self.widgets, height)
assert sum(heights) == height, (sum(heights), height)
return join_vertical([row_widget.appearance((width, item_height))
for row_widget, item_height
in zip(self.widgets, heights)])
def _appearance_list(self, widgets):
if widgets == []:
return []
appearances = [row_widget.appearance_min() for row_widget in widgets]
dimensions = [appearance_dimensions(appearance)
for appearance in appearances]
max_width = max(width for width, height in dimensions)
padded_appearances = [
appearance_resize(appearance, (max_width, height))
for appearance, (width, height) in zip(appearances, dimensions)]
result = []
for appearance in padded_appearances:
result.extend(appearance)
return result
def appearance_interval(self, interval):
start_y, end_y = interval
return self._appearance_list(self.widgets[start_y:end_y])
def appearance_min(self):
return self._appearance_list(self.widgets)
class Filler:
def __init__(self, widget):
self.widget = widget
def appearance(self, dimensions):
return appearance_resize(self.widget.appearance_min(), dimensions)
class ScrollBar:
_GREY_BACKGROUND_STYLE = termstr.CharStyle(bg_color=termstr.Color.grey_100)
_GREY_BLOCK = termstr.TermStr(" ", _GREY_BACKGROUND_STYLE)
def __init__(self, is_horizontal, interval=(0, 0), bar_char=_GREY_BLOCK,
background_char=" "):
self._is_horizontal = is_horizontal
self.interval = interval
self.bar_char = bar_char
self.background_char = background_char
def appearance(self, dimensions):
width, height = dimensions
assert width == 1 or height == 1, (width, height)
length = width if self._is_horizontal else height
assert all(0 <= fraction <= 1 for fraction in self.interval), \
self.interval
start_index, end_index = [int(fraction * length)
for fraction in self.interval]
if start_index == end_index and end_index < length:
end_index += 1
bar = (self.background_char * start_index +
self.bar_char * (end_index - start_index) +
self.background_char * (length - end_index))
return [bar] if self._is_horizontal else [char for char in bar]
class Portal:
def __init__(self, widget, position=(0, 0), background_char=" "):
self.widget = widget
self.position = position
self.background_char = background_char
self.last_dimensions = 0, 0
def _scroll_half_pages(self, dx, dy):
x, y = self.position
width, height = self.last_dimensions
self.position = (max(x + dx * (width // 2), 0),
max(y + dy * (height // 2), 0))
def scroll_up(self):
self._scroll_half_pages(0, -1)
def scroll_down(self):
self._scroll_half_pages(0, 1)
def scroll_left(self):
self._scroll_half_pages(-1, 0)
def scroll_right(self):
self._scroll_half_pages(1, 0)
def appearance(self, dimensions):
width, height = dimensions
x, y = self.position
try:
appearance = self.widget.appearance_interval((y, y+height))
except AttributeError:
appearance = self.widget.appearance_min()[y:y+height]
self.last_dimensions = dimensions
return appearance_resize([row[x:x+width] for row in appearance],
dimensions, self.background_char)
class View:
def __init__(self, portal, horizontal_scrollbar, vertical_scrollbar,
hide_scrollbars=True):
self.portal = portal
self.horizontal_scrollbar = horizontal_scrollbar
self.vertical_scrollbar = vertical_scrollbar
self.hide_scrollbars = hide_scrollbars
@classmethod
def from_widget(cls, widget):
return cls(Portal(widget), ScrollBar(is_horizontal=True),
ScrollBar(is_horizontal=False))
@property
def position(self):
return self.portal.position
@position.setter
def position(self, position):
self.portal.position = position
@property
def widget(self):
return self.portal.widget
@widget.setter
def widget(self, widget):
self.portal.widget = widget
def appearance(self, dimensions):
width, height = dimensions
try:
full_width, full_height = (self.portal.widget.
appearance_dimensions())
except AttributeError:
full_appearance = self.portal.widget.appearance_min()
full_width, full_height = appearance_dimensions(full_appearance)
if full_width == 0 or full_height == 0:
return self.portal.appearance(dimensions)
x, y = self.portal.position
hide_scrollbar_vertical = (self.hide_scrollbars and
full_height <= height and y == 0)
hide_scrollbar_horizontal = (self.hide_scrollbars and
full_width <= width and x == 0)
if not hide_scrollbar_horizontal:
full_width = max(full_width, x + width)
self.horizontal_scrollbar.interval = (x / full_width,
(x + width) / full_width)
height -= 1
if not hide_scrollbar_vertical:
full_height = max(full_height, y + height)
self.vertical_scrollbar.interval = (y / full_height,
(y + height) / full_height)
width -= 1
portal_appearance = self.portal.appearance((width, height))
if hide_scrollbar_vertical:
result = portal_appearance
else:
scrollbar_v_appearance = self.vertical_scrollbar.appearance(
(1, height))
result = join_horizontal([portal_appearance,
scrollbar_v_appearance])
if not hide_scrollbar_horizontal:
scrollbar_h_appearance = self.horizontal_scrollbar.appearance(
(width, 1))
result.append(scrollbar_h_appearance[0] +
("" if hide_scrollbar_vertical else " "))
return result
class Text:
def __init__(self, text, pad_char=" "):
lines = text.splitlines()
if len(lines) == 0:
self.text = []
elif len(lines) == 1:
self.text = [text]
else:
max_width = max(len(line) for line in lines)
height = len(lines)
self.text = appearance_resize(lines, (max_width, height), pad_char)
def appearance_min(self):
return self.text
def appearance(self, dimensions):
return appearance_resize(self.appearance_min(), dimensions)
class Table:
def __init__(self, table, pad_char=" "):
self._widgets = table
self._pad_char = pad_char
def appearance_min(self):
if self._widgets == []:
return []
appearances = [[cell.appearance_min() for cell in row]
for row in self._widgets]
row_heights = [0] * len(self._widgets)
column_widths = [0] * len(self._widgets[0])
for y, row in enumerate(appearances):
for x, appearance in enumerate(row):
width, height = appearance_dimensions(appearance)
row_heights[y] = max(row_heights[y], height)
column_widths[x] = max(column_widths[x], width)
return join_vertical([join_horizontal(
[appearance_resize(appearance, (column_widths[x], row_heights[y]),
pad_char=self._pad_char)
for x, appearance in enumerate(row)])
for y, row in enumerate(appearances)])
def parse_rgb(hex_rgb):
if hex_rgb.startswith("#"):
hex_rgb = hex_rgb[1:]
return tuple(eval("0x"+hex_rgb[index:index+2]) for index in [0, 2, 4])
def char_style_for_token_type(token_type, pygment_style):
token_style = pygment_style.style_for_token(token_type)
fg_color = (None if token_style["color"] is None
else parse_rgb(token_style["color"]))
bg_color = (None if token_style["bgcolor"] is None
else parse_rgb(token_style["bgcolor"]))
return termstr.CharStyle(fg_color, bg_color, token_style["bold"],
token_style["italic"], token_style["underline"])
def pygments_to_termstr(tokens, pygment_style):
return termstr.TermStr("").join(
termstr.TermStr(text, char_style_for_token_type(
token_type, pygment_style))
for token_type, text in tokens)
class Code:
def __init__(self, tokens, style):
code = pygments_to_termstr(tokens, style).split("\n")
max_width = max(len(line) for line in code)
height = len(code)
# bg_color = parse_rgb(style.background_color)
# bg_style = termstr.CharStyle(1, bg_color)
# pad_char = termstr.TermStr(" ", bg_style)
pad_char = " "
self.code = appearance_resize(code, (max_width, height), pad_char)
def appearance_min(self):
return self.code
def appearance(self, dimensions):
return appearance_resize(self.appearance_min(), dimensions)
class Border:
THIN = ["", "", "", "", "", "", "", ""]
THICK = ["", "", "", "", "", "", "", ""]
ROUNDED = ["", "", "", "", "", "", "", ""]
DOUBLE = ["", "", "", "", "", "", "", ""]
HEAVY_INNER = ["", "", "", "", "", "", "", ""]
HEAVY_OUTER = ["", "", "", "", "", "", "", ""]
INNER = ["", "", "", "", " ", " ", " ", " "]
def __init__(self, widget, title=None, characters=THIN):
self.widget = widget
self.title = title
(self.top, self.bottom, self.left, self.right, self.top_left,
self.bottom_left, self.bottom_right, self.top_right) = characters
def _add_border(self, body_content):
content_width, content_height = appearance_dimensions(body_content)
if self.title is None:
title_bar = self.top * content_width
else:
padded_title = (" " + self.title + " ")[:content_width]
title_bar = padded_title.center(content_width, self.top)
result = [self.top_left + title_bar + self.top_right]
result.extend(self.left + line + self.right for line in body_content)
result.append(self.bottom_left + self.bottom * content_width +
self.bottom_right)
return result
def appearance_min(self):
return self._add_border(self.widget.appearance_min())
def appearance(self, dimensions):
width, height = dimensions
return self._add_border(self.widget.appearance((width-2, height-2)))
class Placeholder:
def __init__(self, widget=None):
self.widget = widget
def appearance_min(self):
return self.widget.appearance_min()
def appearance(self, dimensions):
return self.widget.appearance(dimensions)
class Style:
def __init__(self, widget, style_transform_func):
self.widget = widget
self.style_transform_func = style_transform_func
def _transform_appearance(self, appearance):
return [termstr.TermStr(line).transform_style(
self.style_transform_func) for line in appearance]
def appearance_min(self):
return self._transform_appearance(self.widget.appearance_min())
def appearance(self, dimensions):
return self._transform_appearance(self.widget.appearance(dimensions))
def draw_screen(widget):
appearance = widget.appearance(os.get_terminal_size())
print(terminal.move(0, 0), *appearance, sep="", end="", flush=True)
_last_appearance = []
def patch_screen(widget):
global _last_appearance
appearance = widget.appearance(os.get_terminal_size())
zip_func = (itertools.zip_longest
if len(appearance) > len(_last_appearance) else zip)
changed_lines = (str(terminal.move(0, row_index)) + line
for row_index, (line, old_line)
in enumerate(zip_func(appearance, _last_appearance))
if line != old_line)
print(*changed_lines, sep="", end="", flush=True)
_last_appearance = appearance

133
fill3_test.py Executable file
View file

@ -0,0 +1,133 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Copyright (C) 2015 Andrew Hamilton. All rights reserved.
# Licensed under the Artistic License 2.0.
import unittest
import fill3 as fill
# import pygments.lexers
# import pygments.styles.default
# import pygments.styles.emacs
class WidgetTests(unittest.TestCase):
TEXT_A = fill.Text("A")
TEXT_B = fill.Text("B")
def assert_string(self, appearance, expected_string):
self.assertEqual(str(fill.join("\n", appearance)), expected_string)
def test_rows_widget(self):
rows = fill.Row([self.TEXT_A, self.TEXT_B])
self.assert_string(rows.appearance_min(), "AB")
rows = fill.Row([fill.Filler(self.TEXT_A),
fill.Filler(self.TEXT_B)])
self.assert_string(rows.appearance((4, 1)), "A B ")
def test_columns_widget(self):
columns = fill.Column([self.TEXT_A, self.TEXT_B])
self.assert_string(columns.appearance_min(), "A\n"
"B")
def test_text_widget(self):
self.assert_string(self.TEXT_A.appearance_min(), "A")
text = "foo\nbar"
self.assert_string(fill.Text(text).appearance_min(), "foo\n"
"bar")
def test_portal_widget(self):
row = fill.Row([fill.Text("foo"), fill.Text("bar")])
portal = fill.Portal(row, (1, 0))
self.assert_string(portal.appearance((5, 1)), "oobar")
portal.position = (0, 10)
self.assert_string(portal.appearance((1, 1)), " ")
def test_border_widget(self):
contents = fill.Filler(self.TEXT_A)
self.assert_string(fill.Border(contents).appearance((3, 3)), "┌─┐\n"
"│A│\n"
"└─┘")
for empty_contents in [fill.Filler(fill.Text("")), fill.Column([])]:
self.assert_string(fill.Border(empty_contents).appearance((2, 2)),
"┌┐\n"
"└┘")
self.assert_string(fill.Border(fill.Column([])).appearance_min(),
"┌┐\n"
"└┘")
self.assert_string(fill.Border(empty_contents).appearance((3, 3)),
"┌─┐\n"
"│ │\n"
"└─┘")
text = fill.Text("abcdef")
self.assert_string(fill.Border(text, title="AB").appearance((8, 3)),
"┌─ AB ─┐\n"
"│abcdef│\n"
"└──────┘")
# def test_pygments_widget(self):
# text = "print('hello world')"
# tokens = pygments.lex(text, pygments.lexers.PythonLexer())
# code = fill.Code(tokens, pygments.styles.default.DefaultStyle)
# self.assert_string(code.appearance_min(),
# "\x1b[1m\x1b[38;2;0;128;0m\x1b[48;2;248;248;248m"
# "print\x1b(B\x1b[m\x1b[315m\x1b[48;2;248;248;248m"
# "(\x1b(B\x1b[m\x1b[38;2;186;33;33m\x1b[48;2;248;248;248m"
# "'hello world'\x1b(B\x1b[m\x1b[315m\x1b[48;2;248;248;248m"
# ")\x1b(B\x1b[m\x1b[315m\x1b[40m\n\x1b(B\x1b[m\x1b[31m"
# "\x1b[48;2;248;248;248m \x1b[0m")
def test_placeholder_widget(self):
placeholder = fill.Placeholder(self.TEXT_A)
self.assert_string(placeholder.appearance_min(), "A")
placeholder.widget = self.TEXT_B
self.assert_string(placeholder.appearance_min(), "B")
def test_scroll_bar(self):
scroll_bar = fill.ScrollBar(is_horizontal=True, bar_char="#")
self.assertEqual(scroll_bar.interval, (0, 0))
self.assert_string(scroll_bar.appearance((1, 1)), "#")
scroll_bar.interval = (0, 0.5)
self.assert_string(scroll_bar.appearance((2, 1)), "# ")
scroll_bar.interval = (0, 0.1)
self.assert_string(scroll_bar.appearance((2, 1)), "# ")
scroll_bar.interval = (0.25, 0.75)
self.assert_string(scroll_bar.appearance((4, 1)), " ## ")
scroll_bar = fill.ScrollBar(is_horizontal=False, bar_char="#")
self.assertEqual(scroll_bar.interval, (0, 0))
self.assert_string(scroll_bar.appearance((1, 1)), "#")
scroll_bar.interval = (0, 0.5)
self.assert_string(scroll_bar.appearance((1, 2)), "#\n"
" ")
scroll_bar.interval = (0, 0.1)
self.assert_string(scroll_bar.appearance((1, 2)), "#\n"
" ")
scroll_bar.interval = (0.25, 0.75)
self.assert_string(scroll_bar.appearance((1, 4)), " \n"
"#\n"
"#\n"
" ")
def test_table_widget(self):
table = fill.Table([])
self.assert_string(table.appearance_min(), "")
table = fill.Table([[self.TEXT_A]])
self.assert_string(table.appearance_min(), "A")
table = fill.Table([[self.TEXT_A, self.TEXT_B]])
self.assert_string(table.appearance_min(), "AB")
table = fill.Table([[self.TEXT_A, self.TEXT_B],
[self.TEXT_B, self.TEXT_A]])
self.assert_string(table.appearance_min(), "AB\n"
"BA")
label_foo = fill.Text("FOO")
table = fill.Table([[label_foo, self.TEXT_B],
[self.TEXT_B, self.TEXT_A]])
self.assert_string(table.appearance_min(), "FOOB\n"
"B A")
if __name__ == "__main__":
unittest.main()

40
golden-files/help Normal file
View file

@ -0,0 +1,40 @@
┌──────────────── Help ────────────────┐
│Produces a set of reports for every f │
│The reports are produced by many exis │
 │
│The state of each report is also summ │
│The possible states are listed below. │
 │
│A report is viewed by selecting its s │
 │
│Reports are recalculated whenever fil │
│are kept up to date. (optional)  │
 │
│The reports are cached in a directory │
│directory.  │
 │
│Usage: vigil <root_path>  │
 │
│e.g. # vigil my_project  │
 │
│Keys:  │
h - Show the help screen. (toggle)  │
d, c, j, k - Move the cursor up, do │
D, C, J, K - Scroll the result pane │
t - Turn the result pane to portrai │
l - Show the activity log. (toggle) │
n - Move to the next issue.  │
N - Move to the next issue of the c │
o - Order files by type, or by dire │
w - Watch the filesystem for change │
s - Change the appearance of result │
q - Quit.  │
 │
│Statuses:  │
│  Normal  │
  No problems  │
  Problems  │
  Not applicable │
  Running │
  │
└──────────────────────────────────────┘

20
golden-files/initial Normal file
View file

@ -0,0 +1,20 @@
(B┌──── Summary ─────┐┌ foo.py ─── (Bmetada(B┐
(B. (B. . . . .(B││? │
│ ││ │
│ ││ │
│ ││ │
│ ││ │
│ ││ │
│ ││ │
(B (B ││ │
└──────────────────┘│ │
┌── Activity log ──┐│ │
│ ││ │
│ ││ │
│ ││ │
│ ││ │
│ ││ │
│ ││ │
│ ││ │
│ ││ │
└──────────────────┘└──────────────────┘

View file

@ -0,0 +1,20 @@
11:11:11(B foo
11:11:11(B bar


0
golden-files/log-initial Normal file
View file

View file

@ -0,0 +1 @@
11:11:11 foo

View file

@ -0,0 +1,2 @@
11:11:11 foo
11:11:11 bar

View file

View file

@ -0,0 +1 @@
(Bfoo

View file

@ -0,0 +1 @@
(Bfoo(Bbar

View file

@ -0,0 +1,22 @@
left-right:
(B┌───────────── Summa┌──────────────────┐
(B. (B. . . . . (B│ │
└───────────────────│ │
┌┐ │ │
└┘ │ │
│ │
│ │
│ │
│ │
└──────────────────┘
top-bottom:
(B┌───────────── Summary ─────────────┐┌┐
(B. (B. . . . . . . . . . . . . (B│└┘
└───────────────────────────────────┘
┌──┐
│ │
└──┘


70
golden.py Normal file
View file

@ -0,0 +1,70 @@
# Copyright (C) 2015 Andrew Hamilton. All rights reserved.
# Licensed under the Artistic License 2.0.
import optparse
import os.path
import shutil
import subprocess
import sys
import tempfile
import unittest
def _accept_actual(failed):
for actual_str, golden_path in failed:
with open(golden_path, "w") as golden_file:
golden_file.write(actual_str)
print("Changed golden file: %s" % golden_path)
def _run_meld_gui(failed):
temp_dir = tempfile.mkdtemp()
try:
golden_dir = os.path.join(temp_dir, "golden")
actual_dir = os.path.join(temp_dir, "actual")
os.mkdir(golden_dir)
os.mkdir(actual_dir)
for actual_str, golden_file in failed:
name = os.path.basename(golden_file)
actual_path = os.path.join(actual_dir, name)
with open(actual_path, "w") as actual:
actual.write(actual_str)
os.symlink(os.path.abspath(golden_file),
os.path.join(golden_dir, name))
subprocess.call(["meld", actual_dir, golden_dir])
finally:
shutil.rmtree(temp_dir)
_FAILED = set()
def assertGolden(actual, golden_path):
with open(golden_path, "r") as golden_file:
expected = golden_file.read()
if actual != expected:
_FAILED.add((actual, golden_path))
raise unittest.TestCase.failureException(
'Output does not match golden file: %r\nUse "--diff" or'
' "--accept" to update the golden file.' % golden_path)
def main():
parser = optparse.OptionParser()
parser.add_option("-a", "--accept", action="store_true",
dest="should_accept_actual")
parser.add_option("-d", "--diff", action="store_true", dest="should_diff")
options, args = parser.parse_args()
# unitest.main doesn't expect these arguments, so remove them.
for argument in ["-a", "--accept", "-d", "--diff"]:
if argument in sys.argv:
sys.argv.remove(argument)
try:
unittest.main()
finally:
if len(_FAILED) > 0:
if options.should_accept_actual:
_accept_actual(_FAILED)
if options.should_diff:
_run_meld_gui(_FAILED)

130
lscolors.py Normal file
View file

@ -0,0 +1,130 @@
# Copyright (C) 2011, 2015 Andrew Hamilton. All rights reserved.
# Licensed under the Artistic License 2.0.
import os
import os.path
import stat
import syslog
FILE_KEY = "fi"
DIRECTORY_KEY = "di"
OTHER_WRITABLE_KEY = "ow"
EXECUTABLE_KEY = "ex"
SETUID_KEY = "su"
SETGUID_KEY = "sg"
SYMLINK_KEY = "ln"
ORPHAN_KEY = "or"
PIPE_KEY = "pi"
CHARACTER_DEVICE_KEY = "cd"
BLOCK_DEVICE_KEY = "bd"
STICKY_KEY = "st"
STICKY_OTHER_WRITABLE_KEY = "tw"
SOCKET_KEY = "so"
MISSING_KEY = "mi"
MULTI_HARDLINK_KEY = "mh"
def parse_ls_colors(ls_codes):
color_codes = {}
for entry in ls_codes.split(":"):
if "=" not in entry:
continue
entry_key, entry_value = entry.split("=")
if entry_key.startswith("*."):
entry_key = entry_key[1:]
color_codes[entry_key] = entry_value
assert color_codes != {}, color_codes
return color_codes
DEFAULT_COLOR_CODES = \
{BLOCK_DEVICE_KEY: '01;33', SYMLINK_KEY: '01;36',
STICKY_OTHER_WRITABLE_KEY: '30;42', DIRECTORY_KEY: '01;34',
SETUID_KEY: '37;41', CHARACTER_DEVICE_KEY: '01;33', SOCKET_KEY: '01;35',
EXECUTABLE_KEY: '01;32', STICKY_KEY: '37;44',
OTHER_WRITABLE_KEY: '34;42', PIPE_KEY: '33', SETGUID_KEY: '30;43',
ORPHAN_KEY: '40;31;01'}
def get_color_codes(environment):
if "LS_COLORS" in environment:
try:
return parse_ls_colors(environment["LS_COLORS"])
except:
syslog.syslog("Syntax error in LS_COLORS environment variable. "
"Using default colors.")
return DEFAULT_COLOR_CODES
def color_key_for_path(path, color_codes, is_link_target=True):
# see print_color_indicator in the file 'ls.c' in the coreutils codebase
if not os.path.lexists(path):
return MISSING_KEY
elif os.path.islink(path):
if is_link_target:
try:
link_path = os.path.join(os.path.dirname(path),
os.readlink(path))
file_stat = os.stat(link_path)
except OSError:
return ORPHAN_KEY
else:
return SYMLINK_KEY
else:
file_stat = os.stat(path)
mode = file_stat.st_mode
if stat.S_ISREG(mode):
if mode & stat.S_ISUID and SETUID_KEY in color_codes:
return SETUID_KEY
elif mode & stat.S_ISGID and SETGUID_KEY in color_codes:
return SETGUID_KEY
elif ((mode & stat.S_IXUSR or mode & stat.S_IXGRP or
mode & stat.S_IXOTH) and EXECUTABLE_KEY in color_codes):
return EXECUTABLE_KEY
elif file_stat.st_nlink > 1 and MULTI_HARDLINK_KEY in color_codes:
return MULTI_HARDLINK_KEY
else:
return FILE_KEY
elif stat.S_ISDIR(mode):
if (mode & stat.S_ISVTX and mode & stat.S_IWOTH and
STICKY_OTHER_WRITABLE_KEY in color_codes):
return STICKY_OTHER_WRITABLE_KEY
elif (mode & stat.S_IWOTH) != 0 and OTHER_WRITABLE_KEY in color_codes:
return OTHER_WRITABLE_KEY
elif (mode & stat.S_ISVTX) != 0 and STICKY_KEY in color_codes:
return STICKY_KEY
else:
return DIRECTORY_KEY
for test_function, color_key in [(stat.S_ISFIFO, PIPE_KEY),
(stat.S_ISSOCK, SOCKET_KEY),
(stat.S_ISBLK, BLOCK_DEVICE_KEY),
(stat.S_ISCHR, CHARACTER_DEVICE_KEY)]:
if test_function(mode):
return color_key
return ORPHAN_KEY
def color_code_for_path(path, color_codes):
def get_extension(basename, color_codes):
parts = basename.split(".")
if len(parts) == 2:
extension = "." + parts[1]
if extension in color_codes:
return extension
elif len(parts) > 2:
for extension in color_codes:
if extension.startswith(".") and \
basename.endswith(extension):
return extension
target_link = color_codes.get(SYMLINK_KEY, None)
color_key = color_key_for_path(path, color_codes,
target_link == "target")
if color_key == FILE_KEY:
filename = os.path.basename(path)
if "." in filename:
extension = get_extension(filename, color_codes)
if extension is not None:
color_key = extension
return color_codes.get(color_key, None)

273
lscolors_test.py Executable file
View file

@ -0,0 +1,273 @@
#!/usr/bin/env python3
# Copyright (C) 2011, 2015 Andrew Hamilton. All rights reserved.
# Licensed under the Artistic License 2.0.
import os
import os.path
import shutil
import stat
import subprocess
import tempfile
import unittest
import lscolors
class TempDirTestCase(unittest.TestCase):
def setUp(self):
self.temp_dir = tempfile.mkdtemp()
def tearDown(self):
shutil.rmtree(self.temp_dir)
class ParseLsColorsTestCase(unittest.TestCase):
def test_parse_ls_colors(self):
self.assertRaises(AssertionError, lscolors.parse_ls_colors, "")
self.assertRaises(AssertionError, lscolors.parse_ls_colors, "::")
self.assertEqual(lscolors.parse_ls_colors("*.awk=38;5;148;1"),
{".awk": "38;5;148;1"})
self.assertEqual(lscolors.parse_ls_colors("*.tar.gz=38;5;148;1"),
{".tar.gz": "38;5;148;1"})
self.assertEqual(
lscolors.parse_ls_colors("*.awk=38;5;148;1:di=38;5;30"),
{".awk": "38;5;148;1", "di": "38;5;30"})
class ColorKeyForFileTestCase(TempDirTestCase):
COLOR_CODES = {lscolors.OTHER_WRITABLE_KEY: "other writable",
lscolors.EXECUTABLE_KEY: "executable",
lscolors.ORPHAN_KEY: "orphan",
lscolors.SETGUID_KEY: "setguid",
lscolors.SETUID_KEY: "setuid",
lscolors.STICKY_KEY: "sticky",
lscolors.STICKY_OTHER_WRITABLE_KEY: "sticky other writable",
lscolors.MULTI_HARDLINK_KEY: "multi hardlink",
lscolors.CHARACTER_DEVICE_KEY: "character device",
lscolors.BLOCK_DEVICE_KEY: "block device"}
def test_color_key_for_path_without_extension(self):
executable_path = os.path.join(self.temp_dir, "foo")
open(executable_path, "w").close()
self.assertEqual(
lscolors.color_key_for_path(executable_path, self.COLOR_CODES),
lscolors.FILE_KEY)
def test_color_key_for_path_with_extension(self):
awk_path = os.path.join(self.temp_dir, "test.awk")
open(awk_path, "w").close()
self.assertEqual(
lscolors.color_key_for_path(awk_path, self.COLOR_CODES),
lscolors.FILE_KEY)
def test_color_key_for_path_with_double_extension(self):
tar_gz_path = os.path.join(self.temp_dir, "test.tar.gz")
open(tar_gz_path, "w").close()
self.assertEqual(
lscolors.color_key_for_path(tar_gz_path, self.COLOR_CODES),
lscolors.FILE_KEY)
def test_color_code_for_directory(self):
self.assertEqual(
lscolors.color_key_for_path(self.temp_dir, self.COLOR_CODES),
lscolors.DIRECTORY_KEY)
def test_color_code_for_directory_thats_other_writable(self):
mode = os.stat(self.temp_dir).st_mode
os.chmod(self.temp_dir, mode | stat.S_IWOTH)
self.assertEqual(
lscolors.color_key_for_path(self.temp_dir, self.COLOR_CODES),
lscolors.OTHER_WRITABLE_KEY)
def test_color_code_for_executable(self):
executable_path = os.path.join(self.temp_dir, "a")
open(executable_path, "w").close()
os.chmod(executable_path, stat.S_IEXEC)
self.assertEqual(
lscolors.color_key_for_path(executable_path, self.COLOR_CODES),
lscolors.EXECUTABLE_KEY)
def test_color_code_for_executable_with_extension(self):
executable_path = os.path.join(self.temp_dir, "a.awk")
open(executable_path, "w").close()
os.chmod(executable_path, stat.S_IEXEC)
self.assertEqual(
lscolors.color_key_for_path(executable_path, self.COLOR_CODES),
lscolors.EXECUTABLE_KEY)
def test_color_code_for_setguid(self):
setguid_path = os.path.join(self.temp_dir, "a")
open(setguid_path, "w").close()
os.chmod(setguid_path, stat.S_ISGID)
self.assertEqual(
lscolors.color_key_for_path(setguid_path, self.COLOR_CODES),
lscolors.SETGUID_KEY)
def test_color_code_for_setuid(self):
setuid_path = os.path.join(self.temp_dir, "a")
open(setuid_path, "w").close()
os.chmod(setuid_path, stat.S_ISUID)
self.assertEqual(
lscolors.color_key_for_path(setuid_path, self.COLOR_CODES),
lscolors.SETUID_KEY)
def test_color_code_for_broken_symlink(self):
symlink_path = os.path.join(self.temp_dir, "b")
os.symlink(os.path.join(self.temp_dir, "a"), symlink_path)
self.assertEqual(
lscolors.color_key_for_path(symlink_path, self.COLOR_CODES),
lscolors.ORPHAN_KEY)
def test_color_code_for_good_symlink(self):
symlink_path = os.path.join(self.temp_dir, "b")
awk_path = os.path.join(self.temp_dir, "test.awk")
open(awk_path, "w").close()
os.symlink(awk_path, symlink_path)
self.assertEqual(
lscolors.color_key_for_path(symlink_path, self.COLOR_CODES),
lscolors.FILE_KEY)
def test_color_code_for_pipe(self):
pipe_path = os.path.join(self.temp_dir, "a")
os.mkfifo(pipe_path)
self.assertEqual(
lscolors.color_key_for_path(pipe_path, self.COLOR_CODES),
lscolors.PIPE_KEY)
def test_color_code_for_character_device(self):
character_device_path = "/dev/tty"
self.assertEqual(
lscolors.color_key_for_path(character_device_path,
self.COLOR_CODES),
lscolors.CHARACTER_DEVICE_KEY)
def test_color_code_for_block_device(self):
block_device_path = "/dev/loop0"
self.assertEqual(
lscolors.color_key_for_path(block_device_path, self.COLOR_CODES),
lscolors.BLOCK_DEVICE_KEY)
def test_color_code_for_sticky_directory(self):
mode = os.stat(self.temp_dir).st_mode
os.chmod(self.temp_dir, mode | stat.S_ISVTX)
self.assertEqual(
lscolors.color_key_for_path(self.temp_dir, self.COLOR_CODES),
lscolors.STICKY_KEY)
def test_color_code_for_sticky_and_other_writable(self):
mode = os.stat(self.temp_dir).st_mode
os.chmod(self.temp_dir, mode | stat.S_ISVTX | stat.S_IWOTH)
self.assertEqual(
lscolors.color_key_for_path(self.temp_dir, self.COLOR_CODES),
lscolors.STICKY_OTHER_WRITABLE_KEY)
def test_color_code_for_socket(self):
socket_path = "/dev/log"
self.assertEqual(
lscolors.color_key_for_path(socket_path, self.COLOR_CODES),
lscolors.SOCKET_KEY)
def test_color_code_for_missing_file(self):
missing_path = os.path.join(self.temp_dir, "a")
self.assertEqual(
lscolors.color_key_for_path(missing_path, self.COLOR_CODES),
lscolors.MISSING_KEY)
def test_color_code_for_multi_hardlink(self):
a_path = os.path.join(self.temp_dir, "a")
open(a_path, "w").close()
b_path = os.path.join(self.temp_dir, "b")
os.link(a_path, b_path)
self.assertEqual(
lscolors.color_key_for_path(a_path, self.COLOR_CODES),
lscolors.MULTI_HARDLINK_KEY)
class ColorCodeForFileTestCase(TempDirTestCase):
AWK_COLOR = "awk color"
TAR_GZ_COLOR = "tar gz color"
COLOR_CODES = {
".awk": AWK_COLOR, ".tar.gz": TAR_GZ_COLOR}
def test_color_code_for_path_without_extension(self):
file_path = os.path.join(self.temp_dir, "foo")
open(file_path, "w").close()
self.assertEqual(
lscolors.color_code_for_path(file_path, {"fi": "file color"}),
"file color")
def test_color_code_for_path_with_extension(self):
awk_path = os.path.join(self.temp_dir, "test.awk")
open(awk_path, "w").close()
self.assertEqual(
lscolors.color_code_for_path(awk_path, self.COLOR_CODES),
self.AWK_COLOR)
def test_color_code_for_path_with_double_extension(self):
tar_gz_path = os.path.join(self.temp_dir, "test.tar.gz")
open(tar_gz_path, "w").close()
self.assertEqual(
lscolors.color_code_for_path(tar_gz_path, self.COLOR_CODES),
self.TAR_GZ_COLOR)
def parse_ls_line(line):
parts = line.split("\x1b[")
if len(parts) == 1:
return (None, line)
for part in parts:
end_color_code = part.find("m")
if end_color_code < (len(part) - 1):
return tuple(part.split("m", 1))
class ParseLsLineTestCase(unittest.TestCase):
def test_parse_ls_line(self):
self.assertEqual(parse_ls_line(
"\x1b[0m\x1b[38;5;254m\x1b[m\x1b[38;5;30mhello\x1b[0m\n"),
("38;5;30", "hello"))
def test_against_ls(root_path, environment):
process = subprocess.Popen(
["ls", "--color=always", "-R", root_path],
stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=environment)
stdout, stderr = process.communicate()
color_codes = lscolors.get_color_codes(environment)
for line in stdout.splitlines():
line = line.strip()
if line == "":
continue
if line.endswith(":"):
current_directory = line[:-1]
continue
ls_color_code, filename = parse_ls_line(line)
path = os.path.join(current_directory, filename)
if os.path.exists(path): # Some paths are already gone. e.g. in /proc
color_code = lscolors.color_code_for_path(path, color_codes)
if color_code != ls_color_code:
print("%s %r %r" % (path, color_code, ls_color_code))
RICH_COLOR_CODES = (
"bd=38;5;68:ca=38;5;17:cd=38;5;113;1:di=38;5;30:do=38;5;127:"
"ex=38;5;166;1:pi=38;5;126:fi=38;5;253:ln=target:mh=38;5;220;1:"
"no=38;5;254:or=48;5;196;38;5;232;1:ow=38;5;33;1:sg=38;5;137;1:"
"su=38;5;137:so=38;5;197:st=48;5;235;38;5;118;1:tw=48;5;235;38;5;139;1:"
"*.BAT=38;5;108:*.PL=38;5;160:*.asm=38;5;240;1:*.awk=38;5;148;1:"
"*.bash=38;5;173:*.bat=38;5;108:*.c=38;5;110:*.cfg=1:*.coffee=38;5;94;1:"
"*.conf=1:*.cpp=38;5;24;1:*.cs=38;5;74;1:*.css=38;5;91:*.csv=38;5;78:"
"*.diff=48;5;197;38;5;232:*.enc=38;5;192;3")
if __name__ == "__main__":
unittest.main()
# root_path = "/"
# test_against_ls(root_path, {"LS_COLORS": RICH_COLOR_CODES})
# test_against_ls(root_path, {}) # Test using default colors

112
terminal.py Normal file
View file

@ -0,0 +1,112 @@
# Copyright (C) 2015 Andrew Hamilton. All rights reserved.
# Licensed under the Artistic License 2.0.
import contextlib
import curses
import os
import select
import sys
import termios
import tty
import urwid
import urwid.raw_display
curses.setupterm(os.environ.get("TERM", "unknown"), sys.stdout.fileno())
def _get_code(capability):
code = curses.tigetstr(capability)
return code.decode("latin1") if code is not None else code
normal = _get_code("sgr0")
bold = _get_code("bold")
italic = _get_code("sitm")
shadow = _get_code("sshm")
standout = _get_code("smso")
subscript = _get_code("ssubm")
superscript = _get_code("ssupm")
underline = _get_code("smul")
enter_fullscreen = _get_code("smcup")
exit_fullscreen = _get_code("rmcup")
hide_cursor = _get_code("civis")
normal_cursor = _get_code("cnorm")
clear = _get_code("clear")
save = _get_code("sc")
restore = _get_code("rc")
# reverse:rev, blink:blink, dim:dim, flash:flash
_fg_color = curses.tigetstr("setaf")
_bg_color = curses.tigetstr("setab")
_move = curses.tigetstr("cup")
def fg_color(color_number):
return curses.tparm(_fg_color, color_number).decode("latin1")
def bg_color(color_number):
return curses.tparm(_bg_color, color_number).decode("latin1")
def fg_rgb_color(rgb):
# Is there a better way?
return "\x1b[38;2;%i;%i;%im" % rgb
def bg_rgb_color(rgb):
return "\x1b[48;2;%i;%i;%im" % rgb
def move(x, y):
return curses.tparm(_move, y, x).decode("latin1")
@contextlib.contextmanager
def fullscreen():
if enter_fullscreen is None:
try:
yield
finally:
sys.stdout.write(clear)
else:
sys.stdout.write(enter_fullscreen)
try:
yield
finally:
sys.stdout.write(exit_fullscreen)
@contextlib.contextmanager
def hidden_cursor():
sys.stdout.write(hide_cursor)
try:
yield
finally:
sys.stdout.write(normal_cursor)
@contextlib.contextmanager
def console_title(title):
sys.stdout.write(save)
sys.stdout.write("\033]0;%s\007" % title)
try:
yield
finally:
sys.stdout.write(restore)
@contextlib.contextmanager
def urwid_screen():
screen = urwid.raw_display.Screen()
screen.set_mouse_tracking(True)
screen.start()
try:
yield screen
finally:
screen.stop()

223
termstr.py Normal file
View file

@ -0,0 +1,223 @@
# Copyright (C) 2015 Andrew Hamilton. All rights reserved.
# Licensed under the Artistic License 2.0.
import collections
import weakref
import terminal
def cache_first_result(user_function):
def decorator(self, *args, **kwds):
try:
return self._cache
except AttributeError:
self._cache = user_function(self, *args, **kwds)
return self._cache
return decorator
class Color:
black = (0, 0, 0)
white = (255, 255, 255)
red = (255, 0, 0)
green = (0, 255, 0)
blue = (0, 0, 255)
yellow = (255, 255, 0)
grey_50 = (50, 50, 50)
grey_100 = (100, 100, 100)
class CharStyle:
_POOL = weakref.WeakValueDictionary()
def __new__(cls, fg_color=None, bg_color=None, is_bold=False,
is_italic=False, is_underlined=False):
if fg_color is None:
fg_color = Color.white
if bg_color is None:
bg_color = Color.black
key = (fg_color, bg_color, is_bold, is_italic, is_underlined)
try:
return CharStyle._POOL[key]
except KeyError:
obj = object.__new__(cls)
obj.fg_color, obj.bg_color, obj.is_bold, obj.is_italic, \
obj.is_underlined = key
return CharStyle._POOL.setdefault(key, obj)
def __getnewargs__(self):
return (self.fg_color, self.bg_color, self.is_bold, self.is_italic,
self.is_underlined)
def __repr__(self):
attributes = []
if self.is_bold:
attributes.append("b")
if self.is_italic:
attributes.append("i")
if self.is_underlined:
attributes.append("u")
return ("<CharStyle: fg:%s bg:%s attr:%s>" %
(self.fg_color, self.bg_color, ",".join(attributes)))
@cache_first_result
def code_for_term(self):
fg_func = (terminal.fg_color if isinstance(self.fg_color, int)
else terminal.fg_rgb_color)
bg_func = (terminal.bg_color if isinstance(self.bg_color, int)
else terminal.bg_rgb_color)
bold_code = terminal.bold if self.is_bold else ""
italic_code = terminal.italic if self.is_italic else ""
underline_code = terminal.underline if self.is_underlined else ""
return "".join([terminal.normal, fg_func(self.fg_color),
bg_func(self.bg_color), bold_code, italic_code,
underline_code])
def join_lists(lists):
result = []
for list_ in lists:
result.extend(list_)
return result
class TermStr(collections.UserString):
def __init__(self, data, style=CharStyle()):
if isinstance(data, self.__class__):
self.data = data.data
self.style = data.style
else:
self.data = data
self.style = (style if isinstance(style, tuple)
else (style,) * len(data))
def __eq__(self, other):
return (self is other or
(isinstance(other, self.__class__) and
self.data == other.data and self.style == other.style))
def __ne__(self, other):
return not self.__eq__(other)
def __hash__(self):
return hash((self.data, self.style))
@cache_first_result
def _partition_style(self):
if self.data == "":
return []
last_style, last_index = None, 0
result = []
for index, style in enumerate(self.style):
if style != last_style:
if last_style is not None:
result.append(
(last_style, self.data[last_index:index], last_index))
last_style, last_index = style, index
result.append(
(last_style, self.data[last_index:len(self.style)], last_index))
return result
def __str__(self):
return "".join(join_lists(
[style.code_for_term(), str_]
for style, str_, position in self._partition_style()) +
[terminal.normal])
def __repr__(self):
return "<TermStr: %r>" % self.data
def __add__(self, other):
if isinstance(other, str):
other = TermStr(other)
return self.__class__(self.data + other.data, self.style + other.style)
def __radd__(self, other):
if isinstance(other, str):
other = TermStr(other)
return self.__class__(other.data + self.data, other.style + self.style)
def __mul__(self, n):
return self.__class__(self.data*n, self.style*n)
__rmul__ = __mul__
def __getitem__(self, index):
return self.__class__(self.data[index], self.style[index])
def join(self, parts):
parts = [TermStr(part) if isinstance(part, str) else part
for part in parts]
joined_style = join_lists(self.style + part.style for part in parts)
return self.__class__(self.data.join(part.data for part in parts),
tuple(joined_style[len(self.style):]))
def _split_style(self, parts, sep_length):
result = []
cursor = 0
for part in parts:
style_part = self.style[cursor:cursor+len(part)]
result.append(self.__class__(part, style_part))
cursor += (len(part) + sep_length)
return result
def split(self, sep=None, maxsplit=-1):
return self._split_style(self.data.split(sep, maxsplit), len(sep))
def splitlines(self, keepends=0):
# FIX. Fails when a line seperator isn't one character in length.. \r\n
sep_length = 0 if keepends else len("\n")
return self._split_style(self.data.splitlines(keepends), sep_length)
def capitalize(self):
return self.__class__(self.data.capitalize(), self.style)
def lower(self):
return self.__class__(self.data.lower(), self.style)
def swapcase(self):
return self.__class__(self.data.swapcase(), self.style)
def title(self):
return self.__class__(self.data.title(), self.style)
def upper(self):
return self.__class__(self.data.upper(), self.style)
def ljust(self, width, fillchar=" "):
return self + self.__class__(fillchar * (width - len(self.data)))
def rjust(self, width, fillchar=" "):
return self.__class__(fillchar * (width - len(self.data))) + self
def center(self, width, fillchar=" "):
left_width = (width - len(self.data)) // 2
if left_width < 1:
return self
return (self.__class__(fillchar * left_width) + self +
self.__class__(fillchar *
(width - left_width - len(self.data))))
# Below are extra methods useful for termstrs.
def transform_style(self, transform_func):
new_style = tuple(join_lists([transform_func(style)] * len(str_)
for style, str_, position
in self._partition_style()))
return self.__class__(self.data, new_style)
def bold(self):
def make_bold(style):
return CharStyle(style.fg_color, style.bg_color, is_bold=True,
is_underlined=style.is_underlined)
return self.transform_style(make_bold)
def underline(self):
def make_underlined(style):
return CharStyle(style.fg_color, style.bg_color,
is_bold=style.is_bold, is_underlined=True)
return self.transform_style(make_underlined)

121
termstr_test.py Executable file
View file

@ -0,0 +1,121 @@
#!/usr/bin/env python3
# Copyright (C) 2015 Andrew Hamilton. All rights reserved.
# Licensed under the Artistic License 2.0.
import pickle
import unittest
from termstr import TermStr, CharStyle
import termstr
class CacheFirstResultTestCase(unittest.TestCase):
def test_cache_first_result_decorator(self):
class A:
@termstr.cache_first_result
def a(self, foo):
return foo
a = A()
self.assertEqual(a.a(3), 3)
self.assertEqual(a.a(4), 3)
class B:
@termstr.cache_first_result
def b(self, foo):
return foo
b = B()
self.assertEqual(b.b(5), 5)
class CharStyleTests(unittest.TestCase):
def setUp(self):
self.style = CharStyle()
def test_default_char_style(self):
self.assertEqual(self.style.fg_color, termstr.Color.white)
self.assertEqual(self.style.bg_color, termstr.Color.black)
self.assertEqual(self.style.is_bold, False)
self.assertEqual(self.style.is_underlined, False)
def test_pickle_char_style(self):
style = CharStyle()
loaded_style = pickle.loads(pickle.dumps(style))
self.assertEqual(style, loaded_style)
self.assertTrue(style is loaded_style)
def test_repr(self):
self.assertEqual(repr(self.style),
"<CharStyle: fg:(255, 255, 255) bg:(0, 0, 0) attr:>")
def test_code_for_term(self):
self.assertEqual(self.style.code_for_term(),
"\x1b[0m\x1b[38;2;255;255;255m\x1b[48;2;0;0;0m")
class TermStrTests(unittest.TestCase):
def test_termstr(self):
foo = TermStr("foo")
foobar = TermStr("foobar")
bold_style = CharStyle(3, 5, is_bold=True)
foo_bold = TermStr("foo", bold_style)
self.assertEqual(repr(foo_bold), "<TermStr: 'foo'>")
self.assertEqual(foo + "bar", TermStr("foobar"))
self.assertEqual(foo + TermStr("bar"),
TermStr("foobar"))
self.assertEqual("bar" + foo, TermStr("barfoo"))
self.assertFalse(foo == foo_bold)
self.assertFalse(foo_bold == foo)
self.assertFalse("foo" == foo_bold)
self.assertTrue("food" != foo_bold)
self.assertFalse(foo != foo)
self.assertTrue(foo != foo_bold)
self.assertFalse(foo_bold == "foo")
self.assertTrue(foo_bold != "food")
self.assertEqual(foobar[:2], TermStr("fo"))
self.assertEqual(foobar[2:], TermStr("obar"))
self.assertEqual(foobar[::2], TermStr("foa"))
self.assertEqual(foobar[3], TermStr("b"))
self.assertEqual(foo_bold[1], TermStr("o", bold_style))
self.assertTrue(foo.startswith("fo"))
self.assertTrue(foo.endswith("oo"))
self.assertEqual(foo.index("o"), 1)
self.assertTrue("fo" in foo)
self.assertEqual(foo.find("oo"), 1)
self.assertEqual(TermStr("fo") * 2, TermStr("fofo"))
self.assertEqual(2 * TermStr("fo"), TermStr("fofo"))
self.assertEqual(foobar.split("b"), [TermStr("foo"),
TermStr("ar")])
self.assertEqual(foo.join(["C", "D"]), TermStr("CfooD"))
self.assertEqual(foo.join(["C", TermStr("D")]),
TermStr("CfooD"))
self.assertEqual(foo.join([]), TermStr(""))
self.assertEqual(foo.join(["C"]), TermStr("C"))
bar = TermStr("bar", bold_style)
self.assertEqual((foo + "\n" + bar).splitlines(), [foo, bar])
self.assertEqual((foo + "\n" + bar).splitlines(keepends=True),
[TermStr("foo\n"), bar])
self.assertEqual(foo.ljust(5), foo + TermStr(" "))
self.assertEqual(foo.rjust(5), TermStr(" ") + foo)
self.assertEqual(TermStr("FOO").lower(), foo)
self.assertEqual(TermStr("FOO", bold_style).lower(), foo_bold)
self.assertEqual(TermStr("FOO").swapcase(), foo)
self.assertEqual(TermStr("FOO", bold_style).swapcase(), foo_bold)
phrase = TermStr("foo bar")
self.assertEqual(phrase.title(), TermStr("Foo Bar"))
self.assertEqual(phrase.capitalize(), TermStr("Foo bar"))
self.assertEqual(foo.upper(), TermStr("FOO"))
self.assertEqual(foo_bold.center(0), foo_bold)
self.assertEqual(foo_bold.center(7),
TermStr(" ") + foo_bold + TermStr(" "))
self.assertEqual(foo_bold.ljust(0), foo_bold)
self.assertEqual(foo_bold.ljust(5), foo_bold + TermStr(" "))
self.assertEqual(foo_bold.rjust(0), foo_bold)
self.assertEqual(foo_bold.rjust(5), TermStr(" ") + foo_bold)
if __name__ == "__main__":
unittest.main()

8
test-all Executable file
View file

@ -0,0 +1,8 @@
#!/bin/bash
for test in *_test.py; do
echo "Testing $test ..."
./${test} 2>&1
echo
done

663
tools.py Normal file
View file

@ -0,0 +1,663 @@
# -*- coding: utf-8 -*-
# Copyright (C) 2015 Andrew Hamilton. All rights reserved.
# Licensed under the Artistic License 2.0.
import dis
import functools
import hashlib
import io
import math
import os
import os.path
import pickle
import pprint
import pwd
import stat
import subprocess
import tempfile
import time
import lscolors
import pygments
import pygments.lexers
import pygments.styles
import traceback
import fill3
import termstr
class Status:
success = 1
failure = 2
info = 3
error = 4
placeholder = 5
running = 6
empty = 7
paused = 8
_STATUS_COLORS = [(Status.success, termstr.Color.green),
(Status.failure, termstr.Color.red),
(Status.info, termstr.Color.white),
(Status.placeholder, termstr.Color.grey_100),
(Status.running, termstr.Color.yellow)]
STATUS_MEANINGS = [
(Status.info, "Normal"), (Status.success, "No problems"),
(Status.failure, "Problems"), (Status.placeholder, "Not applicable"),
(Status.running, "Running"), (Status.empty, "Pending"),
(Status.error, "Error")]
_STATUS_TO_TERMSTR = {
status: termstr.TermStr("", termstr.CharStyle(fg_color=color))
for status, color in _STATUS_COLORS}
_STATUS_TO_TERMSTR[Status.error] = termstr.TermStr(
"E ", termstr.CharStyle(fg_color=termstr.Color.red))
_STATUS_TO_TERMSTR[Status.empty] = ". "
_STATUS_TO_TERMSTR_SIMPLE = {
status: termstr.TermStr(" ", termstr.CharStyle(bg_color=color))
for status, color in _STATUS_COLORS}
_STATUS_TO_TERMSTR_SIMPLE[Status.error] = termstr.TermStr(
"E", termstr.CharStyle(bg_color=termstr.Color.red))
_STATUS_TO_TERMSTR_SIMPLE[Status.empty] = "."
LS_COLOR_CODES = lscolors.get_color_codes(os.environ)
def _convert_lscolor_code_to_charstyle(lscolor_code):
if lscolor_code is None:
return termstr.CharStyle()
parts = lscolor_code.split(";")
if len(parts) == 1:
# Is this correct?
is_bold = parts[0] == "1"
fg_color = termstr.Color.white
else:
is_bold = len(parts) == 4 and parts[3] == "1"
fg_color = int(parts[2])
return termstr.CharStyle(fg_color, is_bold=is_bold)
def sandbox_command(command):
# Deps: firejail http://l3net.wordpress.com/projects/firejail/
# return ["firejail", "--overlay", "-c"] + command
# return ["firejail", "-c"] + command
return command
def fix_input(input_):
input_str = input_.decode("utf-8") if isinstance(input_, bytes) else input_
return input_str.replace("\t", " " * 4)
def _do_command(command, **kwargs):
stdout, stderr = "", ""
try:
process = subprocess.Popen(sandbox_command(command),
stdout=subprocess.PIPE,
stderr=subprocess.PIPE, **kwargs)
stdout, stderr = process.communicate()
except subprocess.CalledProcessError:
pass
return fix_input(stdout), fix_input(stderr), process.returncode
def _run_command(path, command, status_text=Status.success):
status, output = status_text, ""
try:
process = subprocess.Popen(sandbox_command(command),
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
stdout, stderr = process.communicate()
output = stdout + stderr
except subprocess.CalledProcessError:
status = Status.failure
if process.returncode != 0:
status = Status.failure
return status, fill3.Text(fix_input(output))
def _syntax_highlight_code(text, path):
lexer = pygments.lexers.get_lexer_for_filename(path, text)
tokens = pygments.lex(text, lexer)
native_style = pygments.styles.get_style_by_name("native")
return fill3.Code(tokens, native_style)
def pygments_(path):
with open(path) as file_:
try:
text = file_.read()
except UnicodeDecodeError:
return Status.placeholder, fill3.Text("Not unicode")
else:
try:
source_widget = _syntax_highlight_code(fix_input(text), path)
except pygments.util.ClassNotFound:
return Status.placeholder, fill3.Text("No lexer found")
return Status.info, source_widget
pygments_.dependencies = ["python3-pygments"]
def linguist(path):
# Dep: ruby?, ruby-dev, libicu-dev, cmake, "gem install github-linguist"
return _run_command(path, ["linguist", path], Status.info)
def mp3info(path):
stdout, stderr, returncode = _do_command(["mp3info", "-x", path])
source_widget = fill3.Text(stdout)
return Status.info, source_widget
mp3info.dependencies = ["mp3info"]
def _permissions_in_octal(permissions):
result = []
for part_index in range(3):
index = part_index * 3 + 1
part = permissions[index:index+3]
digit = sum(2 ** (2 - index) for index, element in enumerate(part)
if element != "-")
result.append(str(digit))
return "".join(result)
def _pretty_bytes(bytes):
if bytes == 0:
return "0 B"
units = ("B", "KB", "MB", "GB", "TB", "PB", "EB", "ZB", "YB")
unit_index = int(math.floor(math.log(bytes, 1024)))
power = math.pow(1024, unit_index)
conversion = round(bytes/power, 2)
return "%s %s" % (conversion, units[unit_index])
def md5(path): # Deps: coreutils
# stdout, stderr, returncode = _do_command(["md5sum", path])
# stdout = stdout.decode("utf-8")
# return stdout.split()[0]
with open(path, "rb") as file:
return hashlib.md5(file.read()).hexdigest()
def _filemode(mode):
"""Convert a file's mode to a string of the form '-rwxrwxrwx'."""
perm = []
for table in stat._filemode_table:
for bit, char in table:
if mode & bit == bit:
perm.append(char)
break
else:
perm.append("-")
return "".join(perm)
def metadata(path): # Deps: file, coreutils
def _detail(value, unit):
return (" (%s)" % value if unit is None else " (%s %s)" %
(value, unit))
is_symlink = "yes" if os.path.islink(path) else "no"
stat_result = os.stat(path)
permissions = _filemode(stat_result.st_mode)
hardlinks = str(stat_result.st_nlink)
group = [pwd.getpwuid(stat_result.st_gid).pw_name,
_detail(stat_result.st_gid, "gid")]
owner = [pwd.getpwuid(stat_result.st_uid).pw_name,
_detail(stat_result.st_uid, "uid")]
modified, created, access = [
[time.asctime(time.gmtime(seconds)), _detail(int(seconds), "secs")]
for seconds in (stat_result.st_mtime, stat_result.st_ctime,
stat_result.st_atime)]
size = [_pretty_bytes(stat_result.st_size),
_detail(stat_result.st_size, "bytes")]
stdout, stderr, returncode = _do_command(
["file", "--dereference", "--brief", "--uncompress", "--mime", path])
mime_type = stdout
stdout, stderr, returncode = _do_command(
["file", "--dereference", "--brief", "--uncompress", path])
file_type = stdout
md5sum = md5(path)
stdout, stderr, returncode = _do_command(["sha1sum", path])
sha1sum = stdout.split()[0]
permissions_value = [permissions,
_detail(_permissions_in_octal(permissions), None)]
text = []
for line in [
("size", size), ("permissions", permissions_value), None,
("modified time", modified), ("creation time", created),
("access time", access), None,
("owner", owner), ("group", group), None,
("hardlinks", hardlinks), ("symlink", is_symlink), None,
("md5", md5sum), ("sha1", sha1sum), None,
("mime type", mime_type.strip()),
("file type", file_type.strip())]:
if line is None:
text.append("\n")
else:
name, value = line
text.append("%-15s: %s\n" % (name, "".join(value)))
return (Status.info, fill3.Text("".join(text)))
def pylint3(path):
return _run_command(path, ["python3", "-m", "pylint", "--errors-only",
path])
pylint3.dependencies = {"pylint3"}
def pyflakes(path):
return _run_command(path, ["python3", "-m", "pyflakes", path])
pyflakes.dependencies = {"pyflakes"}
def pep8(path):
return _run_command(path, ["python3", "-m", "pep8", path])
pep8.dependencies = {"python3-pep8"}
def _has_shebang_line(path):
with open(path, "rb") as file_:
return file_.read(2) == "#!"
_python_console_lexer = pygments.lexers.PythonConsoleLexer()
def unittests(path):
if str(path).endswith("_test.py"):
cmd = [path] if _has_shebang_line(path) else ["python3", path]
stdout, stderr, returncode = _do_command(["timeout", "20"] + cmd)
markup = pygments.lex(stderr, _python_console_lexer)
status = Status.success if returncode == 0 else Status.failure
native_style = pygments.styles.get_style_by_name("native")
code = fill3.Code(markup, native_style)
return status, code
else:
return Status.placeholder, fill3.Text("No tests.")
unittests.dependencies = {"python3"}
def gut(path):
status, output = Status.info, ""
try:
output = subprocess.check_output(
["/home/ahamilton/code/python-gut/gut.py", path])
except subprocess.CalledProcessError:
status = Status.failure
source_widget = _syntax_highlight_code(fix_input(output), path)
return status, source_widget
def pydoc3(path):
status, output = Status.info, ""
try:
output = subprocess.check_output(
["timeout", "20", "pydoc3", path])
output = fix_input(output)
except subprocess.CalledProcessError:
status = Status.placeholder
if not output.startswith("Help on module"):
status = Status.placeholder
return status, fill3.Text(output)
pydoc3.dependencies = {"python3"}
def modulefinder(path):
return _run_command(
path, ["python3", "-m", "modulefinder", path], Status.info)
modulefinder.dependencies = {"python3"}
def python_syntax(path):
return _run_command(path, ["python3", "-m", "py_compile", path])
python_syntax.dependencies = {"python3"}
def disassemble_pyc(path):
bytecode = open(path, "rb").read()
stringio = io.StringIO()
dis.dis(bytecode, file=stringio)
stringio.seek(0)
return Status.info, fill3.Text(stringio.read())
# def disassemble_pyc(path): # Deps: found on internet
# code_path = os.path.dirname(sys.argv[0])
# disassemble_path = os.path.join(code_path, "disassemble.py")
# return _run_command(path, ["python", disassemble_path, path],
# Status.info)
def perldoc(path):
stdout, stderr, returncode = _do_command(["perldoc", path])
return ((Status.info, fill3.Text(stdout)) if returncode == 0
else (Status.placeholder, fill3.Text(stderr)))
perldoc.dependencies = {"perl-doc"}
def python_tidy(path): # Deps: found on internet?
stdout, stderr, returncode = _do_command(["python", "python-tidy.py",
path])
return Status.info, _syntax_highlight_code(stdout, path)
def mccabe(path):
command = ["python3", "/usr/lib/python3/dist-packages/mccabe.py", path]
return _run_command(path, command, Status.info)
mccabe.dependencies = {"python3-mccabe"}
def perltidy(path):
stdout, stderr, returncode = _do_command(["perltidy", "-st", path])
return Status.info, _syntax_highlight_code(stdout, path)
perltidy.dependencies = {"perltidy"}
def perl_syntax(path):
return _run_command(path, ["perl", "-c", path])
perl_syntax.dependencies = {"perl"}
def objdump_headers(path):
return _run_command(path, ["objdump", "--all-headers", path], Status.info)
objdump_headers.dependencies = {"binutils"}
def objdump_disassemble(path):
stdout, stderr, returncode = _do_command(
["objdump", "--disassemble", "--reloc", "--dynamic-reloc", path])
import pygments.lexers.asm
lexer = pygments.lexers.asm.ObjdumpLexer()
return Status.success, fill3.Text(list(pygments.lex(stdout, lexer)))
objdump_disassemble.dependencies = {"binutils"}
def readelf(path):
return _run_command(path, ["readelf", "--all", path], Status.info)
readelf.dependencies = {"binutils"}
def dump_pickle(path):
with open(path, "rb") as file_:
object_ = pickle.load(file_)
return Status.info, fill3.Text(pprint.pformat(object_.__dict__))
def unzip(path):
return _run_command(path, ["unzip", "-l", path], Status.info)
unzip.dependencies = {"unzip"}
def tar_gz(path):
return _run_command(path, ["tar", "ztvf", path], Status.info)
tar_gz.dependencies = {"tar"}
def tar_bz2(path):
return _run_command(path, ["tar", "jtvf", path], Status.info)
tar_bz2.dependencies = {"tar"}
def csv(path):
return _run_command(path, ["head", "--lines=20", path], Status.info)
csv.dependencies = {"coreutils"}
def nm(path):
return _run_command(path, ["nm", "--demangle", path], Status.info)
nm.dependencies = {"binutils"}
def pdf2txt(path):
return _run_command(path, ["pdf2txt", path], Status.info)
pdf2txt.dependencies = {"python-pdfminer"}
def html2text(path):
return _run_command(path, ["html2text", path], Status.info)
html2text.dependencies = {"html2text"}
def html_syntax(path):
# Maybe only show errors
stdout, stderr, returncode = _do_command(["tidy", path])
status = Status.success if returncode == 0 else Status.failure
return status, fill3.Text(stderr)
html_syntax.dependencies = {"tidy"}
def tidy(path):
stdout, stderr, returncode = _do_command(["tidy", path])
return Status.info, fill3.Text(stdout)
tidy.dependencies = {"tidy"}
def bcpp(path):
stdout, stderr, returncode = _do_command(["bcpp", "-fi", path])
status = Status.info if returncode == 0 else Status.failure
source_widget = _syntax_highlight_code(stdout, path)
return status, source_widget
bcpp.dependencies = {"bcpp"}
def uncrustify(path):
with tempfile.TemporaryDirectory() as temp_dir:
config_path = os.path.join(temp_dir, "uncrustify.cfg")
stdout, stderr, returncode = _do_command(
["uncrustify", "--detect", "-f", path, "-o", config_path])
if returncode != 0:
raise AssertionError
stdout, stderr, returncode = _do_command(
["uncrustify", "-c", config_path, "-f", path])
status = Status.info if returncode == 0 else Status.failure
source_widget = _syntax_highlight_code(stdout, path)
return status, source_widget
uncrustify.dependencies = {"uncrustify"}
def php5_syntax(path):
return _run_command(path, ["php", "--syntax-check", path])
php5_syntax.dependencies = {"php5"}
def flog(path): # Deps: "gem install flog"
return _run_command(path, ["flog", path], Status.info)
# def csstidy(path): # Deps: csstidy
# stdout, stderr, returncode = _do_command(["csstidy", path])
# status = Status.info if returncode == 0 else Status.failure
# source_widget = _syntax_highlight_code(stdout, path)
# return status, source_widget
def python3_coverage(path):
test_path = path[:-(len(".py"))] + "_test.py"
if os.path.exists(test_path):
with tempfile.TemporaryDirectory() as temp_dir:
coverage_path = os.path.join(temp_dir, "coverage")
env = os.environ.copy()
env["COVERAGE_FILE"] = coverage_path
stdout, stderr, returncode = _do_command(
["timeout", "20", "python3-coverage", "run", test_path],
env=env)
assert returncode == 0, returncode
stdout, stderr, returncode = _do_command(
["python3-coverage", "annotate", "--directory", temp_dir,
os.path.normpath(path)], env=env)
with open(os.path.join(temp_dir, path + ",cover"), "r") as f:
stdout = f.read()
return Status.info, fill3.Text(stdout)
else:
return Status.placeholder, fill3.Text("No corresponding test file: " +
os.path.normpath(test_path))
python3_coverage.dependencies = {"python3-coverage"}
def profile(path):
stdout, stderr, returncode = _do_command(
["timeout", "20", "python3", "-m", "cProfile", "--sort=cumulative",
path])
return Status.info, fill3.Text(stdout)
profile.dependencies = {"python3"}
def _jlint_tool(tool_type, path):
stdout, stderr, returncode = _do_command([tool_type, path])
status = (Status.success
if b"Verification completed: 0 reported messages." in stdout
else Status.failure)
return status, fill3.Text(stdout)
def antic(path):
return _jlint_tool("antic", path)
antic.dependencies = {"jlint"}
def jlint(path):
return _jlint_tool("jlint", path)
jlint.dependencies = {"jlint"}
def splint(path):
stdout, stderr, returncode = _do_command(["splint", "-preproc", path])
status = Status.success if returncode == 0 else Status.failure
return status, fill3.Text(stdout + stderr)
splint.dependencies = {"splint"}
def generic_tools():
return [metadata, pygments_]
def tools_for_extension():
return {
"py": [python_syntax, unittests, pydoc3, python3_coverage, profile,
pep8, pyflakes, pylint3, gut, modulefinder], # mccabe
"pyc": [disassemble_pyc],
"pl": [perl_syntax, perldoc, perltidy],
"pm": [perl_syntax, perldoc, perltidy],
"java": [antic, uncrustify],
"class": [jlint],
"c": [splint, uncrustify],
"h": [splint, uncrustify],
"o": [objdump_headers, objdump_disassemble, readelf],
"mp3": [mp3info],
"pickle": [dump_pickle],
"zip": [unzip],
"tar.gz": [tar_gz],
"tgz": [tar_gz],
"tar.bz2": [tar_bz2],
"csv": [csv],
"a": [nm],
"so": [nm],
"pdf": [pdf2txt],
"html": [html_syntax, tidy, html2text],
"cpp": [bcpp, uncrustify],
"php": [php5_syntax],
"rb": [flog]
# "css": [csstidy]
}
def tools_all():
tools_ = set(generic_tools())
for tool_list in tools_for_extension().values():
tools_.update(set(tool_list))
return tools_
def dependencies():
dependencies_all = set()
for tool in tools_all():
try:
dependencies_all.update(tool.dependencies)
except AttributeError:
continue
return dependencies_all
# def _extensions_for_tool(tools_for_extension):
# result = {}
# for extension, tools in tools_for_extension.items():
# for tool in tools:
# if tool in result:
# result[tool].append(extension)
# else:
# result[tool] = [extension]
# return result
def splitext(path):
root, ext = os.path.splitext(path)
if "." in root:
for compound_ext in [".tar.gz", ".tar.bz2"]:
if path.endswith(compound_ext):
return path[:-len(compound_ext)], path[-len(compound_ext):]
return root, ext
def tools_for_path(path):
root, ext = splitext(path)
extra_tools = [] if ext == "" else tools_for_extension().get(ext[1:], [])
# return [Tool(tool) for tool in (generic_tools() + extra_tools)]
return generic_tools() + extra_tools
def _get_python_traceback_lexer():
return pygments.lexers.PythonTracebackLexer()
def _get_python_console_lexer():
return pygments.lexers.PythonConsoleLexer()
def run_tool_no_error(path, tool):
try:
status, result = tool(path)
except:
# Maybe use code.InteractiveInterpreter.showtraceback() ?
tokens = pygments.lex(traceback.format_exc(),
_get_python_traceback_lexer())
native_style = pygments.styles.get_style_by_name("native")
status, result = Status.error, fill3.Code(tokens, native_style)
return status, result
@functools.lru_cache(maxsize=100)
def _path_colored(path):
color_code = lscolors.color_code_for_path(path, LS_COLOR_CODES)
char_style = _convert_lscolor_code_to_charstyle(color_code)
path = path[2:]
dirname, basename = os.path.split(path)
if dirname == "":
return termstr.TermStr(basename, char_style)
else:
dirname = dirname + os.path.sep
color_code = lscolors.color_code_for_path(dirname, LS_COLOR_CODES)
dir_style = _convert_lscolor_code_to_charstyle(color_code)
return (termstr.TermStr(dirname, dir_style) +
termstr.TermStr(basename, char_style))
@functools.lru_cache(maxsize=100)
def _tool_name_colored(tool, path):
if tool in generic_tools():
char_style = termstr.CharStyle((255, 255, 255), (0, 0, 0),
is_bold=True)
else:
# extensions = _extensions_for_tool(tools_for_extension())[tool]
# color_code = (
# LS_COLOR_CODES.get("." + extensions[0], None)
# if len(extensions) == 1
# else lscolors.color_code_for_path(path, LS_COLOR_CODES))
color_code = lscolors.color_code_for_path(path, LS_COLOR_CODES)
char_style = _convert_lscolor_code_to_charstyle(color_code)
return termstr.TermStr(tool.__name__, char_style)

974
vigil Executable file
View file

@ -0,0 +1,974 @@
#!/usr/bin/python3
# -*- coding: utf-8 -*-
# Copyright (C) 2015 Andrew Hamilton. All rights reserved.
# Licensed under the Artistic License 2.0.
"""\
Produces a set of reports for every file in a directory tree.
The reports are produced by many existing command-line tools.
The state of each report is also summarised by a status indicator.
The possible states are listed below.
A report is viewed by selecting its status indicator with the cursor.
Reports are recalculated whenever files are changed, added, or deleted, and so
are kept up to date. (optional)
The reports are cached in a directory named ".vigil" under the target
directory.
Usage: vigil <root_path>
e.g. # vigil my_project
Keys:
*h - Show the help screen. (toggle)
*d, *c, *j, *k - Move the cursor up, down, left and right.
*D, *C, *J, *K - Scroll the result pane up, down, left and right.
*t - Turn the result pane to portrait or landscape orientation. (toggle)
*l - Show the activity log. (toggle)
*n - Move to the next issue.
*N - Move to the next issue of the current tool.
*o - Order files by type, or by directory location. (toggle)
*w - Watch the filesystem for changes. (toggle)
*s - Change the appearance of result statuses. (toggle)
*q - Quit.
"""
import asyncio
import collections
import curses
import functools
import gc
import gzip
import importlib
import multiprocessing
import os
import pickle
import shutil
import signal
import subprocess
import sys
import threading
import time
import traceback
import pyinotify
import fill3
import terminal
import termstr
import tools
def _log_error(message=None):
message = traceback.format_exc() if message is None else message + "\n"
with open("/home/ahamilton/vigil.log", "a") as log_file:
log_file.write(message)
_CACHE_PATH = ".vigil"
def lru_cache_with_eviction(maxsize=128, typed=False):
versions = {}
make_key = functools._make_key
def evict(*args, **kwds):
key = make_key(args, kwds, typed)
if key in versions:
versions[key] += 1
def decorating_function(user_function):
def remove_version(*args, **kwds):
return user_function(*args[1:], **kwds)
new_func = functools.lru_cache(maxsize=maxsize, typed=typed)(
remove_version)
def add_version(*args, **kwds):
key = make_key(args, kwds, typed)
return new_func(*((versions.setdefault(key, 0),) + args), **kwds)
add_version.versions = versions
add_version.cache_info = new_func.cache_info
add_version.evict = evict
return functools.update_wrapper(add_version, user_function)
return decorating_function
def dump_pickle_safe(object_, path, protocol=pickle.HIGHEST_PROTOCOL,
open=open):
tmp_path = path + ".tmp"
try:
with open(tmp_path, "wb") as file_:
pickle.dump(object_, file_, protocol=protocol)
except OSError:
os.remove(tmp_path)
else:
os.rename(tmp_path, path)
def multiprocessing_process(func, *args, **kwargs):
def wrapper(child_conn, func, args, **kwargs):
result = func(*args, **kwargs)
child_conn.send(result)
child_conn.close()
parent_conn, child_conn = multiprocessing.Pipe()
process = multiprocessing.Process(
target=wrapper, args=(child_conn, func, args), kwargs=kwargs,
daemon=True)
process.start()
process.result_conn = parent_conn
return process
def status_to_str(status, is_status_simple):
if isinstance(status, int): # is a status enumeration
dict_ = (tools._STATUS_TO_TERMSTR_SIMPLE if is_status_simple
else tools._STATUS_TO_TERMSTR)
return dict_[status]
else:
return status
class Result:
def __init__(self, path, tool, is_stored_compressed=True):
self.path = path
self.tool = tool
self._open_func = gzip.open if is_stored_compressed else open
self.pickle_path = os.path.join(_CACHE_PATH,
path + "-" + tool.__name__)
self.scroll_position = (0, 0)
self.is_completed = False
self.reset()
def __del__(self):
try:
os.remove(self.pickle_path)
except FileNotFoundError:
pass
@property
@lru_cache_with_eviction(maxsize=50)
def result(self):
unknown_label = fill3.Text("?")
if self.is_placeholder:
return unknown_label
try:
with self._open_func(self.pickle_path, "rb") as pickle_file:
return pickle.load(pickle_file)
except FileNotFoundError:
return unknown_label
@result.setter
def result(self, value):
os.makedirs(os.path.dirname(self.pickle_path), exist_ok=True)
dump_pickle_safe(value, self.pickle_path, open=self._open_func)
Result.result.fget.evict(self)
def set_status(self, status, appearance_changed_event):
self.status = status
appearance_changed_event.set()
self.entry.appearance_cache = None
def run(self, log, appearance_changed_event):
self.is_placeholder = False
tool_name = tools._tool_name_colored(self.tool, self.path)
path_colored = tools._path_colored(self.path)
log.log_message(["Running ", tool_name, " on ", path_colored, "."])
self.set_status(tools.Status.running, appearance_changed_event)
start_time = time.time()
self.process = multiprocessing_process(
tools.run_tool_no_error, self.path, self.tool)
new_status, result = self.process.result_conn.recv()
self.status, self.result = new_status, result
self.process = None
end_time = time.time()
self.set_status(new_status, appearance_changed_event)
self.is_completed = True
log.log_message(
["Finished running ", tool_name, " on ", path_colored, ". ",
status_to_str(new_status, self.entry.summary.is_status_simple),
" %s secs" % round(end_time - start_time, 2)])
def reset(self):
self.is_placeholder = True
self.status = tools.Status.empty
try:
self.process.terminate()
except AttributeError:
pass
self.process = None
def appearance_min(self):
return [status_to_str(self.status,
self.entry.summary.is_status_simple)]
def reverse_style(style):
return termstr.CharStyle(style.bg_color, style.fg_color, style.is_bold,
style.is_underlined)
class Entry(collections.UserList):
def __init__(self, path, results, summary, highlighted=None,
set_results=True):
collections.UserList.__init__(self, results)
self.path = path
self.summary = summary
self.highlighted = highlighted
self.widgets = self.data
if set_results:
# FIX: this is missed for entries appended later
for result in results:
result.entry = self
self.widget = fill3.Row(results)
self.appearance_cache = None
def appearance_min(self):
# 'appearance' local variable exists because appearance_cache can
# become None at any time.
appearance = self.appearance_cache
if appearance is None:
if self.highlighted is not None:
if self.summary.is_status_simple:
cursor = fill3.Text("●")
else:
cursor = fill3.Style(self.widget[self.highlighted],
reverse_style)
self.widget[self.highlighted] = cursor
new_appearance = self.widget.appearance_min()
path = tools._path_colored(self.path)
padding = " " * (self.summary._max_path_length - len(path) + 1)
new_appearance[0] = path + padding + new_appearance[0]
self.appearance_cache = appearance = new_appearance
return appearance
def is_filename_excluded(filename):
return filename.startswith(".")
def codebase_files(path, skip_hidden_directories=True):
for (dirpath, dirnames, filenames) in os.walk(path):
if skip_hidden_directories:
filtered_dirnames = [dirname for dirname in dirnames
if not is_filename_excluded(dirname)]
dirnames[:] = filtered_dirnames
for filename in filenames:
if not is_filename_excluded(filename):
yield os.path.join(dirpath, filename)
def fix_paths(root_path, paths):
return [os.path.join(".", os.path.relpath(path, root_path))
for path in paths]
def change_background(str_, new_background):
def change_background_style(style):
new_bg = (new_background if style.bg_color == termstr.Color.black
else style.bg_color)
return termstr.CharStyle(style.fg_color, new_bg, style.is_bold,
style.is_underlined)
return termstr.TermStr(str_).transform_style(change_background_style)
class Summary:
def __init__(self, root_path, jobs_added_event):
self._root_path = root_path
self._jobs_added_event = jobs_added_event
self._view_widget = fill3.View.from_widget(self)
self.__cursor_position = (0, 0)
self.closest_placeholder_generator = None
self._lock = threading.Lock()
self._cache = {}
self.is_status_simple = False
self.is_directory_sort = True
self._max_width = None
self._max_path_length = None
self.sync_with_filesystem()
@property
def _cursor_position(self):
return self.__cursor_position
@_cursor_position.setter
def _cursor_position(self, new_position):
if new_position != self.__cursor_position:
self.__cursor_position = new_position
self.closest_placeholder_generator = None
def sync_with_filesystem(self, sync_tools=True, sync_paths=True):
if sync_tools:
importlib.reload(tools)
x, y = self._cursor_position
try:
old_path = self.get_selection().path
except AttributeError:
old_path = None
new_column = fill3.Column([])
new_cache = {}
if sync_paths:
paths = fix_paths(self._root_path,
codebase_files(self._root_path))
self._paths = paths
self.sort(self.is_directory_sort)
else:
paths = self._paths
jobs_added = False
new_cursor_position = (0, 0)
row_index = 0
result_total, completed_total = 0, 0
for path in paths:
full_path = os.path.join(self._root_path, path)
try:
key = (path, os.stat(full_path).st_ctime)
except FileNotFoundError:
continue
if path == old_path:
new_cursor_position = (x, row_index)
row = []
for tool in tools.tools_for_path(path):
cache_key = (key, tool.__name__, tool.__code__.co_code)
if cache_key in self._cache:
result = self._cache[cache_key]
result.tool = tool
else:
result = Result(path, tool)
jobs_added = True
if result.is_completed:
completed_total += 1
new_cache[cache_key] = result
row.append(result)
new_column.append(Entry(path, row, self))
row_index += 1
result_total += len(row)
max_width = max(len(row) for row in new_column)
max_path_length = max(len(path) for path in paths) - len("./")
self._column, self._cache, self._cursor_position, self.result_total, \
self.completed_total, self._max_width, self._max_path_length, \
self.closest_placeholder_generator = (
new_column, new_cache, new_cursor_position, result_total,
completed_total, max_width, max_path_length, None)
if jobs_added:
self._jobs_added_event.set()
# Delete the stale results from the disk now, to avoid accidently
# deleting a future result with the same filename. See Result.__del__.
gc.collect()
def placeholder_spiral(self):
x, y = self.cursor_position()
result = self._column[y][x]
if result.is_placeholder:
yield result
for lap in range(max(len(self._column), self._max_width)):
y -= 1
for dx, dy in [(1, 1), (-1, 1), (-1, -1), (1, -1)]:
for move in range(lap + 1):
x += dx
y += dy
try:
result = self._column[y][x]
except IndexError:
continue
if result.is_placeholder:
yield result
def get_closest_placeholder(self):
with self._lock:
try:
return self.closest_placeholder_generator.send(None)
except AttributeError:
self.closest_placeholder_generator = self.placeholder_spiral()
return self.closest_placeholder_generator.send(None)
def appearance_dimensions(self):
status_width = 1 if self.is_status_simple else 2
width = self._max_path_length + 1 + status_width * self._max_width
return width, len(self._column)
def appearance_interval(self, interval):
start_y, end_y = interval
x, y = self.cursor_position()
rows = fill3.Column(self._column.widgets)
rows[y] = Entry(rows[y].path, rows[y].widgets, self, highlighted=x,
set_results=False)
return rows.appearance_interval(interval)
def appearance(self, dimensions):
width, height = dimensions
x, y = self.cursor_position()
status_width = 1 if self.is_status_simple else 2
screen_x, screen_y = self._max_path_length + 1 + x * status_width, y
width, height = width - 1, height - 1 # Minus one for the scrollbars
scroll_y = (screen_y // height) * height
self._view_widget.position = ((screen_x // width) * width, scroll_y)
appearance = self._view_widget.appearance(dimensions)
appearance[screen_y - scroll_y] = change_background(
appearance[screen_y - scroll_y], termstr.Color.grey_50)
return appearance
def cursor_position(self):
x, y = self._cursor_position
return min(x, len(self._column[y])-1), y
def get_selection(self):
x, y = self.cursor_position()
return self._column[y][x]
def _move_cursor(self, dx, dy):
if dy == 0:
x, y = self.cursor_position()
self._cursor_position = ((x + dx) % len(self._column[y]), y)
elif dx == 0:
x, y = self._cursor_position
self._cursor_position = (x, (y + dy) % len(self._column))
else:
raise ValueError
def cursor_right(self):
self._move_cursor(1, 0)
def cursor_left(self):
self._move_cursor(-1, 0)
def cursor_up(self):
self._move_cursor(0, -1)
def cursor_down(self):
self._move_cursor(0, 1)
def _issue_generator(self):
x, y = self.cursor_position()
for index in range(len(self._column) + 1):
row_index = (index + y) % len(self._column)
row = self._column[row_index]
for index_x, result in enumerate(row):
if (result.status == tools.Status.failure and
not (row_index == y and index_x <= x and
index != len(self._column))):
yield result, (index_x, row_index)
def move_to_next_issue(self):
try:
issue, self._cursor_position = self._issue_generator().send(None)
except StopIteration:
pass
def move_to_next_issue_of_tool(self):
current_tool = self.get_selection().tool
for issue, position in self._issue_generator():
if issue.tool == current_tool:
self._cursor_position = position
return
def toggle_status_style(self):
self.is_status_simple = not self.is_status_simple
self.sync_with_filesystem(sync_tools=False, sync_paths=False)
def sort(self, is_directory_sort):
def directory_sort(path):
return (os.path.dirname(path), tools.splitext(path)[1],
os.path.basename(path))
def type_sort(path):
return (tools.splitext(path)[1], os.path.dirname(path),
os.path.basename(path))
key_func = directory_sort if is_directory_sort else type_sort
self._paths.sort(key=key_func)
self.is_directory_sort = is_directory_sort
self.sync_with_filesystem(sync_tools=False, sync_paths=False)
class Log:
GREY_BOLD_STYLE = termstr.CharStyle(termstr.Color.grey_100, is_bold=True)
GREEN_STYLE = termstr.CharStyle(termstr.Color.green)
def __init__(self, appearance_changed_event):
self._appearance_changed_event = appearance_changed_event
self.widget = fill3.Column([])
self.portal = fill3.Portal(self.widget)
self._appearance_cache = None
def log_message(self, message, timestamp=None, char_style=None):
if isinstance(message, list):
message = [part[1] if isinstance(part, tuple) else part
for part in message]
message = fill3.join("", message)
if char_style is not None:
message = termstr.TermStr(message, char_style)
timestamp = (time.strftime("%H:%M:%S", time.localtime())
if timestamp is None else timestamp)
label = fill3.Text(termstr.TermStr(timestamp, Log.GREY_BOLD_STYLE) +
" " + message)
self.widget.append(label)
self.widget.widgets = self.widget[-200:]
self._appearance_cache = None
self._appearance_changed_event.set()
def log_command(self, message, timestamp=None):
self.log_message(message, char_style=Log.GREEN_STYLE)
def appearance_min(self):
appearance = self._appearance_cache
if appearance is None:
self._appearance_cache = appearance = self.widget.appearance_min()
return appearance
def appearance(self, dimensions):
width, height = dimensions
full_appearance = self.appearance_min()
self.portal.position = (0, max(0, len(full_appearance) - height))
return self.portal.appearance(dimensions)
def _highlight_chars(str_, style, marker="*"):
parts = str_.split(marker)
highlighted_parts = [termstr.TermStr(part[0], style) + part[1:]
for part in parts[1:] if part != ""]
return fill3.join("", [parts[0]] + highlighted_parts)
class Help:
def __init__(self, summary, screen):
self.summary = summary
self.screen = screen
self.body = fill3.Placeholder()
self.view = fill3.View.from_widget(self.body)
self.widget = fill3.Border(self.view, title="Help")
self.usage = _highlight_chars(__doc__, Log.GREEN_STYLE)
portal = self.view.portal
self.key_map = {"h": self.exit_help, "d": portal.scroll_up,
"c": portal.scroll_down, "j": portal.scroll_left,
"k": portal.scroll_right, "q": self.exit_help}
def exit_help(self):
self.screen._is_help_visible = False
def on_keypressed(self):
try:
action = self.key_map[sys.stdin.read(1)]
except KeyError:
pass
else:
action()
def appearance(self, dimensions):
text = fill3.join(
"\n", [self.usage, "Statuses:"] +
[" " + status_to_str(status, self.summary.is_status_simple) +
" " + meaning for status, meaning in tools.STATUS_MEANINGS])
self.body.widget = fill3.Text(text)
return self.widget.appearance(dimensions)
class Listing:
def __init__(self, view):
self.view = view
self.last_dimensions = None
def appearance(self, dimensions):
self.last_dimensions = dimensions
return self.view.appearance(dimensions)
def add_watch_manager_to_mainloop(watch_manager, mainloop):
notifier = pyinotify.Notifier(watch_manager)
def on_inotify():
notifier.read_events()
notifier.process_events()
mainloop.add_reader(watch_manager.get_fd(), on_inotify)
def is_path_excluded(path):
return any(part.startswith(".") for part in path.split(os.path.sep))
class Screen:
def __init__(self, summary, log, appearance_changed_event, main_loop):
self._summary = summary
self._log = log
self._appearance_changed_event = appearance_changed_event
self._main_loop = main_loop
self._is_listing_portrait = True
self._is_log_visible = True
self._is_help_visible = False
self._is_watching_filesystem = False
self.toggle_watch_filesystem()
self._make_widgets()
self._make_keymap()
def make_watch_manager(self):
def on_filesystem_change(event):
self._log.log_message("Filesystem changed.")
self._summary.sync_with_filesystem(sync_tools=False)
self._appearance_changed_event.set()
def on_tools_change(event):
self._log.log_message("Tools changed.")
self._summary.sync_with_filesystem(sync_paths=False)
self._appearance_changed_event.set()
watch_manager = pyinotify.WatchManager()
event_mask = (pyinotify.IN_CREATE | pyinotify.IN_DELETE |
pyinotify.IN_CLOSE_WRITE | pyinotify.IN_ATTRIB)
watch_manager.add_watch(
self._summary._root_path, event_mask, rec=True, auto_add=True,
proc_fun=on_filesystem_change, exclude_filter=lambda path:
is_path_excluded(path))
watch_manager.add_watch(tools.__file__, event_mask,
proc_fun=on_tools_change)
self._watch_manager = watch_manager
add_watch_manager_to_mainloop(self._watch_manager, self._main_loop)
def _partition(self, widgets, height):
smaller_height = max(height // 4, 10)
return [height - smaller_height, smaller_height]
def _partition_2(self, widgets, height):
smaller_height = max(height // 4, 10)
return [smaller_height, height - smaller_height]
def _make_widgets(self):
self._help_widget = Help(self._summary, self)
root_path = os.path.basename(self._summary._root_path)
summary = fill3.Border(self._summary, title="Summary of " + root_path)
selected_widget = self._summary.get_selection()
self._view = fill3.View.from_widget(selected_widget.result)
self._listing = fill3.Border(Listing(self._view))
log = fill3.Border(self._log, title="Log")
port_log = fill3.Row([fill3.Column([summary, log], self._partition),
self._listing])
land_log = fill3.Column([fill3.Row([summary, log]), self._listing],
self._partition_2)
port_no_log = fill3.Row([summary, self._listing])
land_no_log = fill3.Column([summary, self._listing], self._partition_2)
self._layouts = [[land_no_log, port_no_log], [land_log, port_log]]
def _make_keymap(self):
key_map = {}
for keys, action in self._KEY_DATA:
for key in keys:
key_map[key] = action
self._key_map = key_map
def toggle_help(self):
self._is_help_visible = not self._is_help_visible
def toggle_log(self):
self._is_log_visible = not self._is_log_visible
def toggle_window_orientation(self):
self._is_listing_portrait = not self._is_listing_portrait
def cursor_up(self):
self._summary.cursor_up()
def cursor_down(self):
self._summary.cursor_down()
def cursor_right(self):
self._summary.cursor_right()
def cursor_left(self):
self._summary.cursor_left()
def _move_listing(self, dx, dy):
listing_width, listing_height = self._listing.widget.last_dimensions
selected_widget = self._summary.get_selection()
x, y = selected_widget.scroll_position
selected_widget.scroll_position = \
(max(x + dx * (listing_width // 2), 0),
max(y + dy * (listing_height // 2), 0))
def listing_up(self):
self._move_listing(0, -1)
def listing_down(self):
self._move_listing(0, 1)
def listing_right(self):
self._move_listing(1, 0)
def listing_left(self):
self._move_listing(-1, 0)
def move_to_next_issue(self):
self._summary.move_to_next_issue()
def move_to_next_issue_of_tool(self):
self._summary.move_to_next_issue_of_tool()
def edit_file(self):
path = self._summary.get_selection().path
path_colored = tools._path_colored(path)
self._log.log_message("Editing " + path_colored + " in emacs.")
subprocess.Popen(["emacsclient", path],
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
def toggle_status_style(self):
self._summary.toggle_status_style()
def toggle_sort(self):
new_sort = not self._summary.is_directory_sort
sort_order = ("directory then type" if new_sort
else "type then directory")
self._log.log_command("Ordering files by %s." % sort_order)
self._summary.sort(new_sort)
def toggle_watch_filesystem(self):
self._is_watching_filesystem = not self._is_watching_filesystem
self._log.log_command("Watching the filesystem for changes."
if self._is_watching_filesystem else
"Stopped watching the filesystem.")
if self._is_watching_filesystem:
self._summary.sync_with_filesystem()
self.make_watch_manager()
else:
self._main_loop.remove_reader(self._watch_manager.get_fd())
self._watch_manager = None
def quit_(self):
raise KeyboardInterrupt
def on_mouse_event(self, event):
if event[0] not in ["mouse press", "mouse drag"]:
return
if event[1] == 4: # Mouse wheel up
self.listing_up()
self._appearance_changed_event.set()
return
if event[1] == 5: # Mouse wheel down
self.listing_down()
self._appearance_changed_event.set()
return
x, y = event[2:4]
border_width = 1
view_width, view_height = \
self._summary._view_widget.portal.last_dimensions
if x < border_width or y < border_width or x > view_width or \
y > view_height:
return
status_width = 1 if self._summary.is_status_simple else 2
view_x, view_y = self._summary._view_widget.portal.position
spacer = 1
column_index = (x - self._summary._max_path_length - spacer -
border_width + view_x) // status_width
row_index = y - border_width + view_y
if row_index >= len(self._summary._column):
return
row = self._summary._column[row_index]
if column_index < 0 or column_index >= len(row):
return
new_position = column_index, row_index
if new_position != self._summary._cursor_position:
self._summary._cursor_position = new_position
self._appearance_changed_event.set()
def on_keypressed(self, urwid_screen):
if self._is_help_visible:
self._help_widget.on_keypressed()
self._appearance_changed_event.set()
return
events = urwid_screen.get_input()
for event in events:
if type(event) == tuple:
self.on_mouse_event(event)
continue
try:
action = self._key_map[event]
except KeyError:
pass
else:
action(self)
self._appearance_changed_event.set()
_STATUS_BAR = _highlight_chars(" *help *quit *d,*c,*j,*k:navigate *turn"
" *log *edit *next *watch *order *statuses",
Log.GREEN_STYLE)
@functools.lru_cache(maxsize=2)
def _get_status_bar_appearance(self, width, is_directory_sort,
is_watching_filesystem, progress_bar_size):
ordering_text = "directory" if is_directory_sort else "type "
watching_text = "watching" if is_watching_filesystem else "--------"
indicators = " %s order:%s " % (watching_text, ordering_text)
spacing = " " * (width - len(self._STATUS_BAR) - len(indicators))
bar = (self._STATUS_BAR[:width - len(indicators)] + spacing +
indicators)[:width]
return [bar[:progress_bar_size].underline() + bar[progress_bar_size:]]
def appearance(self, dimensions):
width, height = dimensions
if self._is_help_visible:
return self._help_widget.appearance(dimensions)
widget = self._summary.get_selection()
view = self._listing.widget.view
view.position = widget.scroll_position
view.widget = widget.result
tool_name = tools._tool_name_colored(widget.tool, widget.path)
self._listing.title = (
tools._path_colored(widget.path) + " ─── " + tool_name + " " +
status_to_str(widget.status, self._summary.is_status_simple))
incomplete = self._summary.result_total - self._summary.completed_total
progress_bar_size = max(0, width * incomplete //
self._summary.result_total)
status_bar_appearance = self._get_status_bar_appearance(
width, self._summary.is_directory_sort,
self._is_watching_filesystem, progress_bar_size)
return (self._layouts[self._is_log_visible][self._is_listing_portrait]
.appearance((width, height-len(status_bar_appearance))) +
status_bar_appearance)
_KEY_DATA = [
({"t"}, toggle_window_orientation), ({"l"}, toggle_log),
({"h"}, toggle_help), ({"d", "up"}, cursor_up),
({"c", "down"}, cursor_down), ({"j", "left"}, cursor_left),
({"k", "right"}, cursor_right), ({"D", "page up"}, listing_up),
({"C", "page down"}, listing_down), ({"J", "home"}, listing_left),
({"K", "end"}, listing_right), ({"o"}, toggle_sort),
({"n"}, move_to_next_issue), ({"N"}, move_to_next_issue_of_tool),
({"e"}, edit_file), ({"s"}, toggle_status_style),
({"w"}, toggle_watch_filesystem), ({"q"}, quit_),
({curses.KEY_MOUSE}, on_mouse_event)]
class Runner:
def __init__(self):
self.result = None
self.is_running = True
def job_runner(self, summary, log, jobs_added_event,
appearance_changed_event):
while True:
jobs_added_event.wait()
while self.is_running:
try:
self.result = summary.get_closest_placeholder()
except StopIteration:
log.log_message("All results are up to date.")
break
try:
self.result.run(log, appearance_changed_event)
summary.completed_total += 1
except EOFError: # Occurs if the process is terminated
pass
jobs_added_event.clear()
_UPDATE_THREAD_STOPPED = False
def update_screen(main_widget, appearance_changed_event):
while True:
appearance_changed_event.wait()
appearance_changed_event.clear()
if _UPDATE_THREAD_STOPPED:
break
fill3.patch_screen(main_widget)
def main(root_path, urwid_screen):
global _UPDATE_THREAD_STOPPED
os.chdir(root_path) # FIX: Don't change directory if possible.
loop = asyncio.get_event_loop()
jobs_added_event = threading.Event()
appearance_changed_event = threading.Event()
try:
pickle_path = os.path.join(_CACHE_PATH, ".summary.pickle")
with gzip.open(pickle_path, "rb") as file_:
screen = pickle.load(file_)
except FileNotFoundError:
summary = Summary(root_path, jobs_added_event)
log = Log(appearance_changed_event)
screen = Screen(summary, log, appearance_changed_event, loop)
else:
screen._appearance_changed_event = appearance_changed_event
screen._main_loop = loop
if screen._is_watching_filesystem:
screen.make_watch_manager()
summary = screen._summary
summary._lock = threading.Lock()
summary._jobs_added_event = jobs_added_event
log = screen._log
log._appearance_changed_event = appearance_changed_event
if screen._is_watching_filesystem:
summary.sync_with_filesystem()
log.log_message("Program started.")
jobs_added_event.set()
update_display_thread = threading.Thread(
target=update_screen, args=(screen, appearance_changed_event),
daemon=True)
update_display_thread.start()
loop.add_reader(sys.stdin, screen.on_keypressed, urwid_screen)
runners = [Runner() for index in range(multiprocessing.cpu_count() * 2)]
screen.runners = runners
for runner in runners:
args = (summary, log, jobs_added_event, appearance_changed_event)
threading.Thread(target=runner.job_runner, args=args,
daemon=True).start()
def on_window_resize(n, frame):
appearance_changed_event.set()
signal.signal(signal.SIGWINCH, on_window_resize)
appearance_changed_event.set()
try:
loop.run_forever()
except KeyboardInterrupt:
log.log_message("Program stopped.")
_UPDATE_THREAD_STOPPED = True
appearance_changed_event.set()
update_display_thread.join()
for runner in runners:
runner.is_running = False
runner.result.reset()
# Cannot pickle generators, locks, sockets or events.
summary.closest_placeholder_generator = None
summary._lock = None
summary._jobs_added_event = None
screen._appearance_changed_event = None
screen._main_loop = None
screen._watch_manager = None
log._appearance_changed_event = None
open_compressed = functools.partial(gzip.open, compresslevel=1)
dump_pickle_safe(screen, pickle_path, open=open_compressed)
def manage_cache(root_path):
cache_path = os.path.join(root_path, _CACHE_PATH)
timestamp_path = os.path.join(cache_path, ".creation-time")
if os.path.exists(cache_path) and \
os.stat(__file__).st_mtime > os.stat(timestamp_path).st_mtime:
print("Vigil has been updated, so clearing the cache and"
" recalculating all results...")
shutil.rmtree(cache_path)
if not os.path.exists(cache_path):
os.mkdir(cache_path)
open(timestamp_path, "w").close()
if __name__ == "__main__":
if len(sys.argv) == 2:
root_path = os.path.abspath(sys.argv[1])
with terminal.console_title("vigil: " + os.path.basename(root_path)):
manage_cache(root_path)
with terminal.hidden_cursor():
with terminal.urwid_screen() as urwid_screen:
main(root_path, urwid_screen)
else:
usage = __doc__.replace("*", "")
print(usage)

1
vigil.py Symbolic link
View file

@ -0,0 +1 @@
vigil

235
vigil_test.py Executable file
View file

@ -0,0 +1,235 @@
#!/usr/bin/env python3
# Copyright (C) 2015 Andrew Hamilton. All rights reserved.
# Licensed under the Artistic License 2.0.
import os
import shutil
import tempfile
import threading
# import time
import unittest
import fill3
import golden
import vigil
class LruCacheWithEvictionTestCase(unittest.TestCase):
def _assert_cache(self, func, hits, misses, current_size):
cache_info = func.cache_info()
self.assertEqual(cache_info.hits, hits)
self.assertEqual(cache_info.misses, misses)
self.assertEqual(cache_info.currsize, current_size)
def test_lru_cache_with_eviction(self):
@vigil.lru_cache_with_eviction()
def a(foo):
return foo
self._assert_cache(a, 0, 0, 0)
self.assertEqual(a(1), 1)
self._assert_cache(a, 0, 1, 1)
a(1)
self._assert_cache(a, 1, 1, 1)
a.evict(1)
self._assert_cache(a, 1, 1, 1)
a(1)
self._assert_cache(a, 1, 2, 2)
class MultiprocessingWrapperTestCase(unittest.TestCase):
def test_multiprocessing_wrapper(self):
def a(b, c, d=1):
return b + c + d
process = vigil.multiprocessing_process(a, 1, 2)
result = process.result_conn.recv()
process.join()
self.assertEqual(result, 4)
_DIMENSIONS = (40, 40)
def _widget_to_string(widget, dimensions=_DIMENSIONS):
appearance = (widget.appearance_min() if dimensions is None
else widget.appearance(dimensions))
return str(fill3.join("\n", appearance))
def touch(path):
open(path, "w").close()
def assert_widget_appearance(widget, golden_path, dimensions=_DIMENSIONS):
golden.assertGolden(_widget_to_string(widget, dimensions), golden_path)
class MockMainLoop:
def add_reader(self, foo, bar):
pass
class MainTestCase(unittest.TestCase):
def setUp(self):
self.temp_dir = tempfile.mkdtemp()
foo_path = os.path.join(self.temp_dir, "foo.py")
touch(foo_path)
jobs_added_event = threading.Event()
appearance_changed_event = threading.Event()
summary = vigil.Summary(self.temp_dir, jobs_added_event)
log = vigil.Log(appearance_changed_event)
self.main_widget = vigil.Screen(summary, log, appearance_changed_event,
MockMainLoop())
def tearDown(self):
shutil.rmtree(self.temp_dir)
# def test_initial_appearance(self):
# assert_widget_appearance(self.main_widget, "golden-files/initial")
def test_help_appearance(self):
self.main_widget.toggle_help()
assert_widget_appearance(self.main_widget, "golden-files/help")
# def test_log_appearance(self):
# log_shown = _widget_to_string(self.main_widget)
# self.main_widget.toggle_log()
# log_hidden = _widget_to_string(self.main_widget)
# actual = "shown:\n%s\nhidden:\n%s" % (log_shown, log_hidden)
# golden.assertGolden(actual, "golden-files/log")
# def test_window_orientation(self):
# window_left_right = _widget_to_string(self.main_widget)
# self.main_widget.toggle_window_orientation()
# window_top_bottom = _widget_to_string(self.main_widget)
# actual = ("left-right:\n%s\ntop-bottom:\n%s" %
# (window_left_right, window_top_bottom))
# golden.assertGolden(actual, "golden-files/window-orientation")
class SummaryCursorTest(unittest.TestCase):
def setUp(self):
self.original_method = vigil.Summary.sync_with_filesystem
vigil.Summary.sync_with_filesystem = lambda foo: None
self.summary = vigil.Summary(None, None)
self.summary._column = [[1, 1, 1], [1, 1], [1, 1, 1]]
def tearDown(self):
vigil.Summary.sync_with_filesystem = self.original_method
def _assert_movements(self, movements):
for movement, expected_position in movements:
movement()
self.assertEqual(self.summary.cursor_position(), expected_position)
def test_cursor_movement(self):
self.assertEqual(self.summary.cursor_position(), (0, 0))
self._assert_movements([(self.summary.cursor_right, (1, 0)),
(self.summary.cursor_down, (1, 1)),
(self.summary.cursor_left, (0, 1)),
(self.summary.cursor_up, (0, 0))])
def test_cursor_wrapping(self):
self._assert_movements([(self.summary.cursor_up, (0, 2)),
(self.summary.cursor_down, (0, 0)),
(self.summary.cursor_left, (2, 0)),
(self.summary.cursor_right, (0, 0))])
def test_cursor_moving_between_different_sized_rows(self):
self.summary._cursor_position = (2, 0)
self._assert_movements([(self.summary.cursor_down, (1, 1)),
(self.summary.cursor_down, (2, 2))])
class SummarySyncWithFilesystem(unittest.TestCase):
def setUp(self):
self.temp_dir = tempfile.mkdtemp()
self.foo_path = os.path.join(self.temp_dir, "foo")
self.bar_path = os.path.join(self.temp_dir, "bar")
self.zoo_path = os.path.join(self.temp_dir, "zoo")
touch(self.foo_path)
touch(self.bar_path)
self.jobs_added_event = threading.Event()
self.appearance_changed_event = threading.Event()
self.summary = vigil.Summary(self.temp_dir, self.jobs_added_event)
self.jobs_added_event.clear()
def tearDown(self):
shutil.rmtree(self.temp_dir)
def _assert_paths(self, expected_paths):
actual_paths = [entry[0].path for entry in self.summary._column]
self.assertEqual(actual_paths, expected_paths)
def test_summary_initial_state(self):
self._assert_paths(["./bar", "./foo"])
self.assertFalse(self.jobs_added_event.isSet())
def test_sync_removed_file(self):
os.remove(self.foo_path)
self._assert_paths(["./bar", "./foo"])
self.summary.sync_with_filesystem()
self._assert_paths(["./bar"])
self.assertFalse(self.jobs_added_event.isSet())
def test_sync_added_file(self):
touch(self.zoo_path)
self.summary.sync_with_filesystem()
self._assert_paths(["./bar", "./foo", "./zoo"])
self.assertTrue(self.jobs_added_event.isSet())
# def test_sync_changed_file_metadata(self):
# ids_before = [id(entry) for entry in self.summary._column]
# time.sleep(1)
# touch(self.foo_path)
# self.summary.sync_with_filesystem()
# ids_after = [id(entry) for entry in self.summary._column]
# self.assertTrue(ids_before[0] == ids_after[0]) # bar
# self.assertTrue(ids_before[1] != ids_after[1]) # foo
# self.assertTrue(self.jobs_added_event.isSet())
# def test_sync_same_objects(self):
# ids_before = [id(entry) for entry in self.summary._column]
# self.summary.sync_with_filesystem()
# ids_after = [id(entry) for entry in self.summary._column]
# self.assertTrue(ids_before == ids_after)
# self.assertFalse(self.jobs_added_event.isSet())
def test_sync_linked_files(self):
"""Symbolic and hard-linked files are given distinct entry objects"""
baz_path = os.path.join(self.temp_dir, "baz")
os.symlink(self.foo_path, baz_path)
os.link(self.foo_path, self.zoo_path)
self.summary.sync_with_filesystem()
self._assert_paths(["./bar", "./baz", "./foo", "./zoo"])
self.assertTrue(id(self.summary._column[1]) != # baz
id(self.summary._column[2])) # foo
self.assertTrue(id(self.summary._column[2]) != # foo
id(self.summary._column[3])) # zoo
self.assertTrue(self.jobs_added_event.isSet())
# class LogTestCase(unittest.TestCase):
# def test_log(self):
# appearance_changed_event = threading.Event()
# log = vigil.Log(appearance_changed_event)
# assert_widget_appearance(log, "golden-files/log-initial", None)
# timestamp = "11:11:11"
# self.assertFalse(appearance_changed_event.isSet())
# log.log_message("foo", timestamp=timestamp)
# self.assertTrue(appearance_changed_event.isSet())
# assert_widget_appearance(log, "golden-files/log-one-message", None)
# log.log_message("bar", timestamp=timestamp)
# assert_widget_appearance(log, "golden-files/log-two-messages", None)
# assert_widget_appearance(log, "golden-files/log-appearance")
if __name__ == "__main__":
golden.main()