etl.py

"""Run ETL script.
Cleans and combines raw_data/*.csv files into a single parquet file.
"""
import datetime
import pathlib
import traceback
import zoneinfo
from typing import Final
import humanize
import hydra
import polars as pl
from omegaconf import DictConfig # noqa: TC002
__all__: list[str] = []
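
# Raw CSV schema (inferred from the transformations below, not an authoritative spec):
# a "datetime_utc" string column in f"{DATE_FMT} {TIME_FMT}" form, an integer "had_flow"
# flag, and a numeric "mean_pressure_value"; a "fname" column is added at load time.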


@hydra.main(version_base=None, config_path="..", config_name="config")
def etl(cfg: DictConfig) -> None:  # pylint: disable=too-many-locals
    """Run ETL script.

    Args:
        cfg (DictConfig): Hydra configuration.

    Raises:
        OSError: An error occurred reading a csv.
        ValueError: A data quality check failed.
    """
    # setup variables
    # pylint: disable=invalid-name,duplicate-code
    PACKAGE_PATH: Final = pathlib.Path(cfg["general"]["package_path"]).expanduser()
    RAW_DATA_RELATIVE_PATH: Final = cfg["daq"]["raw_data_relative_path"]
    SAVED_DATA_RELATIVE_PATH: Final = cfg["etl"]["saved_data_relative_path"]
    DATE_FMT: Final = cfg["general"]["date_fmt"]
    TIME_FMT: Final = cfg["general"]["time_fmt"]
    FNAME_DATETIME_FMT: Final = cfg["general"]["fname_datetime_fmt"]
    DATETIME_FMT: Final = f"{DATE_FMT} {TIME_FMT}"
    LOCAL_TIMEZONE_STR: Final = cfg["general"]["local_timezone"]
    if LOCAL_TIMEZONE_STR not in zoneinfo.available_timezones():
        AVAILABLE_TIMEZONES: Final = "\n".join(list(zoneinfo.available_timezones()))
        raise ValueError(f"Unknown {LOCAL_TIMEZONE_STR = }, choose from:\n{AVAILABLE_TIMEZONES}")
    UTC_TIMEZONE: Final = zoneinfo.ZoneInfo("UTC")
    # pylint: enable=duplicate-code

    # when the drifting seconds issue was fixed by replacing t_start's second and microsecond with 0
    DT_END_OF_DRIFTING_SECONDS: Final = datetime.datetime.strptime(
        cfg["daq"]["end_of_drifting_seconds"], DATETIME_FMT
    ).replace(tzinfo=UTC_TIMEZONE)

    # when web threading was fixed, eliminating duplicate records from multiple threads
    DT_END_OF_THREADING_DUPLICATES: Final = datetime.datetime.strptime(
        cfg["daq"]["end_of_threading_duplicates"], DATETIME_FMT
    ).replace(tzinfo=UTC_TIMEZONE)

    # when the sticking flow variable was fixed
    DT_END_OF_STICKING_FLOW: Final = datetime.datetime.strptime(
        cfg["daq"]["end_of_sticking_flow"], DATETIME_FMT
    ).replace(tzinfo=UTC_TIMEZONE)
    # pylint: enable=invalid-name

    # load raw csv files
    dfpl_list = []
    csv_total_bytes = 0
    for f_csv in (PACKAGE_PATH / RAW_DATA_RELATIVE_PATH).glob("*.csv"):
        try:
            f_csv = pathlib.Path(f_csv)
            dfpl = pl.scan_csv(f_csv)
            dfpl = dfpl.with_columns(pl.lit(f_csv.name).alias("fname"))
            dfpl_list.append(dfpl)
            csv_total_bytes += f_csv.stat().st_size
        except Exception as error:
            raise OSError(
                f"Error loading file {f_csv}!\n{error = }\n{type(error) = }\n{traceback.format_exc()}"
            ) from error
    dfpl = pl.concat(dfpl_list)

    # set UTC timezone
    dfpl = dfpl.with_columns(
        pl.col("datetime_utc").str.to_datetime(DATETIME_FMT).dt.replace_time_zone("UTC")
    )

    # remove any datetimes that have more than 1 row, caused by a historical bug in DAQ threading
    DFPL_DUPLICATE_DATETIME: Final = (  # pylint: disable=invalid-name
        dfpl.group_by("datetime_utc").agg(pl.len()).filter(1 < pl.col("len"))
    )
    dfpl = dfpl.join(
        DFPL_DUPLICATE_DATETIME.select("datetime_utc"),
        on="datetime_utc",
        how="anti",
    )
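    # NOTE: the anti-join removes every row at a duplicated datetime_utc rather than
    # keeping one arbitrary copy; the check below confirms that no duplicates occur
    # after the threading fix date.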

    # make sure none of the duplicates are post threading fix
    DFPL_DUPLICATE_DATETIME_POST_THREADING_FIX: Final = (  # pylint: disable=invalid-name
        DFPL_DUPLICATE_DATETIME.filter(DT_END_OF_THREADING_DUPLICATES < pl.col("datetime_utc"))
    )
    N_DUPLICATE_DATETIME_POST_THREADING_FIX: Final = (  # pylint: disable=invalid-name
        DFPL_DUPLICATE_DATETIME_POST_THREADING_FIX.select(pl.len()).collect(streaming=True).item()
    )
    if 0 < N_DUPLICATE_DATETIME_POST_THREADING_FIX:
        DUPLICATE_DATETIME_POST_FIX_MIN: Final = (  # pylint: disable=invalid-name
            DFPL_DUPLICATE_DATETIME_POST_THREADING_FIX.select(pl.col("datetime_utc").min())
            .collect(streaming=True)
            .item()
            .strftime(DATETIME_FMT)
        )
        DUPLICATE_DATETIME_POST_FIX_MAX: Final = (  # pylint: disable=invalid-name
            DFPL_DUPLICATE_DATETIME_POST_THREADING_FIX.select(pl.col("datetime_utc").max())
            .collect(streaming=True)
            .item()
            .strftime(DATETIME_FMT)
        )
        raise ValueError(
            f"Found {N_DUPLICATE_DATETIME_POST_THREADING_FIX} datetimes with multiple entries"
            + f" after {DT_END_OF_THREADING_DUPLICATES.strftime(DATETIME_FMT)} UTC!"
            + f"\nDuplicates are between {DUPLICATE_DATETIME_POST_FIX_MIN} and {DUPLICATE_DATETIME_POST_FIX_MAX}."
        )

    # check for drifting seconds after the fix implemented at DT_END_OF_DRIFTING_SECONDS
    DFPL_DRIFT_SECONDS_RECORDS: Final = dfpl.filter(  # pylint: disable=invalid-name
        (DT_END_OF_DRIFTING_SECONDS < pl.col("datetime_utc"))
        & (pl.col("datetime_utc").dt.second() != 0)
    )
    N_ROWS_DRIFT_SECONDS: Final = (  # pylint: disable=invalid-name
        DFPL_DRIFT_SECONDS_RECORDS.select(pl.len()).collect(streaming=True).item()
    )
    if 0 < N_ROWS_DRIFT_SECONDS:
        print(DFPL_DRIFT_SECONDS_RECORDS.collect(streaming=True))
        raise ValueError(
            f"Found {N_ROWS_DRIFT_SECONDS = } after {DT_END_OF_DRIFTING_SECONDS.strftime(DATETIME_FMT)} UTC!"
        )

    # set had_flow to -1 for dates before DT_END_OF_STICKING_FLOW
    dfpl = dfpl.with_columns(pl.col("had_flow").alias("had_flow_original")).with_columns(
        pl.when(pl.col("datetime_utc") <= DT_END_OF_STICKING_FLOW)
        .then(pl.lit(-1))
        .otherwise(pl.col("had_flow_original"))
        .alias("had_flow")
    )
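    # NOTE: the raw readings are preserved in "had_flow_original"; only the cleaned
    # "had_flow" column is overwritten with -1 for the pre-fix period, when the
    # sticking flow readings cannot be trusted.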

    # check for invalid had_flow values
    DFPL_INVALID_HAD_FLOW_RECORDS: Final = dfpl.filter(  # pylint: disable=invalid-name
        ~pl.col("had_flow").is_in([-1, 0, 1])
    )
    N_INVALID_HAD_FLOW_ROWS: Final = (  # pylint: disable=invalid-name
        DFPL_INVALID_HAD_FLOW_RECORDS.select(pl.len()).collect(streaming=True).item()
    )
    if 0 < N_INVALID_HAD_FLOW_ROWS:
        print(DFPL_INVALID_HAD_FLOW_RECORDS.collect(streaming=True))
        raise ValueError(f"Found {N_INVALID_HAD_FLOW_ROWS = }!")

    dfpl = dfpl.sort(pl.col("datetime_utc"), descending=False)
    print(
        "\nETL Summary:",
        dfpl.select("datetime_utc", "mean_pressure_value").collect(streaming=True).describe(),
    )

    # write the combined data to a single parquet file named for its datetime range
    (PACKAGE_PATH / SAVED_DATA_RELATIVE_PATH).mkdir(parents=True, exist_ok=True)
    PARQUET_DATETIME_MIN: Final = (  # pylint: disable=invalid-name
        dfpl.select(pl.col("datetime_utc").min())
        .collect(streaming=True)
        .item()
        .strftime(FNAME_DATETIME_FMT)
    )
    PARQUET_DATETIME_MAX: Final = (  # pylint: disable=invalid-name
        dfpl.select(pl.col("datetime_utc").max())
        .collect(streaming=True)
        .item()
        .strftime(FNAME_DATETIME_FMT)
    )
    f_parquet = (
        PACKAGE_PATH
        / SAVED_DATA_RELATIVE_PATH
        / f"data_{PARQUET_DATETIME_MIN}_to_{PARQUET_DATETIME_MAX}.parquet"
    )
    dfpl.collect(streaming=True).write_parquet(f_parquet)

    parquet_total_bytes = f_parquet.stat().st_size
    print(
        f"\nCombined parquet saved to {f_parquet}"
        + f"\n\nInput CSVs: {humanize.naturalsize(csv_total_bytes)}"
        + f", Output parquet: {humanize.naturalsize(parquet_total_bytes)}"
        + f", a reduction of {(csv_total_bytes - parquet_total_bytes) / csv_total_bytes:.0%}\n"
    )


if __name__ == "__main__":
    etl()  # pylint: disable=no-value-for-parameter
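
# Example invocation (a sketch): Hydra loads the "config" file from the parent directory,
# per the @hydra.main decorator above, and individual keys can be overridden on the
# command line, e.g. (hypothetical override value):
#   python etl.py general.local_timezone=America/New_York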