Skip to content

Commit

Permalink
Merge pull request #13 from johandahlberg/quarantine_runfolders
Browse files Browse the repository at this point in the history
Quarantine runfolders after pickup
  • Loading branch information
Johan Hermansson committed May 11, 2016
2 parents 80f4062 + 82ca795 commit 5b45346
Show file tree
Hide file tree
Showing 6 changed files with 84 additions and 66 deletions.
2 changes: 1 addition & 1 deletion requirements/prod
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
git+https://github.com/arteria-project/arteria-core.git@v1.0.1#egg=arteria-core
git+https://github.com/arteria-project/arteria-core.git@v1.1.0#egg=arteria-core
jsonpickle==0.9.2
tornado==4.2.1
PyYAML==3.11
1 change: 1 addition & 0 deletions runfolder/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ def start():
routes = [
(r"/api/1.0/runfolders", ListAvailableRunfoldersHandler, args),
(r"/api/1.0/runfolders/next", NextAvailableRunfolderHandler, args),
(r"/api/1.0/runfolders/pickup", PickupAvailableRunfolderHandler, args),
(r"/api/1.0/runfolders/path(/.*)", RunfolderHandler, args),
(r"/api/1.0/runfolders/test/markasready/path(/.*)", TestFakeSequencerReadyHandler, args)
]
Expand Down
33 changes: 27 additions & 6 deletions runfolder/handlers.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@

import tornado.web

import arteria
from arteria.web.state import State
from arteria.exceptions import InvalidArteriaStateException
from arteria.web.handlers import BaseRestHandler
from runfolder.services import *
import tornado.web

from runfolder.services import *

class BaseRunfolderHandler(BaseRestHandler):
"""Provides core logic for all runfolder handlers"""
Expand Down Expand Up @@ -40,7 +44,7 @@ def get(self):
def get_runfolders():
try:
# TODO: This list should be paged. The unfiltered list can be large
state = self.get_argument("state", RunfolderState.READY)
state = self.get_argument("state", State.READY)
if state == "*":
state = None
for runfolder_info in self.runfolder_svc.list_runfolders(state):
Expand All @@ -56,15 +60,32 @@ class NextAvailableRunfolderHandler(BaseRunfolderHandler):
"""Handles fetching the next available runfolder"""
def get(self):
"""
Returns the next runfolder to process. Note that it's currently assumed
that only one process polls this endpoint. No locking mechanism is in place.
Returns the next runfolder to process. Note that it will not lock the runfolder, and unless its
state is changed by the polling client quickly enough it will be presented again.
"""
runfolder_info = self.runfolder_svc.next_runfolder()
if runfolder_info:
self.append_runfolder_link(runfolder_info)
self.write_object(runfolder_info)


class PickupAvailableRunfolderHandler(BaseRunfolderHandler):
"""Handles picking up the next available runfolder and marking it as PENDING"""
def get(self):
"""
Returns the next runfolder to process and sets its state to PENDING.
"""
runfolder_info = self.runfolder_svc.next_runfolder()
if runfolder_info:
self.append_runfolder_link(runfolder_info)
self.runfolder_svc.set_runfolder_state(runfolder_info.path, State.PENDING)
runfolder_info.state = State.PENDING
self.write_object(runfolder_info)
else:
self.set_status(204, reason="No ready runfolders available.")
self.write(dict())


class RunfolderHandler(BaseRunfolderHandler):
"""Handles a particular runfolder, identified by path"""
def get(self, path):
Expand Down Expand Up @@ -96,7 +117,7 @@ def post(self, path):

try:
self.runfolder_svc.set_runfolder_state(path, state)
except InvalidRunfolderState:
except InvalidArteriaStateException:
raise tornado.web.HTTPError(400, "The state '{}' is not valid".format(state))

@arteria.undocumented
Expand Down
61 changes: 13 additions & 48 deletions runfolder/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,37 +3,8 @@
import logging
from runfolder import __version__ as version


class Enum(set):
"""
Defines an enumeration which values are a string representation of the
specified attribute, i.e. EnumInstance.VAL1 == "VAL1"
Usage: EnumInstance = Enum(["VAL1", "VAL2"])
print EnumInstance.VAL1
> "VAL1"
if "VAL3" not in EnumInstance:
raise ...
"""
def __getattr__(self, name):
if name in self:
return name
raise AttributeError

def __setattr__(self, key, value):
raise NotImplementedError("Values cannot be set directly")

"""
NONE: Not ready for processing or invalid
READY: Ready for processing
STARTED: Started processing the runfolder
DONE: Done processing the runfolder
ERROR: Started processing the runfolder but there was an error
"""
RunfolderState = Enum(["NONE", "READY", "STARTED", "DONE", "ERROR"])

from arteria.web.state import State
from arteria.web.state import validate_state

class RunfolderInfo:
"""
Expand Down Expand Up @@ -163,7 +134,7 @@ def get_runfolder_by_path(self, path):
def _get_runfolder_state_from_state_file(self, runfolder):
"""
Reads the state in the state file at .arteria/state, returns
RunfolderState.NONE if nothing is available
State.NONE if nothing is available
"""
state_file = os.path.join(runfolder, ".arteria", "state")
if self._file_exists(state_file):
Expand All @@ -172,34 +143,28 @@ def _get_runfolder_state_from_state_file(self, runfolder):
state = state.strip()
return state
else:
return RunfolderState.NONE
return State.NONE

def get_runfolder_state(self, runfolder):
"""
Returns the state of a runfolder. The possible states are defined in
RunfolderState
State
If the file .arteria/state exists, it will determine the state. If it doesn't
exist, the existence of the marker file RTAComplete.txt determines the state.
"""
state = self._get_runfolder_state_from_state_file(runfolder)
if state == RunfolderState.NONE:
if state == State.NONE:
completed_marker = os.path.join(runfolder, "RTAComplete.txt")
ready = self._file_exists(completed_marker)
if ready:
state = RunfolderState.READY
state = State.READY
return state

@staticmethod
def validate_state(state):
"""Raises InvalidRunfolderState if the state is not known"""
if state not in RunfolderState:
raise InvalidRunfolderState("The state '{}' is not valid".format(state))

@staticmethod
def set_runfolder_state(runfolder, state):
"""Sets the state of a runfolder"""
RunfolderService.validate_state(state)
validate_state(state)
arteria_dir = os.path.join(runfolder, ".arteria")
state_file = os.path.join(arteria_dir, "state")
if not os.path.exists(arteria_dir):
Expand All @@ -210,8 +175,8 @@ def set_runfolder_state(runfolder, state):
def is_runfolder_ready(self, directory):
"""Returns True if the runfolder is ready"""
state = self.get_runfolder_state(directory)
self._logger.debug("Checking {0}. state={1}".format(directory, state))
return state == RunfolderState.READY
self._logger.debug("Checking {0}. state={1}".format(directory, state))
return state == State.READY

def _monitored_directories(self):
"""Lists all directories monitored for new runfolders"""
Expand All @@ -225,7 +190,7 @@ def _monitored_directories(self):

def next_runfolder(self):
"""Returns the next available runfolder. Returns None if there is none available."""
available = self.list_runfolders(state=RunfolderState.READY)
available = self.list_runfolders(state=State.READY)
try:
first = available.next()
except StopIteration:
Expand All @@ -236,7 +201,7 @@ def next_runfolder(self):
return first

def list_available_runfolders(self):
return self.list_runfolders(RunfolderState.READY)
return self.list_runfolders(State.READY)

def list_runfolders(self, state):
"""
Expand All @@ -245,7 +210,7 @@ def list_runfolders(self, state):
"""
runfolders = self._enumerate_runfolders()
if state:
RunfolderService.validate_state(state)
validate_state(state)
return (runfolder for runfolder in runfolders if runfolder.state == state)
else:
return runfolders
Expand Down
36 changes: 34 additions & 2 deletions runfolder_tests/integration/rest_tests.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
import unittest
import time
from arteria.testhelpers import TestFunctionDelta, BaseRestTest
import os
import logging
import requests
import jsonpickle
import mock
import shutil

from arteria.testhelpers import TestFunctionDelta, BaseRestTest
from arteria.web.state import State


log = logging.getLogger(__name__)

Expand Down Expand Up @@ -85,6 +89,9 @@ def test_can_create_and_update_state(self):
matching = [runfolder for runfolder in runfolders if runfolder["path"] == path]
self.assertEqual(len(matching), 1)

# Remove the path created, so it does not interfere with other tests
shutil.rmtree(path)

# TODO: Change state to "processing" and ensure it doesn't show up in /runfolders
self.messages_logged.assert_changed_by_total(2)

Expand All @@ -98,14 +105,39 @@ def test_updating_state_removes_runfolder_from_candidates(self):
path = self._create_ready_runfolder()
self.assertTrue(self._exists(path))
# Mark the folder as processing
self.post("./runfolders/path{}".format(path), {"state": "STARTED"}, expect=200)
self.post("./runfolders/path{}".format(path), {"state": State.STARTED}, expect=200)
# Ensure that the folder is not listed anymore:
self.assertFalse(self._exists(path))
# Remove the path created, so it does not interfere with other tests
shutil.rmtree(path)

def test_invalid_state_is_not_accepted(self):
path = self._create_ready_runfolder()
self.assertTrue(self._exists(path))
self.post("./runfolders/path{}".format(path), {"state": "NOT-AVAILABLE"}, expect=400)
# Remove the path created, so it does not interfere with other tests
shutil.rmtree(path)

def test_pickup_runfolder(self):
path = self._create_ready_runfolder()
self.assertTrue(self._exists(path))
response = self.get("./runfolders/pickup", expect=200)
response_json = jsonpickle.loads(response.text)
self.assertEqual(response_json["path"], path)
self.assertEqual(response_json["state"], State.PENDING)
# Remove the path created, so it does not interfere with other tests
shutil.rmtree(path)


def test_next_runfolder(self):
path = self._create_ready_runfolder()
self.assertTrue(self._exists(path))
response = self.get("./runfolders/next", expect=200)
response_json = jsonpickle.loads(response.text)
self.assertEqual(response_json["path"], path)
self.assertEqual(response_json["state"], State.READY)
# Remove the path created, so it does not interfere with other tests
shutil.rmtree(path)

def _exists(self, path):
resp = self.get("./runfolders")
Expand Down
17 changes: 8 additions & 9 deletions runfolder_tests/unit/runfolder_tests.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
import unittest
import logging
from runfolder.services import RunfolderService, RunfolderState

from arteria.web.state import State

from runfolder.services import RunfolderService


logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -34,8 +38,8 @@ def test_list_available_runfolders(self):
self.assertEqual(len(runfolders), 2)

runfolders_str = sorted([str(runfolder) for runfolder in runfolders])
expected = ["READY: /data/testarteria1/mon1/runfolder001@localhost",
"READY: /data/testarteria1/mon2/runfolder001@localhost"]
expected = ["ready: /data/testarteria1/mon1/runfolder001@localhost",
"ready: /data/testarteria1/mon2/runfolder001@localhost"]
self.assertEqual(runfolders_str, expected)

def test_next_runfolder(self):
Expand All @@ -54,14 +58,9 @@ def test_next_runfolder(self):

# Test
runfolder = runfolder_svc.next_runfolder()
expected = "READY: /data/testarteria1/mon1/runfolder001@localhost"
expected = "ready: /data/testarteria1/mon1/runfolder001@localhost"
self.assertEqual(str(runfolder), expected)

def test_runfolder_state_cant_be_set(self):
def assign_by_accident():
RunfolderState.READY = "by accident"
self.assertRaises(NotImplementedError, assign_by_accident)

def test_monitored_directory_validates(self):
configuration_svc = dict()
configuration_svc["monitored_directories"] = ["/data/testarteria1/runfolders"]
Expand Down

0 comments on commit 5b45346

Please sign in to comment.