Skip to content

Commit

Permalink
Change monitor workflow so that the nlds_worker now explictly sends a…
Browse files Browse the repository at this point in the history
… COMPLETE message to the monitor when a workflow is complete
  • Loading branch information
nmassey001 committed Nov 18, 2024
1 parent d11ce65 commit 9760cd4
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 12 deletions.
9 changes: 9 additions & 0 deletions nlds/rabbit/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,15 @@ def send_pathlist(
self.publish_message(monitoring_rk, body_json)
self.sent_message_count += 1

def send_complete(self,
routing_key: str,
body_json: Dict[str, Any],
):
body_json[MSG.DETAILS][MSG.STATE] = State.COMPLETE
monitoring_rk = ".".join([routing_key.split(".")[0], RK.MONITOR_PUT, RK.START])
self.publish_message(monitoring_rk, body_json)
self.sent_message_count += 1

def setup_logging(
self,
enable=False,
Expand Down
42 changes: 30 additions & 12 deletions nlds_processors/nlds_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ def _process_rk_catalog_put_complete(self, body_json: Dict) -> None:
self.publish_and_log_message(new_routing_key, body_json)

def _process_rk_transfer_put_complete(self, body_json: Dict) -> None:
# After a successfull TRANSFER_PUT, the catalog is updated with the locations
# After a successful TRANSFER_PUT, the catalog is updated with the locations
# of the files on the OBJECT STORAGE
self.log(
f"Transfer successful, sending filelist with object storage locations to "
Expand All @@ -154,6 +154,13 @@ def _process_rk_transfer_put_complete(self, body_json: Dict) -> None:
)
self.publish_and_log_message(new_routing_key, body_json)

def _process_rk_transfer_get_complete(self, body_json: Dict) -> None:
# After a successful TRANSFER_GET, the sub records in the Monitor need to be
# notified that they have complete

new_routing_key = ".".join([RK.ROOT, RK.TRANSFER_GET, RK.COMPLETE])
self.send_complete(new_routing_key, body_json)

def _process_rk_transfer_put_failed(self, body_json: Dict) -> None:
self.log(
f"Transfer unsuccessful, sending failed files back to catalog "
Expand Down Expand Up @@ -203,13 +210,20 @@ def _process_rk_archive_get_complete(self, rk_parts: List, body_json: Dict) -> N
def _process_rk_catalog_update_complete(
self, rk_parts: List, body_json: Dict
) -> None:
# After a successful CATALOG_UPDATE, as part of the GET workflow, the files
# are got from the Object Store using TRANSFER_GET
# this is the same as the when catalog get is complete
try:
api_method = body_json[MSG.DETAILS][MSG.API_ACTION]
# For the GET and GETLIST method, this code path occurs after the files have
# been fetched from Tape. They are now passed to TRANSFER_GET, which is
# the same code path as after a CATALOG_GET, i.e. the files are now fetched
# from the Object Store
if api_method == RK.GET or api_method == RK.GETLIST:
self._process_rk_catalog_get_complete(rk_parts, body_json)
# For the PUT and PUTLIST method, this is the final state - i.e. the catalog
# is updated to contain the new Object Store path
elif api_method == RK.PUT or api_method == RK.PUTLIST:
new_routing_key = ".".join([RK.ROOT, RK.CATALOG_UPDATE, RK.COMPLETE])
self.send_complete(new_routing_key, body_json)

except KeyError:
self.log(
f"Message did not contain an appropriate api_action.",
Expand Down Expand Up @@ -302,16 +316,9 @@ def _process_rk_archive_put_complete(self, rk_parts: List, body_json: Dict) -> N
def _process_rk_catalog_archive_update_complete(
self, rk_parts: List, body_json: Dict
) -> None:
self.log(
"Checksum successfully updated for aggregation, archive-put "
"is now complete.",
RK.LOG_INFO,
)

# forward confirmation to monitor
self.log(f"Sending message to {RK.MONITOR} queue", RK.LOG_INFO)
new_routing_key = ".".join([RK.ROOT, RK.MONITOR_PUT, RK.START])
self.publish_and_log_message(new_routing_key, body_json)
self.send_complete(new_routing_key, body_json)

def _process_rk_archive_put_failed(self, body_json: Dict) -> None:
self.log(
Expand Down Expand Up @@ -373,10 +380,16 @@ def callback(
elif rk_parts[1] == f"{RK.TRANSFER_PUT}":
self._process_rk_transfer_put_complete(body_json)

# If transfer_get completed then finish get workflow
elif rk_parts[1] == f"{RK.TRANSFER_GET}":
self._process_rk_transfer_get_complete(body_json)

# if catalog_get completed then we need to decide whether it was
# part of a regular get or an archive_put workflow
elif rk_parts[1] == f"{RK.CATALOG_GET}":
self._process_rk_catalog_get_complete(rk_parts, body_json)
elif rk_parts[1] == f"{RK.CATALOG_PUT}":
self._process_rk_catalog_put_complete(rk_parts, body_json)

# If finished with archive retrieval then pass for catalog-update
elif rk_parts[1] == f"{RK.ARCHIVE_GET}":
Expand All @@ -395,6 +408,11 @@ def callback(
elif rk_parts[1] == f"{RK.CATALOG_UPDATE}":
self._process_rk_catalog_update_complete(rk_parts, body_json)

# if finished with catalog archive update then mark ARCHIVE_PUT flow as
# complete
elif rk_parts[1] == f"{RK.CATALOG_ARCHIVE_UPDATE}":
self._process_rk_catalog_archive_update_complete(rk_parts, body_json)

# If a archive-restore has happened from the catalog then we need to get from
# archive before we can do the transfer from object store.
elif rk_parts[2] == f"{RK.ARCHIVE_RESTORE}":
Expand Down

0 comments on commit 9760cd4

Please sign in to comment.