From a6860cb584efb3b7277b20bb44a746ca882c4b45 Mon Sep 17 00:00:00 2001 From: Kai Koehler Date: Thu, 21 Nov 2024 17:36:40 -0700 Subject: [PATCH] First half of redis integration --- src/sasquatchbackpack/sasquatch.py | 33 +++++++++++++++++++++++---- src/sasquatchbackpack/schemas/usgs.py | 7 ++---- src/sasquatchbackpack/scripts/usgs.py | 3 +++ 3 files changed, 33 insertions(+), 10 deletions(-) diff --git a/src/sasquatchbackpack/sasquatch.py b/src/sasquatchbackpack/sasquatch.py index 4314b87..4160571 100644 --- a/src/sasquatchbackpack/sasquatch.py +++ b/src/sasquatchbackpack/sasquatch.py @@ -136,6 +136,29 @@ def create_topic(self) -> str: return response.text + def _remove_redis_duplicates(self, records: list[dict]) -> list[dict]: + """Check the redis server for any duplicate data points + present in the provided records, and return a list with them removed. + + Parameters + ---------- + records : list[dict] + Output of a source.get_records() call. + + Returns + ------- + final : list[dict] + List with duplicate elements in common with those + on the redis server removed. + """ + final = [] + + for record in records: + if self.redis.get(self.source.get_redis_key(record)) is None: + final.append(record) # noqa: PERF401 + + return final + def post(self) -> str: """Assemble schema and payload from the given source, then makes a POST request to kafka. 
@@ -145,10 +168,7 @@ def post(self) -> str: response text : str The results of the POST request in string format """ - records = self.source.get_records() - - # TODO: Check redis for records, and remove matches from list - # All redis entris begin with their + records = self._remove_redis_duplicates(self.source.get_records()) payload = {"value_schema": self.schema, "records": records} @@ -174,6 +194,9 @@ def post(self) -> str: except requests.RequestException as e: return f"Error POSTing data: {e}" - # TODO: Once post is successful add remaining records to redis + for record in records: + self.redis.store( + self.source.get_redis_key(record), record, + ) return response.text diff --git a/src/sasquatchbackpack/schemas/usgs.py b/src/sasquatchbackpack/schemas/usgs.py index aa1a581..48c7e85 100644 --- a/src/sasquatchbackpack/schemas/usgs.py +++ b/src/sasquatchbackpack/schemas/usgs.py @@ -43,10 +43,7 @@ def store(self, key: str, item: EarthquakeSchema) -> None: raise RuntimeError("Model is undefined.") self.loop.run_until_complete(self.model.store(key, item)) - def get(self, key: str) -> EarthquakeSchema: + def get(self, key: str) -> EarthquakeSchema | None: if self.model is None: raise RuntimeError("Model is undefined.") - target = self.loop.run_until_complete(self.model.get(key)) - if target is None: - raise LookupError(f"Entry with key of {key} could not be found") - return target + return self.loop.run_until_complete(self.model.get(key)) diff --git a/src/sasquatchbackpack/scripts/usgs.py b/src/sasquatchbackpack/scripts/usgs.py index 8512d67..47c168f 100644 --- a/src/sasquatchbackpack/scripts/usgs.py +++ b/src/sasquatchbackpack/scripts/usgs.py @@ -155,3 +155,6 @@ def get_redis_key(self, datapoint: dict) -> str: # Redis keys are formatted "topic_name:key_value" # to keep data from different APIs discreet return f"{self.topic_name}:{datapoint["value"]["id"]}" + + # Might need a construct schema method to allow redis to encode data + # if avro is not made for that