Skip to content

Commit

Permalink
Accept inspection data asynchronously
Browse files Browse the repository at this point in the history
  • Loading branch information
andchiind committed Oct 25, 2024
1 parent 15afff2 commit a9e950c
Show file tree
Hide file tree
Showing 6 changed files with 44 additions and 4 deletions.
3 changes: 3 additions & 0 deletions src/isar/config/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,9 @@ def __init__(self) -> None:
# Keyvault name
KEYVAULT_NAME: str = Field(default="IsarDevKv")

# Determines whether inspections are uploaded asynchronously or get_inspections in robotinterface
UPLOAD_INSPECTIONS_ASYNC: bool = Field(default=False)

# URL to storage account for Azure Blob Storage
BLOB_STORAGE_ACCOUNT_URL: str = Field(
default="https://eqrobotdevstorage.blob.core.windows.net"
Expand Down
16 changes: 15 additions & 1 deletion src/isar/script.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import time
from logging import Logger
from threading import Thread
from typing import Any, List
from typing import Any, List, Tuple

from injector import Injector

Expand All @@ -20,6 +20,8 @@
from isar.services.service_connections.mqtt.robot_info_publisher import (
RobotInfoPublisher,
)
from robot_interface.models.inspection.inspection import Inspection
from robot_interface.models.mission.mission import Mission
from isar.state_machine.state_machine import StateMachine, main
from isar.storage.uploader import Uploader
from robot_interface.robot_interface import RobotInterface
Expand Down Expand Up @@ -69,6 +71,7 @@ def print_setting(setting: str = "", value: Any = "", fillchar: str = " "):
print_setting("Using local storage", settings.STORAGE_LOCAL_ENABLED)
print_setting("Using blob storage", settings.STORAGE_BLOB_ENABLED)
print_setting("Using SLIMM storage", settings.STORAGE_SLIMM_ENABLED)
print_setting("Using async inspection uploading", settings.UPLOAD_INSPECTIONS_ASYNC)
print_setting("Plant code", settings.PLANT_CODE)
print_setting("Plant name", settings.PLANT_NAME)
print_setting("Plant shortname", settings.PLANT_SHORT_NAME)
Expand Down Expand Up @@ -103,6 +106,16 @@ def start():
target=uploader.run, name="ISAR Uploader", daemon=True
)
threads.append(uploader_thread)

if settings.UPLOAD_INSPECTIONS_ASYNC:
def inspections_callback(inspection: Inspection):
message: Tuple[Inspection, Mission] = (
inspection,
state_machine.current_mission,
)
state_machine.queues.upload_queue.put(message)
robot.register_inspection_callback(inspections_callback)

if settings.MQTT_ENABLED:
mqtt_client: MqttClient = MqttClient(mqtt_queue=queues.mqtt_queue)

Expand Down Expand Up @@ -137,6 +150,7 @@ def start():
robot_name=settings.ROBOT_NAME,
isar_id=settings.ISAR_ID,
)

if publishers:
threads.extend(publishers)

Expand Down
2 changes: 1 addition & 1 deletion src/isar/state_machine/states/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ def _run(self) -> None:

self.state_machine.current_task.status = status

if self._should_upload_inspections():
if not settings.UPLOAD_INSPECTIONS_ASYNC and self._should_upload_inspections():
get_inspection_thread = ThreadedRequest(
self._queue_inspections_for_upload
)
Expand Down
18 changes: 17 additions & 1 deletion src/robot_interface/robot_interface.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from abc import ABCMeta, abstractmethod
from queue import Queue
from threading import Thread
from typing import List
from typing import Callable, List

from robot_interface.models.initialize import InitializeParams
from robot_interface.models.inspection.inspection import Inspection
Expand Down Expand Up @@ -180,6 +180,22 @@ def get_inspection(self, task: InspectionTask) -> Inspection:
"""
raise NotImplementedError

@abstractmethod
def register_inspection_callback(self, callback_function: Callable[[Inspection], None]) -> None:
"""Register a function which should be run when inspection data is received
asynchronously. This function should expect to receive an Inspection from.
Parameters
----------
callback_function : Callable[[Inspection]
Returns
-------
None
"""
raise NotImplementedError

@abstractmethod
def initialize(self, params: InitializeParams) -> None:
"""Initializes the robot. The initialization needed is robot dependent and the
Expand Down
4 changes: 4 additions & 0 deletions tests/isar/state_machine/test_state_machine.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
from injector import Injector
from pytest_mock import MockerFixture

from isar.config.settings import settings

from isar.mission_planner.local_planner import LocalPlanner
from isar.models.communication.queues.queues import Queues
from isar.services.utilities.scheduling_utilities import SchedulingUtilities
Expand All @@ -31,6 +33,7 @@

class StateMachineThread(object):
def __init__(self, injector) -> None:
settings.UPLOAD_INSPECTIONS_ASYNC = False
self.injector: Injector = injector
self.state_machine: StateMachine = injector.get(StateMachine)
self._thread: Thread = Thread(target=main, args=[self.state_machine])
Expand All @@ -42,6 +45,7 @@ def start(self):

class UploaderThread(object):
def __init__(self, injector) -> None:
settings.UPLOAD_INSPECTIONS_ASYNC = False
self.injector: Injector = injector
self.uploader: Uploader = Uploader(
queues=self.injector.get(Queues),
Expand Down
5 changes: 4 additions & 1 deletion tests/mocks/robot_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from datetime import datetime
from queue import Queue
from threading import Thread
from typing import List, Sequence
from typing import Callable, List, Sequence

from alitra import Frame, Orientation, Pose, Position

Expand Down Expand Up @@ -61,6 +61,9 @@ def get_inspection(self, task: InspectionTask) -> Inspection:
image: Image = Image(mock_image_metadata())
image.data = b"Some binary image data"
return image

def register_inspection_callback(self, callback_function: Callable[[Inspection], None]) -> None:
return

def initialize(self, params: InitializeParams) -> None:
return
Expand Down

0 comments on commit a9e950c

Please sign in to comment.