-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdatahub.py
152 lines (127 loc) · 4.98 KB
/
datahub.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
# %% Libraries
import json
import pandas as pd
import requests
import re
from dateutil.parser import parse
from datetime import datetime, timedelta
def camel_to_snake(names):
    """
    Converts camelCase names to snake_case.

    An underscore is inserted in front of every capital letter that is not
    the first character of the name, the result is lower-cased, and every
    dot is replaced by an underscore.

    Parameters
    ----------
    names : iterable of str
        Names to convert (for example the column labels of a DataFrame).

    Returns
    -------
    converted : list of str
        The converted names, in the same order as the input.
    """
    # Zero-width match located just before each capital letter, except when
    # that letter is the very first character of the string.
    capital_boundary = re.compile(r'(?<!^)(?=[A-Z])')
    converted = []
    for name in names:
        lowered = capital_boundary.sub('_', name).lower()
        converted.append(lowered.replace('.', '_'))
    return converted
# %% Get data access token
def get_data_access_token(refresh_token: str, timeout: float = 60.0) -> str:
    """
    Returns the data access token.

    Parameters
    ----------
    refresh_token : str
        Refresh token, which can be generated on https://energinet.dk/Energidata/DataHub/Eloverblik. Please note that the refresh token is valid for one year.
    timeout : float, optional
        Number of seconds to wait for the server before giving up. Pass None
        to wait indefinitely (the behaviour before this parameter existed).

    Returns
    -------
    dataaccesstoken : str
        Data access token, which should be used to retrieve your own electricity data.

    Raises
    ------
    requests.HTTPError
        If the API answers with an HTTP error status.
    requests.Timeout
        If the server does not answer within `timeout` seconds.
    """
    base_url = "https://api.eloverblik.dk/CustomerApi/api/"
    url = base_url + 'token'
    header = {"Authorization": "Bearer " + refresh_token}
    # Without an explicit timeout, requests waits forever on a stalled connection.
    r = requests.get(url, headers=header, timeout=timeout)
    r.raise_for_status()
    json_res = json.loads(r.text)
    # The token is delivered under the 'result' key of the JSON payload.
    dataaccesstoken = json_res['result']
    return dataaccesstoken
# %% Get metering point
def get_metering_points(token: str, include_all: bool = False, timeout: float = 60.0) -> pd.DataFrame:
    """
    Returns a data frame with the metering points reported by the API.

    Parameters
    ----------
    token : str
        Data access token generated by get_data_access_token()
    include_all : bool, optional
        Value forwarded as the ``includeAll`` query parameter of the
        metering points endpoint. Defaults to False, which is the value
        that was previously hard-coded.
    timeout : float, optional
        Number of seconds to wait for the server before giving up. Pass None
        to wait indefinitely (the behaviour before this parameter existed).

    Returns
    -------
    df : DataFrame
        Pandas data frame with one row per metering point. Nested fields are
        flattened and all column names are converted to snake_case.

    Raises
    ------
    requests.HTTPError
        If the API answers with an HTTP error status.
    requests.Timeout
        If the server does not answer within `timeout` seconds.
    """
    base_url = "https://api.eloverblik.dk/CustomerApi/api/"
    include_all_flag = 'true' if include_all else 'false'
    url = base_url + 'meteringpoints/meteringpoints?includeAll=' + include_all_flag
    header = {"accept": "application/json", "Authorization": "Bearer " + token}
    # Without an explicit timeout, requests waits forever on a stalled connection.
    r = requests.get(url, headers=header, timeout=timeout)
    r.raise_for_status()
    json_res = json.loads(r.text)
    # json_normalize flattens nested objects into dotted column names, which
    # camel_to_snake then turns into plain snake_case labels.
    df = pd.json_normalize(json_res['result'])
    df.columns = camel_to_snake(df.columns)
    return df
# %% Get time series
def _periods_to_frame(periods):
    """Builds a single DataFrame of readings from the 'Period' entries of one time series."""
    reading_columns = ['timestamp_local', 'timestamp_utc', 'resolution', 'energy', 'quality_code']
    # Maps a supported resolution code to the timedelta keyword that advances
    # one position; periods with any other resolution are skipped.
    step_units = {'PT1D': 'days', 'PT1H': 'hours'}

    frames = []
    for period in periods:
        step_unit = step_units.get(period['resolution'])
        points = period['Point']
        if step_unit is None or not points:
            continue
        start_time = parse(period['timeInterval']['start'])
        frame = pd.json_normalize(points)
        frame['position'] = frame['position'].astype('int32')
        # Positions are 1-based, so position 1 falls on the period start.
        frame['timestamp_utc'] = [
            start_time + timedelta(**{step_unit: int(offset)})
            for offset in frame['position'] - 1
        ]
        frame['resolution'] = period['resolution']
        frame['timestamp_local'] = frame['timestamp_utc'].dt.tz_convert('Europe/Copenhagen')
        frame = frame.rename(columns={'out_Quantity.quantity': 'energy', 'out_Quantity.quality': 'quality_code'})
        frames.append(frame[reading_columns])

    if not frames:
        return pd.DataFrame(columns=reading_columns)
    # One concat at the end replaces the per-period DataFrame.append, which
    # was removed in pandas 2.0 and copied the whole frame on every iteration.
    return pd.concat(frames, ignore_index=True)


def get_time_series(token: str, metering_point_id: str, start_date: str, end_date: str, resolution: str = 'Hour', timeout: float = 60.0) -> pd.DataFrame:
    """
    Returns DataFrame with electricity time series readings.

    Parameters
    ----------
    token : str
        Data access token generated by get_data_access_token().
    metering_point_id : str
        Generated by get_metering_points().
    start_date : str
        Start date in format YYYY-MM-DD.
    end_date : str
        End date in format YYYY-MM-DD.
    resolution : str, optional
        Resolution type. Must be either 'Hour' or 'Day'.
    timeout : float, optional
        Number of seconds to wait for the server before giving up. Pass None
        to wait indefinitely (the behaviour before this parameter existed).

    Returns
    -------
    readings_df : Pandas DataFrame
        Electricity readings with the columns metering_point_id,
        timestamp_local, timestamp_utc, resolution, energy, unit and
        quality_code. Only the first time series of the first result is read.

    Raises
    ------
    ValueError
        If `resolution` is not supported, if a date does not match the
        expected format, or if the API returns no time series data.
    requests.HTTPError
        If the API answers with an HTTP error status.
    requests.Timeout
        If the server does not answer within `timeout` seconds.
    """
    # Validating aggregation input
    resolution_types = ['Hour', 'Day']
    if resolution not in resolution_types:
        raise ValueError("Invalid aggregation. Expected one of: %s" % resolution_types)

    # Validating date formats. TypeError covers non-string input such as None,
    # which is reported as a format problem as well.
    date_format = "%Y-%m-%d"
    for label, value in (("start_date", start_date), ("end_date", end_date)):
        try:
            datetime.strptime(value, date_format)
        except (ValueError, TypeError) as err:
            raise ValueError(f"{label} does not match format {date_format}") from err

    # Retrieving time series data
    base_url = "https://api.eloverblik.dk/CustomerApi/api/"
    url = base_url + 'MeterData/GetTimeSeries/' + start_date + '/' + end_date + '/' + resolution
    header = {"accept": "application/json", "Authorization": "Bearer " + token, "Content-Type": "application/json"}
    data_input = {
        "meteringPoints":
            {"meteringPoint": [metering_point_id]}
    }
    # Without an explicit timeout, requests waits forever on a stalled connection.
    r = requests.post(url=url, headers=header, json=data_input, timeout=timeout)
    r.raise_for_status()
    json_res = json.loads(r.content)

    results = json_res['result']
    if not results:
        raise ValueError("No data")
    first_result = results[0]
    time_series = first_result['MyEnergyData_MarketDocument']['TimeSeries']
    if not time_series:
        raise ValueError("No data")
    first_series = time_series[0]

    readings_df = _periods_to_frame(first_series['Period'])
    # Restore the documented column order; the two columns that are still
    # missing are created here and filled in just below.
    readings_df = readings_df.reindex(
        columns=['metering_point_id', 'timestamp_local', 'timestamp_utc', 'resolution', 'energy', 'unit', 'quality_code']
    )

    # Normalise the capitalisation of the two known units; anything else is
    # passed through unchanged.
    unit = first_series['measurement_Unit.name']
    unit = {'kwh': 'kWh', 'mwh': 'MWh'}.get(unit.lower(), unit)

    readings_df['metering_point_id'] = first_result['id']
    readings_df['unit'] = unit
    return readings_df