3.7.3.84 #121

Merged 73 commits on Apr 30, 2024
77fb107
New version
PalNilsson Mar 7, 2024
6413fe3
Printing out script name
PalNilsson Mar 8, 2024
836bc15
Corrected variable name
Mar 8, 2024
cc35a2e
Pylint updates
Mar 8, 2024
cffccd9
Added args object to kwargs. Removed input_dir
PalNilsson Mar 11, 2024
ae72cb7
Receiving pilot args
PalNilsson Mar 11, 2024
0418a19
Comment
Mar 11, 2024
cb80c0c
Merge branch 'next' of https://github.com/PalNilsson/pilot3 into next
Mar 11, 2024
64e9bcc
Debugging analytics
PalNilsson Mar 12, 2024
ecbea38
Merge branch 'next' of https://github.com/PalNilsson/pilot3 into next
Mar 13, 2024
1856680
Corrections
Mar 13, 2024
94e004e
Aborting stage-in after graceful stop
Mar 14, 2024
9b77055
tracereport: also catch gaierror for gethostbyaddr()
olifre Mar 14, 2024
5203d95
Merge pull request #117 from olifre/fix_trace_resolve_exception_handler
PalNilsson Mar 15, 2024
420c4ee
Removed debug messages
Mar 15, 2024
60d15af
Corrected workerAttributesFile usage
Mar 15, 2024
656ad88
Added cvmfs checks (availability and is alive)
PalNilsson Mar 19, 2024
bd925b0
Renamed harvester attributes
Mar 19, 2024
40cb17b
Updated deprecated logger.warn to logger.warning
PalNilsson Mar 20, 2024
c69e02a
80 to 64
Mar 20, 2024
d09d702
eventservice config for AthenaMT
esseivaju Mar 20, 2024
405004e
Merge pull request #118 from esseivaju/athenaMT-EventService
PalNilsson Mar 21, 2024
1310bff
Added CVMFS checks
PalNilsson Mar 21, 2024
868de1d
Merge remote-tracking branch 'upstream/next' into next
PalNilsson Mar 21, 2024
3a9fd35
Merge
PalNilsson Mar 21, 2024
0fab7b6
Merge remote-tracking branch 'origin/next' into next
PalNilsson Mar 21, 2024
f3335c7
fix typo
esseivaju Mar 21, 2024
682b10e
correctly handle yampl message from athenaMT-ES
esseivaju Mar 22, 2024
7d66f6a
Merge pull request #119 from esseivaju/mtes-fix
PalNilsson Mar 25, 2024
536f3e1
Merge remote-tracking branch 'upstream/next' into next
Mar 25, 2024
b4c3d9c
Added comment
Mar 25, 2024
6ababd5
Updated cvmfs checks order
Mar 25, 2024
3bdc978
Pylint updates
Mar 25, 2024
1f0f804
Pylint updates
Mar 25, 2024
0b67039
Pylint updates and refactoring
Mar 25, 2024
ea30100
Pylint updates and refactoring
Mar 25, 2024
a9063e5
Pylint updates
Mar 25, 2024
4769ae4
Pylint updates
Mar 25, 2024
05479b8
Pylint updates
Mar 25, 2024
c213839
Pylint updates
Mar 25, 2024
7b02b8b
Pylint updates
Mar 25, 2024
c98310b
Pylint updates
Mar 25, 2024
583628f
Pylint updates
Mar 25, 2024
bbdf1ab
Pylint updates
Mar 26, 2024
58ce5b8
Timing out remote file open
Mar 29, 2024
e896f8e
Corrected graceful_stop
Mar 29, 2024
a0b1b6d
Corrected file paths for trace curl
Apr 1, 2024
b2f8d29
Reduced number of parameters
Apr 1, 2024
d03a489
Cleanup
PalNilsson Apr 8, 2024
f19398b
Now using compressed JSON
Apr 8, 2024
e52ae08
Updates to request2() and its usage
Apr 10, 2024
4c5a5be
Curl replacement changes
Apr 11, 2024
b842fd5
More urllib tests
PalNilsson Apr 12, 2024
d7e05d6
Curl replacement now working for traces
Apr 12, 2024
cb68adb
Cleanup
PalNilsson Apr 17, 2024
c9318dc
Now using request2() for getJob/updateJob
PalNilsson Apr 17, 2024
7a459bd
Updates
PalNilsson Apr 17, 2024
ff581cf
Refactoring
PalNilsson Apr 18, 2024
2cd5278
Cleanup
Apr 19, 2024
4702344
Reporting GPU info with job metrics
Apr 19, 2024
e3a39c4
Uploading prmon output using urllib instead of curl
Apr 21, 2024
5a83bb0
Added error handling for collector urllib call. Added replica printin…
Apr 26, 2024
6cba880
Added memory_ to remove_unwanted_files for rubin
Apr 26, 2024
bb42bde
Debug info, comments
Apr 26, 2024
590672c
Corrected ARM info in cpuconsumptionunit
Apr 26, 2024
7fae386
Corrected ARM info in cpuconsumptionunit
Apr 29, 2024
aaf9a08
Now using download_file()
Apr 29, 2024
5edd295
Now using download_file() to download trf
Apr 29, 2024
7a3a356
Coments
Apr 29, 2024
413e2a5
Type hints
Apr 29, 2024
97ebb98
Fixed getProxy and voms-proxy-info check
Apr 29, 2024
a02e4bb
Correction for job suspension
Apr 30, 2024
7d3a434
Merge pull request #120 from PalNilsson/next
PalNilsson Apr 30, 2024
2 changes: 1 addition & 1 deletion PILOTVERSION
@@ -1 +1 @@
-3.7.2.4
+3.7.3.84
20 changes: 19 additions & 1 deletion pilot.py
@@ -36,13 +36,13 @@

 from pilot.common.errorcodes import ErrorCodes
 from pilot.common.exception import PilotException
-from pilot.util.config import config
 from pilot.info import infosys
 from pilot.util.auxiliary import (
     pilot_version_banner,
     shell_exit_code,
     convert_signal_to_exit_code
 )
+from pilot.util.config import config
 from pilot.util.constants import (
     get_pilot_version,
     SUCCESS,
@@ -53,6 +53,10 @@
     SERVER_UPDATE_NOT_DONE,
     PILOT_MULTIJOB_START_TIME,
 )
+from pilot.util.cvmfs import (
+    is_cvmfs_available,
+    get_last_update
+)
 from pilot.util.filehandling import (
     get_pilot_work_dir,
     mkdirs,
@@ -71,8 +75,12 @@
 from pilot.util.networking import dump_ipv6_info
 from pilot.util.processgroups import find_defunct_subprocesses
 from pilot.util.timing import add_to_pilot_timing
+from pilot.util.workernode import get_node_name

 errors = ErrorCodes()
+mainworkdir = ""
+args = None
+trace = None


 def main() -> int:
@@ -112,6 +120,15 @@ def main() -> int:
         "started", args.queue, args.url, args.port, logger, "IPv6"
     )  # note: assuming IPv6, fallback in place

+    # check cvmfs if available
+    if is_cvmfs_available() is True:  # ignore None, False is handled in function
+        timestamp = get_last_update()
+        if timestamp and timestamp > 0:
+            logger.info('CVMFS has been validated')
+        else:
+            logger.warning('CVMFS is not responding - aborting pilot')
+            return errors.CVMFSISNOTALIVE
+
     if not args.rucio_host:
         args.rucio_host = config.Rucio.host

@@ -798,6 +815,7 @@ def send_worker_status(
     data["harvesterID"] = os.environ.get("HARVESTER_ID", None)
     data["status"] = status
     data["site"] = queue
+    data["node_id"] = get_node_name()

     # attempt to send the worker info to the server
     if data["workerID"] and data["harvesterID"]:
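The CVMFS gate added to main() above is a three-way decision: only an explicit True from is_cvmfs_available() triggers the liveness check, while None and False let the pilot continue at this point. The following is a minimal sketch of that decision, not the pilot3 implementation. The function name cvmfs_gate and its parameters are hypothetical stand-ins for the return values of is_cvmfs_available() and get_last_update(), whose bodies are not shown in this diff.

```python
from typing import Optional

CVMFSISNOTALIVE = 1377  # value taken from the errorcodes.py change in this PR


def cvmfs_gate(available: Optional[bool], last_update: Optional[int]) -> int:
    """Hypothetical helper: return 0 to continue, or an error code to abort.

    `available` stands in for is_cvmfs_available() (assumed True/False/None),
    `last_update` stands in for get_last_update() (assumed timestamp or None).
    """
    if available is True:
        # CVMFS is expected here, so it must also report a recent update
        if last_update and last_update > 0:
            return 0
        return CVMFSISNOTALIVE
    # None (not applicable) and False are not treated as fatal at this point
    return 0
```

With this shape, cvmfs_gate(True, None) and cvmfs_gate(True, 0) both yield the abort code, while cvmfs_gate(None, None) lets the pilot continue.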
70 changes: 34 additions & 36 deletions pilot/api/analytics.py
@@ -59,8 +59,8 @@ def fit(self, x: list, y: list, model: str = "linear") -> Any:
         """
         try:
             self._fit = Fit(x=x, y=y, model=model)
-        except Exception as e:
-            raise UnknownException(e)
+        except Exception as exc:
+            raise UnknownException(exc) from exc

         return self._fit
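
The change from `raise UnknownException(e)` to `raise UnknownException(exc) from exc` makes the exception chaining explicit, so the original error is kept as `__cause__` on the new exception. A small self-contained sketch of the effect follows; the UnknownException class and the checked_fit function here are local stand-ins, not the pilot classes.

```python
class UnknownException(Exception):
    """Local stand-in for the pilot exception of the same name."""


def checked_fit(x: list, y: list) -> int:
    """Hypothetical wrapper that re-raises any failure as UnknownException."""
    try:
        if len(x) != len(y):
            raise ValueError("input lists have different lengths")
        return len(x)
    except Exception as exc:
        # 'from exc' records the original exception as __cause__
        raise UnknownException(exc) from exc
```

A caller that catches UnknownException can then inspect `err.__cause__` to recover the underlying ValueError.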

@@ -71,12 +71,10 @@ def slope(self) -> float:
         :raises NotDefined: exception thrown if fit is not defined.
         :return: slope (float).
         """
-        if self._fit:
-            slope = self._fit.slope()
-        else:
+        if not self._fit:
             raise NotDefined("Fit has not been defined")

-        return slope
+        return self._fit.slope()

     def intersect(self) -> float:
         """
@@ -85,12 +83,10 @@ def intersect(self) -> float:
         :raises NotDefined: exception thrown if fit is not defined
         :return: intersect (float).
         """
-        if self._fit:
-            intersect = self._fit.intersect()
-        else:
+        if not self._fit:
             raise NotDefined("Fit has not been defined")

-        return intersect
+        return self._fit.intersect()

     def chi2(self) -> float:
         """
@@ -99,12 +95,10 @@ def chi2(self) -> float:
         :raises NotDefined: exception thrown if fit is not defined
         :return: chi2 (float).
         """
-        if self._fit:
-            x2 = self._fit.chi2()
-        else:
+        if not self._fit:
             raise NotDefined("Fit has not been defined")

-        return x2
+        return self._fit.chi2()

     def get_table(self, filename: str, header: str = "", separator: str = "\t", convert_to_float: bool = True) -> dict:
         """
@@ -139,7 +133,8 @@ def get_fitted_data(
         :return: {"slope": slope, "chi2": chi2} (dict).
         """
         slope = ""
-        chi2 = ""
+        intersect = ""
+        _chi2 = ""
         table = self.get_table(filename)

         if table:
@@ -198,24 +193,19 @@ def get_fitted_data(
                     fit = self.fit(x, y)
                     _slope = self.slope()
                 except Exception as exc:
-                    logger.warning(
-                        "failed to fit data, x=%s, y=%s: %s", str(x), str(y), exc
-                    )
+                    logger.warning(f"failed to fit data, x={x}, y={y}: {exc}")
                 else:
                     if _slope:
-                        slope = float_to_rounded_string(
-                            fit.slope(), precision=precision
-                        )
-                        chi2 = float_to_rounded_string(fit.chi2(), precision=precision)
+                        slope = float_to_rounded_string(fit.slope(), precision=precision)
+                        fit.set_intersect()
+                        intersect = float_to_rounded_string(fit.intersect(), precision=precision)
+                        _chi2 = float_to_rounded_string(fit.chi2(), precision=precision)
                         if slope != "":
                             logger.info(
-                                "current memory leak: %s B/s (using %d data points, chi2=%s)",
-                                slope,
-                                len(x),
-                                chi2,
+                                f"current memory leak: {slope} B/s (using {len(x)} data points, chi2={_chi2})"
                             )

-        return {"slope": slope, "chi2": chi2}
+        return {"slope": slope, "chi2": _chi2, "intersect": intersect}

     def find_limit(
         self, _x, _y, _chi2_org, norg, change_limit=0.25, edge="right", steps=5
@@ -244,8 +234,8 @@ def find_limit(
             if change < change_limit:
                 found = True
                 break
-            else:
-                _chi2_prev = _chi2
+
+            _chi2_prev = _chi2

         if edge == "right":
             if not found:
@@ -254,13 +244,12 @@ def find_limit(
             else:
                 limit = len(_x) - 1
                 logger.info(f"right removable region: {limit}")
+        elif not found:
+            limit = 0
+            logger.info("left removable region not found")
         else:
-            if not found:
-                limit = 0
-                logger.info("left removable region not found")
-            else:
-                limit = iterations * 10
-                logger.info(f"left removable region: {limit}")
+            limit = iterations * 10
+            logger.info(f"left removable region: {limit}")

         return limit

@@ -293,7 +282,7 @@ def extract_from_table(self, table, x_name, y_name):
         return x, y


-class Fit(object):
+class Fit():
     """Low-level fitting class."""

     _model = "linear"  # fitting model
@@ -325,15 +314,22 @@ def __init__(self, **kwargs):
         if len(self._x) != len(self._y):
             raise NotSameLength("input data (lists) have different lengths")

+        logger.info(f'model: {self._model}, x: {self._x}, y: {self._y}')
         # base calculations
         if self._model == "linear":
             self._ss = sum_square_dev(self._x)
+            logger.info("sum of square deviations: %s", self._ss)
             self._ss2 = sum_dev(self._x, self._y)
+            logger.info("sum of deviations: %s", self._ss2)
             self.set_slope()
             self._xm = mean(self._x)
+            logger.info("mean x: %s", self._xm)
             self._ym = mean(self._y)
+            logger.info("mean y: %s", self._ym)
             self.set_intersect()
+            logger.info("intersect: %s", self._intersect)
             self.set_chi2()
+            logger.info("chi2: %s", self._chi2)
         else:
             logger.warning("'%s' model is not implemented", self._model)
             raise NotImplementedError()
@@ -407,8 +403,10 @@ def set_intersect(self):
         """
         if self._ym and self._slope and self._xm:
             self._intersect = self._ym - self._slope * self._xm
+            logger.info("-- intersect: %s", self._intersect)
         else:
             self._intersect = None
+            logger.info("could not calculate intersect")

     def intersect(self):
         """
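For readers following the Fit changes above, the linear model can be sketched as an ordinary least-squares fit: the slope is the sum of cross deviations divided by the sum of squared deviations in x, and the intersect is `ym - slope * xm`, as in set_intersect(). The helper names mean, sum_square_dev and sum_dev appear in the diff, but their bodies do not, so the definitions below are assumptions, and linear_fit is a hypothetical wrapper. Note that the guard shown in the diff (`if self._ym and self._slope and self._xm`) treats a value of exactly 0 as missing; this sketch does not replicate that.

```python
def mean(data: list) -> float:
    """Arithmetic mean (assumed definition)."""
    return sum(data) / len(data)


def sum_square_dev(data: list) -> float:
    """Sum of squared deviations from the mean (assumed definition)."""
    m = mean(data)
    return sum((value - m) ** 2 for value in data)


def sum_dev(x: list, y: list) -> float:
    """Sum of cross deviations of x and y (assumed definition)."""
    xm, ym = mean(x), mean(y)
    return sum((xi - xm) * (yi - ym) for xi, yi in zip(x, y))


def linear_fit(x: list, y: list) -> tuple:
    """Hypothetical helper: return (slope, intersect) of a least-squares line."""
    if len(x) != len(y):
        raise ValueError("input data (lists) have different lengths")
    ss = sum_square_dev(x)
    if ss == 0:
        raise ValueError("all x values are identical, slope is undefined")
    slope = sum_dev(x, y) / ss
    intersect = mean(y) - slope * mean(x)
    return slope, intersect
```

For example, linear_fit([0, 1, 2, 3], [1, 3, 5, 7]) describes the line y = 2x + 1.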
29 changes: 26 additions & 3 deletions pilot/api/data.py
@@ -213,6 +213,25 @@ def prepare_inputddms(self, files, activities=None):
             if not fdat.inputddms and fdat.ddmendpoint:
                 fdat.inputddms = [fdat.ddmendpoint]

+    def print_replicas(self, replicas, label='unsorted'):
+        """
+        Print replicas.
+
+        :param replicas: list of replicas (Any)
+        :param label: label (str).
+        """
+        number = 1
+        maxnumber = 10
+        self.logger.info(f'{label} list of replicas: (max {maxnumber})')
+        for pfn, xdat in replicas:
+            self.logger.debug(f"{number}. "
+                              f"lfn={pfn}, "
+                              f"rse={xdat.get('ddmendpoint')}, "
+                              f"domain={xdat.get('domain')}")
+            number += 1
+            if number > maxnumber:
+                break
+
     @classmethod
     def sort_replicas(self, replicas, inputddms):
         """
@@ -242,7 +261,7 @@ def sort_replicas(self, replicas, inputddms):
                 continue
             xreplicas.append((pfn, xdat))

-        return replicas
+        return xreplicas

     def resolve_replicas(self, files, use_vp=False):
         """
@@ -369,7 +388,9 @@ def add_replicas(self, fdat, replica):
         sorted_replicas = sorted(iter(list(replica.get('pfns', {}).items())), key=lambda x: x[1]['priority'])

         # prefer replicas from inputddms first
+        #self.print_replicas(sorted_replicas)
         xreplicas = self.sort_replicas(sorted_replicas, fdat.inputddms)
+        self.print_replicas(xreplicas)

         for pfn, xdat in xreplicas:

@@ -494,7 +515,7 @@ def transfer(self, files, activity='default', **kwargs):  # noqa: C901
             raise PilotException('failed to resolve copytool by preferred activities=%s, acopytools=%s' %
                                  (activity, self.acopytools))

-        # populate inputddms if need
+        # populate inputddms if needed
         self.prepare_inputddms(files)

         # initialize ddm_activity name for requested files if not set
@@ -839,7 +860,9 @@ def transfer_files(self, copytool, files, activity=None, **kwargs):  # noqa: C90

         # prepare files (resolve protocol/transfer url)
         if getattr(copytool, 'require_input_protocols', False) and files:
-            self.require_protocols(files, copytool, activity, local_dir=kwargs.get('input_dir'))
+            args = kwargs.get('args')
+            input_dir = kwargs.get('input_dir') if not args else args.input_dir
+            self.require_protocols(files, copytool, activity, local_dir=input_dir)

         # mark direct access files with status=remote_io
         self.set_status_for_direct_access(files, kwargs.get('workdir', ''))
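The one-line fix in sort_replicas() matters because the method built a reordered list (xreplicas) and then returned the untouched input, so the preference for inputddms never took effect. Most of the method body is not shown in this diff, so the following is only a sketch of one way to express the intent: a stable partition that moves replicas hosted on a preferred endpoint to the front. The function name prefer_inputddms is hypothetical, and the replica layout of (pfn, dict with a 'ddmendpoint' key) follows what print_replicas() reads above.

```python
def prefer_inputddms(replicas: list, inputddms: list) -> list:
    """Hypothetical helper: move replicas on preferred endpoints to the front.

    `replicas` is a list of (pfn, xdat) tuples where xdat is a dict with a
    'ddmendpoint' key. The relative order inside each group is preserved.
    """
    if not inputddms:
        return list(replicas)

    preferred = []
    others = []
    for pfn, xdat in replicas:
        if xdat.get('ddmendpoint') in inputddms:
            preferred.append((pfn, xdat))
        else:
            others.append((pfn, xdat))

    # return the reordered list, not the input (the bug fixed in this PR)
    return preferred + others
```

Returning the new list rather than the argument is the whole point of the fix; a test that checks the order of the result would have caught the original behaviour.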
4 changes: 3 additions & 1 deletion pilot/common/errorcodes.py
@@ -177,6 +177,7 @@ class ErrorCodes:
     REMOTEFILEDICTDOESNOTEXIST = 1374
     LEASETIME = 1375
     LOGCREATIONTIMEOUT = 1376
+    CVMFSISNOTALIVE = 1377

     _error_messages = {
         GENERALERROR: "General pilot error, consult batch log",
@@ -315,7 +316,8 @@ class ErrorCodes:
         CERTIFICATEHASEXPIRED: "Certificate has expired",
         REMOTEFILEDICTDOESNOTEXIST: "Remote file open dictionary does not exist",
         LEASETIME: "Lease time is up",  # internal use only
-        LOGCREATIONTIMEOUT: "Log file creation timed out"
+        LOGCREATIONTIMEOUT: "Log file creation timed out",
+        CVMFSISNOTALIVE: "CVMFS is not responding"
     }

     put_error_codes = [1135, 1136, 1137, 1141, 1152, 1181]
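The new error code follows the existing pattern of a class-level integer constant plus an entry in the `_error_messages` dictionary. A reduced sketch of that pattern is shown below. It copies only the two entries visible in this hunk, and the class name MiniErrorCodes and the describe() method are hypothetical, since the lookup method of the real ErrorCodes class is not part of this diff.

```python
class MiniErrorCodes:
    """Reduced stand-in for the ErrorCodes class, for illustration only."""

    LOGCREATIONTIMEOUT = 1376
    CVMFSISNOTALIVE = 1377

    _error_messages = {
        LOGCREATIONTIMEOUT: "Log file creation timed out",
        CVMFSISNOTALIVE: "CVMFS is not responding",
    }

    def describe(self, code: int) -> str:
        """Hypothetical lookup: return the message for a code, or a fallback."""
        return self._error_messages.get(code, f"Unknown error code: {code}")
```

Keeping the constant and the message entry in the same commit avoids a code that can be returned by main() but has no readable description.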