Skip to content

Commit

Permalink
Improves checkpoint logic (#7)
Browse files Browse the repository at this point in the history
  • Loading branch information
Phuurl authored Apr 20, 2022
1 parent b61fe8d commit 9a6c161
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 12 deletions.
39 changes: 27 additions & 12 deletions functions/AwsRssChecker/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,26 +16,29 @@
QUEUE_NAME = os.environ["QUEUE_NAME"]
ENVIRONMENT = os.environ["ENVIRONMENT"]

FEED_CHECKPOINT = "aws-latest-feed"
ARTICLE_CHECKPOINT = "aws-latest-article"

def get_checkpoint(connection_string: str, table_name: str, partition_key: str, row_key: str) -> Union[int, None]:

def get_checkpoint(connection_string: str, table_name: str, partition_key: str, row_key: str) -> Union[float, None]:
try:
table_service = tables.TableServiceClient.from_connection_string(connection_string)
table_client = table_service.get_table_client(table_name)
checkpoint = table_client.get_entity(partition_key=partition_key, row_key=row_key)
return int(checkpoint["ts"])
return float(checkpoint["ts"])
except Exception as e:
logging.warning("Exception getting checkpoint: {}".format(e))
return None


def set_checkpoint(connection_string: str, table_name: str, partition_key: str, row_key: str) -> None:
def set_checkpoint(connection_string: str, table_name: str, partition_key: str, row_key: str, ts: float) -> None:
try:
table_service = tables.TableServiceClient.from_connection_string(connection_string)
table_client = table_service.get_table_client(table_name)
checkpoint_out = {
"PartitionKey": partition_key,
"RowKey": row_key,
"ts": str(int(time.time()))
"ts": str(ts)
}
table_client.upsert_entity(checkpoint_out)
except Exception as e:
Expand Down Expand Up @@ -64,22 +67,34 @@ def process_entry(entry: feedparser.util.FeedParserDict, last_run: time.struct_t


def main(timer: func.TimerRequest) -> None:
checkpoint = get_checkpoint(CONNECTION_STRING, TABLE_NAME, "aws", ENVIRONMENT)
if checkpoint is not None:
logging.info("Using {} as checkpoint".format(checkpoint))
feed = get_rss(FEED_URL, time.gmtime(checkpoint))
feed_checkpoint = get_checkpoint(CONNECTION_STRING, TABLE_NAME, FEED_CHECKPOINT, ENVIRONMENT)
article_checkpoint = get_checkpoint(CONNECTION_STRING, TABLE_NAME, ARTICLE_CHECKPOINT, ENVIRONMENT)
if feed_checkpoint is not None:
logging.info("Using {} as FEED checkpoint".format(feed_checkpoint))
feed = get_rss(FEED_URL, time.gmtime(feed_checkpoint))
else:
logging.info("No checkpoint - using current time minus 30m")
logging.info("No FEED checkpoint - using current time minus 30m")
feed = get_rss(FEED_URL, time.gmtime(time.time() - (30 * 60)))

if feed is not None:
if article_checkpoint is not None:
logging.info("Using {} as ARTICLE checkpoint".format(feed_checkpoint))
else:
logging.info("No ARTICLE checkpoint - using current time minus 30m")

queue_client = queue.QueueClient.from_connection_string(CONNECTION_STRING, QUEUE_NAME,
message_encode_policy=queue.BinaryBase64EncodePolicy(),
message_decode_policy=queue.BinaryBase64DecodePolicy())
latest_article = time.localtime(0)
for entry in feed.entries:
if checkpoint is not None:
process_entry(entry, time.gmtime(checkpoint), queue_client)
if article_checkpoint is not None:
process_entry(entry, time.gmtime(article_checkpoint), queue_client)
else:
process_entry(entry, time.gmtime(time.time() - (30 * 60)), queue_client)
if entry.published_parsed > latest_article:
latest_article = entry.published_parsed

set_checkpoint(CONNECTION_STRING, TABLE_NAME, "aws", ENVIRONMENT)
set_checkpoint(CONNECTION_STRING, TABLE_NAME, ARTICLE_CHECKPOINT, ENVIRONMENT,
time.mktime(feed.feed.published_parsed))
set_checkpoint(CONNECTION_STRING, TABLE_NAME, FEED_CHECKPOINT, ENVIRONMENT,
time.mktime(feed.feed.published_parsed))
1 change: 1 addition & 0 deletions pulumi/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

account = azure.storage.StorageAccount("table-sa",
resource_group_name=resource_group.name,
allow_blob_public_access=False,
sku=azure.storage.SkuArgs(
name=azure.storage.SkuName.STANDARD_ZRS,
),
Expand Down

0 comments on commit 9a6c161

Please sign in to comment.