From 85adf3e02ac8c4af22a612d91f3b7557286f6d44 Mon Sep 17 00:00:00 2001 From: Jonathan Adotey Date: Fri, 1 Dec 2023 03:40:20 -0500 Subject: [PATCH] Got C producer working, will clean later --- src/groups/z_bmq/z_bmqa/demo.h | 100 ++++++++++++++++++ src/groups/z_bmq/z_bmqa/package/z_bmqa.mem | 3 +- .../z_bmq/z_bmqa/z_bmqa_confirmeventbuilder.h | 1 + src/groups/z_bmq/z_bmqa/z_bmqa_message.cpp | 2 +- src/groups/z_bmq/z_bmqa/z_bmqa_message.h | 10 +- .../z_bmqa/z_bmqa_messageeventbuilder.cpp | 5 +- .../z_bmq/z_bmqa/z_bmqa_messageeventbuilder.h | 2 +- src/groups/z_bmq/z_bmqa/z_bmqa_session.cpp | 28 ++--- src/groups/z_bmq/z_bmqa/z_bmqa_session.h | 4 + src/groups/z_bmq/z_bmqa/z_bmqa_session.t.cpp | 27 +---- .../z_bmq/z_bmqt/z_bmqt_sessionoptions.cpp | 10 ++ .../z_bmq/z_bmqt/z_bmqt_sessionoptions.h | 2 + 12 files changed, 149 insertions(+), 45 deletions(-) create mode 100644 src/groups/z_bmq/z_bmqa/demo.h diff --git a/src/groups/z_bmq/z_bmqa/demo.h b/src/groups/z_bmq/z_bmqa/demo.h new file mode 100644 index 0000000000..703686a7c9 --- /dev/null +++ b/src/groups/z_bmq/z_bmqa/demo.h @@ -0,0 +1,100 @@ +#include +#include +#include +#include +#include + +#if defined(__cplusplus) +extern "C" { +#endif + +const int K_QUEUE_ID = 1; +const char K_QUEUE_URI[] = "bmq://bmq.test.mem.priority/test-queue"; +const int K_NUM_MESSAGES = 5; + + enum QueueFlags { + e_ADMIN = (1 << 0) // The queue is opened in admin mode (Valid only + // for BlazingMQ admin tasks) + , + e_READ = (1 << 1) // The queue is opened for consuming messages + , + e_WRITE = (1 << 2) // The queue is opened for posting messages + , + e_ACK = (1 << 3) // Set to indicate interested in receiving + // 'ACK' events for all message posted +}; + + +void postEvent(const char* text, + z_bmqa_QueueId* queueId, + z_bmqa_Session* session){ + + z_bmqa_MessageEventBuilder* builder; + z_bmqa_MessageEventBuilder__create(&builder); + + z_bmqa_Session__loadMessageEventBuilder(session, &builder); + + z_bmqa_Message* message; + + z_bmqa_MessageEventBuilder__startMessage(builder, &message); + + z_bmqa_Message__setDataRef(message, text, (int)strlen(text)); + + z_bmqa_MessageEventBuilder__packMessage(builder, queueId); + + const z_bmqa_MessageEvent* messageEvent; + z_bmqa_MessageEventBuilder__messageEvent(builder, &messageEvent); + + z_bmqa_Session__post(session, messageEvent); +} + + +void produce(z_bmqa_Session* session){ + z_bmqa_QueueId* queueId; + + z_bmqa_QueueId__createFromNumeric(&queueId, K_QUEUE_ID); + z_bmqa_Session__openQueueSync(session, + queueId, + K_QUEUE_URI, + e_WRITE); + + const char* messages[] = {"Hello world!", + "message 1", + "message 2", + "message 3", + "Good Bye!"}; + for(int idx = 0; idx < 5; ++idx){ + postEvent(messages[idx], queueId, session); + } + + z_bmqa_Session__closeQueueSync(session, queueId); +} + +int run_c_producer(){ + z_bmqa_Session* session; + z_bmqt_SessionOptions* options; + + z_bmqt_SessionOptions__create(&options); + z_bmqa_Session__create(&session, options); + + + //start the session + z_bmqa_Session__start(session, 1000); + + produce(session); + + //stop the session + z_bmqa_Session__stop(session); + + printf("Good2\n"); + + // z_bmqa_Session__delete(&session); + // z_bmqt_SessionOptions__delete(&options); + + return 0; + +} + +#if defined(__cplusplus) +} +#endif \ No newline at end of file diff --git a/src/groups/z_bmq/z_bmqa/package/z_bmqa.mem b/src/groups/z_bmq/z_bmqa/package/z_bmqa.mem index da1eb6b88f..bf62e7499c 100644 --- a/src/groups/z_bmq/z_bmqa/package/z_bmqa.mem +++ b/src/groups/z_bmq/z_bmqa/package/z_bmqa.mem @@ -3,4 +3,5 @@ z_bmqa_session z_bmqa_event z_bmqa_confirmeventbuilder z_bmqa_messageeventbuilder -z_bmqa_messageproperties \ No newline at end of file +z_bmqa_messageproperties +z_bmqa_message \ No newline at end of file diff --git a/src/groups/z_bmq/z_bmqa/z_bmqa_confirmeventbuilder.h b/src/groups/z_bmq/z_bmqa/z_bmqa_confirmeventbuilder.h index 5d872fb123..3e2fb9b34b 100644 --- a/src/groups/z_bmq/z_bmqa/z_bmqa_confirmeventbuilder.h +++ b/src/groups/z_bmq/z_bmqa/z_bmqa_confirmeventbuilder.h @@ -9,6 +9,7 @@ extern "C" { #endif typedef struct z_bmqa_ConfirmEventBuilder z_bmqa_ConfirmEventBuilder; +typedef struct z_bmqa_MessageConfirmationCookie z_bmqa_MessageConfirmationCookie; int z_bmqa_ConfirmEventBuilder__create(z_bmqa_ConfirmEventBuilder** builder_obj); diff --git a/src/groups/z_bmq/z_bmqa/z_bmqa_message.cpp b/src/groups/z_bmq/z_bmqa/z_bmqa_message.cpp index fafb902303..56ad5780be 100644 --- a/src/groups/z_bmq/z_bmqa/z_bmqa_message.cpp +++ b/src/groups/z_bmq/z_bmqa/z_bmqa_message.cpp @@ -10,7 +10,7 @@ int z_bmqa_Message__createEmpty(z_bmqa_Message** message_obj){ return 0; } -int z_bmqa_Message_setDataRef(z_bmqa_Message** message_obj, const char* data, size_t length){ +int z_bmqa_Message__setDataRef(z_bmqa_Message* message_obj, const char* data, int length){ using namespace BloombergLP; bmqa::Message* message_ptr = reinterpret_cast(message_obj); diff --git a/src/groups/z_bmq/z_bmqa/z_bmqa_message.h b/src/groups/z_bmq/z_bmqa/z_bmqa_message.h index ab2226c482..ac3352e0b4 100644 --- a/src/groups/z_bmq/z_bmqa/z_bmqa_message.h +++ b/src/groups/z_bmq/z_bmqa/z_bmqa_message.h @@ -1,11 +1,19 @@ #ifndef INCLUDED_Z_BMQA_MESSAGE #define INCLUDED_Z_BMQA_MESSAGE +#if defined(__cplusplus) +extern "C" { +#endif + typedef struct z_bmqa_Message z_bmqa_Message; int z_bmqa_Message__createEmpty(z_bmqa_Message** message_obj); -int z_bmqa_Message_setDataRef(z_bmqa_Message** message_obj, const char* data, int length); +int z_bmqa_Message__setDataRef(z_bmqa_Message* message_obj, const char* data, int length); + +#if defined(__cplusplus) +} +#endif #endif \ No newline at end of file diff --git a/src/groups/z_bmq/z_bmqa/z_bmqa_messageeventbuilder.cpp b/src/groups/z_bmq/z_bmqa/z_bmqa_messageeventbuilder.cpp index 26a192f8ee..8d5d163dfa 100644 --- a/src/groups/z_bmq/z_bmqa/z_bmqa_messageeventbuilder.cpp +++ b/src/groups/z_bmq/z_bmqa/z_bmqa_messageeventbuilder.cpp @@ -13,11 +13,12 @@ int z_bmqa_MessageEventBuilder__create(z_bmqa_MessageEventBuilder** builder_obj) return 0; } -int z_bmqa_MessageEventBuilder__startMessage(z_bmqa_MessageEventBuilder* builder_obj, z_bmqa_Message* out_obj) { +int z_bmqa_MessageEventBuilder__startMessage(z_bmqa_MessageEventBuilder* builder_obj, z_bmqa_Message** out_obj) { using namespace BloombergLP; bmqa::MessageEventBuilder* builder_ptr = reinterpret_cast(builder_obj); - out_obj = reinterpret_cast(&builder_ptr->startMessage()); // uhhhhh + bmqa::Message* message_ptr = &builder_ptr->startMessage(); + *out_obj = reinterpret_cast(message_ptr); return 0; } diff --git a/src/groups/z_bmq/z_bmqa/z_bmqa_messageeventbuilder.h b/src/groups/z_bmq/z_bmqa/z_bmqa_messageeventbuilder.h index f7e0809c68..1d89a98e63 100644 --- a/src/groups/z_bmq/z_bmqa/z_bmqa_messageeventbuilder.h +++ b/src/groups/z_bmq/z_bmqa/z_bmqa_messageeventbuilder.h @@ -15,7 +15,7 @@ typedef struct z_bmqa_MessageEventBuilder z_bmqa_MessageEventBuilder; int z_bmqa_MessageEventBuilder__create(z_bmqa_MessageEventBuilder** builder_obj); -int z_bmqa_MessageEventBuilder__startMessage(z_bmqa_MessageEventBuilder* builder_obj, z_bmqa_Message* out_obj); +int z_bmqa_MessageEventBuilder__startMessage(z_bmqa_MessageEventBuilder* builder_obj, z_bmqa_Message** out_obj); int z_bmqa_MessageEventBuilder__packMessage(z_bmqa_MessageEventBuilder* builder_obj, const z_bmqa_QueueId* queueId); diff --git a/src/groups/z_bmq/z_bmqa/z_bmqa_session.cpp b/src/groups/z_bmq/z_bmqa/z_bmqa_session.cpp index 9a5c660e12..1ad72cdcd4 100644 --- a/src/groups/z_bmq/z_bmqa/z_bmqa_session.cpp +++ b/src/groups/z_bmq/z_bmqa/z_bmqa_session.cpp @@ -5,6 +5,16 @@ #include +int z_bmqa_Session__delete(z_bmqa_Session** session_obj) { + using namespace BloombergLP; + + bmqa::Session* session_ptr = reinterpret_cast(session_obj); + delete session_ptr; + *session_obj = NULL; + + return 0; +} + int z_bmqa_Session__create(z_bmqa_Session** session_obj , const z_bmqt_SessionOptions* options) { using namespace BloombergLP; @@ -46,19 +56,12 @@ int z_bmqa_Session__finalizeStop(z_bmqa_Session* session_obj){ } -int z_bmqa_Session__finalizeStop(z_bmqa_Session* session_obj){ - using namespace BloombergLP; - - bmqa::Session* session_ptr = reinterpret_cast(session_obj); - session_ptr->finalizeStop(); - return 0; -} int z_bmqa_Session__loadMessageEventBuilder(z_bmqa_Session* session_obj, z_bmqa_MessageEventBuilder** builder){ using namespace BloombergLP; bmqa::Session* session_ptr = reinterpret_cast(session_obj); - bmqa::MessageEventBuilder* builder_ptr = reinterpret_cast(builder); + bmqa::MessageEventBuilder* builder_ptr = reinterpret_cast(*builder); session_ptr->loadMessageEventBuilder(builder_ptr); return 0; @@ -129,15 +132,6 @@ int z_bmqa_Session__closeQueueSync(z_bmqa_Session* session_obj, z_bmqa_QueueId* return 0; } -int z_bmqa_Session__loadMessageEventBuilder(z_bmqa_Session* session_obj, z_bmqa_MessageEventBuilder** builder){ - using namespace BloombergLP; - - bmqa::Session* session_ptr = reinterpret_cast(session_obj); - bmqa::MessageEventBuilder* builder_ptr = reinterpret_cast(builder); - - session_ptr->loadMessageEventBuilder(builder_ptr); - return 0; -} int z_bmqa_Session__post(z_bmqa_Session* session_obj, const z_bmqa_MessageEvent* event){ using namespace BloombergLP; diff --git a/src/groups/z_bmq/z_bmqa/z_bmqa_session.h b/src/groups/z_bmq/z_bmqa/z_bmqa_session.h index 4985eb52b9..1e029b0003 100644 --- a/src/groups/z_bmq/z_bmqa/z_bmqa_session.h +++ b/src/groups/z_bmq/z_bmqa/z_bmqa_session.h @@ -41,6 +41,10 @@ typedef struct EventHandlerData { // } // } + // + +int z_bmqa_Session__delete(z_bmqa_Session** session_obj); + int z_bmqa_Session__create(z_bmqa_Session** session_obj, const z_bmqt_SessionOptions* options); int z_bmqa_Session__createAsync(z_bmqa_Session** session_obj, z_bmqa_SessionEventHandler* eventHandler, const z_bmqt_SessionOptions* options); diff --git a/src/groups/z_bmq/z_bmqa/z_bmqa_session.t.cpp b/src/groups/z_bmq/z_bmqa/z_bmqa_session.t.cpp index 8c2c755b38..d5e3c373b8 100644 --- a/src/groups/z_bmq/z_bmqa/z_bmqa_session.t.cpp +++ b/src/groups/z_bmq/z_bmqa/z_bmqa_session.t.cpp @@ -27,6 +27,8 @@ // TEST DRIVER #include +#include + // CONVENIENCE using namespace BloombergLP; using namespace bsl; @@ -37,26 +39,7 @@ using namespace bsl; static void test1_session() { - mwctst::TestHelper::printTestName("Create Session"); - - // Create default session - - z_bmqa_Session* session; - z_bmqt_SessionOptions* options; - z_bmqt_SessionOptions__create(&options); - z_bmqa_Session__create(&session, options); - - // Make sure 'k_BROKER_DEFAULT_PORT' and the default brokerUri are in sync - { - PV("CHECKING start() and stop()"); - - z_bmqa_Session__start(session, 1000); - - z_bmqa_Session__stop(session); - // ASSERT_EQ(sessionOptions_cpp.brokerUri(), result); - } - - // z_bmqa_Session__destroy(session); + run_c_producer(); } @@ -69,8 +52,8 @@ int main(int argc, char* argv[]) TEST_PROLOG(mwctst::TestHelper::e_DEFAULT); switch (_testCase) { - case 0: - case 1: test1_session(); break; + case 0: test1_session(); break; + case 1: printf("Good\n"); break; default: { cerr << "WARNING: CASE '" << _testCase << "' NOT FOUND." << endl; s_testStatus = -1; diff --git a/src/groups/z_bmq/z_bmqt/z_bmqt_sessionoptions.cpp b/src/groups/z_bmq/z_bmqt/z_bmqt_sessionoptions.cpp index 7e4a751eed..28876700f5 100644 --- a/src/groups/z_bmq/z_bmqt/z_bmqt_sessionoptions.cpp +++ b/src/groups/z_bmq/z_bmqt/z_bmqt_sessionoptions.cpp @@ -2,6 +2,16 @@ #include +int z_bmqt_SessionOptions__delete(z_bmqt_SessionOptions** options_obj) { + using namespace BloombergLP; + + bmqt::SessionOptions* options_ptr = reinterpret_cast(options_obj); + delete options_ptr; + *options_obj = NULL; + + return 0; +} + int z_bmqt_SessionOptions__create(z_bmqt_SessionOptions** options_obj) { using namespace BloombergLP; bmqt::SessionOptions* options_ptr = new bmqt::SessionOptions(); diff --git a/src/groups/z_bmq/z_bmqt/z_bmqt_sessionoptions.h b/src/groups/z_bmq/z_bmqt/z_bmqt_sessionoptions.h index 4e86e570e3..269e5bba3d 100644 --- a/src/groups/z_bmq/z_bmqt/z_bmqt_sessionoptions.h +++ b/src/groups/z_bmq/z_bmqt/z_bmqt_sessionoptions.h @@ -8,6 +8,8 @@ extern "C" { typedef struct z_bmqt_SessionOptions z_bmqt_SessionOptions; +int z_bmqt_SessionOptions__delete(z_bmqt_SessionOptions** options_obj); + int z_bmqt_SessionOptions__create(z_bmqt_SessionOptions** options_obj); const char* z_bmqt_SessionOptions__brokerUri(const z_bmqt_SessionOptions* options_obj);