-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathFine Grained Demand Forecasting.py
509 lines (385 loc) · 18 KB
/
Fine Grained Demand Forecasting.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
# Databricks notebook source
# MAGIC %md #Demand Forecasting
# MAGIC
# MAGIC The objective of this notebook is to illustrate how we might generate a large number of fine-grained forecasts at the store-item level in an efficient manner leveraging the distributed computational power of Databricks. For this exercise, we will make use of an increasingly popular library for demand forecasting, [FBProphet](https://facebook.github.io/prophet/), which we will load into the notebook session associated with a cluster running Databricks 6.0 or higher:
# COMMAND ----------
# load fbprophet library
# dbutils.library.installPyPI('FBProphet', version='0.5') # find latest version of fbprophet here: https://pypi.org/project/fbprophet/
# dbutils.library.installPyPI('holidays','0.9.12') # this line is in response to this issue with fbprophet 0.5: https://github.com/facebook/prophet/issues/1293
# dbutils.library.restartPython()
# COMMAND ----------
# MAGIC %md ## Examine the Data
# MAGIC
# MAGIC For our training dataset, we will make use of 5-years of store-item unit sales data for 50 items across 10 different stores. This data set is publicly available as part of a past Kaggle competition and can be downloaded [here](https://www.kaggle.com/c/demand-forecasting-kernels-only/data).
# MAGIC
# MAGIC Once downloaded, we can uzip the *train.csv.zip* file and upload the decompressed CSV to */FileStore/tables/demand_forecast/train/* using the file import steps documented [here](https://docs.databricks.com/data/tables.html#create-table-ui). Please note when performing the file import, you don't need to select the *Create Table with UI* or the *Create Table in Notebook* options to complete the import process.
# MAGIC
# MAGIC With the dataset accessible within Databricks, we can now explore it in preparation for modeling:
# COMMAND ----------
from pyspark.sql.types import *
# structure of the training data set
train_schema = StructType([
StructField('date', DateType()),
StructField('store', IntegerType()),
StructField('item', IntegerType()),
StructField('sales', IntegerType())
])
# read the training file into a dataframe
train = spark.read.csv(
'/FileStore/tables/demand_forecast/train/train.csv',
header=True,
schema=train_schema
)
# make the dataframe queriable as a temporary view
train.createOrReplaceTempView('train')
# COMMAND ----------
# MAGIC %md When performing demand forecasting, we are often interested in general trends and seasonality. Let's start our exploration by examing the annual trend in unit sales:
# COMMAND ----------
# MAGIC %sql
# MAGIC
# MAGIC SELECT
# MAGIC year(date) as year,
# MAGIC sum(sales) as sales
# MAGIC FROM train
# MAGIC GROUP BY year(date)
# MAGIC ORDER BY year;
# COMMAND ----------
# MAGIC %md It's very clear from the data that there is a generally upward trend in total unit sales across the stores. If we had better knowledge of the markets served by these stores, we might wish to identify whether there is a maximum growth capacity we'd expect to approach over the life of our forecast. But without that knowledge and by just quickly eyeballing this dataset, it feels safe to assume that if our goal is to make a forecast a few days, months or even a year out, we might expect continued linear growth over that time span.
# MAGIC
# MAGIC Now let's examine seasonality. If we aggregate the data around the individual months in each year, a distinct yearly seasonal pattern is observed which seems to grow in scale with overall growth in sales:
# COMMAND ----------
# MAGIC %sql
# MAGIC
# MAGIC SELECT
# MAGIC TRUNC(date, 'MM') as month,
# MAGIC SUM(sales) as sales
# MAGIC FROM train
# MAGIC GROUP BY TRUNC(date, 'MM')
# MAGIC ORDER BY month;
# COMMAND ----------
# MAGIC %md Aggregating the data at a weekday level, a pronounced weekly seasonal pattern is observed with a peak on Sunday (weekday 0), a hard drop on Monday (weekday 1) and then a steady pickup over the week heading back to the Sunday high. This pattern seems to be pretty stable across the five years of observations:
# COMMAND ----------
# MAGIC %sql
# MAGIC
# MAGIC SELECT
# MAGIC YEAR(date) as year,
# MAGIC CAST(DATE_FORMAT(date, 'u') as Integer) % 7 as weekday,
# MAGIC --CONCAT(DATE_FORMAT(date, 'u'), '-', DATE_FORMAT(date, 'EEEE')) as weekday,
# MAGIC AVG(sales) as sales
# MAGIC FROM (
# MAGIC SELECT
# MAGIC date,
# MAGIC SUM(sales) as sales
# MAGIC FROM train
# MAGIC GROUP BY date
# MAGIC ) x
# MAGIC GROUP BY year, CAST(DATE_FORMAT(date, 'u') as Integer) --, CONCAT(DATE_FORMAT(date, 'u'), '-', DATE_FORMAT(date, 'EEEE'))
# MAGIC ORDER BY year, weekday;
# COMMAND ----------
# MAGIC %md Now that we are oriented to the basic patterns within our data, let's explore how we might build a forecast.
# COMMAND ----------
# MAGIC %md ###Build a Forecast
# MAGIC
# MAGIC Before attempting to generate forecasts for individual combinations of stores and items, it might be helpful to build a single forecast for no other reason than to orient ourselves to the use of FBProphet.
# MAGIC
# MAGIC Our first step is to assemble the historical dataset on which we will train the model:
# COMMAND ----------
# query to aggregate data to date (ds) level
sql_statement = '''
SELECT
CAST(date as date) as ds,
sales as y
FROM train
WHERE store=1 AND item=1
ORDER BY ds
'''
# assemble dataset in Pandas dataframe
history_pd = spark.sql(sql_statement).toPandas()
# drop any missing records
history_pd = history_pd.dropna()
# COMMAND ----------
# MAGIC %md Now, we will import the fbprophet library, but because it can be a bit verbose when in use, we will need to fine-tune the logging settings in our environment:
# COMMAND ----------
from fbprophet import Prophet
import logging
# disable informational messages from fbprophet
logging.getLogger('py4j').setLevel(logging.ERROR)
# COMMAND ----------
# MAGIC %md Based on our review of the data, it looks like we should set our overall growth pattern to linear and enable the evaluation of weekly and yearly seasonal patterns. We might also wish to set our seasonality mode to multiplicative as the seasonal pattern seems to grow with overall growth in sales:
# COMMAND ----------
# set model parameters
model = Prophet(
interval_width=0.95,
growth='linear',
daily_seasonality=False,
weekly_seasonality=True,
yearly_seasonality=True,
seasonality_mode='multiplicative'
)
# fit the model to historical data
model.fit(history_pd)
# COMMAND ----------
# MAGIC %md Now that we have a trained model, let's use it to build a 90-day forecast:
# COMMAND ----------
# define a dataset including both historical dates & 90-days beyond the last available date
future_pd = model.make_future_dataframe(
periods=90,
freq='d',
include_history=True
)
# predict over the dataset
forecast_pd = model.predict(future_pd)
display(forecast_pd)
# COMMAND ----------
# MAGIC %md How did our model perform? Here we can see the general and seasonal trends in our model presented as graphs:
# COMMAND ----------
trends_fig = model.plot_components(forecast_pd)
display(trends_fig)
# COMMAND ----------
# MAGIC %md And here, we can see how our actual and predicted data line up as well as a forecast for the future, though we will limit our graph to the last year of historical data just to keep it readable:
# COMMAND ----------
predict_fig = model.plot( forecast_pd, xlabel='date', ylabel='sales')
# adjust figure to display dates from last year + the 90 day forecast
xlim = predict_fig.axes[0].get_xlim()
new_xlim = ( xlim[1]-(180.0+365.0), xlim[1]-90.0)
predict_fig.axes[0].set_xlim(new_xlim)
display(predict_fig)
# COMMAND ----------
# MAGIC %md **NOTE** This visualization is a bit busy. Bartosz Mikulski provides [an excellent breakdown](https://www.mikulskibartosz.name/prophet-plot-explained/) of it that is well worth checking out. In a nutshell, the black dots represent our actuals with the darker blue line representing our predictions and the lighter blue band representing our (95%) uncertainty interval.
# COMMAND ----------
# MAGIC %md Visual inspection is useful, but a better way to evaulate the forecast is to calculate Mean Absolute Error, Mean Squared Error and Root Mean Squared Error values for the predicted relative to the actual values in our set:
# COMMAND ----------
from sklearn.metrics import mean_squared_error, mean_absolute_error
from math import sqrt
from datetime import date
# get historical actuals & predictions for comparison
actuals_pd = history_pd[ history_pd['ds'] < date(2018, 1, 1) ]['y']
predicted_pd = forecast_pd[ forecast_pd['ds'] < date(2018, 1, 1) ]['yhat']
# calculate evaluation metrics
mae = mean_absolute_error(actuals_pd, predicted_pd)
mse = mean_squared_error(actuals_pd, predicted_pd)
rmse = sqrt(mse)
# print metrics to the screen
print( '\n'.join(['MAE: {0}', 'MSE: {1}', 'RMSE: {2}']).format(mae, mse, rmse) )
# COMMAND ----------
# MAGIC %md FBProphet provides [additional means](https://facebook.github.io/prophet/docs/diagnostics.html) for evaluating how your forecasts hold up over time. You're strongly encouraged to consider using these and those additional techniques when building your forecast models but we'll skip this here to focus on the scaling challenge.
# COMMAND ----------
# MAGIC %md ###Scaling Model Training & Forecasting
# MAGIC
# MAGIC With the mechanics under our belt, let's now tackle our original goal of building numerous, fine-grain models & forecasts for individual store and item combinations. We will start by assembling sales data at the store-item-date level of granularity:
# MAGIC
# MAGIC **NOTE**: The data in this data set should already be aggregated at this level of granularity but we are explicitly aggregating to ensure we have the expected data structure.
# COMMAND ----------
sql_statement = '''
SELECT
store,
item,
CAST(date as date) as ds,
SUM(sales) as y
FROM train
GROUP BY store, item, ds
ORDER BY store, item, ds
'''
store_item_history = (
spark
.sql( sql_statement )
.repartition(sc.defaultParallelism, ['store', 'item'])
).cache()
# COMMAND ----------
# MAGIC %md With our data aggregated at the store-item-date level, we need to consider how we will pass our data to FBProphet. If our goal is to build a model for each store and item combination, we will need to pass in a store-item subset from the dataset we just assembled, train a model on that subset, and receive a store-item forecast back. We'd expect that forecast to be returned as a dataset with a structure like this where we retain the store and item identifiers for which the forecast was assembled and we limit the output to just the relevant subset of fields generated by the Prophet model:
# COMMAND ----------
from pyspark.sql.types import *
result_schema =StructType([
StructField('ds',DateType()),
StructField('store',IntegerType()),
StructField('item',IntegerType()),
StructField('y',FloatType()),
StructField('yhat',FloatType()),
StructField('yhat_upper',FloatType()),
StructField('yhat_lower',FloatType())
])
# COMMAND ----------
# MAGIC %md To train the model and generate a forecast we will leverage a Pandas user-defined function (UDF). We will define this function to receive a subset of data organized around a store and item combination. It will return a forecast in the format identified in the previous cell:
# COMMAND ----------
from pyspark.sql.functions import pandas_udf, PandasUDFType
@pandas_udf( result_schema, PandasUDFType.GROUPED_MAP )
def forecast_store_item( history_pd ):
# TRAIN MODEL AS BEFORE
# --------------------------------------
# remove missing values (more likely at day-store-item level)
history_pd = history_pd.dropna()
# configure the model
model = Prophet(
interval_width=0.95,
growth='linear',
daily_seasonality=False,
weekly_seasonality=True,
yearly_seasonality=True,
seasonality_mode='multiplicative'
)
# train the model
model.fit( history_pd )
# --------------------------------------
# BUILD FORECAST AS BEFORE
# --------------------------------------
# make predictions
future_pd = model.make_future_dataframe(
periods=90,
freq='d',
include_history=True
)
forecast_pd = model.predict( future_pd )
# --------------------------------------
# ASSEMBLE EXPECTED RESULT SET
# --------------------------------------
# get relevant fields from forecast
f_pd = forecast_pd[ ['ds','yhat', 'yhat_upper', 'yhat_lower'] ].set_index('ds')
# get relevant fields from history
h_pd = history_pd[['ds','store','item','y']].set_index('ds')
# join history and forecast
results_pd = f_pd.join( h_pd, how='left' )
results_pd.reset_index(level=0, inplace=True)
# get store & item from incoming data set
results_pd['store'] = history_pd['store'].iloc[0]
results_pd['item'] = history_pd['item'].iloc[0]
# --------------------------------------
# return expected dataset
return results_pd[ ['ds', 'store', 'item', 'y', 'yhat', 'yhat_upper', 'yhat_lower'] ]
# COMMAND ----------
# MAGIC %md There's a lot taking place within our UDF, but if you compare the first two blocks of code within which the model is being trained and a forecast is being built to the cells in the previous portion of this notebook, you'll see the code is pretty much the same as before. It's only in the assembly of the required result set that truly new code is being introduced and it consists of fairly standard Pandas dataframe manipulations.
# COMMAND ----------
# MAGIC %md Now let's call our UDF to build our forecasts. We do this by grouping our historical dataset around store and item. We then apply our UDF to each group and tack on today's date as our *training_date* for data management purposes:
# COMMAND ----------
from pyspark.sql.functions import current_date
results = (
store_item_history
.groupBy('store', 'item')
.apply(forecast_store_item)
.withColumn('training_date', current_date() )
)
results.createOrReplaceTempView('new_forecasts')
# COMMAND ----------
# MAGIC %md We we are likely wanting to report on our forecasts, so let's save them to a queriable table structure:
# COMMAND ----------
# MAGIC %sql
# MAGIC create database if not exists datasci_overview;
# MAGIC use datasci_overview;
# MAGIC -- create forecast table
# MAGIC create table if not exists forecasts (
# MAGIC date date,
# MAGIC store integer,
# MAGIC item integer,
# MAGIC sales float,
# MAGIC sales_predicted float,
# MAGIC sales_predicted_upper float,
# MAGIC sales_predicted_lower float,
# MAGIC training_date date
# MAGIC )
# MAGIC using delta
# MAGIC partitioned by (training_date);
# MAGIC
# MAGIC -- load data to it
# MAGIC insert into forecasts
# MAGIC select
# MAGIC ds as date,
# MAGIC store,
# MAGIC item,
# MAGIC y as sales,
# MAGIC yhat as sales_predicted,
# MAGIC yhat_upper as sales_predicted_upper,
# MAGIC yhat_lower as sales_predicted_lower,
# MAGIC training_date
# MAGIC from new_forecasts;
# COMMAND ----------
# MAGIC %md But how good (or bad) is each forecast? Using the UDF technique, we can generate evaluation metrics for each store-item forecast as follows:
# COMMAND ----------
import pandas as pd
# schema of expected result set
eval_schema =StructType([
StructField('training_date', DateType()),
StructField('store', IntegerType()),
StructField('item', IntegerType()),
StructField('mae', FloatType()),
StructField('mse', FloatType()),
StructField('rmse', FloatType())
])
# define udf to calculate metrics
@pandas_udf( eval_schema, PandasUDFType.GROUPED_MAP )
def evaluate_forecast( evaluation_pd ):
# get store & item in incoming data set
training_date = evaluation_pd['training_date'].iloc[0]
store = evaluation_pd['store'].iloc[0]
item = evaluation_pd['item'].iloc[0]
# calulate evaluation metrics
mae = mean_absolute_error( evaluation_pd['y'], evaluation_pd['yhat'] )
mse = mean_squared_error( evaluation_pd['y'], evaluation_pd['yhat'] )
rmse = sqrt( mse )
# assemble result set
results = {'training_date':[training_date], 'store':[store], 'item':[item], 'mae':[mae], 'mse':[mse], 'rmse':[rmse]}
return pd.DataFrame.from_dict( results )
# calculate metrics
results = (
spark
.table('new_forecasts')
.filter('ds < \'2018-01-01\'') # limit evaluation to periods where we have historical data
.select('training_date', 'store', 'item', 'y', 'yhat')
.groupBy('training_date', 'store', 'item')
.apply(evaluate_forecast)
)
results.createOrReplaceTempView('new_forecast_evals')
# COMMAND ----------
# MAGIC %md Once again, we will likely want to report the metrics for each forecast, so we persist these to a queriable table:
# COMMAND ----------
# MAGIC %sql
# MAGIC use datasci_overview;
# MAGIC create table if not exists forecast_evals (
# MAGIC store integer,
# MAGIC item integer,
# MAGIC mae float,
# MAGIC mse float,
# MAGIC rmse float,
# MAGIC training_date date
# MAGIC )
# MAGIC using delta
# MAGIC partitioned by (training_date);
# MAGIC
# MAGIC insert into forecast_evals
# MAGIC select
# MAGIC store,
# MAGIC item,
# MAGIC mae,
# MAGIC mse,
# MAGIC rmse,
# MAGIC training_date
# MAGIC from new_forecast_evals;
# COMMAND ----------
# MAGIC %md We now have constructed a forecast for each store-item combination and generated basic evaluation metrics for each. To see this forecast data, we can issue a simple query (limited here to product 1 across stores 1 through 10):
# COMMAND ----------
# MAGIC %sql
# MAGIC
# MAGIC SELECT
# MAGIC store,
# MAGIC date,
# MAGIC sales_predicted,
# MAGIC sales_predicted_upper,
# MAGIC sales_predicted_lower
# MAGIC FROM forecasts a
# MAGIC WHERE item = 1 AND
# MAGIC --store IN (1, 2, 3, 4, 5) AND
# MAGIC date >= '2018-01-01' AND
# MAGIC training_date=current_date()
# MAGIC ORDER BY store
# COMMAND ----------
# MAGIC %md And for each of these, we can retrieve a measure of help us assess the reliability of each forecast:
# COMMAND ----------
# MAGIC %sql
# MAGIC
# MAGIC SELECT
# MAGIC store,
# MAGIC mae,
# MAGIC mse,
# MAGIC rmse
# MAGIC FROM forecast_evals a
# MAGIC WHERE item = 1 AND
# MAGIC training_date=current_date()
# MAGIC ORDER BY store