Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Docker extension #443

Draft
wants to merge 7 commits into
base: dev-jobqueue-old
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 15 additions & 2 deletions pywps/app/Process.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from pywps.response.execute import ExecuteResponse
from pywps.app.WPSRequest import WPSRequest
import pywps.configuration as config
import pywps.processing
from pywps._compat import PY2
from pywps.exceptions import (StorageNotSupported, OperationNotSupported,
ServerBusy, NoApplicableCode)
Expand Down Expand Up @@ -172,20 +173,32 @@ def _execute_process(self, async, wps_request, wps_response):
if running >= maxparallel and maxparallel != -1:
raise ServerBusy('Maximum number of parallel running processes reached. Please try later.')
wps_response._update_status(WPS_STATUS.ACCEPTED, u"PyWPS Request accepted", 0)
wps_response = self._run_process(wps_request, wps_response)
wps_response = self._run_sync(wps_request, wps_response)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is it called `_run_sync`? Docker will only be run in async mode.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We discussed this with @jachym and concluded that Docker should be used even in sync mode, once the `mode` parameter is set to `docker` in the config file.


return wps_response

# This function may not raise exception and must return a valid wps_response
# Failure must be reported as wps_response.status = WPS_STATUS.FAILED
def _run_async(self, wps_request, wps_response):
    """Start the process in a separate worker and return immediately.

    The concrete processing backend (multiprocessing, scheduler or docker
    container) is chosen by the ``pywps.processing.Process`` factory from
    the ``[processing] mode`` configuration option.

    :param wps_request: the parsed Execute request
    :param wps_response: the response object the worker will update
    """
    # NOTE(review): this local import is now redundant — the module is also
    # imported at the top of the file — but it presumably guards against a
    # circular import at module load time; confirm before removing.
    import pywps.processing
    process = pywps.processing.Process(
        process=self,
        wps_request=wps_request,
        wps_response=wps_response)
    process.start()

def _run_sync(self, wps_request, wps_response):
    """Run the process for a synchronous (blocking) Execute request.

    When the ``[processing] mode`` configuration option is set to
    ``docker``, the job is executed through the Docker container backend
    even in sync mode; otherwise it runs in-process via ``_run_process``.

    :param wps_request: the parsed Execute request
    :param wps_response: the response object to fill in
    :return: the execute response
    """
    mode = config.get_config_value('processing', 'mode')
    if mode == 'docker':
        process = pywps.processing.Container(
            process=self,
            wps_request=wps_request,
            wps_response=wps_response)
        # The container backend updates wps_response itself while running.
        process.start()
    else:
        wps_response = self._run_process(wps_request, wps_response)

    return wps_response

# This function may not raise exception and must return a valid wps_response
# Failure must be reported as wps_response.status = WPS_STATUS.FAILED
def _run_process(self, wps_request, wps_response):
Expand Down
4 changes: 4 additions & 0 deletions pywps/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,3 +159,7 @@ class SchedulerNotAvailable(NoApplicableCode):
"""Job scheduler not available exception implementation
"""
code = 400

class NoAvailablePort(NoApplicableCode):
    """No port available for a new docker container.

    Raised when every port in the configured ``port_min``-``port_max``
    range is already in use.
    """
    code = 400
4 changes: 4 additions & 0 deletions pywps/processing/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import pywps.configuration as config
from pywps.processing.basic import MultiProcessing
from pywps.processing.scheduler import Scheduler
from pywps.processing.container import Container
# api only
from pywps.processing.basic import Processing # noqa: F401
from pywps.processing.job import Job # noqa: F401
Expand All @@ -15,6 +16,7 @@

MULTIPROCESSING = 'multiprocessing'
SCHEDULER = 'scheduler'
DOCKER = 'docker'
DEFAULT = MULTIPROCESSING


Expand All @@ -29,6 +31,8 @@ def Process(process, wps_request, wps_response):
LOGGER.info("Processing mode: {}".format(mode))
if mode == SCHEDULER:
process = Scheduler(process, wps_request, wps_response)
elif mode == DOCKER:
process = Container(process, wps_request, wps_response)
else:
process = MultiProcessing(process, wps_request, wps_response)
return process
175 changes: 175 additions & 0 deletions pywps/processing/container.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
##################################################################
# Copyright 2018 Open Source Geospatial Foundation and others #
# licensed under MIT, Please consult LICENSE.txt for details #
##################################################################

import os
import pywps.configuration as config
from pywps.processing.basic import Processing

from owslib.wps import WebProcessingService as WPS
from pywps.response.status import WPS_STATUS
from pywps.exceptions import NoAvailablePort

import docker
import socket
import time
import threading

from pywps.inout.basic import LiteralInput, ComplexInput, BBoxInput
import owslib
from pywps.dblog import store_status


import logging
LOGGER = logging.getLogger("PYWPS")


class ClientError:
    # NOTE(review): unused placeholder; it does not subclass Exception, so
    # it cannot be raised — confirm the intent or remove it.
    pass


class Container(Processing):
    """Processing backend that executes a job inside a Docker container.

    The configured image is expected to run a WPS service listening on
    port 5000 inside the container; this class publishes that port on a
    free host port, bind-mounts the job's input/output directories into
    the container and forwards the Execute request to the containerized
    service via OWSLib.
    """

    def __init__(self, process, wps_request, wps_response):
        super().__init__(process, wps_request, wps_response)
        # Free host port the container's WPS endpoint will be published on.
        self.port = self._assign_port()
        self.client = docker.from_env()
        self.cntnr = self._create()

    def _create(self):
        """Create (but do not start) the Docker container for this job.

        :return: the created container object
        """
        cntnr_img = config.get_config_value("processing", "docker_img")
        prcs_inp_dir = self.job.wps_response.process.workdir
        prcs_out_dir = config.get_config_value("server", "outputpath")
        dckr_inp_dir = config.get_config_value("processing", "dckr_inp_dir")
        dckr_out_dir = config.get_config_value("processing", "dckr_out_dir")
        container = self.client.containers.create(
            cntnr_img, ports={"5000/tcp": self.port}, detach=True,
            volumes={
                # outputs are written by the container, inputs are read-only
                prcs_out_dir: {'bind': dckr_out_dir, 'mode': 'rw'},
                prcs_inp_dir: {'bind': dckr_inp_dir, 'mode': 'ro'}
            })
        return container

    def _assign_port(self):
        """Return the first free TCP port from the configured range.

        :raises NoAvailablePort: when every port in the range is taken
        """
        port_min = int(config.get_config_value("processing", "port_min"))
        port_max = int(config.get_config_value("processing", "port_max"))
        for port in range(port_min, port_max):
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            try:
                # connect_ex() returns 0 when something already listens
                # on the port.
                # TODO find better solution for errno
                res = sock.connect_ex(('127.0.0.1', port))
            finally:
                sock.close()  # always release the probe socket (was leaked)
            if res != 0:
                return port
        raise NoAvailablePort("No port from range {}-{} available.".format(port_min, port_max))

    def start(self):
        """Start the container and submit the Execute request to it."""
        self.cntnr.start()
        # it takes some time to start the container
        time.sleep(1)
        self._execute()

        # ``async`` is a reserved word since Python 3.7, so the attribute
        # has to be read with getattr() instead of plain attribute access.
        if getattr(self.job.process, "async"):
            self._parse_status()
            daemon = threading.Thread(target=check_status, args=(self,))
            daemon.start()
        else:
            self._parse_outputs()
            daemon = threading.Thread(target=self.dirty_clean)
            daemon.start()

    def stop(self):
        """Stop the running container."""
        self.cntnr.stop()

    def cancel(self):
        """Kill the container immediately."""
        self.cntnr.kill()

    def pause(self):
        """Pause all processes inside the container."""
        self.cntnr.pause()

    def unpause(self):
        """Resume a paused container."""
        self.cntnr.unpause()

    def _execute(self):
        """Forward the Execute request to the WPS inside the container."""
        url_execute = "http://localhost:{}/wps".format(self.port)
        inputs = get_inputs(self.job.wps_request.inputs)
        output = get_output(self.job.wps_request.outputs)
        wps = WPS(url=url_execute, skip_caps=True)
        # see start(): ``async`` is a reserved word on Python 3.7+
        if getattr(self.job.process, "async"):
            mode = "async"
        else:
            mode = "sync"
        self.execution = wps.execute(self.job.wps_request.identifier, inputs=inputs, output=output, mode=mode)

    def _parse_outputs(self):
        """Copy outputs of the finished remote execution into the response."""
        for output in self.execution.processOutputs:
            # TODO what if len(data) > 1 ??
            if output.data:
                self.job.wps_response.outputs[output.identifier].data = output.data[0]
            if output.reference:
                # rewrite the container-local reference to a path relative to
                # the host output directory.
                # NOTE(review): index() raises ValueError when 'outputs/' is
                # missing from the URL — confirm the container guarantees it.
                rp = output.reference[output.reference.index('outputs/'):]
                self.job.wps_response.outputs[output.identifier].file = rp

        self.job.wps_response.update_status_succeeded('PyWPS Process {} finished'.format(self.job.process.title))
        store_status(self.job.wps_response.uuid, self.job.wps_response.status, self.job.wps_response.message)

    def _parse_status(self):
        """Mirror the remote execution's status location and message."""
        self.job.process.status_url = self.execution.statusLocation
        self.job.wps_response.update_status(message=self.execution.statusMessage)

    def dirty_clean(self):
        """Stop and remove the container, clean the job, record success."""
        self.cntnr.stop()
        self.cntnr.remove()
        self.job.process.clean()
        self.update_status()

    def update_status(self):
        """Persist the final SUCCEEDED status for this job."""
        self.job.wps_response.message = 'PyWPS Process {} finished'.format(self.job.process.title)
        self.job.wps_response.percentage = 100
        self.job.wps_response.status = WPS_STATUS.SUCCEEDED
        store_status(self.job.wps_response.uuid, self.job.wps_response.status, self.job.wps_response.message,
                     self.job.wps_response.percentage)


def get_inputs(job_inputs):
    """
    Return all inputs in [(input_name1, input_value1), (input_name2, input_value2)]
    Return value can be used for WPS.execute method.

    :param job_inputs: mapping of input name -> list of pywps inputs;
        only the first input of each list is used (TODO: handle len > 1)
    :return: input values
    :rtype: list of tuples
    :raises Exception: when an input is of an unsupported type
    """
    the_inputs = []
    for key, inputs in job_inputs.items():
        inp = inputs[0]
        if isinstance(inp, LiteralInput):
            ows_inp = str(inp.data)
        elif isinstance(inp, ComplexInput):
            # reference the file by its path inside the container, where the
            # process workdir is mounted (see Container._create)
            fp = os.path.basename(inp.file)
            dckr_inp_dir = config.get_config_value('processing', 'dckr_inp_dir')
            ows_inp = owslib.wps.ComplexDataInput("file://" + os.path.join(dckr_inp_dir, fp))
        elif isinstance(inp, BBoxInput):
            ows_inp = owslib.wps.BoundingBoxDataInput(inp.data)
        else:
            # name the failure instead of raising a bare, message-less Exception
            raise Exception("Unsupported input type {} for input '{}'".format(type(inp).__name__, key))
        the_inputs.append((key, ows_inp))

    return the_inputs


def get_output(job_output):
    """
    Return all output names together with their ``asReference`` flag.
    Return value can be used for WPS.execute method.

    :param job_output: mapping of output name -> output definition dict
    :return: output names
    :rtype: list
    """
    return [(name, job_output[name]['asReference']) for name in job_output]


def check_status(container):
    """Poll the remote execution until it finishes, then clean up.

    Runs in a background thread for async jobs; the polling interval is
    taken from the ``[processing] sleep_secs`` configuration option.

    :param container: the :class:`Container` instance owning the execution
    """
    interval = int(config.get_config_value('processing', 'sleep_secs'))
    while True:
        container.execution.checkStatus(sleepSecs=interval)
        if container.execution.isComplete():
            container.dirty_clean()
            return
2 changes: 1 addition & 1 deletion pywps/processing/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ def dump(self):
LOGGER.debug('dump job ...')
import dill
filename = tempfile.mkstemp(prefix='job_', suffix='.dump', dir=self.workdir)[1]
with open(filename, 'w') as fp:
with open(filename, 'wb') as fp:
dill.dump(self, fp)
LOGGER.debug("dumped job status to {}".format(filename))
return filename
Expand Down
10 changes: 10 additions & 0 deletions pywps/response/execute.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,16 @@ def update_status(self, message, status_percentage=None):
status_percentage = self.status_percentage
self._update_status(self.status, message, status_percentage, False)

def update_status_succeeded(self, message):
    """Mark this execute response as successfully finished.

    Sets the status to SUCCEEDED with a percentage of 100.  Used by the
    Docker container processing backend.

    :param str message: Message you need to share with the client
    """
    self._update_status(WPS_STATUS.SUCCEEDED, message, 100, True)

def _update_status_doc(self):
try:
# rebuild the doc
Expand Down
3 changes: 2 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
flufl.enum
flufl.enum
jinja2
jsonschema
lxml
Expand All @@ -8,3 +8,4 @@ python-dateutil
requests
SQLAlchemy
werkzeug
docker
lazaa32 marked this conversation as resolved.
Show resolved Hide resolved