Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Deepstream 5.0 Subscription Implementation #1

Open
wants to merge 32 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 21 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
2e29abb
Add msgapi_subscribe implementation
Shubhamturakhia Apr 5, 2021
2a7a3d6
Modify broker.c file
Shubhamturakhia Apr 6, 2021
9a5bc7a
Added subcribe implementation
Shubhamturakhia Apr 7, 2021
b2c6940
Added Subscribe implementation
Shubhamturakhia Apr 7, 2021
bf90453
Minor changes
Shubhamturakhia Apr 7, 2021
67dffec
Changes according to PR review
Shubhamturakhia Apr 7, 2021
5ca5006
Add callback wrapper
Shubhamturakhia Apr 8, 2021
4821a94
Added unsubscribe feature
Shubhamturakhia Apr 12, 2021
adc30df
Changes as per PR review
Shubhamturakhia Apr 12, 2021
f8f0099
Minor change
Shubhamturakhia Apr 14, 2021
7aa84e7
Minor change
Shubhamturakhia Apr 14, 2021
9a8ff42
Fix formatting
Shubhamturakhia Apr 14, 2021
bd3310d
Update aws_protocol_adaptor/device_client/aws_nvmsgbroker.c
Shubhamturakhia Apr 14, 2021
7e4b8d9
Add msgapi connection signature mwthod
Shubhamturakhia Apr 15, 2021
f2cd2b9
Merge branch 'DS-5.0-subscription-implementation' of https://github.c…
Shubhamturakhia Apr 15, 2021
3707004
added connection signature method
Shubhamturakhia Apr 16, 2021
2fd20b1
Minor changes
Shubhamturakhia Apr 16, 2021
072dfbb
Modified utilities for conn signature
Shubhamturakhia Apr 16, 2021
93deacc
Formatting and minor refactoring
Shubhamturakhia Apr 19, 2021
9c1449a
Changes as per PR review
Shubhamturakhia Apr 20, 2021
fa94cc3
Modifications for Hash code
Shubhamturakhia Apr 21, 2021
525ee45
Changes as per review
Shubhamturakhia Apr 21, 2021
579d7b4
Minor indentation change
Shubhamturakhia Apr 21, 2021
d330130
Hash code refactoring
Shubhamturakhia Apr 22, 2021
819cdc0
Add hash code implementation for config file
Shubhamturakhia Apr 23, 2021
0bac762
Fix Incorrect Return Types
lummish Apr 23, 2021
ef454fb
Minor changes
Shubhamturakhia Apr 23, 2021
9c61435
Resolve Compilation Errors
lummish Apr 23, 2021
d9fa5a6
Link OpenSSL in Makefile
lummish Apr 23, 2021
54fff91
Statically Link to libssl and libcrypto
lummish Apr 27, 2021
c99d2de
Reorder SSL Linkages
lummish Apr 27, 2021
0c7895e
Reorder Linkage to SSL Libraries
lummish Apr 27, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 15 additions & 12 deletions aws_protocol_adaptor/device_client/aws_iot_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,24 +28,27 @@
#define AWS_IOT_MAX_CERT_PATH_LEN 200
#define AWS_IOT_MAX_PRIVATE_KEY_PATH_LEN 200

#define AWS_IOT_MQTT_TX_BUF_LEN 4096 ///< Any time a message is sent out through the MQTT layer. The message is copied into this buffer anytime a publish is done. This will also be used in the case of Thing Shadow
#define AWS_IOT_MQTT_RX_BUF_LEN 512 ///< Any message that comes into the device should be less than this buffer size. If a received message is bigger than this buffer size the message will be dropped.
#define AWS_IOT_MQTT_TOPIC_BUF_LEN 257 ///< Maximum len of topics in AWS MQTT server + EOS
#define AWS_IOT_MQTT_TX_BUF_LEN 4096 ///< Any time a message is sent out through the MQTT layer. The message is copied into this buffer anytime a publish is done. This will also be used in the case of Thing Shadow
#define AWS_IOT_MQTT_RX_BUF_LEN 512 ///< Any message that comes into the device should be less than this buffer size. If a received message is bigger than this buffer size the message will be dropped.
#define AWS_IOT_MQTT_TOPIC_BUF_LEN 257 ///< Maximum len of topics in AWS MQTT server + EOS
#define AWS_IOT_CLIENT_YIELD_WAIT_TIME 10 ///< Maximum number of milliseconds to pass thread execution to the MQTT client.
#define AWS_IOT_MQTT_NUM_SUBSCRIBE_HANDLERS 5 ///< Maximum number of topic filters the MQTT client can handle at any given time. This should be increased appropriately when using Thing Shadow

#define AWS_IOT_MAX_SEND_INTERVAL_SEC 120 ///< Have to yield every this amount of time before time-out disconnect
#define MAX_SUBSCRIPTIONS 50
#define NVDS_MSGAPI_VERSION "2.0"
#define NVDS_MSGAPI_PROTOCOL "AWS"

// Thing Shadow specific configs
#define SHADOW_MAX_SIZE_OF_RX_BUFFER (AWS_IOT_MQTT_RX_BUF_LEN + 1) ///< Maximum size of the SHADOW buffer to store the received Shadow message, including terminating NULL byte.
#define MAX_SIZE_OF_UNIQUE_CLIENT_ID_BYTES 80 ///< Maximum size of the Unique Client Id. For More info on the Client Id refer \ref response "Acknowledgments"
#define MAX_SIZE_CLIENT_ID_WITH_SEQUENCE MAX_SIZE_OF_UNIQUE_CLIENT_ID_BYTES + 10 ///< This is size of the extra sequence number that will be appended to the Unique client Id
#define MAX_SIZE_CLIENT_TOKEN_CLIENT_SEQUENCE MAX_SIZE_CLIENT_ID_WITH_SEQUENCE + 20 ///< This is size of the the total clientToken key and value pair in the JSON
#define MAX_ACKS_TO_COMEIN_AT_ANY_GIVEN_TIME 10 ///< At Any given time we will wait for this many responses. This will correlate to the rate at which the shadow actions are requested
#define MAX_THINGNAME_HANDLED_AT_ANY_GIVEN_TIME 10 ///< We could perform shadow action on any thing Name and this is maximum Thing Names we can act on at any given time
#define MAX_JSON_TOKEN_EXPECTED 120 ///< These are the max tokens that is expected to be in the Shadow JSON document. Include the metadata that gets published
#define MAX_SHADOW_TOPIC_LENGTH_WITHOUT_THINGNAME 60 ///< All shadow actions have to be published or subscribed to a topic which is of the format $aws/things/{thingName}/shadow/update/accepted. This refers to the size of the topic without the Thing Name
#define MAX_SIZE_OF_THING_NAME 20 ///< The Thing Name should not be bigger than this value. Modify this if the Thing Name needs to be bigger
#define SHADOW_MAX_SIZE_OF_RX_BUFFER (AWS_IOT_MQTT_RX_BUF_LEN + 1) ///< Maximum size of the SHADOW buffer to store the received Shadow message, including terminating NULL byte.
#define MAX_SIZE_OF_UNIQUE_CLIENT_ID_BYTES 80 ///< Maximum size of the Unique Client Id. For More info on the Client Id refer \ref response "Acknowledgments"
#define MAX_SIZE_CLIENT_ID_WITH_SEQUENCE MAX_SIZE_OF_UNIQUE_CLIENT_ID_BYTES + 10 ///< This is size of the extra sequence number that will be appended to the Unique client Id
#define MAX_SIZE_CLIENT_TOKEN_CLIENT_SEQUENCE MAX_SIZE_CLIENT_ID_WITH_SEQUENCE + 20 ///< This is size of the the total clientToken key and value pair in the JSON
#define MAX_ACKS_TO_COMEIN_AT_ANY_GIVEN_TIME 10 ///< At Any given time we will wait for this many responses. This will correlate to the rate at which the shadow actions are requested
#define MAX_THINGNAME_HANDLED_AT_ANY_GIVEN_TIME 10 ///< We could perform shadow action on any thing Name and this is maximum Thing Names we can act on at any given time
#define MAX_JSON_TOKEN_EXPECTED 120 ///< These are the max tokens that is expected to be in the Shadow JSON document. Include the metadata that gets published
#define MAX_SHADOW_TOPIC_LENGTH_WITHOUT_THINGNAME 60 ///< All shadow actions have to be published or subscribed to a topic which is of the format $aws/things/{thingName}/shadow/update/accepted. This refers to the size of the topic without the Thing Name
#define MAX_SIZE_OF_THING_NAME 20 ///< The Thing Name should not be bigger than this value. Modify this if the Thing Name needs to be bigger
#define MAX_SHADOW_TOPIC_LENGTH_BYTES MAX_SHADOW_TOPIC_LENGTH_WITHOUT_THINGNAME + MAX_SIZE_OF_THING_NAME ///< This size includes the length of topic with Thing Name

// Auto Reconnect specific config
Expand Down
154 changes: 146 additions & 8 deletions aws_protocol_adaptor/device_client/aws_nvmsgbroker.c
Original file line number Diff line number Diff line change
Expand Up @@ -33,20 +33,26 @@
#include "aws_config_parser.h"
#include "nvds_msgapi.h"
#include "aws_nvmsgbroker.h"
#include <openssl/sha.h>

NvDsMsgApiHandle (*nvds_msgapi_connect_ptr)(char *connection_str, nvds_msgapi_connect_cb_t connect_cb, char *config_path);
NvDsMsgApiErrorType (*nvds_msgapi_send_ptr)(NvDsMsgApiHandle conn, char *topic, const uint8_t *payload, size_t nbuf);
NvDsMsgApiErrorType (*nvds_msgapi_disconnect_ptr)(NvDsMsgApiHandle h_ptr);
NvDsMsgApiErrorType (*nvds_msgapi_connection_signature_ptr)(char *connection_str, char *config_path, char *output_str, int max_len);
static GMutex thread_mutex;
static GQueue *work_queue;
static struct timespec last_send_time_stamp; // this is to make sure we send or yield frequent enough so we do not get disconnected.
static struct timespec last_send_time_stamp; // this is to make sure we send or yield frequent enough so we do not get disconnected.
static nvds_msgapi_connect_cb_t disconnect_cb; // disconnect handler provided by connect thread
static nvds_msgapi_subscribe_request_cb_t nvds_cb; // msgapi subscribe callback handler
static char *subscribed_topics[MAX_SUBSCRIPTIONS]; // to store the subscribed topics in order to be used during unsubscribe operation
static size_t num_subscriptions = 0; // number of registered subscriptions

/* ************************************************************************* */
// Connect function def
/* ************************************************************************* */

static void disconnectCallbackHandler(AWS_IoT_Client *pClient, void *data)
static void
disconnectCallbackHandler(AWS_IoT_Client *pClient, void *data)
{
IOT_WARN("MQTT Disconnect");
IoT_Error_t rc = FAILURE;
Expand Down Expand Up @@ -83,7 +89,7 @@ NvDsMsgApiHandle nvds_msgapi_connect(char *connection_str, nvds_msgapi_connect_c
disconnect_cb = connect_cb;
if (config_path == NULL)
{
IOT_ERROR("Essensial args missing for function nvds_msgapi_connect\n");
IOT_ERROR("Essential args missing for function nvds_msgapi_connect\n");
return NULL;
}

Expand Down Expand Up @@ -142,11 +148,21 @@ NvDsMsgApiErrorType nvds_msgapi_disconnect(NvDsMsgApiHandle h_ptr)
{
if ((h_ptr == NULL))
{
IOT_ERROR("Essensial args missing for function nvds_msgapi_disconnect\n");
IOT_ERROR("Essential args missing for function nvds_msgapi_disconnect\n");
return NVDS_MSGAPI_ERR;
}
IoT_Error_t rc = FAILURE;
AWS_IoT_Client *client = (AWS_IoT_Client *)h_ptr;
for (int i = 0; i < num_subscriptions; i++)
{
rc = aws_iot_mqtt_unsubscribe(client, subscribed_topics[i], strlen(subscribed_topics[i]));
if (SUCCESS != rc)
{
IOT_ERROR("Unable to unsubscribe, error: %d\n", rc);
}
}
IOT_INFO("Successfully unsubscribed");

rc = aws_iot_mqtt_disconnect(client);
if (SUCCESS != rc)
{
Expand Down Expand Up @@ -180,7 +196,7 @@ NvDsMsgApiErrorType nvds_msgapi_send(NvDsMsgApiHandle conn, char *topic, const u
{
if ((conn == NULL) || (topic == NULL) || (payload == NULL) || (nbuf == 0))
{
IOT_ERROR("Essensial args missing for function nvds_msgapi_send\n");
IOT_ERROR("Essential args missing for function nvds_msgapi_send\n");
return NVDS_MSGAPI_ERR;
}
AWS_IoT_Client *client = (AWS_IoT_Client *)conn;
Expand Down Expand Up @@ -217,7 +233,7 @@ NvDsMsgApiErrorType nvds_msgapi_send_async(NvDsMsgApiHandle h_ptr, char *topic,
{
if ((h_ptr == NULL) || (topic == NULL) || (payload == NULL) || (nbuf == 0))
{
IOT_ERROR("Essensial args missing for function nvds_msgapi_send: %d, %d, %d, %d\n", (h_ptr == NULL), (topic == NULL), (payload == NULL), (nbuf == 0));
IOT_ERROR("Essential args missing for function nvds_msgapi_send: %d, %d, %d, %d\n", (h_ptr == NULL), (topic == NULL), (payload == NULL), (nbuf == 0));
return NVDS_MSGAPI_ERR;
}
Work *work_node = g_malloc(sizeof(Work));
Expand All @@ -243,6 +259,43 @@ NvDsMsgApiErrorType nvds_msgapi_send_async(NvDsMsgApiHandle h_ptr, char *topic,
return NVDS_MSGAPI_OK;
}

void nvds_cb_wrapped(AWS_IoT_Client *pClient, char *pTopicName, uint16_t topicNameLen, IoT_Publish_Message_Params *pParams, void *pClientData)
{
IOT_INFO("Subscribe callback");
nvds_cb(NVDS_MSGAPI_OK, pParams->payload, pParams->payloadLen, pTopicName, pClientData);
}

NvDsMsgApiErrorType nvds_msgapi_subscribe(NvDsMsgApiHandle h_ptr, char **topics, int num_topics, nvds_msgapi_subscribe_request_cb_t cb, void *user_ctx)
{
IOT_INFO("Subscribe called\n");
if ((h_ptr == NULL) || (topics == NULL) || (num_topics <= 0))
{
IOT_ERROR("Essential args missing for function nvds_msgapi_subscribe: %d, %d, %d\n", (h_ptr == NULL), (topics == NULL), (num_topics == 0));
return NVDS_MSGAPI_ERR;
}
if (!cb)
{
IOT_ERROR("Callback function for nvds_msgapi_subscribe cannot be NULL\n");
return NVDS_MSGAPI_ERR;
}
nvds_cb = cb;
IoT_Error_t rc = FAILURE;
AWS_IoT_Client *client = (AWS_IoT_Client *)h_ptr;
for (int i = 0; i < num_topics; i++)
{
rc = aws_iot_mqtt_subscribe(client, topics[i], strlen(topics[i]), QOS0, nvds_cb_wrapped, user_ctx);
if (SUCCESS != rc)
{
IOT_ERROR("Unable to subscribe, error: %d\n", rc);
return NVDS_MSGAPI_ERR;
}
subscribed_topics[i] = topics[i];
num_subscriptions++;
}
IOT_INFO("Successfully subscribed");
return NVDS_MSGAPI_OK;
}

/* ************************************************************************* */
// Do Work function def
/* ************************************************************************* */
Expand Down Expand Up @@ -277,7 +330,8 @@ void nvds_msgapi_do_work(NvDsMsgApiHandle h_ptr)
}
return;
}
while (! g_queue_is_empty(work_queue)){
while (!g_queue_is_empty(work_queue))
{
Work *work_node = (Work *)g_queue_pop_head(work_queue);
AWS_IoT_Client *client = (AWS_IoT_Client *)work_node->h_ptr;
rc = _mqtt_msg_send(client, work_node->topic, work_node->payload, work_node->payload_size);
Expand All @@ -291,7 +345,8 @@ void nvds_msgapi_do_work(NvDsMsgApiHandle h_ptr)
g_free(work_node);
return;
}
if (work_node->call_back_handler != NULL){
if (work_node->call_back_handler != NULL)
{
IOT_INFO("Pointer callback.");
work_node->call_back_handler(work_node->user_ptr, NVDS_MSGAPI_OK);
}
Expand All @@ -300,3 +355,86 @@ void nvds_msgapi_do_work(NvDsMsgApiHandle h_ptr)
last_send_time_stamp = current_time_stamp;
return;
}

char *nvds_msgapi_getversion()
{
return NVDS_MSGAPI_VERSION;
}

char *nvds_msgapi_get_protocol_name()
{
return NVDS_MSGAPI_PROTOCOL;
}

bool is_valid_connection_str(char *connection_str)
{
char *burl = "", *bport = "";
if (connection_str == NULL)
{
IOT_ERROR("connection string cant be NULL");
return false;
}

char conn_str[] = connection_str;
int i = 0;

char *token = strtok(conn_str, ";");
char *data[2];

while (token)
{
data[i++] = token;
token = strtok(NULL, ";");
}
Comment on lines +383 to +390
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't think this will work properly. The second call to strtok shouldn't give you a second token. Instead, try something like this:

int i = 0;
char *data[2];

for (i = 0; i < 2; i++) {
  data[i] = strtok(conn_str, ";");
}

for (i = 0; i < 2; i++)
{
printf("%s\n", data[i]);
}
burl = data[0];
bport = data[1];

if (burl == "" || bport == "")
{
IOT_ERROR("connection string is invalid. hostname or port is empty\n");
return false;
}
return true;
}

char *generate_sha256_hash(char *str, char *output_str)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's typical in C-style APIs for the destination variable to come first. See sprintf for an example.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, you're passing in this argument output_str but you're not actually using it anywhere.

{
unsigned char hashval[SHA256_DIGEST_LENGTH];
int len = SHA256_DIGEST_LENGTH * 2 + 1;
char res[len];
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

res is a statically allocated variable, but you're trying to return it. What do you think will happen as a result?

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To remedy the problem, you should supply the output string as an argument to the function. Note: it will need to have space for at least 65 characters (which you'll need to add a check for).

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The max_len is actually the length of the output_str and I have added a check in the connection signature to check if has at least 65 characters

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good, but you still need to resolve the issue of returning a statically allocated variable. res is created on the stack, not the heap, so its lifetime is only that of the enclosing scope. The value of res will be invalidated once this function returns.

SHA256_CTX sha256;
SHA256_Init(&sha256);
SHA256_Update(&sha256, str, strlen(str));
SHA256_Final(hashval, &sha256);
for (int i = 0; i < SHA256_DIGEST_LENGTH; i++)
{
sprintf(res + (i * 2), "%02x", hashval[i]);
}
return res;
}

NvDsMsgApiErrorType nvds_msgapi_connection_signature(char *broker_str, char *cfg, char *output_str, int max_len)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

max_len isn't used anywhere. This suggests that your string operations are unsafe.

{
strcpy(output_str, ""); // Initializing output_str as empty string if successful which is updated later by SHA-256 hash else empty string is returned
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment isn't very helpful.

Suggested change
strcpy(output_str, ""); // Initializing output_str as empty string if successful which is updated later by SHA-256 hash else empty string is returned
// Value of output_str must be empty string if operation is unsuccessful
strcpy(output_str, "");

int required_output_str_len = 2 * SHA256_DIGEST_LENGTH + 1
if (broker_str == NULL || cfg == NULL)
{
IOT_ERROR("nvds_msgapi_connection_signature: broker_str or cfg path cant be NULL\n");
return NVDS_MSGAPI_ERR;
}
if (max_len < required_output_str_len)
{
IOT_ERROR("nvds_msgapi_connection_signature: insufficient output string length. Atleast %d needed", required_output_str_len);
return NVDS_MSGAPI_ERR;
}
if (!is_valid_connection_str(broker_str))
{
return NVDS_MSGAPI_ERR;
}
output_str = generate_sha256_hash(broker_str, output_str);
return NVDS_MSGAPI_OK;
}
4 changes: 4 additions & 0 deletions aws_protocol_adaptor/device_client/aws_nvmsgbroker.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,8 @@ NvDsMsgApiHandle nvds_msgapi_connect(char *connection_str, nvds_msgapi_connect_c
NvDsMsgApiErrorType nvds_msgapi_disconnect(NvDsMsgApiHandle h_ptr);
NvDsMsgApiErrorType nvds_msgapi_send(NvDsMsgApiHandle conn, char *topic, const uint8_t *payload, size_t nbuf);
NvDsMsgApiErrorType nvds_msgapi_send_async(NvDsMsgApiHandle h_ptr, char *topic, const uint8_t *payload, size_t nbuf, nvds_msgapi_send_cb_t send_callback, void *user_ptr);
NvDsMsgApiErrorType nvds_msgapi_subscribe(NvDsMsgApiHandle h_ptr, char **topics, int num_topics, nvds_msgapi_subscribe_request_cb_t cb, void *user_ctx);
void nvds_msgapi_do_work(NvDsMsgApiHandle h_ptr);
char *nvds_msgapi_getversion(void);
char *nvds_msgapi_get_protocol_name(void);
NvDsMsgApiErrorType nvds_msgapi_connection_signature(char *broker_str, char *cfg, char *output_str, int max_len);
lummish marked this conversation as resolved.
Show resolved Hide resolved