-
Notifications
You must be signed in to change notification settings - Fork 0
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Deepstream 5.0 Subscription Implementation #1
Open
Shubhamturakhia
wants to merge
32
commits into
master
Choose a base branch
from
DS-5.0-subscription-implementation
base: master
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
Show all changes
32 commits
Select commit
Hold shift + click to select a range
2e29abb
Add msgapi_subscribe implementation
Shubhamturakhia 2a7a3d6
Modify broker.c file
Shubhamturakhia 9a5bc7a
Added subcribe implementation
Shubhamturakhia b2c6940
Added Subscribe implementation
Shubhamturakhia bf90453
Minor changes
Shubhamturakhia 67dffec
Changes according to PR review
Shubhamturakhia 5ca5006
Add callback wrapper
Shubhamturakhia 4821a94
Added unsubscribe feature
Shubhamturakhia adc30df
Changes as per PR review
Shubhamturakhia f8f0099
Minor change
Shubhamturakhia 7aa84e7
Minor change
Shubhamturakhia 9a8ff42
Fix formatting
Shubhamturakhia bd3310d
Update aws_protocol_adaptor/device_client/aws_nvmsgbroker.c
Shubhamturakhia 7e4b8d9
Add msgapi connection signature mwthod
Shubhamturakhia f2cd2b9
Merge branch 'DS-5.0-subscription-implementation' of https://github.c…
Shubhamturakhia 3707004
added connection signature method
Shubhamturakhia 2fd20b1
Minor changes
Shubhamturakhia 072dfbb
Modified utilities for conn signature
Shubhamturakhia 93deacc
Formatting and minor refactoring
Shubhamturakhia 9c1449a
Changes as per PR review
Shubhamturakhia fa94cc3
Modifications for Hash code
Shubhamturakhia 525ee45
Changes as per review
Shubhamturakhia 579d7b4
Minor indentation change
Shubhamturakhia d330130
Hash code refactoring
Shubhamturakhia 819cdc0
Add hash code implementation for config file
Shubhamturakhia 0bac762
Fix Incorrect Return Types
lummish ef454fb
Minor changes
Shubhamturakhia 9c61435
Resolve Compilation Errors
lummish d9fa5a6
Link OpenSSL in Makefile
lummish 54fff91
Statically Link to libssl and libcrypto
lummish c99d2de
Reorder SSL Linkages
lummish 0c7895e
Reorder Linkage to SSL Libraries
lummish File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -33,20 +33,29 @@ | |
#include "aws_config_parser.h" | ||
#include "nvds_msgapi.h" | ||
#include "aws_nvmsgbroker.h" | ||
#include <openssl/sha.h> | ||
|
||
#define SHA256_FILE_BUFLEN 32768 | ||
#define SHA256_STRLEN SHA256_DIGEST_LENGTH * 2 + 1 | ||
|
||
NvDsMsgApiHandle (*nvds_msgapi_connect_ptr)(char *connection_str, nvds_msgapi_connect_cb_t connect_cb, char *config_path); | ||
NvDsMsgApiErrorType (*nvds_msgapi_send_ptr)(NvDsMsgApiHandle conn, char *topic, const uint8_t *payload, size_t nbuf); | ||
NvDsMsgApiErrorType (*nvds_msgapi_disconnect_ptr)(NvDsMsgApiHandle h_ptr); | ||
NvDsMsgApiErrorType (*nvds_msgapi_connection_signature_ptr)(char *connection_str, char *config_path, char *output_str, int max_len); | ||
static GMutex thread_mutex; | ||
static GQueue *work_queue; | ||
static struct timespec last_send_time_stamp; // this is to make sure we send or yield frequent enough so we do not get disconnected. | ||
static struct timespec last_send_time_stamp; // this is to make sure we send or yield frequent enough so we do not get disconnected. | ||
static nvds_msgapi_connect_cb_t disconnect_cb; // disconnect handler provided by connect thread | ||
static nvds_msgapi_subscribe_request_cb_t nvds_cb; // msgapi subscribe callback handler | ||
static char *subscribed_topics[MAX_SUBSCRIPTIONS]; // to store the subscribed topics in order to be used during unsubscribe operation | ||
static size_t num_subscriptions = 0; // number of registered subscriptions | ||
|
||
/* ************************************************************************* */ | ||
// Connect function def | ||
/* ************************************************************************* */ | ||
|
||
static void disconnectCallbackHandler(AWS_IoT_Client *pClient, void *data) | ||
static void | ||
disconnectCallbackHandler(AWS_IoT_Client *pClient, void *data) | ||
{ | ||
IOT_WARN("MQTT Disconnect"); | ||
IoT_Error_t rc = FAILURE; | ||
|
@@ -83,7 +92,7 @@ NvDsMsgApiHandle nvds_msgapi_connect(char *connection_str, nvds_msgapi_connect_c | |
disconnect_cb = connect_cb; | ||
if (config_path == NULL) | ||
{ | ||
IOT_ERROR("Essensial args missing for function nvds_msgapi_connect\n"); | ||
IOT_ERROR("Essential args missing for function nvds_msgapi_connect\n"); | ||
return NULL; | ||
} | ||
|
||
|
@@ -142,11 +151,21 @@ NvDsMsgApiErrorType nvds_msgapi_disconnect(NvDsMsgApiHandle h_ptr) | |
{ | ||
if ((h_ptr == NULL)) | ||
{ | ||
IOT_ERROR("Essensial args missing for function nvds_msgapi_disconnect\n"); | ||
IOT_ERROR("Essential args missing for function nvds_msgapi_disconnect\n"); | ||
return NVDS_MSGAPI_ERR; | ||
} | ||
IoT_Error_t rc = FAILURE; | ||
AWS_IoT_Client *client = (AWS_IoT_Client *)h_ptr; | ||
for (int i = 0; i < num_subscriptions; i++) | ||
{ | ||
rc = aws_iot_mqtt_unsubscribe(client, subscribed_topics[i], strlen(subscribed_topics[i])); | ||
if (SUCCESS != rc) | ||
{ | ||
IOT_ERROR("Unable to unsubscribe, error: %d\n", rc); | ||
} | ||
} | ||
IOT_INFO("Successfully unsubscribed"); | ||
|
||
rc = aws_iot_mqtt_disconnect(client); | ||
if (SUCCESS != rc) | ||
{ | ||
|
@@ -180,7 +199,7 @@ NvDsMsgApiErrorType nvds_msgapi_send(NvDsMsgApiHandle conn, char *topic, const u | |
{ | ||
if ((conn == NULL) || (topic == NULL) || (payload == NULL) || (nbuf == 0)) | ||
{ | ||
IOT_ERROR("Essensial args missing for function nvds_msgapi_send\n"); | ||
IOT_ERROR("Essential args missing for function nvds_msgapi_send\n"); | ||
return NVDS_MSGAPI_ERR; | ||
} | ||
AWS_IoT_Client *client = (AWS_IoT_Client *)conn; | ||
|
@@ -217,7 +236,7 @@ NvDsMsgApiErrorType nvds_msgapi_send_async(NvDsMsgApiHandle h_ptr, char *topic, | |
{ | ||
if ((h_ptr == NULL) || (topic == NULL) || (payload == NULL) || (nbuf == 0)) | ||
{ | ||
IOT_ERROR("Essensial args missing for function nvds_msgapi_send: %d, %d, %d, %d\n", (h_ptr == NULL), (topic == NULL), (payload == NULL), (nbuf == 0)); | ||
IOT_ERROR("Essential args missing for function nvds_msgapi_send: %d, %d, %d, %d\n", (h_ptr == NULL), (topic == NULL), (payload == NULL), (nbuf == 0)); | ||
return NVDS_MSGAPI_ERR; | ||
} | ||
Work *work_node = g_malloc(sizeof(Work)); | ||
|
@@ -243,6 +262,43 @@ NvDsMsgApiErrorType nvds_msgapi_send_async(NvDsMsgApiHandle h_ptr, char *topic, | |
return NVDS_MSGAPI_OK; | ||
} | ||
|
||
void nvds_cb_wrapped(AWS_IoT_Client *pClient, char *pTopicName, uint16_t topicNameLen, IoT_Publish_Message_Params *pParams, void *pClientData) | ||
{ | ||
IOT_INFO("Subscribe callback"); | ||
nvds_cb(NVDS_MSGAPI_OK, pParams->payload, pParams->payloadLen, pTopicName, pClientData); | ||
} | ||
|
||
NvDsMsgApiErrorType nvds_msgapi_subscribe(NvDsMsgApiHandle h_ptr, char **topics, int num_topics, nvds_msgapi_subscribe_request_cb_t cb, void *user_ctx) | ||
{ | ||
IOT_INFO("Subscribe called\n"); | ||
if ((h_ptr == NULL) || (topics == NULL) || (num_topics <= 0)) | ||
{ | ||
IOT_ERROR("Essential args missing for function nvds_msgapi_subscribe: %d, %d, %d\n", (h_ptr == NULL), (topics == NULL), (num_topics == 0)); | ||
return NVDS_MSGAPI_ERR; | ||
} | ||
if (!cb) | ||
{ | ||
IOT_ERROR("Callback function for nvds_msgapi_subscribe cannot be NULL\n"); | ||
return NVDS_MSGAPI_ERR; | ||
} | ||
nvds_cb = cb; | ||
IoT_Error_t rc = FAILURE; | ||
AWS_IoT_Client *client = (AWS_IoT_Client *)h_ptr; | ||
for (int i = 0; i < num_topics; i++) | ||
{ | ||
rc = aws_iot_mqtt_subscribe(client, topics[i], strlen(topics[i]), QOS0, nvds_cb_wrapped, user_ctx); | ||
if (SUCCESS != rc) | ||
{ | ||
IOT_ERROR("Unable to subscribe, error: %d\n", rc); | ||
return NVDS_MSGAPI_ERR; | ||
} | ||
subscribed_topics[i] = topics[i]; | ||
num_subscriptions++; | ||
} | ||
IOT_INFO("Successfully subscribed"); | ||
return NVDS_MSGAPI_OK; | ||
} | ||
|
||
/* ************************************************************************* */ | ||
// Do Work function def | ||
/* ************************************************************************* */ | ||
|
@@ -277,7 +333,8 @@ void nvds_msgapi_do_work(NvDsMsgApiHandle h_ptr) | |
} | ||
return; | ||
} | ||
while (! g_queue_is_empty(work_queue)){ | ||
while (!g_queue_is_empty(work_queue)) | ||
{ | ||
Work *work_node = (Work *)g_queue_pop_head(work_queue); | ||
AWS_IoT_Client *client = (AWS_IoT_Client *)work_node->h_ptr; | ||
rc = _mqtt_msg_send(client, work_node->topic, work_node->payload, work_node->payload_size); | ||
|
@@ -291,7 +348,8 @@ void nvds_msgapi_do_work(NvDsMsgApiHandle h_ptr) | |
g_free(work_node); | ||
return; | ||
} | ||
if (work_node->call_back_handler != NULL){ | ||
if (work_node->call_back_handler != NULL) | ||
{ | ||
IOT_INFO("Pointer callback."); | ||
work_node->call_back_handler(work_node->user_ptr, NVDS_MSGAPI_OK); | ||
} | ||
|
@@ -300,3 +358,131 @@ void nvds_msgapi_do_work(NvDsMsgApiHandle h_ptr) | |
last_send_time_stamp = current_time_stamp; | ||
return; | ||
} | ||
|
||
char *nvds_msgapi_getversion() | ||
{ | ||
return NVDS_MSGAPI_VERSION; | ||
} | ||
|
||
char *nvds_msgapi_get_protocol_name() | ||
{ | ||
return NVDS_MSGAPI_PROTOCOL; | ||
} | ||
|
||
bool is_valid_connection_str(char *conn_str) | ||
{ | ||
char *burl = "", *bport = ""; | ||
if (conn_str == NULL) | ||
{ | ||
IOT_ERROR("connection string cant be NULL"); | ||
return false; | ||
} | ||
|
||
int i = 0; | ||
|
||
char *token = strtok(conn_str, ";"); | ||
char *data[2]; | ||
|
||
while (token) | ||
{ | ||
data[i++] = token; | ||
token = strtok(NULL, ";"); | ||
} | ||
for (i = 0; i < 2; i++) | ||
{ | ||
printf("%s\n", data[i]); | ||
} | ||
burl = data[0]; | ||
bport = data[1]; | ||
|
||
if (burl == "" || bport == "") | ||
{ | ||
IOT_ERROR("connection string is invalid. hostname or port is empty\n"); | ||
return false; | ||
} | ||
return true; | ||
} | ||
|
||
int cfg_file_sha256(SHA256_CTX *sha256, char *cfg_path) | ||
{ | ||
FILE *file; | ||
unsigned char buffer[SHA256_FILE_BUFLEN]; | ||
int bytes_read = 0; | ||
int read_err = 0; | ||
|
||
if (!(file = fopen(cfg_path, "rb"))) | ||
{ | ||
IOT_ERROR("Failed to open configuration file %s\n", cfg_path); | ||
return -1; | ||
} | ||
|
||
while ((bytes_read = fread(buffer, 1, sizeof(buffer), file))) | ||
{ | ||
SHA256_Update(sha256, buffer, bytes_read); | ||
} | ||
|
||
read_err = ferror(file); | ||
fclose(file); | ||
|
||
if (read_err != 0) | ||
{ | ||
IOT_ERROR("Reading configuration file failed with error code %d\n", read_err); | ||
return read_err; | ||
} | ||
|
||
return 0; | ||
} | ||
|
||
int generate_sha256_hash(char *output_str, char *conn_str, char *cfg_path) | ||
{ | ||
SHA256_CTX sha256; | ||
unsigned char hashval[SHA256_DIGEST_LENGTH]; | ||
|
||
SHA256_Init(&sha256); | ||
|
||
SHA256_Update(&sha256, conn_str, strlen(conn_str)); | ||
|
||
if (cfg_file_sha256(&sha256, cfg_path) != 0) | ||
{ | ||
IOT_ERROR("Failed to generate config file hash\n"); | ||
return -1; | ||
} | ||
|
||
SHA256_Final(hashval, &sha256); | ||
for (int i = 0; i < SHA256_DIGEST_LENGTH; i++) | ||
{ | ||
sprintf(output_str + (i * 2), "%02x", hashval[i]); | ||
} | ||
|
||
return 0; | ||
} | ||
|
||
NvDsMsgApiErrorType nvds_msgapi_connection_signature(char *broker_str, char *cfg, char *output_str, int max_len) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
{ | ||
// Value of output_str must be empty string if operation is unsuccessful | ||
strcpy(output_str, ""); | ||
|
||
if (broker_str == NULL || cfg == NULL) | ||
{ | ||
IOT_ERROR("Must specify broker_str and cfg\n"); | ||
return NVDS_MSGAPI_ERR; | ||
} | ||
|
||
if (max_len < SHA256_STRLEN) | ||
{ | ||
IOT_ERROR("Insufficient output string length. Need %d, got %d", SHA256_STRLEN, max_len); | ||
return NVDS_MSGAPI_ERR; | ||
} | ||
|
||
if (!is_valid_connection_str(broker_str)) | ||
{ | ||
return NVDS_MSGAPI_ERR; | ||
} | ||
|
||
if (generate_sha256_hash(output_str, broker_str, cfg) != 0) | ||
{ | ||
return NVDS_MSGAPI_ERR; | ||
} | ||
|
||
return NVDS_MSGAPI_OK; | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Don't think this will work properly. The second call to
strtok
shouldn't give you a second token. Instead, try something like this: