diff --git a/src/sasquatchbackpack/cli.py b/src/sasquatchbackpack/cli.py index c9b991b..bc4da92 100644 --- a/src/sasquatchbackpack/cli.py +++ b/src/sasquatchbackpack/cli.py @@ -12,4 +12,4 @@ def main() -> None: main.add_command(usgs.usgs_earthquake_data) -main.add_command(usgs.test_redis) +main.add_command(usgs.test_usgs_redis) diff --git a/src/sasquatchbackpack/commands/usgs.py b/src/sasquatchbackpack/commands/usgs.py index 5b5c516..4f29331 100644 --- a/src/sasquatchbackpack/commands/usgs.py +++ b/src/sasquatchbackpack/commands/usgs.py @@ -5,7 +5,6 @@ import click from sasquatchbackpack import sasquatch -from sasquatchbackpack.schemas import usgs as schemas from sasquatchbackpack.scripts import usgs as scripts DEFAULT_RADIUS = 400 @@ -201,18 +200,38 @@ def usgs_earthquake_data( click.echo("Post mode enabled: Sending data...") - result = backpack_dispatcher.post() + result, records = backpack_dispatcher.post() if "Error" in result: click.secho(result, fg="red") + elif "Warning" in result: + click.secho(result, fg="yellow") else: click.secho("Data successfully sent!", fg="green") + click.echo("The following items were added to sasquatch:") + + click.echo("------") + for record in records: + value = record["value"] + click.echo( + f"{value['id']} " + f"({value['latitude']}, {value['longitude']}) " + f"{value['depth']} km " + f"M{value['magnitude']}" + ) + click.echo("------") + + click.echo( + "All entries missing from this list " + "have been identified as already present in sasquatch." + ) +# Should be a test @click.command() -def test_redis() -> None: +def test_usgs_redis() -> None: """Test redis implementation.""" - erm = schemas.EarthquakeRedisManager(address="redis://localhost:6379/0") + erm = sasquatch.RedisManager(address="redis://localhost:6379/0") config = scripts.USGSConfig( timedelta(days=10), DEFAULT_RADIUS, @@ -227,15 +246,7 @@ def test_redis() -> None: # Using earthquake id as redis key record = records[0] erm.store( - record["value"]["id"], - schemas.EarthquakeSchema( - timestamp=record["value"]["timestamp"], - id=record["value"]["id"], - latitude=record["value"]["latitude"], - longitude=record["value"]["longitude"], - depth=record["value"]["depth"], - magnitude=record["value"]["depth"], - ), + source.get_redis_key(record), ) - - erm.get(record["value"]["id"]) + key = source.get_redis_key(record) + click.echo(f"Key '{key}' returns: {erm.get(key)}") diff --git a/src/sasquatchbackpack/sasquatch.py b/src/sasquatchbackpack/sasquatch.py index 91fcc6e..8406c2f 100644 --- a/src/sasquatchbackpack/sasquatch.py +++ b/src/sasquatchbackpack/sasquatch.py @@ -11,8 +11,6 @@ import redis.asyncio as redis import requests -from sasquatchbackpack.schemas import usgs as schemas - # Code yoinked from https://github.com/lsst-sqre/ # sasquatch/blob/main/examples/RestProxyAPIExample.ipynb @@ -54,12 +52,12 @@ def __init__(self, address: str) -> None: self.loop = asyncio.new_event_loop() - def store(self, key: str, item: set) -> None: + def store(self, key: str, item: str = "value") -> None: if self.model is None: raise RuntimeError("Model is undefined.") - self.loop.run_until_complete(self.model.hset(key, item)) + self.loop.run_until_complete(self.model.set(key, item)) - def get(self, key: str) -> set: + def get(self, key: str) -> str: if self.model is None: raise RuntimeError("Model is undefined.") return self.loop.run_until_complete(self.model.get(key)) @@ -181,17 +179,25 @@ def _remove_redis_duplicates(self, records: list[dict]) -> list[dict]: return final - def post(self) -> str: + def post(self) -> tuple[str, list]: """Assemble schema and payload from the given source, then makes a POST request to kafka. Returns ------- - response text : str + response-text : str The results of the POST request in string format + records : list + List of earthquakes with those already stored on remote removed """ records = self._remove_redis_duplicates(self.source.get_records()) + if len(records) == 0: + return ( + "Warning: All entries already present, aborting POST request", + records, + ) + payload = {"value_schema": self.schema, "records": records} url = ( @@ -213,20 +219,11 @@ def post(self) -> str: timeout=10, ) response.raise_for_status() # Raises HTTPError for bad responses + except requests.RequestException as e: - return f"Error POSTing data: {e}" + return f"Error POSTing data: {e}", records for record in records: - self.redis.store( - self.source.get_redis_key(record), - schemas.EarthquakeSchema( - timestamp=0, - id="", - latitude=0, - longitude=0, - depth=0, - magnitude=0, - ), - ) + self.redis.store(self.source.get_redis_key(record)) - return response.text + return response.text, records