-
Notifications
You must be signed in to change notification settings - Fork 0
/
abs.py
109 lines (90 loc) · 3.66 KB
/
abs.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
#!/usr/bin/env python
"""
A multi-processing Python implementation of the ``abs`` program
from commit 4aa9863ab5eadc9dcd6b96f30449ea764ce28979
"""
import os
import re
import sys
from argparse import ArgumentParser, Namespace, ArgumentDefaultsHelpFormatter
from concurrent.futures import ProcessPoolExecutor
from pathlib import Path

from chris_plugin import chris_plugin, PathMapper
# Version string reported by the -V/--version command-line flag.
__version__ = '1.0.0'

# ASCII-art banner for the plugin (not referenced in the visible part of this file).
DISPLAY_TITLE = r"""
_ _
| | | |
_ __ | |______ __ _| |__ ___
| '_ \| |______/ _` | '_ \/ __|
| |_) | | | (_| | |_) \__ \
| .__/|_| \__,_|_.__/|___/
| |
|_|
"""

# Command-line interface; ArgumentDefaultsHelpFormatter makes --help show default values.
parser = ArgumentParser(
    formatter_class=ArgumentDefaultsHelpFormatter,
    description='A ChRIS plugin to take the absolute value of data files',
)
# Glob selecting the input files; main() splits this value on commas.
parser.add_argument(
    '-p', '--pattern',
    type=str,
    default='**/*.txt',
    help='input file filter glob',
)
parser.add_argument(
    '-V', '--version',
    action='version',
    version=f'%(prog)s {__version__}',
)
@chris_plugin(
    parser=parser,
    title='Absolute Value',
    category='',                 # ref. https://chrisstore.co/plugins
    min_memory_limit='100Mi',    # supported units: Mi, Gi
    min_cpu_limit='1000m',       # millicores, e.g. "1000m" = 1 CPU core
    min_gpu_limit=0              # set min_gpu_limit=1 to enable GPU
)
def main(options: Namespace, inputdir: Path, outputdir: Path):
    """
    *ChRIS* plugins usually have two positional arguments: an **input directory** containing
    input files and an **output directory** where to write output files. Command-line arguments
    are passed to this main method implicitly when ``main()`` is called below without parameters.

    Every input file selected by ``options.pattern`` (a comma-separated list of globs) is
    processed by ``abs_file`` in a pool of worker processes. If no input/output pairs are
    produced, a warning is printed to stderr and nothing else is done.

    :param options: non-positional arguments parsed by the parser given to @chris_plugin
    :param inputdir: directory containing (read-only) input files
    :param outputdir: directory where to write output files
    """
    proc = get_workers()
    print(f'Using {proc} threads', flush=True, file=sys.stderr)

    mapper = PathMapper.file_mapper(inputdir, outputdir, glob=options.pattern.split(','))
    # Materialize the pairs first: unpacking ``zip(*mapper)`` directly raises
    # ValueError (not enough values to unpack) when the mapper yields nothing.
    pairs = list(mapper)
    if not pairs:
        print(
            f'WARNING: no input files matching "{options.pattern}" '
            f'found in {inputdir}, nothing to do',
            file=sys.stderr,
            flush=True
        )
        return
    input_files, output_files = zip(*pairs)

    with ProcessPoolExecutor(max_workers=proc) as pool:
        # Drain the result iterator so that an exception raised in a worker
        # is re-raised here instead of being silently discarded.
        for _ in pool.map(abs_file, input_files, output_files):
            pass
def get_workers() -> int:
    """
    Decide how many worker processes to use.

    The environment variable ``NUM_THREADS`` is used when it is set to a positive integer.
    If it is set but cannot be parsed as an int, or is not positive, a warning is printed
    to stderr and it is ignored. Otherwise the number of CPUs usable by this process is
    returned.

    :return: number of workers to start
    """
    # The walrus must be parenthesized: ``e := os.getenv(...) is not None`` would bind
    # ``e`` to the boolean result of the comparison rather than to the variable's value.
    if (e := os.getenv('NUM_THREADS')) is not None:
        try:
            requested = int(e)
        except ValueError:
            print(
                f'WARNING: Environment variable NUM_THREADS={e} '
                f'cannot be parsed as int, ignoring',
                file=sys.stderr,
                flush=True
            )
        else:
            if requested > 0:
                return requested
            # ProcessPoolExecutor rejects max_workers <= 0, so fall back instead.
            print(
                f'WARNING: Environment variable NUM_THREADS={e} '
                f'is not a positive integer, ignoring',
                file=sys.stderr,
                flush=True
            )
    # os.sched_getaffinity is not available on every platform.
    if hasattr(os, 'sched_getaffinity'):
        return len(os.sched_getaffinity(0))
    return os.cpu_count() or 1
# A minus sign that is immediately followed by a digit or a decimal point.
_NEGATIVE_SIGN = re.compile(rb'-(?=[0-9.])')
# Number of bytes read from the input file per iteration.
_CHUNK_SIZE = 64 * 1024


def abs_file(input_file: Path, output_file: Path):
    """
    A pure-Python implementation of absolute value which removes negative signs from in front of numbers.

    We avoid deserializing the data as floats to avoid loss of precision, and don't do any other kinds of
    processing to preserve whitespace and whatever else.

    A ``-`` byte is dropped only when the very next byte is a digit or ``.``; every other
    byte, including the last byte of the file, is copied unchanged.

    :param input_file: file to read
    :param output_file: file to write (created or truncated)
    """
    with input_file.open('rb') as i, output_file.open('wb') as o:
        pending = b''
        while chunk := i.read(_CHUNK_SIZE):
            chunk = pending + chunk
            if chunk.endswith(b'-'):
                # Whether a trailing '-' is a sign depends on the first byte
                # of the next chunk, so hold it back until that byte is seen.
                pending, chunk = b'-', chunk[:-1]
            else:
                pending = b''
            o.write(_NEGATIVE_SIGN.sub(b'', chunk))
        # A '-' that was the very last byte of the file is not a sign: keep it.
        o.write(pending)
    print(f'{input_file} -> {output_file}')
# Script entry point: main() is called without parameters; per its docstring,
# the command-line arguments are passed to it implicitly.
if __name__ == '__main__':
    main()