From 8f1b9db9f9edd7ed50d5a94ede2472fa2ce1cd02 Mon Sep 17 00:00:00 2001 From: Kai Koehler Date: Tue, 19 Nov 2024 17:14:03 -0700 Subject: [PATCH] Add functional redis push and pull! --- src/sasquatchbackpack/commands/usgs.py | 25 +++++++++++-------------- src/sasquatchbackpack/schemas/usgs.py | 12 +++++++----- 2 files changed, 18 insertions(+), 19 deletions(-) diff --git a/src/sasquatchbackpack/commands/usgs.py b/src/sasquatchbackpack/commands/usgs.py index 28bab70..8a500ae 100644 --- a/src/sasquatchbackpack/commands/usgs.py +++ b/src/sasquatchbackpack/commands/usgs.py @@ -1,6 +1,5 @@ """USGS CLI.""" -import asyncio from datetime import timedelta import click @@ -214,19 +213,17 @@ def test_redis() -> None: # for record in records: # Using earthquake id as redis key - record = records[0] - asyncio.run( - erm.store( - record["value"]["id"], - schemas.EarthquakeSchema( - timestamp=record["value"]["timestamp"], - id=record["value"]["id"], - latitude=record["value"]["latitude"], - longitude=record["value"]["longitude"], - depth=record["value"]["depth"], - magnitude=record["value"]["depth"], - ), + erm.store( + record["value"]["id"], + schemas.EarthquakeSchema( + timestamp=record["value"]["timestamp"], + id=record["value"]["id"], + latitude=record["value"]["latitude"], + longitude=record["value"]["longitude"], + depth=record["value"]["depth"], + magnitude=record["value"]["depth"], ), - debug=True, ) + + erm.get(record["value"]["id"]) diff --git a/src/sasquatchbackpack/schemas/usgs.py b/src/sasquatchbackpack/schemas/usgs.py index d9261cb..7f85bf8 100644 --- a/src/sasquatchbackpack/schemas/usgs.py +++ b/src/sasquatchbackpack/schemas/usgs.py @@ -1,5 +1,7 @@ """USGS Schemas.""" +import asyncio + import redis.asyncio as redis from dataclasses_avroschema.pydantic import AvroBaseModel from pydantic import Field @@ -32,14 +34,14 @@ def __init__(self, address: str) -> None: self.model = PydanticRedisStorage( datatype=EarthquakeSchema, redis=connection ) + self.loop = asyncio.new_event_loop() - async def store(self, key: str, item: EarthquakeSchema) -> bool: + def store(self, key: str, item: EarthquakeSchema) -> None: if self.model is None: raise RuntimeError("Model is undefined.") - await self.model.store(key, item) - return True + self.loop.run_until_complete(self.model.store(key, item)) - async def get(self, key: str) -> EarthquakeSchema: + def get(self, key: str) -> EarthquakeSchema: if self.model is None: raise RuntimeError("Model is undefined.") - return await self.model.get(key) + return self.loop.run_until_complete(self.model.get(key))