From 8790825ffa209ba6890d5cb9f09749348cf33897 Mon Sep 17 00:00:00 2001 From: Mark Kemel Date: Sun, 13 Oct 2024 22:47:38 +0300 Subject: [PATCH] Make Agent Register request asynchronous The "Register" agent->controller method call could get stuck when the IP address configured as ControllerHost does not exist. The resulting timeout of about 2 seconds could completely occupy the event loop and would not allow other calls on the DBus API of the Agent. By making the "Register" method call asynchronous, we keep the event loop unoccupied so that it is able to accept a SwitchController request. Signed-off-by: Mark Kemel --- src/agent/agent.c | 133 +++++++++++++++++++++++------------ src/agent/agent.h | 4 +- tests/bluechi_test/config.py | 1 + 3 files changed, 93 insertions(+), 45 deletions(-) diff --git a/src/agent/agent.c b/src/agent/agent.c index 5c6725c06c..51ee5e0f92 100644 --- a/src/agent/agent.c +++ b/src/agent/agent.c @@ -169,6 +169,13 @@ static int agent_heartbeat_timer_callback(sd_event_source *event_source, UNUSED Agent *agent = (Agent *) userdata; int r = 0; + if (agent->connection_state == AGENT_CONNECTION_STATE_CONNECTING) { + bc_log_error("Agent connection attempt failed, retrying"); + if (agent->register_call_slot != NULL) { + sd_bus_slot_unrefp(&agent->register_call_slot); + } + agent->connection_state = AGENT_CONNECTION_STATE_RETRY; + } if (agent->connection_state == AGENT_CONNECTION_STATE_CONNECTED && agent_check_controller_liveness(agent)) { r = sd_bus_emit_signal( @@ -532,6 +539,9 @@ void agent_unref(Agent *agent) { sd_event_unrefp(&agent->event); } + if (agent->register_call_slot != NULL) { + sd_bus_slot_unrefp(&agent->register_call_slot); + } if (agent->metrics_slot != NULL) { sd_bus_slot_unrefp(&agent->metrics_slot); } @@ -2684,53 +2694,21 @@ void agent_stop(Agent *agent) { } } -static bool agent_connect(Agent *agent) { - peer_bus_close(agent->peer_dbus); - - bc_log_infof("Connecting to controller on %s", agent->orch_addr); - - agent->peer_dbus = 
peer_bus_open(agent->event, "peer-bus-to-controller", agent->orch_addr); - if (agent->peer_dbus == NULL) { - bc_log_error("Failed to open peer dbus"); - return false; - } - - bus_socket_set_options(agent->peer_dbus, agent->peer_socket_options); - - int r = sd_bus_add_object_vtable( - agent->peer_dbus, - NULL, - INTERNAL_AGENT_OBJECT_PATH, - INTERNAL_AGENT_INTERFACE, - internal_agent_vtable, - agent); - if (r < 0) { - bc_log_errorf("Failed to add agent vtable: %s", strerror(-r)); - return false; - } +static int agent_process_register_callback(sd_bus_message *m, Agent *agent) { + int r = 0; - _cleanup_sd_bus_message_ sd_bus_message *bus_msg = NULL; - sd_bus_error error = SD_BUS_ERROR_NULL; - r = sd_bus_call_method( - agent->peer_dbus, - BC_DBUS_NAME, - INTERNAL_CONTROLLER_OBJECT_PATH, - INTERNAL_CONTROLLER_INTERFACE, - "Register", - &error, - &bus_msg, - "s", - agent->name); - if (r < 0) { - bc_log_errorf("Registering as '%s' failed: %s", agent->name, error.message); - sd_bus_error_free(&error); - return false; + if (sd_bus_message_is_method_error(m, NULL)) { + bc_log_errorf("Registering as '%s' failed: %s", + agent->name, + sd_bus_message_get_error(m)->message); + return -EPERM; } - r = sd_bus_message_read(bus_msg, ""); + bc_log_info("Register call response received"); + r = sd_bus_message_read(m, ""); if (r < 0) { bc_log_errorf("Failed to parse response message: %s", strerror(-r)); - return false; + return r; } bc_log_infof("Connected to controller as '%s'", agent->name); @@ -2759,7 +2737,7 @@ static bool agent_connect(Agent *agent) { agent); if (r < 0) { bc_log_errorf("Failed to add heartbeat signal match: %s", strerror(-r)); - return false; + return r; } r = sd_bus_match_signal_async( @@ -2774,7 +2752,7 @@ static bool agent_connect(Agent *agent) { agent); if (r < 0) { bc_log_errorf("Failed to request match for Disconnected signal: %s", strerror(-r)); - return false; + return r; } /* re-emit ProxyNew signals */ @@ -2791,6 +2769,73 @@ static bool 
agent_connect(Agent *agent) { } } + return 0; +} + +static int agent_register_callback(sd_bus_message *m, void *userdata, UNUSED sd_bus_error *ret_error) { + Agent *agent = (Agent *) userdata; + + if (agent->connection_state != AGENT_CONNECTION_STATE_CONNECTING) { + bc_log_error("Agent is not in CONNECTING state, dropping Register callback"); + if (agent->register_call_slot != NULL) { + sd_bus_slot_unrefp(&agent->register_call_slot); + } + return -EPERM; + } + int r = 0; + r = agent_process_register_callback(m, agent); + if (r < 0) { + agent->connection_state = AGENT_CONNECTION_STATE_RETRY; + return r; + } + return 0; +} + +static bool agent_connect(Agent *agent) { + peer_bus_close(agent->peer_dbus); + + bc_log_infof("Connecting to controller on %s", agent->orch_addr); + agent->connection_state = AGENT_CONNECTION_STATE_CONNECTING; + + agent->peer_dbus = peer_bus_open(agent->event, "peer-bus-to-controller", agent->orch_addr); + if (agent->peer_dbus == NULL) { + bc_log_error("Failed to open peer dbus"); + return false; + } + + bus_socket_set_options(agent->peer_dbus, agent->peer_socket_options); + + int r = sd_bus_add_object_vtable( + agent->peer_dbus, + NULL, + INTERNAL_AGENT_OBJECT_PATH, + INTERNAL_AGENT_INTERFACE, + internal_agent_vtable, + agent); + if (r < 0) { + bc_log_errorf("Failed to add agent vtable: %s", strerror(-r)); + return false; + } + + _cleanup_sd_bus_message_ sd_bus_message *bus_msg = NULL; + sd_bus_error error = SD_BUS_ERROR_NULL; + r = sd_bus_call_method_async( + agent->peer_dbus, + NULL, // No slot needed since peer_dbus is closed on reconnect + BC_DBUS_NAME, + INTERNAL_CONTROLLER_OBJECT_PATH, + INTERNAL_CONTROLLER_INTERFACE, + "Register", + agent_register_callback, + agent, + "s", + agent->name); + if (r < 0) { + bc_log_errorf("Registering as '%s' failed: %s", agent->name, error.message); + sd_bus_error_free(&error); + return false; + } + return true; } diff --git a/src/agent/agent.h b/src/agent/agent.h index 9a41b9c821..6a8d7ba3a8 100644 --- 
a/src/agent/agent.h +++ b/src/agent/agent.h @@ -48,7 +48,8 @@ typedef struct JobTracker JobTracker; typedef enum { AGENT_CONNECTION_STATE_DISCONNECTED, AGENT_CONNECTION_STATE_CONNECTED, - AGENT_CONNECTION_STATE_RETRY + AGENT_CONNECTION_STATE_RETRY, + AGENT_CONNECTION_STATE_CONNECTING } AgentConnectionState; struct Agent { @@ -82,6 +83,7 @@ struct Agent { sd_bus *systemd_dbus; sd_bus *peer_dbus; + sd_bus_slot *register_call_slot; sd_bus_slot *metrics_slot; LIST_HEAD(SystemdRequest, outstanding_requests); diff --git a/tests/bluechi_test/config.py b/tests/bluechi_test/config.py index 4c1aeca24d..0f77ff1c00 100644 --- a/tests/bluechi_test/config.py +++ b/tests/bluechi_test/config.py @@ -117,6 +117,7 @@ def serialize(self) -> str: ControllerAddress={self.controller_address} HeartbeatInterval={self.heartbeat_interval} ControllerHeartbeatThreshold={self.controller_heartbeat_threshold} +RegisterCallTimeout={self.register_call_timeout} LogLevel={self.log_level} LogTarget={self.log_target} LogIsQuiet={self.log_is_quiet}