Merge pull request #23 from HSF/prodanaly
Changes for grand unification
fbarreir authored Dec 9, 2019
2 parents 6556770 + a6a9759 commit 8e1502a
Showing 23 changed files with 908 additions and 667 deletions.
Empty file added: __init__.py
File renamed without changes.
101 changes: 101 additions & 0 deletions examples/k8s/k8s_cvmfs_1.15.yaml
@@ -0,0 +1,101 @@
apiVersion: storage.k8s.io/v1
kind: StorageClass
metadata:
  name: csi-cvmfs-atlas
provisioner: cvmfs.csi.cern.ch
parameters:
  repository: atlas.cern.ch
---
apiVersion: storage.k8s.io/v1
kind: StorageClass
metadata:
  name: csi-cvmfs-sft
provisioner: cvmfs.csi.cern.ch
parameters:
  repository: sft.cern.ch
---
apiVersion: storage.k8s.io/v1
kind: StorageClass
metadata:
  name: csi-cvmfs-grid
provisioner: cvmfs.csi.cern.ch
parameters:
  repository: grid.cern.ch
---
apiVersion: storage.k8s.io/v1
kind: StorageClass
metadata:
  name: csi-cvmfs-atlas-condb
provisioner: cvmfs.csi.cern.ch
parameters:
  repository: atlas-condb.cern.ch
---
apiVersion: storage.k8s.io/v1
kind: StorageClass
metadata:
  name: csi-cvmfs-atlas-nightlies
provisioner: cvmfs.csi.cern.ch
parameters:
  repository: atlas-nightlies.cern.ch
---
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: csi-cvmfs-atlas-pvc
spec:
  accessModes:
    - ReadOnlyMany
  resources:
    requests:
      storage: 1Gi
  storageClassName: csi-cvmfs-atlas
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: csi-cvmfs-sft-pvc
spec:
  accessModes:
    - ReadOnlyMany
  resources:
    requests:
      storage: 1Gi
  storageClassName: csi-cvmfs-sft
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: csi-cvmfs-grid-pvc
spec:
  accessModes:
    - ReadOnlyMany
  resources:
    requests:
      storage: 1Gi
  storageClassName: csi-cvmfs-grid

---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: csi-cvmfs-atlas-condb-pvc
spec:
  accessModes:
    - ReadOnlyMany
  resources:
    requests:
      storage: 1Gi
  storageClassName: csi-cvmfs-atlas-condb
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: csi-cvmfs-atlas-nightlies-pvc
spec:
  accessModes:
    - ReadOnlyMany
  resources:
    requests:
      storage: 1Gi
  storageClassName: csi-cvmfs-atlas-nightlies
701 changes: 353 additions & 348 deletions pandaharvester/harvesterbody/submitter.py

Large diffs are not rendered by default.

363 changes: 189 additions & 174 deletions pandaharvester/harvesterbody/worker_adjuster.py

Large diffs are not rendered by default.

16 changes: 8 additions & 8 deletions pandaharvester/harvesterbody/worker_maker.py
@@ -18,8 +18,8 @@ def get_plugin(self, queue_config):
         return self.pluginFactory.get_plugin(queue_config.workerMaker)
 
     # make workers
-    def make_workers(self, jobchunk_list, queue_config, n_ready, resource_type, maker=None):
-        tmpLog = core_utils.make_logger(_logger, 'queue={0} rtype={1}'.format(queue_config.queueName, resource_type),
+    def make_workers(self, jobchunk_list, queue_config, n_ready, job_type, resource_type, maker=None):
+        tmpLog = core_utils.make_logger(_logger, 'queue={0} jtype={1} rtype={2}'.format(queue_config.queueName, job_type, resource_type),
                                         method_name='make_workers')
         tmpLog.debug('start')
         try:
@@ -38,7 +38,7 @@ def make_workers(self, jobchunk_list, queue_config, n_ready, resource_type, make
             for iChunk, jobChunk in enumerate(jobchunk_list):
                 # make a worker
                 if iChunk >= n_ready:
-                    workSpec = maker.make_worker(jobChunk, queue_config, resource_type)
+                    workSpec = maker.make_worker(jobChunk, queue_config, job_type, resource_type)
                 else:
                     # use ready worker
                     if iChunk < len(readyWorkers):
@@ -65,35 +65,35 @@ def make_workers(self, jobchunk_list, queue_config, n_ready, resource_type, make
             return [], jobchunk_list
 
     # get number of jobs per worker
-    def get_num_jobs_per_worker(self, queue_config, n_workers, resource_type, maker=None):
+    def get_num_jobs_per_worker(self, queue_config, n_workers, job_type, resource_type, maker=None):
         # get plugin
         if maker is None:
             maker = self.pluginFactory.get_plugin(queue_config.workerMaker)
         return maker.get_num_jobs_per_worker(n_workers)
 
     # get number of workers per job
-    def get_num_workers_per_job(self, queue_config, n_workers, resource_type, maker=None):
+    def get_num_workers_per_job(self, queue_config, n_workers, job_type, resource_type, maker=None):
         # get plugin
         if maker is None:
             maker = self.pluginFactory.get_plugin(queue_config.workerMaker)
         return maker.get_num_workers_per_job(n_workers)
 
     # check number of ready resources
-    def num_ready_resources(self, queue_config, resource_type, maker=None):
+    def num_ready_resources(self, queue_config, job_type, resource_type, maker=None):
         # get plugin
         if maker is None:
             maker = self.pluginFactory.get_plugin(queue_config.workerMaker)
         return maker.num_ready_resources()
 
     # get upper limit on the cumulative total of workers per job
-    def get_max_workers_per_job_in_total(self, queue_config, resource_type, maker=None):
+    def get_max_workers_per_job_in_total(self, queue_config, job_type, resource_type, maker=None):
         # get plugin
         if maker is None:
             maker = self.pluginFactory.get_plugin(queue_config.workerMaker)
         return maker.get_max_workers_per_job_in_total()
 
     # get upper limit on the number of new workers per job in a cycle
-    def get_max_workers_per_job_per_cycle(self, queue_config, resource_type, maker=None):
+    def get_max_workers_per_job_per_cycle(self, queue_config, job_type, resource_type, maker=None):
         # get plugin
         if maker is None:
             maker = self.pluginFactory.get_plugin(queue_config.workerMaker)
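This change threads a job_type argument through the whole WorkerMaker API next to resource_type, so that unified queues can distinguish production from analysis work. A minimal sketch of how a caller might use the new make_workers signature follows; the queue name, the job-chunk list and the example type labels are illustrative assumptions, not code from this commit.

# Sketch only: illustrates the new job_type argument of WorkerMaker.make_workers().
# 'SOME_PANDA_QUEUE', 'managed' and 'SCORE' are hypothetical example values, and the
# two-list return shape is inferred from the error path shown in the hunk above.
from pandaharvester.harvesterbody.worker_maker import WorkerMaker
from pandaharvester.harvestercore.queue_config_mapper import QueueConfigMapper

queue_config = QueueConfigMapper().get_queue('SOME_PANDA_QUEUE')  # hypothetical queue name
jobchunk_list = []            # normally prepared by the submitter from fetched jobs
job_type = 'managed'          # e.g. production; 'user' would denote analysis
resource_type = 'SCORE'       # example resource-type label

worker_maker = WorkerMaker()
made_workers, failed_chunks = worker_maker.make_workers(
    jobchunk_list, queue_config, n_ready=0,
    job_type=job_type, resource_type=resource_type)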
2 changes: 1 addition & 1 deletion pandaharvester/harvestercommunicator/panda_communicator.py
@@ -530,7 +530,7 @@ def update_worker_stats(self, site_name, stats):
         data['siteName'] = site_name
         data['paramsList'] = json.dumps(stats)
         tmpLog.debug('update stats for {0}, stats: {1}'.format(site_name, stats))
-        tmpStat, tmpRes = self.post_ssl('reportWorkerStats', data)
+        tmpStat, tmpRes = self.post_ssl('reportWorkerStats_jobtype', data)
         errStr = 'OK'
         if tmpStat is False:
             errStr = core_utils.dump_error_message(tmpLog, tmpRes)
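With the switch to the reportWorkerStats_jobtype server command, the worker statistics reported to PanDA presumably gain a job-type dimension. The sketch below shows what such a payload could look like on the caller side; the exact nesting of the stats dictionary and the site name are assumptions based only on the new command name, not on code shown in this diff.

# Hypothetical worker-stats payload with a job_type dimension:
# job type -> resource type -> worker status -> count.
# The nesting is an assumption inferred from 'reportWorkerStats_jobtype'.
import json

stats = {
    'managed': {'SCORE': {'submitted': 5, 'running': 10}},
    'user':    {'SCORE': {'submitted': 1, 'running': 2}},
}

data = {
    'siteName': 'SOME_PANDA_SITE',      # hypothetical site name
    'paramsList': json.dumps(stats),    # serialized the same way update_worker_stats() does
}
# A communicator instance would then send it with:
# tmpStat, tmpRes = self.post_ssl('reportWorkerStats_jobtype', data)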
2 changes: 1 addition & 1 deletion pandaharvester/harvestercore/command_spec.py
@@ -16,7 +16,7 @@ class CommandSpec(SpecBase):
                           )
     # commands
     COM_reportWorkerStats = 'REPORT_WORKER_STATS'
-    COM_setNWorkers = 'SET_N_WORKERS'
+    COM_setNWorkers = 'SET_N_WORKERS_JOBTYPE'
     COM_killWorkers = 'KILL_WORKERS'
     # mapping between command and receiver
     receiver_map = {
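Changing the constant's value to SET_N_WORKERS_JOBTYPE means the PanDA server has to issue the new command string for harvester to act on it; an old-style SET_N_WORKERS command would no longer match. Below is a hedged sketch of such a check; the prefix-matching style and the example command strings are assumptions about how a receiver might compare against the constant, not code from this diff.

# Sketch only: dispatching on the renamed command constant.
from pandaharvester.harvestercore.command_spec import CommandSpec

def is_set_n_workers(command_string):
    # True for commands issued under the new name, e.g.
    # 'SET_N_WORKERS_JOBTYPE:SOME_PANDA_QUEUE' (example string).
    return command_string.startswith(CommandSpec.COM_setNWorkers)

print(is_set_n_workers('SET_N_WORKERS_JOBTYPE:SOME_PANDA_QUEUE'))  # True
print(is_set_n_workers('SET_N_WORKERS:SOME_PANDA_QUEUE'))          # False with the renamed constant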
12 changes: 6 additions & 6 deletions pandaharvester/harvestercore/core_utils.py
@@ -592,8 +592,8 @@ def get_queues_config_url():
 
 
 # get unique queue name
-def get_unique_queue_name(queue_name, resource_type):
-    return '{0}:{1}'.format(queue_name, resource_type)
+def get_unique_queue_name(queue_name, resource_type, job_type):
+    return '{0}:{1}:{2}'.format(queue_name, resource_type, job_type)
 
 
 # capability to dynamically change plugins
@@ -613,19 +613,19 @@ def _asdict(self):
         return dict(zip(self.attributes, self))
 
 
-# Make a list of choice candidates accroding to permille weight
+# Make a list of choice candidates according to permille weight
def make_choice_list(pdpm={}, default=None):
    weight_sum = sum(pdpm.values())
-    weight_defualt = 1000
+    weight_default = 1000
    ret_list = []
    for candidate, weight in iteritems(pdpm):
        if weight_sum > 1000:
            real_weight = int(weight * 1000 / weight_sum)
        else:
            real_weight = int(weight)
        ret_list.extend([candidate]*real_weight)
-        weight_defualt -= real_weight
-    ret_list.extend([default]*weight_defualt)
+        weight_default -= real_weight
+    ret_list.extend([default]*weight_default)
    return ret_list
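Two small changes land in core_utils: get_unique_queue_name now folds the job type into the composite key, and the misspelled weight_defualt variable in make_choice_list becomes weight_default. A short usage sketch under the new signatures follows; the queue name, type labels and weights are invented example values.

# Usage sketch for the updated helpers; all values are made-up examples.
from pandaharvester.harvestercore import core_utils

# The composite key now carries the job type as a third component,
# e.g. 'CERN-EXAMPLE:SCORE:managed'.
key = core_utils.get_unique_queue_name('CERN-EXAMPLE', 'SCORE', 'managed')

# Permille-weighted choice list: with these weights the result holds
# 700 'siteA' entries, 200 'siteB' entries and 100 copies of the default.
choices = core_utils.make_choice_list({'siteA': 700, 'siteB': 200}, default='siteC')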
(The remaining changed files in this commit are not rendered here.)
