Merge pull request #91 from PanDAWMS/next
3.6.3.8
PalNilsson authored Jul 12, 2023
2 parents f026dad + 92b2805 commit 6dac888
Showing 7 changed files with 58 additions and 105 deletions.
2 changes: 1 addition & 1 deletion PILOTVERSION
@@ -1 +1 @@
3.6.2.9
3.6.3.8
1 change: 1 addition & 0 deletions pilot.py
@@ -116,6 +116,7 @@ def main() -> int:

# set the site name for rucio
environ['PILOT_RUCIO_SITENAME'] = os.environ.get('PILOT_RUCIO_SITENAME', '') or infosys.queuedata.site
logger.debug(f'PILOT_RUCIO_SITENAME={os.environ.get("PILOT_RUCIO_SITENAME")}')

# store the site name as set with a pilot option
environ['PILOT_SITENAME'] = infosys.queuedata.resource #args.site # TODO: replace with singleton
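
The new debug line in main() records which Rucio site name is actually in effect after the fallback to infosys.queuedata.site. A minimal, self-contained sketch of that fallback pattern (queuedata_site is a hypothetical stand-in for infosys.queuedata.site):

    import os

    # hypothetical stand-in for infosys.queuedata.site
    queuedata_site = 'CERN-PROD'

    # prefer an externally provided PILOT_RUCIO_SITENAME; an empty string counts as
    # unset because of the `or`, so the queuedata value is used as the fallback
    os.environ['PILOT_RUCIO_SITENAME'] = os.environ.get('PILOT_RUCIO_SITENAME', '') or queuedata_site
    print(f'PILOT_RUCIO_SITENAME={os.environ.get("PILOT_RUCIO_SITENAME")}')
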
65 changes: 19 additions & 46 deletions pilot/api/data.py
@@ -310,9 +310,10 @@ def list_replicas(self, xfiles, use_vp):
# load replicas from Rucio
from rucio.client import Client
rucio_client = Client()
location = self.detect_client_location(use_vp=use_vp)
if not location:
raise PilotException("Failed to get client location for Rucio", code=ErrorCodes.RUCIOLOCATIONFAILED)
location, diagnostics = self.detect_client_location(use_vp=use_vp)
if diagnostics:
self.logger.warning(f'failed to get client location for rucio: {diagnostics}')
#raise PilotException(f"failed to get client location for rucio: {diagnostics}", code=ErrorCodes.RUCIOLOCATIONFAILED)

query = {
'schemes': ['srm', 'root', 'davs', 'gsiftp', 'https', 'storm', 'file'],
@@ -381,13 +382,13 @@ def add_replicas(self, fdat, replica):

return fdat

@classmethod
def detect_client_location(self, use_vp: bool = False) -> dict:
"""
Open a UDP socket to a machine on the internet, to get the local IPv4 and IPv6
addresses of the requesting client.
"""

diagnostics = ''
client_location = {}

ip = '0.0.0.0'
@@ -397,8 +398,11 @@ def detect_client_location(self, use_vp: bool = False) -> dict:
s.connect(("8.8.8.8", 80))
ip = s.getsockname()[0]
except Exception as exc:
self.logger.warning(f'failed to get socket info: {exc}')
diagnostics = f'failed to get socket info: {exc}'
self.logger.warning(diagnostics)
client_location['ip'] = ip
site = os.environ.get('PILOT_RUCIO_SITENAME', 'unknown')
client_location['site'] = site

if use_vp:
latitude = os.environ.get('RUCIO_LATITUDE')
@@ -408,54 +412,23 @@
client_location['latitude'] = float(latitude)
client_location['longitude'] = float(longitude)
except ValueError:
self.logger.warning(f'client set latitude (\"{latitude}\") and longitude (\"{longitude}\") are not valid')
diagnostics = f'client set latitude (\"{latitude}\") and longitude (\"{longitude}\") are not valid'
self.logger.warning(diagnostics)
else:
try:
response = requests.post('https://location.cern.workers.dev',
json={"site": client_location.get('site')},
timeout=1)
json={"site": site},
timeout=10)
if response.status_code == 200 and 'application/json' in response.headers.get('Content-Type', ''):
client_location = response.json()
# put back the site
client_location['site'] = site
except Exception as exc:
self.logger.warning(f'no requests module: {exc}')

client_location['site'] = os.environ.get('PILOT_RUCIO_SITENAME', 'unknown')
return client_location

@classmethod
def detect_client_location_old(self):
"""
Open a UDP socket to a machine on the internet, to get the local IPv4 and IPv6
addresses of the requesting client.
"""
diagnostics = f'requests.post failed: {exc}'
self.logger.warning(diagnostics)

ip = '0.0.0.0'
try:
import socket
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.connect(("8.8.8.8", 80))
ip = s.getsockname()[0]
except Exception:
pass

ip6 = '::'
try:
s = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM)
s.connect(("2001:4860:4860:0:0:0:0:8888", 80))
ip6 = s.getsockname()[0]
except Exception:
pass

site = os.environ.get('PILOT_RUCIO_SITENAME', 'unknown')
# site = os.environ.get('SITE_NAME',
# os.environ.get('ATLAS_SITE_NAME',
# os.environ.get('OSG_SITE_NAME',
# 'ROAMING')))

return {'ip': ip,
'ip6': ip6,
'fqdn': socket.getfqdn(),
'site': site}
self.logger.debug(f'will use client_location={client_location}')
return client_location, diagnostics

def transfer_files(self, copytool, files, **kwargs):
"""
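
With this change detect_client_location reports problems through a diagnostics string instead of raising a PilotException, so list_replicas (and any other caller) now unpacks a tuple and downgrades a failed lookup to a warning. A small runnable sketch of the new calling contract; detect_client_location_mock below is a stand-in for the real method, not pilot code:

    import logging

    logging.basicConfig(level=logging.DEBUG)
    logger = logging.getLogger(__name__)

    def detect_client_location_mock(use_vp: bool = False) -> tuple:
        # mock of the new return contract: (client_location dict, diagnostics string);
        # an empty diagnostics string means the lookup succeeded
        return {'ip': '0.0.0.0', 'site': 'unknown'}, 'failed to get socket info: example error'

    location, diagnostics = detect_client_location_mock(use_vp=False)
    if diagnostics:
        # the call no longer raises; the caller decides how hard to fail
        logger.warning(f'failed to get client location for rucio: {diagnostics}')
    logger.debug(f'will use client_location={location}')
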
18 changes: 0 additions & 18 deletions pilot/control/data.py
@@ -761,24 +761,6 @@ def get_input_file_dictionary(indata):
return ret


def filter_files_for_log(directory):
"""
Create a file list recursively.
:param directory:
:return:
"""
filtered_files = []
maxfilesize = 10
for root, _, filenames in os.walk(directory):
for filename in filenames:
location = os.path.join(root, filename)
if os.path.exists(location): # do not include broken links
if os.path.getsize(location) < maxfilesize:
filtered_files.append(location)

return filtered_files


def create_log(workdir, logfile_name, tarball_name, cleanup, input_files=[], output_files=[], piloterrors=[], debugmode=False):
"""
Create the tarball for the job.
21 changes: 11 additions & 10 deletions pilot/control/job.py
@@ -2814,16 +2814,17 @@ def job_monitor(queues, traces, args):  # noqa: C901
# sleep for a while if stage-in has not completed
time.sleep(1)
continue

if args.workflow == 'stager':
if args.pod:
# wait maximum args.leasetime, then abort
time.sleep(10)
time_now = int(time.time())
if time_now - start_time >= args.leasetime:
logger.warning(f'lease time is up: {time_now - start_time} s has passed since start - abort stager pilot')
else:
continue
elif not queues.finished_data_in.empty():
# stage-in has finished, or there were no input files to begin with, job object ends up in finished_data_in queue
if args.workflow == 'stager':
if args.pod:
# wait maximum args.leasetime seconds, then abort
time.sleep(10)
time_now = int(time.time())
if time_now - start_time >= args.leasetime:
logger.warning(f'lease time is up: {time_now - start_time} s has passed since start - abort stager pilot')
else:
continue
else:
logger.debug('stage-in has finished - no need for job_monitor to continue')
break
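
The restructured branch only evaluates the stager/pod lease check once stage-in has finished (the job object has landed in the finished_data_in queue); the check itself is unchanged. A self-contained sketch of that lease-time test, with an illustrative LEASETIME in place of args.leasetime:

    import time

    LEASETIME = 3600  # seconds, illustrative; the pilot takes this from args.leasetime
    start_time = int(time.time())

    def lease_expired(start: int, leasetime: int) -> bool:
        """Return True once the stager pilot has run longer than its lease."""
        time_now = int(time.time())
        if time_now - start >= leasetime:
            print(f'lease time is up: {time_now - start} s has passed since start - abort stager pilot')
            return True
        return False

    # in the monitoring loop this is preceded by time.sleep(10) and, if the lease
    # has not expired, the loop continues instead of aborting
    if lease_expired(start_time, LEASETIME):
        pass  # the real pilot aborts the stager here
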
4 changes: 2 additions & 2 deletions pilot/util/constants.py
@@ -13,8 +13,8 @@
# Pilot version
RELEASE = '3' # released number should be fixed at 3 for Pilot 3
VERSION = '6' # version number is '1' for first release, '0' until then, increased for bigger updates
REVISION = '2' # revision number should be reset to '0' for every new version release, increased for small updates
BUILD = '9' # build number should be reset to '1' for every new development cycle
REVISION = '3' # revision number should be reset to '0' for every new version release, increased for small updates
BUILD = '8' # build number should be reset to '1' for every new development cycle

SUCCESS = 0
FAILURE = 1
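
The bumped REVISION and BUILD line up with the 3.6.3.8 string written to PILOTVERSION above; presumably the four constants are simply joined with dots elsewhere in the pilot. A minimal sketch of that composition (version_string is an illustrative helper, not necessarily the pilot's own):

    RELEASE = '3'
    VERSION = '6'
    REVISION = '3'
    BUILD = '8'

    def version_string() -> str:
        # join the four components into the form stored in PILOTVERSION
        return '.'.join((RELEASE, VERSION, REVISION, BUILD))

    assert version_string() == '3.6.3.8'
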
52 changes: 24 additions & 28 deletions pilot/util/monitoring.py
@@ -652,40 +652,36 @@ def check_work_dir(job):
if os.path.exists(job.workdir):
# get the limit of the workdir
maxwdirsize = get_max_allowed_work_dir_size()
workdirsize = get_disk_usage(job.workdir)

if os.path.exists(job.workdir):
workdirsize = get_disk_usage(job.workdir)
# is user dir within allowed size limit?
if workdirsize > maxwdirsize:
exit_code = errors.USERDIRTOOLARGE
diagnostics = f'work directory ({job.workdir}) is too large: {workdirsize} B (must be < {maxwdirsize} B)'
logger.fatal(diagnostics)

# is user dir within allowed size limit?
if workdirsize > maxwdirsize:
exit_code = errors.USERDIRTOOLARGE
diagnostics = f'work directory ({job.workdir}) is too large: {workdirsize} B (must be < {maxwdirsize} B)'
logger.fatal(diagnostics)
cmd = 'ls -altrR %s' % job.workdir
_ec, stdout, stderr = execute(cmd, mute=True)
logger.info(f'{cmd}:\n{stdout}')

cmd = 'ls -altrR %s' % job.workdir
_ec, stdout, stderr = execute(cmd, mute=True)
logger.info(f'{cmd}:\n{stdout}')
# kill the job
set_pilot_state(job=job, state="failed")
job.piloterrorcodes, job.piloterrordiags = errors.add_error_code(exit_code)
kill_processes(job.pid)

# kill the job
set_pilot_state(job=job, state="failed")
job.piloterrorcodes, job.piloterrordiags = errors.add_error_code(exit_code)
kill_processes(job.pid)

# remove any lingering input files from the work dir
lfns, guids = job.get_lfns_and_guids()
if lfns:
remove_files(lfns, workdir=job.workdir)

# remeasure the size of the workdir at this point since the value is stored below
workdirsize = get_disk_usage(job.workdir)
else:
logger.info(f'size of work directory {job.workdir}: {workdirsize} B (within {maxwdirsize} B limit)')
# remove any lingering input files from the work dir
lfns, guids = job.get_lfns_and_guids()
if lfns:
remove_files(lfns, workdir=job.workdir)

# Store the measured disk space (the max value will later be sent with the job metrics)
if workdirsize > 0:
job.add_workdir_size(workdirsize)
# re-measure the size of the workdir at this point since the value is stored below
workdirsize = get_disk_usage(job.workdir)
else:
logger.warning(f'job work dir does not exist: {job.workdir}')
logger.info(f'size of work directory {job.workdir}: {workdirsize} B (within {maxwdirsize} B limit)')

# Store the measured disk space (the max value will later be sent with the job metrics)
if workdirsize > 0:
job.add_workdir_size(workdirsize)
else:
logger.warning('skipping size check of workdir since it has not been created yet')

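The refactor removes the duplicated os.path.exists(job.workdir) test and flattens the indentation; the flow is now: if the workdir exists, measure it, fail the job when it exceeds the limit (dump a directory listing, mark the job failed, kill its processes, remove lingering input files, re-measure), otherwise log that it is within the limit, and finally record the size. A stripped-down, runnable sketch of that flow; the helpers used by the real code (get_disk_usage, kill_processes, remove_files, ...) are replaced by plain Python here:

    import os

    def check_work_dir_sketch(workdir: str, maxwdirsize: int) -> tuple:
        """Return (exit_code, diagnostics) following the refactored control flow."""
        exit_code, diagnostics = 0, ''
        if not os.path.exists(workdir):
            print('skipping size check of workdir since it has not been created yet')
            return exit_code, diagnostics

        # stand-in for the pilot's get_disk_usage()
        workdirsize = sum(os.path.getsize(os.path.join(root, name))
                          for root, _, names in os.walk(workdir)
                          for name in names if os.path.exists(os.path.join(root, name)))

        if workdirsize > maxwdirsize:
            exit_code = 1  # stand-in for errors.USERDIRTOOLARGE
            diagnostics = f'work directory ({workdir}) is too large: {workdirsize} B (must be < {maxwdirsize} B)'
            # the real code also dumps `ls -altrR`, fails the job, kills its processes,
            # removes lingering input files and re-measures the directory at this point
        else:
            print(f'size of work directory {workdir}: {workdirsize} B (within {maxwdirsize} B limit)')

        # the measured size is recorded on the job (max value later sent with job metrics)
        return exit_code, diagnostics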
