Skip to content

Commit

Permalink
Merge pull request #120 from PalNilsson/next
Browse files Browse the repository at this point in the history
3.7.3.84
  • Loading branch information
PalNilsson authored Apr 30, 2024
2 parents 7d66f6a + a02e4bb commit 7d3a434
Show file tree
Hide file tree
Showing 40 changed files with 1,156 additions and 501 deletions.
2 changes: 1 addition & 1 deletion PILOTVERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
3.7.2.4
3.7.3.84
20 changes: 19 additions & 1 deletion pilot.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,13 @@

from pilot.common.errorcodes import ErrorCodes
from pilot.common.exception import PilotException
from pilot.util.config import config
from pilot.info import infosys
from pilot.util.auxiliary import (
pilot_version_banner,
shell_exit_code,
convert_signal_to_exit_code
)
from pilot.util.config import config
from pilot.util.constants import (
get_pilot_version,
SUCCESS,
Expand All @@ -53,6 +53,10 @@
SERVER_UPDATE_NOT_DONE,
PILOT_MULTIJOB_START_TIME,
)
from pilot.util.cvmfs import (
is_cvmfs_available,
get_last_update
)
from pilot.util.filehandling import (
get_pilot_work_dir,
mkdirs,
Expand All @@ -71,8 +75,12 @@
from pilot.util.networking import dump_ipv6_info
from pilot.util.processgroups import find_defunct_subprocesses
from pilot.util.timing import add_to_pilot_timing
from pilot.util.workernode import get_node_name

errors = ErrorCodes()
mainworkdir = ""
args = None
trace = None


def main() -> int:
Expand Down Expand Up @@ -112,6 +120,15 @@ def main() -> int:
"started", args.queue, args.url, args.port, logger, "IPv6"
) # note: assuming IPv6, fallback in place

# check cvmfs if available
if is_cvmfs_available() is True: # ignore None, False is handled in function
timestamp = get_last_update()
if timestamp and timestamp > 0:
logger.info('CVMFS has been validated')
else:
logger.warning('CVMFS is not responding - aborting pilot')
return errors.CVMFSISNOTALIVE

if not args.rucio_host:
args.rucio_host = config.Rucio.host

Expand Down Expand Up @@ -798,6 +815,7 @@ def send_worker_status(
data["harvesterID"] = os.environ.get("HARVESTER_ID", None)
data["status"] = status
data["site"] = queue
data["node_id"] = get_node_name()

# attempt to send the worker info to the server
if data["workerID"] and data["harvesterID"]:
Expand Down
70 changes: 34 additions & 36 deletions pilot/api/analytics.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ def fit(self, x: list, y: list, model: str = "linear") -> Any:
"""
try:
self._fit = Fit(x=x, y=y, model=model)
except Exception as e:
raise UnknownException(e)
except Exception as exc:
raise UnknownException(exc) from exc

return self._fit

Expand All @@ -71,12 +71,10 @@ def slope(self) -> float:
:raises NotDefined: exception thrown if fit is not defined.
:return: slope (float).
"""
if self._fit:
slope = self._fit.slope()
else:
if not self._fit:
raise NotDefined("Fit has not been defined")

return slope
return self._fit.slope()

def intersect(self) -> float:
"""
Expand All @@ -85,12 +83,10 @@ def intersect(self) -> float:
:raises NotDefined: exception thrown if fit is not defined
:return: intersect (float).
"""
if self._fit:
intersect = self._fit.intersect()
else:
if not self._fit:
raise NotDefined("Fit has not been defined")

return intersect
return self._fit.intersect()

def chi2(self) -> float:
"""
Expand All @@ -99,12 +95,10 @@ def chi2(self) -> float:
:raises NotDefined: exception thrown if fit is not defined
:return: chi2 (float).
"""
if self._fit:
x2 = self._fit.chi2()
else:
if not self._fit:
raise NotDefined("Fit has not been defined")

return x2
return self._fit.chi2()

def get_table(self, filename: str, header: str = "", separator: str = "\t", convert_to_float: bool = True) -> dict:
"""
Expand Down Expand Up @@ -139,7 +133,8 @@ def get_fitted_data(
:return: {"slope": slope, "chi2": chi2} (dict).
"""
slope = ""
chi2 = ""
intersect = ""
_chi2 = ""
table = self.get_table(filename)

if table:
Expand Down Expand Up @@ -198,24 +193,19 @@ def get_fitted_data(
fit = self.fit(x, y)
_slope = self.slope()
except Exception as exc:
logger.warning(
"failed to fit data, x=%s, y=%s: %s", str(x), str(y), exc
)
logger.warning(f"failed to fit data, x={x}, y={y}: {exc}")
else:
if _slope:
slope = float_to_rounded_string(
fit.slope(), precision=precision
)
chi2 = float_to_rounded_string(fit.chi2(), precision=precision)
slope = float_to_rounded_string(fit.slope(), precision=precision)
fit.set_intersect()
intersect = float_to_rounded_string(fit.intersect(), precision=precision)
_chi2 = float_to_rounded_string(fit.chi2(), precision=precision)
if slope != "":
logger.info(
"current memory leak: %s B/s (using %d data points, chi2=%s)",
slope,
len(x),
chi2,
f"current memory leak: {slope} B/s (using {len(x)} data points, chi2={_chi2})"
)

return {"slope": slope, "chi2": chi2}
return {"slope": slope, "chi2": _chi2, "intersect": intersect}

def find_limit(
self, _x, _y, _chi2_org, norg, change_limit=0.25, edge="right", steps=5
Expand Down Expand Up @@ -244,8 +234,8 @@ def find_limit(
if change < change_limit:
found = True
break
else:
_chi2_prev = _chi2

_chi2_prev = _chi2

if edge == "right":
if not found:
Expand All @@ -254,13 +244,12 @@ def find_limit(
else:
limit = len(_x) - 1
logger.info(f"right removable region: {limit}")
elif not found:
limit = 0
logger.info("left removable region not found")
else:
if not found:
limit = 0
logger.info("left removable region not found")
else:
limit = iterations * 10
logger.info(f"left removable region: {limit}")
limit = iterations * 10
logger.info(f"left removable region: {limit}")

return limit

Expand Down Expand Up @@ -293,7 +282,7 @@ def extract_from_table(self, table, x_name, y_name):
return x, y


class Fit(object):
class Fit():
"""Low-level fitting class."""

_model = "linear" # fitting model
Expand Down Expand Up @@ -325,15 +314,22 @@ def __init__(self, **kwargs):
if len(self._x) != len(self._y):
raise NotSameLength("input data (lists) have different lengths")

logger.info(f'model: {self._model}, x: {self._x}, y: {self._y}')
# base calculations
if self._model == "linear":
self._ss = sum_square_dev(self._x)
logger.info("sum of square deviations: %s", self._ss)
self._ss2 = sum_dev(self._x, self._y)
logger.info("sum of deviations: %s", self._ss2)
self.set_slope()
self._xm = mean(self._x)
logger.info("mean x: %s", self._xm)
self._ym = mean(self._y)
logger.info("mean y: %s", self._ym)
self.set_intersect()
logger.info("intersect: %s", self._intersect)
self.set_chi2()
logger.info("chi2: %s", self._chi2)
else:
logger.warning("'%s' model is not implemented", self._model)
raise NotImplementedError()
Expand Down Expand Up @@ -407,8 +403,10 @@ def set_intersect(self):
"""
if self._ym and self._slope and self._xm:
self._intersect = self._ym - self._slope * self._xm
logger.info("-- intersect: %s", self._intersect)
else:
self._intersect = None
logger.info("could not calculate intersect")

def intersect(self):
"""
Expand Down
29 changes: 26 additions & 3 deletions pilot/api/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,25 @@ def prepare_inputddms(self, files, activities=None):
if not fdat.inputddms and fdat.ddmendpoint:
fdat.inputddms = [fdat.ddmendpoint]

def print_replicas(self, replicas, label='unsorted'):
    """
    Log a short, numbered overview of the given replicas.

    A header line is written at info level, followed by one debug line per
    replica. At most ten replicas are listed; the rest are silently skipped.

    :param replicas: iterable of (pfn, xdat) pairs, where xdat provides .get()
                     for the keys 'ddmendpoint' and 'domain' (Any)
    :param label: text prepended to the header line (str).
    """
    maxnumber = 10
    self.logger.info(f'{label} list of replicas: (max {maxnumber})')

    for number, (pfn, xdat) in enumerate(replicas, start=1):
        rse = xdat.get('ddmendpoint')
        domain = xdat.get('domain')
        self.logger.debug(f"{number}. lfn={pfn}, rse={rse}, domain={domain}")
        # stop right after the last allowed entry, without pulling another
        # item from the iterable
        if number >= maxnumber:
            break

@classmethod
def sort_replicas(self, replicas, inputddms):
"""
Expand Down Expand Up @@ -242,7 +261,7 @@ def sort_replicas(self, replicas, inputddms):
continue
xreplicas.append((pfn, xdat))

return replicas
return xreplicas

def resolve_replicas(self, files, use_vp=False):
"""
Expand Down Expand Up @@ -369,7 +388,9 @@ def add_replicas(self, fdat, replica):
sorted_replicas = sorted(iter(list(replica.get('pfns', {}).items())), key=lambda x: x[1]['priority'])

# prefer replicas from inputddms first
#self.print_replicas(sorted_replicas)
xreplicas = self.sort_replicas(sorted_replicas, fdat.inputddms)
self.print_replicas(xreplicas)

for pfn, xdat in xreplicas:

Expand Down Expand Up @@ -494,7 +515,7 @@ def transfer(self, files, activity='default', **kwargs): # noqa: C901
raise PilotException('failed to resolve copytool by preferred activities=%s, acopytools=%s' %
(activity, self.acopytools))

# populate inputddms if need
# populate inputddms if needed
self.prepare_inputddms(files)

# initialize ddm_activity name for requested files if not set
Expand Down Expand Up @@ -839,7 +860,9 @@ def transfer_files(self, copytool, files, activity=None, **kwargs): # noqa: C90

# prepare files (resolve protocol/transfer url)
if getattr(copytool, 'require_input_protocols', False) and files:
self.require_protocols(files, copytool, activity, local_dir=kwargs.get('input_dir'))
args = kwargs.get('args')
input_dir = kwargs.get('input_dir') if not args else args.input_dir
self.require_protocols(files, copytool, activity, local_dir=input_dir)

# mark direct access files with status=remote_io
self.set_status_for_direct_access(files, kwargs.get('workdir', ''))
Expand Down
4 changes: 3 additions & 1 deletion pilot/common/errorcodes.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ class ErrorCodes:
REMOTEFILEDICTDOESNOTEXIST = 1374
LEASETIME = 1375
LOGCREATIONTIMEOUT = 1376
CVMFSISNOTALIVE = 1377

_error_messages = {
GENERALERROR: "General pilot error, consult batch log",
Expand Down Expand Up @@ -315,7 +316,8 @@ class ErrorCodes:
CERTIFICATEHASEXPIRED: "Certificate has expired",
REMOTEFILEDICTDOESNOTEXIST: "Remote file open dictionary does not exist",
LEASETIME: "Lease time is up", # internal use only
LOGCREATIONTIMEOUT: "Log file creation timed out"
LOGCREATIONTIMEOUT: "Log file creation timed out",
CVMFSISNOTALIVE: "CVMFS is not responding"
}

put_error_codes = [1135, 1136, 1137, 1141, 1152, 1181]
Expand Down
Loading

0 comments on commit 7d3a434

Please sign in to comment.