checkTasksConsistency.py
import fnmatch
import os
import re
import sys
import yaml
from yaml import Loader
from yaml.constructor import ConstructorError

def no_duplicates_constructor(loader, node, deep=False):
  """Check for duplicate keys."""
  mapping = {}
  for key_node, value_node in node.value:
    key = loader.construct_object(key_node, deep=deep)
    value = loader.construct_object(value_node, deep=deep)
    if key in mapping:
      raise ConstructorError("while constructing a mapping", node.start_mark,
                             "found duplicate key (%s)" % key, key_node.start_mark)
    mapping[key] = value
  return loader.construct_mapping(node, deep)

yaml.add_constructor(yaml.resolver.BaseResolver.DEFAULT_MAPPING_TAG, no_duplicates_constructor)
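
# Illustrative behaviour of the duplicate-key check above (the YAML snippet is a made-up
# example, not taken from this repository): with the constructor registered, parsing a mapping
# that repeats a key raises a ConstructorError instead of silently keeping the last value, e.g.
#   yaml.load('task_A: /ds1\ntask_A: /ds2', Loader=Loader)  # -> ConstructorError: found duplicate key (task_A)
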
if __name__ == "__main__":
  file_dir = os.path.dirname(os.path.abspath(__file__))
  sys.path.append(os.path.dirname(file_dir))
  __package__ = 'RunKit'

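# When the script is executed directly, the parent directory is added to sys.path and
# __package__ is set so that the relative import below resolves the same way as when the
# file is imported as part of the RunKit package.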
from .grid_tools import run_dasgoclient

class CheckResult:
  def __init__(self, all_ok, tasks_by_name, tasks_by_dataset):
    self.all_ok = all_ok
    self.tasks_by_name = tasks_by_name
    self.tasks_by_dataset = tasks_by_dataset

def check_consistency_era(task_cfg_files, dataset_name_mask_mc=None, dataset_name_mask_data=None):
  tasks_by_name = {}
  tasks_by_dataset = {}
  all_ok = True
  for task_cfg_file in task_cfg_files:
    if not os.path.isfile(task_cfg_file):
      print(f'ERROR: "{task_cfg_file}" does not exist.')
      all_ok = False
      continue
    try:
      with open(task_cfg_file) as f:
        cfg = yaml.load(f, Loader=Loader)
    except Exception as e:
      print(f'ERROR: "{task_cfg_file}" unable to parse yaml.')
      print(e)
      all_ok = False
      continue
    if type(cfg) != dict:
      print(f'ERROR: "{task_cfg_file}" contains {type(cfg)}, while dict is expected.')
      all_ok = False
      continue
    is_data = cfg.get('config', {}).get('params', {}).get('sampleType', '') == 'data'
    for task_name, task_desc in cfg.items():
      if task_name == 'config': continue
      customTask = type(task_desc) == dict
      if customTask:
        if 'inputDataset' not in task_desc:
          print(f'ERROR: "{task_cfg_file}" task "{task_name}" does not have "inputDataset" field.')
          all_ok = False
          continue
        if 'ignoreFiles' in task_desc:
          print(f'WARNING: "{task_cfg_file}" task "{task_name}" has "ignoreFiles" field.')
        inputDataset = task_desc['inputDataset']
      else:
        inputDataset = task_desc
      task_entry = {
        'name': task_name,
        'inputDataset': inputDataset,
        'file': task_cfg_file,
        'isData': is_data,
      }
      if task_name not in tasks_by_name:
        tasks_by_name[task_name] = []
      tasks_by_name[task_name].append(task_entry)
      if inputDataset not in tasks_by_dataset:
        tasks_by_dataset[inputDataset] = []
      tasks_by_dataset[inputDataset].append(task_entry)

  for task_name, task_list in tasks_by_name.items():
    if len(task_list) > 1:
      print(f'ERROR: task "{task_name}" is defined in multiple files:')
      for task_entry in task_list:
        print(f' file={task_entry["file"]} dataset={task_entry["inputDataset"]}')
      all_ok = False

  for inputDataset, task_list in tasks_by_dataset.items():
    if len(task_list) > 1:
      print(f'ERROR: input dataset "{inputDataset}" is defined in multiple tasks:')
      for task_entry in task_list:
        print(f' file={task_entry["file"]} task={task_entry["name"]}')
      all_ok = False
    is_data = task_list[0]['isData']
    name_mask = dataset_name_mask_data if is_data else dataset_name_mask_mc
    if name_mask is not None and len(name_mask) > 0:
      if name_mask[0] == '^':
        mask_matched = re.match(name_mask, inputDataset)
      else:
        mask_matched = fnmatch.fnmatch(inputDataset, name_mask)
      if not mask_matched:
        print(f'ERROR: input dataset "{inputDataset}" does not match the expected mask "{name_mask}"')
        all_ok = False

  return CheckResult(all_ok, tasks_by_name, tasks_by_dataset)

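# Illustrative shape of a task configuration file consumed by check_consistency_era
# (task and dataset names below are placeholders, not taken from this repository):
#
#   config:
#     params:
#       sampleType: mc
#   TTto2L2Nu: /TTto2L2Nu_TuneCP5_13p6TeV_powheg-pythia8/SomeCampaign-v2/NANOAODSIM
#   DYto2L_M_50_ext1:
#     inputDataset: /DYto2L_M-50_TuneCP5_13p6TeV_amcatnloFXFX-pythia8/SomeCampaign-v2/NANOAODSIM
#
# A plain string entry is taken as the input dataset itself, while a dict entry must provide
# an "inputDataset" field and may carry extra per-task options such as "ignoreFiles".
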
class ExceptionMatcher:
  def __init__(self, exceptions):
    self.exceptions = exceptions
    self.used_patterns = set()

  def get_known_exceptions(self, task_name):
    matched_eras = set()
    era_to_pattern_list = {}
    for task_pattern, eras in self.exceptions.items():
      if (task_pattern[0] == '^' and re.match(task_pattern, task_name)) or task_pattern == task_name:
        for era in eras:
          matched_eras.add(era)
          if era not in era_to_pattern_list:
            era_to_pattern_list[era] = []
          era_to_pattern_list[era].append(task_pattern)
        self.used_patterns.add(task_pattern)
    all_ok = True
    era_to_pattern = {}
    for era, patterns in era_to_pattern_list.items():
      if len(patterns) > 1:
        all_ok = False
        patterns_str = ', '.join(patterns)
        print(f'{task_name} is matched by multiple exception patterns that include {era}: {patterns_str}')
      era_to_pattern[era] = patterns[0]
    return all_ok, matched_eras, era_to_pattern

  def get_unused_patterns(self):
    return set(self.exceptions.keys()) - self.used_patterns

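# Illustrative shape of the --exceptions file (task names, patterns and era labels are
# placeholders, not taken from this repository). Each key is either an exact task name or a
# '^'-anchored regular expression, and the value lists the eras where the task is knowingly absent:
#
#   TTto2L2Nu_ext1: [ Run3_2023 ]
#   '^DYto2L_M_50.*': [ Run3_2022EE, Run3_2023 ]
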
def check_task_consistency(task_name, eras, all_eras, exception_matcher, era_results, name_matching,
                           dataset_name_masks, show_only_missing_with_candidates):
  if not re.match('^[A-Za-z0-9_]+$', task_name):
    print(f'{task_name} contains invalid characters')
    return False
  n_eras = len(all_eras)
  is_data = era_results[eras[0]].tasks_by_name[task_name][0]['isData']
  exception_match_ok, known_exceptions, known_exception_to_pattern = exception_matcher.get_known_exceptions(task_name)
  if not exception_match_ok:
    return False
  missing_eras = all_eras - set(eras) - known_exceptions
  redundant_exceptions = known_exceptions & set(eras)
  if len(redundant_exceptions) > 0:
    known_exceptions_str = ', '.join(known_exceptions)
    redundant_exceptions_str = ', '.join(redundant_exceptions)
    known_exception_patterns_str = ', '.join(known_exception_to_pattern.keys())
    print(f'{task_name} is listed as exception for [{known_exceptions_str}] in [{known_exception_patterns_str}]'
          f', but it exists for [{redundant_exceptions_str}]')
    return False
  if len(eras) != n_eras and not is_data and len(missing_eras) > 0:
    missing_eras_str = ', '.join(missing_eras)
    missing_prints = [ f'{task_name} is not available in: {missing_eras_str}' ]
    print_missing = not show_only_missing_with_candidates
    dataset_names = set()
    for era in eras:
      for task in era_results[era].tasks_by_name[task_name]:
        missing_prints.append(f' era={era} file={task["file"]} dataset={task["inputDataset"]}')
        mask_match = re.match(dataset_name_masks[era]['mc'], task["inputDataset"])
        dataset_name = mask_match.group(1) if mask_match else None
        if dataset_name is None or len(dataset_name) == 0:
          raise RuntimeError(f'Unable to extract dataset name from {task["inputDataset"]} using mask {dataset_name_masks[era]["mc"]}')
        dataset_names.add(dataset_name)
    for missing_era in missing_eras:
      das_candidates = set()
      for dataset_name in dataset_names:
        full_dataset_name = dataset_name_masks[missing_era]['mc_das'].format(dataset_name)
        new_das_candidates = run_dasgoclient(f'dataset dataset={full_dataset_name}', verbose=0)
        for candidate in new_das_candidates:
          if re.match(dataset_name_masks[missing_era]['mc'], candidate):
            add_candidate = True
            ext_match = re.match('.*_(ext[0-9]+)', task_name)
            if ext_match:
              ext_str = ext_match.group(1)
              if not re.match(f'/.+/.+[_-]{ext_str}[_-]v[0-9]+/.+', candidate):
                add_candidate = False
            if add_candidate:
              das_candidates.add(candidate)
      if len(das_candidates) > 0:
        missing_prints.append(f' {missing_era} potential candidates from DAS:')
        print_missing = True
        for candidate in das_candidates:
          missing_prints.append(f' {candidate}')
      else:
        missing_prints.append(f' {missing_era} no candidates from DAS')
    if print_missing:
      for line in missing_prints:
        print(line)
    return False

  file_names = {}
  datasets = {}
  for era in eras:
    for task in era_results[era].tasks_by_name[task_name]:
      file_name = os.path.split(task['file'])[1]
      if file_name not in file_names:
        file_names[file_name] = []
      file_names[file_name].append(era)
      dataset = task['inputDataset']
      dataset_name = dataset.split('/')[1]
      dataset_name_ref = dataset_name.lower()
      for matching_entry in name_matching:
        pattern, replacement = matching_entry.split(':')
        dataset_name_ref = re.sub(pattern.lower(), replacement.lower(), dataset_name_ref)
      if dataset_name_ref not in datasets:
        datasets[dataset_name_ref] = []
      datasets[dataset_name_ref].append((era, dataset, dataset_name))

  if len(file_names) > 1:
    print(f'{task_name} is defined in multiple files:')
    for file_name, eras in file_names.items():
      print(f' {file_name} in {", ".join(eras)}')
    return False
  if len(name_matching) > 0:
    if len(datasets) > 1:
      print(f'{task_name} is used to refer to different datasets:')
      for ref_name, era_list in datasets.items():
        for era, dataset, dataset_name in era_list:
          print(f' era={era} dataset={dataset} ref={ref_name}')
      return False
  return True

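# Illustrative --name-matching entries (the patterns are placeholders, not taken from this
# repository). Each entry has the form "pattern:replacement"; both sides are lowered and the
# substitution is applied to the lower-cased primary dataset name, so e.g.
#   --name-matching 'madgraphmlm:amcatnlofxfx'
# would make the two generator variants compare as the same dataset reference across eras.
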
def check_consistency(era_files_dict, exceptions, name_matching, dataset_name_masks, show_only_missing_with_candidates):
  era_results = {}
  tasks_by_name = {}
  all_ok = True
  for era, files in era_files_dict.items():
    if dataset_name_masks is None:
      dataset_name_mask_mc = None
      dataset_name_mask_data = None
    else:
      if era not in dataset_name_masks:
        raise RuntimeError(f'Era {era} is not found in dataset_name_masks')
      dataset_name_mask_mc = dataset_name_masks[era]['mc']
      dataset_name_mask_data = dataset_name_masks[era]['data']
    era_results[era] = check_consistency_era(files, dataset_name_mask_mc, dataset_name_mask_data)
    all_ok = all_ok and era_results[era].all_ok
    for task_name in era_results[era].tasks_by_name.keys():
      if task_name not in tasks_by_name:
        tasks_by_name[task_name] = []
      tasks_by_name[task_name].append(era)

  exception_matcher = ExceptionMatcher(exceptions)
  all_eras = set(era_files_dict.keys())
  for task_name, eras in tasks_by_name.items():
    task_consistent = check_task_consistency(task_name, eras, all_eras, exception_matcher, era_results, name_matching,
                                             dataset_name_masks, show_only_missing_with_candidates)
    all_ok = all_ok and task_consistent

  unused_patterns = exception_matcher.get_unused_patterns()
  if len(unused_patterns) > 0:
    print(f'WARNING: unused entries in exceptions: {", ".join(unused_patterns)}')
  return all_ok

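# Illustrative shape of the --dataset-name-masks file (era names and mask strings are
# placeholders, not taken from this repository). For each era, "mc" is a mask whose first
# capture group extracts the primary dataset name, "mc_das" is a str.format template used to
# query DAS for candidates, and "data" is the mask applied to data datasets. Masks starting
# with '^' are treated as regular expressions, anything else as fnmatch-style globs:
#
#   Run3_2022:
#     mc: '^/(.*)/Run3Summer22.*NanoAOD.*/NANOAODSIM$'
#     mc_das: '/{}/Run3Summer22*NanoAOD*/NANOAODSIM'
#     data: '^/.*/Run2022.*/NANOAOD$'
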
if __name__ == "__main__":
  import argparse

  parser = argparse.ArgumentParser(description='Check consistency of tasks configurations for crabOverseer.')
  parser.add_argument('--cross-eras', action='store_true', help='Check consistency of tasks across different eras.')
  parser.add_argument('--era', type=str, required=False, default=None, help='Era')
  parser.add_argument('--exceptions', type=str, required=False, default=None,
                      help='File with exceptions for the checks.')
  parser.add_argument('--name-matching', type=str, required=False, default=[], action='append',
                      help='Matching equivalence between dataset names (multiple matching transformations accepted).')
  parser.add_argument('--dataset-name-masks', type=str, required=False, default=None,
                      help='File with expected masks for dataset names in DAS for data and MC')
  parser.add_argument('--show-only-missing-with-candidates', action='store_true',
                      help='Only show missing samples that have potential candidates in DAS.')
  parser.add_argument('task_file', type=str, nargs='+', help="file(s) with task descriptions")
  args = parser.parse_args()

  era_files_dict = {}
  if args.cross_eras:
    for era_dir in args.task_file:
      era_name = os.path.basename(era_dir)
      era_files_dict[era_name] = []
      for root, dirs, files in os.walk(era_dir):
        task_files = [os.path.join(root, f) for f in files if f.endswith('.yaml')]
        era_files_dict[era_name].extend(task_files)
  else:
    if args.era is None:
      raise RuntimeError("Era is not specified.")
    era_files_dict[args.era] = args.task_file

  dataset_name_masks = None
  if args.dataset_name_masks:
    with open(args.dataset_name_masks) as f:
      dataset_name_masks = yaml.safe_load(f)

  exceptions = {}
  if args.exceptions:
    with open(args.exceptions) as f:
      exceptions = yaml.safe_load(f)

  all_ok = check_consistency(era_files_dict, exceptions, args.name_matching, dataset_name_masks,
                             args.show_only_missing_with_candidates)
  if all_ok:
    print("All checks have passed successfully.")
    exit_code = 0
  else:
    print("Some checks have failed.")
    exit_code = 1
  sys.exit(exit_code)
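
# Illustrative invocations (paths, era names and file names are placeholders, not taken from
# this repository):
#   python3 RunKit/checkTasksConsistency.py --era Run3_2022 config/Run3_2022/*.yaml
#   python3 RunKit/checkTasksConsistency.py --cross-eras --exceptions exceptions.yaml \
#     --dataset-name-masks dataset_masks.yaml config/Run3_2022 config/Run3_2022EE config/Run3_2023
# With --cross-eras, each positional argument is treated as an era directory that is scanned
# recursively for *.yaml task files; otherwise --era must be given and the positional arguments
# are the task files themselves.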