Skip to content

Commit

Permalink
Broken redis refactor
Browse files Browse the repository at this point in the history
Trying to account for additional API sources, as initial implementation
was only for usgs
  • Loading branch information
Fireye04 committed Nov 27, 2024
1 parent 9aca9ac commit 681056d
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 38 deletions.
16 changes: 8 additions & 8 deletions src/sasquatchbackpack/commands/usgs.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,14 @@ def usgs_earthquake_data(
days, hours = duration
total_duration = timedelta(days=days, hours=hours)

config = scripts.USGSConfig(
total_duration, radius, coords, magnitude_bounds
)
source = scripts.USGSSource(config)
backpack_dispatcher = sasquatch.BackpackDispatcher(
source, sasquatch.DispatcherConfig()
)

results = scripts.search_api(
total_duration,
radius,
Expand Down Expand Up @@ -193,14 +201,6 @@ def usgs_earthquake_data(

click.echo("Post mode enabled: Sending data...")

config = scripts.USGSConfig(
total_duration, radius, coords, magnitude_bounds
)
source = scripts.USGSSource(config)

backpack_dispatcher = sasquatch.BackpackDispatcher(
source, sasquatch.DispatcherConfig()
)
result = backpack_dispatcher.post()

if "Error" in result:
Expand Down
24 changes: 23 additions & 1 deletion src/sasquatchbackpack/sasquatch.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@

__all__ = ["BackpackDispatcher", "DispatcherConfig", "DataSource"]

import asyncio
import os
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from string import Template

import redis.asyncio as redis
import requests

from sasquatchbackpack.schemas import usgs as schemas
Expand Down Expand Up @@ -43,6 +45,26 @@ def get_redis_key(self, datapoint: dict) -> str:
pass


class RedisManager:
"""Manage redis for USGS."""

def __init__(self, address: str) -> None:
self.address = address
self.model = redis.from_url(self.address)

self.loop = asyncio.new_event_loop()

def store(self, key: str, item: set) -> None:
if self.model is None:
raise RuntimeError("Model is undefined.")
self.loop.run_until_complete(self.model.hset(key, item))

def get(self, key: str) -> set:
if self.model is None:
raise RuntimeError("Model is undefined.")
return self.loop.run_until_complete(self.model.get(key))


@dataclass
class DispatcherConfig:
"""Class containing relevant configuration information for the
Expand Down Expand Up @@ -90,7 +112,7 @@ def __init__(self, source: DataSource, config: DispatcherConfig) -> None:
"topic_name": self.source.topic_name,
}
)
self.redis = schemas.EarthquakeRedisManager(config.redis_address)
self.redis = RedisManager(config.redis_address)

def create_topic(self) -> str:
"""Create kafka topic based off data from provided source.
Expand Down
26 changes: 0 additions & 26 deletions src/sasquatchbackpack/schemas/usgs.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,7 @@
"""USGS Schemas."""

import asyncio

import redis.asyncio as redis
from dataclasses_avroschema.pydantic import AvroBaseModel
from pydantic import Field
from safir.redis import PydanticRedisStorage


class EarthquakeSchema(AvroBaseModel):
Expand All @@ -25,25 +21,3 @@ class Meta:

namespace = "$namespace"
schema_name = "$topic_name"


class EarthquakeRedisManager:
"""Manage redis for USGS."""

def __init__(self, address: str) -> None:
self.address = address
connection = redis.from_url(self.address)
self.model = PydanticRedisStorage(
datatype=EarthquakeSchema, redis=connection
)
self.loop = asyncio.new_event_loop()

def store(self, key: str, item: EarthquakeSchema) -> None:
if self.model is None:
raise RuntimeError("Model is undefined.")
self.loop.run_until_complete(self.model.store(key, item))

def get(self, key: str) -> EarthquakeSchema | None:
if self.model is None:
raise RuntimeError("Model is undefined.")
return self.loop.run_until_complete(self.model.get(key))
3 changes: 0 additions & 3 deletions src/sasquatchbackpack/scripts/usgs.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,3 @@ def get_redis_key(self, datapoint: dict) -> str:
# Redis keys are formatted "topic_name:key_value"
# to keep data from different APIs discreet
return f"{self.topic_name}:{datapoint["value"]["id"]}"

# Might need a construct schema method to allow redis to encode data
# if avro is not made for that

0 comments on commit 681056d

Please sign in to comment.