Skip to content

Commit

Permalink
Merge pull request #242 from HSF/flin
Browse files Browse the repository at this point in the history
Token auth between pilot-panda
  • Loading branch information
mightqxc authored Sep 10, 2024
2 parents f85d3ed + f82ab1a commit 24e0a6a
Show file tree
Hide file tree
Showing 8 changed files with 75 additions and 18 deletions.
2 changes: 1 addition & 1 deletion pandaharvester/commit_timestamp.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
timestamp = "15-08-2024 15:48:43 on flin (by mightqxc)"
timestamp = "10-09-2024 09:15:39 on flin (by mightqxc)"
2 changes: 1 addition & 1 deletion pandaharvester/harvesterbody/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -523,7 +523,7 @@ def monitor_agent_core(self, lockedBy, queueName, workSpecsList, from_fifo=False
tmpQueLog.warning(
f"last check period of workerID={workSpec.workerID} is {last_check_period} sec, longer than monitor checkInterval"
)
# prepartion to enqueue fifo
# preparation to enqueue fifo
if (from_fifo) or (
not from_fifo
and timeNow_timestamp - harvester_config.monitor.sleepTime > self.startTimestamp
Expand Down
22 changes: 17 additions & 5 deletions pandaharvester/harvestercore/db_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -4883,15 +4883,27 @@ def get_worker_limits(self, site_name: str, queue_config) -> tuple[dict, dict]:
if nQueueLimitWorkerCoresRatio is not None:
n_queue_limit_cores_by_ratio = int(worker_stats_map["running"]["core"] * nQueueLimitWorkerCoresRatio / 100)
if nQueueLimitWorkerCoresMin is not None and n_queue_limit_cores_by_ratio < nQueueLimitWorkerCoresMin:
n_queue_limit_worker_cores_eval = min(n_queue_limit_worker_cores_eval, nQueueLimitWorkerCoresMin)
if n_queue_limit_worker_cores_eval is not None:
n_queue_limit_worker_cores_eval = min(n_queue_limit_worker_cores_eval, nQueueLimitWorkerCoresMin)
else:
n_queue_limit_worker_cores_eval = nQueueLimitWorkerCoresMin
else:
n_queue_limit_worker_cores_eval = min(n_queue_limit_worker_cores_eval, n_queue_limit_cores_by_ratio)
if n_queue_limit_worker_cores_eval is not None:
n_queue_limit_worker_cores_eval = min(n_queue_limit_worker_cores_eval, n_queue_limit_cores_by_ratio)
else:
n_queue_limit_worker_cores_eval = n_queue_limit_cores_by_ratio
if nQueueLimitWorkerMemoryRatio is not None:
n_queue_limit_mem_by_ratio = int(worker_stats_map["running"]["mem"] * nQueueLimitWorkerMemoryRatio / 100)
if nQueueLimitWorkerMemoryMin is not None and n_queue_limit_mem_by_ratio < nQueueLimitWorkerMemoryMin:
n_queue_limit_worker_mem_eval = min(n_queue_limit_worker_mem_eval, nQueueLimitWorkerMemoryMin)
if n_queue_limit_worker_mem_eval is not None:
n_queue_limit_worker_mem_eval = min(n_queue_limit_worker_mem_eval, nQueueLimitWorkerMemoryMin)
else:
n_queue_limit_worker_mem_eval = nQueueLimitWorkerMemoryMin
else:
n_queue_limit_worker_mem_eval = min(n_queue_limit_worker_mem_eval, n_queue_limit_mem_by_ratio)
if n_queue_limit_worker_mem_eval is not None:
n_queue_limit_worker_mem_eval = min(n_queue_limit_worker_mem_eval, n_queue_limit_mem_by_ratio)
else:
n_queue_limit_worker_mem_eval = n_queue_limit_mem_by_ratio
# update map
worker_limits_dict.update(
{
Expand All @@ -4912,7 +4924,7 @@ def get_worker_limits(self, site_name: str, queue_config) -> tuple[dict, dict]:
# dump error
core_utils.dump_error_message(_logger)
# return
return {}
return {}, {}

# get worker CE stats
def get_worker_ce_stats(self, site_name):
Expand Down
23 changes: 21 additions & 2 deletions pandaharvester/harvestercredmanager/iam_token_cred_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import time
import traceback

from pandaharvester.harvesterconfig import harvester_config
from pandaharvester.harvestercore import core_utils
from pandaharvester.harvestermisc.info_utils import PandaQueuesDict
from pandaharvester.harvestermisc.token_utils import (
Expand All @@ -18,7 +19,7 @@
_logger = core_utils.setup_logger("iam_token_cred_manager")

# allowed target types
ALL_TARGET_TYPES = ["common", "ce"]
ALL_TARGET_TYPES = ["common", "ce", "panda"]

# default port for CEs
default_port_map = {
Expand Down Expand Up @@ -57,6 +58,7 @@ def __init__(self, **kwarg):
self.client_secret = client_cred_dict["client_secret"]
self.target_type = self.setupMap["target_type"]
self.out_dir = self.setupMap["out_dir"]
self.panda_token_filename = self.setupMap.get("panda_token_filename", "panda_token")
self.lifetime = self.setupMap.get("lifetime", 14 * 24 * 60 * 60)
self.target_list = self.setupMap.get("target_list")
self.target_list_file = self.setupMap.get("target_list_file")
Expand Down Expand Up @@ -123,6 +125,20 @@ def _handle_target_types(self):
self.targets_dict[target] = {}
# scope
self.scope = ""
elif self.target_type == "panda":
# panda server
panda_server_target = None
try:
panda_server_url = harvester_config.pandacon.pandaURLSSL
panda_server_target_match = re.match(r"https://[^:/]+", panda_server_url)
if panda_server_target_match:
panda_server_target = panda_server_target_match[0]
except AttributeError:
pass
self.target_list = [panda_server_target]
for target in self.target_list:
self.targets_dict[target] = {}
self.scope = ""
elif self.target_type == "ce":
try:
# retrieve CEs from CRIC
Expand Down Expand Up @@ -200,7 +216,10 @@ def renew_credential(self):
for target in self.targets_dict:
try:
# write to file
token_filename = endpoint_to_filename(target)
if self.target_type == "panda":
token_filename = self.panda_token_filename
else:
token_filename = endpoint_to_filename(target)
token_path = os.path.join(self.out_dir, token_filename)
# check token freshness
if self._is_fresh(token_path):
Expand Down
11 changes: 5 additions & 6 deletions pandaharvester/harvestermisc/htcondor_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -390,15 +390,15 @@ def __init__(self, cacheEnable=False, cacheRefreshInterval=None, useCondorHistor
self.useCondorHistory = useCondorHistory
tmpLog.debug("Initialize done")

def get_all(self, batchIDs_list=[], allJobs=False):
def get_all(self, batchIDs_list=[], allJobs=False, to_update_cache=False):
# Make logger
tmpLog = core_utils.make_logger(baseLogger, f"submissionHost={self.submissionHost}", method_name="CondorJobQuery.get_all")
# Get all
tmpLog.debug("Start")
job_ads_all_dict = {}
if self.condor_api == "python":
try:
job_ads_all_dict = self.query_with_python(batchIDs_list, allJobs)
job_ads_all_dict = self.query_with_python(batchIDs_list, allJobs, to_update_cache)
except Exception as e:
tmpLog.error(f"Exception {e.__class__.__name__}: {e}")
raise
Expand Down Expand Up @@ -472,7 +472,7 @@ def _getAttribute_tuple(attribute_xml_element):
return job_ads_all_dict

@CondorClient.renew_session_and_retry
def query_with_python(self, batchIDs_list=[], allJobs=False):
def query_with_python(self, batchIDs_list=[], allJobs=False, to_update_cache=False):
# Make logger
tmpLog = core_utils.make_logger(baseLogger, f"submissionHost={self.submissionHost}", method_name="CondorJobQuery.query_with_python")
# Start query
Expand Down Expand Up @@ -516,7 +516,6 @@ def update_cache(lockInterval=90):
return None

# remove invalid or outdated caches from fifo

def cleanup_cache(timeout=60):
tmpLog.debug("cleanup_cache")
id_list = list()
Expand Down Expand Up @@ -567,7 +566,7 @@ def cleanup_cache(timeout=60):
tmpLog.debug("got lock expired. Clean up and retry...")
cleanup_cache()
continue
elif time.time() <= peeked_tuple.score + self.cacheRefreshInterval:
elif not to_update_cache and time.time() <= peeked_tuple.score + self.cacheRefreshInterval:
# got valid cache
_obj, _last_update = self.cache
if _last_update >= peeked_tuple.score:
Expand All @@ -590,7 +589,7 @@ def cleanup_cache(timeout=60):
time.sleep(random.uniform(1, 5))
continue
else:
# cache expired
# cache expired or force to_update_cache
tmpLog.debug("update cache in fifo")
retVal = update_cache()
if retVal is not None:
Expand Down
4 changes: 3 additions & 1 deletion pandaharvester/harvestermonitor/htcondor_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ def report_updated_workers(self, time_window):
job_query = CondorJobQuery(
cacheEnable=self.cacheEnable, cacheRefreshInterval=self.cacheRefreshInterval, useCondorHistory=self.useCondorHistory, id=submissionHost
)
job_ads_all_dict.update(job_query.get_all(allJobs=True))
job_ads_all_dict.update(job_query.get_all(allJobs=True, to_update_cache=True))
tmpLog.debug(f"got information of condor jobs on {submissionHost}")
except Exception as e:
ret_err_str = f"Exception {e.__class__.__name__}: {e}"
Expand All @@ -326,11 +326,13 @@ def report_updated_workers(self, time_window):
if not (job_EnteredCurrentStatus > timeNow - time_window):
continue
workerid = job_ads.get("harvesterWorkerID")
batch_status = job_ads.get("JobStatus")
if workerid is None:
continue
else:
workerid = int(workerid)
workers_to_check_list.append((workerid, job_EnteredCurrentStatus))
tmpLog.debug(f"workerID={workerid} got batchStatus={batch_status} at ts={job_EnteredCurrentStatus}")
tmpLog.debug(f"got {len(workers_to_check_list)} workers")
tmpLog.debug("done")
return workers_to_check_list
27 changes: 26 additions & 1 deletion pandaharvester/harvestersubmitter/htcondor_submitter.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,9 @@ def make_a_jdl(
python_version="unknown",
prod_rc_permille=0,
token_dir=None,
panda_token_filename=None,
panda_token_dir=None,
panda_token_key_path=None,
is_gpu_resource=False,
n_core_factor=1,
custom_submit_attr_dict=None,
Expand Down Expand Up @@ -248,6 +251,17 @@ def make_a_jdl(
token_path = os.path.join(token_dir, token_filename)
else:
tmpLog.warning(f"token_path is None: site={panda_queue_name}, token_dir={token_dir} , token_filename={token_filename}")
# get pilot-pandaserver token
panda_token_path = None
if panda_token_dir is not None and panda_token_filename is not None:
panda_token_path = os.path.join(panda_token_dir, panda_token_filename)
else:
# tmpLog.warning(f"panda_token_path is None: panda_token_dir={panda_token_dir} , panda_token_filename={panda_token_filename}")
pass
# get panda token key
panda_token_key_filename = None
if panda_token_key_path is not None:
panda_token_key_filename = os.path.basename(panda_token_key_path)
# custom submit attributes (+key1 = value1 ; +key2 = value2 in JDL)
custom_submit_attr_str_list = []
for attr_key, attr_value in custom_submit_attr_dict.items():
Expand Down Expand Up @@ -313,6 +327,10 @@ def make_a_jdl(
"tokenDir": token_dir,
"tokenFilename": token_filename,
"tokenPath": token_path,
"pandaTokenFilename": panda_token_filename,
"pandaTokenPath": panda_token_path,
"pandaTokenKeyFilename": panda_token_key_filename,
"pandaTokenKeyPath": panda_token_key_path,
"pilotJobLabel": submitter_common.get_joblabel(prod_source_label, is_unified_dispatch),
"pilotJobType": submitter_common.get_pilot_job_type(workspec.jobType, is_unified_dispatch),
"requestGpus": 1 if is_gpu_resource else 0,
Expand Down Expand Up @@ -430,6 +448,10 @@ def __init__(self, **kwarg):
self.tokenDirAnalysis
except AttributeError:
self.tokenDirAnalysis = None
# pilot-pandaserver token
self.pandaTokenFilename = getattr(self, "pandaTokenFilename", None)
self.pandaTokenDir = getattr(self, "pandaTokenDir", None)
self.pandaTokenKeyPath = getattr(self, "pandaTokenKeyPath", None)
# CRIC
try:
self.useCRIC = bool(self.useCRIC)
Expand Down Expand Up @@ -692,7 +714,7 @@ def submit_workers(self, workspec_list):
if ce_info_dict["ce_grid_type"] == "arc":
default_port = None
if ce_info_dict["ce_hostname"] == ce_endpoint_from_queue:
# defaut port
# default port
default_port = 443
else:
# change port 2811 to 443
Expand Down Expand Up @@ -908,6 +930,9 @@ def _choose_credential(workspec):
"pilot_version": pilot_version,
"python_version": python_version,
"token_dir": token_dir,
"panda_token_filename": self.pandaTokenFilename,
"panda_token_dir": self.pandaTokenDir,
"panda_token_key_path": self.pandaTokenKeyPath,
"is_unified_dispatch": is_unified_dispatch,
"prod_rc_permille": self.rcPilotRandomWeightPermille,
"is_gpu_resource": is_gpu_resource,
Expand Down
2 changes: 1 addition & 1 deletion pandaharvester/panda_pkg_info.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
release_version = "0.5.10"
release_version = "0.5.11"

0 comments on commit 24e0a6a

Please sign in to comment.