diff --git a/functions/AwsRssChecker/__init__.py b/functions/AwsRssChecker/__init__.py index 72d3c71..9974cd3 100644 --- a/functions/AwsRssChecker/__init__.py +++ b/functions/AwsRssChecker/__init__.py @@ -16,26 +16,29 @@ QUEUE_NAME = os.environ["QUEUE_NAME"] ENVIRONMENT = os.environ["ENVIRONMENT"] +FEED_CHECKPOINT = "aws-latest-feed" +ARTICLE_CHECKPOINT = "aws-latest-article" -def get_checkpoint(connection_string: str, table_name: str, partition_key: str, row_key: str) -> Union[int, None]: + +def get_checkpoint(connection_string: str, table_name: str, partition_key: str, row_key: str) -> Union[float, None]: try: table_service = tables.TableServiceClient.from_connection_string(connection_string) table_client = table_service.get_table_client(table_name) checkpoint = table_client.get_entity(partition_key=partition_key, row_key=row_key) - return int(checkpoint["ts"]) + return float(checkpoint["ts"]) except Exception as e: logging.warning("Exception getting checkpoint: {}".format(e)) return None -def set_checkpoint(connection_string: str, table_name: str, partition_key: str, row_key: str) -> None: +def set_checkpoint(connection_string: str, table_name: str, partition_key: str, row_key: str, ts: float) -> None: try: table_service = tables.TableServiceClient.from_connection_string(connection_string) table_client = table_service.get_table_client(table_name) checkpoint_out = { "PartitionKey": partition_key, "RowKey": row_key, - "ts": str(int(time.time())) + "ts": str(ts) } table_client.upsert_entity(checkpoint_out) except Exception as e: @@ -64,22 +67,34 @@ def process_entry(entry: feedparser.util.FeedParserDict, last_run: time.struct_t def main(timer: func.TimerRequest) -> None: - checkpoint = get_checkpoint(CONNECTION_STRING, TABLE_NAME, "aws", ENVIRONMENT) - if checkpoint is not None: - logging.info("Using {} as checkpoint".format(checkpoint)) - feed = get_rss(FEED_URL, time.gmtime(checkpoint)) + feed_checkpoint = get_checkpoint(CONNECTION_STRING, TABLE_NAME, 
FEED_CHECKPOINT, ENVIRONMENT) + article_checkpoint = get_checkpoint(CONNECTION_STRING, TABLE_NAME, ARTICLE_CHECKPOINT, ENVIRONMENT) + if feed_checkpoint is not None: + logging.info("Using {} as FEED checkpoint".format(feed_checkpoint)) + feed = get_rss(FEED_URL, time.gmtime(feed_checkpoint)) else: - logging.info("No checkpoint - using current time minus 30m") + logging.info("No FEED checkpoint - using current time minus 30m") feed = get_rss(FEED_URL, time.gmtime(time.time() - (30 * 60))) if feed is not None: + if article_checkpoint is not None: + logging.info("Using {} as ARTICLE checkpoint".format(article_checkpoint)) + else: + logging.info("No ARTICLE checkpoint - using current time minus 30m") + queue_client = queue.QueueClient.from_connection_string(CONNECTION_STRING, QUEUE_NAME, message_encode_policy=queue.BinaryBase64EncodePolicy(), message_decode_policy=queue.BinaryBase64DecodePolicy()) + latest_article = time.localtime(0) for entry in feed.entries: - if checkpoint is not None: - process_entry(entry, time.gmtime(checkpoint), queue_client) + if article_checkpoint is not None: + process_entry(entry, time.gmtime(article_checkpoint), queue_client) else: process_entry(entry, time.gmtime(time.time() - (30 * 60)), queue_client) + if entry.published_parsed > latest_article: + latest_article = entry.published_parsed - set_checkpoint(CONNECTION_STRING, TABLE_NAME, "aws", ENVIRONMENT) + set_checkpoint(CONNECTION_STRING, TABLE_NAME, ARTICLE_CHECKPOINT, ENVIRONMENT, + time.mktime(latest_article if feed.entries else feed.feed.published_parsed)) + set_checkpoint(CONNECTION_STRING, TABLE_NAME, FEED_CHECKPOINT, ENVIRONMENT, + time.mktime(feed.feed.published_parsed)) diff --git a/pulumi/__main__.py b/pulumi/__main__.py index 320e0b2..4c06f63 100644 --- a/pulumi/__main__.py +++ b/pulumi/__main__.py @@ -12,6 +12,7 @@ account = azure.storage.StorageAccount("table-sa", resource_group_name=resource_group.name, + allow_blob_public_access=False, sku=azure.storage.SkuArgs( name=azure.storage.SkuName.STANDARD_ZRS, ),