processing_pipeline.py

#!/usr/bin/env python3
import shutil
import subprocess
import time
from pathlib import Path

from yaml import load
try:
    # Use the faster C-based YAML loader if libyaml is available
    from yaml import CLoader as Loader
except ImportError:
    from yaml import Loader
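
# Processing pipeline for ETROC DAQ output: poll the DAQ log for new runs,
# validate each run against its YAML log, unpack the binary output to json,
# convert it to a merged ROOT file, and optionally stage the result out.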

if __name__ == '__main__':
    which_root = shutil.which("root")
    if which_root == "/cvmfs/sft.cern.ch/lcg/releases/LCG_103/ROOT/6.28.00/x86_64-ubuntu2004-gcc9-opt/bin/root":
        print("Found expected ROOT version")
    else:
        print("This script relies on pyROOT, but found an unexpected ROOT version.")
        print(" > In case of issues, run:")
        print(" > source /cvmfs/sft.cern.ch/lcg/releases/LCG_103/ROOT/6.28.00/x86_64-ubuntu2004-gcc9-opt/ROOT-env.sh")
    from root_dumper import dump_to_root
    from data_dumper import data_dumper

    skip_stageout = True
    td02_dir = '/home/daq/ETROC_output/'  # stage-out destination on the remote DAQ machine
    data_dir = './ETROC_output/'  # local directory with raw data and logs
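
    # Main loop: poll the DAQ log for finished runs, process any that have
    # not been handled yet, then sleep and poll again.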
    while True:
        # Runs the DAQ has recorded, one run number per line
        with open('daq_log.txt', 'r') as f:
            available_runs = [int(x) for x in f.readlines()]

        # Runs this script has already handled; touch() creates the file on first use
        Path('process_log.txt').touch()
        with open('process_log.txt', 'r') as f:
            processed_runs = [int(x) for x in f.readlines()]

        for run in available_runs:
            if run not in processed_runs:
                print(f"\n\n >>> Starting to process run {run} <<<")
                print(" > Trying to load DAQ log")
                try:
                    with open(f"{data_dir}/log_run_{run}_rb0.yaml", "r") as f:
                        log = load(f, Loader=Loader)
                    print(f" > Nevents according to log: {log['nevents']}")
                    if log['lost_events'] > 0:
                        print(" > Lost events detected, will not process the data.")
                        with open('process_log.txt', 'a') as f:
                            f.write(f'{run}\n')
                        continue
                    if log['nevents'] < 1:
                        print(" > Empty run detected, can't process any data.")
                        with open('process_log.txt', 'a') as f:
                            f.write(f'{run}\n')
                        continue
                except FileNotFoundError:
                    # Without the log there is no event count to validate against;
                    # skip this run for now so it is retried on the next pass.
                    print("Couldn't find log")
                    continue
print(f" > Converting binary to json")
n_events = data_dumper(
f"{data_dir}/output_run_{run}_rb0.dat",
skip_trigger_check=True,
)
continue_processing = n_events==log['nevents']
#print("Number of L1A and events in agreement?", continue_processing)
                if continue_processing:
                    outfile = f'ETROC_merged_run_{run}.root'
                    print(" > Converting json to root")
                    dump_to_root(
                        f'{data_dir}/{outfile}',
                        f'{data_dir}/output_run_{run}_rb0.json',
                    )
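
                    # Optional stage-out of the merged ROOT file to the DAQ machine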
                    if not skip_stageout:
                        print(" > Stage out root file.")
                        subprocess.call(f"scp {data_dir}/{outfile} [email protected]:{td02_dir}/{outfile}", shell=True)
                        # TODO: backup of raw data to EOS
                else:
                    print(" ! Data and number of L1As not in agreement, did not process further!")
                with open('process_log.txt', 'a') as f:
                    f.write(f'{run}\n')
                print(" > Done.")
print("Done with all runs, sleeping for 1min")
time.sleep(60)