Skip to content

Commit

Permalink
Implement support for job's input and output directories in PMAN's API
Browse files Browse the repository at this point in the history
  • Loading branch information
jbernal0019 committed Aug 31, 2023
1 parent 6398d0a commit 0119a2d
Show file tree
Hide file tree
Showing 20 changed files with 131 additions and 58 deletions.
2 changes: 2 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ services:
context: .
args:
ENVIRONMENT: local
stdin_open: true # docker run -i
tty: true # docker run -t
command: /usr/local/src/pman/docker-test-entrypoint.sh
ports:
- "5010:5010"
Expand Down
17 changes: 12 additions & 5 deletions make.sh
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,13 @@ if (( ! b_skipIntro )) ; then
windowBottom
fi

title -d 1 "Building :dev"
cd $HERE
CMD="docker compose -f docker-compose.yml build"
echo "$CMD" | ./boxes.sh
echo $CMD | sh | ./boxes.sh -c
windowBottom

title -d 1 "Changing permissions to 755 on" "$HERE"
cd $HERE
echo "chmod -R 755 $HERE" | ./boxes.sh
Expand Down Expand Up @@ -206,8 +213,8 @@ windowBottom

title -d 1 "Starting pman containerized dev environment on $ORCHESTRATOR"
if [[ $ORCHESTRATOR == swarm ]]; then
echo "docker stack deploy -c swarm/docker-compose_dev.yml pman_dev_stack" | ./boxes.sh ${LightCyan}
docker stack deploy -c swarm/docker-compose_dev.yml pman_dev_stack
echo "docker stack deploy -c docker-compose.yml pman_dev_stack" | ./boxes.sh ${LightCyan}
docker stack deploy -c docker-compose.yml pman_dev_stack
elif [[ $ORCHESTRATOR == kubernetes ]]; then
echo "envsubst < kubernetes/pman_dev.yaml | kubectl apply -f -" | ./boxes.sh ${LightCyan}
envsubst < kubernetes/pman_dev.yaml | kubectl apply -f -
Expand All @@ -219,7 +226,7 @@ title -d 1 "Waiting until pman container is running on $ORCHESTRATOR"
for i in {1..30}; do
sleep 5
if [[ $ORCHESTRATOR == swarm ]]; then
pman_dev=$(docker ps -f label=org.chrisproject.role=pman -q | head -n 1)
pman_dev=$(docker ps -f label=org.opencontainers.image.title=pman -q | head -n 1)
elif [[ $ORCHESTRATOR == kubernetes ]]; then
pman_dev=$(kubectl get pods --selector="app=pman,env=development" --field-selector=status.phase=Running --output=jsonpath='{.items[*].metadata.name}')
fi
Expand All @@ -238,9 +245,9 @@ if (( ! b_skipUnitTests )) ; then
title -d 1 "Running pman tests..."
sleep 5
if [[ $ORCHESTRATOR == swarm ]]; then
docker exec $pman_dev nosetests --exe tests
docker exec $pman_dev pytest tests --color=yes
elif [[ $ORCHESTRATOR == kubernetes ]]; then
kubectl exec $pman_dev -- nosetests --exe tests
kubectl exec $pman_dev -- pytest tests --color=yes
fi
status=$?
title -d 1 "pman test results"
Expand Down
Empty file modified pman/_helpers.py
100644 → 100755
Empty file.
25 changes: 22 additions & 3 deletions pman/abstractmgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class JobInfo:
status: JobStatus


class Resources(TypedDict):
class ResourcesDict(TypedDict):
number_of_workers: int
"""
Number of workers for multi-node parallelism.
Expand All @@ -71,6 +71,25 @@ class Resources(TypedDict):
"""


class MountsDict(TypedDict):
inputdir_source: str
"""
Absolute path to the source input directory or otherwise a volume name.
"""
inputdir_target: str
"""
Absolute path to the target input directory (within the container).
"""
outputdir_source: str
"""
Absolute path to the source output directory or otherwise a volume name.
"""
outputdir_target: str
"""
Absolute path to the target output directory (within the container).
"""


class AbstractManager(ABC, Generic[J]):
"""
An ``AbstractManager`` is an API to a service which can schedule
Expand All @@ -85,9 +104,9 @@ def __init__(self, config_dict: dict = None):

@abstractmethod
def schedule_job(self, image: Image, command: List[str], name: JobName,
resources_dict: Resources, env: List[str],
resources_dict: ResourcesDict, env: List[str],
uid: Optional[int], gid: Optional[int],
mountdir: Optional[str] = None) -> J:
mounts_dict: MountsDict) -> J:
"""
Schedule a new job and return the job object.
"""
Expand Down
Empty file modified pman/container_user.py
100644 → 100755
Empty file.
6 changes: 3 additions & 3 deletions pman/cromwell/slurm/wdl.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

from serde import from_dict, deserialize
from jinja2 import Environment
from pman.abstractmgr import Image, Resources
from pman.abstractmgr import Image, ResourcesDict
from pman.cromwell.models import StrWdl, RuntimeAttributes
from dataclasses import dataclass

Expand Down Expand Up @@ -56,7 +56,7 @@ class SlurmJob:
image: Image
command: List[str]
sharedir: str
resources_dict: Resources
resources_dict: ResourcesDict
timelimit: int
partition: Optional[str] = None
"""https://slurm.schedmd.com/sbatch.html#OPT_partition"""
Expand Down Expand Up @@ -91,7 +91,7 @@ def from_wdl(cls, wdl: StrWdl) -> 'SlurmJob':
number_of_workers, end = cls._get_resource(wdl, 'number_of_workers', end)
timelimit, end = cls._get_resource(wdl, 'timelimit', end)
partition, _ = cls._find_between(wdl, "slurm_partition: '", "'\n", end)
r = Resources(
r = ResourcesDict(
number_of_workers=int(number_of_workers),
cpu_limit=cls.__serialize_cpu(cpu),
memory_limit=cls.__serialize_mem(memory),
Expand Down
11 changes: 7 additions & 4 deletions pman/cromwellmgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@
import logging
import time
from typing import Optional, List
from .abstractmgr import AbstractManager, ManagerException, JobStatus, JobInfo, Image, JobName, TimeStamp, Resources
from .abstractmgr import (AbstractManager, ManagerException, JobStatus, JobInfo, Image,
JobName, TimeStamp, ResourcesDict, MountsDict)
from .cromwell.models import (
WorkflowId, StrWdl,
WorkflowStatus, WorkflowIdAndStatus, WorkflowQueryResult,
Expand Down Expand Up @@ -61,11 +62,13 @@ def __init__(self, config_dict=None):
self.__client = CromwellClient(auth)

def schedule_job(self, image: Image, command: List[str], name: JobName,
resources_dict: Resources,
resources_dict: ResourcesDict,
env: List[str], uid: Optional[int], gid: Optional[int],
mountdir: Optional[str] = None) -> WorkflowId:
mounts_dict: MountsDict) -> WorkflowId:

# TODO env, uid, gid is ignored
wdl = SlurmJob(image, command, mountdir, resources_dict, self.__timelimit).to_wdl()
wdl = SlurmJob(image, command, mounts_dict, resources_dict,
self.__timelimit).to_wdl()
res = self.__submit(wdl, name)
# Submission does not appear in Cromwell immediately, but pman wants to
# get job info, so we need to wait for Cromwell to catch up.
Expand Down
21 changes: 14 additions & 7 deletions pman/dockermgr.py
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
from docker import DockerClient
from docker.models.containers import Container

from pman.abstractmgr import AbstractManager, Image, JobName, Resources, JobInfo, TimeStamp, ManagerException, JobStatus
from pman.abstractmgr import (AbstractManager, Image, JobName, ResourcesDict,
MountsDict, JobInfo, TimeStamp, ManagerException, JobStatus)
import docker


Expand All @@ -25,9 +26,10 @@ def __init__(self, config_dict=None, docker_client: DockerClient = None):
else:
self.__docker = docker.from_env()

def schedule_job(self, image: Image, command: List[str], name: JobName, resources_dict: Resources, env: List[str],
uid: Optional[int], gid: Optional[int],
mountdir: Optional[str] = None) -> Container:
def schedule_job(self, image: Image, command: List[str], name: JobName,
resources_dict: ResourcesDict, env: List[str], uid: Optional[int],
gid: Optional[int], mounts_dict: MountsDict) -> Container:

if resources_dict['number_of_workers'] != 1:
raise ManagerException(
'Compute environment only supports number_of_workers=1, '
Expand All @@ -37,9 +39,14 @@ def schedule_job(self, image: Image, command: List[str], name: JobName, resource
if resources_dict['gpu_limit'] != 0:
raise ManagerException('Compute environment does not support GPUs yet.')

volumes = {}
if mountdir is not None:
volumes['volumes'] = {mountdir: {'bind': '/share', 'mode': 'rw'}}
volumes = {
'volumes': {
mounts_dict['inputdir_source']: {'bind': mounts_dict['inputdir_target'],
'mode': 'ro'},
mounts_dict['outputdir_source']: {'bind': mounts_dict['outputdir_target'],
'mode': 'rw'},
}
}

limits = {}
if not self.ignore_limits:
Expand Down
30 changes: 19 additions & 11 deletions pman/kubernetesmgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@
from kubernetes.client import V1Pod
from kubernetes.client.models.v1_job import V1Job
from kubernetes.client.rest import ApiException
from .abstractmgr import AbstractManager, ManagerException, JobInfo, JobStatus, TimeStamp, JobName
from .abstractmgr import (AbstractManager, ManagerException, JobInfo, JobStatus,
TimeStamp, JobName)

logger = logging.getLogger(__name__)

Expand All @@ -25,13 +26,13 @@ def __init__(self, config_dict=None):
self.kube_client = k_client.CoreV1Api()
self.kube_v1_batch_client = k_client.BatchV1Api()

def schedule_job(self, image, command, name, resources_dict, env, uid, gid, mountdir=None) -> \
V1Job:
def schedule_job(self, image, command, name, resources_dict, env, uid, gid,
mounts_dict) -> V1Job:
"""
Schedule a new job and return the job object.
"""
job_instance = self.create_job(image, command, name, resources_dict, env, uid, gid,
mountdir)
job_instance = self.create_job(image, command, name, resources_dict, env, uid,
gid, mounts_dict)
job = self.submit_job(job_instance)
return job

Expand Down Expand Up @@ -129,8 +130,8 @@ def remove_job(self, job):
self.kube_v1_batch_client.delete_namespaced_job(job.metadata.name, body=body,
namespace=job_namespace)

def create_job(self, image, command, name, resources_dict, env_l, uid, gid, mountdir=None) -> \
V1Job:
def create_job(self, image, command, name, resources_dict, env_l, uid, gid,
mounts_dict) -> V1Job:
"""
Create and return a new job instance.
"""
Expand Down Expand Up @@ -178,10 +179,17 @@ def create_job(self, image, command, name, resources_dict, env_l, uid, gid, moun
persistent_volume_claim=pvc,

)
volume_mount = k_client.V1VolumeMount(
mount_path='/share', # hard-coded
volume_mount_inputdir = k_client.V1VolumeMount(
mount_path=mounts_dict['inputdir_target'],
name='storebase',
sub_path=mountdir.rsplit('/', maxsplit=1)[-1]
sub_path=mounts_dict['inputdir_source'],
read_only=True
)
volume_mount_outputdir = k_client.V1VolumeMount(
mount_path=mounts_dict['outputdir_target'],
name='storebase',
sub_path=mounts_dict['outputdir_source'],
read_only=False
)

container = k_client.V1Container(
Expand All @@ -191,7 +199,7 @@ def create_job(self, image, command, name, resources_dict, env_l, uid, gid, moun
command=command,
security_context=k_client.V1SecurityContext(**security_context),
resources=k_client.V1ResourceRequirements(limits=limits),
volume_mounts=[volume_mount]
volume_mounts=[volume_mount_inputdir, volume_mount_outputdir]
)

pod_template_metadata = None
Expand Down
25 changes: 20 additions & 5 deletions pman/resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from .swarmmgr import SwarmManager
from .cromwellmgr import CromwellManager


logger = logging.getLogger(__name__)

parser = reqparse.RequestParser(bundle_errors=True)
Expand All @@ -32,6 +33,8 @@
required=True)
parser.add_argument('type', dest='type', choices=('ds', 'fs', 'ts'), required=True)
parser.add_argument('env', dest='env', type=list, location='json', default=[])
parser.add_argument('input_dir', dest='input_dir', required=True)
parser.add_argument('output_dir', dest='output_dir', required=True)


def get_compute_mgr(container_env):
Expand Down Expand Up @@ -67,7 +70,9 @@ def __init__(self):

def get(self):
return {
'server_version': app.config.get('SERVER_VERSION')
'server_version': app.config.get('SERVER_VERSION'),
'container_env': app.config.get('CONTAINER_ENV'),
'storage_type': app.config.get('STORAGE_TYPE')
}

def post(self):
Expand All @@ -92,23 +97,33 @@ def post(self):
'memory_limit': args.memory_limit,
'gpu_limit': args.gpu_limit,
}
share_dir = None
mounts_dict = {'inputdir_source': '',
'inputdir_target': self.str_app_container_inputdir,
'outputdir_source': '',
'outputdir_target': self.str_app_container_outputdir
}
input_dir = args.input_dir.strip('/')
output_dir = args.output_dir.strip('/')

# hmm, probably unnecessarily relying on invariant that
# STORAGETYPE matches enum value -> STOREBASE is valid and should be used
# Perhaps we should instead simply check STOREBASE only?
storage_type = app.config.get('STORAGE_TYPE')
if storage_type in ('host', 'docker_local_volume'):
storebase = app.config.get('STOREBASE')
share_dir = os.path.join(storebase, 'key-' + job_id)
mounts_dict['inputdir_source'] = os.path.join(storebase, input_dir)
mounts_dict['outputdir_source'] = os.path.join(storebase, output_dir)
elif storage_type == 'kubernetes_pvc':
share_dir = 'key-' + job_id
mounts_dict['inputdir_source'] = input_dir
mounts_dict['outputdir_source'] = output_dir

logger.info(f'Scheduling job {job_id} on the {self.container_env} cluster')

compute_mgr = get_compute_mgr(self.container_env)
try:
job = compute_mgr.schedule_job(args.image, cmd, job_id, resources_dict,
args.env, self.user.get_uid(), self.user.get_gid(), share_dir)
args.env, self.user.get_uid(),
self.user.get_gid(), mounts_dict)
except ManagerException as e:
logger.error(f'Error from {self.container_env} while scheduling job '
f'{job_id}, detail: {str(e)}')
Expand Down
12 changes: 6 additions & 6 deletions pman/swarmmgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@

import docker
from docker.models.services import Service
from .abstractmgr import AbstractManager, ManagerException, JobStatus, JobInfo, Image, TimeStamp, JobName
from .abstractmgr import (AbstractManager, ManagerException, JobStatus, JobInfo, Image,
TimeStamp, JobName)


class SwarmManager(AbstractManager[Service]):
Expand All @@ -19,15 +20,14 @@ def __init__(self, config_dict=None):
else:
self.docker_client = docker.from_env(environment=self.config)

def schedule_job(self, image, command, name, resources_dict, env, uid, gid, mountdir=None) -> \
Service:
def schedule_job(self, image, command, name, resources_dict, env, uid, gid,
mounts_dict) -> Service:
"""
Schedule a new job and return the job (swarm service) object.
"""
restart_policy = docker.types.RestartPolicy(condition='none')
mounts = []
if mountdir is not None:
mounts.append('%s:/share:rw' % mountdir)
mounts = [f'{mounts_dict["inputdir_source"]}:{mounts_dict["inputdir_target"]}:ro',
f'{mounts_dict["outputdir_source"]}:{mounts_dict["outputdir_target"]}:rw']
try:
job = self.docker_client.services.create(image, command,
name=name,
Expand Down
1 change: 1 addition & 0 deletions requirements/base.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
urllib3<2
Flask==1.1.4
Flask-RESTful==0.3.9
docker==6.0.1
Expand Down
4 changes: 3 additions & 1 deletion test_docker.sh
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,9 @@ body="$(cat << EOF
"gpu_limit": "0",
"image": "$SIMPLEDSAPP",
"entrypoint": ["simpledsapp"],
"type": "ds"
"type": "ds",
"input_dir": "key-$jid/incoming",
"output_dir": "key-$jid/outgoing"
}
EOF
)"
Expand Down
Empty file modified tests/conftest.py
100644 → 100755
Empty file.
Loading

0 comments on commit 0119a2d

Please sign in to comment.