-
Notifications
You must be signed in to change notification settings - Fork 0
/
db_manager.py
174 lines (148 loc) · 5.36 KB
/
db_manager.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
import psycopg2
from psycopg2 import sql
import json
import os
from dotenv import load_dotenv
from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS
load_dotenv()
class DBManager:
def __init__(self):
self.db_name = os.getenv("POSTGRES_DB")
self.db_user = os.getenv("POSTGRES_USER")
self.db_password = os.getenv("POSTGRES_PASSWORD")
self.db_host = os.getenv("POSTGRES_HOST")
self.db_port = os.getenv("POSTGRES_PORT")
self.influx_url = os.getenv("INFLUXDB_URL")
self.influx_token = os.getenv("INFLUXDB_TOKEN")
self.influx_org = os.getenv("INFLUXDB_ORG")
self.influx_bucket = os.getenv("INFLUXDB_BUCKET")
self.create_tables()
def create_conn(self):
conn_params = {
'dbname': self.db_name,
'user': self.db_user,
'password': self.db_password,
'host': self.db_host,
'port': self.db_port
}
print(f"Connecting to database with params: {conn_params}")
try:
conn = psycopg2.connect(**conn_params)
return conn
except Exception as e:
print(f"An error occurred: {e}")
return None
def create_tables(self):
try:
commands = (
"""
CREATE TABLE IF NOT EXISTS vehicles (
id SERIAL PRIMARY KEY,
plate_number VARCHAR(255) NOT NULL,
model VARCHAR(255) NOT NULL
)
""",
"""
CREATE TABLE IF NOT EXISTS daily_data (
id SERIAL PRIMARY KEY,
date DATE NOT NULL,
vehicle VARCHAR(255) NOT NULL,
category VARCHAR(255) NOT NULL,
amount REAL NOT NULL
)
"""
)
conn = self.create_conn()
if conn is None:
print("Failed to connect to the database")
return
cur = conn.cursor()
for command in commands:
print(f"Executing command: {command}")
cur.execute(command)
conn.commit()
cur.close()
conn.close()
except psycopg2.Error as e:
print(f"An error occurred: {e}")
def insert_daily_data(self, date, vehicle, category, amount):
sql = """INSERT INTO daily_data (date, vehicle, category, amount)
VALUES (%s, %s, %s, %s)"""
conn = self.create_conn()
if conn is None:
print("Failed to connect to the database")
return
cur = conn.cursor()
cur.execute(sql, (date, vehicle, category, amount))
conn.commit()
cur.close()
conn.close()
def get_daily_data(self):
conn = self.create_conn()
if conn is None:
print("Failed to connect to the database")
return
cur = conn.cursor()
cur.execute("SELECT * FROM daily_data")
rows = cur.fetchall()
cur.close()
conn.close()
return rows
def get_daily_trip_data(self):
client = InfluxDBClient(url=self.influx_url, token=self.influx_token, org=self.influx_org)
query_api = client.query_api()
query = f'from(bucket:"{self.influx_bucket}") |> range(start: -1d)'
tables = query_api.query(query)
client.close()
return tables
def add_vehicle(self, plate, model):
sql = """INSERT INTO vehicles (plate_number, model)
VALUES (%s, %s)"""
conn = self.create_conn()
if conn is None:
print("Failed to connect to the database")
return
cur = conn.cursor()
cur.execute(sql, (plate, model))
conn.commit()
cur.close()
conn.close()
def list_vehicles(self):
conn = self.create_conn()
if conn is None:
print("Failed to connect to the database")
return
cur = conn.cursor()
cur.execute("SELECT * FROM vehicles")
rows = cur.fetchall()
cur.close()
conn.close()
return rows
def write_influx_data(self, data):
client = InfluxDBClient(url=self.influx_url, token=self.influx_token, org=self.influx_org)
write_api = client.write_api(write_options=SYNCHRONOUS)
point = Point("trip_data")
for key, value in data.items():
point = point.field(key, value)
write_api.write(bucket=self.influx_bucket, org=self.influx_org, record=point)
client.close()
def clear_influxdb_data(self, start_date, end_date):
client = InfluxDBClient(url=self.influx_url, token=self.influx_token, org=self.influx_org)
delete_api = client.delete_api()
start = f"{start_date}T00:00:00Z"
stop = f"{end_date}T00:00:00Z"
delete_api.delete(start, stop, '_measurement="trip_data"', bucket=self.influx_bucket, org=self.influx_org)
client.close()
def clear_sqlite_data(self):
conn = self.create_conn()
if conn is None:
print("Failed to connect to the database")
return
cur = conn.cursor()
cur.execute("DELETE FROM daily_data")
conn.commit()
cur.close()
conn.close()
def close_influx_connection(self):
pass