-
Notifications
You must be signed in to change notification settings - Fork 2
/
example.py
61 lines (53 loc) · 1.79 KB
/
example.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
from __future__ import annotations
import datetime as dt
import numpy as np
import polars as pl
# Prediction times: one record per (entity id, date) at which a prediction
# should be made.
# NOTE(review): every "date" in this script is a plain ISO-formatted string,
# not a date/datetime value -- confirm the downstream frames accept that.
prediction_times_df = pl.DataFrame(
    [
        {"id": 1, "date": "2020-01-01"},
        {"id": 1, "date": "2020-02-01"},
        {"id": 2, "date": "2020-02-01"},
    ]
)

# Raw, timestamped observations that will be aggregated into predictors.
predictor_df = pl.DataFrame(
    [
        {"id": 1, "date": "2020-01-15", "value": 1},
        {"id": 1, "date": "2019-12-10", "value": 2},
        {"id": 1, "date": "2019-12-15", "value": 3},
        {"id": 2, "date": "2020-01-02", "value": 4},
    ]
)

# Outcome events: a single record, for entity 1.
outcome_df = pl.DataFrame([{"id": 1, "date": "2020-03-01", "value": 1}])
# Specify how to aggregate the predictors and define the outcome
from timeseriesflattener import (
MaxAggregator,
MinAggregator,
OutcomeSpec,
PredictionTimeFrame,
PredictorSpec,
ValueFrame,
)
# Wrap the raw predictor values, telling the library which columns hold the
# entity id and the timestamp of each value.
predictor_values = ValueFrame(
    init_df=predictor_df,
    entity_id_col_name="id",
    value_timestamp_col_name="date",
)

# Predictor specification: max and min of the values, using a look-behind
# distance of one day. Output columns are prefixed with "pred".
# `fallback` is NaN -- presumably the value used when nothing falls inside
# the window; verify against the timeseriesflattener documentation.
predictor_spec = PredictorSpec(
    column_prefix="pred",
    value_frame=predictor_values,
    aggregators=[MaxAggregator(), MinAggregator()],
    lookbehind_distances=[dt.timedelta(days=1)],
    fallback=np.nan,
)
# Wrap the outcome events, telling the library which columns hold the entity
# id and the timestamp of each event.
outcome_values = ValueFrame(
    init_df=outcome_df,
    entity_id_col_name="id",
    value_timestamp_col_name="date",
)

# Outcome specification: max and min of the values, using a look-ahead
# distance of one day. Output columns are prefixed with "outc".
# `fallback` is NaN -- presumably the value used when nothing falls inside
# the window; verify against the timeseriesflattener documentation.
outcome_spec = OutcomeSpec(
    column_prefix="outc",
    value_frame=outcome_values,
    aggregators=[MaxAggregator(), MinAggregator()],
    lookahead_distances=[dt.timedelta(days=1)],
    fallback=np.nan,
)
# Instantiate the Flattener with the prediction times, then aggregate the specifications
from timeseriesflattener import Flattener
# Wrap the prediction times, naming the entity-id and timestamp columns.
prediction_times = PredictionTimeFrame(
    init_df=prediction_times_df,
    entity_id_col_name="id",
    timestamp_col_name="date",
)

# Build the flattener around the prediction times and aggregate both
# specifications in a single call.
flattener = Flattener(predictiontime_frame=prediction_times)
result = flattener.aggregate_timeseries(specs=[predictor_spec, outcome_spec])

# Bare expression: it has no effect when run as a script; presumably kept so
# the result is echoed when the code is run in a notebook or REPL (hence the
# suppressed "useless expression" lint).
result  # type: ignore # noqa: B018