Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Deepstream 5.0 Subscription Implementation #1

Open
wants to merge 32 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
2e29abb
Add msgapi_subscribe implementation
Shubhamturakhia Apr 5, 2021
2a7a3d6
Modify broker.c file
Shubhamturakhia Apr 6, 2021
9a5bc7a
Added subcribe implementation
Shubhamturakhia Apr 7, 2021
b2c6940
Added Subscribe implementation
Shubhamturakhia Apr 7, 2021
bf90453
Minor changes
Shubhamturakhia Apr 7, 2021
67dffec
Changes according to PR review
Shubhamturakhia Apr 7, 2021
5ca5006
Add callback wrapper
Shubhamturakhia Apr 8, 2021
4821a94
Added unsubscribe feature
Shubhamturakhia Apr 12, 2021
adc30df
Changes as per PR review
Shubhamturakhia Apr 12, 2021
f8f0099
Minor change
Shubhamturakhia Apr 14, 2021
7aa84e7
Minor change
Shubhamturakhia Apr 14, 2021
9a8ff42
Fix formatting
Shubhamturakhia Apr 14, 2021
bd3310d
Update aws_protocol_adaptor/device_client/aws_nvmsgbroker.c
Shubhamturakhia Apr 14, 2021
7e4b8d9
Add msgapi connection signature mwthod
Shubhamturakhia Apr 15, 2021
f2cd2b9
Merge branch 'DS-5.0-subscription-implementation' of https://github.c…
Shubhamturakhia Apr 15, 2021
3707004
added connection signature method
Shubhamturakhia Apr 16, 2021
2fd20b1
Minor changes
Shubhamturakhia Apr 16, 2021
072dfbb
Modified utilities for conn signature
Shubhamturakhia Apr 16, 2021
93deacc
Formatting and minor refactoring
Shubhamturakhia Apr 19, 2021
9c1449a
Changes as per PR review
Shubhamturakhia Apr 20, 2021
fa94cc3
Modifications for Hash code
Shubhamturakhia Apr 21, 2021
525ee45
Changes as per review
Shubhamturakhia Apr 21, 2021
579d7b4
Minor indentation change
Shubhamturakhia Apr 21, 2021
d330130
Hash code refactoring
Shubhamturakhia Apr 22, 2021
819cdc0
Add hash code implementation for config file
Shubhamturakhia Apr 23, 2021
0bac762
Fix Incorrect Return Types
lummish Apr 23, 2021
ef454fb
Minor changes
Shubhamturakhia Apr 23, 2021
9c61435
Resolve Compilation Errors
lummish Apr 23, 2021
d9fa5a6
Link OpenSSL in Makefile
lummish Apr 23, 2021
54fff91
Statically Link to libssl and libcrypto
lummish Apr 27, 2021
c99d2de
Reorder SSL Linkages
lummish Apr 27, 2021
0c7895e
Reorder Linkage to SSL Libraries
lummish Apr 27, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 37 additions & 6 deletions aws_protocol_adaptor/device_client/aws_nvmsgbroker.c
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ NvDsMsgApiHandle nvds_msgapi_connect(char *connection_str, nvds_msgapi_connect_c
disconnect_cb = connect_cb;
if (config_path == NULL)
{
IOT_ERROR("Essensial args missing for function nvds_msgapi_connect\n");
IOT_ERROR("Essential args missing for function nvds_msgapi_connect\n");
return NULL;
}

Expand Down Expand Up @@ -142,7 +142,7 @@ NvDsMsgApiErrorType nvds_msgapi_disconnect(NvDsMsgApiHandle h_ptr)
{
if ((h_ptr == NULL))
{
IOT_ERROR("Essensial args missing for function nvds_msgapi_disconnect\n");
IOT_ERROR("Essential args missing for function nvds_msgapi_disconnect\n");
return NVDS_MSGAPI_ERR;
}
IoT_Error_t rc = FAILURE;
Expand Down Expand Up @@ -180,7 +180,7 @@ NvDsMsgApiErrorType nvds_msgapi_send(NvDsMsgApiHandle conn, char *topic, const u
{
if ((conn == NULL) || (topic == NULL) || (payload == NULL) || (nbuf == 0))
{
IOT_ERROR("Essensial args missing for function nvds_msgapi_send\n");
IOT_ERROR("Essential args missing for function nvds_msgapi_send\n");
return NVDS_MSGAPI_ERR;
}
AWS_IoT_Client *client = (AWS_IoT_Client *)conn;
Expand Down Expand Up @@ -217,7 +217,7 @@ NvDsMsgApiErrorType nvds_msgapi_send_async(NvDsMsgApiHandle h_ptr, char *topic,
{
if ((h_ptr == NULL) || (topic == NULL) || (payload == NULL) || (nbuf == 0))
{
IOT_ERROR("Essensial args missing for function nvds_msgapi_send: %d, %d, %d, %d\n", (h_ptr == NULL), (topic == NULL), (payload == NULL), (nbuf == 0));
IOT_ERROR("Essential args missing for function nvds_msgapi_send: %d, %d, %d, %d\n", (h_ptr == NULL), (topic == NULL), (payload == NULL), (nbuf == 0));
return NVDS_MSGAPI_ERR;
}
Work *work_node = g_malloc(sizeof(Work));
Expand All @@ -243,6 +243,35 @@ NvDsMsgApiErrorType nvds_msgapi_send_async(NvDsMsgApiHandle h_ptr, char *topic,
return NVDS_MSGAPI_OK;
}

NvDsMsgApiErrorType nvds_msgapi_subscribe(NvDsMsgApiHandle h_ptr, char **topics, int num_topics, nvds_msgapi_subscribe_request_cb_t cb, void *user_ctx)
{
IOT_INFO("Subscribe called\n");

if ((h_ptr == NULL) || (topics == NULL) || (num_topics <= 0))
{
IOT_ERROR("Essential args missing for function nvds_msgapi_subscribe: %d, %d, %d\n", (h_ptr == NULL), (topics == NULL), (num_topics == NULL);
return NVDS_MSGAPI_ERR;
}
if (!cb)
{
IOT_ERROR("Callback function for nvds_msgapi_subscribe cannot be NULL\n");
return NVDS_MSGAPI_ERR;
}
IoT_Error_t rc = FAILURE;
AWS_IoT_Client *client = (AWS_IoT_Client *)h_ptr;
for (int i = 0; i < num_topics; i++)
{
rc = aws_iot_mqtt_subscribe(client, topics[i], strlen(topics[i]), QOS0, cb, user_ctx);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will not compile. cb is not the correct type. You will likely need to somehow wrap cb in a function that implements the correct type expected by aws_iot_mqtt_subscribe.

Recall that this is the callback type accepted by aws_iot_mqtt_subscribe:

typedef void(* pApplicationHandler_t) (AWS_IoT_Client *pClient, char *pTopicName, uint16_t topicNameLenIoT_Publish_Message_Params *pParams, void *pClientData)

And this is the callback type accepted by nvds_msgapi_subscribe:

typedef void(* nvds_msgapi_subscribe_request_cb_t)(NvDsMsgApiErrorType flag, void *msg, int msg_len, char *topic, void *user_ptr)

So we'll need to create a function of type *pApplicationHandler_t that calls cb.

Something like this (not this exactly) would work:

void nvds_msgapi_subscribe_cb_wrapped(AWS_IoT_Client *pClient, char *pTopicName, uint16_t topicNameLenIoT_Publish_Message_Params *pParams, void *pClientData)
{
    cb(NVDS_MSGAPI_OK, pParams->payload, pParams->payloadLen, pTopicName, pClientData);
}

The trick here is determining how to create this wrapped callback. You will likely need to have a static variable that stores a pointer to such a function, then you can set the value of this static variable when the subscribe method is called for the first time.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was actually looking for way of wrapping the callback, thanks for the above explanation

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Shubhamturakhia Here's an idea:

Define a static variable nvds_cb that we will use to store the callback provided by a user:

static nvds_msgapi_subscribe_request_cb_t nvds_cb;

Define a function that implements the type of callbacks expected by the mqtt subscribe function:

void nvds_cb_wrapped(AWS_IoT_Client *pClient, char *pTopicName, uint16_t topicNameLen, IoT_Publish_Message_Params *pParams, void *pClientData)
{
    nvds_cb(NVDS_MSGAPI_OK, pParams->payload, pParams->payloadLen, pTopicName, pClientData);
}

In your implementation of nvds_msgapi_subscribe, assign the callback supplied as an argument to the nvds_cb variable:

nvds_cb = cb;

if (SUCCESS != rc)
{
IOT_ERROR("Unable to subscribe, error: %d\n", rc);
return NVDS_MSGAPI_ERR;
}
}
IOT_INFO("Successfully subscribed");
return NVDS_MSGAPI_OK;
}

/* ************************************************************************* */
// Do Work function def
/* ************************************************************************* */
Expand Down Expand Up @@ -277,7 +306,8 @@ void nvds_msgapi_do_work(NvDsMsgApiHandle h_ptr)
}
return;
}
while (! g_queue_is_empty(work_queue)){
while (!g_queue_is_empty(work_queue))
{
Work *work_node = (Work *)g_queue_pop_head(work_queue);
AWS_IoT_Client *client = (AWS_IoT_Client *)work_node->h_ptr;
rc = _mqtt_msg_send(client, work_node->topic, work_node->payload, work_node->payload_size);
Expand All @@ -291,7 +321,8 @@ void nvds_msgapi_do_work(NvDsMsgApiHandle h_ptr)
g_free(work_node);
return;
}
if (work_node->call_back_handler != NULL){
if (work_node->call_back_handler != NULL)
{
IOT_INFO("Pointer callback.");
work_node->call_back_handler(work_node->user_ptr, NVDS_MSGAPI_OK);
}
Expand Down
1 change: 1 addition & 0 deletions aws_protocol_adaptor/device_client/aws_nvmsgbroker.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,5 @@ NvDsMsgApiHandle nvds_msgapi_connect(char *connection_str, nvds_msgapi_connect_c
NvDsMsgApiErrorType nvds_msgapi_disconnect(NvDsMsgApiHandle h_ptr);
NvDsMsgApiErrorType nvds_msgapi_send(NvDsMsgApiHandle conn, char *topic, const uint8_t *payload, size_t nbuf);
NvDsMsgApiErrorType nvds_msgapi_send_async(NvDsMsgApiHandle h_ptr, char *topic, const uint8_t *payload, size_t nbuf, nvds_msgapi_send_cb_t send_callback, void *user_ptr);
NvDsMsgApiErrorType nvds_msgapi_subscribe(NvDsMsgApiHandle h_ptr, char **topics, int num_topics, nvds_msgapi_subscribe_request_cb_t cb, void *user_ctx);
void nvds_msgapi_do_work(NvDsMsgApiHandle h_ptr);