From 681056d1099ecd4d435d55c319cb80ba21e3bc27 Mon Sep 17 00:00:00 2001 From: Kai Koehler Date: Tue, 26 Nov 2024 17:56:08 -0700 Subject: [PATCH] Broken redis refactor Trying to account for additional API sources, as initial implementation was only for usgs --- src/sasquatchbackpack/commands/usgs.py | 16 ++++++++-------- src/sasquatchbackpack/sasquatch.py | 24 +++++++++++++++++++++++- src/sasquatchbackpack/schemas/usgs.py | 26 -------------------------- src/sasquatchbackpack/scripts/usgs.py | 3 --- 4 files changed, 31 insertions(+), 38 deletions(-) diff --git a/src/sasquatchbackpack/commands/usgs.py b/src/sasquatchbackpack/commands/usgs.py index bafafe6..5b5c516 100644 --- a/src/sasquatchbackpack/commands/usgs.py +++ b/src/sasquatchbackpack/commands/usgs.py @@ -165,6 +165,14 @@ def usgs_earthquake_data( days, hours = duration total_duration = timedelta(days=days, hours=hours) + config = scripts.USGSConfig( + total_duration, radius, coords, magnitude_bounds + ) + source = scripts.USGSSource(config) + backpack_dispatcher = sasquatch.BackpackDispatcher( + source, sasquatch.DispatcherConfig() + ) + results = scripts.search_api( total_duration, radius, @@ -193,14 +201,6 @@ def usgs_earthquake_data( click.echo("Post mode enabled: Sending data...") - config = scripts.USGSConfig( - total_duration, radius, coords, magnitude_bounds - ) - source = scripts.USGSSource(config) - - backpack_dispatcher = sasquatch.BackpackDispatcher( - source, sasquatch.DispatcherConfig() - ) result = backpack_dispatcher.post() if "Error" in result: diff --git a/src/sasquatchbackpack/sasquatch.py b/src/sasquatchbackpack/sasquatch.py index edc5e6d..91fcc6e 100644 --- a/src/sasquatchbackpack/sasquatch.py +++ b/src/sasquatchbackpack/sasquatch.py @@ -2,11 +2,13 @@ __all__ = ["BackpackDispatcher", "DispatcherConfig", "DataSource"] +import asyncio import os from abc import ABC, abstractmethod from dataclasses import dataclass, field from string import Template +import redis.asyncio as redis import requests from sasquatchbackpack.schemas import usgs as schemas @@ -43,6 +45,26 @@ def get_redis_key(self, datapoint: dict) -> str: pass +class RedisManager: + """Manage redis for USGS.""" + + def __init__(self, address: str) -> None: + self.address = address + self.model = redis.from_url(self.address) + + self.loop = asyncio.new_event_loop() + + def store(self, key: str, item: set) -> None: + if self.model is None: + raise RuntimeError("Model is undefined.") + self.loop.run_until_complete(self.model.hset(key, item)) + + def get(self, key: str) -> set: + if self.model is None: + raise RuntimeError("Model is undefined.") + return self.loop.run_until_complete(self.model.get(key)) + + @dataclass class DispatcherConfig: """Class containing relevant configuration information for the @@ -90,7 +112,7 @@ def __init__(self, source: DataSource, config: DispatcherConfig) -> None: "topic_name": self.source.topic_name, } ) - self.redis = schemas.EarthquakeRedisManager(config.redis_address) + self.redis = RedisManager(config.redis_address) def create_topic(self) -> str: """Create kafka topic based off data from provided source. diff --git a/src/sasquatchbackpack/schemas/usgs.py b/src/sasquatchbackpack/schemas/usgs.py index 48c7e85..6b06c31 100644 --- a/src/sasquatchbackpack/schemas/usgs.py +++ b/src/sasquatchbackpack/schemas/usgs.py @@ -1,11 +1,7 @@ """USGS Schemas.""" -import asyncio - -import redis.asyncio as redis from dataclasses_avroschema.pydantic import AvroBaseModel from pydantic import Field -from safir.redis import PydanticRedisStorage class EarthquakeSchema(AvroBaseModel): @@ -25,25 +21,3 @@ class Meta: namespace = "$namespace" schema_name = "$topic_name" - - -class EarthquakeRedisManager: - """Manage redis for USGS.""" - - def __init__(self, address: str) -> None: - self.address = address - connection = redis.from_url(self.address) - self.model = PydanticRedisStorage( - datatype=EarthquakeSchema, redis=connection - ) - self.loop = asyncio.new_event_loop() - - def store(self, key: str, item: EarthquakeSchema) -> None: - if self.model is None: - raise RuntimeError("Model is undefined.") - self.loop.run_until_complete(self.model.store(key, item)) - - def get(self, key: str) -> EarthquakeSchema | None: - if self.model is None: - raise RuntimeError("Model is undefined.") - return self.loop.run_until_complete(self.model.get(key)) diff --git a/src/sasquatchbackpack/scripts/usgs.py b/src/sasquatchbackpack/scripts/usgs.py index 47c168f..8512d67 100644 --- a/src/sasquatchbackpack/scripts/usgs.py +++ b/src/sasquatchbackpack/scripts/usgs.py @@ -155,6 +155,3 @@ def get_redis_key(self, datapoint: dict) -> str: # Redis keys are formatted "topic_name:key_value" # to keep data from different APIs discreet return f"{self.topic_name}:{datapoint["value"]["id"]}" - - # Might need a construct schema method to allow redis to encode data - # if avro is not made for that