Merge pull request #91 from airalab/dev
v.1.6.4
tubleronchik authored Oct 3, 2024
2 parents adc862d + 5cc7e4f commit dada6c9
Showing 9 changed files with 116 additions and 51 deletions.
3 changes: 2 additions & 1 deletion connectivity/config/default.json
@@ -1,7 +1,8 @@
{
"general": {
"publish_interval": 30,
"db_path": ""
"datalog_db_path":"",
"ipfs_db_path":""
},
"comstation": {
"enable": false,
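For context, a minimal sketch of how code might read the two new paths introduced by this config change; the example values in the comments are placeholders, since the shipped default.json leaves both fields empty:

import json

# Load the default configuration (path as it appears in this repository).
with open("connectivity/config/default.json") as f:
    config = json.load(f)

# Both keys are new in this commit; deployments are expected to fill them in.
datalog_db_path = config["general"]["datalog_db_path"]  # e.g. "/var/lib/connectivity/datalog.db" (placeholder)
ipfs_db_path = config["general"]["ipfs_db_path"]        # e.g. "/var/lib/connectivity/ipfs.db" (placeholder)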
6 changes: 3 additions & 3 deletions connectivity/main.py
@@ -17,7 +17,7 @@
from .src.feeders import DatalogFeeder, FrontierFeeder, RobonomicsFeeder
from .src.stations import COMStation, HTTPStation, MQTTStation
from .src.stations.trackargostation import TrackAgroStation
from .utils.database import DataBase
from .utils.datalog_db import DatalogDB

logging.config.dictConfig(LOGGING_CONFIG)
logger = logging.getLogger("sensors-connectivity")
@@ -45,7 +45,7 @@ def __init__(self, path: str) -> None:
self.stations: list = self._populate_stations()
self.feeders: list = self._populate_feeders()
self.station_data: list = []
self.db: DataBase = DataBase(self.config)
self.datalog_db: DatalogDB = DatalogDB(self.config["general"]["datalog_db_path"])

def _read_configuration(self, config_path: str) -> dict:
"""Internal method. Loads configuration.
@@ -145,7 +145,7 @@ def db_watcher() -> None:

logger.info("Checking data base...")
Timer(3600, db_watcher).start()
for data in self.db.checker(time.time()):
for data in self.datalog_db.checker(time.time()):
for hash in data:
self.feeders[2].to_datalog(hash)

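The hourly watcher keeps its Timer-based loop; only the lookup now goes through DatalogDB.checker(). A self-contained sketch of the closure from main.py, simplified here by passing the database and feeder in explicitly (in the source they are attributes of the config object, and feeders[2] is assumed to be the DatalogFeeder, which defines to_datalog):

import time
from threading import Timer

def make_db_watcher(datalog_db, datalog_feeder):
    # Simplified, self-contained variant of the db_watcher closure in main.py.
    def db_watcher() -> None:
        Timer(3600, db_watcher).start()  # re-arm the check every hour
        # checker() returns hashes older than 24 hours that are still marked "not sent".
        for data in datalog_db.checker(time.time()):
            for ipfs_hash in data:
                datalog_feeder.to_datalog(ipfs_hash)
    return db_watcher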
43 changes: 27 additions & 16 deletions connectivity/src/feeders/datalog_feeder.py
@@ -19,7 +19,8 @@
from robonomicsinterface import RWS, Account, Datalog

from connectivity.config.logging import LOGGING_CONFIG
from connectivity.utils.database import DataBase
from connectivity.utils.datalog_db import DatalogDB
from connectivity.utils.ipfs_db import IPFSDB

from ...constants import PING_MODEL
from .ifeeder import IFeeder
@@ -73,12 +74,19 @@ def _get_multihash(buf: set, db: object, endpoint: str = "/ip4/127.0.0.1/tcp/500
if m.public in payload:
payload[m.public]["measurements"].append(m.measurement)
else:
payload[m.public] = {
"model": m.model,
"geo": "{},{}".format(m.geo_lat, m.geo_lon),
"donated_by": m.donated_by,
"measurements": [m.measurement],
}
if m.geo_lat:
payload[m.public] = {
"model": m.model,
"geo": "{},{}".format(m.geo_lat, m.geo_lon),
"donated_by": m.donated_by,
"measurements": [m.measurement],
}
else:
payload[m.public] = {
"model": m.model,
"donated_by": m.donated_by,
"measurements": [m.measurement],
}
except Exception as e:
logger.warning(f"Datalog Feeder: Couldn't store data: {e}")
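To make the new branch concrete, here are the two payload shapes _get_multihash can now produce; every value below is invented for illustration and not taken from the commit:

# Sensor reports a position (m.geo_lat is truthy): "geo" is included.
payload_with_geo = {
    "EXAMPLE_PUBLIC_KEY": {
        "model": 2,
        "geo": "59.9386,30.3141",
        "donated_by": "example-donor",
        "measurements": [{"pm25": 4.1, "pm10": 7.9}],
    }
}

# No position (m.geo_lat is empty): the "geo" key is omitted entirely.
payload_without_geo = {
    "EXAMPLE_PUBLIC_KEY": {
        "model": 2,
        "donated_by": "example-donor",
        "measurements": [{"pm25": 4.1, "pm10": 7.9}],
    }
}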

@@ -173,8 +181,10 @@ def __init__(self, config) -> None:
if config["robonomics"]["ipfs_provider"]
else "/ip4/127.0.0.1/tcp/5001/http"
)
self.db: DataBase = DataBase(self.config)
self.db.create_table()
self.datalog_db: DatalogDB = DatalogDB(self.config["general"]["datalog_db_path"])
self.ipfs_db: IPFSDB = IPFSDB(self.config["general"]["ipfs_db_path"])
self.datalog_db.create_table()
self.ipfs_db.create_table()

def feed(self, data: tp.List[dict]) -> None:
"""Main function of the feeder and it is called in `main.py`. It collects
@@ -185,7 +195,7 @@ def feed(self, data: tp.List[dict]) -> None:
"""
if self.config["datalog"]["enable"]:
if data:
for d in data:
for d in data:
if d.public and d.model != PING_MODEL:
logger.debug(f"DatalogFeeder: Adding data to buffer: {d}")
self.buffer.add(d)
@@ -195,7 +205,8 @@ def feed(self, data: tp.List[dict]) -> None:
self.last_time = time.time()
logger.debug("Datalog Feeder: About to publish collected data...")
logger.debug(f"Datalog Feeder: Buffer is {self.buffer}")
ipfs_hash, file_path, file_size = _get_multihash(self.buffer, self.db, self.ipfs_endpoint)
ipfs_hash, file_path, file_size = _get_multihash(self.buffer, self.datalog_db, self.ipfs_endpoint)
self.ipfs_db.add_hash(ipfs_hash)
self._pin_to_temporal(file_path)
_pin_to_pinata(file_path, self.config)
self.buffer = set()
@@ -253,12 +264,12 @@ def to_datalog(self, ipfs_hash: str) -> None:
robonomics_receipt = rws_datalog.record(ipfs_hash)
else:
datalog = Datalog(account)
robonomics_receipt = datalog.record(ipfs_hash)
logger.info(
f"Datalog Feeder: Ipfs hash sent to Robonomics Parachain and included in block {robonomics_receipt}"
)
# robonomics_receipt = datalog.record(ipfs_hash)
# logger.info(
# f"Datalog Feeder: Ipfs hash sent to Robonomics Parachain and included in block {robonomics_receipt}"
# )
DATALOG_STATUS_METRIC.state("success")
self.db.update_status("sent", ipfs_hash)
self.datalog_db.update_status("sent", ipfs_hash)
except Exception as e:
logger.warning(f"Datalog Feeder: Something went wrong during extrinsic submission to Robonomics: {e}")
DATALOG_STATUS_METRIC.state("error")
7 changes: 7 additions & 0 deletions connectivity/src/sensors/environmental_box.py
@@ -57,3 +57,10 @@ def _SDS011_values_saver(self, meas: dict, value: dict) -> dict:
else:
meas[value["value_type"]] = value["value"]
return meas


def __str__(self) -> str:
if self.model == SDS011_MODEL:
return f"{{Public: {self.public}, geo: ({self.geo_lat},{self.geo_lon}), model: {self.model}, donated_by: {self.donated_by}, measurements: {self.measurement}}}"
self.measurement.update({"geo": f"{self.geo_lat},{self.geo_lon}"})
return f"{{Public: {self.public}, model: {self.model}, donated_by: {self.donated_by}, measurements: {self.measurement}}}"
4 changes: 4 additions & 0 deletions connectivity/src/sensors/mobile_lab.py
@@ -43,3 +43,7 @@ def _mobile_sensor_data_saver(self, meas: dict, value: tuple) -> dict:
meas[key] = float(item) / PASKAL2MMHG
meas[key] = item
return meas

def __str__(self) -> str:
self.measurement.update({"geo": f"{self.geo_lat},{self.geo_lon}"})
return f"{{Public: {self.public}, model: {self.model}, donated_by: {self.donated_by}, measurements: {self.measurement}}}"
63 changes: 34 additions & 29 deletions connectivity/utils/database.py
@@ -2,69 +2,74 @@
import contextlib
import sqlite3
import typing as tp
from sqlite3.dbapi2 import connect


class DataBase:
def __init__(self, config: dict) -> None:
self.config: dict = config
def __init__(self, path_to_db_file: str, table_name) -> None:
self.path_to_db_file: str = path_to_db_file
self.table_name: str = table_name

def connection(self) -> tp.Any:
db_file = self.config["general"]["db_path"]
db_file = self.path_to_db_file
try:
connection = sqlite3.connect(db_file)

except sqlite3.Error as e:
print(f"Could not connect to data base {e}")

return connection

def create_table(self) -> None:
def create_table(self, rows_with_types: str) -> None:
connection = self.connection()
cursor = connection.cursor()
cursor.execute(
"""
CREATE TABLE IF NOT EXISTS datalog (
id integer PRIMARY KEY,
status text,
hash text,
time real,
payload blob
); """
f"CREATE TABLE IF NOT EXISTS {self.table_name} ({rows_with_types});"
)

def add_data(self, status, hash, time, payload) -> None:
def insert_data(self, value_names: str, values: tuple):
connection = self.connection()
cursor = connection.cursor()
values_str = ', '.join(["?"] * len(values))
with contextlib.closing(connection) as conn: # auto-closes
with conn: # auto-commits
with contextlib.closing(cursor) as cursor: # auto-closes
cursor.execute(
"INSERT INTO datalog (status, hash, time, payload) VALUES (?, ?, ?, ?)",
(status, hash, time, payload),
f"INSERT INTO {self.table_name} ({value_names}) VALUES ({values_str})",
values,
)


def update_status(self, status, hash) -> None:
def update(self, condition: str, values: tuple) -> None:
connection = self.connection()
cursor = connection.cursor()
with contextlib.closing(connection) as conn: # auto-closes
with conn: # auto-commits
with contextlib.closing(cursor) as cursor: # auto-closes
cursor.execute(
"UPDATE datalog SET status = ? WHERE hash = ?", (status, hash)
f"UPDATE {self.table_name} {condition}", values
)

def get_data(self, values_to_select: str, conditions: str, condition_values: tp.Optional[tuple]=None) -> list:
connection = self.connection()
cursor = connection.cursor()
with contextlib.closing(connection) as conn: # auto-closes
with conn: # auto-commits
with contextlib.closing(cursor) as cursor: # auto-closes
if condition_values:
data_from_table = cursor.execute(
f"SELECT {values_to_select} FROM {self.table_name} {conditions}",
condition_values,
).fetchall()
else:
data_from_table = cursor.execute(
f"SELECT {values_to_select} FROM {self.table_name} {conditions}",
).fetchall()
return data_from_table

def checker(self, current_time) -> list:
def delete_data(self, conditions: str) -> None:
connection = self.connection()
cursor = connection.cursor()
time = current_time - 86400 # 24hrs
with contextlib.closing(connection) as conn: # auto-closes
with conn: # auto-commits
with contextlib.closing(cursor) as cursor: # auto-closes
hashes = cursor.execute(
"SELECT hash FROM datalog WHERE time < ? AND status='not sent'",
(time,),
).fetchall()
cursor.execute("DELETE FROM datalog WHERE status='sent'")
return hashes
cursor.execute(f"DELETE FROM {self.table_name} {conditions}")
24 changes: 24 additions & 0 deletions connectivity/utils/datalog_db.py
@@ -0,0 +1,24 @@

from .database import DataBase

class DatalogDB:
def __init__(self, path_to_db_file: str) -> None:
self.db_class = DataBase(path_to_db_file, "datalog")

def create_table(self) -> None:
self.db_class.create_table("id integer PRIMARY KEY, status text, hash text, time real, payload blob")

def add_data(self, status, hash, time, payload) -> None:
self.db_class.insert_data("status, hash, time, payload", (status, hash, time, payload))

def update_status(self, status, hash) -> None:
self.db_class.update("SET status = ? WHERE hash = ?", (status, hash))

def checker(self, current_time) -> list:
time = current_time - 86400 # 24hrs
hashes = self.db_class.get_data("hash", "WHERE time < ? AND status='not sent'", (time,))
self.db_class.delete_data("WHERE status='sent'")
return hashes



13 changes: 13 additions & 0 deletions connectivity/utils/ipfs_db.py
@@ -0,0 +1,13 @@
from .database import DataBase


class IPFSDB:
def __init__(self, path_to_db_file: str) -> None:
self.db_class = DataBase(path_to_db_file, "ipfs")

def create_table(self) -> None:
self.db_class.create_table("id integer PRIMARY KEY, status text, hash text")

def add_hash(self, hash: str) -> None:
status = "pinned"
self.db_class.insert_data("status, hash", (status, hash))
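Both wrappers compose the generic DataBase helper, giving each table a small purpose-specific API. A hedged sketch of how they are used together, mirroring the wiring shown in main.py and the feeder (the paths and the hash are placeholders):

import time

from connectivity.utils.datalog_db import DatalogDB
from connectivity.utils.ipfs_db import IPFSDB

datalog_db = DatalogDB("/tmp/datalog.db")   # placeholder; normally general.datalog_db_path
ipfs_db = IPFSDB("/tmp/ipfs.db")            # placeholder; normally general.ipfs_db_path
datalog_db.create_table()
ipfs_db.create_table()

ipfs_db.add_hash("QmExampleHash")                                    # record a produced hash as pinned
datalog_db.add_data("not sent", "QmExampleHash", time.time(), b"")   # queue it for the datalog
datalog_db.update_status("sent", "QmExampleHash")                    # after a successful extrinsic
stale = datalog_db.checker(time.time())                              # hashes still unsent after 24 hours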
4 changes: 2 additions & 2 deletions pyproject.toml
@@ -1,6 +1,6 @@
[tool.poetry]
name = "sensors_connectivity"
version = "1.6.3"
version = "1.6.4"
description = "Robonomics package to read data from sensors and publish to different output channels"
authors = [
"Vadim Manaenko <[email protected]>",
@@ -25,7 +25,7 @@ pyserial = "^3.5"
pynacl = "^1.5.0"
pyyaml = "^6.0"
requests = "^2.27.1"
sentry_sdk = "^1.1.5"
sentry_sdk = "2.8.0"
robonomics-interface = "^1.6.0"
ipfshttpclient = "0.8.0a2"
pinatapy-vourhey = "^0.1.3"
