Skip to content

Commit

Permalink
Merge pull request #583 from dynamic-entropy/lock_loadtest_files_at_s…
Browse files Browse the repository at this point in the history
…ource

Add locks to loadtest files at source
  • Loading branch information
dynamic-entropy authored Sep 1, 2023
2 parents f47a763 + f1e07d0 commit c0f92c3
Showing 1 changed file with 27 additions and 28 deletions.
55 changes: 27 additions & 28 deletions docker/CMSRucioClient/loadtest/loadtest.py
Original file line number Diff line number Diff line change
Expand Up @@ -315,40 +315,39 @@ def run(source_rse_expression, dest_rse_expression, account, activity, filesize)
client, uploader, source_rse, filesize, next_filenumber
)
if not success:
logger.error(
f"RSE {source_rse} has no source files and could not upload, skipping"
)
logger.error(f"RSE {source_rse} has no source files and could not upload, skipping")
continue
source_files = list(client.list_files("cms", dataset))

dest_rules = client.list_replication_rules(
{
"scope": "cms",
"name": dataset,
"account": account,
"activity": activity,
}
)
dest_rules = {
rule["rse_expression"]: rule
for rule in dest_rules
if rule["source_replica_expression"] == source_rse
dest_rules = { rule["rse_expression"]: rule
for rule in client.list_did_rules("cms", dataset)
}

for dest_rse in dest_rses:
if dest_rse == source_rse:
continue
dest_rule = dest_rules.get(dest_rse, None)
update_loadtest(
client,
source_rse,
dest_rse,
source_files,
dest_rule,
dataset,
account,
activity,
)
if dest_rse == source_rse:
if dest_rule is None:
logger.info(f"Adding new replication rule at src for rse {source_rse}")
client.add_replication_rule (
dids = [{"scope": "cms", "name": dataset}],
copies = 1,
rse_expression = source_rse,
account = account,
activity = "Functional Test",
ignore_availability = True,
comment = "Locking loadtest replicas at source"
)

else:
update_loadtest(
client,
source_rse,
dest_rse,
source_files,
dest_rule,
dataset,
account,
activity,
)

cycle_time = (datetime.datetime.utcnow() - cycle_start).total_seconds()
logger.info(f"Completed loadtest cycle in {cycle_time}s")
Expand Down

0 comments on commit c0f92c3

Please sign in to comment.