Skip to content

Commit

Permalink
Complete redis refactor and CLI feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
Fireye04 committed Dec 4, 2024
1 parent 681056d commit 7711b02
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 36 deletions.
2 changes: 1 addition & 1 deletion src/sasquatchbackpack/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,4 @@ def main() -> None:


main.add_command(usgs.usgs_earthquake_data)
main.add_command(usgs.test_redis)
main.add_command(usgs.test_usgs_redis)
41 changes: 26 additions & 15 deletions src/sasquatchbackpack/commands/usgs.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import click

from sasquatchbackpack import sasquatch
from sasquatchbackpack.schemas import usgs as schemas
from sasquatchbackpack.scripts import usgs as scripts

DEFAULT_RADIUS = 400
Expand Down Expand Up @@ -201,18 +200,38 @@ def usgs_earthquake_data(

click.echo("Post mode enabled: Sending data...")

result = backpack_dispatcher.post()
result, records = backpack_dispatcher.post()

if "Error" in result:
click.secho(result, fg="red")
elif "Warning" in result:
click.secho(result, fg="yellow")
else:
click.secho("Data successfully sent!", fg="green")
click.echo("The following items were added to Kafka:")

click.echo("------")
for record in records:
value = record["value"]
click.echo(
f"{value['id']} "
f"({value['latitude']}, {value['longitude']}) "
f"{value['depth']} km "
f"M{value['magnitude']}"
)
click.echo("------")

click.echo(
"All entries missing from this list "
"have been identified as already present in Kafka."
)


# Should be a test
@click.command()
def test_redis() -> None:
def test_usgs_redis() -> None:
"""Test redis implementation."""
erm = schemas.EarthquakeRedisManager(address="redis://localhost:6379/0")
erm = sasquatch.RedisManager(address="redis://localhost:6379/0")
config = scripts.USGSConfig(
timedelta(days=10),
DEFAULT_RADIUS,
Expand All @@ -227,15 +246,7 @@ def test_redis() -> None:
# Using earthquake id as redis key
record = records[0]
erm.store(
record["value"]["id"],
schemas.EarthquakeSchema(
timestamp=record["value"]["timestamp"],
id=record["value"]["id"],
latitude=record["value"]["latitude"],
longitude=record["value"]["longitude"],
depth=record["value"]["depth"],
magnitude=record["value"]["depth"],
),
source.get_redis_key(record),
)

erm.get(record["value"]["id"])
key = source.get_redis_key(record)
click.echo(f"Key '{key}' returns: {erm.get(key)}")
37 changes: 17 additions & 20 deletions src/sasquatchbackpack/sasquatch.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@
import redis.asyncio as redis
import requests

from sasquatchbackpack.schemas import usgs as schemas

# Code yoinked from https://github.com/lsst-sqre/
# sasquatch/blob/main/examples/RestProxyAPIExample.ipynb

Expand Down Expand Up @@ -54,12 +52,12 @@ def __init__(self, address: str) -> None:

self.loop = asyncio.new_event_loop()

def store(self, key: str, item: set) -> None:
def store(self, key: str, item: str = "value") -> None:
if self.model is None:
raise RuntimeError("Model is undefined.")
self.loop.run_until_complete(self.model.hset(key, item))
self.loop.run_until_complete(self.model.set(key, item))

def get(self, key: str) -> set:
def get(self, key: str) -> str:
if self.model is None:
raise RuntimeError("Model is undefined.")
return self.loop.run_until_complete(self.model.get(key))
Expand Down Expand Up @@ -181,17 +179,25 @@ def _remove_redis_duplicates(self, records: list[dict]) -> list[dict]:

return final

def post(self) -> str:
def post(self) -> tuple[str, list]:
"""Assemble schema and payload from the given source, then
makes a POST request to kafka.
Returns
-------
response text : str
response-text : str
The results of the POST request in string format
records : list
List of earthquakes with those already stored on remote removed
"""
records = self._remove_redis_duplicates(self.source.get_records())

if len(records) == 0:
return (
"Warning: All entries already present, aborting POST request",
records,
)

payload = {"value_schema": self.schema, "records": records}

url = (
Expand All @@ -213,20 +219,11 @@ def post(self) -> str:
timeout=10,
)
response.raise_for_status() # Raises HTTPError for bad responses

except requests.RequestException as e:
return f"Error POSTing data: {e}"
return f"Error POSTing data: {e}", records

for record in records:
self.redis.store(
self.source.get_redis_key(record),
schemas.EarthquakeSchema(
timestamp=0,
id="",
latitude=0,
longitude=0,
depth=0,
magnitude=0,
),
)
self.redis.store(self.source.get_redis_key(record))

return response.text
return response.text, records

0 comments on commit 7711b02

Please sign in to comment.