-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathfunctions.py
412 lines (301 loc) · 13 KB
/
functions.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Tue Apr 26 13:28:14 2022
@author: alyabolowich
"""
#%% Import packages
import pymrio
import pandas as pd
import psycopg2
import psycopg2.extras
import requests
import re
import zipfile
import os
import sys
from datetime import date
import config
import shutil
#%% Get path
def getPath():
    ''' Return the directory this project is being run from.

    The location is the current working directory of the process, not the
    directory that contains this source file. '''
    return os.getcwd()
#%% Get current DOI
def getCurrentDOI():
    ''' Read the DOI stored in the current_doi.txt file that accompanies
    this project.

    The stored value is what findMostRecentVersion() compares against the
    DOI retrieved from Zenodo.
    DOI 3583070 represents all versions of EXIOBASE on Zenodo.

    Returns:
        str : content of current_doi.txt with surrounding whitespace removed.
    Raises:
        OSError : if the file cannot be opened or read (an error message is
                  printed before the exception is re-raised). '''
    doi_file = os.path.join(getPath(), "current_doi.txt")
    try:
        with open(doi_file) as f:
            print("Reading current DOI from currentDOI.txt.")
            # A trailing newline (e.g. left by a text editor) would make the
            # stored DOI compare unequal to the retrieved one.
            return f.read().strip()
    except OSError:
        print("Error retrieving DOI from current_doi.txt file.")
        raise
#%% Update current DOI
def updateCurrentDOI(retrieved_DOI):
    ''' Overwrite the current_doi.txt file with a new DOI value.

    Input arguments:
        retrieved_DOI : the DOI to store; it is converted with str() before
                        being written (no trailing newline is added).
    Raises:
        OSError : if the file cannot be opened or written (an error message
                  is printed before the exception is re-raised). '''
    doi_file = os.path.join(getPath(), "current_doi.txt")
    try:
        with open(doi_file, "w") as f:
            f.write(str(retrieved_DOI))
    except OSError:
        print("Error updating DOI in the current_DOI.txt file.")
        raise
    # Report success only after the file has been written and closed.
    print("New version found. Writing new DOI to currentDOI.txt.")
#%% Find most recent version of Exiobase
def findMostRecentVersion():
    ''' Check that the most recent EXIOBASE version is being used.

    The DOI stored locally (getCurrentDOI) is compared with the record number
    found in the URL that the "all versions" EXIOBASE DOI redirects to. When
    they differ, the stored DOI is replaced via updateCurrentDOI().

    Returns:
        None
    Raises:
        requests.RequestException : if the request fails, times out or
                                    answers with an HTTP error status.
        IndexError : if the final URL contains no digits to use as DOI. '''
    # Strip so that a trailing newline in the file cannot cause a mismatch.
    current_DOI = getCurrentDOI().strip()

    # DOI 3583070 gives access to the most recent EXIOBASE version
    # by default, but each version has its own DOI.
    main_url = "https://doi.org/10.5281/zenodo.3583070"

    # Follow the redirects; the timeout keeps the call from hanging forever
    # and the status check keeps an error page from being parsed as a DOI.
    req = requests.get(main_url, timeout=30)
    req.raise_for_status()

    # This is the url that holds the most recent Zenodo version
    retrieved_url = req.url

    # Extract only the DOI integer (first run of digits) from the URL
    try:
        retrieved_DOI = re.findall(r"\d+", retrieved_url)[0]
    except IndexError:
        print("Error finding most recent version of EXIOBASE.")
        raise

    if current_DOI == retrieved_DOI:
        print("Version is the same, no need to update API.")
    else:
        # The stored DOI is now known to be current, so there is no need to
        # call this function again (that would only repeat the request).
        updateCurrentDOI(retrieved_DOI)
        print("DOIs did not match. New DOI updated in current_DOI.txt file.")
#%% Create a storage directory for EXIOBASE files that will be updated.
def createExioStorageDirectory():
    ''' Create the "exiostorage" directory inside the project path.

    Returns:
        str : path to the exiostorage directory, whether it was created by
              this call or already existed.
    Raises:
        SystemExit : if the directory does not exist and cannot be created. '''
    exio_folder_path = os.path.join(getPath(), "exiostorage")

    # Try to create first and react to the outcome; checking for existence
    # beforehand leaves a window in which the answer can change.
    try:
        os.mkdir(exio_folder_path)
    except FileExistsError:
        print("Path to exiostorage folder already exists.")
    except OSError:
        print("Error creating exiobase storage directory.")
        sys.exit("Cannot create EXIOBASE storage folder, and one does not exist.")
    return exio_folder_path
#%% Get EXIO storage directory
def getExioStorageDirectory():
    ''' Return the path of the "exiostorage" folder inside the project path.

    The path is only assembled here; nothing is created or checked. '''
    return os.path.join(getPath(), "exiostorage")
#%% Unzip file
def unzipExioFiles(year):
    ''' Files downloaded from Zenodo will be zipped. To use, these
    must be unzipped first.

    The archive IOT_<year>_ixi.zip in the exiostorage folder is extracted
    into that same folder.

    Input arguments:
        year : year used to build the archive name.
    Raises:
        SystemExit : if the archive is missing/not a valid zip file, or if
                     extracting it fails. '''
    # Get path to exiostorage folder
    directory = getExioStorageDirectory()

    # Archive that belongs to the targeted year
    file_to_unzip = os.path.join(directory, "IOT_{}_ixi.zip".format(year))
    print(file_to_unzip)

    if not zipfile.is_zipfile(file_to_unzip):
        print("Zip file is invalid. Cannot unzip.")
        sys.exit("Unable to unzip Exiobase files in the exiostorage directory.")
    print("ZIP file is valid.")

    try:
        with zipfile.ZipFile(file_to_unzip, 'r') as zip_ref:
            print("Unzipping files")
            zip_ref.extractall(directory)
    except (zipfile.BadZipFile, OSError):
        sys.exit("Error unzipping EXIOBASE files.")
#%% Get years we want to update
def getYears(num_years=2):
    ''' Return the most recent calendar years, newest first.

    Each time the API is updated, the latest years are refreshed to account
    for any prior changes in the data.

    Input arguments:
        num_years : how many years to return, counting back from (and
                    including) the current year. Defaults to 2.
    Returns:
        list of int, e.g. [2022, 2021] when called in 2022 with the default.
        The list is empty when num_years is 0 or negative. '''
    current_year = date.today().year
    return [current_year - offset for offset in range(num_years)]
#%% Download from Zenodo
def dataDownload(year):
    ''' Download the EXIOBASE 3 "ixi" data for the requested year into the
    exiostorage folder using pymrio.download_exiobase3, then print what
    pymrio returned. '''
    target_folder = getExioStorageDirectory()
    download_log = pymrio.download_exiobase3(storage_folder=target_folder,
                                             system="ixi",
                                             years=year)
    print(f"Download of year {year} complete.")
    print("Downloaded successfully", download_log)
#%% Read csv files as dataframes
def readFiles(year):
    ''' Read the D_cba and D_pba satellite matrices of one year from the
    exiostorage directory.

    Both files are tab-separated and are read with two header rows and the
    first column as index.

    Input arguments:
        year : year used to locate the IOT_<year>_ixi folder.
    Returns:
        (dcba, dpba, regions) : the two dataframes and the list of labels in
        the first column level of D_cba (assumed to be the same for D_pba). '''
    def load_matrix(matrix_name):
        # Both matrices live in the same satellite folder of the year.
        matrix_path = "{}/exiostorage/IOT_{}_ixi/satellite/{}.txt".format(
            getPath(), year, matrix_name)
        return pd.read_csv(matrix_path,
                           delimiter="\t",
                           header=[0, 1],
                           index_col=0)

    dcba = load_matrix("D_cba")
    dpba = load_matrix("D_pba")
    return dcba, dpba, dcba.columns.levels[0].tolist()
#%% Format data
def formatData(df, year):
    '''Stack a D_cba/D_pba matrix into one long dataframe per region, with a
    single column of values. This formatting will be used in PostgreSQL.

    Args:
        1) df - dataframe from the output of readFiles(): one row per
           stressor and a two-level column index named 'region' / 'sector'
        2) year - value written to the "year" column of every row

    Stressor and sector labels are lower-cased with spaces replaced by
    underscores; region labels are lower-cased.

    The columns of each resulting dataframe are, in order:
        stressor | sector | region | year | value

    Returns a dictionary mapping each lower-cased region label to its
    dataframe, in order of first appearance of the region in df.'''
    stressors_ordered = df.index.str.replace(' ', '_').str.lower()

    # One normalised region/sector label per column of df, in column order.
    column_labels = df.columns.to_frame(index=False)
    column_regions = column_labels['region'].str.lower()
    column_sectors = column_labels['sector'].str.replace(' ', '_').str.lower()

    values = df.to_numpy()
    df_dict = dict()
    for region in column_regions.unique():
        # Select the columns of this region. The matrix is laid out with
        # stressors as rows, so a region is a block of columns, not a
        # contiguous chunk of the flattened array.
        in_region = (column_regions == region).to_numpy()

        # The row-major ravel of the region block runs over the sectors for
        # each stressor in turn, the same order from_product generates.
        index_stack = pd.MultiIndex.from_product(
            [stressors_ordered, column_sectors[in_region], [region], [year]],
            names=['stressor', 'sector', 'region', 'year'])
        df_clean = pd.DataFrame(values[:, in_region].ravel(),
                                index=index_stack,
                                columns=['value'])
        df_clean.reset_index(inplace=True)
        df_dict[region] = df_clean
    return df_dict
#%% Separate each region into a separate df
def separateDfByRegion(df):
    ''' Split a stacked D_cba / D_pba dataframe into one dataframe per region.

    Args:
        - df is the dcba_clean or dpba_clean dataframe; it must have a
          'region' column.

    Returns a dictionary whose keys are the values found in the 'region'
    column (the 2-letter ISO 3166-1 country codes used in exiobase) and whose
    values are the rows of df belonging to that region.'''
    return {region: rows for region, rows in df.groupby('region')}
#%%
def connection():
    ''' Open a PostgreSQL connection using the settings in
    config.db_connection and create a cursor on it.

    Returns:
        (con, cur) : the psycopg2 connection and a cursor created with
        psycopg2.extras.DictCursor as cursor factory. '''
    con = psycopg2.connect(
        database=config.db_connection["database"],
        user=config.db_connection["user"],
        password=config.db_connection["password"],
        host=config.db_connection["host"],
        port=config.db_connection["port"],
    )
    return con, con.cursor(cursor_factory=psycopg2.extras.DictCursor)
#%% Upload to Postgres tables
def uploadToPostgres(dictionary, con, cur, tblext):
    ''' Upload the dataframes stored in the dictionary to Postgres. This function
    should only be used when initially uploading the data to Postgres. For continual
    updates, use updateValuesInDatabase().

    Each dataframe is bulk-inserted into the table "<key>_<tblext>" (key
    lower-cased) and committed before the next one is processed. The tables
    must already exist.

    Required inputs:
        - Dcba or dpba dictionary (region name -> dataframe)
        - Postgres connection, con, and cursor, cur, from connection()
        - Table extension (either 'dcba' or 'dpba')
    Raises:
        Any exception raised while inserting; the open transaction is rolled
        back before the exception is re-raised.

    Adapted from: https://naysan.ca/2020/05/09/pandas-to-postgresql-using-psycopg2-bulk-insert-performance-benchmark/. '''
    for name, df_to_upload in dictionary.items():
        # Region names are used in the names of the tables
        table = "{}_{}".format(name.lower(), tblext)

        # Create a list of tuples from the dataframe values
        tuples = [tuple(x) for x in df_to_upload.to_numpy()]

        # Comma-separated, double-quoted dataframe columns
        cols = ', '.join('"{}"'.format(col) for col in df_to_upload.columns)

        # NOTE: table and column names are interpolated into the statement;
        # they must come from trusted data.
        query = "INSERT INTO %s(%s) VALUES %%s" % (table, cols)
        try:
            psycopg2.extras.execute_values(cur, query, tuples)
            con.commit()
        except Exception:
            # Leave the connection usable instead of stuck in a failed
            # transaction.
            con.rollback()
            raise
        print("Uploaded {} to Postgres".format(name))
#%% Drop old table
def dropTable(dictionary, con, cur, tblext):
    ''' Drop and re-create (empty) the Postgres table of every region.

    For each key of the dictionary the table "<key>_<tblext>" (key
    lower-cased) is dropped if it exists and created again with the columns
    stressor, sector, region, value and year. Both statements are committed
    together, one table at a time.

    Required inputs:
        - Dcba or dpba dictionary (only its keys are used)
        - Postgres connection, con, and cursor, cur, from connection()
        - Table extension (either 'dcba' or 'dpba')
    Raises:
        Any exception raised by the database; the open transaction is rolled
        back before the exception is re-raised. '''
    for name in dictionary:
        # Make table name
        table = "{}_{}".format(name.lower(), tblext)
        try:
            # Drop if exists
            cur.execute("DROP TABLE IF EXISTS {}".format(table))
            print("Dropped table {}".format(table))

            # Create table
            cur.execute("""CREATE TABLE {} ("stressor" VARCHAR(255),
                                            "sector" VARCHAR(255),
                                            "region" VARCHAR(3),
                                            "value" DOUBLE PRECISION,
                                            "year" SMALLINT);""".format(table))

            # Commit (not roll back) so that the new table is persisted.
            con.commit()
        except Exception:
            con.rollback()
            raise
#%% Update values in DB
def updateValuesInDatabase(cur, con, df, table):
    ''' Update the values of one Postgres table from a dataframe.

    The rows of df are loaded into a temporary table and every row of
    `table` with the same stressor, sector and year receives the new value.
    The cursor and the connection are closed before returning, also when an
    error occurs.

    Inputs:
        - cursor (cur) and connection (con) from connection()
        - dataframe (df) that holds the new values (from dcba_dict or
          dpba_dict); needs the columns stressor, sector, value and year
        - table name (e.g. AT_dcba, BG_dpba)
    Raises:
        Any exception raised by the database; the open transaction is rolled
        back before the exception is re-raised.
    '''
    # One tuple per row. Stressor and sector are included because the year
    # alone does not identify which row of the table a value belongs to.
    rows = zip(df["stressor"], df["sector"], df["value"], df["year"])
    try:
        # Temporary table for the uploaded data; value uses the same type as
        # the target tables so that decimals are not rounded away.
        cur.execute("""CREATE TEMP TABLE testtemp (stressor VARCHAR(255),
                                                   sector VARCHAR(255),
                                                   value DOUBLE PRECISION,
                                                   year INTEGER) ON COMMIT DROP""")
        cur.executemany("""INSERT INTO testtemp (stressor, sector, value, year) VALUES(%s, %s, %s, %s)""", rows)
        cur.execute("""
                    UPDATE {0}
                    SET value = testtemp.value
                    FROM testtemp
                    WHERE testtemp.stressor = {0}.stressor
                      AND testtemp.sector = {0}.sector
                      AND testtemp.year = {0}.year;
                    """.format(table))
        con.commit()
    except Exception:
        con.rollback()
        raise
    finally:
        cur.close()
        con.close()
#%% Remove file from exiostorage folder
def removeFilesFromExiostorage(year):
    ''' Delete the extracted IOT_<year>_ixi folder and the matching zip
    archive of one year from the exiostorage folder. '''
    year_stem = getPath() + "/exiostorage" + "/IOT_{}_ixi".format(year)
    # Extracted folder first, then the archive it came from
    shutil.rmtree(year_stem)
    os.remove(year_stem + ".zip")