Skip to content

Commit

Permalink
Merge pull request #67 from wguanicedew/dev
Browse files Browse the repository at this point in the history
Dev
  • Loading branch information
tmaeno authored Mar 9, 2022
2 parents e605b8b + 91d175f commit 012a503
Show file tree
Hide file tree
Showing 33 changed files with 595 additions and 62 deletions.
2 changes: 1 addition & 1 deletion atlas/lib/idds/atlas/version.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,4 @@
# - Wen Guan, <[email protected]>, 2019 - 2021


release_version = "0.10.2"
release_version = "0.10.3"
4 changes: 2 additions & 2 deletions atlas/tools/env/environment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,5 @@ dependencies:
- nose # nose test tools
- rucio-clients
- rucio-clients-atlas
- idds-common==0.10.2
- idds-workflow==0.10.2
- idds-common==0.10.3
- idds-workflow==0.10.3
11 changes: 11 additions & 0 deletions client/bin/idds
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,10 @@ def abort_requests(args):
wm.abort(request_id=args.request_id, workload_id=args.workload_id)


def abort_tasks(args):
wm = ClientManager(host=args.host, setup_client=True)
wm.abort_tasks(request_id=args.request_id, workload_id=args.workload_id, task_id=args.task_id)

def suspend_requests(args):
wm = ClientManager(host=args.host, setup_client=True)
wm.suspend(request_id=args.request_id, workload_id=args.workload_id)
Expand Down Expand Up @@ -200,6 +204,13 @@ def get_parser():
abort_parser.add_argument('--request_id', dest='request_id', action='store', type=int, help='The request id')
abort_parser.add_argument('--workload_id', dest='workload_id', action='store', type=int, help='The workload id')

# abort tasks
abort_tasks_parser = subparsers.add_parser('abort_tasks', help='Abort tasks')
abort_tasks_parser.set_defaults(function=abort_tasks)
abort_tasks_parser.add_argument('--request_id', dest='request_id', action='store', type=int, help='The request id')
abort_tasks_parser.add_argument('--workload_id', dest='workload_id', action='store', type=int, help='The workload id')
abort_tasks_parser.add_argument('--task_id', dest='task_id', action='store', type=int, help='The task id')

# suspend requests
suspend_parser = subparsers.add_parser('suspend_requests', help='Suspend requests')
suspend_parser.set_defaults(function=suspend_requests)
Expand Down
37 changes: 36 additions & 1 deletion client/lib/idds/client/clientmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
from idds.client.client import Client
from idds.common import exceptions
from idds.common.config import get_local_cfg_file, get_local_config_root, get_local_config_value
from idds.common.constants import RequestType, RequestStatus
from idds.common.constants import RequestType, RequestStatus, ProcessingStatus
# from idds.common.utils import get_rest_host, exception_handler
from idds.common.utils import exception_handler

Expand Down Expand Up @@ -87,6 +87,8 @@ def setup_client(self, auth_setup=False):
client_host = self.auth_type_host
else:
client_host = self.host
if self.auth_type is None:
self.auth_type = 'x509_proxy'
self.client = Client(host=client_host,
auth={'auth_type': self.auth_type,
'client_proxy': self.x509_proxy,
Expand Down Expand Up @@ -361,6 +363,39 @@ def abort(self, request_id=None, workload_id=None):
else:
return (-1, 'No matching requests')

@exception_handler
def abort_tasks(self, request_id=None, workload_id=None, task_id=None):
"""
Abort tasks.
:param workload_id: the workload id.
:param request_id: the request.
:param task_id: The task id.
"""
self.setup_client()

if request_id is None and workload_id is None:
logging.error("Both request_id and workload_id are None. One of them should not be None")
return (-1, "Both request_id and workload_id are None. One of them should not be None")
if task_id is None:
logging.error("The task_id is required for killing tasks. If you want to kill the whole workflow, please try another API.")
return (-1, "The task_id is required for killing tasks")

reqs = self.client.get_requests(request_id=request_id, workload_id=workload_id, with_processing=True)
if reqs:
rets = []
for req in reqs:
if str(req['processing_workload_id']) == str(task_id):
logging.info("Aborting task: (request_id: %s, task_id: %s)" % (req['request_id'], task_id))
self.client.send_message(request_id=req['request_id'], msg={'command': 'update_processing',
'parameters': [{'status': ProcessingStatus.ToCancel, 'workload_id': task_id}]})
logging.info("Abort task registered successfully: (request_id %s, task_id: %s)" % (req['request_id'], task_id))
ret = (0, "Abort task registered successfully: (request_id %s, task_id: %s)" % (req['request_id'], task_id))
rets.append(ret)
return rets
else:
return (-1, 'No matching requests')

@exception_handler
def suspend(self, request_id=None, workload_id=None):
"""
Expand Down
4 changes: 2 additions & 2 deletions client/lib/idds/client/requestclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ def update_request(self, request_id, parameters):
r = self.get_request_response(url, type='PUT', data=data)
return r

def get_requests(self, request_id=None, workload_id=None, with_detail=False, with_metadata=False):
def get_requests(self, request_id=None, workload_id=None, with_detail=False, with_metadata=False, with_transform=False, with_processing=False):
"""
Get request from the Head service.
Expand All @@ -94,7 +94,7 @@ def get_requests(self, request_id=None, workload_id=None, with_detail=False, wit
request_id = 'null'
if workload_id is None:
workload_id = 'null'
url = self.build_url(self.host, path=os.path.join(path, str(request_id), str(workload_id), str(with_detail), str(with_metadata)))
url = self.build_url(self.host, path=os.path.join(path, str(request_id), str(workload_id), str(with_detail), str(with_metadata), str(with_transform), str(with_processing)))

requests = self.get_request_response(url, type='GET')

Expand Down
2 changes: 1 addition & 1 deletion client/lib/idds/client/version.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,4 @@
# - Wen Guan, <[email protected]>, 2019 - 2021


release_version = "0.10.2"
release_version = "0.10.3"
4 changes: 2 additions & 2 deletions client/tools/env/environment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,5 @@ dependencies:
- pytest # python testing tool
- nose # nose test tools
- tabulate
- idds-common==0.10.2
- idds-workflow==0.10.2
- idds-common==0.10.3
- idds-workflow==0.10.3
74 changes: 61 additions & 13 deletions common/lib/idds/common/authentication.py
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ def verify_id_token(self, vo, token):
try:
allow_vos = self.get_allow_vos()
if vo not in allow_vos:
return False, "VO %s is not allowed." % vo
return False, "VO %s is not allowed." % vo, None

auth_config = self.get_auth_config(vo)
endpoint_config = self.get_endpoint_config(auth_config)
Expand All @@ -251,16 +251,20 @@ def verify_id_token(self, vo, token):
audience = decoded_token['aud']
if auth_config['client_id'] != audience:
# discovery_endpoint = auth_config['oidc_config_url']
return False, "The audience of the token doesn't match vo configuration."
return False, "The audience of the token doesn't match vo configuration.", None

public_key = self.get_public_key(token, endpoint_config['jwks_uri'])
# decode token only with RS256
decoded = jwt.decode(token, public_key, verify=True, algorithms='RS256',
audience=audience, issuer=endpoint_config['issuer'])
decoded['vo'] = vo
return True, decoded
if 'name' in decoded:
username = decoded['name']
else:
username = None
return True, decoded, username
except Exception as error:
return False, 'Failed to verify oidc token: ' + str(error)
return False, 'Failed to verify oidc token: ' + str(error), None


class OIDCAuthenticationUtils(object):
Expand Down Expand Up @@ -347,7 +351,8 @@ def get_allow_user_list(self):
return []


def get_user_name_from_dn(dn):
# "/DC=ch/DC=cern/OU=Organic Units/OU=Users/CN=wguan/CN=667815/CN=Wen Guan/CN=1883443395"
def get_user_name_from_dn1(dn):
try:
up = re.compile('/(DC|O|OU|C|L)=[^\/]+') # noqa W605
username = up.sub('', dn)
Expand Down Expand Up @@ -382,11 +387,53 @@ def get_user_name_from_dn(dn):
return dn


# 'CN=203633261,CN=Wen Guan,CN=667815,CN=wguan,OU=Users,OU=Organic Units,DC=cern,DC=ch'
def get_user_name_from_dn2(dn):
try:
up = re.compile(',(DC|O|OU|C|L)=[^\,]+') # noqa W605
username = up.sub('', dn)
up2 = re.compile(',CN=[0-9]+')
username = up2.sub('', username)
up3 = re.compile(' [0-9]+')
username = up3.sub('', username)
up4 = re.compile('_[0-9]+')
username = up4.sub('', username)
username = username.replace(',CN=proxy', '')
username = username.replace(',CN=limited proxy', '')
username = username.replace('limited proxy', '')
username = re.sub(',CN=Robot:[^/]+', '', username)
username = re.sub(',CN=nickname:[^/]+', '', username)
pat = re.compile('.*,CN=([^\,]+),CN=([^\,]+)') # noqa W605
mat = pat.match(username)
if mat:
username = mat.group(1)
else:
username = username.replace(',CN=', '')
if username.lower().find(',email') > 0:
username = username[:username.lower().find(',email')]
pat = re.compile('.*(limited.*proxy).*')
mat = pat.match(username)
if mat:
username = mat.group(1)
username = username.replace('(', '')
username = username.replace(')', '')
username = username.replace("'", '')
return username
except Exception:
return dn


def get_user_name_from_dn(dn):
dn = get_user_name_from_dn1(dn)
dn = get_user_name_from_dn2(dn)
return dn


def authenticate_x509(vo, dn, client_cert):
if not dn:
return False, "User DN cannot be found."
return False, "User DN cannot be found.", None
if not client_cert:
return False, "Client certificate proxy cannot be found."
return False, "Client certificate proxy cannot be found.", None

# certDecoded = x509.load_pem_x509_certificate(str.encode(client_cert), default_backend())
# print(certDecoded.issuer)
Expand All @@ -403,7 +450,7 @@ def authenticate_x509(vo, dn, client_cert):
break

if not matched:
return False, "User %s is not allowed" % str(dn)
return False, "User %s is not allowed" % str(dn), None

if matched:
# username = get_user_name_from_dn(dn)
Expand All @@ -412,14 +459,15 @@ def authenticate_x509(vo, dn, client_cert):
pat = re.compile(ban_user)
mat = pat.match(dn)
if mat:
return False, "User %s is banned" % str(dn)
return True, None
return False, "User %s is banned" % str(dn), None
username = get_user_name_from_dn(dn)
return True, None, username


def authenticate_oidc(vo, token):
oidc_auth = OIDCAuthentication()
status, data = oidc_auth.verify_id_token(vo, token)
status, data, username = oidc_auth.verify_id_token(vo, token)
if status:
return status, data
return status, data, username
else:
return status, data
return status, data, username
2 changes: 1 addition & 1 deletion common/lib/idds/common/version.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,4 @@
# - Wen Guan, <[email protected]>, 2019 - 2021


release_version = "0.10.2"
release_version = "0.10.3"
2 changes: 1 addition & 1 deletion doma/lib/idds/doma/version.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,4 @@
# - Wen Guan, <[email protected]>, 2020 - 2021


release_version = "0.10.2"
release_version = "0.10.3"
7 changes: 5 additions & 2 deletions doma/lib/idds/doma/workflowv2/domapandawork.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
# http://www.apache.org/licenses/LICENSE-2.0OA
#
# Authors:
# - Wen Guan, <[email protected]>, 2020 - 2021
# - Wen Guan, <[email protected]>, 2020 - 2022
# - Sergey Padolski, <[email protected]>, 2020


Expand Down Expand Up @@ -48,6 +48,7 @@ def __init__(self, executable=None, arguments=None, parameters=None, setup=None,
num_retries=5,
task_log=None,
task_cloud=None,
task_site=None,
task_rss=1000):

super(DomaPanDAWork, self).__init__(executable=executable, arguments=arguments,
Expand Down Expand Up @@ -89,6 +90,7 @@ def __init__(self, executable=None, arguments=None, parameters=None, setup=None,

self.encode_command_line = encode_command_line
self.task_cloud = task_cloud
self.task_site = task_site
self.task_rss = task_rss

self.retry_number = 0
Expand Down Expand Up @@ -402,7 +404,7 @@ def create_processing(self, input_output_maps=[]):
task_param_map['noInput'] = True
task_param_map['pfnList'] = in_files
task_param_map['taskName'] = self.task_name
task_param_map['userName'] = 'iDDS'
task_param_map['userName'] = self.username if self.username else 'iDDS'
task_param_map['taskPriority'] = 900
task_param_map['architecture'] = ''
task_param_map['transUses'] = ''
Expand All @@ -420,6 +422,7 @@ def create_processing(self, input_output_maps=[]):
task_param_map['coreCount'] = self.core_count
task_param_map['skipScout'] = True
task_param_map['cloud'] = self.task_cloud
task_param_map['PandaSite'] = self.task_site
if self.task_rss and self.task_rss > 0:
task_param_map['ramCount'] = self.task_rss
task_param_map['ramUnit'] = 'MB'
Expand Down
4 changes: 2 additions & 2 deletions doma/tools/env/environment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,5 @@ dependencies:
- pytest # python testing tool
- nose # nose test tools
- panda-client # panda client
- idds-common==0.10.2
- idds-workflow==0.10.2
- idds-common==0.10.3
- idds-workflow==0.10.3
2 changes: 2 additions & 0 deletions main/etc/idds/idds.cfg.template
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ atlaslocalpandawork.work_dir = /data/idds_processing

[conductor]
retrieve_bulk_size = 10
threshold_to_release_messages = 1000
random_delay = 60
plugin.notifier = idds.atlas.notifier.messaging.MessagingSender
# plugin.notifier.brokers = atlas-test-mb.cern.ch
plugin.notifier.brokers = atlas-mb.cern.ch
Expand Down
Loading

0 comments on commit 012a503

Please sign in to comment.