diff --git a/nlds/rabbit/consumer.py b/nlds/rabbit/consumer.py index dcf5f37..fb3b01d 100644 --- a/nlds/rabbit/consumer.py +++ b/nlds/rabbit/consumer.py @@ -282,6 +282,15 @@ def send_pathlist( self.publish_message(monitoring_rk, body_json) self.sent_message_count += 1 + def send_complete(self, + routing_key: str, + body_json: Dict[str, Any], + ): + body_json[MSG.DETAILS][MSG.STATE] = State.COMPLETE + monitoring_rk = ".".join([routing_key.split(".")[0], RK.MONITOR_PUT, RK.START]) + self.publish_message(monitoring_rk, body_json) + self.sent_message_count += 1 + def setup_logging( self, enable=False, diff --git a/nlds_processors/nlds_worker.py b/nlds_processors/nlds_worker.py index e2ac530..ed4d84f 100644 --- a/nlds_processors/nlds_worker.py +++ b/nlds_processors/nlds_worker.py @@ -139,7 +139,7 @@ def _process_rk_catalog_put_complete(self, body_json: Dict) -> None: self.publish_and_log_message(new_routing_key, body_json) def _process_rk_transfer_put_complete(self, body_json: Dict) -> None: - # After a successfull TRANSFER_PUT, the catalog is updated with the locations + # After a successful TRANSFER_PUT, the catalog is updated with the locations # of the files on the OBJECT STORAGE self.log( f"Transfer successful, sending filelist with object storage locations to " @@ -154,6 +154,13 @@ def _process_rk_transfer_put_complete(self, body_json: Dict) -> None: ) self.publish_and_log_message(new_routing_key, body_json) + def _process_rk_transfer_get_complete(self, body_json: Dict) -> None: + # After a successful TRANSFER_GET, the sub records in the Monitor need to be + # notified that they have complete + + new_routing_key = ".".join([RK.ROOT, RK.TRANSFER_GET, RK.COMPLETE]) + self.send_complete(new_routing_key, body_json) + def _process_rk_transfer_put_failed(self, body_json: Dict) -> None: self.log( f"Transfer unsuccessful, sending failed files back to catalog " @@ -203,13 +210,20 @@ def _process_rk_archive_get_complete(self, rk_parts: List, body_json: Dict) -> N def _process_rk_catalog_update_complete( self, rk_parts: List, body_json: Dict ) -> None: - # After a successful CATALOG_UPDATE, as part of the GET workflow, the files - # are got from the Object Store using TRANSFER_GET - # this is the same as the when catalog get is complete try: api_method = body_json[MSG.DETAILS][MSG.API_ACTION] + # For the GET and GETLIST method, this code path occurs after the files have + # been fetched from Tape. They are now passed to TRANSFER_GET, which is + # the same code path as after a CATALOG_GET, i.e. the files are now fetched + # from the Object Store if api_method == RK.GET or api_method == RK.GETLIST: self._process_rk_catalog_get_complete(rk_parts, body_json) + # For the PUT and PUTLIST method, this is the final state - i.e. the catalog + # is updated to contain the new Object Store path + elif api_method == RK.PUT or api_method == RK.PUTLIST: + new_routing_key = ".".join([RK.ROOT, RK.CATALOG_UPDATE, RK.COMPLETE]) + self.send_complete(new_routing_key, body_json) + except KeyError: self.log( f"Message did not contain an appropriate api_action.", @@ -302,16 +316,9 @@ def _process_rk_archive_put_complete(self, rk_parts: List, body_json: Dict) -> N def _process_rk_catalog_archive_update_complete( self, rk_parts: List, body_json: Dict ) -> None: - self.log( - "Checksum successfully updated for aggregation, archive-put " - "is now complete.", - RK.LOG_INFO, - ) - # forward confirmation to monitor - self.log(f"Sending message to {RK.MONITOR} queue", RK.LOG_INFO) new_routing_key = ".".join([RK.ROOT, RK.MONITOR_PUT, RK.START]) - self.publish_and_log_message(new_routing_key, body_json) + self.send_complete(new_routing_key, body_json) def _process_rk_archive_put_failed(self, body_json: Dict) -> None: self.log( @@ -373,10 +380,16 @@ def callback( elif rk_parts[1] == f"{RK.TRANSFER_PUT}": self._process_rk_transfer_put_complete(body_json) + # If transfer_get completed then finish get workflow + elif rk_parts[1] == f"{RK.TRANSFER_GET}": + self._process_rk_transfer_get_complete(body_json) + # if catalog_get completed then we need to decide whether it was # part of a regular get or an archive_put workflow elif rk_parts[1] == f"{RK.CATALOG_GET}": self._process_rk_catalog_get_complete(rk_parts, body_json) + elif rk_parts[1] == f"{RK.CATALOG_PUT}": + self._process_rk_catalog_put_complete(rk_parts, body_json) # If finished with archive retrieval then pass for catalog-update elif rk_parts[1] == f"{RK.ARCHIVE_GET}": @@ -395,6 +408,11 @@ def callback( elif rk_parts[1] == f"{RK.CATALOG_UPDATE}": self._process_rk_catalog_update_complete(rk_parts, body_json) + # if finished with catalog archive update then mark ARCHIVE_PUT flow as + # complete + elif rk_parts[1] == f"{RK.CATALOG_ARCHIVE_UPDATE}": + self._process_rk_catalog_archive_update_complete(rk_parts, body_json) + # If a archive-restore has happened from the catalog then we need to get from # archive before we can do the transfer from object store. elif rk_parts[2] == f"{RK.ARCHIVE_RESTORE}":