Skip to content

Commit

Permalink
Merge pull request #236 from HSF/flin
Browse files Browse the repository at this point in the history
shared file messenger: per queue control of payload interaction filenames ; fix jobspec when no job logs
  • Loading branch information
mightqxc authored Jun 26, 2024
2 parents a95d4d2 + fddfe3e commit b5ef5f0
Show file tree
Hide file tree
Showing 7 changed files with 109 additions and 110 deletions.
2 changes: 1 addition & 1 deletion pandaharvester/commit_timestamp.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
timestamp = "10-06-2024 11:59:07 on flin (by mightqxc)"
timestamp = "26-06-2024 13:30:57 on flin (by mightqxc)"
3 changes: 2 additions & 1 deletion pandaharvester/harvestercore/job_spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,8 @@ def get_output_file_attributes(self):
scopes = self.jobParams["scopeOut"].split(",")
scopeLog = self.jobParams["scopeLog"]
logLFN = self.jobParams["logFile"]
scopes.insert(lfns.index(logLFN), scopeLog)
if scopeLog and logLFN:
scopes.insert(lfns.index(logLFN), scopeLog)
datasets = self.jobParams["realDatasets"].split(",")
endpoints = self.jobParams["ddmEndPointOut"].split(",")
for lfn, scope, dataset, endpoint in zip(lfns, scopes, datasets, endpoints):
Expand Down
40 changes: 40 additions & 0 deletions pandaharvester/harvestermessenger/base_messenger.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,52 @@
from pandaharvester.harvesterconfig import harvester_config
from pandaharvester.harvestercore.plugin_base import PluginBase


# get payload interaction attributes from harvester config
def get_payload_interaction_attr(attr, default=None):
return getattr(harvester_config.payload_interaction, attr, default)


# base messenger
class BaseMessenger(PluginBase):
# constructor
def __init__(self, **kwarg):
self._load_default_attrs()
PluginBase.__init__(self, **kwarg)

# load default messenger attributes
def _load_default_attrs(self):
# json for worker attributes
self.jsonAttrsFileName = get_payload_interaction_attr("workerAttributesFile")
# json for job report
self.jsonJobReport = get_payload_interaction_attr("jobReportFile")
# json for outputs
self.jsonOutputsFileName = get_payload_interaction_attr("eventStatusDumpJsonFile")
# xml for outputs
self.xmlOutputsBaseFileName = get_payload_interaction_attr("eventStatusDumpXmlFile")
# json for job request
self.jsonJobRequestFileName = get_payload_interaction_attr("jobRequestFile")
# json for job spec
self.jobSpecFileName = get_payload_interaction_attr("jobSpecFile", "pandaJobData.out")
# json for event request
self.jsonEventsRequestFileName = get_payload_interaction_attr("eventRequestFile")
# json to feed events
self.jsonEventsFeedFileName = get_payload_interaction_attr("eventRangesFile")
# json to update events
self.jsonEventsUpdateFileName = get_payload_interaction_attr("updateEventsFile")
# PFC for input files
self.xmlPoolCatalogFileName = get_payload_interaction_attr("xmlPoolCatalogFile")
# json to get PandaIDs
self.pandaIDsFile = get_payload_interaction_attr("pandaIDsFile")
# json to kill worker itself
self.killWorkerFile = get_payload_interaction_attr("killWorkerFile", "kill_worker.json")
# json for heartbeats from the worker
self.heartbeatFile = get_payload_interaction_attr("heartbeatFile", "worker_heartbeat.json")
# task specific persistent dir
self.taskWorkBaseDir = get_payload_interaction_attr("taskWorkBaseDir", "/tmp/workdir")
# task-level work state file
self.taskWorkStateFile = get_payload_interaction_attr("taskWorkStateFile", "state.json")

# get access point
def get_access_point(self, workspec, panda_id):
pass
Expand Down
23 changes: 13 additions & 10 deletions pandaharvester/harvestermessenger/http_server_messenger.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ def set_logger(master_logger):
shared_file_messenger.set_logger(master_logger)


messenger_inst = shared_file_messenger.SharedFileMessenger()


# handler for http front-end
class HttpHandler(BaseHTTPRequestHandler):
def __init__(self, *args, **kwargs):
Expand Down Expand Up @@ -91,34 +94,34 @@ def do_POST(self):
opType = None
filePath = ""
if methodName == "requestJobs":
filePath = os.path.join(workSpec.get_access_point(), shared_file_messenger.jsonJobRequestFileName)
filePath = os.path.join(workSpec.get_access_point(), messenger_inst.jsonJobRequestFileName)
opType = "w"
elif methodName == "getJobs":
filePath = os.path.join(workSpec.get_access_point(), shared_file_messenger.jobSpecFileName)
filePath = os.path.join(workSpec.get_access_point(), messenger_inst.jobSpecFileName)
opType = "r"
elif methodName == "requestEventRanges":
filePath = os.path.join(workSpec.get_access_point(), shared_file_messenger.jsonEventsRequestFileName)
filePath = os.path.join(workSpec.get_access_point(), messenger_inst.jsonEventsRequestFileName)
opType = "w"
elif methodName == "getEventRanges":
filePath = os.path.join(workSpec.get_access_point(), shared_file_messenger.jsonEventsFeedFileName)
filePath = os.path.join(workSpec.get_access_point(), messenger_inst.jsonEventsFeedFileName)
opType = "r"
elif methodName == "updateJobs":
filePath = os.path.join(workSpec.get_access_point(), shared_file_messenger.jsonAttrsFileName)
filePath = os.path.join(workSpec.get_access_point(), messenger_inst.jsonAttrsFileName)
opType = "w"
elif methodName == "uploadJobReport":
filePath = os.path.join(workSpec.get_access_point(), shared_file_messenger.jsonJobReport)
filePath = os.path.join(workSpec.get_access_point(), messenger_inst.jsonJobReport)
opType = "w"
elif methodName == "uploadEventOutputDump":
filePath = os.path.join(workSpec.get_access_point(), shared_file_messenger.jsonOutputsFileName)
filePath = os.path.join(workSpec.get_access_point(), messenger_inst.jsonOutputsFileName)
opType = "w"
elif methodName == "setPandaIDs":
filePath = os.path.join(workSpec.get_access_point(), shared_file_messenger.pandaIDsFile)
filePath = os.path.join(workSpec.get_access_point(), messenger_inst.pandaIDsFile)
opType = "w"
elif methodName == "killWorker":
filePath = os.path.join(workSpec.get_access_point(), shared_file_messenger.killWorkerFile)
filePath = os.path.join(workSpec.get_access_point(), messenger_inst.killWorkerFile)
opType = "w"
elif methodName == "heartbeat":
filePath = os.path.join(workSpec.get_access_point(), shared_file_messenger.heartbeatFile)
filePath = os.path.join(workSpec.get_access_point(), messenger_inst.heartbeatFile)
opType = "w"
else:
self.send_response(501)
Expand Down
Loading

0 comments on commit b5ef5f0

Please sign in to comment.