Skip to content

Commit

Permalink
Got C producer working, will clean later
Browse files Browse the repository at this point in the history
  • Loading branch information
Jonathan Adotey committed Dec 1, 2023
1 parent ca92e7e commit 85adf3e
Show file tree
Hide file tree
Showing 12 changed files with 149 additions and 45 deletions.
100 changes: 100 additions & 0 deletions src/groups/z_bmq/z_bmqa/demo.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
#include <z_bmqa_session.h>
#include <z_bmqa_queueid.h>
#include <z_bmqa_messageeventbuilder.h>
#include <z_bmqa_message.h>
#include <string.h>

#if defined(__cplusplus)
extern "C" {
#endif

const int K_QUEUE_ID = 1;
const char K_QUEUE_URI[] = "bmq://bmq.test.mem.priority/test-queue";
const int K_NUM_MESSAGES = 5;

enum QueueFlags {
e_ADMIN = (1 << 0) // The queue is opened in admin mode (Valid only
// for BlazingMQ admin tasks)
,
e_READ = (1 << 1) // The queue is opened for consuming messages
,
e_WRITE = (1 << 2) // The queue is opened for posting messages
,
e_ACK = (1 << 3) // Set to indicate interested in receiving
// 'ACK' events for all message posted
};


void postEvent(const char* text,
z_bmqa_QueueId* queueId,
z_bmqa_Session* session){

z_bmqa_MessageEventBuilder* builder;
z_bmqa_MessageEventBuilder__create(&builder);

z_bmqa_Session__loadMessageEventBuilder(session, &builder);

z_bmqa_Message* message;

z_bmqa_MessageEventBuilder__startMessage(builder, &message);

z_bmqa_Message__setDataRef(message, text, (int)strlen(text));

z_bmqa_MessageEventBuilder__packMessage(builder, queueId);

const z_bmqa_MessageEvent* messageEvent;
z_bmqa_MessageEventBuilder__messageEvent(builder, &messageEvent);

z_bmqa_Session__post(session, messageEvent);
}


void produce(z_bmqa_Session* session){
z_bmqa_QueueId* queueId;

z_bmqa_QueueId__createFromNumeric(&queueId, K_QUEUE_ID);
z_bmqa_Session__openQueueSync(session,
queueId,
K_QUEUE_URI,
e_WRITE);

const char* messages[] = {"Hello world!",
"message 1",
"message 2",
"message 3",
"Good Bye!"};
for(int idx = 0; idx < 5; ++idx){
postEvent(messages[idx], queueId, session);
}

z_bmqa_Session__closeQueueSync(session, queueId);
}

int run_c_producer(){
z_bmqa_Session* session;
z_bmqt_SessionOptions* options;

z_bmqt_SessionOptions__create(&options);
z_bmqa_Session__create(&session, options);


//start the session
z_bmqa_Session__start(session, 1000);

produce(session);

//stop the session
z_bmqa_Session__stop(session);

printf("Good2\n");

// z_bmqa_Session__delete(&session);
// z_bmqt_SessionOptions__delete(&options);

return 0;

}

#if defined(__cplusplus)
}
#endif
3 changes: 2 additions & 1 deletion src/groups/z_bmq/z_bmqa/package/z_bmqa.mem
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@ z_bmqa_session
z_bmqa_event
z_bmqa_confirmeventbuilder
z_bmqa_messageeventbuilder
z_bmqa_messageproperties
z_bmqa_messageproperties
z_bmqa_message
1 change: 1 addition & 0 deletions src/groups/z_bmq/z_bmqa/z_bmqa_confirmeventbuilder.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ extern "C" {
#endif

typedef struct z_bmqa_ConfirmEventBuilder z_bmqa_ConfirmEventBuilder;
typedef struct z_bmqa_MessageConfirmationCookie z_bmqa_MessageConfirmationCookie;

int z_bmqa_ConfirmEventBuilder__create(z_bmqa_ConfirmEventBuilder** builder_obj);

Expand Down
2 changes: 1 addition & 1 deletion src/groups/z_bmq/z_bmqa/z_bmqa_message.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ int z_bmqa_Message__createEmpty(z_bmqa_Message** message_obj){
return 0;
}

int z_bmqa_Message_setDataRef(z_bmqa_Message** message_obj, const char* data, size_t length){
int z_bmqa_Message__setDataRef(z_bmqa_Message* message_obj, const char* data, int length){
using namespace BloombergLP;

bmqa::Message* message_ptr = reinterpret_cast<bmqa::Message*>(message_obj);
Expand Down
10 changes: 9 additions & 1 deletion src/groups/z_bmq/z_bmqa/z_bmqa_message.h
Original file line number Diff line number Diff line change
@@ -1,11 +1,19 @@
#ifndef INCLUDED_Z_BMQA_MESSAGE
#define INCLUDED_Z_BMQA_MESSAGE

#if defined(__cplusplus)
extern "C" {
#endif

typedef struct z_bmqa_Message z_bmqa_Message;

int z_bmqa_Message__createEmpty(z_bmqa_Message** message_obj);

int z_bmqa_Message_setDataRef(z_bmqa_Message** message_obj, const char* data, int length);
int z_bmqa_Message__setDataRef(z_bmqa_Message* message_obj, const char* data, int length);


#if defined(__cplusplus)
}
#endif

#endif
5 changes: 3 additions & 2 deletions src/groups/z_bmq/z_bmqa/z_bmqa_messageeventbuilder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,12 @@ int z_bmqa_MessageEventBuilder__create(z_bmqa_MessageEventBuilder** builder_obj)
return 0;
}

int z_bmqa_MessageEventBuilder__startMessage(z_bmqa_MessageEventBuilder* builder_obj, z_bmqa_Message* out_obj) {
int z_bmqa_MessageEventBuilder__startMessage(z_bmqa_MessageEventBuilder* builder_obj, z_bmqa_Message** out_obj) {
using namespace BloombergLP;

bmqa::MessageEventBuilder* builder_ptr = reinterpret_cast<bmqa::MessageEventBuilder*>(builder_obj);
out_obj = reinterpret_cast<z_bmqa_Message*>(&builder_ptr->startMessage()); // uhhhhh
bmqa::Message* message_ptr = &builder_ptr->startMessage();
*out_obj = reinterpret_cast<z_bmqa_Message*>(message_ptr);

return 0;
}
Expand Down
2 changes: 1 addition & 1 deletion src/groups/z_bmq/z_bmqa/z_bmqa_messageeventbuilder.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ typedef struct z_bmqa_MessageEventBuilder z_bmqa_MessageEventBuilder;

int z_bmqa_MessageEventBuilder__create(z_bmqa_MessageEventBuilder** builder_obj);

int z_bmqa_MessageEventBuilder__startMessage(z_bmqa_MessageEventBuilder* builder_obj, z_bmqa_Message* out_obj);
int z_bmqa_MessageEventBuilder__startMessage(z_bmqa_MessageEventBuilder* builder_obj, z_bmqa_Message** out_obj);

int z_bmqa_MessageEventBuilder__packMessage(z_bmqa_MessageEventBuilder* builder_obj, const z_bmqa_QueueId* queueId);

Expand Down
28 changes: 11 additions & 17 deletions src/groups/z_bmq/z_bmqa/z_bmqa_session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,16 @@
#include <bmqa_openqueuestatus.h>


int z_bmqa_Session__delete(z_bmqa_Session** session_obj) {
using namespace BloombergLP;

bmqa::Session* session_ptr = reinterpret_cast<bmqa::Session*>(session_obj);
delete session_ptr;
*session_obj = NULL;

return 0;
}

int z_bmqa_Session__create(z_bmqa_Session** session_obj , const z_bmqt_SessionOptions* options) {
using namespace BloombergLP;

Expand Down Expand Up @@ -46,19 +56,12 @@ int z_bmqa_Session__finalizeStop(z_bmqa_Session* session_obj){
}


int z_bmqa_Session__finalizeStop(z_bmqa_Session* session_obj){
using namespace BloombergLP;

bmqa::Session* session_ptr = reinterpret_cast<bmqa::Session*>(session_obj);
session_ptr->finalizeStop();
return 0;
}

int z_bmqa_Session__loadMessageEventBuilder(z_bmqa_Session* session_obj, z_bmqa_MessageEventBuilder** builder){
using namespace BloombergLP;

bmqa::Session* session_ptr = reinterpret_cast<bmqa::Session*>(session_obj);
bmqa::MessageEventBuilder* builder_ptr = reinterpret_cast<bmqa::MessageEventBuilder*>(builder);
bmqa::MessageEventBuilder* builder_ptr = reinterpret_cast<bmqa::MessageEventBuilder*>(*builder);

session_ptr->loadMessageEventBuilder(builder_ptr);
return 0;
Expand Down Expand Up @@ -129,15 +132,6 @@ int z_bmqa_Session__closeQueueSync(z_bmqa_Session* session_obj, z_bmqa_QueueId*
return 0;
}

int z_bmqa_Session__loadMessageEventBuilder(z_bmqa_Session* session_obj, z_bmqa_MessageEventBuilder** builder){
using namespace BloombergLP;

bmqa::Session* session_ptr = reinterpret_cast<bmqa::Session*>(session_obj);
bmqa::MessageEventBuilder* builder_ptr = reinterpret_cast<bmqa::MessageEventBuilder*>(builder);

session_ptr->loadMessageEventBuilder(builder_ptr);
return 0;
}

int z_bmqa_Session__post(z_bmqa_Session* session_obj, const z_bmqa_MessageEvent* event){
using namespace BloombergLP;
Expand Down
4 changes: 4 additions & 0 deletions src/groups/z_bmq/z_bmqa/z_bmqa_session.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ typedef struct EventHandlerData {
// }
// }

//

int z_bmqa_Session__delete(z_bmqa_Session** session_obj);

int z_bmqa_Session__create(z_bmqa_Session** session_obj, const z_bmqt_SessionOptions* options);

int z_bmqa_Session__createAsync(z_bmqa_Session** session_obj, z_bmqa_SessionEventHandler* eventHandler, const z_bmqt_SessionOptions* options);
Expand Down
27 changes: 5 additions & 22 deletions src/groups/z_bmq/z_bmqa/z_bmqa_session.t.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
// TEST DRIVER
#include <mwctst_testhelper.h>

#include <demo.h>

// CONVENIENCE
using namespace BloombergLP;
using namespace bsl;
Expand All @@ -37,26 +39,7 @@ using namespace bsl;

static void test1_session()
{
mwctst::TestHelper::printTestName("Create Session");

// Create default session

z_bmqa_Session* session;
z_bmqt_SessionOptions* options;
z_bmqt_SessionOptions__create(&options);
z_bmqa_Session__create(&session, options);

// Make sure 'k_BROKER_DEFAULT_PORT' and the default brokerUri are in sync
{
PV("CHECKING start() and stop()");

z_bmqa_Session__start(session, 1000);

z_bmqa_Session__stop(session);
// ASSERT_EQ(sessionOptions_cpp.brokerUri(), result);
}

// z_bmqa_Session__destroy(session);
run_c_producer();
}


Expand All @@ -69,8 +52,8 @@ int main(int argc, char* argv[])
TEST_PROLOG(mwctst::TestHelper::e_DEFAULT);

switch (_testCase) {
case 0:
case 1: test1_session(); break;
case 0: test1_session(); break;
case 1: printf("Good\n"); break;
default: {
cerr << "WARNING: CASE '" << _testCase << "' NOT FOUND." << endl;
s_testStatus = -1;
Expand Down
10 changes: 10 additions & 0 deletions src/groups/z_bmq/z_bmqt/z_bmqt_sessionoptions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,16 @@
#include <z_bmqt_sessionoptions.h>


int z_bmqt_SessionOptions__delete(z_bmqt_SessionOptions** options_obj) {
using namespace BloombergLP;

bmqt::SessionOptions* options_ptr = reinterpret_cast<bmqt::SessionOptions*>(options_obj);
delete options_ptr;
*options_obj = NULL;

return 0;
}

int z_bmqt_SessionOptions__create(z_bmqt_SessionOptions** options_obj) {
using namespace BloombergLP;
bmqt::SessionOptions* options_ptr = new bmqt::SessionOptions();
Expand Down
2 changes: 2 additions & 0 deletions src/groups/z_bmq/z_bmqt/z_bmqt_sessionoptions.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ extern "C" {

typedef struct z_bmqt_SessionOptions z_bmqt_SessionOptions;

int z_bmqt_SessionOptions__delete(z_bmqt_SessionOptions** options_obj);

int z_bmqt_SessionOptions__create(z_bmqt_SessionOptions** options_obj);

const char* z_bmqt_SessionOptions__brokerUri(const z_bmqt_SessionOptions* options_obj);
Expand Down

0 comments on commit 85adf3e

Please sign in to comment.