Skip to content

Commit

Permalink
First half of redis integration
Browse files Browse the repository at this point in the history
  • Loading branch information
Fireye04 committed Nov 22, 2024
1 parent 21e53b6 commit a6860cb
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 10 deletions.
33 changes: 28 additions & 5 deletions src/sasquatchbackpack/sasquatch.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,29 @@ def create_topic(self) -> str:

return response.text

def _remove_redis_duplicates(self, records: list[dict]) -> list[dict]:
"""Check the redis server for any duplicate data points
present in the provided records, and return a list with them removed.
Parameters
----------
records : list[dict]
Output of a source.get_records() call.
Returns
-------
final : list[dict]
List with duplicate elements in common with those
on the redis server removed.
"""
final = []

for record in records:
if self.redis.get(self.source.get_redis_key(record)) is None:
final.append(record) # noqa: PERF401

return final

def post(self) -> str:
"""Assemble schema and payload from the given source, then
makes a POST request to kafka.
Expand All @@ -145,10 +168,7 @@ def post(self) -> str:
response text : str
The results of the POST request in string format
"""
records = self.source.get_records()

# TODO: Check redis for records, and remove matches from list
# All redis entris begin with their
records = self._remove_redis_duplicates(self.source.get_records())

payload = {"value_schema": self.schema, "records": records}

Expand All @@ -174,6 +194,9 @@ def post(self) -> str:
except requests.RequestException as e:
return f"Error POSTing data: {e}"

# TODO: Once post is successful add remaining records to redis
for record in records:
self.redis.store(
self.config.get_redis_key(record),
)

return response.text
7 changes: 2 additions & 5 deletions src/sasquatchbackpack/schemas/usgs.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,7 @@ def store(self, key: str, item: EarthquakeSchema) -> None:
raise RuntimeError("Model is undefined.")
self.loop.run_until_complete(self.model.store(key, item))

def get(self, key: str) -> EarthquakeSchema:
def get(self, key: str) -> EarthquakeSchema | None:
if self.model is None:
raise RuntimeError("Model is undefined.")
target = self.loop.run_until_complete(self.model.get(key))
if target is None:
raise LookupError(f"Entry with key of {key} could not be found")
return target
return self.loop.run_until_complete(self.model.get(key))
3 changes: 3 additions & 0 deletions src/sasquatchbackpack/scripts/usgs.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,3 +155,6 @@ def get_redis_key(self, datapoint: dict) -> str:
# Redis keys are formatted "topic_name:key_value"
# to keep data from different APIs discreet
return f"{self.topic_name}:{datapoint["value"]["id"]}"

# Might need a construct schema method to allow redis to encode data
# if avro is not made for that

0 comments on commit a6860cb

Please sign in to comment.