-
Notifications
You must be signed in to change notification settings - Fork 1
/
google_bq_helper_functions.py
60 lines (44 loc) · 1.74 KB
/
google_bq_helper_functions.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
import os
import json
from google.cloud import bigquery
import pandas_gbq
def load_gbq_creds(cred_path="RTC-business-impact-bd1c675235ef.json"):
    """Load a credentials JSON file and return its parsed content.

    Args:
        cred_path: Path to the credentials JSON file. Defaults to the file
            name this helper has always used, so existing no-argument
            callers keep the same behavior.

    Returns:
        The parsed JSON content of ``cred_path``, or ``None`` if no regular
        file exists at that path (an error line is printed in that case).
    """
    if not os.path.isfile(cred_path):
        # Name the path that was actually checked; the old message mentioned
        # `creds.json`, which is not the file being looked for.
        print(f"ERROR: No file called `{cred_path}` found in the path.")
        return None
    # Context manager closes the handle deterministically (the previous
    # `json.load(open(...))` leaked it until garbage collection).
    with open(cred_path, encoding="utf-8") as cred_file:
        return json.load(cred_file)
def write_df_gbq_new_table(df, table_id, project_id):
    """Upload a DataFrame to BigQuery, replacing ``table_id`` if it exists.

    Args:
        df: DataFrame to upload.
        table_id: Destination table, passed to ``pandas_gbq.to_gbq``.
        project_id: Project passed to ``pandas_gbq.to_gbq``.

    Any exception raised during the upload is caught and reported on stdout
    instead of being propagated; nothing is returned.
    """
    try:
        pandas_gbq.to_gbq(
            df,
            destination_table=table_id,
            project_id=project_id,
            if_exists='replace',
        )
        print(f"Successfully wrote DF to new BigQuery table (`{table_id}`)!")
    except Exception as exc:
        print(f"Failed to write DF to new BigQuery table (`{table_id}`): {exc}")
def append_df_gbq(df, table_id, project_id):
    """Append a DataFrame's rows to a BigQuery table.

    Uses ``pandas_gbq.to_gbq`` with ``if_exists='append'``. Per the pandas-gbq
    documentation, that mode inserts into the table if it exists and creates
    it otherwise.

    Args:
        df: DataFrame whose rows are uploaded.
        table_id: Destination table, passed to ``pandas_gbq.to_gbq``.
        project_id: Project passed to ``pandas_gbq.to_gbq``.

    Any exception raised during the upload is caught and reported on stdout
    (best-effort, like the replace-mode writer) instead of being propagated.
    """
    try:
        pandas_gbq.to_gbq(df, table_id, project_id, if_exists='append')
        # The old message claimed a "new" table even though this appends.
        print(f"Successfully appended DF to BigQuery table (`{table_id}`)!")
    except Exception as e:
        # Include the table id so failures can be traced to a destination.
        print(f"Failed to append DF to BigQuery table (`{table_id}`): {e}")
def read_gbq_df(sql, project_id):
    """Run a SQL query through pandas-gbq and return its result.

    Args:
        sql: Query text handed to ``pandas_gbq.read_gbq``.
        project_id: Project passed to ``pandas_gbq.read_gbq``.

    Returns:
        The value returned by ``pandas_gbq.read_gbq`` (a pandas DataFrame per
        the pandas-gbq docs). Exceptions from pandas-gbq are not caught.
    """
    return pandas_gbq.read_gbq(sql, project_id=project_id)
def load_csv_bq(filename, dataset_id, table_id):
    """Load a local CSV file into a BigQuery table and wait for completion.

    The first CSV row is skipped as a header, and BigQuery auto-detects the
    schema.

    Args:
        filename: Path to the local CSV file.
        dataset_id: Dataset holding the destination table, in the default
            project of ``bigquery.Client()``.
        table_id: Name of the destination table within ``dataset_id``.

    Raises:
        OSError: If ``filename`` cannot be opened.
        Errors raised by the BigQuery client (including a failed load job
        surfaced by ``job.result()``) are propagated.
    """
    client = bigquery.Client()
    # `Client.dataset()` is deprecated in google-cloud-bigquery; a fully
    # qualified "project.dataset.table" string is accepted as a destination.
    destination = f"{client.project}.{dataset_id}.{table_id}"

    job_config = bigquery.LoadJobConfig()
    job_config.source_format = bigquery.SourceFormat.CSV
    job_config.skip_leading_rows = 1
    job_config.autodetect = True

    # The file only needs to be open while the upload call runs.
    with open(filename, "rb") as source_file:
        job = client.load_table_from_file(
            source_file, destination, job_config=job_config
        )

    job.result()  # Block until the load job finishes; raises if it failed.
    print(f"Loaded {job.output_rows} rows into {dataset_id}:{table_id}.")
def append_data_bq():
    """Placeholder for appending data to BigQuery; not implemented yet.

    Currently performs no work and returns ``None``.
    """
    # TODO: implement.
    return None