diff --git a/docker-compose.yml b/docker-compose.yml
index a4aa65ac..d01d486b 100755
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -9,6 +9,8 @@ services:
       context: .
       args:
         ENVIRONMENT: local
+    stdin_open: true  # docker run -i
+    tty: true         # docker run -t
     command: /usr/local/src/pman/docker-test-entrypoint.sh
     ports:
       - "5010:5010"
diff --git a/make.sh b/make.sh
index 2f2316c0..a51bc323 100755
--- a/make.sh
+++ b/make.sh
@@ -170,6 +170,13 @@ if (( ! b_skipIntro )) ; then
     windowBottom
 fi
 
+title -d 1 "Building :dev"
+    cd $HERE
+    CMD="docker compose -f docker-compose.yml build"
+    echo "$CMD" | ./boxes.sh
+    echo $CMD | sh | ./boxes.sh -c
+windowBottom
+
 title -d 1 "Changing permissions to 755 on" "$HERE"
     cd $HERE
     echo "chmod -R 755 $HERE" | ./boxes.sh
@@ -206,8 +213,8 @@ windowBottom
 
 title -d 1 "Starting pman containerized dev environment on $ORCHESTRATOR"
     if [[ $ORCHESTRATOR == swarm ]]; then
-        echo "docker stack deploy -c swarm/docker-compose_dev.yml pman_dev_stack" | ./boxes.sh ${LightCyan}
-        docker stack deploy -c swarm/docker-compose_dev.yml pman_dev_stack
+        echo "docker stack deploy -c docker-compose.yml pman_dev_stack" | ./boxes.sh ${LightCyan}
+        docker stack deploy -c docker-compose.yml pman_dev_stack
    elif [[ $ORCHESTRATOR == kubernetes ]]; then
         echo "envsubst < kubernetes/pman_dev.yaml | kubectl apply -f -" | ./boxes.sh ${LightCyan}
         envsubst < kubernetes/pman_dev.yaml | kubectl apply -f -
@@ -219,7 +226,7 @@ title -d 1 "Waiting until pman container is running on $ORCHESTRATOR"
     for i in {1..30}; do
         sleep 5
         if [[ $ORCHESTRATOR == swarm ]]; then
-            pman_dev=$(docker ps -f label=org.chrisproject.role=pman -q | head -n 1)
+            pman_dev=$(docker ps -f label=org.opencontainers.image.title=pman -q | head -n 1)
         elif [[ $ORCHESTRATOR == kubernetes ]]; then
             pman_dev=$(kubectl get pods --selector="app=pman,env=development" --field-selector=status.phase=Running --output=jsonpath='{.items[*].metadata.name}')
         fi
@@ -238,9 +245,9 @@ if (( ! b_skipUnitTests )) ; then
     title -d 1 "Running pman tests..."
     sleep 5
     if [[ $ORCHESTRATOR == swarm ]]; then
-        docker exec $pman_dev nosetests --exe tests
+        docker exec $pman_dev pytest tests --color=yes
     elif [[ $ORCHESTRATOR == kubernetes ]]; then
-        kubectl exec $pman_dev -- nosetests --exe tests
+        kubectl exec $pman_dev -- pytest tests --color=yes
     fi
     status=$?
     title -d 1 "pman test results"
diff --git a/pman/_helpers.py b/pman/_helpers.py
old mode 100644
new mode 100755
diff --git a/pman/abstractmgr.py b/pman/abstractmgr.py
index 6ee20838..b9ffc99b 100755
--- a/pman/abstractmgr.py
+++ b/pman/abstractmgr.py
@@ -46,7 +46,7 @@ class JobInfo:
     status: JobStatus
 
 
-class Resources(TypedDict):
+class ResourcesDict(TypedDict):
     number_of_workers: int
     """
     Number of workers for multi-node parallelism.
@@ -71,6 +71,25 @@ class Resources(TypedDict):
     """
 
 
+class MountsDict(TypedDict):
+    inputdir_source: str
+    """
+    Absolute path to the source input directory or otherwise a volume name.
+    """
+    inputdir_target: str
+    """
+    Absolute path to the target input directory (within the container).
+    """
+    outputdir_source: str
+    """
+    Absolute path to the source output directory or otherwise a volume name.
+    """
+    outputdir_target: str
+    """
+    Absolute path to the target output directory (within the container).
+ """ + + class AbstractManager(ABC, Generic[J]): """ An ``AbstractManager`` is an API to a service which can schedule @@ -85,9 +104,9 @@ def __init__(self, config_dict: dict = None): @abstractmethod def schedule_job(self, image: Image, command: List[str], name: JobName, - resources_dict: Resources, env: List[str], + resources_dict: ResourcesDict, env: List[str], uid: Optional[int], gid: Optional[int], - mountdir: Optional[str] = None) -> J: + mounts_dict: MountsDict) -> J: """ Schedule a new job and return the job object. """ diff --git a/pman/container_user.py b/pman/container_user.py old mode 100644 new mode 100755 diff --git a/pman/cromwell/slurm/wdl.py b/pman/cromwell/slurm/wdl.py index 27bc3628..b8a8c46a 100755 --- a/pman/cromwell/slurm/wdl.py +++ b/pman/cromwell/slurm/wdl.py @@ -10,7 +10,7 @@ from serde import from_dict, deserialize from jinja2 import Environment -from pman.abstractmgr import Image, Resources +from pman.abstractmgr import Image, ResourcesDict from pman.cromwell.models import StrWdl, RuntimeAttributes from dataclasses import dataclass @@ -56,7 +56,7 @@ class SlurmJob: image: Image command: List[str] sharedir: str - resources_dict: Resources + resources_dict: ResourcesDict timelimit: int partition: Optional[str] = None """https://slurm.schedmd.com/sbatch.html#OPT_partition""" @@ -91,7 +91,7 @@ def from_wdl(cls, wdl: StrWdl) -> 'SlurmJob': number_of_workers, end = cls._get_resource(wdl, 'number_of_workers', end) timelimit, end = cls._get_resource(wdl, 'timelimit', end) partition, _ = cls._find_between(wdl, "slurm_partition: '", "'\n", end) - r = Resources( + r = ResourcesDict( number_of_workers=int(number_of_workers), cpu_limit=cls.__serialize_cpu(cpu), memory_limit=cls.__serialize_mem(memory), diff --git a/pman/cromwellmgr.py b/pman/cromwellmgr.py index e2fb2c9f..aa7c60ab 100755 --- a/pman/cromwellmgr.py +++ b/pman/cromwellmgr.py @@ -10,7 +10,8 @@ import logging import time from typing import Optional, List -from .abstractmgr import AbstractManager, ManagerException, JobStatus, JobInfo, Image, JobName, TimeStamp, Resources +from .abstractmgr import (AbstractManager, ManagerException, JobStatus, JobInfo, Image, + JobName, TimeStamp, ResourcesDict, MountsDict) from .cromwell.models import ( WorkflowId, StrWdl, WorkflowStatus, WorkflowIdAndStatus, WorkflowQueryResult, @@ -61,11 +62,13 @@ def __init__(self, config_dict=None): self.__client = CromwellClient(auth) def schedule_job(self, image: Image, command: List[str], name: JobName, - resources_dict: Resources, + resources_dict: ResourcesDict, env: List[str], uid: Optional[int], gid: Optional[int], - mountdir: Optional[str] = None) -> WorkflowId: + mounts_dict: MountsDict) -> WorkflowId: + # TODO env, uid, gid is ignored - wdl = SlurmJob(image, command, mountdir, resources_dict, self.__timelimit).to_wdl() + wdl = SlurmJob(image, command, mounts_dict, resources_dict, + self.__timelimit).to_wdl() res = self.__submit(wdl, name) # Submission does not appear in Cromwell immediately, but pman wants to # get job info, so we need to wait for Cromwell to catch up. 
diff --git a/pman/dockermgr.py b/pman/dockermgr.py
old mode 100644
new mode 100755
index 316fa9c5..6dddf0be
--- a/pman/dockermgr.py
+++ b/pman/dockermgr.py
@@ -4,7 +4,8 @@
 from docker import DockerClient
 from docker.models.containers import Container
 
-from pman.abstractmgr import AbstractManager, Image, JobName, Resources, JobInfo, TimeStamp, ManagerException, JobStatus
+from pman.abstractmgr import (AbstractManager, Image, JobName, ResourcesDict,
+                              MountsDict, JobInfo, TimeStamp, ManagerException, JobStatus)
 import docker
 
 
@@ -25,9 +26,10 @@ def __init__(self, config_dict=None, docker_client: DockerClient = None):
         else:
             self.__docker = docker.from_env()
 
-    def schedule_job(self, image: Image, command: List[str], name: JobName, resources_dict: Resources, env: List[str],
-                     uid: Optional[int], gid: Optional[int],
-                     mountdir: Optional[str] = None) -> Container:
+    def schedule_job(self, image: Image, command: List[str], name: JobName,
+                     resources_dict: ResourcesDict, env: List[str], uid: Optional[int],
+                     gid: Optional[int], mounts_dict: MountsDict) -> Container:
+
         if resources_dict['number_of_workers'] != 1:
             raise ManagerException(
                 'Compute environment only supports number_of_workers=1, '
@@ -37,9 +39,14 @@ def schedule_job(self, image: Image, command: List[str], name: JobName, resource
         if resources_dict['gpu_limit'] != 0:
             raise ManagerException('Compute environment does not support GPUs yet.')
 
-        volumes = {}
-        if mountdir is not None:
-            volumes['volumes'] = {mountdir: {'bind': '/share', 'mode': 'rw'}}
+        volumes = {
+            'volumes': {
+                mounts_dict['inputdir_source']: {'bind': mounts_dict['inputdir_target'],
+                                                 'mode': 'ro'},
+                mounts_dict['outputdir_source']: {'bind': mounts_dict['outputdir_target'],
+                                                  'mode': 'rw'},
+            }
+        }
 
         limits = {}
         if not self.ignore_limits:
diff --git a/pman/kubernetesmgr.py b/pman/kubernetesmgr.py
index 6a92943f..55942c8a 100755
--- a/pman/kubernetesmgr.py
+++ b/pman/kubernetesmgr.py
@@ -11,7 +11,8 @@
 from kubernetes.client import V1Pod
 from kubernetes.client.models.v1_job import V1Job
 from kubernetes.client.rest import ApiException
-from .abstractmgr import AbstractManager, ManagerException, JobInfo, JobStatus, TimeStamp, JobName
+from .abstractmgr import (AbstractManager, ManagerException, JobInfo, JobStatus,
+                          TimeStamp, JobName)
 
 logger = logging.getLogger(__name__)
 
@@ -25,13 +26,13 @@ def __init__(self, config_dict=None):
         self.kube_client = k_client.CoreV1Api()
         self.kube_v1_batch_client = k_client.BatchV1Api()
 
-    def schedule_job(self, image, command, name, resources_dict, env, uid, gid, mountdir=None) -> \
-            V1Job:
+    def schedule_job(self, image, command, name, resources_dict, env, uid, gid,
+                     mounts_dict) -> V1Job:
         """
         Schedule a new job and return the job object.
         """
-        job_instance = self.create_job(image, command, name, resources_dict, env, uid, gid,
-                                       mountdir)
+        job_instance = self.create_job(image, command, name, resources_dict, env, uid,
+                                       gid, mounts_dict)
         job = self.submit_job(job_instance)
         return job
 
@@ -129,8 +130,8 @@ def remove_job(self, job):
         self.kube_v1_batch_client.delete_namespaced_job(job.metadata.name, body=body,
                                                         namespace=job_namespace)
 
-    def create_job(self, image, command, name, resources_dict, env_l, uid, gid, mountdir=None) -> \
-            V1Job:
+    def create_job(self, image, command, name, resources_dict, env_l, uid, gid,
+                   mounts_dict) -> V1Job:
         """
         Create and return a new job instance.
""" @@ -178,10 +179,17 @@ def create_job(self, image, command, name, resources_dict, env_l, uid, gid, moun persistent_volume_claim=pvc, ) - volume_mount = k_client.V1VolumeMount( - mount_path='/share', # hard-coded + volume_mount_inputdir = k_client.V1VolumeMount( + mount_path=mounts_dict['inputdir_target'], name='storebase', - sub_path=mountdir.rsplit('/', maxsplit=1)[-1] + sub_path=mounts_dict['inputdir_source'], + read_only=True + ) + volume_mount_outputdir = k_client.V1VolumeMount( + mount_path=mounts_dict['outputdir_target'], + name='storebase', + sub_path=mounts_dict['outputdir_source'], + read_only=False ) container = k_client.V1Container( @@ -191,7 +199,7 @@ def create_job(self, image, command, name, resources_dict, env_l, uid, gid, moun command=command, security_context=k_client.V1SecurityContext(**security_context), resources=k_client.V1ResourceRequirements(limits=limits), - volume_mounts=[volume_mount] + volume_mounts=[volume_mount_inputdir, volume_mount_outputdir] ) pod_template_metadata = None diff --git a/pman/resources.py b/pman/resources.py index d5ceb942..01deb1b0 100755 --- a/pman/resources.py +++ b/pman/resources.py @@ -14,6 +14,7 @@ from .swarmmgr import SwarmManager from .cromwellmgr import CromwellManager + logger = logging.getLogger(__name__) parser = reqparse.RequestParser(bundle_errors=True) @@ -32,6 +33,8 @@ required=True) parser.add_argument('type', dest='type', choices=('ds', 'fs', 'ts'), required=True) parser.add_argument('env', dest='env', type=list, location='json', default=[]) +parser.add_argument('input_dir', dest='input_dir', required=True) +parser.add_argument('output_dir', dest='output_dir', required=True) def get_compute_mgr(container_env): @@ -67,7 +70,9 @@ def __init__(self): def get(self): return { - 'server_version': app.config.get('SERVER_VERSION') + 'server_version': app.config.get('SERVER_VERSION'), + 'container_env': app.config.get('CONTAINER_ENV'), + 'storage_type': app.config.get('STORAGE_TYPE') } def post(self): @@ -92,23 +97,33 @@ def post(self): 'memory_limit': args.memory_limit, 'gpu_limit': args.gpu_limit, } - share_dir = None + mounts_dict = {'inputdir_source': '', + 'inputdir_target': self.str_app_container_inputdir, + 'outputdir_source': '', + 'outputdir_target': self.str_app_container_outputdir + } + input_dir = args.input_dir.strip('/') + output_dir = args.output_dir.strip('/') + # hmm, probably unnecessarily relying on invariant that # STORAGETYPE matches enum value -> STOREBASE is valid and should be used # Perhaps we should instead simply check STOREBASE only? 
         storage_type = app.config.get('STORAGE_TYPE')
         if storage_type in ('host', 'docker_local_volume'):
             storebase = app.config.get('STOREBASE')
-            share_dir = os.path.join(storebase, 'key-' + job_id)
+            mounts_dict['inputdir_source'] = os.path.join(storebase, input_dir)
+            mounts_dict['outputdir_source'] = os.path.join(storebase, output_dir)
         elif storage_type == 'kubernetes_pvc':
-            share_dir = 'key-' + job_id
+            mounts_dict['inputdir_source'] = input_dir
+            mounts_dict['outputdir_source'] = output_dir
 
         logger.info(f'Scheduling job {job_id} on the {self.container_env} cluster')
 
         compute_mgr = get_compute_mgr(self.container_env)
         try:
             job = compute_mgr.schedule_job(args.image, cmd, job_id, resources_dict,
-                                           args.env, self.user.get_uid(), self.user.get_gid(), share_dir)
+                                           args.env, self.user.get_uid(),
+                                           self.user.get_gid(), mounts_dict)
         except ManagerException as e:
             logger.error(f'Error from {self.container_env} while scheduling job '
                          f'{job_id}, detail: {str(e)}')
diff --git a/pman/swarmmgr.py b/pman/swarmmgr.py
index 05c4e73c..3102dc83 100755
--- a/pman/swarmmgr.py
+++ b/pman/swarmmgr.py
@@ -6,7 +6,8 @@
 import docker
 from docker.models.services import Service
 
-from .abstractmgr import AbstractManager, ManagerException, JobStatus, JobInfo, Image, TimeStamp, JobName
+from .abstractmgr import (AbstractManager, ManagerException, JobStatus, JobInfo, Image,
+                          TimeStamp, JobName)
 
 
 class SwarmManager(AbstractManager[Service]):
@@ -19,15 +20,14 @@ def __init__(self, config_dict=None):
         else:
             self.docker_client = docker.from_env(environment=self.config)
 
-    def schedule_job(self, image, command, name, resources_dict, env, uid, gid, mountdir=None) -> \
-            Service:
+    def schedule_job(self, image, command, name, resources_dict, env, uid, gid,
+                     mounts_dict) -> Service:
         """
         Schedule a new job and return the job (swarm service) object.
""" restart_policy = docker.types.RestartPolicy(condition='none') - mounts = [] - if mountdir is not None: - mounts.append('%s:/share:rw' % mountdir) + mounts = [f'{mounts_dict["inputdir_source"]}:{mounts_dict["inputdir_target"]}:ro', + f'{mounts_dict["outputdir_source"]}:{mounts_dict["outputdir_target"]}:rw'] try: job = self.docker_client.services.create(image, command, name=name, diff --git a/requirements/base.txt b/requirements/base.txt index c05c955d..9e15beab 100755 --- a/requirements/base.txt +++ b/requirements/base.txt @@ -1,3 +1,4 @@ +urllib3<2 Flask==1.1.4 Flask-RESTful==0.3.9 docker==6.0.1 diff --git a/test_docker.sh b/test_docker.sh index cbac014f..0e1f4e43 100755 --- a/test_docker.sh +++ b/test_docker.sh @@ -109,7 +109,9 @@ body="$(cat << EOF "gpu_limit": "0", "image": "$SIMPLEDSAPP", "entrypoint": ["simpledsapp"], - "type": "ds" + "type": "ds", + "input_dir": "key-$jid/incoming", + "output_dir": "key-$jid/outgoing" } EOF )" diff --git a/tests/conftest.py b/tests/conftest.py old mode 100644 new mode 100755 diff --git a/tests/cromwell/examples/metadata.py b/tests/cromwell/examples/metadata.py index 1c3034d2..d71c57d4 100755 --- a/tests/cromwell/examples/metadata.py +++ b/tests/cromwell/examples/metadata.py @@ -1,5 +1,5 @@ from pman.cromwell.models import WorkflowId, WorkflowStatus -from pman.abstractmgr import JobInfo, JobStatus, Image, TimeStamp, JobName, Resources +from pman.abstractmgr import JobInfo, JobStatus, Image, TimeStamp, JobName, ResourcesDict from pman.cromwell.slurm.wdl import SlurmJob workflow_uuid = WorkflowId('4165ed81-c121-4a8d-b284-a6dda9ef0aa8') @@ -10,7 +10,7 @@ sharedir='/mounted/storebase/example-jid-1234', timelimit=5, partition='my-slurm-partition', - resources_dict=Resources( + resources_dict=ResourcesDict( number_of_workers=1, cpu_limit=2000, memory_limit=4000, @@ -355,7 +355,7 @@ command=['/usr/local/bin/python', '/usr/local/bin/whatsgood', '--day', 'sunny', '/share/incoming', '/share/outgoing'], sharedir='/storebase/key-vitamin-d', timelimit=50, - resources_dict=Resources( + resources_dict=ResourcesDict( number_of_workers=1, cpu_limit=2000, memory_limit=5678, @@ -391,7 +391,7 @@ command=['/usr/local/bin/python', '/usr/local/bin/fruit_machine', '--salad', 'orange', '/share/incoming', '/share/outgoing'], sharedir='/storebase/key-fruity-fruit', timelimit=70, - resources_dict=Resources( + resources_dict=ResourcesDict( number_of_workers=3, cpu_limit=1000, memory_limit=789, diff --git a/tests/cromwell/examples/wdl.py b/tests/cromwell/examples/wdl.py index 500795d7..9f85c1f3 100755 --- a/tests/cromwell/examples/wdl.py +++ b/tests/cromwell/examples/wdl.py @@ -1,5 +1,5 @@ from collections import namedtuple -from pman.abstractmgr import Resources +from pman.abstractmgr import ResourcesDict from pman.cromwell.slurm.wdl import SlurmJob, Image, StrWdl # Since conversion to WDL is lossy, we need to define the @@ -35,7 +35,7 @@ sharedir='/location/of/bribe', partition=None, timelimit=12, - resources_dict=Resources( + resources_dict=ResourcesDict( cpu_limit=1234, memory_limit=5678, number_of_workers=9, @@ -48,7 +48,7 @@ sharedir='/location/of/bribe', partition=None, timelimit=12, - resources_dict=Resources( + resources_dict=ResourcesDict( cpu_limit=2000, memory_limit=5678, number_of_workers=9, @@ -87,7 +87,7 @@ sharedir='/neuroimaging/data', partition='has-gpu', timelimit=300, - resources_dict=Resources( + resources_dict=ResourcesDict( number_of_workers=5, cpu_limit=7000, memory_limit=9876, @@ -100,7 +100,7 @@ sharedir='/neuroimaging/data', partition='has-gpu', 
     timelimit=300,
-    resources_dict=Resources(
+    resources_dict=ResourcesDict(
         number_of_workers=5,
         cpu_limit=7000,
         memory_limit=9876,
diff --git a/tests/test_container_user.py b/tests/test_container_user.py
old mode 100644
new mode 100755
diff --git a/tests/test_helpers.py b/tests/test_helpers.py
old mode 100644
new mode 100755
diff --git a/tests/test_resources.py b/tests/test_resources.py
index 89c0b224..78864bbb 100755
--- a/tests/test_resources.py
+++ b/tests/test_resources.py
@@ -42,10 +42,13 @@ def setUp(self):
 
         self.url = url_for('api.joblist')
         self.job_id = 'chris-jid-1'
+        self.input_dir = os.path.join('key-' + self.job_id, 'incoming')
+        self.output_dir = os.path.join('key-' + self.job_id, 'outgoing')
 
-        self.share_dir = os.path.join('/var/local/storeBase', 'key-' + self.job_id)
-        incoming = os.path.join(self.share_dir, 'incoming')
-        outgoing = os.path.join(self.share_dir, 'outgoing')
+
+        self.job_dir = os.path.join('/var/local/storeBase', 'key-' + self.job_id)
+        incoming = os.path.join(self.job_dir, 'incoming')
+        outgoing = os.path.join(self.job_dir, 'outgoing')
         Path(incoming).mkdir(parents=True, exist_ok=True)
         Path(outgoing).mkdir(parents=True, exist_ok=True)
         with open(os.path.join(incoming, 'test.txt'), 'w') as f:
@@ -53,7 +56,7 @@ def setUp(self):
 
     def tearDown(self):
         super().tearDown()
-        shutil.rmtree(self.share_dir)
+        shutil.rmtree(self.job_dir)
 
     def test_get(self):
         response = self.client.get(self.url)
@@ -73,6 +76,9 @@ def test_post(self):
             'image': 'fnndsc/pl-simplefsapp',
             'entrypoint': ['simplefsapp'],
             'type': 'fs',
+            'input_dir': self.input_dir,
+            'output_dir': self.output_dir
+
         }
         # make the POST request
         response = self.client.post(self.url, json=data)
diff --git a/unmake.sh b/unmake.sh
index a745316c..8199f572 100755
--- a/unmake.sh
+++ b/unmake.sh
@@ -88,6 +88,9 @@ title -d 1 "Destroying pman containerized dev environment on $ORCHESTRATOR"
     if [[ $ORCHESTRATOR == swarm ]]; then
         echo "docker stack rm pman_dev_stack" | ./boxes.sh ${LightCyan}
         docker stack rm pman_dev_stack
+        echo "docker volume rm -f pman_dev_stack_storebase" | ./boxes.sh ${LightCyan}
+        sleep 15
+        docker volume rm -f pman_dev_stack_storebase
     elif [[ $ORCHESTRATOR == kubernetes ]]; then
         echo "kubectl delete -f kubernetes/pman_dev.yaml" | ./boxes.sh ${LightCyan}
         kubectl delete -f kubernetes/pman_dev.yaml
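# ---------------------------------------------------------------------------
# Editor's illustration -- not part of the patch. Since `input_dir` and
# `output_dir` are now required request arguments (see pman/resources.py,
# test_docker.sh and tests/test_resources.py above), a job-creation request
# must name both directories relative to the storebase. A rough sketch using
# only the standard library; the port comes from docker-compose.yml above,
# while the URL path and the exact set of additional fields are assumptions
# based on the test scripts rather than a complete, authoritative request.
# ---------------------------------------------------------------------------
import json
from urllib import request

jid = 'chris-jid-1'
body = {
    'jid': jid,
    'image': 'fnndsc/pl-simpledsapp',
    'entrypoint': ['simpledsapp'],
    'type': 'ds',
    'number_of_workers': '1',
    'cpu_limit': '1000',
    'memory_limit': '200',
    'gpu_limit': '0',
    # new in this change: where pman should mount data from/to, given either as
    # paths under STOREBASE or as volume/PVC subpaths depending on STORAGE_TYPE
    'input_dir': f'key-{jid}/incoming',
    'output_dir': f'key-{jid}/outgoing',
}

req = request.Request('http://localhost:5010/api/v1/',
                      data=json.dumps(body).encode('utf-8'),
                      headers={'Content-Type': 'application/json'})
with request.urlopen(req) as res:
    print(res.status, json.loads(res.read().decode('utf-8')))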