forked from sdorgancs/eopf
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy patheopfctl
executable file
·119 lines (96 loc) · 3.66 KB
/
eopfctl
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
#!/usr/bin/env python
import json
import sys
from traceback import print_exc
import click
from eopf.algorithms import ProcessingContext
from eopf.core.computing.pool import DistributedPool, LocalPool
from eopf.core.production.triggering import import_algorithms, registry
@click.group()
def cli():
pass
@cli.command(help="List registered algorithms")
def list():
for algo in registry.algorithms:
print(algo)
schemahelp = """Prints processing unit json schema on STDOUT
\nArguments:
\n\t- PU is the name of the processing unit
"""
@cli.command(help=schemahelp)
@click.argument("algorithm")
def schema(algorithm):
if algorithm in registry.algorithms:
algo = registry.algorithms[algorithm]
d = dict()
d["description"] = algo.__doc__
d["input"] = algo.input_class().json_schema()
d["output"] = algo.output_class().json_schema()
json.dump(d, sys.stdout)
else:
print("Error: {algorithm} does not exits", file=sys.stderr)
print("Use list command to find available algorithms", file=sys.stderr)
exit(1)
describehelp = """Print the description of an processing unit on STDOUT
\nArguments:
\n\t- PU is the name of the processing unit to describe
"""
@cli.command(help=describehelp)
@click.argument("algorithm")
def describe(algorithm):
if algorithm in registry.algorithms:
algo = registry.algorithms[algorithm]
print(algo.__doc__)
else:
print("Error: {algorithm} does not exits", file=sys.stderr)
print("Use list command to find available processing units", file=sys.stderr)
exit(1)
runhelp = """Runs a processing unit
\nArguments:
\n\t- ALGORITHM is the name of the processing unit to run, to display available processing units one can use 'list' command
\n\t- INPUT_FILE is a json file containing processing unit input parameters
\n\t- OUTPUT_FILE is a json file where the result of the processing unit execution is written
\nIf an error occurs during the execution of the processing unit, exit status of the command is not zero and the error analysis is written in OUTPUT_FILE.error
"""
@cli.command(help=runhelp)
@click.argument("algorithm")
@click.argument("input_file")
@click.argument("output_file")
@click.option(
"--resources",
default=None,
help="a URL to a json file describing the resource pool configuration.\n"
+ "By default the algorithm is executed locally and uses the maximum available resources.",
)
def run(algorithm, input_file, output_file, use_ray):
try:
if algorithm in registry.algorithms:
algo = registry.algorithms[algorithm]
with open(input_file) as fi:
input_class = algo.input_class()
param = input_class.from_json(fi.read(), validate=True)
if use_ray:
context = ProcessingContext(DistributedPool(), None)
else:
context = ProcessingContext(LocalPool(), None)
output = algo(context)(param)
jsoutout = output.to_json()
with open(output_file, "w") as fo:
fo.write(jsoutout)
else:
print(f"Error: {algorithm} does not exits", file=sys.stderr)
print(
"Use list command to find available processing units", file=sys.stderr
)
exit(1)
except BaseException:
print(
f"An exception occurs running {algorithm} processing unit:", file=sys.stderr
)
print_exc(file=sys.stderr)
exit(2)
if __name__ == "__main__":
import logging
logging.basicConfig(filename="eopf_cli.log", level=logging.INFO)
import_algorithms()
cli()