refactoring: create base class for db. create db for ipfs
tubleronchik committed Sep 30, 2024
1 parent 3cfc5b2 commit 76a2479
Showing 6 changed files with 87 additions and 39 deletions.
3 changes: 2 additions & 1 deletion connectivity/config/default.json
@@ -1,7 +1,8 @@
 {
     "general": {
         "publish_interval": 30,
-        "db_path": ""
+        "datalog_db_path":"",
+        "ipfs_db_path":""
     },
     "comstation": {
         "enable": false,
6 changes: 3 additions & 3 deletions connectivity/main.py
@@ -17,7 +17,7 @@
 from .src.feeders import DatalogFeeder, FrontierFeeder, RobonomicsFeeder
 from .src.stations import COMStation, HTTPStation, MQTTStation
 from .src.stations.trackargostation import TrackAgroStation
-from .utils.database import DataBase
+from .utils.datalog_db import DatalogDB
 
 logging.config.dictConfig(LOGGING_CONFIG)
 logger = logging.getLogger("sensors-connectivity")
@@ -45,7 +45,7 @@ def __init__(self, path: str) -> None:
         self.stations: list = self._populate_stations()
         self.feeders: list = self._populate_feeders()
         self.station_data: list = []
-        self.db: DataBase = DataBase(self.config)
+        self.datalog_db: DatalogDB = DatalogDB(self.config["general"]["datalog_db_path"])
 
     def _read_configuration(self, config_path: str) -> dict:
         """Internal method. Loads configuration.
@@ -145,7 +145,7 @@ def db_watcher() -> None:

             logger.info("Checking data base...")
             Timer(3600, db_watcher).start()
-            for data in self.db.checker(time.time()):
+            for data in self.datalog_db.checker(time.time()):
                 for hash in data:
                     self.feeders[2].to_datalog(hash)

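Note on the watcher loop above: `DatalogDB.checker()` returns raw sqlite3 `fetchall()` rows, i.e. a list of 1-tuples, which is why `db_watcher` unpacks each `data` row before resending. A minimal standalone sketch of that shape, using a hypothetical database path:

import time

from connectivity.utils.datalog_db import DatalogDB

datalog_db = DatalogDB("/tmp/datalog.db")  # hypothetical path, for illustration only
datalog_db.create_table()

# checker() selects hashes older than 24 hours that are still 'not sent'
# and purges rows already marked 'sent'.
for row in datalog_db.checker(time.time()):  # e.g. [("QmExampleHash",)]
    for ipfs_hash in row:  # unpack the single-column tuple
        print(ipfs_hash)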
16 changes: 10 additions & 6 deletions connectivity/src/feeders/datalog_feeder.py
@@ -19,7 +19,8 @@
 from robonomicsinterface import RWS, Account, Datalog
 
 from connectivity.config.logging import LOGGING_CONFIG
-from connectivity.utils.database import DataBase
+from connectivity.utils.datalog_db import DatalogDB
+from connectivity.utils.ipfs_db import IPFSDB
 
 from ...constants import PING_MODEL
 from .ifeeder import IFeeder
@@ -173,8 +174,10 @@ def __init__(self, config) -> None:
             if config["robonomics"]["ipfs_provider"]
             else "/ip4/127.0.0.1/tcp/5001/http"
         )
-        self.db: DataBase = DataBase(self.config)
-        self.db.create_table()
+        self.datalog_db: DatalogDB = DatalogDB(self.config["general"]["datalog_db_path"])
+        self.ipfs_db: IPFSDB = IPFSDB(self.config["general"]["ipfs_db_path"])
+        self.datalog_db.create_table()
+        self.ipfs_db.create_table()
 
     def feed(self, data: tp.List[dict]) -> None:
         """Main function of the feeder and it is called in `main.py`. It collects
@@ -185,7 +188,7 @@ def feed(self, data: tp.List[dict]) -> None:
"""
if self.config["datalog"]["enable"]:
if data:
for d in data:
for d in data:
if d.public and d.model != PING_MODEL:
logger.debug(f"DatalogFeeder: Adding data to buffer: {d}")
self.buffer.add(d)
@@ -195,7 +198,8 @@ def feed(self, data: tp.List[dict]) -> None:
                 self.last_time = time.time()
                 logger.debug("Datalog Feeder: About to publish collected data...")
                 logger.debug(f"Datalog Feeder: Buffer is {self.buffer}")
-                ipfs_hash, file_path, file_size = _get_multihash(self.buffer, self.db, self.ipfs_endpoint)
+                ipfs_hash, file_path, file_size = _get_multihash(self.buffer, self.datalog_db, self.ipfs_endpoint)
+                self.ipfs_db.add_hash(ipfs_hash)
                 self._pin_to_temporal(file_path)
                 _pin_to_pinata(file_path, self.config)
                 self.buffer = set()
@@ -258,7 +262,7 @@ def to_datalog(self, ipfs_hash: str) -> None:
f"Datalog Feeder: Ipfs hash sent to Robonomics Parachain and included in block {robonomics_receipt}"
)
DATALOG_STATUS_METRIC.state("success")
self.db.update_status("sent", ipfs_hash)
self.datalog_db.update_status("sent", ipfs_hash)
except Exception as e:
logger.warning(f"Datalog Feeder: Something went wrong during extrinsic submission to Robonomics: {e}")
DATALOG_STATUS_METRIC.state("error")
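With this change the feeder keeps two ledgers: every published file's IPFS hash goes into the `ipfs` table as `pinned` via `add_hash()`, while the `datalog` table tracks whether the hash reached a Robonomics datalog. A rough sketch of that bookkeeping in isolation (paths and the CID are hypothetical; inside the feeder the datalog rows themselves are written via `_get_multihash`):

from connectivity.utils.datalog_db import DatalogDB
from connectivity.utils.ipfs_db import IPFSDB

# Hypothetical paths standing in for the new "datalog_db_path" and
# "ipfs_db_path" config keys.
datalog_db = DatalogDB("/tmp/datalog.db")
ipfs_db = IPFSDB("/tmp/ipfs.db")
datalog_db.create_table()
ipfs_db.create_table()

ipfs_hash = "QmExampleHash"  # hypothetical CID of a published file
ipfs_db.add_hash(ipfs_hash)  # recorded as 'pinned' right after publishing
datalog_db.update_status("sent", ipfs_hash)  # after a successful extrinsic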
63 changes: 34 additions & 29 deletions connectivity/utils/database.py
@@ -2,69 +2,74 @@
 import contextlib
 import sqlite3
 import typing as tp
-from sqlite3.dbapi2 import connect
 
 
 class DataBase:
-    def __init__(self, config: dict) -> None:
-        self.config: dict = config
+    def __init__(self, path_to_db_file: str, table_name) -> None:
+        self.path_to_db_file: str = path_to_db_file
+        self.table_name: str = table_name
 
     def connection(self) -> tp.Any:
-        db_file = self.config["general"]["db_path"]
+        db_file = self.path_to_db_file
         try:
             connection = sqlite3.connect(db_file)
 
         except sqlite3.Error as e:
             print(f"Could not connect to data base {e}")
 
         return connection
 
-    def create_table(self) -> None:
+    def create_table(self, rows_with_types: str) -> None:
         connection = self.connection()
         cursor = connection.cursor()
         cursor.execute(
-            """
-            CREATE TABLE IF NOT EXISTS datalog (
-            id integer PRIMARY KEY,
-            status text,
-            hash text,
-            time real,
-            payload blob
-            ); """
+            f"CREATE TABLE IF NOT EXISTS {self.table_name} ({rows_with_types});"
         )
 
-    def add_data(self, status, hash, time, payload) -> None:
+    def insert_data(self, value_names: str, values: tuple):
         connection = self.connection()
         cursor = connection.cursor()
+        values_str = ', '.join(["?"] * len(values))
         with contextlib.closing(connection) as conn:  # auto-closes
             with conn:  # auto-commits
                 with contextlib.closing(cursor) as cursor:  # auto-closes
                     cursor.execute(
-                        "INSERT INTO datalog (status, hash, time, payload) VALUES (?, ?, ?, ?)",
-                        (status, hash, time, payload),
+                        f"INSERT INTO {self.table_name} ({value_names}) VALUES ({values_str})",
+                        values,
                     )
 
 
-    def update_status(self, status, hash) -> None:
+    def update(self, condition: str, values: tuple) -> None:
         connection = self.connection()
         cursor = connection.cursor()
         with contextlib.closing(connection) as conn:  # auto-closes
             with conn:  # auto-commits
                 with contextlib.closing(cursor) as cursor:  # auto-closes
                     cursor.execute(
-                        "UPDATE datalog SET status = ? WHERE hash = ?", (status, hash)
+                        f"UPDATE {self.table_name} {condition}", values
                    )
 
+    def get_data(self, values_to_select: str, conditions: str, condition_values: tp.Optional[tuple]=None) -> list:
+        connection = self.connection()
+        cursor = connection.cursor()
+        with contextlib.closing(connection) as conn:  # auto-closes
+            with conn:  # auto-commits
+                with contextlib.closing(cursor) as cursor:  # auto-closes
+                    if condition_values:
+                        data_from_table = cursor.execute(
+                            f"SELECT {values_to_select} FROM {self.table_name} {conditions}",
+                            condition_values,
+                        ).fetchall()
+                    else:
+                        data_from_table = cursor.execute(
+                            f"SELECT {values_to_select} FROM {self.table_name} {conditions}",
+                        ).fetchall()
+        return data_from_table
 
-    def checker(self, current_time) -> list:
+    def delete_data(self, conditions: str) -> None:
         connection = self.connection()
         cursor = connection.cursor()
-        time = current_time - 86400  # 24hrs
         with contextlib.closing(connection) as conn:  # auto-closes
             with conn:  # auto-commits
                 with contextlib.closing(cursor) as cursor:  # auto-closes
-                    hashes = cursor.execute(
-                        "SELECT hash FROM datalog WHERE time < ? AND status='not sent'",
-                        (time,),
-                    ).fetchall()
-                    cursor.execute("DELETE FROM datalog WHERE status='sent'")
-        return hashes
+                    cursor.execute(f"DELETE FROM {self.table_name} {conditions}")
25 changes: 25 additions & 0 deletions connectivity/utils/datalog_db.py
@@ -0,0 +1,25 @@

+from .database import DataBase
+
+class DatalogDB:
+    def __init__(self, path_to_db_file: str) -> None:
+        self.db_class = DataBase(path_to_db_file, "datalog")
+
+    def create_table(self) -> None:
+        self.db_class.create_table("id integer PRIMARY KEY, status text, hash text, time real, payload blob")
+
+    def add_data(self, status, hash, time, payload) -> None:
+        print("adding data to db")
+        self.db_class.insert_data("status, hash, time, payload", (status, hash, time, payload))
+
+    def update_status(self, status, hash) -> None:
+        self.db_class.update("SET status = ? WHERE hash = ?", (status, hash))
+
+    def checker(self, current_time) -> list:
+        time = current_time - 86400  # 24hrs
+        hashes = self.db_class.get_data("hash", "WHERE time < ? AND status='not sent'", (time,))
+        self.db_class.delete_data("WHERE status='sent'")
+        return hashes
+
+
+
13 changes: 13 additions & 0 deletions connectivity/utils/ipfs_db.py
@@ -0,0 +1,13 @@
+from .database import DataBase
+
+
+class IPFSDB:
+    def __init__(self, path_to_db_file: str) -> None:
+        self.db_class = DataBase(path_to_db_file, "ipfs")
+
+    def create_table(self) -> None:
+        self.db_class.create_table("id integer PRIMARY KEY, status text, hash text")
+
+    def add_hash(self, hash: str) -> None:
+        status = "pinned"
+        self.db_class.insert_data("status, hash", (status, hash))
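`IPFSDB` so far only records hashes as `pinned`; it has no read or update helper of its own yet. Because it delegates to the shared base class, pinned hashes can still be read back through the generic `get_data`, as in this hypothetical check:

from connectivity.utils.ipfs_db import IPFSDB

ipfs_db = IPFSDB("/tmp/ipfs.db")  # hypothetical path
ipfs_db.create_table()
ipfs_db.add_hash("QmExampleHash")

# Read back through the underlying generic DataBase helper.
pinned = ipfs_db.db_class.get_data("hash", "WHERE status = 'pinned'")
print(pinned)  # [('QmExampleHash',)]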
