Skip to content

Commit

Permalink
Merge pull request #144 from HSF/dev
Browse files Browse the repository at this point in the history
improve eventbus and add coordinator
  • Loading branch information
wguanicedew authored Apr 18, 2023
2 parents 3c026f4 + 2028653 commit 82a0976
Show file tree
Hide file tree
Showing 64 changed files with 4,120 additions and 693 deletions.
29 changes: 17 additions & 12 deletions atlas/lib/idds/atlas/workflowv2/atlaspandawork.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
# http://www.apache.org/licenses/LICENSE-2.0OA
#
# Authors:
# - Wen Guan, <[email protected]>, 2020 - 2022
# - Wen Guan, <[email protected]>, 2020 - 2023


try:
Expand Down Expand Up @@ -47,6 +47,7 @@ def __init__(self, task_parameters=None,
# maxwalltime=90000, maxattempt=5, core_count=1,
# encode_command_line=False,
num_retries=5,
use_rucio=False,
# task_log=None,
# task_cloud=None,
# task_rss=0
Expand Down Expand Up @@ -84,6 +85,7 @@ def __init__(self, task_parameters=None,
self.retry_number = 0
self.num_retries = num_retries

self.use_rucio = use_rucio
self.load_panda_urls()

def my_condition(self):
Expand Down Expand Up @@ -155,6 +157,8 @@ def set_agent_attributes(self, attrs, req_attributes=None):
super(ATLASPandaWork, self).set_agent_attributes(attrs)
if self.agent_attributes and 'num_retries' in self.agent_attributes and self.agent_attributes['num_retries']:
self.num_retries = int(self.agent_attributes['num_retries'])
if self.class_name in attrs and 'use_rucio' in attrs[self.class_name]:
self.use_rucio = attrs[self.class_name]['use_rucio']

def parse_task_parameters(self, task_parameters):
if self.task_parameters:
Expand Down Expand Up @@ -307,17 +311,18 @@ def poll_external_collection(self, coll):
else:
try:
if not coll.coll_type == CollectionType.PseudoDataset:
client = self.get_rucio_client()
did_meta = client.get_metadata(scope=coll.scope, name=coll.name)

coll.coll_metadata['bytes'] = did_meta['bytes']
coll.coll_metadata['total_files'] = did_meta['length']
coll.coll_metadata['availability'] = did_meta['availability']
coll.coll_metadata['events'] = did_meta['events']
coll.coll_metadata['is_open'] = did_meta['is_open']
coll.coll_metadata['run_number'] = did_meta['run_number']
coll.coll_metadata['did_type'] = did_meta['did_type']
coll.coll_metadata['list_all_files'] = False
if self.use_rucio:
client = self.get_rucio_client()
did_meta = client.get_metadata(scope=coll.scope, name=coll.name)

coll.coll_metadata['bytes'] = did_meta['bytes']
coll.coll_metadata['total_files'] = did_meta['length']
coll.coll_metadata['availability'] = did_meta['availability']
coll.coll_metadata['events'] = did_meta['events']
coll.coll_metadata['is_open'] = did_meta['is_open']
coll.coll_metadata['run_number'] = did_meta['run_number']
coll.coll_metadata['did_type'] = did_meta['did_type']
coll.coll_metadata['list_all_files'] = False

if 'is_open' in coll.coll_metadata and not coll.coll_metadata['is_open']:
coll_status = CollectionStatus.Closed
Expand Down
7 changes: 7 additions & 0 deletions common/lib/idds/common/authentication.py
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,8 @@ def get_user_name_from_dn1(dn):
username = up.sub('', dn)
up2 = re.compile('/CN=[0-9]+')
username = up2.sub('', username)
up2 = re.compile('/CN=[0-9]+')
username = up2.sub('', username)
up3 = re.compile(' [0-9]+')
username = up3.sub('', username)
up4 = re.compile('_[0-9]+')
Expand Down Expand Up @@ -421,15 +423,20 @@ def get_user_name_from_dn2(dn):
username = up.sub('', dn)
up2 = re.compile(',CN=[0-9]+')
username = up2.sub('', username)
up2 = re.compile('CN=[0-9]+,')
username = up2.sub(',', username)
up3 = re.compile(' [0-9]+')
username = up3.sub('', username)
up4 = re.compile('_[0-9]+')
username = up4.sub('', username)
username = username.replace(',CN=proxy', '')
username = username.replace(',CN=limited proxy', '')
username = username.replace('limited proxy', '')
username = re.sub(',CN=Robot:[^/]+,', ',', username)
username = re.sub(',CN=Robot:[^/]+', '', username)
username = re.sub(',CN=Robot[^/]+,', ',', username)
username = re.sub(',CN=Robot[^/]+', '', username)
username = re.sub(',CN=nickname:[^/]+,', ',', username)
username = re.sub(',CN=nickname:[^/]+', '', username)
pat = re.compile('.*,CN=([^\,]+),CN=([^\,]+)') # noqa W605
mat = pat.match(username)
Expand Down
23 changes: 22 additions & 1 deletion common/lib/idds/common/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
# http://www.apache.org/licenses/LICENSE-2.0OA
#
# Authors:
# - Wen Guan, <[email protected]>, 2019 - 2022
# - Wen Guan, <[email protected]>, 2019 - 2023

"""
Constants.
Expand All @@ -32,6 +32,8 @@ class Sections:
Consumer = 'consumer'
EventBus = 'eventbus'
Cache = 'cache'
Archiver = 'archiver'
Coordinator = 'coordinator'


class HTTP_STATUS_CODE:
Expand Down Expand Up @@ -288,13 +290,20 @@ class ContentStatus(IDDSEnum):
Mapped = 7
FakeAvailable = 8
Missing = 9
Cancelled = 10


class ContentLocking(IDDSEnum):
Idle = 0
Locking = 1


class ContentFetchStatus(IDDSEnum):
New = 0
Fetching = 1
Fetched = 2


class GranularityType(IDDSEnum):
File = 0
Event = 1
Expand Down Expand Up @@ -338,6 +347,12 @@ class ProcessingLocking(IDDSEnum):
Locking = 1


class HealthStatus(IDDSEnum):
Default = 0
InActive = 1
Active = 2


class MessageType(IDDSEnum):
StageInFile = 0
StageInCollection = 1
Expand Down Expand Up @@ -469,6 +484,12 @@ class CommandLocation(IDDSEnum):
Other = 6


class ReturnCode(IDDSEnum):
Ok = 0
Failed = 255
Locked = 1


def get_work_status_from_transform_processing_status(status):
if status in [ProcessingStatus.New, TransformStatus.New]:
return WorkStatus.New
Expand Down
Loading

0 comments on commit 82a0976

Please sign in to comment.