Skip to content

Commit

Permalink
Merge pull request #450 from nlrcomcast/syncretry
Browse files Browse the repository at this point in the history
Add new RBUS event for cloud_conn_online to retry webpa sync notifications
  • Loading branch information
sadhyama authored Aug 16, 2024
2 parents 05e994e + 6653350 commit 8003466
Show file tree
Hide file tree
Showing 7 changed files with 289 additions and 1 deletion.
5 changes: 5 additions & 0 deletions src/connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -840,6 +840,11 @@ int createNopollConnection(noPollCtx *ctx, server_list_t *server_list)
ParodusInfo("cloud_status set as %s after successful connection\n", get_cloud_status());
if(!connection_init)
{
#ifdef ENABLE_WEBCFGBIN
//Sending cloud connection online event only during reconnect
ParodusInfo("Sending cloud connection online event after reconnection\n");
SendConnOnlineEvent();
#endif
int chk_ret = creat("/tmp/webpanotifyready",S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
if (chk_ret == -1)
{
Expand Down
1 change: 1 addition & 0 deletions src/main.c
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ int main( int argc, char **argv)
registerRbusLogger();
subscribeRBUSevent();
regXmidtSendDataMethod();
regConnOnlineEvent();
#endif
setDefaultValuesToCfg(cfg);
if (0 != parseCommandLine(argc,argv,cfg)) {
Expand Down
9 changes: 8 additions & 1 deletion src/upstream.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@

#include <pthread.h>
#include <wrp-c.h>
#ifdef ENABLE_WEBCFGBIN
#include <rbus.h>
#endif
#ifdef __cplusplus
extern "C" {
#endif
Expand All @@ -43,7 +46,11 @@ typedef struct UpStreamMsg__
/*----------------------------------------------------------------------------*/
/* Function Prototypes */
/*----------------------------------------------------------------------------*/

#ifdef ENABLE_WEBCFGBIN
int regConnOnlineEvent();
rbusError_t SendConnOnlineEvent();
rbusError_t CloudConnSubscribeHandler(rbusHandle_t handle, rbusEventSubAction_t action, const char* eventName, rbusFilter_t filter, int32_t interval, bool* autoPublish);
#endif
void packMetaData();
void *handle_upstream();
void *processUpstreamMessage();
Expand Down
88 changes: 88 additions & 0 deletions src/upstream_rbus.c
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,13 @@
#include "heartBeat.h"

#define WEBCFG_UPSTREAM_EVENT "Webconfig.Upstream"
#define CLOUD_CONN_ONLINE "cloud_conn_online_event"
#ifdef WAN_FAILOVER_SUPPORTED
#define WEBPA_INTERFACE "Device.X_RDK_WanManager.CurrentActiveInterface"
#endif

int cloud_online_subscribe = 0;

rbusHandle_t rbus_Handle;
rbusError_t err;

Expand Down Expand Up @@ -241,3 +244,88 @@ void eventReceiveHandler( rbusHandle_t rbus_Handle, rbusEvent_t const* event, rb
}
}
#endif

rbusError_t SendConnOnlineEvent()
{
rbusError_t rc = RBUS_ERROR_BUS_ERROR;
if(cloud_online_subscribe)
{
rbusEvent_t event = {0};
rbusObject_t data;
rbusValue_t value;

ParodusInfo("publishing cloud connection online Event\n");

rbusValue_Init(&value);
rbusValue_SetInt32(value, 1);

rbusObject_Init(&data, NULL);
rbusObject_SetValue(data, "value", value);

event.name = CLOUD_CONN_ONLINE;
event.data = data;
event.type = RBUS_EVENT_GENERAL;

rc = rbusEvent_Publish(rbus_Handle, &event);

rbusValue_Release(value);
rbusObject_Release(data);

if(rc != RBUS_ERROR_SUCCESS)
ParodusError("provider: rbusEvent_Publish cloud connection online event failed: %d\n", rc);
}
return rc;
}


rbusError_t CloudConnSubscribeHandler(rbusHandle_t handle, rbusEventSubAction_t action, const char* eventName, rbusFilter_t filter, int32_t interval, bool* autoPublish)
{
(void)handle;
(void)filter;
(void)autoPublish;
(void)interval;

if(eventName == NULL)
{
ParodusError("CloudConnSubscribeHandler: event name is NULL\n");
return RBUS_ERROR_INVALID_INPUT;
}

ParodusInfo("CloudConnSubscribeHandler: action=%s eventName=%s\n", action == RBUS_EVENT_ACTION_SUBSCRIBE ? "subscribe" : "unsubscribe", eventName);

if(!strcmp(CLOUD_CONN_ONLINE, eventName))
{
cloud_online_subscribe = action == RBUS_EVENT_ACTION_SUBSCRIBE ? 1 : 0;
}
else
{
ParodusError("provider: CloudConnSubscribeHandler unexpected eventName %s\n", eventName);
}

return RBUS_ERROR_SUCCESS;
}


int regConnOnlineEvent()
{
rbusError_t ret = RBUS_ERROR_SUCCESS;
rbusDataElement_t SyncRetryElements[1] = {{CLOUD_CONN_ONLINE, RBUS_ELEMENT_TYPE_EVENT, {NULL, NULL, NULL, NULL, CloudConnSubscribeHandler, NULL}}};

ParodusInfo("Registering rbus event %s\n", CLOUD_CONN_ONLINE);
if(!rbus_Handle)
{
ParodusError("regConnOnlineEvent failed as rbus_handle is empty\n");
return -1;
}
ret = rbus_regDataElements(rbus_Handle, 1, SyncRetryElements);
if(ret == RBUS_ERROR_SUCCESS)
{
ParodusInfo("Registered cloud connection online event\n");
}
else
{
ParodusError("Failed to register cloud connection online event %d\n", ret);
}
return ret;
}

13 changes: 13 additions & 0 deletions tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,19 @@ target_link_libraries (test_upstream_sock -lcmocka gcov -lcunit -lcimplog
-lwrp-c -luuid -lpthread -lmsgpackc -lnopoll -lnanomsg
-Wl,--no-as-needed -lcjson -lcjwt -ltrower-base64
-lssl -lcrypto -lrt -lm)

#-------------------------------------------------------------------------------
# test_upstream_rbus
#-------------------------------------------------------------------------------
if(ENABLE_WEBCFGBIN)
add_test(NAME test_upstream_rbus COMMAND ${MEMORY_CHECK} ./test_upstream_rbus)
add_executable(test_upstream_rbus test_upstream_rbus.c ../src/upstream_rbus.c)
target_link_libraries (test_upstream_rbus -lcmocka gcov -lcunit -lcimplog
-lwrp-c -luuid -lpthread -lmsgpackc -lnopoll
-Wl,--no-as-needed -lcjson -lcjwt -ltrower-base64
-lssl -lcrypto -lrt -lm -lnanomsg -lrbus)
endif(ENABLE_WEBCFGBIN)


#-------------------------------------------------------------------------------
# test_downstream
Expand Down
9 changes: 9 additions & 0 deletions tests/test_connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@
#include <assert.h>
#include <nopoll.h>
#include <pthread.h>
#ifdef ENABLE_WEBCFGBIN
#include <rbus.h>
#endif

#include "../src/ParodusInternal.h"
#include "../src/connection.h"
Expand Down Expand Up @@ -217,6 +220,12 @@ void setMessageHandlers()
{
}

#ifdef ENABLE_WEBCFGBIN
rbusError_t SendConnOnlineEvent()
{
return;
}
#endif
int allow_insecure_conn (char **server_addr, unsigned int *port)
{
int rtn;
Expand Down
165 changes: 165 additions & 0 deletions tests/test_upstream_rbus.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
/**
* Copyright 2010-2016 Comcast Cable Communications Management, LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <stdio.h>
#include <rbus.h>
#include <CUnit/Basic.h>
#include "../src/upstream.h"
#include "../src/ParodusInternal.h"

#define CLOUD_CONN_ONLINE "cloud_conn_online_event"

rbusHandle_t handle;

int validate_partner_id(wrp_msg_t *msg, partners_t **partnerIds)
{
return;
}

int sendUpstreamMsgToServer(void **resp_bytes, size_t resp_size)
{
return;
}
//Test case for rbusRegCloudConnectionOnlineEventRbushandle failure
void test_regConnOnlineEventRbushandle_failure()
{
int result = regConnOnlineEvent();
CU_ASSERT_EQUAL(result, -1);
}

//Test case for rbusRegCloudConnectionOnlineEvent success
void test_regConnOnlineEvent_success()
{
subscribeRBUSevent();
int result = regConnOnlineEvent();
CU_ASSERT_EQUAL(result, 0);
}

//Test case for rbusRegCloudConnectionOnlineEvent failure
void test_regConnOnlineEvent_failure()
{
//Register event two time and it will create error
int result = regConnOnlineEvent();
result = regConnOnlineEvent();
CU_ASSERT_NOT_EQUAL(result, 0);
}

//Test function for CloudConnSubscribeHandler success
void test_CloudConnSubscribeHandler_success()
{
rbusError_t ret = RBUS_ERROR_BUS_ERROR;
ret = CloudConnSubscribeHandler(handle, RBUS_EVENT_ACTION_SUBSCRIBE , CLOUD_CONN_ONLINE, NULL, 0, false);
CU_ASSERT_EQUAL(ret, 0);
}

static void subscribeEventSuccessCallbackHandler(
rbusHandle_t handle,
rbusEvent_t const* event,
rbusEventSubscription_t* subscription)
{
rbusValue_t new_value;
new_value = rbusObject_GetValue(event->data, "value");
int incoming_value = rbusValue_GetInt32(new_value);
ParodusInfo("incoming_value is %d\n", incoming_value);

if(incoming_value == 1)
{
ParodusInfo("rbusEvent_OnSubscribe callback received successfully\n");
CU_ASSERT(1);
}
(void)handle;
(void)subscription;
}

void subscribe_to_event(char * eventname)
{
int rc = RBUS_ERROR_SUCCESS;

ParodusInfo("rbus_open for component %s\n", "consumer");
rc = rbus_open(&handle, "consumer");
if(rc != RBUS_ERROR_SUCCESS)
{
CU_FAIL("rbus_open failed for subscribe_to_event");
}

if(strncmp(eventname, CLOUD_CONN_ONLINE, strlen(CLOUD_CONN_ONLINE)) == 0)
{
ParodusInfo("Inside subscribe_to_event for %s and eventname is %s\n", CLOUD_CONN_ONLINE, eventname);
rc = rbusEvent_Subscribe(handle, eventname, subscribeEventSuccessCallbackHandler, NULL, 0);
}

if(rc != RBUS_ERROR_SUCCESS)
CU_FAIL("subscribe_to_event onsubscribe event failed");
}

void rbushandleclose(char * name)
{
rbusEvent_Unsubscribe(handle, name);
rbus_close(handle);
}

//Test case for SendRbusEventCloudConnOnline Success
void test_SendConnOnlineEvent_success()
{
subscribe_to_event(CLOUD_CONN_ONLINE);
rbusError_t ret = SendConnOnlineEvent();
CU_ASSERT_EQUAL(ret, 0);
rbushandleclose(CLOUD_CONN_ONLINE);
sleep(1);
}

//Test case for SendRbusEventCloudConnOnline failure
void test_SendConnOnlineEvent_failure()
{
rbusError_t ret = SendConnOnlineEvent();
CU_ASSERT_NOT_EQUAL(ret, 0);
}

void add_suites( CU_pSuite *suite )
{
*suite = CU_add_suite( "tests", NULL, NULL );
CU_add_test( *suite, "test regConnOnlineEventRbushandle_failure", test_regConnOnlineEventRbushandle_failure);
CU_add_test( *suite, "test regConnOnlineEvent_success", test_regConnOnlineEvent_success);
CU_add_test( *suite, "test regConnOnlineEvent_failure", test_regConnOnlineEvent_failure);
CU_add_test( *suite, "test CloudConnSubscribeHandler_success", test_CloudConnSubscribeHandler_success);
CU_add_test( *suite, "test SendConnOnlineEvent_success", test_SendConnOnlineEvent_success);
CU_add_test( *suite, "test SendConnOnlineEvent_failure", test_SendConnOnlineEvent_failure);
}


int main( int argc, char *argv[] )
{
unsigned rv = 1;
CU_pSuite suite = NULL;

(void ) argc;
(void ) argv;

if( CUE_SUCCESS == CU_initialize_registry() ) {
add_suites( &suite );
if( NULL != suite ) {
CU_basic_set_mode( CU_BRM_VERBOSE );
CU_basic_run_tests();
printf( "\n" );
CU_basic_show_failures( CU_get_failure_list() );
printf( "\n\n" );
rv = CU_get_number_of_tests_failed();
}

CU_cleanup_registry();
}
return rv;
}

0 comments on commit 8003466

Please sign in to comment.