import os
import rich
import ubelt as ub
import networkx as nx
from xdev.patterns import MultiPattern
from progiter.manager import ProgressManager
[docs]
class DirectoryWalker:
"""
Configurable directory walker that can explore a directory
and report information about its contents in a concise manner.
Options will impact how long this process takes based on how much data /
metadata we need to parse out of the filesystem.
Ignore:
>>> from xdev.directory_walker import * # NOQA
>>> self = DirectoryWalker('.', exclude_dnames=['.*'])
>>> self._walk()
>>> self._update_labels()
>>> self.write_network_text()
"""
def __init__(self,
             dpath,
             exclude_dnames=None,
             exclude_fnames=None,
             include_dnames=None,
             include_fnames=None,
             max_walk_depth=None,
             max_files=None,
             parse_content=False,
             show_progress=True,
             ignore_empty_dirs=False,
             sort=False,
             fs=None,
             **kwargs):
    """
    Store the walk configuration. The graph itself is left as ``None``
    here; it is populated by ``_walk`` (normally via ``build``).

    Args:
        dpath (str | PathLike): the path to walk

        exclude_dnames (Coercable[MultiPattern]):
            blocks directory names matching this pattern

        exclude_fnames (Coercable[MultiPattern]):
            blocks file names matching this pattern

        include_dnames (Coercable[MultiPattern]):
            if specified, excludes directories that do NOT match this
            pattern.

        include_fnames (Coercable[MultiPattern]):
            if specified, excludes files that do NOT match this pattern.

        max_walk_depth (None | int):
            how far to recurse

        max_files (None | int):
            ignore all files in directories with more than this number.

        parse_content (bool):
            if True, include content analysis

        show_progress (bool):
            forwarded as ``enabled`` to the progress manager used while
            walking.

        ignore_empty_dirs (bool):
            if True, nodes that contain no files are removed from the
            graph after the walk.

        sort (bool):
            if True, sort files and directories before adding them to the
            graph.

        fs (fsspec.spec.AbstractFileSystem):
            experimental: an fsspec filesystem

        **kwargs : passed to label options. The deprecated aliases
            ``block_fnames`` / ``block_dnames`` are also accepted here.

    Raises:
        ValueError: if a deprecated alias is combined with its replacement
            or if an unknown keyword argument is given.
    """
    # Deprecated aliases, handled in the same order as before:
    # (old keyword, new keyword, value given for the new keyword)
    renamed = (
        ('block_fnames', 'exclude_fnames', exclude_fnames),
        ('block_dnames', 'exclude_dnames', exclude_dnames),
    )
    resolved = {}
    for old_name, new_name, given in renamed:
        if old_name in kwargs:
            ub.schedule_deprecation(
                'xdev', f'DirectoryWalker {old_name}', 'arg',
                migration=f'Use {new_name} instead',
                deprecate='now',
            )
            if given is not None:
                raise ValueError(f'mutex with {old_name}')
            given = kwargs.pop(old_name)
        resolved[new_name] = given

    self.dpath = ub.Path(dpath).absolute()

    self.exclude_fnames = _null_coerce(MultiPattern, resolved['exclude_fnames'])
    self.exclude_dnames = _null_coerce(MultiPattern, resolved['exclude_dnames'])
    self.include_fnames = _null_coerce(MultiPattern, include_fnames)
    self.include_dnames = _null_coerce(MultiPattern, include_dnames)

    self.max_walk_depth = max_walk_depth
    self.parse_content = parse_content
    self.max_files = max_files
    self.show_progress = show_progress
    self.ignore_empty_dirs = ignore_empty_dirs
    self.sort = sort

    # Whatever is left in kwargs must be a known label option.
    self.label_options = {
        'abs_root_label': True,
        'pathstyle': 'name',
        'show_nfiles': 'auto',
        'show_types': False,
        'colors': True,
    }
    extra = ub.udict(kwargs)
    self.label_options.update(extra & self.label_options)
    unhandled = extra - self.label_options
    if unhandled:
        raise ValueError(f'Unhandled kwargs {unhandled}')

    self.fs = fs
    self.graph = None
    self._topo_order = None
    self._type_to_path = {}
@property
def root(self):
"""
Alias for ``self.dpath``
"""
return self.dpath
[docs]
def write_network_text(self, **kwargs):
    """
    Render ``self.graph`` as a text tree, emitting each line with
    ``rich.print``.

    Args:
        **kwargs: forwarded to ``networkx.write_network_text``.
    """
    emit_line = rich.print
    nx.write_network_text(self.graph, emit_line, end='', **kwargs)
[docs]
def write_report(self, max_nodes=10, **nxtxt_kwargs):
    """
    Print the directory tree followed by per-extension summary tables.

    The tree is printed with ``write_network_text``; then one table
    summarising each direct child of the root node and one table for the
    root node itself are printed with ``rich.print``.

    Args:
        max_nodes (int):
            the tree is only printed when the graph has at most
            ``max_nodes * 10000`` nodes; otherwise a short notice is
            printed instead.
            NOTE(review): the 10000 multiplier is taken verbatim from the
            code; confirm it is intended.

        **nxtxt_kwargs:
            forwarded to ``write_network_text``:

            path : string or file or callable or None
               Filename or file handle for data output.
               if a function, then it will be called for each generated line.
               if None, this will default to "sys.stdout.write"

            with_labels : bool | str
                If True will use the "label" attribute of a node to display if it
                exists otherwise it will use the node value itself. If given as a
                string, then that attribute name will be used instead of "label".
                Defaults to True.

            sources : List
                Specifies which nodes to start traversal from. Note: nodes that are not
                reachable from one of these sources may not be shown. If unspecified,
                the minimal set of nodes needed to reach all others will be used.

            max_depth : int | None
                The maximum depth to traverse before stopping. Defaults to None.

            ascii_only : Boolean
                If True only ASCII characters are used to construct the visualization

            end : string
                The line ending character

            vertical_chains : Boolean
                If True, chains of nodes will be drawn vertically when possible.

    Example:
        >>> # xdoctest: +REQUIRES(module:pandas)
        >>> import xdev
        >>> walker = xdev.DirectoryWalker.demo()
        >>> walker.write_report(max_nodes=0)
    """
    import pandas as pd  # type: ignore
    if len(self.graph.nodes) <= max_nodes * 10000:  # type: ignore
        try:
            self.write_network_text(**nxtxt_kwargs)
        except KeyboardInterrupt:
            # Allow the user to abort a long tree printout and still get
            # the summary tables below.
            ...
    else:
        print('...graph to big, not printing')

    # The first node in topological order is used as the root.
    if len(self._topo_order):  # type: ignore
        root_node = self._topo_order[0]  # type: ignore
    else:
        root_node = None

    def _node_table(node):
        # Build a display table (rows: extension, columns: stat kind) from
        # the accumulated "<ext>.<kind>" stats stored on ``node``.
        node_data = self.graph.nodes[node]  # type: ignore
        stats = node_data.get('stats', {})
        stat_rows = []
        for k, v in stats.items():
            ext, kind = k.split('.')
            if not ext:
                # ext = '∅'
                # ext = '𝙣𝙪𝙡𝙡'
                # Files without a suffix get a visible placeholder name.
                ext = '*null*'
            stat_rows.append({'ext': ext, 'kind': kind, 'value': v})
        table = pd.DataFrame(stat_rows)
        if len(table) > 0:
            piv = table.pivot(index='ext', columns='kind', values='value')
            piv = piv.sort_values('size')
        else:
            # No stats: build an empty frame that still has the columns
            # accessed below.
            piv = pd.DataFrame([], index=pd.Index([], name='ext'), columns=pd.Index(['size', 'files'], name='kind'))
        totals = piv.sum(axis=0)
        disp_totals = totals.copy()
        # Cast to object so a formatted size string can be stored next to
        # the numeric totals.
        disp_totals = disp_totals.astype(object)
        disp_totals['size'] = byte_str(totals['size'])
        disp_piv = piv.copy()
        disp_piv['size'] = piv['size'].apply(byte_str)
        disp_piv = disp_piv.fillna('--')
        # The totals are appended as the last row; callers rely on that
        # position (``iloc[-1]`` / ``[-1:]``).
        disp_piv.loc['∑ total'] = disp_totals
        disp_piv['files'] = disp_piv['files'].astype(int)
        return disp_piv

    if root_node:
        # One summary row (the totals row) per direct child of the root.
        child_rows = []
        for node in self.graph.succ[root_node]:  # type: ignore
            disp_piv = _node_table(node)
            row = disp_piv.iloc[-1].to_dict()
            row['name'] = self.graph.nodes[node]['name']  # type: ignore
            child_rows.append(row)
        if child_rows:
            print('')
            df = pd.DataFrame(child_rows)
            if 'total_lines' in df.columns:
                df = df.sort_values('total_lines')
            rich.print(df)
        # if self.graph.nodes[node]['type'] == 'dir':
        #     print(f'node={node}')
        # Per-extension rows first, then the totals row on its own.
        disp_piv = _node_table(root_node)
        print('')
        rich.print(disp_piv[:-1])
        rich.print(disp_piv[-1:])
        print('root_node = {}'.format(ub.urepr(root_node, nl=1)))

    if 0:
        # Disabled experiment (never executed).
        # Feature to show most recently modified files in a tree?
        table = []
        for node, node_data in self.graph.nodes(data=True):
            node_data['path_stat'] = node.stat()
            node_data['path'] = node
            table.append(node_data)
        table = sorted(table, key=lambda r: r['path_stat'].st_mtime)
        for row in table:
            import xdev
            time = xdev.datetime.coerce(row['path_stat'].st_mtime)
            if 'dev' in str(row['path']):
                print(time, row['path'])
        [r['path'] for r in table]
    # disp_stats = self._humanize_stats(stats, 'dir', reduce_prefix=True)
    # rich.print('stats = {}'.format(ub.urepr(disp_stats, nl=1)))
[docs]
def build(self):
"""
Build the internal graph structure with requested metadata
"""
self._walk()
self._update_stats()
self._update_labels()
self._sort()
return self
[docs]
def stats(self, typed=False, root=None):
"""
Return stats about the directories starting at the root.
Requires walker has been built. If root unspecified uses walker root
"""
node = self.graph.nodes[self.root] # type: ignore
stats = node['stats']
# node_type = node['type']
if typed:
_stats = stats
else:
_stats = self._reduce_stats(stats)
return _stats
# self._humanize_stats(stats, node_type)
# disp_stats = self._humanize_stats(_stats, node_type)
[docs]
def _inplace_filter_dnames(self, dnames):
if self.include_dnames is not None:
dnames[:] = [d for d in dnames if self.include_dnames.match(d)]
if self.exclude_dnames is not None:
dnames[:] = [d for d in dnames if not self.exclude_dnames.match(d)]
[docs]
def _inplace_filter_fnames(self, fnames):
if self.include_fnames is not None:
fnames[:] = [f for f in fnames if self.include_fnames.match(f)]
if self.exclude_fnames is not None:
fnames[:] = [f for f in fnames if not self.exclude_fnames.match(f)]
[docs]
def _walk(self):
    """
    Walk ``self.dpath`` and build the directory graph.

    Creates a ``networkx.DiGraph`` whose nodes are paths (attributes
    ``type``, ``name``, ``label`` plus per-directory counters) and whose
    edges point from a directory to its children. On completion it sets
    ``self.graph``, ``self._topo_order`` and ``self._type_to_path``.
    """
    dpath = self.dpath
    g = nx.DiGraph()
    g.add_node(self.dpath, label=self.dpath.name, type='dir', is_root=True)
    max_files = self.max_files
    pman = ProgressManager(enabled=self.show_progress)
    with pman:
        prog = pman.progiter(desc='Walking directory')
        if self.max_walk_depth is not None:
            # Depth is measured by counting path separators.
            start_depth = str(self.dpath).count(os.path.sep)
        if self.fs is None:
            walkgen = self.dpath.walk()
        else:
            walkgen = self.fs.walk(os.fspath(dpath))
        for root, dnames, fnames in walkgen:
            if self.fs is not None:
                # presumably the fs walker yields string roots; convert so
                # the ``/`` and ``.name`` operations below work
                root = ub.Path(root)
            prog.step()
            root_attrs = {}
            # Counts before any depth limit / include / exclude filtering.
            root_attrs['unfiltered_num_dirs'] = len(dnames)
            root_attrs['unfiltered_num_files'] = len(fnames)
            if self.max_walk_depth is not None:
                curr_depth = str(root).count(os.path.sep)
                rel_depth = (curr_depth - start_depth)
                if rel_depth >= self.max_walk_depth:
                    # Emptying the list in place; presumably this stops the
                    # walker from descending further (os.walk-style
                    # top-down pruning) — confirm for the fs backend.
                    del dnames[:]
            # Remove directories / files that match the blocklist or dont
            # match the include list
            self._inplace_filter_dnames(dnames)
            self._inplace_filter_fnames(fnames)
            root_attrs['num_dirs'] = len(dnames)
            root_attrs['num_files'] = num_files = len(fnames)
            # NOTE(review): the comparison is ``>=`` while the constructor
            # docs say "more than this number" — confirm which is intended.
            too_many_files = max_files is not None and num_files >= max_files
            if too_many_files:
                root_attrs['too_many_files'] = too_many_files
            g.add_node(
                root,
                type='dir',
                name=root.name,
                label=root.name,
                **root_attrs,
            )
            # if root != dpath:
            #     g.add_edge(root.parent, root)
            if self.sort:
                # TODO: good API to customize sorting
                # These rebind the names to new lists, so only the order in
                # which nodes are added below is affected.
                fnames = sorted(fnames)
                dnames = sorted(dnames)
            if not too_many_files:
                for f in fnames:
                    fpath = root / f
                    g.add_node(fpath, name=fpath.name, label=fpath.name, type='file')
                    g.add_edge(root, fpath)
            for d in dnames:
                # Note: this reuses (shadows) the local name ``dpath``.
                dpath = root / d
                g.add_node(dpath, name=dpath.name, label=dpath.name, type='dir')
                g.add_edge(root, dpath)
    self._topo_order = list(nx.topological_sort(g))
    self.graph = g
    if self.ignore_empty_dirs:
        # Temporarily give each file a {'file': 1} stat, accumulate the
        # counts up the tree, and drop every node whose count is zero.
        for node in self._topo_order[::-1]:
            node_data = g.nodes[node]
            if node_data['type'] == 'file':
                node_data['stats'] = {'file': 1}
        self._accum_stats()
        to_remove = []
        for node in self._topo_order[::-1]:
            node_data = g.nodes[node]
            if node_data['stats'].get('file', 0) == 0:
                to_remove.append(node)
        g.remove_nodes_from(to_remove)
        self._topo_order = list(nx.topological_sort(g))
        self.graph = g
    # Index the node paths by their type ('file' / 'dir').
    self._type_to_path = {}
    for p, d in self.graph.nodes(data=True):
        t = d['type']
        if t not in self._type_to_path:
            self._type_to_path[t] = []
        self._type_to_path[t].append(p)
@property
def file_paths(self):
return self._type_to_path.get('file', [])
@property
def dir_paths(self):
return self._type_to_path.get('dir', [])
[docs]
def _accum_stats(self):
g = self.graph
# Accumulate size stats
### Iterate from leaf-to-root, and accumulate info in directories
for node in self._topo_order[::-1]: # type: ignore
children = g.succ[node] # type: ignore
node_data = g.nodes[node] # type: ignore
if node_data['type'] == 'dir':
node_data['stats'] = accum_stats = {}
for child in children:
child_data = g.nodes[child] # type: ignore
child_stats = child_data.get('stats', {})
for key, stat_value in child_stats.items():
# a collections.Counter might be more efficient
# but we probably want to serialize to dictionary
# after.
if key not in accum_stats:
accum_stats[key] = 0
accum_stats[key] += stat_value
[docs]
def _update_stats(self):
    """
    Compute stats for every file node, then accumulate them into the
    directory nodes.

    Each node of type ``'file'`` gets a ``'stats'`` entry produced by
    ``parse_file_stats`` (honouring ``self.parse_content`` and
    ``self.fs``); afterwards ``_accum_stats`` sums them up the tree.
    """
    g = self.graph
    fs = self.fs
    # Respect ``show_progress`` here as well, consistent with ``_walk``;
    # previously this progress bar was shown unconditionally.
    pman = ProgressManager(enabled=self.show_progress)
    with pman:
        prog = pman.progiter(desc='Parse File Info', total=len(g))  # type: ignore
        for fpath, node_data in g.nodes(data=True):  # type: ignore
            if node_data['type'] == 'file':
                node_data['stats'] = parse_file_stats(
                    fpath, parse_content=self.parse_content, fs=fs)
            # The total is the number of nodes, so step once per node.
            prog.step()
    self._accum_stats()
[docs]
def _update_stats2(self):
# Variant that uses parallel process boilerplate
def worker(fpath):
stats = parse_file_stats(fpath, parse_content=self.parse_content)
return stats
self._parallel_process_files(worker, 'Parse File Info')
self._accum_stats()
[docs]
def _parallel_process_files(self, func, desc=None, max_workers=8, mode='thread'):
    """
    Run ``func`` on every file node through a job pool and yield results.

    This is a generator: nothing is submitted until it is iterated.

    Args:
        func (Callable): called as ``func(path)`` for each node whose
            ``'isfile'`` attribute is truthy.
        desc (str | None): progress description; defaults to ``str(func)``.
        max_workers (int): forwarded to ``ub.JobPool``.
        mode (str): forwarded to ``ub.JobPool``.

    Yields:
        Tuple[path, Any]: the file path and the value returned by ``func``
        for it, in job completion order.
    """
    if desc is None:
        desc = str(func)
    graph = self.graph

    pool = ub.JobPool(mode=mode, max_workers=max_workers)
    pman = ProgressManager(backend='progiter')
    submit_desc = 'Submit: ' + desc
    collect_desc = 'Collect: ' + desc

    with pman, pool:
        # Snapshot the file nodes before submitting anything.
        file_nodes = []
        for path, data in graph.nodes(data=True):  # type: ignore
            if data['isfile']:
                file_nodes.append(path)

        submit_iter = ub.ProgIter(file_nodes, desc=submit_desc,
                                  total=len(file_nodes), homogeneous=False)
        for path in submit_iter:
            job = pool.submit(func, path)
            # Remember which path each job belongs to.
            job.fpath = path  # type: ignore

        collect_iter = ub.ProgIter(pool.as_completed(), desc=collect_desc,
                                   total=len(pool))
        for job in collect_iter:
            yield job.fpath, job.result()  # type: ignore
[docs]
@classmethod
def _reduce_stats(cls, stats):
    """
    Collapse extension-prefixed stats into per-kind totals.

    Keys have the form ``"<ext>.<kind>"``; values sharing the same
    ``<kind>`` (everything after the first dot) are summed.

    Returns:
        ub.udict: mapping from kind to the summed value.
    """
    kinds = [key.split('.', 1)[1] for key in stats]
    grouped = ub.group_items(stats.values(), kinds)
    return ub.udict(grouped).map_values(sum)
[docs]
@classmethod
def _humanize_stats(cls, stats, node_type, reduce_prefix=False):
disp_stats = {}
if reduce_prefix:
_stats = cls._reduce_stats(stats)
else:
_stats = stats
if node_type == 'dir':
for k, v in _stats.items():
if k.endswith('.size') or k == 'size':
disp_stats[k] = byte_str(v)
else:
disp_stats[k] = v
elif node_type == 'file':
disp_stats = {k.split('.', 1)[1]: v for k, v in _stats.items()}
disp_stats.pop('files', None)
if 'size' in disp_stats:
disp_stats['size'] = byte_str(disp_stats['size'])
else:
raise KeyError(node_type)
return disp_stats
[docs]
def _find_duplicate_files(self):
    """
    Hash every file node and print groups of paths sharing a hash.

    The hash (``blake3``) is also stored on each file node under the key
    ``'blake3'``. Only hashes with more than one path are printed.
    """
    hasher = 'blake3'
    paths_by_hash = ub.ddict(list)
    for path, node_data in self.graph.nodes(data=True):  # type: ignore
        if node_data['isfile']:
            digest = ub.hash_file(path, hasher=hasher)
            node_data[hasher] = digest
            paths_by_hash[digest].append(path)

    hash_to_paths = ub.udict(paths_by_hash)
    dups = [digest for digest, paths in hash_to_paths.items()
            if len(paths) > 1]
    # Keep only the hashes that occur more than once.
    dup_hash_to_paths = hash_to_paths & dups  # type: ignore
    print('dup_hash_to_paths = {}'.format(ub.urepr(dup_hash_to_paths, nl=2)))
[docs]
def _update_labels(self):
    """
    Update how each node will be displayed

    Rebuilds the ``'label'`` attribute of every graph node from
    ``self.label_options``. A label has the form
    ``<prefix><path representation>[ -> <link target>]<stats suffix>``
    and, when the ``colors`` option is on, contains rich markup.

    Raises:
        KeyError: for an unknown ``pathstyle`` option or node type.
    """
    from os.path import relpath
    from rich.markup import escape
    label_options = self.label_options
    pathstyle = label_options['pathstyle']
    show_nfiles = label_options['show_nfiles']
    show_types = label_options['show_types']
    abs_root_label = label_options['abs_root_label']
    colors = label_options['colors']

    # The three supported ways of rendering a node's path.
    def pathrep_name(p, node_data):
        return node_data['name']

    def pathrep_rel(p, node_data):
        return relpath(p, self.dpath)

    def pathrep_abs(p, node_data):
        return os.fspath(p)

    if pathstyle == 'name':
        pathrep_func = pathrep_name
    elif pathstyle == 'rel':
        pathrep_func = pathrep_rel
    elif pathstyle == 'abs':
        pathrep_func = pathrep_abs
    else:
        raise KeyError(pathstyle)

    # Defined outside this view; presumably populates the per-node
    # 'islink' / 'isdir' / 'broken' / 'target' keys read below — confirm.
    self._update_path_metadata()
    for path, node_data in self.graph.nodes(data=True):  # type: ignore
        stats = node_data.get('stats', None)
        node_type = node_data.get('type', None)
        # The root may be shown with its absolute path regardless of the
        # chosen path style.
        if abs_root_label and node_data.get('is_root', False):
            pathrep = pathrep_abs(path, node_data)
        else:
            pathrep = pathrep_func(path, node_data)
        if stats:
            disp_stats = self._humanize_stats(stats, node_type)
            stats_text = ub.urepr(disp_stats, nl=0, compact=1)
            suffix = ': ' + stats_text
        else:
            suffix = ''
        prefix_parts = []
        if show_types:
            prefix_parts.append(f'({node_data["typelabel"]})')
        if node_type == 'dir':
            richlink = True
            color = 'blue'
            # In 'auto' mode the file count is only shown for directories
            # flagged with too_many_files during the walk.
            if show_nfiles == 'auto':
                show_nfiles_ = node_data.get('too_many_files', False)
            else:
                show_nfiles_ = show_nfiles
            if show_nfiles_ and 'num_files' in node_data:
                prefix_parts.append(
                    '[ {} ]'.format(node_data['num_files'])
                )
        elif node_type == 'file':
            richlink = False
            if node_data.get('X_ok', False):
                color = 'green'
            else:
                color = 'reset'
        else:
            raise KeyError(node_type)
        targetrep = None
        if node_data['islink']:
            # Symlinks override the colour chosen above and get a
            # " -> target" suffix.
            target = node_data['target']
            targetrep = target
            if node_data['broken']:
                color = 'red'
            else:
                color = 'cyan'
            if node_data['isdir']:
                target_color = 'blue'
                target_richlink = True
            else:
                target_color = 'reset'
                target_richlink = False
            if colors:
                targetrep = escape(targetrep)
                if target_richlink:
                    import urllib.parse
                    encoded_target = 'file://' + urllib.parse.quote(os.fspath(target))
                    targetrep = f'[link={encoded_target}]{targetrep}[/link]'
                targetrep = f'[{target_color}]{targetrep}[/{target_color}]'
        if colors:
            # NOTE(review): the path text is only escaped in the richlink
            # (directory) branch; file names are wrapped in colour tags
            # without escaping — confirm this is intended.
            if richlink:
                import urllib.parse
                pathrep = escape(pathrep)
                encoded_path = 'file://' + urllib.parse.quote(os.fspath(path))
                pathrep = f'[link={encoded_path}]{pathrep}[/link]'
            pathrep = f'[{color}]{pathrep}[/{color}]'
        if targetrep is not None:
            pathrep = f'{pathrep} -> {targetrep}'
        if prefix_parts:
            prefix = ' '.join(prefix_parts) + ' '
        else:
            prefix = ''
        node_data['label'] = prefix + pathrep + suffix
[docs]
def _sort(self):
    """
    Rebuild ``self.graph`` with children ordered for display.

    The children of every node are sorted in descending order by
    ``(type, total_lines)`` where ``total_lines`` is read from the node
    stats (0 if missing). A new ``DiGraph`` with the same nodes and edges
    is created in that order and replaces ``self.graph``.
    """
    graph = self.graph

    def child_rank(child):
        info = graph.nodes[child]  # type: ignore
        return (info['type'], info.get('stats', {}).get('total_lines', 0))

    node_order = dict(graph.nodes(data=True))  # type: ignore
    edge_order = []
    for parent in self._topo_order[::-1]:  # type: ignore
        kids = ub.udict({c: graph.nodes[c] for c in graph.succ[parent]})  # type: ignore
        ranked = kids.sorted_keys(child_rank, reverse=True)
        for child, child_data in ranked.items():
            # Re-insert the child so it moves to the end of the ordering.
            node_order.pop(child, None)
            node_order[child] = child_data
            edge_order.append((parent, child))

    # Every original edge must have been carried over.
    assert not (set(graph.edges) - set(edge_order))  # type: ignore

    rebuilt = nx.DiGraph()
    rebuilt.add_nodes_from(node_order.items())
    rebuilt.add_edges_from(edge_order)
    self.graph = rebuilt
[docs]
@classmethod
def demo(cls):
    """
    Create a persistent demo directory tree and return a built walker.

    The directory lives under
    ``ub.Path.appdir('xdev/directory_walker/demo')`` and is deleted and
    recreated on each call to keep doctests deterministic.

    Returns:
        DirectoryWalker

    Example:
        >>> import xdev
        >>> walker = xdev.DirectoryWalker.demo()
        >>> walker.dpath.exists()
        True
    """
    demo_root = ub.Path.appdir('xdev/directory_walker/demo').ensuredir()

    # Start from a clean slate every time.
    if demo_root.exists():
        demo_root.delete()
    demo_root.ensuredir()

    # A small, stable tree: two directories holding three files.
    adir = demo_root / 'adir'
    bdir = demo_root / 'bdir'
    adir.ensuredir()
    bdir.ensuredir()
    (adir / 'foo.txt').write_text('hello')
    (adir / 'bar.md').write_text('world')
    (bdir / 'foo.md').write_text('x')

    # Optional symlink (best effort)
    try:
        os.symlink(adir / 'foo.txt', demo_root / 'alink.txt')
    except Exception:
        # Windows / permissions / filesystem may not allow symlinks
        pass

    return cls(demo_root).build()
[docs]
def find(self, pattern, data=False, root=None, filetype=None):
    """
    Search for nodes whose **name** matches a MultiPattern, optionally
    filtering by type.

    This is a generator; argument validation happens when iteration
    starts.

    Args:
        pattern: Coerced via ``MultiPattern.coerce(pattern)`` and tested
            with ``pattern.match(node.name)``.

        data (bool): if True, also yield the node data dict

        root (pathlib.Path | None):
            if specified, search only the descendants of this node

        filetype (Iterable[str] | None):
            Iterable of type chars from {'f', 'd', 'l'}; a node is kept if
            it satisfies any of the requested types:

            - 'f' = the node's ``isfile`` flag is set
            - 'd' = the node's ``isdir`` flag is set
            - 'l' = the node's ``islink`` flag is set

            Examples: 'f', 'fd', {'l'}, ['f','l'].

    Yields:
        pathlib.Path | Tuple[pathlib.Path, dict]

    Raises:
        RuntimeError: if the graph has not been built.
        KeyError: if ``root`` is not a node of the graph.
        ValueError: if ``filetype`` contains an unknown character.

    Example:
        >>> import xdev
        >>> walker = DirectoryWalker.demo()
        >>> sorted(p.name for p in walker.find('foo.txt'))
        ['foo.txt']
        >>> sorted(p.name for p in walker.find('foo*', filetype='f'))
        ['foo.md', 'foo.txt']
        >>> sorted(p.name for p in walker.find('adir', filetype='d'))
        ['adir']
        >>> # The demo symlink is created on a best-effort basis, so this
        >>> # check assumes the platform allowed it to be created.
        >>> links = list(walker.find('alink.txt', filetype='l'))
        >>> (len(links) == 1)
        True
    """
    graph = self.graph
    if graph is None:
        raise RuntimeError('DirectoryWalker.find() requires build() first')

    # Candidate nodes: the whole graph, or everything below ``root``.
    if root is None:
        candidates = graph.nodes
    else:
        if root not in graph:
            raise KeyError(f'root {root!r} not found in graph')
        candidates = nx.descendants(graph, root)

    pattern = MultiPattern.coerce(pattern)

    # Translate the requested type chars into the node flags to consult.
    wanted_flags = None
    if filetype is not None:
        ftypes = set(filetype)
        unknown = ftypes - {'f', 'd', 'l'}
        if unknown:
            raise ValueError(f'unknown filetype chars={sorted(unknown)!r}, expected subset of {{"f","d","l"}}')
        flag_of_char = (('l', 'islink'), ('f', 'isfile'), ('d', 'isdir'))
        wanted_flags = [flag for char, flag in flag_of_char if char in ftypes]

    for node in candidates:
        # Match only on node.name
        if not pattern.match(node.name):
            continue
        node_data = graph.nodes[node]
        if wanted_flags is not None:
            # Read every requested flag (no short-circuit on lookup).
            flags = [node_data[flag] for flag in wanted_flags]
            if not any(flags):
                continue
        yield (node, node_data) if data else node
[docs]
def find_one(self, pattern, data=False, root=None, filetype=None):
"""
Find exactly one node matching a pattern (and optional type filter).
Args:
pattern: Coerced via ``kwutil.MultiPattern.coerce(pattern)`` (see ``find``).
data (bool): if True, also return the node data dict.
root (pathlib.Path | None): if specified, search descendants of this node.
filetype (Iterable[str] | None): iterable of {'f','d','l'} (see ``find``).
Returns:
pathlib.Path | Tuple[pathlib.Path, dict]
Raises:
KeyError: if zero or multiple matches are found.
Example:
>>> walker = DirectoryWalker.demo()
>>> walker.find_one('foo.txt').name
'foo.txt'
"""
matches = list(self.find(pattern, data=data, root=root, filetype=filetype))
if not matches:
raise KeyError(f'find_one({pattern!r}) found no matches')
if len(matches) > 1:
raise KeyError(
f'find_one({pattern!r}) found {len(matches)} matches, expected exactly one'
)
return matches[0]
[docs]
def parse_file_stats(fpath, parse_content=True, fs=None):
    """
    Get information about a file, including things like number of code lines /
    documentation lines, if that sort of information is available.

    Args:
        fpath (PathLike): path of the file. Must provide ``.suffix``; when
            ``fs`` is None it must also provide ``.stat()`` and
            ``.read_text()``.

        parse_content (bool): if True, also read the file as text and
            count lines (with extra analysis for ``.py`` and ``.rs``).

        fs (fsspec.spec.AbstractFileSystem | None):
            if given, both the stat and the content read go through this
            filesystem instead of the local path.

    Returns:
        dict: stats keyed by ``"<ext>.<kind>"`` where ``<ext>`` is the file
        suffix without the leading dot (empty for no suffix). Always
        contains ``size`` and ``files``; a file that cannot be stat-ed is
        reported with ``broken_link`` and a size of 0.
    """
    ext = fpath.suffix
    prefix = ext.lstrip('.') + '.'
    stats = {}
    try:
        if fs is None:
            size = fpath.stat().st_size
        else:
            size = fs.stat(os.fspath(fpath))['size']
    except FileNotFoundError:
        is_broken = True
        stats['broken_link'] = True
        stats['size'] = 0
    else:
        is_broken = False
        stats['size'] = size
    stats['files'] = 1

    if not is_broken and parse_content:
        try:
            # Read through the same filesystem that was used for the stat;
            # reading the local path would fail for a non-local ``fs``.
            if fs is None:
                text = fpath.read_text()
            else:
                text = fs.read_text(os.fspath(fpath))
        except UnicodeDecodeError:
            # Binary file
            ...
        else:
            stats['total_lines'] = text.count('\n')

            if ext == '.py':
                # Both analyses are best-effort: any failure simply leaves
                # the corresponding stat out.
                try:
                    raw_code = strip_comments_and_newlines(text)
                    code_lines = raw_code.count('\n')
                except Exception:
                    ...
                else:
                    stats['code_lines'] = code_lines

                try:
                    # TODO: this belongs more in the pypackage summarizer
                    # from xdoctest.core import package_calldefs
                    from xdoctest.static_analysis import TopLevelVisitor
                    visitor = TopLevelVisitor.parse(text)
                    total_doclines = 0
                    for calldef in visitor.calldefs.values():
                        if calldef.docstr is not None:
                            total_doclines += calldef.docstr.count('\n')
                except Exception:
                    ...
                else:
                    stats['doc_lines'] = total_doclines
            elif ext == '.rs':
                try:
                    stats.update(parse_rust_content_stats(text))
                except Exception:
                    ...

    # Namespace every stat by extension so directories can aggregate
    # per file type.
    return {prefix + k: v for k, v in stats.items()}
[docs]
def strip_docstrings(tokens):
    """
    Replace docstring tokens with NL tokens in a `tokenize` stream.

    Any STRING token not part of an expression is deemed a docstring.
    Indented docstrings are not yet recognised.

    NOTE(review): INDENT / DEDENT tokens are buffered together with STRING
    tokens below, so the "indented docstrings" remark above may be
    outdated — confirm.

    Args:
        tokens (Iterable[tuple]): tokens as produced by the ``tokenize``
            module (5-tuples: type, string, start, end, line).

    Yields:
        tuple: the tokens with statement-level strings replaced by NL
        tokens.
    """
    import tokenize
    # Tokens seen since the start of the current logical line.
    stack = []
    # 'wait_string': at the start of a logical line, a bare string may follow
    # 'wait_newline': inside a real statement, pass tokens through
    state = 'wait_string'
    for t in tokens:
        typ = t[0]
        if state == 'wait_string':
            if typ in (tokenize.NL, tokenize.COMMENT):
                yield t
            elif typ in (tokenize.DEDENT, tokenize.INDENT, tokenize.STRING):
                # Could still turn out to be a statement-level string.
                stack.append(t)
            elif typ == tokenize.NEWLINE:
                # The logical line consisted only of buffered tokens:
                # emit one NL per source line they covered instead.
                stack.append(t)
                start_line, end_line = stack[0][2][0], stack[-1][3][0] + 1
                for i in range(start_line, end_line):
                    yield tokenize.NL, '\n', (i, 0), (i, 1), '\n'
                # Re-emit buffered INDENT / DEDENT tokens, moved to the
                # line after the removed string.
                for t in stack:
                    if t[0] in (tokenize.DEDENT, tokenize.INDENT):
                        yield t[0], t[1], (i + 1, t[2][1]), (i + 1, t[3][1]), t[4]
                del stack[:]
            else:
                # Any other token means this is a real statement: flush
                # the buffer unchanged and pass through until NEWLINE.
                stack.append(t)
                for t in stack:
                    yield t
                del stack[:]
                state = 'wait_newline'
        elif state == 'wait_newline':
            if typ == tokenize.NEWLINE:
                state = 'wait_string'
            yield t
def byte_str(num, unit='auto', precision=2):
    """
    Format a number of bytes as a string with a unit (B, KB, MB, GB or TB).

    Units are binary multiples (1 KB = 2 ** 10 bytes). With
    ``unit='auto'`` the unit is picked from the magnitude of ``num``;
    values below 1 KB are still reported in KB.

    Args:
        num (int): number of bytes
        unit (str): which unit to use, can be auto, B, KB, MB, GB, or TB.
            Only the first letter matters and case is ignored.
        precision (int): number of digits after the decimal point

    References:
        .. [WikiOrdersOfMag] https://en.wikipedia.org/wiki/Orders_of_magnitude_(data)

    Returns:
        str: string representing the number of bytes with appropriate units

    Raises:
        ValueError: if the unit is not recognised.

    Example:
        >>> import ubelt as ub
        >>> num_list = [1, 100, 1024, 1048576, 1073741824, 1099511627776]
        >>> result = ub.urepr(list(map(byte_str, num_list)), nl=0)
        >>> print(result)
        ['0.00 KB', '0.10 KB', '1.00 KB', '1.00 MB', '1.00 GB', '1.00 TB']
        >>> byte_str(10, unit='B')
        '10.00 B'
    """
    magnitude = abs(num)
    if unit == 'auto':
        # (exclusive upper bound, unit); anything larger falls back to TB
        thresholds = (
            (2.0 ** 20, 'KB'),
            (2.0 ** 30, 'MB'),
            (2.0 ** 40, 'GB'),
        )
        for upper, candidate in thresholds:
            if magnitude < upper:
                unit = candidate
                break
        else:
            unit = 'TB'

    lowered = unit.lower()
    if lowered.startswith('b'):
        scaled = num
    else:
        exponents = {'k': 10, 'm': 20, 'g': 30, 't': 40}
        exponent = exponents.get(lowered[:1])
        if exponent is None:
            raise ValueError('unknown num={!r} unit={!r}'.format(num, unit))
        scaled = num / (2.0 ** exponent)

    return f'{scaled:.{precision}f} {unit}'
def _null_coerce(cls, arg, **kwargs):
    """
    Coerce ``arg`` with ``cls.coerce`` unless it is None.

    Args:
        cls: any object providing a ``coerce(arg, **kwargs)`` callable.
        arg: value to coerce; None is passed through unchanged.
        **kwargs: forwarded to ``cls.coerce``.

    Returns:
        None if ``arg`` is None, otherwise the result of ``cls.coerce``.
    """
    return None if arg is None else cls.coerce(arg, **kwargs)
class DirectoryDiff:
    """
    Given two directory walkers (that walk over what should be similar
    directories), compare the state of them both.

    Both walkers must already be built: their ``graph`` nodes are read,
    and each node is expected to carry ``type``, ``isfile`` and ``stats``
    entries.

    Ignore:
        from xdev.directory_walker import *  # NOQA
        walker1 = DirectoryWalker('.')
        walker2 = DirectoryWalker('.')
        walker1.build()
        walker2.build()
        self = DirectoryDiff(walker1, walker2).build()
        self.write_report()
    """

    def __init__(self, walker1, walker2):
        self.walker1 = walker1
        self.walker2 = walker2

    def build(self):
        """
        Compare the two walkers and store the result on ``self``.

        Sets ``common_paths``, ``unique_paths1``, ``unique_paths2`` (sets
        of paths relative to each walker root) and ``common_table``, a
        list with one row per common path. Each row holds the compared
        values, with ``'MISMATCH: <v1> <v2>'`` in place of any value that
        differs, and ``num_errors`` counting those mismatches.

        Returns:
            DirectoryDiff: ``self``
        """
        graph1 = self.walker1.graph
        graph2 = self.walker2.graph
        self.root1 = self.walker1.dpath
        self.root2 = self.walker2.dpath

        rel_paths1 = {p.relative_to(self.root1) for p in graph1.nodes}
        rel_paths2 = {p.relative_to(self.root2) for p in graph2.nodes}

        self.common_paths = rel_paths1 & rel_paths2
        self.unique_paths1 = rel_paths1 - rel_paths2
        self.unique_paths2 = rel_paths2 - rel_paths1

        # Stat fields compared between the two sides.
        stat_fields = ['st_mode', 'st_mtime', 'st_ctime', 'st_gid', 'st_uid']

        common_table = []
        for rel_path in self.common_paths:
            path1 = self.root1 / rel_path
            path2 = self.root2 / rel_path
            row = {
                'rel_path': rel_path,
                'type': None,
                'hash': None,
                'num_errors': 0,
            }
            data1 = graph1.nodes[path1]
            data2 = graph2.nodes[path2]
            compare1 = {'type': data1['type']}
            compare2 = {'type': data2['type']}

            if data1['type'] == data2['type']:
                # NOTE(review): the source lost its indentation; content is
                # only hashed for files, while stat fields and stats are
                # compared for any pair of equal type — confirm nesting.
                if data1['isfile']:
                    # TODO: control what is compared.
                    compare1['hash'] = ub.hash_file(path1)
                    compare2['hash'] = ub.hash_file(path2)
                stat1 = path1.stat()
                stat2 = path2.stat()
                for field in stat_fields:
                    compare1[field] = getattr(stat1, field)
                    compare2[field] = getattr(stat2, field)
                # Each side contributes its OWN stats. (Previously both
                # sides used ``data2['stats']``, so differing stats could
                # never be reported.)
                compare1['stats'] = data1['stats']
                compare2['stats'] = data2['stats']

            for key, v1 in compare1.items():
                v2 = compare2[key]
                if v1 != v2:
                    row[key] = f'MISMATCH: {v1} {v2}'
                    row['num_errors'] += 1
                else:
                    row[key] = v1
            common_table.append(row)

        self.common_table = common_table
        return self

    def summary(self):
        """
        Summarise the comparison; requires ``build`` to have been called.

        Returns:
            dict: counts of common / unique paths and ``error_hist``, a
            histogram of ``num_errors`` over the common rows (the keys 0
            and 1 are always present).
        """
        from collections import Counter
        error_hist = Counter({0: 0, 1: 0})
        error_hist.update(r['num_errors'] for r in self.common_table)
        summary = {
            'n_common_paths': len(self.common_paths),
            'n_unique_paths1': len(self.unique_paths1),
            'n_unique_paths2': len(self.unique_paths2),
            'error_hist': error_hist,
        }
        return summary

    def write_report(self):
        """
        Print the result of ``summary``.
        """
        summary = self.summary()
        print(f'summary = {ub.urepr(summary, nl=1)}')
# TODO: move rust utils to helpers
def parse_rust_content_stats(source: str):
    """
    Count effective Rust lines of code.

    This counts non-empty lines after removing Rust comments, while preserving
    comment-like text inside normal strings, raw strings, byte strings, C
    strings and character literals. Rust nested block comments are handled.

    Doc comments are ``///`` and ``//!`` line comments and ``/** ... */``
    and ``/*! ... */`` block comments. ``////...``, ``/***...`` and the
    empty ``/**/`` are ordinary comments.

    Args:
        source (str): text of a Rust source file

    Returns:
        Dict[str, int]: contains code_lines, comment_lines, and doc_lines.
        Each is a number of distinct lines; a line holding both code and a
        comment counts in both. doc_lines is a subset of comment_lines.

    Example:
        >>> import ubelt as ub
        >>> source = ub.codeblock(
        >>>     r'''
        >>>     // module comment
        >>>
        >>>     fn main() {
        >>>         println!("http://example.com"); // trailing comment
        >>>         let text = "/* not comment */";
        >>>         let raw = r#"// not comment"#;
        >>>         /*
        >>>            block comment
        >>>            /* nested */
        >>>         */
        >>>         /// doc comment
        >>>         pub fn documented() {}
        >>>     }
        >>>     ''')
        >>> stats = parse_rust_content_stats(source)
        >>> assert stats['code_lines'] == 6
        >>> assert stats['doc_lines'] == 1
    """
    n = len(source)
    i = 0
    # Zero-based index of the line the scanner is currently on.
    line = 0
    code_lines = set()
    comment_lines = set()
    doc_lines = set()

    def mark_comment_char(ch, is_doc):
        # Account for one character that is part of a comment.
        nonlocal line
        if ch == '\n':
            line += 1
        elif not ch.isspace():
            comment_lines.add(line)
            if is_doc:
                doc_lines.add(line)

    def mark_code_span(start, stop):
        # Account for source[start:stop], all of which is code (for
        # example a string literal that may span several lines).
        nonlocal line
        for ch in source[start:stop]:
            if ch == '\n':
                line += 1
            elif not ch.isspace():
                code_lines.add(line)

    while i < n:
        ch = source[i]

        # Rust line comments: //, ///, //!
        if source.startswith('//', i):
            # Exactly three slashes make a doc comment; four or more are
            # an ordinary comment.
            is_doc = (
                source.startswith('///', i) and
                not source.startswith('////', i)
            ) or source.startswith('//!', i)
            while i < n and source[i] != '\n':
                mark_comment_char(source[i], is_doc)
                i += 1
            continue

        # Rust nested block comments: /* ... */, /** ... */, /*! ... */
        if source.startswith('/*', i):
            # ``/***`` and the empty ``/**/`` are not doc comments.
            is_doc = (
                source.startswith('/**', i) and
                not source.startswith(('/***', '/**/'), i)
            ) or source.startswith('/*!', i)
            depth = 0
            while i < n:
                if source.startswith('/*', i):
                    depth += 1
                    mark_comment_char(source[i], is_doc)
                    mark_comment_char(source[i + 1], is_doc)
                    i += 2
                    continue
                if source.startswith('*/', i):
                    mark_comment_char(source[i], is_doc)
                    mark_comment_char(source[i + 1], is_doc)
                    i += 2
                    depth -= 1
                    if depth <= 0:
                        break
                    continue
                mark_comment_char(source[i], is_doc)
                i += 1
            continue

        # Rust raw strings: r"...", r#"..."#, br"...", br#"..."#.
        close_delim = _rust_raw_string_close_delim(source, i)
        if close_delim is not None:
            open_quote = source.find('"', i)
            stop = source.find(close_delim, open_quote + 1)
            if stop < 0:
                # Unterminated: treat the rest of the file as the string.
                stop = n
            else:
                stop += len(close_delim)
            mark_code_span(i, stop)
            i = stop
            continue

        # Character literals such as '"' or '\''. Without this a quote
        # character inside a char literal would be mistaken for the start
        # of a string. Lifetimes ('a) are not matched and fall through.
        if ch == "'":
            stop = _rust_char_literal_end(source, i)
            if stop is not None:
                mark_code_span(i, stop)
                i = stop
                continue

        # Normal string-ish literals. This avoids treating // or /* inside a
        # string as a comment.
        has_prefix = ch in {'b', 'c'} and i + 1 < n and source[i + 1] == '"'
        if ch == '"' or has_prefix:
            stop = i + 2 if has_prefix else i + 1
            escape = False
            while stop < n:
                c = source[stop]
                stop += 1
                if escape:
                    escape = False
                elif c == '\\':
                    escape = True
                elif c == '"':
                    break
            mark_code_span(i, stop)
            i = stop
            continue

        if ch == '\n':
            line += 1
            i += 1
            continue

        if not ch.isspace():
            code_lines.add(line)
        i += 1

    return {
        'code_lines': len(code_lines),
        'comment_lines': len(comment_lines),
        'doc_lines': len(doc_lines),
    }


def _rust_char_literal_end(source: str, pos: int):
    """
    Return the index just past a Rust char literal starting at the ``'``
    at ``pos``, or None if the text there is not a char literal (for
    example a lifetime such as ``'a``).
    """
    n = len(source)
    if pos + 1 >= n:
        return None
    first = source[pos + 1]
    if first == '\\':
        # Escaped form: '\n', '\'', '\\', '\x7f', '\u{10FFFF}'. The
        # character right after the backslash can never be the closing
        # quote, and the longest escape closes within a dozen characters.
        close = source.find("'", pos + 3, pos + 13)
        if close < 0 or '\n' in source[pos:close]:
            return None
        return close + 1
    if first not in ("'", '\n') and pos + 2 < n and source[pos + 2] == "'":
        # Plain single-character form: 'x'
        return pos + 3
    return None


def _rust_raw_string_close_delim(source: str, pos: int):
    """
    Return the closing delimiter for a Rust raw string at ``pos``, or None.

    A raw string starts with ``r``, ``br`` or ``cr``, then zero or more
    ``#``, then a double quote. Its closing delimiter is a double quote
    followed by the same number of ``#``.
    """
    n = len(source)
    if source.startswith(('br', 'cr'), pos):
        prefix_len = 2
    elif pos < n and source[pos] == 'r':
        prefix_len = 1
    else:
        return None
    j = pos + prefix_len
    while j < n and source[j] == '#':
        j += 1
    if j < n and source[j] == '"':
        hashes = source[pos + prefix_len:j]
        return '"' + hashes
    return None