Skip to content

Commit

Permalink
Merge pull request #604 from dynamic-entropy/loadtest-ask-approval
Browse files Browse the repository at this point in the history
Add ask approval for tape rules in loadtest
  • Loading branch information
dynamic-entropy authored Sep 20, 2023
2 parents 229a852 + 8261550 commit e54d834
Showing 1 changed file with 21 additions and 14 deletions.
35 changes: 21 additions & 14 deletions docker/CMSRucioClient/loadtest/loadtest.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ def delete_replicas(client, dest_rse, replicas):


def update_loadtest(
client, source_rse, dest_rse, source_files, rule, dataset, account, activity
client, source_rse, dest_rse, source_files, rule, dataset, account, activity, tape_rses_require_approval
):
links = client.get_distance(source_rse, dest_rse)
if len(links) == 0 and rule is not None:
Expand Down Expand Up @@ -222,6 +222,8 @@ def update_loadtest(
"grouping": "DATASET",
"comment": DEFAULT_RULE_COMMENT,
}
if dest_rse in tape_rses_require_approval:
rule["ask_approval"] = True
logger.debug("Creating rule: %r" % rule)
try:
client.add_replication_rule(**rule)
Expand Down Expand Up @@ -293,6 +295,8 @@ def run(source_rse_expression, dest_rse_expression, account, activity, filesize)
client = Client(account=account)
uploader = UploadClient(_client=client, logger=logger)

tape_rses_require_approval = [rse['rse'] for rse in client.list_rses(rse_expression="requires_approval=True")]

while ACTIVE:
cycle_start = datetime.datetime.utcnow()
source_rses = [item["rse"] for item in client.list_rses(source_rse_expression)]
Expand All @@ -319,24 +323,26 @@ def run(source_rse_expression, dest_rse_expression, account, activity, filesize)
continue
source_files = list(client.list_files("cms", dataset))

dest_rules = { rule["rse_expression"]: rule
for rule in client.list_did_rules("cms", dataset)
}
dest_rules = {rule["rse_expression"]: rule
for rule in client.list_did_rules("cms", dataset)
}
for dest_rse in dest_rses:
dest_rule = dest_rules.get(dest_rse, None)
if dest_rse == source_rse:
rule_attributes = {
"dids": [{"scope": "cms", "name": dataset}],
"copies": 1,
"rse_expression": source_rse,
"account": account,
"activity": "Functional Test",
"ignore_availability": True,
"comment": "Locking loadtest replicas at source"
}
if dest_rse in tape_rses_require_approval:
rule_attributes["ask_approval"] = True
if dest_rule is None:
logger.info(f"Adding new replication rule at src for rse {source_rse}")
client.add_replication_rule (
dids = [{"scope": "cms", "name": dataset}],
copies = 1,
rse_expression = source_rse,
account = account,
activity = "Functional Test",
ignore_availability = True,
comment = "Locking loadtest replicas at source"
)

client.add_replication_rule(**rule_attributes)
else:
update_loadtest(
client,
Expand All @@ -347,6 +353,7 @@ def run(source_rse_expression, dest_rse_expression, account, activity, filesize)
dataset,
account,
activity,
tape_rses_require_approval
)

cycle_time = (datetime.datetime.utcnow() - cycle_start).total_seconds()
Expand Down

0 comments on commit e54d834

Please sign in to comment.