Skip to content

Commit

Permalink
Added support for empty payload on messages
Browse files Browse the repository at this point in the history
Added support for default message with a simple payload (JSON)
Added support (fixed issue) with --stdin, a command-line payload option
Added support for --file to specify file (content) as payload on send, request and reply commands
Added support for --content-type to influence payload print & selection of message type
  • Loading branch information
gvensan committed Feb 19, 2024
1 parent 28f5973 commit 709e858
Show file tree
Hide file tree
Showing 19 changed files with 511 additions and 1,211 deletions.
29 changes: 20 additions & 9 deletions PARAMETERS.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,17 @@ Options:
/* MESSAGE SETTINGS */
--topic <TOPIC...> the message topic(s) (default: ["solace/try/me"])
--message <MESSAGE> the message body (a default payload)
--file <FILENAME> the filename containing the message content
--message <MESSAGE> the message body
--default-message use default message body
--file <FILENAME> filename containing the message content
--stdin read the message body from stdin (default: false)
--count <COUNT> the number of events to publish (default: 1)
--interval <MILLISECONDS> the time to wait between publish (default: 1000)
--time-to-live <MILLISECONDS> the time before a message is discarded or moved to a DMQ
--dmq-eligible [BOOLEAN] the DMQ eligible flag
--dmq-eligible [BOOLEAN] the DMQ eligible flag (default: true)
--partition-key <KEY> the simulated partition key option (SECOND or MILLISECOND,
derives a value from publish time and set as partition key)
--partition-keys <KEY...> the partition key(s) list
/* CONFIGURATION SETTINGS */
--config <CONFIG_FILE> the configuration file (default: "stm-cli-config.json")
Expand Down Expand Up @@ -104,6 +108,7 @@ Options:
--delivery-mode <MODE> [advanced] the application-requested message delivery mode 'DIRECT' or 'PERSISTENT' (default: "PERSISTENT")
--reply-to-topic <TOPIC> [advanced] string which is used as the topic name for a response message
--user-properties <PROPS...> [advanced] the user properties (e.g., "name1: value1" "name2: value2")
--content-type <CONTENT_TYPE> [advanced] payload content type (default: "text/plain")
--output-mode <MODE> [advanced] message print mode: COMPACT, PRETTY, NONE
/* HELP OPTIONS */
Expand All @@ -128,9 +133,9 @@ Options:
--vpn <VPN> the message VPN name (default: "default")
--username <USERNAME> the username (default: "default")
--password <PASSWORD> the password (default: "default")
--topic <TOPIC...> the message topic(s) (default: ["solace/try/me"])
/* QUEUE SETTINGS */
--topic <TOPIC...> the message topic(s) (default: ["solace/try/me"])
--queue <QUEUE> the message queue
/* CONFIGURATION SETTINGS */
Expand Down Expand Up @@ -169,8 +174,9 @@ Options:
--keepalive-interval-limit <NUMBER> [advanced] the maximum number of consecutive Keep-Alive messages that can be sent without receiving a response before the session is declared down
--receive-timestamps [BOOLEAN] [advanced] include a receive timestamp on received messages
--reapply-subscriptions [BOOLEAN] [advanced] reapply subscriptions upon calling on a disconnected session (default: true)
--output-mode <MODE> [advanced] message print mode: COMPACT, PRETTY, NONE
--acknowledge-mode <MODE> [advanced] the acknowledgement mode - AUTO or CLIENT (default: "AUTO")
--content-type <CONTENT_TYPE> [advanced] payload content type (default: "text/plain")
--output-mode <MODE> [advanced] message print mode: COMPACT, PRETTY, NONE
--log-level <LEVEL> [advanced] solace log level, one of values: FATAL, ERROR, WARN, INFO, DEBUG, TRACE (default: "ERROR")
/* HELP OPTIONS */
Expand Down Expand Up @@ -199,8 +205,9 @@ Options:
/* MESSAGE SETTINGS */
--topic <TOPIC> the message topic (default: "solace/try/me/request")
--message <MESSAGE> the message body (a default payload)
--file <FILENAME> the filename containing the message content
--message <MESSAGE> the message body
--default-message use default message body
--file <FILENAME> filename containing the message content
--stdin read the message body from stdin (default: false)
--time-to-live <MILLISECONDS> the time before a message is discarded or moved to a DMQ
--timeout <MILLISECONDS> the timeout value
Expand Down Expand Up @@ -256,6 +263,7 @@ Options:
--delivery-mode <MODE> [advanced] the application-requested message delivery mode 'DIRECT' or 'PERSISTENT' (default: "PERSISTENT")
--reply-to-topic <TOPIC> [advanced] string which is used as the topic name for a response message
--user-properties <PROPS...> [advanced] the user properties (e.g., "name1: value1" "name2: value2")
--content-type <CONTENT_TYPE> [advanced] payload content type (default: "text/plain")
--output-mode <MODE> [advanced] message print mode: COMPACT, PRETTY, NONE
/* HELP OPTIONS */
Expand Down Expand Up @@ -283,8 +291,10 @@ Options:
/* MESSAGE SETTINGS */
--topic <TOPIC...> the message topic(s) (default: ["solace/try/me"])
--message <MESSAGE> the message body (a default payload)
--file <FILENAME> the filename containing the message content
--message <MESSAGE> the message body
--default-message use default message body
--file <FILENAME> filename containing the message content
--stdin read the message body from stdin (default: false)
--time-to-live <MILLISECONDS> the time before a message is discarded or moved to a DMQ
--dmq-eligible [BOOLEAN] the DMQ eligible flag
--partition-key <KEY> the partition key (SECOND or MILLISECOND, derives a value from publish time and set as partition key)
Expand Down Expand Up @@ -336,6 +346,7 @@ Options:
--correlation-key <CORRELATION_KEY> [advanced] the application-provided message correlation key for acknowledgement management
--reply-to-topic <TOPIC> [advanced] string which is used as the topic name for a response message
--user-properties <PROPS...> [advanced] the user properties (e.g., "name1: value1" "name2: value2")
--content-type <CONTENT_TYPE> [advanced] payload content type (default: "text/plain")
--output-mode <MODE> [advanced] message print mode: COMPACT, PRETTY, NONE
/* HELP OPTIONS */
Expand Down
3 changes: 1 addition & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,13 @@
"ascii-table": "^0.0.9",
"chalk": "~4.1.2",
"commander": "^11.1.0",
"concat-stream": "^2.0.0",
"core-js": "^3.26.0",
"express": "^4.18.2",
"live-server": "^1.2.2",
"node-localstorage": "^3.0.5",
"opener": "^1.5.2",
"pkg": "^5.8.1",
"prompt-sync": "^4.2.0",
"readline": "^1.3.0",
"signale": "^1.4.0",
"solclientjs": "^10.15.0",
"ts-node": "^10.9.1",
Expand Down
34 changes: 29 additions & 5 deletions src/common/publish-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import solace, { Message } from "solclientjs";
import { Logger } from '../utils/logger'
import { LogLevel, MessageDeliveryModeType } from "solclientjs";
import { STM_CLIENT_CONNECTED, STM_CLIENT_DISCONNECTED, STM_EVENT_PUBLISHED } from "../utils/controlevents";
import { getDefaultClientName } from "../utils/defaults";
import { getDefaultClientName, getType } from "../utils/defaults";
import { VisualizeClient } from "./visualize-client";
import { randomUUID } from "crypto";
const { uuid } = require('uuidv4');
Expand Down Expand Up @@ -155,7 +155,7 @@ export class SolaceClient extends VisualizeClient {
}

// Publish a message on a topic
publish(topicName: string, payload: string | Buffer, iteration: number = 0) {
publish(topicName: string, payload: string | Buffer | undefined, contentType: string, iteration: number = 0) {
if (this.exited) return;

if (!this.session) {
Expand All @@ -166,7 +166,31 @@ export class SolaceClient extends VisualizeClient {
if (!topicName.startsWith('@STM')) Logger.await('publishing...');
let message = solace.SolclientFactory.createMessage();
message.setDestination(solace.SolclientFactory.createTopicDestination(topicName));
message.setBinaryAttachment(JSON.stringify(payload));
if (payload) {
if (contentType === 'application/xml' || contentType === 'text/plain') {
if (typeof payload === 'string')
message.setSdtContainer(solace.SDTField.create(solace.SDTFieldType.STRING, payload));
else
message.setSdtContainer(solace.SDTField.create(solace.SDTFieldType.STRING, JSON.stringify(payload)));
} else if (contentType === 'application/json') {
message.setSdtContainer(solace.SDTField.create(solace.SDTFieldType.STRING, JSON.stringify(payload)));
} else {
if (typeof payload === 'object') {
const encoder = new TextEncoder();
const result = encoder.encode(JSON.stringify(payload));
message.setBinaryAttachment(result);
} else if (typeof payload === 'string') {
const encoder = new TextEncoder();
const result = encoder.encode(payload);
message.setBinaryAttachment(result);
} else {
message.setBinaryAttachment(payload);
}
}
} else {
message.setSdtContainer(solace.SDTField.create(solace.SDTFieldType.STRING, ""));
}

message.setCorrelationKey(this.options.correlationKey ? this.options.correlationKey : randomUUID());
this.options.deliveryMode && message.setDeliveryMode(deliveryModeMap.get(this.options.deliveryMode.toUpperCase()) as MessageDeliveryModeType);
this.options.timeToLive && message.setTimeToLive(this.options.timeToLive);
Expand Down Expand Up @@ -207,8 +231,8 @@ export class SolaceClient extends VisualizeClient {
message.setUserPropertyMap(propertyMap);
}

Logger.logSuccess(`message published to topic ${topicName}`)
Logger.printMessage(message.dump(0), message.getUserPropertyMap(), message.getBinaryAttachment(), this.options.outputMode);
Logger.logSuccess(`message published to topic - ${message.getDestination()}, type - ${getType(message)}`)
Logger.dumpMessage(message, this.options.contentType, this.options.outputMode);
this.session.send(message);
this.publishVisualizationEvent(this.session, this.options, STM_EVENT_PUBLISHED, {
type: 'sender', deliveryMode: message.getDeliveryMode(), topicName, clientName: this.clientName, uuid: uuid(), msgId: message.getApplicationMessageId()
Expand Down
15 changes: 6 additions & 9 deletions src/common/receive-client.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import solace, { LogLevel } from "solclientjs";
import { Logger } from '../utils/logger'
import { STM_CLIENT_CONNECTED, STM_CLIENT_DISCONNECTED, STM_EVENT_PUBLISHED, STM_EVENT_RECEIVED } from "../utils/controlevents";
import { getDefaultClientName } from "../utils/defaults";
import { getDefaultClientName, getType } from "../utils/defaults";
import { VisualizeClient } from "./visualize-client";
const { uuid } = require('uuidv4');

Expand Down Expand Up @@ -77,10 +77,10 @@ export class SolaceClient extends VisualizeClient {

//The UP_NOTICE dictates whether the session has been established
this.session.on(solace.SessionEventCode.UP_NOTICE, (sessionEvent: solace.SessionEvent) => {
Logger.logSuccess('=== ' + this.clientName + ' successfully connected and ready to receive events. ===');
this.publishVisualizationEvent(this.session, this.options, STM_CLIENT_CONNECTED, {
type: 'receiver', clientName: this.clientName, uuid: uuid()
})
Logger.logSuccess('=== ' + this.clientName + ' successfully connected and ready to receive events. ===');
resolve();
});

Expand Down Expand Up @@ -146,7 +146,7 @@ export class SolaceClient extends VisualizeClient {

//Message callback function
this.session.on(solace.SessionEventCode.MESSAGE, (message:any) => {
Logger.logSuccess(`message received - ${message.getDestination()}`)
Logger.logSuccess(`message Received - ${message.getDestination()}, type - ${getType(message)}`)

//Get the topic name from the message's destination
let topicName: string = message.getDestination().getName();
Expand All @@ -166,15 +166,12 @@ export class SolaceClient extends VisualizeClient {
matched = matched || topicName.match(regexSub) !== null;
if (matched) break;
}
if (!matched) {
Logger.logError('💣💣 Hmm.. received message on an unsubscribed topic 💥💥')
}
}

this.publishVisualizationEvent(this.session, this.options, STM_EVENT_RECEIVED, {
type: 'receiver', deliveryMode: message.getDeliveryMode(), topicName, clientName: this.clientName, uuid: uuid(), msgId: message.getApplicationMessageId()
})
Logger.printMessage(message.dump(0), message.getUserPropertyMap(), message.getBinaryAttachment(), this.options.outputMode);
Logger.dumpMessage(message, this.options.contentType, this.options.outputMode);
});
} catch (error: any) {
Logger.logDetailedError('session creation failed - ', error.toString())
Expand Down Expand Up @@ -391,8 +388,8 @@ export class SolaceClient extends VisualizeClient {
});
// Define message received event listener
this.receiver.messageReceiver.on(solace.MessageConsumerEventName.MESSAGE, (message: any) => {
Logger.logSuccess(`message Received - ${message.getDestination()}`)
Logger.printMessage(message.dump(0), message.getUserPropertyMap(), message.getBinaryAttachment(), this.options.outputMode);
Logger.logSuccess(`message Received - ${message.getDestination()}, type - ${getType(message)}`)
Logger.dumpMessage(message, this.options.contentType, this.options.outputMode);
this.publishVisualizationEvent(this.session, this.options, STM_EVENT_RECEIVED, {
type: 'receiver', deliveryMode: message.getDeliveryMode(), queue: this.receiver.queue, topicName: message.getDestination().getName(),
clientName: this.clientName, uuid: uuid(), msgId: message.getApplicationMessageId()
Expand Down
47 changes: 35 additions & 12 deletions src/common/reply-client.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import solace, { LogLevel, MessageDeliveryModeType } from "solclientjs";
import { Logger } from '../utils/logger'
import { getDefaultClientName, getDefaultMessage } from "../utils/defaults";
import { getDefaultClientName, getDefaultMessage, getType } from "../utils/defaults";
import { VisualizeClient } from "./visualize-client";
import { STM_CLIENT_CONNECTED, STM_CLIENT_DISCONNECTED, STM_EVENT_REPLIED, STM_EVENT_REQUEST_RECEIVED } from "../utils/controlevents";
const { uuid } = require('uuidv4');
Expand All @@ -26,7 +26,7 @@ export class SolaceClient extends VisualizeClient {
session:any = null;
clientName:string = "";
payload:any = null;
defaultMessage:boolean = false;
contentType:string = "";

constructor(options:any) {
super();
Expand Down Expand Up @@ -115,7 +115,7 @@ export class SolaceClient extends VisualizeClient {
type: 'replier', topicName: request.getDestination().getName(), clientName: this.clientName, uuid: uuid(), msgId: message.getApplicationMessageId()
})

this.reply(request, this.payload);
this.reply(request, this.payload, this.contentType);
} catch (error:any) {
Logger.logDetailedError('send reply failed - ', error.toString())
if (error.cause?.message) Logger.logDetailedError(``, `${error.cause?.message}`)
Expand All @@ -139,7 +139,7 @@ export class SolaceClient extends VisualizeClient {
}

// Subscribes to request topic on Solace PubSub+ Event Broker
subscribe = (topicNames: any, payload: any, defaultMessage: boolean) => {
subscribe = (topicNames: any, payload: string | Buffer | undefined, contentType: string) => {
//Check if the session has been established
if (!this.session) {
Logger.logWarn("cannot subscribe because not connected to Solace message router!");
Expand All @@ -148,7 +148,7 @@ export class SolaceClient extends VisualizeClient {

try {
this.payload = payload;
this.defaultMessage = defaultMessage;
this.contentType = contentType;
topicNames.forEach((topicName:any) => {
Logger.logInfo(`subscribing to ${topicName}`);

Expand Down Expand Up @@ -189,14 +189,37 @@ export class SolaceClient extends VisualizeClient {
}
};

reply = (message:any, payload:any) => {
Logger.logSuccess('request received');
Logger.printMessage(message.dump(0), message.getUserPropertyMap(), message.getBinaryAttachment(), this.options.outputMode);
reply = (message:any, payload: string | Buffer | undefined, contentType: string) => {
Logger.logSuccess(`request Received - ${message.getDestination()}, type - ${getType(message)}`)
Logger.dumpMessage(message, this.options.contentType, this.options.outputMode);
Logger.await(`replying to request on topic '${message.getDestination().getName()}'...`);
if (this.session !== null) {
if (this.defaultMessage) payload = getDefaultMessage();
var reply = solace.SolclientFactory.createMessage();
reply.setBinaryAttachment(JSON.stringify(payload));
if (payload) {
if (contentType === 'application/xml' || contentType === 'text/plain') {
if (typeof payload === 'string')
reply.setSdtContainer(solace.SDTField.create(solace.SDTFieldType.STRING, payload));
else
reply.setSdtContainer(solace.SDTField.create(solace.SDTFieldType.STRING, JSON.stringify(payload)));
} else if (contentType === 'application/json') {
reply.setSdtContainer(solace.SDTField.create(solace.SDTFieldType.STRING, JSON.stringify(payload)));
} else {
if (typeof payload === 'object') {
const encoder = new TextEncoder();
const result = encoder.encode(JSON.stringify(payload));
reply.setBinaryAttachment(result);
} else if (typeof payload === 'string') {
const encoder = new TextEncoder();
const result = encoder.encode(payload);
reply.setBinaryAttachment(result);
} else {
reply.setBinaryAttachment(payload);
}
}
} else {
reply.setSdtContainer(solace.SDTField.create(solace.SDTFieldType.STRING, ""));
}

reply.setApplicationMessageId(message.getApplicationMessageId());
if (this.options.replyToTopic)
reply.setDestination(solace.SolclientFactory.createTopicDestination(this.options.replyToTopic));
Expand All @@ -207,7 +230,7 @@ export class SolaceClient extends VisualizeClient {
this.session.sendReply(message, reply);
// this.session.send(reply)
Logger.logSuccess(`reply sent`);
Logger.printMessage(reply.dump(0), reply.getUserPropertyMap(), reply.getBinaryAttachment(), this.options.outputMode);
Logger.dumpMessage(reply, this.options.contentType, this.options.outputMode);

this.publishVisualizationEvent(this.session, this.options, STM_EVENT_REPLIED, {
type: 'replier', topicName: message.getDestination().getName() + ' [reply]', clientName: this.clientName, uuid: uuid(), msgId: reply.getApplicationMessageId()
Expand Down Expand Up @@ -246,4 +269,4 @@ export class SolaceClient extends VisualizeClient {
process.exit(0);
}, 1500); // wait for 1 second to finish
};
}
}
Loading

0 comments on commit 709e858

Please sign in to comment.