diff --git a/src/DIRAC/Core/Utilities/MySQL.py b/src/DIRAC/Core/Utilities/MySQL.py index 210ff763098..91c7c1156e4 100755 --- a/src/DIRAC/Core/Utilities/MySQL.py +++ b/src/DIRAC/Core/Utilities/MySQL.py @@ -529,20 +529,19 @@ def __del__(self): except Exception: pass - def _except(self, methodName, x, err, cmd="", print=True): + def _except(self, methodName, x, err, cmd="", debug=True): """ print MySQL error or exception return S_ERROR with Exception """ - try: raise x except MySQLdb.Error as e: - if print: + if debug: self.log.error(f"{methodName} ({self._safeCmd(cmd)}): {err}", "%d: %s" % (e.args[0], e.args[1])) return S_ERROR(DErrno.EMYSQL, "%s: ( %d: %s )" % (err, e.args[0], e.args[1])) except Exception as e: - if print: + if debug: self.log.error(f"{methodName} ({self._safeCmd(cmd)}): {err}", repr(e)) return S_ERROR(DErrno.EMYSQL, f"{err}: ({repr(e)})") @@ -757,8 +756,8 @@ def _update(self, cmd, *, conn=None, debug=True): :param debug: print or not the errors - return S_OK with number of updated registers upon success - return S_ERROR upon error + :return: S_OK with number of updated registers upon success. + S_ERROR upon error. """ self.log.debug(f"_update: {self._safeCmd(cmd)}") @@ -786,6 +785,41 @@ def _update(self, cmd, *, conn=None, debug=True): return retDict + @captureOptimizerTraces + def _updatemany(self, cmd, data, *, conn=None, debug=True): + """execute MySQL updatemany command + + :param debug: print or not the errors + + :return: S_OK with number of updated registers upon success. + S_ERROR upon error. + """ + + self.log.debug(f"_updatemany: {self._safeCmd(cmd)}") + if conn: + connection = conn + else: + retDict = self._getConnection() + if not retDict["OK"]: + return retDict + connection = retDict["Value"] + + try: + cursor = connection.cursor() + res = cursor.executemany(cmd, data) + retDict = S_OK(res) + if cursor.lastrowid: + retDict["lastRowId"] = cursor.lastrowid + except Exception as x: + retDict = self._except("_updatemany", x, "Execution failed.", cmd, debug) + + try: + cursor.close() + except Exception: + pass + + return retDict + def _transaction(self, cmdList, conn=None): """dummy transaction support diff --git a/src/DIRAC/WorkloadManagementSystem/DB/JobDB.py b/src/DIRAC/WorkloadManagementSystem/DB/JobDB.py index 5fca9478adf..100f39c813d 100755 --- a/src/DIRAC/WorkloadManagementSystem/DB/JobDB.py +++ b/src/DIRAC/WorkloadManagementSystem/DB/JobDB.py @@ -449,6 +449,9 @@ def setJobAttribute(self, jobID, attrName, attrValue, update=False, myDate=None, :return: S_OK/S_ERROR """ + if not jobID: + return S_OK() + if attrName not in self.jobAttributeNames: return S_ERROR(EWMSJMAN, "Request to set non-existing job attribute") @@ -506,6 +509,9 @@ def setJobAttributes(self, jobID, attrNames, attrValues, update=False, myDate=No :return: S_OK/S_ERROR """ + if not jobID: + return S_OK() + jobIDList = jobID if not isinstance(jobID, (list, tuple)): jobIDList = [jobID] diff --git a/src/DIRAC/WorkloadManagementSystem/DB/JobLoggingDB.py b/src/DIRAC/WorkloadManagementSystem/DB/JobLoggingDB.py index bf813a3803c..fbc2ea3f2f3 100755 --- a/src/DIRAC/WorkloadManagementSystem/DB/JobLoggingDB.py +++ b/src/DIRAC/WorkloadManagementSystem/DB/JobLoggingDB.py @@ -46,33 +46,61 @@ def addLoggingRecord( event = f"status/minor/app={status}/{minorStatus}/{applicationStatus}" self.log.info("Adding record for job ", str(jobID) + ": '" + event + "' from " + source) - try: + def _get_date(date): + # We need to specify that timezone is UTC because otherwise timestamp + # assumes local time while we mean UTC. if not date: - # Make the UTC datetime string and float - _date = datetime.datetime.utcnow() + # Make the UTC datetime + return datetime.datetime.utcnow() elif isinstance(date, str): # The date is provided as a string in UTC - _date = TimeUtilities.fromString(date) + return TimeUtilities.fromString(date) elif isinstance(date, datetime.datetime): - _date = date + return date + else: + raise Exception("Incorrect date for the logging record") + + try: + if isinstance(date, list): + _date = [] + for d in date: + try: + _date.append(_get_date(d)) + except Exception: + self.log.exception("Exception while date evaluation") + _date.append(datetime.datetime.utcnow()) else: - self.log.error("Incorrect date for the logging record") - _date = datetime.datetime.utcnow() + _date = _get_date(date) except Exception: self.log.exception("Exception while date evaluation") - _date = datetime.datetime.utcnow() - - # We need to specify that timezone is UTC because otherwise timestamp - # assumes local time while we mean UTC. - epoc = _date.replace(tzinfo=datetime.timezone.utc).timestamp() - MAGIC_EPOC_NUMBER + _date = [datetime.datetime.utcnow()] cmd = ( "INSERT INTO LoggingInfo (JobId, Status, MinorStatus, ApplicationStatus, " - + "StatusTime, StatusTimeOrder, StatusSource) VALUES (%d,'%s','%s','%s','%s',%f,'%s')" - % (int(jobID), status, minorStatus, applicationStatus[:255], str(_date), epoc, source[:32]) + + "StatusTime, StatusTimeOrder, StatusSource) VALUES " ) - return self._update(cmd) + if not isinstance(jobID, list): + jobID = [jobID] + + if isinstance(status, str): + status = [status] * len(jobID) + if isinstance(minorStatus, str): + minorStatus = [minorStatus] * len(jobID) + if isinstance(applicationStatus, str): + applicationStatus = [applicationStatus[:255]] * len(jobID) + if isinstance(_date, datetime.datetime): + _date = [_date] * len(jobID) + + epocs = [] + for dt in _date: + epoc = dt.replace(tzinfo=datetime.timezone.utc).timestamp() - MAGIC_EPOC_NUMBER + epocs.append(epoc) + cmd = cmd + "(%s, %s, %s, %s, %s, %s, %s)" + data = list( + zip(jobID, status, minorStatus, applicationStatus, _date, epocs, [source[:32]] * len(jobID), strict=True) + ) + return self._updatemany(cmd, data) ############################################################################# def getJobLoggingInfo(self, jobID): diff --git a/src/DIRAC/WorkloadManagementSystem/Service/JobManagerHandler.py b/src/DIRAC/WorkloadManagementSystem/Service/JobManagerHandler.py index d34b2712042..3ab3a3e880b 100755 --- a/src/DIRAC/WorkloadManagementSystem/Service/JobManagerHandler.py +++ b/src/DIRAC/WorkloadManagementSystem/Service/JobManagerHandler.py @@ -169,6 +169,9 @@ def export_submitJob(self, jobDesc): jobDescList = [jobDesc] jobIDList = [] + statusList = [] + minorStatusList = [] + timeStampList = [] if parametricJob: initialStatus = JobStatus.SUBMITTING @@ -206,13 +209,25 @@ def export_submitJob(self, jobDesc): return result jobID = result["JobID"] - self.log.info(f'Job added to the JobDB", "{jobID} for {self.owner}/{self.ownerGroup}') - - self.jobLoggingDB.addLoggingRecord( - jobID, result["Status"], result["MinorStatus"], date=result["TimeStamp"], source="JobManager" - ) + self.log.info(f"Job added to the JobDB", f"{jobID} for {self.owner}/{self.ownerGroup}") jobIDList.append(jobID) + statusList.append(result["Status"]) + minorStatusList.append(result["MinorStatus"]) + timeStampList.append(result["TimeStamp"]) + + # insert records in logging DB + + # For parametric jobs I can insert logging records in a bulk + if parametricJob and len(set(jobIDList)) == len(jobIDList): + result = self.jobLoggingDB.addLoggingRecord( + jobIDList, statusList, minorStatusList, date=timeStampList, source="JobManager" + ) + else: + for jobID, status, minorStatus, timeStamp in zip(jobIDList, statusList, minorStatusList, timeStampList): + result = self.jobLoggingDB.addLoggingRecord( + jobID, status, minorStatus, date=timeStamp, source="JobManager" + ) if parametricJob: result = S_OK(jobIDList) diff --git a/tests/Integration/WorkloadManagementSystem/Test_JobLoggingDB.py b/tests/Integration/WorkloadManagementSystem/Test_JobLoggingDB.py index de3445925cf..277ba822030 100755 --- a/tests/Integration/WorkloadManagementSystem/Test_JobLoggingDB.py +++ b/tests/Integration/WorkloadManagementSystem/Test_JobLoggingDB.py @@ -46,4 +46,22 @@ def test_JobStatus(jobLoggingDB: JobLoggingDB): result = jobLoggingDB.getWMSTimeStamps(1) assert result["OK"] is True, result["Message"] + now = datetime.datetime.utcnow() + result = jobLoggingDB.addLoggingRecord( + [2, 3, 4, 5], + status=["testing", "testing", "testing", "testing"], + minorStatus=["mn", "mn", "mn", "mn"], + date=[now, now, now, now], + source="Unittest", + ) + assert result["OK"] is True, result["Message"] + + result = jobLoggingDB.getJobLoggingInfo(2) + assert result["OK"] is True, result["Message"] + assert result["Value"][-1][0:3] == ("testing", "mn", "Unknown") + + result = jobLoggingDB.getJobLoggingInfo(5) + assert result["OK"] is True, result["Message"] + assert result["Value"][-1][0:3] == ("testing", "mn", "Unknown") + jobLoggingDB.deleteJob(1)