Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rename Update to NodeUpdate #560

Merged
merged 9 commits into from
Oct 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion inbm-lib/inbm_common_lib/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
EVENT_CHANNEL = 'manageability/event'
TELEMETRY_CHANNEL = 'manageability/telemetry'
# Used for Node updates to be sent to UDM
UPDATE_CHANNEL = 'manageability/update'
NODE_UPDATE_CHANNEL = 'manageability/nodeupdate'
CONFIG_CHANNEL = 'ma/configuration/update/'

# Request constants
Expand Down
2 changes: 1 addition & 1 deletion inbm/cloudadapter-agent/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ The agent subscribes to the following topics:
- Agent events: `manageability/event`
- Responses: `manageability/response`
- Device telemetry: `manageability/telemetry`
- Update from scheduled requests: `manageability/update`
- Update from scheduled requests: `manageability/nodeupdate`

❗`+` is a wild-card indicating single level thus matching `diagnostic/state` or `<another-agent>/state`

Expand Down
4 changes: 2 additions & 2 deletions inbm/cloudadapter-agent/cloudadapter/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ def _bind_agent_to_cloud(self) -> None:
lambda _, payload: self._cloud_publisher.publish_event(payload)
)
self._broker.bind_callback(
TC_TOPIC.UPDATE,
lambda _, payload: self._cloud_publisher.publish_update(payload)
TC_TOPIC.NODE_UPDATE,
lambda _, payload: self._cloud_publisher.publish_node_update(payload)
)

def _bind_ucc_to_agent(self) -> None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,13 @@ def bind_callback(self, name: str, callback: Callable) -> None:
"""
pass

def publish_update(self, message: str) -> None:
def publish_node_update(self, message: str) -> None:
"""Publishes an update to the cloud

@param message: (str) The update message to send
@exception PublishError: If publish fails
"""
self._client.publish_update("update", message)
self._client.publish_node_update("update", message)

def publish_event(self, message: str) -> None:
"""Publishes an event to the cloud
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,21 @@
class CloudClient:

def __init__(self, connection: MQTTConnection, telemetry: OneWayMessenger, event: OneWayMessenger,
update: OneWayMessenger | None, attribute: OneWayMessenger,
node_update: OneWayMessenger | None, attribute: OneWayMessenger,
handler: ReceiveRespondHandler) -> None:
"""Constructor for CloudClient

@param connection: Connection associated with this CloudClient
@param telemetry: Messenger to send telemetry
@param event: Messenger to send events
@param update: Messenger to send updates
@param node_update: Messenger to send node updates
@param attribute: Messenger to send attributes
@param handler: Handler to deal with cloud method calls
"""
self._connection = connection
self._telemetry = telemetry
self._event = event
self._update: OneWayMessenger | None = update
self._node_update: OneWayMessenger | None = node_update
self._attribute = attribute
self._handler = handler

Expand Down Expand Up @@ -57,18 +57,18 @@ def publish_telemetry(self, key: str, value: str, time: datetime) -> None:
"""
return self._telemetry.publish(key, value, time)

def publish_update(self, key: str, value: str) -> None:
def publish_node_update(self, key: str, value: str) -> None:
"""Publishes an update to the cloud

@param key: key to publish
@param value: update to publish
@exception PublishError: If publish fails
"""
if self._update is None:
if self._node_update is None:
logger.error("Received update publish request but no update messenger is configured")
return None
else:
return self._update.publish(key, value)
return self._node_update.publish(key, value)

def publish_event(self, key: str, value: str) -> None:
"""Publishes an event to the cloud
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import random
import logging
import threading
import uuid
from google.protobuf.timestamp_pb2 import Timestamp
from typing import Callable, Optional
from datetime import datetime
Expand All @@ -26,6 +27,7 @@
import grpc # type: ignore
from .cloud_client import CloudClient


logger = logging.getLogger(__name__)


Expand Down Expand Up @@ -116,15 +118,15 @@ def publish_telemetry(self, key: str, value: str, time: datetime) -> None:

pass # INBS is not yet ready to receive telemetry

def publish_update(self, key: str, value: str) -> None:
"""Publishes an update to the cloud
def publish_node_update(self, key: str, value: str) -> None:
"""Publishes a node update to the cloud

@param key: key to publish
@param value: node update message to publish
@exception PublishError: If publish fails
"""
if self._grpc_channel is None:
raise PublishError("gRPC channel not set up before calling InbsCloudClient.publish_update")
raise PublishError("gRPC channel not set up before calling InbsCloudClient.publish_node_update")

is_valid = is_valid_json_structure(value, NODE_UPDATE_JSON_SCHEMA_LOCATION)
if not is_valid:
Expand Down Expand Up @@ -156,10 +158,9 @@ def publish_update(self, key: str, value: str) -> None:
actual_end_time=timestamp,
job_state=job_state
)


request = inbs_sb_pb2.SendNodeUpdateRequest(
request_id="notused",
request_id=str(uuid.uuid4()),
job_update=job,
)
logger.debug(f"Sending node update to INBS: request={request}")
Expand Down Expand Up @@ -234,7 +235,6 @@ def _handle_inbm_command_request(
)
continue


if command_type:
if command_type == "update_scheduled_operations":
# Convert operations to Dispatcher's ScheduleRequest
Expand Down
10 changes: 5 additions & 5 deletions inbm/cloudadapter-agent/cloudadapter/cloud/cloud_builders.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ def build_messenger_with_config(config: Dict[str, Any]):
telemetry = config.get("telemetry")
attribute = config.get("attribute")
event = config.get("event")
update = config.get("update")
node_update = config.get("node_update")

if telemetry:
telemetry = build_messenger_with_config(telemetry)
Expand All @@ -155,10 +155,10 @@ def build_messenger_with_config(config: Dict[str, Any]):
else:
raise ClientBuildError(
"Missing 'attribute' MQTT config information while setting up cloud connection.")
if update:
update = build_messenger_with_config(update)
if node_update:
node_update = build_messenger_with_config(node_update)
else:
logger.debug("Missing 'update' MQTT config information while setting up cloud connection. TODO: figure out why this doesn't happen for INBS, but does happen for other clouds.")
logger.debug("Missing 'node_update' MQTT config information while setting up cloud connection. TODO: figure out why this doesn't happen for INBS, but does happen for other clouds.")
if event:
event = build_messenger_with_config(event)
else:
Expand Down Expand Up @@ -205,6 +205,6 @@ def build_messenger_with_config(config: Dict[str, Any]):
connection=connection,
telemetry=telemetry,
event=event,
update=update,
node_update=node_update,
attribute=attribute,
handler=handler)
Original file line number Diff line number Diff line change
Expand Up @@ -73,13 +73,13 @@ def publish_telemetry(self, message: str) -> None:
except PublishError as e:
logger.error(str(e))

def publish_update(self, message: str) -> None:
def publish_node_update(self, message: str) -> None:
"""Send node update to UDM

@param message: (str) JSON formatted SendNodeUpdateRequest
"""
logger.debug(f"Received node update: {message}")
try:
self._adapter.publish_update(message)
self._adapter.publish_node_update(message)
except PublishError as e:
logger.error(str(e))
4 changes: 2 additions & 2 deletions inbm/cloudadapter-agent/cloudadapter/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
SPDX-License-Identifier: Apache-2.0
"""

from inbm_common_lib.constants import UPDATE_CHANNEL, TELEMETRY_CHANNEL, RESPONSE_CHANNEL, EVENT_CHANNEL
from inbm_common_lib.constants import NODE_UPDATE_CHANNEL, TELEMETRY_CHANNEL, RESPONSE_CHANNEL, EVENT_CHANNEL
from inbm_lib.constants import DOCKER_STATS
from inbm_lib.path_prefixes import INTEL_MANAGEABILITY_ETC_PATH_PREFIX
from inbm_lib.path_prefixes import INTEL_MANAGEABILITY_SHARE_PATH_PREFIX, BROKER_ETC_PATH
Expand Down Expand Up @@ -39,7 +39,7 @@ class TC_TOPIC:
STATE = tuple([STATE_CHANNEL])
TELEMETRY = tuple([TELEMETRY_CHANNEL]) # Shared by TC and UCC
EVENT = tuple([EVENT_CHANNEL, RESPONSE_CHANNEL]) # TODO: What's up with response?
UPDATE = tuple([UPDATE_CHANNEL]) # Used for Node updates to be sent to UDM
NODE_UPDATE = tuple([NODE_UPDATE_CHANNEL]) # Used for Node updates to be sent to UDM

# ========== Publishing channels

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,14 @@ def setUp(self) -> None:
self.mock_telemetry = mock.create_autospec(Messenger)
self.mock_attribute = mock.create_autospec(Messenger)
self.mock_event = mock.create_autospec(Messenger)
self.mock_update = mock.create_autospec(Messenger)
self.mock_node_update = mock.create_autospec(Messenger)
self.mock_handler = mock.create_autospec(Handler)

self.cloud_client = CloudClient(
connection=self.mock_connection,
telemetry=self.mock_telemetry,
event=self.mock_event,
update=self.mock_update,
node_update=self.mock_node_update,
attribute=self.mock_attribute,
handler=self.mock_handler
)
Expand All @@ -47,10 +47,10 @@ def test_publish_attribute_succeeds(self) -> None:
self.cloud_client.publish_attribute(*args)
assert self.mock_attribute.publish.call_count == 1

def test_publish_update_succeeds(self) -> None:
def test_publish_node_update_succeeds(self) -> None:
args = ("key", "value")
self.cloud_client.publish_update(*args)
assert self.mock_update.publish.call_count == 1
self.cloud_client.publish_node_update(*args)
assert self.mock_node_update.publish.call_count == 1

def test_publish_event_succeeds(self) -> None:
args = ("key", "value")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import threading
import pytest
from mock import MagicMock, Mock, patch
import queue
Expand Down Expand Up @@ -52,7 +51,7 @@ def test_publish_telemetry(self, inbs_client: InbsCloudClient) -> None:
key="example_key", value="example_value", time=datetime.now()
)

def test_publish_update(self, inbs_client: InbsCloudClient) -> None:
def test_publish_node_update(self, inbs_client: InbsCloudClient) -> None:
mock_channel = MagicMock()
mock_channel.SendNodeUpdateRequest.return_value = "MockResponse"
inbs_client._grpc_channel = mock_channel
Expand All @@ -62,7 +61,7 @@ def test_publish_update(self, inbs_client: InbsCloudClient) -> None:

# Call the publish_update method
with patch('cloudadapter.cloud.client.inbs_cloud_client.is_valid_json_structure', return_value=True):
inbs_client.publish_update(key, value)
inbs_client.publish_node_update(key, value)

# Assert that the gRPC channel's SendNodeUpdate method was called
mock_channel.SendNodeUpdate.assert_called_once()
Expand All @@ -76,9 +75,9 @@ def test_publish_update_failure_no_grpc_channel(self, inbs_client: InbsCloudClie
key = 'test-key'
value = '{"job_id": "12345", "status": 200, "message": "Update successful"}'

# Call the publish_update method and expect a PublishError
# Call the publish_node_update method and expect a PublishError
with pytest.raises(PublishError):
inbs_client.publish_update(key, value)
inbs_client.publish_node_update(key, value)

def test_publish_event(self, inbs_client: InbsCloudClient) -> None:
# this is not expected to do anything yet
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@ def setUp(self) -> None:
"pub": "event_pub",
"format": "event_format"
},
"update": {
"pub": "update_pub",
"format": "update_format"
"node_update": {
"pub": "node_update_pub",
"format": "node_update_format"
},
"telemetry": {
"pub": "telemetry_pub",
Expand Down Expand Up @@ -93,9 +93,9 @@ def setUp(self) -> None:
"pub": "event_pub",
"format": "event_format"
},
"update": {
"pub": "update_pub",
"format": "update_format"
"node_update": {
"pub": "node_update_pub",
"format": "node_update_format"
},
"command": {
"pub": "manageability/request/command",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,23 +24,23 @@ def setUp(self, MockedAdapter, mock_logger) -> None:
self.MockedAdapter = MockedAdapter
self.cloud_publisher = CloudPublisher(self.MockedAdapter("config"))

def test_publish_update_succeed(self) -> None:
def test_publish_node_update_succeed(self) -> None:
update = "update"
self.cloud_publisher.publish_update(update)
self.cloud_publisher.publish_node_update(update)

mocked = self.MockedAdapter.return_value
mocked.publish_update.assert_called_once_with(update)
mocked.publish_node_update.assert_called_once_with(update)

@mock.patch("cloudadapter.cloud.cloud_publisher.logger")
def test_publish_update_with_adapter__succeeds(self, mock_logger) -> None:
self.MockedAdapter.return_value.publish_update.return_value = None
self.cloud_publisher.publish_update('{"status": 200, "message": "COMMAND SUCCESSFUL", "job_id": "swupd-0cdce9d5-523b-43d9-8673-d54fd61498fe"}')
def test_publish_node_update_with_adapter__succeeds(self, mock_logger) -> None:
self.MockedAdapter.return_value.publish_node_update.return_value = None
self.cloud_publisher.publish_node_update('{"status": 200, "message": "COMMAND SUCCESSFUL", "job_id": "swupd-0cdce9d5-523b-43d9-8673-d54fd61498fe"}')
assert mock_logger.error.call_count == 0

@mock.patch("cloudadapter.cloud.cloud_publisher.logger")
def test_publish_update_with_adapter_fails(self, mock_logger) -> None:
self.MockedAdapter.return_value.publish_update.side_effect = PublishError("Error!")
self.cloud_publisher.publish_update('{"status": 200, "message": "COMMAND SUCCESSFUL", "job_id": "swupd-0cdce9d5-523b-43d9-8673-d54fd61498fe"}')
def test_publish_node_update_with_adapter_fails(self, mock_logger) -> None:
self.MockedAdapter.return_value.publish_node_update.side_effect = PublishError("Error!")
self.cloud_publisher.publish_node_update('{"status": 200, "message": "COMMAND SUCCESSFUL", "job_id": "swupd-0cdce9d5-523b-43d9-8673-d54fd61498fe"}')
assert mock_logger.error.call_count == 1

def test_publish_event_succeed(self) -> None:
Expand Down
1 change: 1 addition & 0 deletions inbm/dispatcher-agent/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ The agent publishes to the following topics:
- Dynamic telemetry updates: `telemetry/update`
- Informs diagnostic-agent remediation manager to remove a specific container: `remediation/container`
- Informs diagnostic-agent remediation manager to remove a specific image:`remediation/image`
- Sends the result of a scheduled request received from UDMScheduled node update. This result is actually sent as a request to cloudadapter: `manageability/nodeupdate`
- dispatcher-agent state: dispatcher/state` when dead/running


Expand Down
2 changes: 1 addition & 1 deletion inbm/dispatcher-agent/dispatcher/aota/aota_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

from dispatcher.common.result_constants import INSTALL_FAILURE, CODE_OK
from dispatcher.config_dbs import ConfigDbs
from dispatcher.constants import TELEMETRY_UPDATE_CHANNEL, UMASK_OTA
from dispatcher.constants import UMASK_OTA
from dispatcher.packageinstaller.package_installer import TrtlContainer
from dispatcher.packagemanager.local_repo import DirectoryRepo
from dispatcher.packagemanager.package_manager import get
Expand Down
8 changes: 4 additions & 4 deletions inbm/dispatcher-agent/dispatcher/dispatcher_broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
from inbm_lib.json_validator import is_valid_json_structure
from inbm_lib.constants import NODE_UPDATE_JSON_SCHEMA_LOCATION

from inbm_common_lib.constants import RESPONSE_CHANNEL, EVENT_CHANNEL, UPDATE_CHANNEL
from inbm_common_lib.constants import RESPONSE_CHANNEL, EVENT_CHANNEL, NODE_UPDATE_CHANNEL

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -48,9 +48,9 @@ def send_update(self, message: str) -> None:
@param message: message to be published to cloud
@param job_id: Job ID used to track the request in both UDM and TC
"""
logger.debug(f"Sending node update for to {UPDATE_CHANNEL} with message: {message}")
self.mqtt_publish(topic=UPDATE_CHANNEL, payload=message)
logger.debug(f"Sending node update for to {NODE_UPDATE_CHANNEL} with message: {message}")
self.mqtt_publish(topic=NODE_UPDATE_CHANNEL, payload=message)

def _check_db_for_started_job(self) -> Optional[Schedule]:
sqliteMgr = SqliteManager()
schedule = sqliteMgr.get_any_started_schedule()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ topic write manageability/event
topic write manageability/cmd/+
topic write manageability/response
topic write manageability/response/+
topic write manageability/update
topic write manageability/nodeupdate
topic write ma/configuration/update/+
topic write dispatcher/query

Expand All @@ -23,7 +23,7 @@ topic read manageability/response
topic read manageability/response/+
topic read manageability/telemetry
topic read manageability/event
topic read manageability/update
topic read manageability/nodeupdate

user inbc-program
topic write manageability/request/#
Expand Down