Skip to content

Commit

Permalink
for taskQueuedTime
Browse files Browse the repository at this point in the history
  • Loading branch information
tmaeno committed Dec 10, 2024
1 parent 343c4a6 commit fbc2fc4
Show file tree
Hide file tree
Showing 8 changed files with 579 additions and 291 deletions.
2 changes: 1 addition & 1 deletion PandaPkgInfo.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
release_version = "0.5.0"
release_version = "0.5.1"
35 changes: 35 additions & 0 deletions pandaserver/taskbuffer/JobSpec.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ class JobSpec(object):
"requestType": "rt",
"jobCloning": "sc",
"scoutJob": "sj",
"taskQueuedTime": "tq",
"usePrefetcher": "up",
"useSecrets": "us",
"useZipToPin": "uz",
Expand Down Expand Up @@ -917,6 +918,22 @@ def set_input_output_file_types(self) -> None:
out_types = sorted(list(out_types))
self.outputFileType = ",".join(out_types)[: self._limitLength["outputFileType"]]

# set task queued time
def set_task_queued_time(self, queued_time: float | None):
"""
Set task queued time in job metrics. Skip if queued_time is None
:param queued_time: task queued time in seconds since epoch
"""
if queued_time is None:
return
if self.specialHandling in [None, "", "NULL"]:
items = []
else:
items = self.specialHandling.split(",")
items.append(f"{self._tagForSH['taskQueuedTime']}={queued_time}")
self.specialHandling = ",".join(items)


# utils

Expand All @@ -927,3 +944,21 @@ def push_status_changes(special_handling):
items = special_handling.split(",")
return JobSpec._tagForSH["pushStatusChanges"] in items
return False


# get task queued time
def get_task_queued_time(special_handling) -> datetime.datetime | None:
"""
Get task queued time from job metrics
:param special_handling: special handling string
:return: task queued time. None if unset
"""
try:
if special_handling is not None:
for item in special_handling.split(","):
if item.startswith(f"{JobSpec._tagForSH['taskQueuedTime']}="):
return datetime.datetime.fromtimestamp(float(item.split("=")[-1]), datetime.timezone.utc)
except Exception:
pass
return None
333 changes: 43 additions & 290 deletions pandaserver/taskbuffer/OraDBProxy.py

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions pandaserver/taskbuffer/TaskBuffer.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from pandaserver.srvcore import CoreUtils
from pandaserver.taskbuffer import ErrorCode, EventServiceUtils, JobUtils, ProcessGroups
from pandaserver.taskbuffer.DBProxyPool import DBProxyPool
from pandaserver.taskbuffer.JobSpec import get_task_queued_time

_logger = PandaLogger().getLogger("TaskBuffer")

Expand Down Expand Up @@ -370,6 +371,7 @@ def storeJobs(
use_secrets = job.use_secrets()
push_changes = job.push_status_changes()
is_push_job = job.is_push_job()
task_queued_time = get_task_queued_time(job.specialHandling)
# reset specialHandling
specialHandling = specialHandling[:-1]
job.specialHandling = specialHandling
Expand All @@ -385,6 +387,8 @@ def storeJobs(
job.set_push_status_changes()
if is_push_job:
job.set_push_job()
if task_queued_time:
job.set_task_queued_time(task_queued_time.timestamp())
# set DDM backend
if ddmBackEnd is not None:
job.setDdmBackEnd(ddmBackEnd)
Expand Down
Empty file.
58 changes: 58 additions & 0 deletions pandaserver/taskbuffer/db_proxy_mods/base_module.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
import sys
import traceback

from pandacommon.pandalogger.LogWrapper import LogWrapper
from pandacommon.pandalogger.PandaLogger import PandaLogger


# Base class for DB proxy modules
class BaseModule:
# constructor
def __init__(self, log_stream: PandaLogger):
self._log_stream = log_stream
self.conn = None
self.cur = None

# abstract method to commit
def _commit(self):
"""
Commit the transaction
"""
raise NotImplementedError("commit is not implemented")

# abstract method to rollback
def _rollback(self):
"""
Rollback the transaction
"""
raise NotImplementedError("rollback is not implemented")

# dump error message
def dump_error_message(self, tmp_log: LogWrapper):
"""
Dump error message to the log
:param tmp_log: log wrapper
"""
# error
err_type, err_value = sys.exc_info()[:2]
err_str = f"{err_type.__name__} {err_value}"
err_str.strip()
err_str += " "
err_str += traceback.format_exc()
tmp_log.error(err_str)

# create method name and logger
def create_method_name_logger(self, comment: str, tag: str = None) -> tuple[str, LogWrapper]:
"""
Create method name and logger from function comment
param comment: comment of the function
param tag: tag to add to the method name
return: (method name, log wrapper)
"""
method_name = comment.split(" ")[-2].split(".")[-1]
if tag is not None:
method_name += f" < {tag} >"
tmp_log = LogWrapper(self._log_stream, method_name)
return method_name, tmp_log
Loading

0 comments on commit fbc2fc4

Please sign in to comment.