diff --git a/src/isar/config/settings.py b/src/isar/config/settings.py index fd6a4f6a..bf6dd5e5 100644 --- a/src/isar/config/settings.py +++ b/src/isar/config/settings.py @@ -140,6 +140,9 @@ def __init__(self) -> None: # Keyvault name KEYVAULT_NAME: str = Field(default="IsarDevKv") + # Determines whether inspections are uploaded asynchronously or get_inspections in robotinterface + UPLOAD_INSPECTIONS_ASYNC: bool = Field(default=False) + # URL to storage account for Azure Blob Storage BLOB_STORAGE_ACCOUNT_URL: str = Field( default="https://eqrobotdevstorage.blob.core.windows.net" diff --git a/src/isar/script.py b/src/isar/script.py index e3988ee1..a5b5f9ed 100644 --- a/src/isar/script.py +++ b/src/isar/script.py @@ -2,7 +2,7 @@ import time from logging import Logger from threading import Thread -from typing import Any, List +from typing import Any, List, Tuple from injector import Injector @@ -20,6 +20,8 @@ from isar.services.service_connections.mqtt.robot_info_publisher import ( RobotInfoPublisher, ) +from robot_interface.models.inspection.inspection import Inspection +from robot_interface.models.mission.mission import Mission from isar.state_machine.state_machine import StateMachine, main from isar.storage.uploader import Uploader from robot_interface.robot_interface import RobotInterface @@ -69,6 +71,7 @@ def print_setting(setting: str = "", value: Any = "", fillchar: str = " "): print_setting("Using local storage", settings.STORAGE_LOCAL_ENABLED) print_setting("Using blob storage", settings.STORAGE_BLOB_ENABLED) print_setting("Using SLIMM storage", settings.STORAGE_SLIMM_ENABLED) + print_setting("Using async inspection uploading", settings.UPLOAD_INSPECTIONS_ASYNC) print_setting("Plant code", settings.PLANT_CODE) print_setting("Plant name", settings.PLANT_NAME) print_setting("Plant shortname", settings.PLANT_SHORT_NAME) @@ -103,6 +106,16 @@ def start(): target=uploader.run, name="ISAR Uploader", daemon=True ) threads.append(uploader_thread) + + if settings.UPLOAD_INSPECTIONS_ASYNC: + def inspections_callback(inspection: Inspection): + message: Tuple[Inspection, Mission] = ( + inspection, + state_machine.current_mission, + ) + state_machine.queues.upload_queue.put(message) + robot.register_inspection_callback(inspections_callback) + if settings.MQTT_ENABLED: mqtt_client: MqttClient = MqttClient(mqtt_queue=queues.mqtt_queue) @@ -137,6 +150,7 @@ def start(): robot_name=settings.ROBOT_NAME, isar_id=settings.ISAR_ID, ) + if publishers: threads.extend(publishers) diff --git a/src/isar/state_machine/states/monitor.py b/src/isar/state_machine/states/monitor.py index b3eb4935..59dbcfba 100644 --- a/src/isar/state_machine/states/monitor.py +++ b/src/isar/state_machine/states/monitor.py @@ -111,7 +111,7 @@ def _run(self) -> None: self.state_machine.current_task.status = status - if self._should_upload_inspections(): + if not settings.UPLOAD_INSPECTIONS_ASYNC and self._should_upload_inspections(): get_inspection_thread = ThreadedRequest( self._queue_inspections_for_upload ) diff --git a/src/robot_interface/robot_interface.py b/src/robot_interface/robot_interface.py index fb374390..0fa20980 100644 --- a/src/robot_interface/robot_interface.py +++ b/src/robot_interface/robot_interface.py @@ -1,7 +1,7 @@ from abc import ABCMeta, abstractmethod from queue import Queue from threading import Thread -from typing import List +from typing import Callable, List from robot_interface.models.initialize import InitializeParams from robot_interface.models.inspection.inspection import Inspection @@ -180,6 +180,22 @@ def get_inspection(self, task: InspectionTask) -> Inspection: """ raise NotImplementedError + @abstractmethod + def register_inspection_callback(self, callback_function: Callable[[Inspection], None]) -> None: + """Register a function which should be run when inspection data is received + asynchronously. This function should expect to receive an Inspection from. + + Parameters + ---------- + callback_function : Callable[[Inspection] + + Returns + ------- + None + + """ + raise NotImplementedError + @abstractmethod def initialize(self, params: InitializeParams) -> None: """Initializes the robot. The initialization needed is robot dependent and the diff --git a/tests/isar/state_machine/test_state_machine.py b/tests/isar/state_machine/test_state_machine.py index 6cd5bacb..170d5734 100644 --- a/tests/isar/state_machine/test_state_machine.py +++ b/tests/isar/state_machine/test_state_machine.py @@ -8,6 +8,8 @@ from injector import Injector from pytest_mock import MockerFixture +from isar.config.settings import settings + from isar.mission_planner.local_planner import LocalPlanner from isar.models.communication.queues.queues import Queues from isar.services.utilities.scheduling_utilities import SchedulingUtilities @@ -31,6 +33,7 @@ class StateMachineThread(object): def __init__(self, injector) -> None: + settings.UPLOAD_INSPECTIONS_ASYNC = False self.injector: Injector = injector self.state_machine: StateMachine = injector.get(StateMachine) self._thread: Thread = Thread(target=main, args=[self.state_machine]) @@ -42,6 +45,7 @@ def start(self): class UploaderThread(object): def __init__(self, injector) -> None: + settings.UPLOAD_INSPECTIONS_ASYNC = False self.injector: Injector = injector self.uploader: Uploader = Uploader( queues=self.injector.get(Queues), diff --git a/tests/mocks/robot_interface.py b/tests/mocks/robot_interface.py index 2c8231f4..ea88762e 100644 --- a/tests/mocks/robot_interface.py +++ b/tests/mocks/robot_interface.py @@ -2,7 +2,7 @@ from datetime import datetime from queue import Queue from threading import Thread -from typing import List, Sequence +from typing import Callable, List, Sequence from alitra import Frame, Orientation, Pose, Position @@ -61,6 +61,9 @@ def get_inspection(self, task: InspectionTask) -> Inspection: image: Image = Image(mock_image_metadata()) image.data = b"Some binary image data" return image + + def register_inspection_callback(self, callback_function: Callable[[Inspection], None]) -> None: + return def initialize(self, params: InitializeParams) -> None: return