From bc432a7be647ad8441f3b4fd0947282a9fd60905 Mon Sep 17 00:00:00 2001 From: Ankur Saxena Date: Thu, 7 Dec 2023 15:29:27 -0500 Subject: [PATCH] Applied review feedback --- .../architecture/client_broker_protocol.md | 179 +++++++++--------- 1 file changed, 91 insertions(+), 88 deletions(-) diff --git a/docs/docs/architecture/client_broker_protocol.md b/docs/docs/architecture/client_broker_protocol.md index 23759494fb..c254c4b6b8 100644 --- a/docs/docs/architecture/client_broker_protocol.md +++ b/docs/docs/architecture/client_broker_protocol.md @@ -237,8 +237,8 @@ and broker, along with their purpose: request (see [this](https://github.com/bloomberg/blazingmq-sdk-java/blob/main/bmq-sdk/src/main/java/com/bloomberg/bmq/impl/infr/msg/OpenQueueResponse.java)). - **ConfigureQueueStream**: The request sent by the client to the broker to - configure the client's configuration for some queue which was opened earlier by the - client by making an `OpenQueue` request (see + configure the client's configuration for a queue which was opened earlier by + the client by making an `OpenQueue` request (see [this](https://github.com/bloomberg/blazingmq-sdk-java/blob/main/bmq-sdk/src/main/java/com/bloomberg/bmq/impl/infr/msg/ConfigureQueueStream.java)). The main field in this request is [`QueueStreamParameters`](https://github.com/bloomberg/blazingmq-sdk-java/blob/main/bmq-sdk/src/main/java/com/bloomberg/bmq/impl/infr/msg/ConfigureQueueStream.java). @@ -257,8 +257,9 @@ and broker, along with their purpose: - **CloseQueue**: The request sent by the client to the broker to indicate that client no longer wants to be attached to the queue (see [this](https://github.com/bloomberg/blazingmq-sdk-java/blob/main/bmq-sdk/src/main/java/com/bloomberg/bmq/impl/infr/msg/CloseQueue.java)). - A `CloseQueue` request is the opposite of an `OpenQueue` request. Note that a - `CloseQueue` request is always preceded by an `ConfigureQueueStream` request. + A `CloseQueue` request is the opposite of an `OpenQueue` request. Note that + a `CloseQueue` request is always preceded by an `ConfigureQueueStream` + request. - **Disconnect**: The request sent by the client to tear down the BlazingMQ session with the broker (see @@ -281,7 +282,7 @@ representation. As mentioned earlier in the article, every BlazingMQ network packet starts with an -`EventHeader` ([C++](https://github.com/bloomberg/blazingmq/blob/ca6491f69eea8d91733fa36ef3e82c4facc734fc/src/groups/bmq/bmqp/bmqp_protocol.h#L727), +`EventHeader`([C++](https://github.com/bloomberg/blazingmq/blob/ca6491f69eea8d91733fa36ef3e82c4facc734fc/src/groups/bmq/bmqp/bmqp_protocol.h#L727), [Java](https://github.com/bloomberg/blazingmq-sdk-java/blob/main/bmq-sdk/src/main/java/com/bloomberg/bmq/impl/infr/proto/EventHeader.java)). ### Endianness @@ -308,11 +309,11 @@ the bytes seamlessly. As an example, [here](https://github.com/bloomberg/blazingmq/blob/69f0f8f5b188ca1eb07b9d262093f949729db538/src/groups/bmq/bmqp/bmqp_protocol.h#L779) we use the [`bdlb::BigEndianUint32`](https://github.com/bloomberg/bde/blob/cf44cbb2cc179077f687f5408d92d6949fae75af/groups/bdl/bdlb/bdlb_bigendian.h#L517) -type to represent a `uint32_t` to capture the `fragment` and `length` +type to represent an `uint32_t` to capture the `fragment` and `length` fields of an `EventHeader`. -Let's go over the wire layout of every binary message exchanged between the client and -the broker. +Let's go over the wire layout of every binary message exchanged between client +and the broker. ### PUT Event @@ -336,8 +337,8 @@ To describe above layout in words: [`PutHeader`](https://github.com/bloomberg/blazingmq/blob/e3ddd4fdc8e024e3abff96aa91555f042ce4e565/src/groups/bmq/bmqp/bmqp_protocol.h#L1337) contains fields like: - - `MessageWords`: Length of the entire PUT message (including options, properties, - payload and padding) in words (1 word == 4 bytes). + - `MessageWords`: Length of the entire PUT message (including options, + properties, payload and padding) in words (1 word == 4 bytes). - `OptionsWords`: Length of all options, if any, in words. @@ -580,8 +581,8 @@ encoding/decoding logic in two simple [components](https://github.com/bloomberg/blazingmq-sdk-java/blob/main/bmq-sdk/src/main/java/com/bloomberg/bmq/impl/infr/codec). On the outgoing path, the Java SDK also uses [`SchemaEventBuilder`](https://github.com/bloomberg/blazingmq-sdk-java/blob/main/bmq-sdk/src/main/java/com/bloomberg/bmq/impl/TcpBrokerConnection.java) -to prepend [`EventHeader`] to the outgoing message. On the incoming side, the SDK -uses `JsonDecoderUtil` as shown +to prepend [`EventHeader`] to the outgoing message. On the incoming side, the +SDK uses `JsonDecoderUtil` as shown [here](https://github.com/bloomberg/blazingmq-sdk-java/blob/ed0eb848f5ac2897cfe32cc481575d689a4cb1c2/bmq-sdk/src/main/java/com/bloomberg/bmq/impl/infr/proto/ControlEventImpl.java#L54). --- @@ -636,7 +637,7 @@ and have BlazingMQ client applications connect to the local BlazingMQ proxy which could listen on a fixed port number. Coming back to the TCP connection, SDK initiates the connection by calling some -flavor of the `connect` API in the underlying language or library being used. As +flavor of `connect` API in the underlying language or library being used. As an example, BlazingMQ Java SDK uses [`netty`](https://netty.io) for its TCP connection management and initiates a connection as shown [here](https://github.com/bloomberg/blazingmq-sdk-java/blob/ed0eb848f5ac2897cfe32cc481575d689a4cb1c2/bmq-sdk/src/main/java/com/bloomberg/bmq/impl/infr/net/NettyTcpConnection.java#L370). @@ -645,25 +646,25 @@ the basic [`connect`](https://github.com/bloomberg/blazingmq-sdk-java/blob/ed0eb848f5ac2897cfe32cc481575d689a4cb1c2/bmq-sdk/src/test/java/com/bloomberg/bmq/it/PlainProducerIT.java#L80) API provided by the Java language. -An important thing to note here is that both the BlazingMQ C++ and Java SDKs use -[non-blocking +An important thing to note here is that both the BlazingMQ C++ and Java SDKs +use [non-blocking I/O](https://beej.us/guide/bgnet/html//#slightly-advanced-techniques), which can help with asynchronous and flexible design in the SDK, but at the added -cost of more complexity. In our experience, the complexity is worth it, and -an SDK implementation in any language should consider using non-blocking I/O if +cost of more complexity. In our experience, the complexity is worth it, and an +SDK implementation in any language should consider using non-blocking I/O if possible. #### Negotiation -Once the `connect` operation succeeds, the BlazingMQ SDK will have a TCP connection -established with the broker. The next step is for the SDK to carry out a -handshake with the broker. This is known as *negotiation* in BlazingMQ. +Once the `connect` operation succeeds, the BlazingMQ SDK will have a TCP +connection established with the broker. The next step is for the SDK to carry +out a handshake with the broker. This is known as *negotiation* in BlazingMQ. Immediately after establishing a TCP connection, the SDK sends a [`NegotiationMessage`](https://github.com/bloomberg/blazingmq/blob/69f0f8f5b188ca1eb07b9d262093f949729db538/src/groups/bmq/bmqp/bmqp_ctrlmsg.xsd#L1592) to the broker. This is a schema message, and hence needs to be encoded. See -the [previous](#schema-messages-wire-layout) section for more details on the wire -layout. Note that `NegotationMessage` is one of the top level schema messages, -the other being +the [previous](#schema-messages-wire-layout) section for more details on the +wire layout. Note that `NegotationMessage` is one of the top level schema +messages, the other being [`ControlMessage`](https://github.com/bloomberg/blazingmq/blob/9b692fe25f74543e954a27e30b6b15b0ae057c8d/src/groups/bmq/bmqp/bmqp_ctrlmsg.xsd#L84). There is no way to discriminate between these two top level schema messages. It is implicit that the `NegotiationMessage` will only be used during the @@ -760,12 +761,12 @@ sending an [`OpenQueueResponse`](https://github.com/bloomberg/blazingmq/blob/9b692fe25f74543e954a27e30b6b15b0ae057c8d/src/groups/bmq/bmqp/bmqp_ctrlmsg.xsd#L191-L211). Let's go over the three fields in this type: -- `originalRequest`: A field whose type is `OpenQueue`, and it simply - carries the `OpenQueue` request sent by the client. +- `originalRequest`: A field whose type is `OpenQueue`, and it simply carries + the `OpenQueue` request sent by the client. -- `routingConfiguration`: An integer which indicates information about the message - routing strategy of the queue, as indicated by the broker. Possible values - for the flag can be found +- `routingConfiguration`: An integer which indicates information about the + message routing strategy of the queue, as indicated by the broker. Possible + values for the flag can be found [here](https://github.com/bloomberg/blazingmq/blob/9b692fe25f74543e954a27e30b6b15b0ae057c8d/src/groups/bmq/bmqp/bmqp_routingconfigurationutils.h#L51-L61). - `deduplicationTimeMs`: An integer which indicates the time interval (in ms) @@ -837,8 +838,8 @@ Let's look at the fields of this - `expression`: This field, which is of type [`Expression`](https://github.com/bloomberg/blazingmq/blob/9b692fe25f74543e954a27e30b6b15b0ae057c8d/src/groups/bmq/bmqp/bmqp_ctrlmsg.xsd#L412C4-L426) - contains the actual subscription expression as a string type as well as - expression grammar version. + contains the actual subscription expression as a string type as well as the + version of the expression grammar. - `consumers`: This field is an array of type [`ConsumerInfo`](https://github.com/bloomberg/blazingmq/blob/9b692fe25f74543e954a27e30b6b15b0ae057c8d/src/groups/bmq/bmqp/bmqp_ctrlmsg.xsd#L428-L453), @@ -870,16 +871,16 @@ both SDKs, the `post()` API is asynchronous i.e., the API does not wait for the BlazingMQ broker to respond with ACK messages. In fact, in some cases, the API does not write the PUT messages to the TCP socket. Instead, the SDK just accepts the messages, adds them to an internal buffer, and sends them later. -This can occur if the TCP layer is enforcing some flow control, or in case the client is -not connected to the broker, etc. +This can occur if the TCP layer is enforcing some flow control, or in case the +client is not connected to the broker, etc. -Keeping the above complications aside, the simplest way to send PUT messages is to -do what the [Java +Keeping the above complications aside, the simplest way to send PUT messages is +to do what the [Java SDK](https://github.com/bloomberg/blazingmq-sdk-java/blob/ed0eb848f5ac2897cfe32cc481575d689a4cb1c2/bmq-sdk/src/main/java/com/bloomberg/bmq/Queue.java#L109-L115) -does. A public class called `PutMessage` could be created, with data -members like the payload, message properties and a callback which will be -invoked by the SDK when it receives the ACK message for the PUT message from -the broker. `PutMessage` could look like this (in pseudo-code): +does. A public class called `PutMessage` could be created, with data members +like the payload, message properties and a callback which will be invoked by +the SDK when it receives the ACK message for the PUT message from the broker. +`PutMessage` could look like this (in pseudo-code): ``` class PutMessage { @@ -942,7 +943,7 @@ PUT message(s) specified by the application, by following guidelines application is calling `post`, it must have opened the queue with *WRITE* intent. -- Ensure that the queue is in the process of being closed or is already closed. +- Ensure that the queue is in the process of being closed or already closed. - Ensure that the connection with BlazingMQ broker is up, and not stopping or disconnected. @@ -959,16 +960,16 @@ API is invoked. After posting one or more messages, the SDK should expect a response from the BlazingMQ broker in the form of an ACK (acknowledgement) message containing the result of the `post` operation. Multiple ACK messages can be delivered by the -broker in one ACK event, so the SDK must be able to handle them. The wire layout -of an ACK event was discussed in a [previous](#ack-event) section. - -One of the fields in ACK message is `MessageGUID`, which is used to -correlate the ACK message to its PUT message. For example, when the -application calls the `post` API, the SDK should generate a `MessageGUID`, add it to -the PUT header of that message before sending it, and also keep track of the -`MessageGUID` along with any relevant information for that PUT message (e.g., -the `AckCallback` specified by the application for that PUT message). When the -ACK message arrives, SDK can extract `MessageGUID` from it, retrieve the +broker in one ACK event, so the SDK must be able to handle them. The wire +layout of an ACK event was discussed in a [previous](#ack-event) section. + +One of the fields in ACK message is `MessageGUID`, which is used to correlate +the ACK message to its PUT message. For example, when the application calls +the `post` API, SDK should generate a `MessageGUID`, add it to the PUT header +of that message before sending it, and also keep track of the `MessageGUID` +along with any relevant information for that PUT message (e.g., the +`AckCallback` specified by the application for that PUT message). When the ACK +message arrives, SDK can extract `MessageGUID` from it, retrieve the `AckCallback` and then invoke it to deliver the result to the application. ### Receiving Messages (Consumers) @@ -997,9 +998,9 @@ multiple PUSH messages to the application in one go. #### Consume API Implementation -The wire layout of a PUSH event was described in a [previous](#push-event) section. -As far as implementation is concerned, SDK logic should keep some of these things -in mind when receiving and dispatching a PUSH event: +The wire layout of a PUSH event was described in a [previous](#push-event) +section. As far as implemention is concerned, the SDK logic should keep some +of these things in mind when receiving and dispatching a PUSH event: - Ensure that the queue is still open, and not closed or being closed by the application. @@ -1025,12 +1026,11 @@ CONFIRM message. Similar to the **Post API**, C++ and Java SDKs support batching of CONFIRM messages i.e., consumer application can send a batch of CONFIRM messages in one shot instead of sending them one by one. Batching can improve performance. -Additionally, in both SDKs, the `confirmMessage()` API is -asynchronous; i.e., the API may not immediately write the CONFIRM messages to the TCP -socket. Instead, the SDK just accepts the CONFIRM messages, adds them to an -internal buffer, and sends them later. This can occur if the TCP layer is -enforcing some flow control, or in case the SDK is not connected to the broker, -etc. +Additionally, in both SDKs, the `confirmMessage()` API is asynchronous; i.e., +the API may not immediately write the CONFIRM messages to the TCP socket. +Instead, the SDK just accepts the CONFIRM messages, adds them to an internal +buffer, and sends them later. This can occur if the TCP layer is enforcing +some flow control, or in case the SDK is not connected to the broker, etc. Keeping above complications aside, the simplest way to send CONFIRM messages is to do what the [Java @@ -1056,10 +1056,10 @@ request. ### Closing a Queue -Once a client application no longer wants to produce to or consume from a queue -that it previously opened, it should close the queue. Closing a queue will not -delete the queue in BlazingMQ backend. It will simply detach the application -from the queue. +Once a client application no longer wants to produce to or consumer from a +queue that it previously opened, it should close the queue. Closing a queue +will not delete the queue in BlazingMQ backend. It will simply detach the +application from the queue. Just like opening a queue, closing a queue is a two step process (i.e., it requires two pairs of request/response exchanges with the broker). Both of @@ -1102,11 +1102,11 @@ sending a > 1) As the second step in the open-queue workflow (i.e., when an application > calls one of the flavors of the `openQueue` APIs). > -> 2) As a standalone request when an application calls one of the flavors of the -> `configureQueue` APIs. +> 2) As a standalone request when an application calls one of the flavors of +> the `configureQueue` APIs. > -> 3) As the first step in the close-queue workflow (i.e., when an application calls -> one of the flavors of the `closeQueue` APIs). +> 3) As the first step in the close-queue workflow (i.e., when application +> calls one of the flavors of the `closeQueue` APIs). ### Stopping a Session with BlazingMQ Broker @@ -1118,9 +1118,10 @@ application can stop the BlazingMQ session at any time. Stopping the BlazingMQ session is typically done by calling a [`stop`](https://github.com/bloomberg/blazingmq-sdk-java/blob/358eda1e6ea7917e63ee6fb50d15ecd5873dbf56/bmq-sdk/src/main/java/com/bloomberg/bmq/Session.java#L512-L524) method on the top level Session/Connection/Context object. This method needs -to carry out three things -- it needs to inform the broker about its intent to disconnect and -wait for a response from the broker, and then finally it needs to close the TCP socket with -the broker. Let's look into these steps in detail. +to carry out three things -- it needs to inform the broker about its intent to +disconnect intent and wait for a response from the broker, and then finally it +needs to close the TCP socket with the broker. Let's look into these steps in +detail. #### `Disconnect` Request @@ -1133,16 +1134,16 @@ it responds to this request by sending a [`DisconnectResponse`](https://github.com/bloomberg/blazingmq/blob/58044d8e4579665fffa0419df820c8be5cdbc2eb/src/groups/bmq/bmqp/bmqp_ctrlmsg.xsd#L136-L147). A guarantee provided by the broker is that once the `DisconnectResponse` has been sent, the broker will not send any other -message/request/response/notification to the client. Logically, the *Disconnect* -request/response pair can be thought of as a way to ensure that the "pipe" -between client and broker is empty. +message/request/response/notification to the client. Logically, the +*Disconnect* request/response pair can be thought of as a way to ensure that +the "pipe" between client and broker is empty. #### Closing the TCP Connection Once the SDK receives the `DisconnectResponse`, it can proceed to close the TCP socket with the broker, and release any resources associated with the -connection. Care must be taken to ensure that the resources are released in the -right order without any dangling references or leaks. +connection. Care must be taken to ensure that the resources are released in +the right order without any dangling references or leaks. --- @@ -1186,12 +1187,13 @@ find useful: 1. The network I/O logic could be made asynchronous (i.e., could use non-blocking sockets). This is not very important for higher level - languages like Python, etc. but is for languages like Rust, etc. Non-blocking - I/O can enable client applications to achieve higher throughput, at the cost - of higher complexity of SDK implementation. One could leverage some well - known open source libraries which make it easier to achieve async I/O. For - example, the Java SDK uses - [`netty`](https://github.com/bloomberg/blazingmq-sdk-java/tree/main/bmq-sdk/src/main/java/com/bloomberg/bmq/impl/infr/net) to achieve async I/O fairly easily. + languages like Python, etc. but is for languages like Rust, etc. + Non-blocking I/O can enable client appliactions to achieve higher + throughput, at the cost of higher complexity of SDK implementation. One + could leverage some well known open source libraries which make it easier to + achieve async I/O. For example, the Java SDK uses + [`netty`](https://github.com/bloomberg/blazingmq-sdk-java/tree/main/bmq-sdk/src/main/java/com/bloomberg/bmq/impl/infr/net) + to achieve async I/O fairly easily. 2. For SDKs which are highly asynchronous in their implementation, it can be difficult to reason about when multiple events are occurring concurrently. @@ -1199,14 +1201,15 @@ find useful: disconnection with the BlazingMQ broker, and timeout of an outstanding *OpenQueue* request could occur at the same time, and it might become unreasonably difficult to reason about the code. As one of the potential - solutions, authors may find a Finite-State Machine-based (FSM) approach easy to implement and - reason about. In this approach, all states, state transitions, events and - actions can be codified in the form of enums, callbacks, etc. Any incoming - or outgoing event (initiated by any entity like the user, timer, network, - etc) on the queue or the top level session object can be applied to the - appropriate FSM (queue or session), and desired outcome (state transition) - and action can dispatched from the FSM table. As an example, the C++ SDK - takes this approach (but only partially) by introducing concepts like: + solutions, authors may find a Finite-State Machine-based (FSM) approach easy + to implement and reason about. In this approach, all states, state + transitions, events and actions can be codified in the form of enums, + callbacks, etc. Any incoming or outgoing event (initiated by any entity + like the user, timer, network, etc) on the queue or the top level session + object can be applied to the appropriate FSM (queue or session), and desired + outcome (state transition) and action can dispatched from the FSM table. As + an example, the C++ SDK takes this approach (but only partially) by + introducing concepts like: - [`QueueFSM`](https://github.com/bloomberg/blazingmq/blob/58044d8e4579665fffa0419df820c8be5cdbc2eb/src/groups/bmq/bmqimp/bmqimp_brokersession.h#L456)