-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathCK_dbFunctions.py
66 lines (56 loc) · 2.07 KB
/
CK_dbFunctions.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
import json
import pandas as pd
import pymysql
import pymongo
import sqlalchemy
from sqlalchemy import create_engine
import os
# Heroku check: the deployment must define the IS_HEROKU environment variable
# itself (it is not there by default). Its mere presence switches the module
# from config.py to environment-variable configuration.
is_heroku = 'IS_HEROKU' in os.environ

if is_heroku:
    # Pull every setting from the environment; os.environ.get returns None
    # for any variable that has not been set.
    mongoConn = os.environ.get('mongoConn')
    remote_db_endpoint = os.environ.get('remote_db_endpoint')
    remote_db_port = os.environ.get('remote_db_port')
    remote_db_name = os.environ.get('remote_db_name')
    remote_db_user = os.environ.get('remote_db_user')
    remote_db_pwd = os.environ.get('remote_db_pwd')
else:
    # use the config.py file if IS_HEROKU is not detected
    from config import (
        mongoConn,
        remote_db_endpoint,
        remote_db_port,
        remote_db_name,
        remote_db_user,
        remote_db_pwd,
    )

# The SQLAlchemy engine below is built with remote_db_port, so the raw pymysql
# connection must target the same port instead of silently using the driver
# default. Environment variables arrive as strings, hence the int() cast; an
# unset/empty value falls back to 3306 (MySQL's standard port, which is also
# what pymysql uses when no port is given).
_mysql_port = int(remote_db_port) if remote_db_port else 3306

# CONNECT to MySQL
conn = pymysql.connect(host=f'{remote_db_endpoint}',
                       port=_mysql_port,
                       user=remote_db_user,
                       password=remote_db_pwd,
                       db=remote_db_name)
mycursor = conn.cursor()

# Register pymysql under the MySQLdb name so the plain "mysql://" URL below
# resolves to it.
pymysql.install_as_MySQLdb()
engine = create_engine(
    f"mysql://{remote_db_user}:{remote_db_pwd}@{remote_db_endpoint}:{remote_db_port}/{remote_db_name}")
def view_exists(db_view_name):
    """Return True if ``db_view_name`` names a table or view in the database.

    The lookup lists every view and table registered in INFORMATION_SCHEMA
    for the ``ripe_bananas`` schema and checks for an exact (case-sensitive)
    match against ``db_view_name``.

    NOTE(review): the schema name is hard-coded in the query rather than
    taken from the configured database name -- confirm the two always agree.

    Args:
        db_view_name: Name of the table or view to look for.

    Returns:
        bool: True when an object with exactly that name exists, else False.
    """
    sql = '''
        SELECT TABLE_NAME DB_VIEW FROM (
            SELECT TABLE_NAME FROM INFORMATION_SCHEMA.VIEWS
            WHERE TABLE_SCHEMA = 'ripe_bananas'
            UNION
            SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES
            WHERE TABLE_SCHEMA = 'ripe_bananas'
        ) DB_VIEWS
    '''
    # The context manager guarantees the connection is released even when the
    # query raises; a bare close() after read_sql would be skipped on error.
    with engine.connect() as conn:
        df = pd.read_sql(sql, con=conn)

    # Matching is done in pandas so the comparison stays exact and
    # case-sensitive regardless of the server's collation.
    return bool((df.DB_VIEW == db_view_name).any())
def get_dataframe_from_db(db_view_name):
    """Load every row of a database table or view into a DataFrame.

    The name is first validated with ``view_exists``; only names that exactly
    match an existing table or view are queried.

    Args:
        db_view_name: Name of the table or view to read.

    Returns:
        pandas.DataFrame with the result of ``SELECT *`` on the object, or
        the string ``'db view object not found / invalid'`` when no such
        table or view exists (callers must check for this sentinel).
    """
    # Validate before opening any connection, so the "not found" early return
    # cannot leave an open connection behind.
    if not view_exists(db_view_name):
        return 'db view object not found / invalid'

    # The name is interpolated directly into the SQL text; the existence
    # check above is what restricts it to known object names.
    sql = f''' SELECT * FROM {db_view_name}'''

    # Context manager closes the connection on success and on error alike.
    with engine.connect() as conn:
        return pd.read_sql(sql, con=conn)