diff --git a/.editorconfig b/.editorconfig new file mode 100644 index 0000000..5694b92 --- /dev/null +++ b/.editorconfig @@ -0,0 +1,7 @@ +root = true + +[*] +end_of_line = lf +indent_style = space +insert_final_newline = true +trim_trailing_whitespace = true diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index 0845b7a..f34dea8 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -7,7 +7,23 @@ on: branches: [ master ] jobs: - build: + lint: + runs-on: ubuntu-latest + steps: + - name: Install protocol buffer compiler + uses: arduino/setup-protoc@v1 + - uses: actions/checkout@v3 + - uses: Swatinem/rust-cache@v1 + - name: Build + run: cargo build + - name: Check Clippy + run: cargo clippy --tests --all-features -- -D warnings + - name: Install nightly rustfmt + run: rustup toolchain install nightly --component rustfmt + - name: Check format + run: cargo +nightly fmt --all --check + + test: runs-on: ubuntu-latest strategy: matrix: @@ -18,8 +34,8 @@ jobs: - name: Start Pulsar Standalone Container run: docker run --name pulsar -p 6650:6650 -p 8080:8080 -d -e GITHUB_ACTIONS=true -e CI=true streamnative/pulsar:${{ matrix.pulsar-version }} /pulsar/bin/pulsar standalone - uses: actions/checkout@v3 - - name: Build - run: cargo build --verbose - uses: Swatinem/rust-cache@v1 + - name: Build + run: cargo build - name: Run tests run: cargo test -- --nocapture diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index f84b8ea..dab6014 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -1,10 +1,9 @@ -# Contributing to ulsar-rs +# Contributing to pulsar-rs -Thank you for your interest in contributing to pulsar-rs! There are many ways to contribute -and we appreciate all of them. +Thank you for your interest in contributing to pulsar-rs! There are many ways to contribute and we appreciate all of them. -As of 2022/08, maintainers of this repo have agreed to stabilize this repo since it's quality is not upto java pulsar client. -Thus, issues that are beyond pulsar version 2.10.x are welcomed but will not be prioritized until 2022/12. -If you would like to contribute to this crate, solving issues listed on this [page](https://github.com/streamnative/pulsar-rs/issues/224) is strongly encouraged. +As of 2022/08, maintainers of this repo have agreed to stabilize this repo since it's quality is not upto java pulsar client. Thus, issues that are beyond pulsar version 2.10.x are welcomed but will not be prioritized until 2022/12. + +If you would like to contribute to this crate, solving issues listed on this [page](https://github.com/streamnative/pulsar-rs/issues/224) is strongly encouraged. Thanks! diff --git a/PulsarApi.proto b/PulsarApi.proto index dd7fb3f..836c1e9 100644 --- a/PulsarApi.proto +++ b/PulsarApi.proto @@ -16,1051 +16,1051 @@ * specific language governing permissions and limitations * under the License. */ - syntax = "proto2"; - - package pulsar.proto; - option java_package = "org.apache.pulsar.common.api.proto"; - option optimize_for = LITE_RUNTIME; - - message Schema { - enum Type { - None = 0; - String = 1; - Json = 2; - Protobuf = 3; - Avro = 4; - Bool = 5; - Int8 = 6; - Int16 = 7; - Int32 = 8; - Int64 = 9; - Float = 10; - Double = 11; - Date = 12; - Time = 13; - Timestamp = 14; - KeyValue = 15; - Instant = 16; - LocalDate = 17; - LocalTime = 18; - LocalDateTime = 19; - ProtobufNative = 20; - } - - required string name = 1; - required bytes schema_data = 3; - required Type type = 4; - repeated KeyValue properties = 5; - - } - - message MessageIdData { - required uint64 ledgerId = 1; - required uint64 entryId = 2; - optional int32 partition = 3 [default = -1]; - optional int32 batch_index = 4 [default = -1]; - repeated int64 ack_set = 5; - optional int32 batch_size = 6; - - // For the chunk message id, we need to specify the first chunk message id. - optional MessageIdData first_chunk_message_id = 7; - } - - message KeyValue { - required string key = 1; - required string value = 2; - } - - message KeyLongValue { - required string key = 1; - required uint64 value = 2; - } - - message IntRange { - required int32 start = 1; - required int32 end = 2; - } - - message EncryptionKeys { - required string key = 1; - required bytes value = 2; - repeated KeyValue metadata = 3; - } - - enum CompressionType { - NONE = 0; - LZ4 = 1; - ZLIB = 2; - ZSTD = 3; - SNAPPY = 4; - } - - enum ProducerAccessMode { - Shared = 0; // By default multiple producers can publish on a topic - Exclusive = 1; // Require exclusive access for producer. Fail immediately if there's already a producer connected. - WaitForExclusive = 2; // Producer creation is pending until it can acquire exclusive access - ExclusiveWithFencing = 3; // Require exclusive access for producer. Fence out old producer. - } - - message MessageMetadata { - required string producer_name = 1; - required uint64 sequence_id = 2; - required uint64 publish_time = 3; - repeated KeyValue properties = 4; - - // Property set on replicated message, - // includes the source cluster name - optional string replicated_from = 5; - //key to decide partition for the msg - optional string partition_key = 6; - // Override namespace's replication - repeated string replicate_to = 7; - optional CompressionType compression = 8 [default = NONE]; - optional uint32 uncompressed_size = 9 [default = 0]; - // Removed below checksum field from Metadata as - // it should be part of send-command which keeps checksum of header + payload - //optional sfixed64 checksum = 10; - // differentiate single and batch message metadata - optional int32 num_messages_in_batch = 11 [default = 1]; - - // the timestamp that this event occurs. it is typically set by applications. - // if this field is omitted, `publish_time` can be used for the purpose of `event_time`. - optional uint64 event_time = 12 [default = 0]; - // Contains encryption key name, encrypted key and metadata to describe the key - repeated EncryptionKeys encryption_keys = 13; - // Algorithm used to encrypt data key - optional string encryption_algo = 14; - // Additional parameters required by encryption - optional bytes encryption_param = 15; - optional bytes schema_version = 16; - - optional bool partition_key_b64_encoded = 17 [ default = false ]; - // Specific a key to overwrite the message key which used for ordering dispatch in Key_Shared mode. - optional bytes ordering_key = 18; - - // Mark the message to be delivered at or after the specified timestamp - optional int64 deliver_at_time = 19; - - // Identify whether a message is a "marker" message used for - // internal metadata instead of application published data. - // Markers will generally not be propagated back to clients - optional int32 marker_type = 20; - - // transaction related message info - optional uint64 txnid_least_bits = 22; - optional uint64 txnid_most_bits = 23; - - /// Add highest sequence id to support batch message with external sequence id - optional uint64 highest_sequence_id = 24 [default = 0]; - - // Indicate if the message payload value is set - optional bool null_value = 25 [default = false]; - optional string uuid = 26; - optional int32 num_chunks_from_msg = 27; - optional int32 total_chunk_msg_size = 28; - optional int32 chunk_id = 29; - - // Indicate if the message partition key is set - optional bool null_partition_key = 30 [default = false]; - } - - message SingleMessageMetadata { - repeated KeyValue properties = 1; - optional string partition_key = 2; - required int32 payload_size = 3; - optional bool compacted_out = 4 [default = false]; - - // the timestamp that this event occurs. it is typically set by applications. - // if this field is omitted, `publish_time` can be used for the purpose of `event_time`. - optional uint64 event_time = 5 [default = 0]; - optional bool partition_key_b64_encoded = 6 [ default = false ]; - // Specific a key to overwrite the message key which used for ordering dispatch in Key_Shared mode. - optional bytes ordering_key = 7; - // Allows consumer retrieve the sequence id that the producer set. - optional uint64 sequence_id = 8; - // Indicate if the message payload value is set - optional bool null_value = 9 [ default = false ]; - // Indicate if the message partition key is set - optional bool null_partition_key = 10 [ default = false]; - } - - // metadata added for entry from broker - message BrokerEntryMetadata { - optional uint64 broker_timestamp = 1; - optional uint64 index = 2; - } - - enum ServerError { - UnknownError = 0; - MetadataError = 1; // Error with ZK/metadata - PersistenceError = 2; // Error writing reading from BK - AuthenticationError = 3; // Non valid authentication - AuthorizationError = 4; // Not authorized to use resource - - ConsumerBusy = 5; // Unable to subscribe/unsubscribe because - // other consumers are connected - ServiceNotReady = 6; // Any error that requires client retry operation with a fresh lookup - ProducerBlockedQuotaExceededError = 7; // Unable to create producer because backlog quota exceeded - ProducerBlockedQuotaExceededException = 8; // Exception while creating producer because quota exceeded - ChecksumError = 9; // Error while verifying message checksum - UnsupportedVersionError = 10; // Error when an older client/version doesn't support a required feature - TopicNotFound = 11; // Topic not found - SubscriptionNotFound = 12; // Subscription not found - ConsumerNotFound = 13; // Consumer not found - TooManyRequests = 14; // Error with too many simultaneously request - TopicTerminatedError = 15; // The topic has been terminated - - ProducerBusy = 16; // Producer with same name is already connected - InvalidTopicName = 17; // The topic name is not valid - - IncompatibleSchema = 18; // Specified schema was incompatible with topic schema - ConsumerAssignError = 19; // Dispatcher assign consumer error - - TransactionCoordinatorNotFound = 20; // Transaction coordinator not found error - InvalidTxnStatus = 21; // Invalid txn status error - NotAllowedError = 22; // Not allowed error - - TransactionConflict = 23; // Ack with transaction conflict - TransactionNotFound = 24; // Transaction not found - - ProducerFenced = 25; // When a producer asks and fail to get exclusive producer access, - // or loses the exclusive status after a reconnection, the broker will - // use this error to indicate that this producer is now permanently - // fenced. Applications are now supposed to close it and create a - // new producer - } - - enum AuthMethod { - AuthMethodNone = 0; - AuthMethodYcaV1 = 1; - AuthMethodAthens = 2; - } - - // Each protocol version identify new features that are - // incrementally added to the protocol - enum ProtocolVersion { - v0 = 0; // Initial versioning - v1 = 1; // Added application keep-alive - v2 = 2; // Added RedeliverUnacknowledgedMessages Command - v3 = 3; // Added compression with LZ4 and ZLib - v4 = 4; // Added batch message support - v5 = 5; // Added disconnect client w/o closing connection - v6 = 6; // Added checksum computation for metadata + payload - v7 = 7; // Added CommandLookupTopic - Binary Lookup - v8 = 8; // Added CommandConsumerStats - Client fetches broker side consumer stats - v9 = 9; // Added end of topic notification - v10 = 10;// Added proxy to broker - v11 = 11;// C++ consumers before this version are not correctly handling the checksum field - v12 = 12;// Added get topic's last messageId from broker - // Added CommandActiveConsumerChange - // Added CommandGetTopicsOfNamespace - v13 = 13; // Schema-registry : added avro schema format for json - v14 = 14; // Add CommandAuthChallenge and CommandAuthResponse for mutual auth - // Added Key_Shared subscription - v15 = 15; // Add CommandGetOrCreateSchema and CommandGetOrCreateSchemaResponse - v16 = 16; // Add support for broker entry metadata - v17 = 17; // Added support ack receipt - v18 = 18; // Add client support for broker entry metadata - v19 = 19; // Add CommandTcClientConnectRequest and CommandTcClientConnectResponse - } - - message CommandConnect { - required string client_version = 1; - optional AuthMethod auth_method = 2; // Deprecated. Use "auth_method_name" instead. - optional string auth_method_name = 5; - optional bytes auth_data = 3; - optional int32 protocol_version = 4 [default = 0]; - - // Client can ask to be proxyied to a specific broker - // This is only honored by a Pulsar proxy - optional string proxy_to_broker_url = 6; - - // Original principal that was verified by - // a Pulsar proxy. In this case the auth info above - // will be the auth of the proxy itself - optional string original_principal = 7; - - // Original auth role and auth Method that was passed - // to the proxy. In this case the auth info above - // will be the auth of the proxy itself - optional string original_auth_data = 8; - optional string original_auth_method = 9; - - // Feature flags - optional FeatureFlags feature_flags = 10; - } - - message FeatureFlags { - optional bool supports_auth_refresh = 1 [default = false]; - optional bool supports_broker_entry_metadata = 2 [default = false]; - optional bool supports_partial_producer = 3 [default = false]; - } - - message CommandConnected { - required string server_version = 1; - optional int32 protocol_version = 2 [default = 0]; - optional int32 max_message_size = 3; - } - - message CommandAuthResponse { - optional string client_version = 1; - optional AuthData response = 2; - optional int32 protocol_version = 3 [default = 0]; - } - - message CommandAuthChallenge { - optional string server_version = 1; - optional AuthData challenge = 2; - optional int32 protocol_version = 3 [default = 0]; - } - - // To support mutual authentication type, such as Sasl, reuse this command to mutual auth. - message AuthData { - optional string auth_method_name = 1; - optional bytes auth_data = 2; - } - - enum KeySharedMode { - AUTO_SPLIT = 0; - STICKY = 1; - } - - message KeySharedMeta { - required KeySharedMode keySharedMode = 1; - repeated IntRange hashRanges = 3; - optional bool allowOutOfOrderDelivery = 4 [default = false]; - } - - message CommandSubscribe { - enum SubType { - Exclusive = 0; - Shared = 1; - Failover = 2; - Key_Shared = 3; - } - required string topic = 1; - required string subscription = 2; - required SubType subType = 3; - - required uint64 consumer_id = 4; - required uint64 request_id = 5; - optional string consumer_name = 6; - optional int32 priority_level = 7; - - // Signal wether the subscription should be backed by a - // durable cursor or not - optional bool durable = 8 [default = true]; - - // If specified, the subscription will position the cursor - // markd-delete position on the particular message id and - // will send messages from that point - optional MessageIdData start_message_id = 9; - - /// Add optional metadata key=value to this consumer - repeated KeyValue metadata = 10; - - optional bool read_compacted = 11; - - optional Schema schema = 12; - enum InitialPosition { - Latest = 0; - Earliest = 1; - } - // Signal whether the subscription will initialize on latest - // or not -- earliest - optional InitialPosition initialPosition = 13 [default = Latest]; - - // Mark the subscription as "replicated". Pulsar will make sure - // to periodically sync the state of replicated subscriptions - // across different clusters (when using geo-replication). - optional bool replicate_subscription_state = 14; - - // If true, the subscribe operation will cause a topic to be - // created if it does not exist already (and if topic auto-creation - // is allowed by broker. - // If false, the subscribe operation will fail if the topic - // does not exist. - optional bool force_topic_creation = 15 [default = true]; - - // If specified, the subscription will reset cursor's position back - // to specified seconds and will send messages from that point - optional uint64 start_message_rollback_duration_sec = 16 [default = 0]; - - optional KeySharedMeta keySharedMeta = 17; - - repeated KeyValue subscription_properties = 18; - - // The consumer epoch, when exclusive and failover consumer redeliver unack message will increase the epoch - optional uint64 consumer_epoch = 19; - } - - message CommandPartitionedTopicMetadata { - required string topic = 1; - required uint64 request_id = 2; - // TODO - Remove original_principal, original_auth_data, original_auth_method - // Original principal that was verified by - // a Pulsar proxy. - optional string original_principal = 3; - - // Original auth role and auth Method that was passed - // to the proxy. - optional string original_auth_data = 4; - optional string original_auth_method = 5; - } - - message CommandPartitionedTopicMetadataResponse { - enum LookupType { - Success = 0; - Failed = 1; - } - optional uint32 partitions = 1; // Optional in case of error - required uint64 request_id = 2; - optional LookupType response = 3; - optional ServerError error = 4; - optional string message = 5; - } - - message CommandLookupTopic { - required string topic = 1; - required uint64 request_id = 2; - optional bool authoritative = 3 [default = false]; - - // TODO - Remove original_principal, original_auth_data, original_auth_method - // Original principal that was verified by - // a Pulsar proxy. - optional string original_principal = 4; - - // Original auth role and auth Method that was passed - // to the proxy. - optional string original_auth_data = 5; - optional string original_auth_method = 6; - // - optional string advertised_listener_name = 7; - } - - message CommandLookupTopicResponse { - enum LookupType { - Redirect = 0; - Connect = 1; - Failed = 2; - } - - optional string brokerServiceUrl = 1; // Optional in case of error - optional string brokerServiceUrlTls = 2; - optional LookupType response = 3; - required uint64 request_id = 4; - optional bool authoritative = 5 [default = false]; - optional ServerError error = 6; - optional string message = 7; - - // If it's true, indicates to the client that it must - // always connect through the service url after the - // lookup has been completed. - optional bool proxy_through_service_url = 8 [default = false]; - } - - /// Create a new Producer on a topic, assigning the given producer_id, - /// all messages sent with this producer_id will be persisted on the topic - message CommandProducer { - required string topic = 1; - required uint64 producer_id = 2; - required uint64 request_id = 3; - - /// If a producer name is specified, the name will be used, - /// otherwise the broker will generate a unique name - optional string producer_name = 4; - - optional bool encrypted = 5 [default = false]; - - /// Add optional metadata key=value to this producer - repeated KeyValue metadata = 6; - - optional Schema schema = 7; - - // If producer reconnect to broker, the epoch of this producer will +1 - optional uint64 epoch = 8 [default = 0]; - - // Indicate the name of the producer is generated or user provided - // Use default true here is in order to be forward compatible with the client - optional bool user_provided_producer_name = 9 [default = true]; - - // Require that this producers will be the only producer allowed on the topic - optional ProducerAccessMode producer_access_mode = 10 [default = Shared]; - - // Topic epoch is used to fence off producers that reconnects after a new - // exclusive producer has already taken over. This id is assigned by the - // broker on the CommandProducerSuccess. The first time, the client will - // leave it empty and then it will always carry the same epoch number on - // the subsequent reconnections. - optional uint64 topic_epoch = 11; - - optional bool txn_enabled = 12 [default = false]; - - // Name of the initial subscription of the topic. - // If this field is not set, the initial subscription will not be created. - // If this field is set but the broker's `allowAutoSubscriptionCreation` - // is disabled, the producer will fail to be created. - optional string initial_subscription_name = 13; - } - - message CommandSend { - required uint64 producer_id = 1; - required uint64 sequence_id = 2; - optional int32 num_messages = 3 [default = 1]; - optional uint64 txnid_least_bits = 4 [default = 0]; - optional uint64 txnid_most_bits = 5 [default = 0]; - - /// Add highest sequence id to support batch message with external sequence id - optional uint64 highest_sequence_id = 6 [default = 0]; - optional bool is_chunk =7 [default = false]; - - // Specify if the message being published is a Pulsar marker or not - optional bool marker = 8 [default = false]; - } - - message CommandSendReceipt { - required uint64 producer_id = 1; - required uint64 sequence_id = 2; - optional MessageIdData message_id = 3; - optional uint64 highest_sequence_id = 4 [default = 0]; - } - - message CommandSendError { - required uint64 producer_id = 1; - required uint64 sequence_id = 2; - required ServerError error = 3; - required string message = 4; - } - - message CommandMessage { - required uint64 consumer_id = 1; - required MessageIdData message_id = 2; - optional uint32 redelivery_count = 3 [default = 0]; - repeated int64 ack_set = 4; - optional uint64 consumer_epoch = 5; - } - - message CommandAck { - enum AckType { - Individual = 0; - Cumulative = 1; - } - - required uint64 consumer_id = 1; - required AckType ack_type = 2; - - // In case of individual acks, the client can pass a list of message ids - repeated MessageIdData message_id = 3; - - // Acks can contain a flag to indicate the consumer - // received an invalid message that got discarded - // before being passed on to the application. - enum ValidationError { - UncompressedSizeCorruption = 0; - DecompressionError = 1; - ChecksumMismatch = 2; - BatchDeSerializeError = 3; - DecryptionError = 4; - } - - optional ValidationError validation_error = 4; - repeated KeyLongValue properties = 5; - - optional uint64 txnid_least_bits = 6 [default = 0]; - optional uint64 txnid_most_bits = 7 [default = 0]; - optional uint64 request_id = 8; - } - - message CommandAckResponse { - required uint64 consumer_id = 1; - optional uint64 txnid_least_bits = 2 [default = 0]; - optional uint64 txnid_most_bits = 3 [default = 0]; - optional ServerError error = 4; - optional string message = 5; - optional uint64 request_id = 6; - } - - // changes on active consumer - message CommandActiveConsumerChange { - required uint64 consumer_id = 1; - optional bool is_active = 2 [default = false]; - } - - message CommandFlow { - required uint64 consumer_id = 1; - - // Max number of messages to prefetch, in addition - // of any number previously specified - required uint32 messagePermits = 2; - } - - message CommandUnsubscribe { - required uint64 consumer_id = 1; - required uint64 request_id = 2; - } - - // Reset an existing consumer to a particular message id - message CommandSeek { - required uint64 consumer_id = 1; - required uint64 request_id = 2; - - optional MessageIdData message_id = 3; - optional uint64 message_publish_time = 4; - } - - // Message sent by broker to client when a topic - // has been forcefully terminated and there are no more - // messages left to consume - message CommandReachedEndOfTopic { - required uint64 consumer_id = 1; - } - - message CommandCloseProducer { - required uint64 producer_id = 1; - required uint64 request_id = 2; - } - - message CommandCloseConsumer { - required uint64 consumer_id = 1; - required uint64 request_id = 2; - } - - message CommandRedeliverUnacknowledgedMessages { - required uint64 consumer_id = 1; - repeated MessageIdData message_ids = 2; - optional uint64 consumer_epoch = 3; - } - - message CommandSuccess { - required uint64 request_id = 1; - optional Schema schema = 2; - } - - /// Response from CommandProducer - message CommandProducerSuccess { - required uint64 request_id = 1; - required string producer_name = 2; - - // The last sequence id that was stored by this producer in the previous session - // This will only be meaningful if deduplication has been enabled. - optional int64 last_sequence_id = 3 [default = -1]; - optional bytes schema_version = 4; - - // The topic epoch assigned by the broker. This field will only be set if we - // were requiring exclusive access when creating the producer. - optional uint64 topic_epoch = 5; - - // If producer is not "ready", the client will avoid to timeout the request - // for creating the producer. Instead it will wait indefinitely until it gets - // a subsequent `CommandProducerSuccess` with `producer_ready==true`. - optional bool producer_ready = 6 [default = true]; - } - - message CommandError { - required uint64 request_id = 1; - required ServerError error = 2; - required string message = 3; - } - - // Commands to probe the state of connection. - // When either client or broker doesn't receive commands for certain - // amount of time, they will send a Ping probe. - message CommandPing { - } - message CommandPong { - } - - message CommandConsumerStats { - required uint64 request_id = 1; - // required string topic_name = 2; - // required string subscription_name = 3; - required uint64 consumer_id = 4; - } - - message CommandConsumerStatsResponse { - required uint64 request_id = 1; - optional ServerError error_code = 2; - optional string error_message = 3; - - /// Total rate of messages delivered to the consumer. msg/s - optional double msgRateOut = 4; - - /// Total throughput delivered to the consumer. bytes/s - optional double msgThroughputOut = 5; - - /// Total rate of messages redelivered by this consumer. msg/s - optional double msgRateRedeliver = 6; - - /// Name of the consumer - optional string consumerName = 7; - - /// Number of available message permits for the consumer - optional uint64 availablePermits = 8; - - /// Number of unacknowledged messages for the consumer - optional uint64 unackedMessages = 9; - - /// Flag to verify if consumer is blocked due to reaching threshold of unacked messages - optional bool blockedConsumerOnUnackedMsgs = 10; - - /// Address of this consumer - optional string address = 11; - - /// Timestamp of connection - optional string connectedSince = 12; - - /// Whether this subscription is Exclusive or Shared or Failover - optional string type = 13; - - /// Total rate of messages expired on this subscription. msg/s - optional double msgRateExpired = 14; - - /// Number of messages in the subscription backlog - optional uint64 msgBacklog = 15; - - /// Total rate of messages ack. msg/s - optional double messageAckRate = 16; - } - - message CommandGetLastMessageId { - required uint64 consumer_id = 1; - required uint64 request_id = 2; - } - - message CommandGetLastMessageIdResponse { - required MessageIdData last_message_id = 1; - required uint64 request_id = 2; - optional MessageIdData consumer_mark_delete_position = 3; - } - - message CommandGetTopicsOfNamespace { - enum Mode { - PERSISTENT = 0; - NON_PERSISTENT = 1; - ALL = 2; - } - required uint64 request_id = 1; - required string namespace = 2; - optional Mode mode = 3 [default = PERSISTENT]; - optional string topics_pattern = 4; - optional string topics_hash = 5; - } - - message CommandGetTopicsOfNamespaceResponse { - required uint64 request_id = 1; - repeated string topics = 2; - // true iff the topic list was filtered by the pattern supplied by the client - optional bool filtered = 3 [default = false]; - // hash computed from the names of matching topics - optional string topics_hash = 4; - // if false, topics is empty and the list of matching topics has not changed - optional bool changed = 5 [default = true]; - } - - message CommandGetSchema { - required uint64 request_id = 1; - required string topic = 2; - - optional bytes schema_version = 3; - } - - message CommandGetSchemaResponse { - required uint64 request_id = 1; - optional ServerError error_code = 2; - optional string error_message = 3; - - optional Schema schema = 4; - optional bytes schema_version = 5; - } - - message CommandGetOrCreateSchema { - required uint64 request_id = 1; - required string topic = 2; - required Schema schema = 3; - } - - message CommandGetOrCreateSchemaResponse { - required uint64 request_id = 1; - optional ServerError error_code = 2; - optional string error_message = 3; - - optional bytes schema_version = 4; - } - - /// --- transaction related --- - - enum TxnAction { - COMMIT = 0; - ABORT = 1; - } - - message CommandTcClientConnectRequest { - required uint64 request_id = 1; - required uint64 tc_id = 2 [default = 0]; - } - - message CommandTcClientConnectResponse { - required uint64 request_id = 1; - optional ServerError error = 2; - optional string message = 3; - } - - message CommandNewTxn { - required uint64 request_id = 1; - optional uint64 txn_ttl_seconds = 2 [default = 0]; - optional uint64 tc_id = 3 [default = 0]; - } - - message CommandNewTxnResponse { - required uint64 request_id = 1; - optional uint64 txnid_least_bits = 2 [default = 0]; - optional uint64 txnid_most_bits = 3 [default = 0]; - optional ServerError error = 4; - optional string message = 5; - } - - message CommandAddPartitionToTxn { - required uint64 request_id = 1; - optional uint64 txnid_least_bits = 2 [default = 0]; - optional uint64 txnid_most_bits = 3 [default = 0]; - repeated string partitions = 4; - } - - message CommandAddPartitionToTxnResponse { - required uint64 request_id = 1; - optional uint64 txnid_least_bits = 2 [default = 0]; - optional uint64 txnid_most_bits = 3 [default = 0]; - optional ServerError error = 4; - optional string message = 5; - } - - message Subscription { - required string topic = 1; - required string subscription = 2; - } - message CommandAddSubscriptionToTxn { - required uint64 request_id = 1; - optional uint64 txnid_least_bits = 2 [default = 0]; - optional uint64 txnid_most_bits = 3 [default = 0]; - repeated Subscription subscription = 4; - } - - message CommandAddSubscriptionToTxnResponse { - required uint64 request_id = 1; - optional uint64 txnid_least_bits = 2 [default = 0]; - optional uint64 txnid_most_bits = 3 [default = 0]; - optional ServerError error = 4; - optional string message = 5; - } - - message CommandEndTxn { - required uint64 request_id = 1; - optional uint64 txnid_least_bits = 2 [default = 0]; - optional uint64 txnid_most_bits = 3 [default = 0]; - optional TxnAction txn_action = 4; - } - - message CommandEndTxnResponse { - required uint64 request_id = 1; - optional uint64 txnid_least_bits = 2 [default = 0]; - optional uint64 txnid_most_bits = 3 [default = 0]; - optional ServerError error = 4; - optional string message = 5; - } - - message CommandEndTxnOnPartition { - required uint64 request_id = 1; - optional uint64 txnid_least_bits = 2 [default = 0]; - optional uint64 txnid_most_bits = 3 [default = 0]; - optional string topic = 4; - optional TxnAction txn_action = 5; - optional uint64 txnid_least_bits_of_low_watermark = 6; - } - - message CommandEndTxnOnPartitionResponse { - required uint64 request_id = 1; - optional uint64 txnid_least_bits = 2 [default = 0]; - optional uint64 txnid_most_bits = 3 [default = 0]; - optional ServerError error = 4; - optional string message = 5; - } - - message CommandEndTxnOnSubscription { - required uint64 request_id = 1; - optional uint64 txnid_least_bits = 2 [default = 0]; - optional uint64 txnid_most_bits = 3 [default = 0]; - optional Subscription subscription= 4; - optional TxnAction txn_action = 5; - optional uint64 txnid_least_bits_of_low_watermark = 6; - } - - message CommandEndTxnOnSubscriptionResponse { - required uint64 request_id = 1; - optional uint64 txnid_least_bits = 2 [default = 0]; - optional uint64 txnid_most_bits = 3 [default = 0]; - optional ServerError error = 4; - optional string message = 5; - } - - message BaseCommand { - enum Type { - CONNECT = 2; - CONNECTED = 3; - SUBSCRIBE = 4; - - PRODUCER = 5; - - SEND = 6; - SEND_RECEIPT= 7; - SEND_ERROR = 8; - - MESSAGE = 9; - ACK = 10; - FLOW = 11; - - UNSUBSCRIBE = 12; - - SUCCESS = 13; - ERROR = 14; - - CLOSE_PRODUCER = 15; - CLOSE_CONSUMER = 16; - - PRODUCER_SUCCESS = 17; - - PING = 18; - PONG = 19; - - REDELIVER_UNACKNOWLEDGED_MESSAGES = 20; - - PARTITIONED_METADATA = 21; - PARTITIONED_METADATA_RESPONSE = 22; - - LOOKUP = 23; - LOOKUP_RESPONSE = 24; - - CONSUMER_STATS = 25; - CONSUMER_STATS_RESPONSE = 26; - - REACHED_END_OF_TOPIC = 27; - - SEEK = 28; - - GET_LAST_MESSAGE_ID = 29; - GET_LAST_MESSAGE_ID_RESPONSE = 30; - - ACTIVE_CONSUMER_CHANGE = 31; - - - GET_TOPICS_OF_NAMESPACE = 32; - GET_TOPICS_OF_NAMESPACE_RESPONSE = 33; - - GET_SCHEMA = 34; - GET_SCHEMA_RESPONSE = 35; - - AUTH_CHALLENGE = 36; - AUTH_RESPONSE = 37; - - ACK_RESPONSE = 38; - - GET_OR_CREATE_SCHEMA = 39; - GET_OR_CREATE_SCHEMA_RESPONSE = 40; - - // transaction related - NEW_TXN = 50; - NEW_TXN_RESPONSE = 51; - - ADD_PARTITION_TO_TXN = 52; - ADD_PARTITION_TO_TXN_RESPONSE = 53; - - ADD_SUBSCRIPTION_TO_TXN = 54; - ADD_SUBSCRIPTION_TO_TXN_RESPONSE = 55; - - END_TXN = 56; - END_TXN_RESPONSE = 57; - - END_TXN_ON_PARTITION = 58; - END_TXN_ON_PARTITION_RESPONSE = 59; - - END_TXN_ON_SUBSCRIPTION = 60; - END_TXN_ON_SUBSCRIPTION_RESPONSE = 61; - TC_CLIENT_CONNECT_REQUEST = 62; - TC_CLIENT_CONNECT_RESPONSE = 63; - - } - - - required Type type = 1; - - optional CommandConnect connect = 2; - optional CommandConnected connected = 3; - - optional CommandSubscribe subscribe = 4; - optional CommandProducer producer = 5; - optional CommandSend send = 6; - optional CommandSendReceipt send_receipt = 7; - optional CommandSendError send_error = 8; - optional CommandMessage message = 9; - optional CommandAck ack = 10; - optional CommandFlow flow = 11; - optional CommandUnsubscribe unsubscribe = 12; - - optional CommandSuccess success = 13; - optional CommandError error = 14; - - optional CommandCloseProducer close_producer = 15; - optional CommandCloseConsumer close_consumer = 16; - - optional CommandProducerSuccess producer_success = 17; - optional CommandPing ping = 18; - optional CommandPong pong = 19; - optional CommandRedeliverUnacknowledgedMessages redeliverUnacknowledgedMessages = 20; - - optional CommandPartitionedTopicMetadata partitionMetadata = 21; - optional CommandPartitionedTopicMetadataResponse partitionMetadataResponse = 22; - - optional CommandLookupTopic lookupTopic = 23; - optional CommandLookupTopicResponse lookupTopicResponse = 24; - - optional CommandConsumerStats consumerStats = 25; - optional CommandConsumerStatsResponse consumerStatsResponse = 26; - - optional CommandReachedEndOfTopic reachedEndOfTopic = 27; - - optional CommandSeek seek = 28; - - optional CommandGetLastMessageId getLastMessageId = 29; - optional CommandGetLastMessageIdResponse getLastMessageIdResponse = 30; - - optional CommandActiveConsumerChange active_consumer_change = 31; - - optional CommandGetTopicsOfNamespace getTopicsOfNamespace = 32; - optional CommandGetTopicsOfNamespaceResponse getTopicsOfNamespaceResponse = 33; - - optional CommandGetSchema getSchema = 34; - optional CommandGetSchemaResponse getSchemaResponse = 35; - - optional CommandAuthChallenge authChallenge = 36; - optional CommandAuthResponse authResponse = 37; - - optional CommandAckResponse ackResponse = 38; - - optional CommandGetOrCreateSchema getOrCreateSchema = 39; - optional CommandGetOrCreateSchemaResponse getOrCreateSchemaResponse = 40; - - // transaction related - optional CommandNewTxn newTxn = 50; - optional CommandNewTxnResponse newTxnResponse = 51; - optional CommandAddPartitionToTxn addPartitionToTxn= 52; - optional CommandAddPartitionToTxnResponse addPartitionToTxnResponse = 53; - optional CommandAddSubscriptionToTxn addSubscriptionToTxn = 54; - optional CommandAddSubscriptionToTxnResponse addSubscriptionToTxnResponse = 55; - optional CommandEndTxn endTxn = 56; - optional CommandEndTxnResponse endTxnResponse = 57; - optional CommandEndTxnOnPartition endTxnOnPartition = 58; - optional CommandEndTxnOnPartitionResponse endTxnOnPartitionResponse = 59; - optional CommandEndTxnOnSubscription endTxnOnSubscription = 60; - optional CommandEndTxnOnSubscriptionResponse endTxnOnSubscriptionResponse = 61; - optional CommandTcClientConnectRequest tcClientConnectRequest = 62; - optional CommandTcClientConnectResponse tcClientConnectResponse = 63; - } \ No newline at end of file +syntax = "proto2"; + +package pulsar.proto; +option java_package = "org.apache.pulsar.common.api.proto"; +option optimize_for = LITE_RUNTIME; + +message Schema { + enum Type { + None = 0; + String = 1; + Json = 2; + Protobuf = 3; + Avro = 4; + Bool = 5; + Int8 = 6; + Int16 = 7; + Int32 = 8; + Int64 = 9; + Float = 10; + Double = 11; + Date = 12; + Time = 13; + Timestamp = 14; + KeyValue = 15; + Instant = 16; + LocalDate = 17; + LocalTime = 18; + LocalDateTime = 19; + ProtobufNative = 20; + } + + required string name = 1; + required bytes schema_data = 3; + required Type type = 4; + repeated KeyValue properties = 5; + +} + +message MessageIdData { + required uint64 ledgerId = 1; + required uint64 entryId = 2; + optional int32 partition = 3 [default = -1]; + optional int32 batch_index = 4 [default = -1]; + repeated int64 ack_set = 5; + optional int32 batch_size = 6; + + // For the chunk message id, we need to specify the first chunk message id. + optional MessageIdData first_chunk_message_id = 7; +} + +message KeyValue { + required string key = 1; + required string value = 2; +} + +message KeyLongValue { + required string key = 1; + required uint64 value = 2; +} + +message IntRange { + required int32 start = 1; + required int32 end = 2; +} + +message EncryptionKeys { + required string key = 1; + required bytes value = 2; + repeated KeyValue metadata = 3; +} + +enum CompressionType { + NONE = 0; + LZ4 = 1; + ZLIB = 2; + ZSTD = 3; + SNAPPY = 4; +} + +enum ProducerAccessMode { + Shared = 0; // By default multiple producers can publish on a topic + Exclusive = 1; // Require exclusive access for producer. Fail immediately if there's already a producer connected. + WaitForExclusive = 2; // Producer creation is pending until it can acquire exclusive access + ExclusiveWithFencing = 3; // Require exclusive access for producer. Fence out old producer. +} + +message MessageMetadata { + required string producer_name = 1; + required uint64 sequence_id = 2; + required uint64 publish_time = 3; + repeated KeyValue properties = 4; + + // Property set on replicated message, + // includes the source cluster name + optional string replicated_from = 5; + //key to decide partition for the msg + optional string partition_key = 6; + // Override namespace's replication + repeated string replicate_to = 7; + optional CompressionType compression = 8 [default = NONE]; + optional uint32 uncompressed_size = 9 [default = 0]; + // Removed below checksum field from Metadata as + // it should be part of send-command which keeps checksum of header + payload + //optional sfixed64 checksum = 10; + // differentiate single and batch message metadata + optional int32 num_messages_in_batch = 11 [default = 1]; + + // the timestamp that this event occurs. it is typically set by applications. + // if this field is omitted, `publish_time` can be used for the purpose of `event_time`. + optional uint64 event_time = 12 [default = 0]; + // Contains encryption key name, encrypted key and metadata to describe the key + repeated EncryptionKeys encryption_keys = 13; + // Algorithm used to encrypt data key + optional string encryption_algo = 14; + // Additional parameters required by encryption + optional bytes encryption_param = 15; + optional bytes schema_version = 16; + + optional bool partition_key_b64_encoded = 17 [default = false]; + // Specific a key to overwrite the message key which used for ordering dispatch in Key_Shared mode. + optional bytes ordering_key = 18; + + // Mark the message to be delivered at or after the specified timestamp + optional int64 deliver_at_time = 19; + + // Identify whether a message is a "marker" message used for + // internal metadata instead of application published data. + // Markers will generally not be propagated back to clients + optional int32 marker_type = 20; + + // transaction related message info + optional uint64 txnid_least_bits = 22; + optional uint64 txnid_most_bits = 23; + + /// Add highest sequence id to support batch message with external sequence id + optional uint64 highest_sequence_id = 24 [default = 0]; + + // Indicate if the message payload value is set + optional bool null_value = 25 [default = false]; + optional string uuid = 26; + optional int32 num_chunks_from_msg = 27; + optional int32 total_chunk_msg_size = 28; + optional int32 chunk_id = 29; + + // Indicate if the message partition key is set + optional bool null_partition_key = 30 [default = false]; +} + +message SingleMessageMetadata { + repeated KeyValue properties = 1; + optional string partition_key = 2; + required int32 payload_size = 3; + optional bool compacted_out = 4 [default = false]; + + // the timestamp that this event occurs. it is typically set by applications. + // if this field is omitted, `publish_time` can be used for the purpose of `event_time`. + optional uint64 event_time = 5 [default = 0]; + optional bool partition_key_b64_encoded = 6 [default = false]; + // Specific a key to overwrite the message key which used for ordering dispatch in Key_Shared mode. + optional bytes ordering_key = 7; + // Allows consumer retrieve the sequence id that the producer set. + optional uint64 sequence_id = 8; + // Indicate if the message payload value is set + optional bool null_value = 9 [default = false]; + // Indicate if the message partition key is set + optional bool null_partition_key = 10 [default = false]; +} + +// metadata added for entry from broker +message BrokerEntryMetadata { + optional uint64 broker_timestamp = 1; + optional uint64 index = 2; +} + +enum ServerError { + UnknownError = 0; + MetadataError = 1; // Error with ZK/metadata + PersistenceError = 2; // Error writing reading from BK + AuthenticationError = 3; // Non valid authentication + AuthorizationError = 4; // Not authorized to use resource + + ConsumerBusy = 5; // Unable to subscribe/unsubscribe because + // other consumers are connected + ServiceNotReady = 6; // Any error that requires client retry operation with a fresh lookup + ProducerBlockedQuotaExceededError = 7; // Unable to create producer because backlog quota exceeded + ProducerBlockedQuotaExceededException = 8; // Exception while creating producer because quota exceeded + ChecksumError = 9; // Error while verifying message checksum + UnsupportedVersionError = 10; // Error when an older client/version doesn't support a required feature + TopicNotFound = 11; // Topic not found + SubscriptionNotFound = 12; // Subscription not found + ConsumerNotFound = 13; // Consumer not found + TooManyRequests = 14; // Error with too many simultaneously request + TopicTerminatedError = 15; // The topic has been terminated + + ProducerBusy = 16; // Producer with same name is already connected + InvalidTopicName = 17; // The topic name is not valid + + IncompatibleSchema = 18; // Specified schema was incompatible with topic schema + ConsumerAssignError = 19; // Dispatcher assign consumer error + + TransactionCoordinatorNotFound = 20; // Transaction coordinator not found error + InvalidTxnStatus = 21; // Invalid txn status error + NotAllowedError = 22; // Not allowed error + + TransactionConflict = 23; // Ack with transaction conflict + TransactionNotFound = 24; // Transaction not found + + ProducerFenced = 25; // When a producer asks and fail to get exclusive producer access, + // or loses the exclusive status after a reconnection, the broker will + // use this error to indicate that this producer is now permanently + // fenced. Applications are now supposed to close it and create a + // new producer +} + +enum AuthMethod { + AuthMethodNone = 0; + AuthMethodYcaV1 = 1; + AuthMethodAthens = 2; +} + +// Each protocol version identify new features that are +// incrementally added to the protocol +enum ProtocolVersion { + v0 = 0; // Initial versioning + v1 = 1; // Added application keep-alive + v2 = 2; // Added RedeliverUnacknowledgedMessages Command + v3 = 3; // Added compression with LZ4 and ZLib + v4 = 4; // Added batch message support + v5 = 5; // Added disconnect client w/o closing connection + v6 = 6; // Added checksum computation for metadata + payload + v7 = 7; // Added CommandLookupTopic - Binary Lookup + v8 = 8; // Added CommandConsumerStats - Client fetches broker side consumer stats + v9 = 9; // Added end of topic notification + v10 = 10;// Added proxy to broker + v11 = 11;// C++ consumers before this version are not correctly handling the checksum field + v12 = 12;// Added get topic's last messageId from broker + // Added CommandActiveConsumerChange + // Added CommandGetTopicsOfNamespace + v13 = 13; // Schema-registry : added avro schema format for json + v14 = 14; // Add CommandAuthChallenge and CommandAuthResponse for mutual auth + // Added Key_Shared subscription + v15 = 15; // Add CommandGetOrCreateSchema and CommandGetOrCreateSchemaResponse + v16 = 16; // Add support for broker entry metadata + v17 = 17; // Added support ack receipt + v18 = 18; // Add client support for broker entry metadata + v19 = 19; // Add CommandTcClientConnectRequest and CommandTcClientConnectResponse +} + +message CommandConnect { + required string client_version = 1; + optional AuthMethod auth_method = 2; // Deprecated. Use "auth_method_name" instead. + optional string auth_method_name = 5; + optional bytes auth_data = 3; + optional int32 protocol_version = 4 [default = 0]; + + // Client can ask to be proxyied to a specific broker + // This is only honored by a Pulsar proxy + optional string proxy_to_broker_url = 6; + + // Original principal that was verified by + // a Pulsar proxy. In this case the auth info above + // will be the auth of the proxy itself + optional string original_principal = 7; + + // Original auth role and auth Method that was passed + // to the proxy. In this case the auth info above + // will be the auth of the proxy itself + optional string original_auth_data = 8; + optional string original_auth_method = 9; + + // Feature flags + optional FeatureFlags feature_flags = 10; +} + +message FeatureFlags { + optional bool supports_auth_refresh = 1 [default = false]; + optional bool supports_broker_entry_metadata = 2 [default = false]; + optional bool supports_partial_producer = 3 [default = false]; +} + +message CommandConnected { + required string server_version = 1; + optional int32 protocol_version = 2 [default = 0]; + optional int32 max_message_size = 3; +} + +message CommandAuthResponse { + optional string client_version = 1; + optional AuthData response = 2; + optional int32 protocol_version = 3 [default = 0]; +} + +message CommandAuthChallenge { + optional string server_version = 1; + optional AuthData challenge = 2; + optional int32 protocol_version = 3 [default = 0]; +} + +// To support mutual authentication type, such as Sasl, reuse this command to mutual auth. +message AuthData { + optional string auth_method_name = 1; + optional bytes auth_data = 2; +} + +enum KeySharedMode { + AUTO_SPLIT = 0; + STICKY = 1; +} + +message KeySharedMeta { + required KeySharedMode keySharedMode = 1; + repeated IntRange hashRanges = 3; + optional bool allowOutOfOrderDelivery = 4 [default = false]; +} + +message CommandSubscribe { + enum SubType { + Exclusive = 0; + Shared = 1; + Failover = 2; + Key_Shared = 3; + } + required string topic = 1; + required string subscription = 2; + required SubType subType = 3; + + required uint64 consumer_id = 4; + required uint64 request_id = 5; + optional string consumer_name = 6; + optional int32 priority_level = 7; + + // Signal wether the subscription should be backed by a + // durable cursor or not + optional bool durable = 8 [default = true]; + + // If specified, the subscription will position the cursor + // markd-delete position on the particular message id and + // will send messages from that point + optional MessageIdData start_message_id = 9; + + /// Add optional metadata key=value to this consumer + repeated KeyValue metadata = 10; + + optional bool read_compacted = 11; + + optional Schema schema = 12; + enum InitialPosition { + Latest = 0; + Earliest = 1; + } + // Signal whether the subscription will initialize on latest + // or not -- earliest + optional InitialPosition initialPosition = 13 [default = Latest]; + + // Mark the subscription as "replicated". Pulsar will make sure + // to periodically sync the state of replicated subscriptions + // across different clusters (when using geo-replication). + optional bool replicate_subscription_state = 14; + + // If true, the subscribe operation will cause a topic to be + // created if it does not exist already (and if topic auto-creation + // is allowed by broker. + // If false, the subscribe operation will fail if the topic + // does not exist. + optional bool force_topic_creation = 15 [default = true]; + + // If specified, the subscription will reset cursor's position back + // to specified seconds and will send messages from that point + optional uint64 start_message_rollback_duration_sec = 16 [default = 0]; + + optional KeySharedMeta keySharedMeta = 17; + + repeated KeyValue subscription_properties = 18; + + // The consumer epoch, when exclusive and failover consumer redeliver unack message will increase the epoch + optional uint64 consumer_epoch = 19; +} + +message CommandPartitionedTopicMetadata { + required string topic = 1; + required uint64 request_id = 2; + // TODO - Remove original_principal, original_auth_data, original_auth_method + // Original principal that was verified by + // a Pulsar proxy. + optional string original_principal = 3; + + // Original auth role and auth Method that was passed + // to the proxy. + optional string original_auth_data = 4; + optional string original_auth_method = 5; +} + +message CommandPartitionedTopicMetadataResponse { + enum LookupType { + Success = 0; + Failed = 1; + } + optional uint32 partitions = 1; // Optional in case of error + required uint64 request_id = 2; + optional LookupType response = 3; + optional ServerError error = 4; + optional string message = 5; +} + +message CommandLookupTopic { + required string topic = 1; + required uint64 request_id = 2; + optional bool authoritative = 3 [default = false]; + + // TODO - Remove original_principal, original_auth_data, original_auth_method + // Original principal that was verified by + // a Pulsar proxy. + optional string original_principal = 4; + + // Original auth role and auth Method that was passed + // to the proxy. + optional string original_auth_data = 5; + optional string original_auth_method = 6; + // + optional string advertised_listener_name = 7; +} + +message CommandLookupTopicResponse { + enum LookupType { + Redirect = 0; + Connect = 1; + Failed = 2; + } + + optional string brokerServiceUrl = 1; // Optional in case of error + optional string brokerServiceUrlTls = 2; + optional LookupType response = 3; + required uint64 request_id = 4; + optional bool authoritative = 5 [default = false]; + optional ServerError error = 6; + optional string message = 7; + + // If it's true, indicates to the client that it must + // always connect through the service url after the + // lookup has been completed. + optional bool proxy_through_service_url = 8 [default = false]; +} + +/// Create a new Producer on a topic, assigning the given producer_id, +/// all messages sent with this producer_id will be persisted on the topic +message CommandProducer { + required string topic = 1; + required uint64 producer_id = 2; + required uint64 request_id = 3; + + /// If a producer name is specified, the name will be used, + /// otherwise the broker will generate a unique name + optional string producer_name = 4; + + optional bool encrypted = 5 [default = false]; + + /// Add optional metadata key=value to this producer + repeated KeyValue metadata = 6; + + optional Schema schema = 7; + + // If producer reconnect to broker, the epoch of this producer will +1 + optional uint64 epoch = 8 [default = 0]; + + // Indicate the name of the producer is generated or user provided + // Use default true here is in order to be forward compatible with the client + optional bool user_provided_producer_name = 9 [default = true]; + + // Require that this producers will be the only producer allowed on the topic + optional ProducerAccessMode producer_access_mode = 10 [default = Shared]; + + // Topic epoch is used to fence off producers that reconnects after a new + // exclusive producer has already taken over. This id is assigned by the + // broker on the CommandProducerSuccess. The first time, the client will + // leave it empty and then it will always carry the same epoch number on + // the subsequent reconnections. + optional uint64 topic_epoch = 11; + + optional bool txn_enabled = 12 [default = false]; + + // Name of the initial subscription of the topic. + // If this field is not set, the initial subscription will not be created. + // If this field is set but the broker's `allowAutoSubscriptionCreation` + // is disabled, the producer will fail to be created. + optional string initial_subscription_name = 13; +} + +message CommandSend { + required uint64 producer_id = 1; + required uint64 sequence_id = 2; + optional int32 num_messages = 3 [default = 1]; + optional uint64 txnid_least_bits = 4 [default = 0]; + optional uint64 txnid_most_bits = 5 [default = 0]; + + /// Add highest sequence id to support batch message with external sequence id + optional uint64 highest_sequence_id = 6 [default = 0]; + optional bool is_chunk = 7 [default = false]; + + // Specify if the message being published is a Pulsar marker or not + optional bool marker = 8 [default = false]; +} + +message CommandSendReceipt { + required uint64 producer_id = 1; + required uint64 sequence_id = 2; + optional MessageIdData message_id = 3; + optional uint64 highest_sequence_id = 4 [default = 0]; +} + +message CommandSendError { + required uint64 producer_id = 1; + required uint64 sequence_id = 2; + required ServerError error = 3; + required string message = 4; +} + +message CommandMessage { + required uint64 consumer_id = 1; + required MessageIdData message_id = 2; + optional uint32 redelivery_count = 3 [default = 0]; + repeated int64 ack_set = 4; + optional uint64 consumer_epoch = 5; +} + +message CommandAck { + enum AckType { + Individual = 0; + Cumulative = 1; + } + + required uint64 consumer_id = 1; + required AckType ack_type = 2; + + // In case of individual acks, the client can pass a list of message ids + repeated MessageIdData message_id = 3; + + // Acks can contain a flag to indicate the consumer + // received an invalid message that got discarded + // before being passed on to the application. + enum ValidationError { + UncompressedSizeCorruption = 0; + DecompressionError = 1; + ChecksumMismatch = 2; + BatchDeSerializeError = 3; + DecryptionError = 4; + } + + optional ValidationError validation_error = 4; + repeated KeyLongValue properties = 5; + + optional uint64 txnid_least_bits = 6 [default = 0]; + optional uint64 txnid_most_bits = 7 [default = 0]; + optional uint64 request_id = 8; +} + +message CommandAckResponse { + required uint64 consumer_id = 1; + optional uint64 txnid_least_bits = 2 [default = 0]; + optional uint64 txnid_most_bits = 3 [default = 0]; + optional ServerError error = 4; + optional string message = 5; + optional uint64 request_id = 6; +} + +// changes on active consumer +message CommandActiveConsumerChange { + required uint64 consumer_id = 1; + optional bool is_active = 2 [default = false]; +} + +message CommandFlow { + required uint64 consumer_id = 1; + + // Max number of messages to prefetch, in addition + // of any number previously specified + required uint32 messagePermits = 2; +} + +message CommandUnsubscribe { + required uint64 consumer_id = 1; + required uint64 request_id = 2; +} + +// Reset an existing consumer to a particular message id +message CommandSeek { + required uint64 consumer_id = 1; + required uint64 request_id = 2; + + optional MessageIdData message_id = 3; + optional uint64 message_publish_time = 4; +} + +// Message sent by broker to client when a topic +// has been forcefully terminated and there are no more +// messages left to consume +message CommandReachedEndOfTopic { + required uint64 consumer_id = 1; +} + +message CommandCloseProducer { + required uint64 producer_id = 1; + required uint64 request_id = 2; +} + +message CommandCloseConsumer { + required uint64 consumer_id = 1; + required uint64 request_id = 2; +} + +message CommandRedeliverUnacknowledgedMessages { + required uint64 consumer_id = 1; + repeated MessageIdData message_ids = 2; + optional uint64 consumer_epoch = 3; +} + +message CommandSuccess { + required uint64 request_id = 1; + optional Schema schema = 2; +} + +/// Response from CommandProducer +message CommandProducerSuccess { + required uint64 request_id = 1; + required string producer_name = 2; + + // The last sequence id that was stored by this producer in the previous session + // This will only be meaningful if deduplication has been enabled. + optional int64 last_sequence_id = 3 [default = -1]; + optional bytes schema_version = 4; + + // The topic epoch assigned by the broker. This field will only be set if we + // were requiring exclusive access when creating the producer. + optional uint64 topic_epoch = 5; + + // If producer is not "ready", the client will avoid to timeout the request + // for creating the producer. Instead it will wait indefinitely until it gets + // a subsequent `CommandProducerSuccess` with `producer_ready==true`. + optional bool producer_ready = 6 [default = true]; +} + +message CommandError { + required uint64 request_id = 1; + required ServerError error = 2; + required string message = 3; +} + +// Commands to probe the state of connection. +// When either client or broker doesn't receive commands for certain +// amount of time, they will send a Ping probe. +message CommandPing { +} +message CommandPong { +} + +message CommandConsumerStats { + required uint64 request_id = 1; + // required string topic_name = 2; + // required string subscription_name = 3; + required uint64 consumer_id = 4; +} + +message CommandConsumerStatsResponse { + required uint64 request_id = 1; + optional ServerError error_code = 2; + optional string error_message = 3; + + /// Total rate of messages delivered to the consumer. msg/s + optional double msgRateOut = 4; + + /// Total throughput delivered to the consumer. bytes/s + optional double msgThroughputOut = 5; + + /// Total rate of messages redelivered by this consumer. msg/s + optional double msgRateRedeliver = 6; + + /// Name of the consumer + optional string consumerName = 7; + + /// Number of available message permits for the consumer + optional uint64 availablePermits = 8; + + /// Number of unacknowledged messages for the consumer + optional uint64 unackedMessages = 9; + + /// Flag to verify if consumer is blocked due to reaching threshold of unacked messages + optional bool blockedConsumerOnUnackedMsgs = 10; + + /// Address of this consumer + optional string address = 11; + + /// Timestamp of connection + optional string connectedSince = 12; + + /// Whether this subscription is Exclusive or Shared or Failover + optional string type = 13; + + /// Total rate of messages expired on this subscription. msg/s + optional double msgRateExpired = 14; + + /// Number of messages in the subscription backlog + optional uint64 msgBacklog = 15; + + /// Total rate of messages ack. msg/s + optional double messageAckRate = 16; +} + +message CommandGetLastMessageId { + required uint64 consumer_id = 1; + required uint64 request_id = 2; +} + +message CommandGetLastMessageIdResponse { + required MessageIdData last_message_id = 1; + required uint64 request_id = 2; + optional MessageIdData consumer_mark_delete_position = 3; +} + +message CommandGetTopicsOfNamespace { + enum Mode { + PERSISTENT = 0; + NON_PERSISTENT = 1; + ALL = 2; + } + required uint64 request_id = 1; + required string namespace = 2; + optional Mode mode = 3 [default = PERSISTENT]; + optional string topics_pattern = 4; + optional string topics_hash = 5; +} + +message CommandGetTopicsOfNamespaceResponse { + required uint64 request_id = 1; + repeated string topics = 2; + // true iff the topic list was filtered by the pattern supplied by the client + optional bool filtered = 3 [default = false]; + // hash computed from the names of matching topics + optional string topics_hash = 4; + // if false, topics is empty and the list of matching topics has not changed + optional bool changed = 5 [default = true]; +} + +message CommandGetSchema { + required uint64 request_id = 1; + required string topic = 2; + + optional bytes schema_version = 3; +} + +message CommandGetSchemaResponse { + required uint64 request_id = 1; + optional ServerError error_code = 2; + optional string error_message = 3; + + optional Schema schema = 4; + optional bytes schema_version = 5; +} + +message CommandGetOrCreateSchema { + required uint64 request_id = 1; + required string topic = 2; + required Schema schema = 3; +} + +message CommandGetOrCreateSchemaResponse { + required uint64 request_id = 1; + optional ServerError error_code = 2; + optional string error_message = 3; + + optional bytes schema_version = 4; +} + +/// --- transaction related --- + +enum TxnAction { + COMMIT = 0; + ABORT = 1; +} + +message CommandTcClientConnectRequest { + required uint64 request_id = 1; + required uint64 tc_id = 2 [default = 0]; +} + +message CommandTcClientConnectResponse { + required uint64 request_id = 1; + optional ServerError error = 2; + optional string message = 3; +} + +message CommandNewTxn { + required uint64 request_id = 1; + optional uint64 txn_ttl_seconds = 2 [default = 0]; + optional uint64 tc_id = 3 [default = 0]; +} + +message CommandNewTxnResponse { + required uint64 request_id = 1; + optional uint64 txnid_least_bits = 2 [default = 0]; + optional uint64 txnid_most_bits = 3 [default = 0]; + optional ServerError error = 4; + optional string message = 5; +} + +message CommandAddPartitionToTxn { + required uint64 request_id = 1; + optional uint64 txnid_least_bits = 2 [default = 0]; + optional uint64 txnid_most_bits = 3 [default = 0]; + repeated string partitions = 4; +} + +message CommandAddPartitionToTxnResponse { + required uint64 request_id = 1; + optional uint64 txnid_least_bits = 2 [default = 0]; + optional uint64 txnid_most_bits = 3 [default = 0]; + optional ServerError error = 4; + optional string message = 5; +} + +message Subscription { + required string topic = 1; + required string subscription = 2; +} +message CommandAddSubscriptionToTxn { + required uint64 request_id = 1; + optional uint64 txnid_least_bits = 2 [default = 0]; + optional uint64 txnid_most_bits = 3 [default = 0]; + repeated Subscription subscription = 4; +} + +message CommandAddSubscriptionToTxnResponse { + required uint64 request_id = 1; + optional uint64 txnid_least_bits = 2 [default = 0]; + optional uint64 txnid_most_bits = 3 [default = 0]; + optional ServerError error = 4; + optional string message = 5; +} + +message CommandEndTxn { + required uint64 request_id = 1; + optional uint64 txnid_least_bits = 2 [default = 0]; + optional uint64 txnid_most_bits = 3 [default = 0]; + optional TxnAction txn_action = 4; +} + +message CommandEndTxnResponse { + required uint64 request_id = 1; + optional uint64 txnid_least_bits = 2 [default = 0]; + optional uint64 txnid_most_bits = 3 [default = 0]; + optional ServerError error = 4; + optional string message = 5; +} + +message CommandEndTxnOnPartition { + required uint64 request_id = 1; + optional uint64 txnid_least_bits = 2 [default = 0]; + optional uint64 txnid_most_bits = 3 [default = 0]; + optional string topic = 4; + optional TxnAction txn_action = 5; + optional uint64 txnid_least_bits_of_low_watermark = 6; +} + +message CommandEndTxnOnPartitionResponse { + required uint64 request_id = 1; + optional uint64 txnid_least_bits = 2 [default = 0]; + optional uint64 txnid_most_bits = 3 [default = 0]; + optional ServerError error = 4; + optional string message = 5; +} + +message CommandEndTxnOnSubscription { + required uint64 request_id = 1; + optional uint64 txnid_least_bits = 2 [default = 0]; + optional uint64 txnid_most_bits = 3 [default = 0]; + optional Subscription subscription = 4; + optional TxnAction txn_action = 5; + optional uint64 txnid_least_bits_of_low_watermark = 6; +} + +message CommandEndTxnOnSubscriptionResponse { + required uint64 request_id = 1; + optional uint64 txnid_least_bits = 2 [default = 0]; + optional uint64 txnid_most_bits = 3 [default = 0]; + optional ServerError error = 4; + optional string message = 5; +} + +message BaseCommand { + enum Type { + CONNECT = 2; + CONNECTED = 3; + SUBSCRIBE = 4; + + PRODUCER = 5; + + SEND = 6; + SEND_RECEIPT = 7; + SEND_ERROR = 8; + + MESSAGE = 9; + ACK = 10; + FLOW = 11; + + UNSUBSCRIBE = 12; + + SUCCESS = 13; + ERROR = 14; + + CLOSE_PRODUCER = 15; + CLOSE_CONSUMER = 16; + + PRODUCER_SUCCESS = 17; + + PING = 18; + PONG = 19; + + REDELIVER_UNACKNOWLEDGED_MESSAGES = 20; + + PARTITIONED_METADATA = 21; + PARTITIONED_METADATA_RESPONSE = 22; + + LOOKUP = 23; + LOOKUP_RESPONSE = 24; + + CONSUMER_STATS = 25; + CONSUMER_STATS_RESPONSE = 26; + + REACHED_END_OF_TOPIC = 27; + + SEEK = 28; + + GET_LAST_MESSAGE_ID = 29; + GET_LAST_MESSAGE_ID_RESPONSE = 30; + + ACTIVE_CONSUMER_CHANGE = 31; + + + GET_TOPICS_OF_NAMESPACE = 32; + GET_TOPICS_OF_NAMESPACE_RESPONSE = 33; + + GET_SCHEMA = 34; + GET_SCHEMA_RESPONSE = 35; + + AUTH_CHALLENGE = 36; + AUTH_RESPONSE = 37; + + ACK_RESPONSE = 38; + + GET_OR_CREATE_SCHEMA = 39; + GET_OR_CREATE_SCHEMA_RESPONSE = 40; + + // transaction related + NEW_TXN = 50; + NEW_TXN_RESPONSE = 51; + + ADD_PARTITION_TO_TXN = 52; + ADD_PARTITION_TO_TXN_RESPONSE = 53; + + ADD_SUBSCRIPTION_TO_TXN = 54; + ADD_SUBSCRIPTION_TO_TXN_RESPONSE = 55; + + END_TXN = 56; + END_TXN_RESPONSE = 57; + + END_TXN_ON_PARTITION = 58; + END_TXN_ON_PARTITION_RESPONSE = 59; + + END_TXN_ON_SUBSCRIPTION = 60; + END_TXN_ON_SUBSCRIPTION_RESPONSE = 61; + TC_CLIENT_CONNECT_REQUEST = 62; + TC_CLIENT_CONNECT_RESPONSE = 63; + + } + + + required Type type = 1; + + optional CommandConnect connect = 2; + optional CommandConnected connected = 3; + + optional CommandSubscribe subscribe = 4; + optional CommandProducer producer = 5; + optional CommandSend send = 6; + optional CommandSendReceipt send_receipt = 7; + optional CommandSendError send_error = 8; + optional CommandMessage message = 9; + optional CommandAck ack = 10; + optional CommandFlow flow = 11; + optional CommandUnsubscribe unsubscribe = 12; + + optional CommandSuccess success = 13; + optional CommandError error = 14; + + optional CommandCloseProducer close_producer = 15; + optional CommandCloseConsumer close_consumer = 16; + + optional CommandProducerSuccess producer_success = 17; + optional CommandPing ping = 18; + optional CommandPong pong = 19; + optional CommandRedeliverUnacknowledgedMessages redeliverUnacknowledgedMessages = 20; + + optional CommandPartitionedTopicMetadata partitionMetadata = 21; + optional CommandPartitionedTopicMetadataResponse partitionMetadataResponse = 22; + + optional CommandLookupTopic lookupTopic = 23; + optional CommandLookupTopicResponse lookupTopicResponse = 24; + + optional CommandConsumerStats consumerStats = 25; + optional CommandConsumerStatsResponse consumerStatsResponse = 26; + + optional CommandReachedEndOfTopic reachedEndOfTopic = 27; + + optional CommandSeek seek = 28; + + optional CommandGetLastMessageId getLastMessageId = 29; + optional CommandGetLastMessageIdResponse getLastMessageIdResponse = 30; + + optional CommandActiveConsumerChange active_consumer_change = 31; + + optional CommandGetTopicsOfNamespace getTopicsOfNamespace = 32; + optional CommandGetTopicsOfNamespaceResponse getTopicsOfNamespaceResponse = 33; + + optional CommandGetSchema getSchema = 34; + optional CommandGetSchemaResponse getSchemaResponse = 35; + + optional CommandAuthChallenge authChallenge = 36; + optional CommandAuthResponse authResponse = 37; + + optional CommandAckResponse ackResponse = 38; + + optional CommandGetOrCreateSchema getOrCreateSchema = 39; + optional CommandGetOrCreateSchemaResponse getOrCreateSchemaResponse = 40; + + // transaction related + optional CommandNewTxn newTxn = 50; + optional CommandNewTxnResponse newTxnResponse = 51; + optional CommandAddPartitionToTxn addPartitionToTxn = 52; + optional CommandAddPartitionToTxnResponse addPartitionToTxnResponse = 53; + optional CommandAddSubscriptionToTxn addSubscriptionToTxn = 54; + optional CommandAddSubscriptionToTxnResponse addSubscriptionToTxnResponse = 55; + optional CommandEndTxn endTxn = 56; + optional CommandEndTxnResponse endTxnResponse = 57; + optional CommandEndTxnOnPartition endTxnOnPartition = 58; + optional CommandEndTxnOnPartitionResponse endTxnOnPartitionResponse = 59; + optional CommandEndTxnOnSubscription endTxnOnSubscription = 60; + optional CommandEndTxnOnSubscriptionResponse endTxnOnSubscriptionResponse = 61; + optional CommandTcClientConnectRequest tcClientConnectRequest = 62; + optional CommandTcClientConnectResponse tcClientConnectResponse = 63; +} diff --git a/examples/batching.rs b/examples/batching.rs index 23f317a..d5e5dbc 100644 --- a/examples/batching.rs +++ b/examples/batching.rs @@ -1,8 +1,14 @@ #![recursion_limit = "256"] #[macro_use] extern crate serde; + use futures::{future::join_all, TryStreamExt}; -use pulsar::{message::proto::command_subscribe::SubType, message::Payload, producer, Consumer, DeserializeMessage, Error as PulsarError, Pulsar, SerializeMessage, TokioExecutor, compression}; +use pulsar::{ + compression::*, + message::{proto::command_subscribe::SubType, Payload}, + producer, Consumer, DeserializeMessage, Error as PulsarError, Pulsar, SerializeMessage, + TokioExecutor, +}; #[derive(Debug, Serialize, Deserialize)] struct TestData { @@ -39,10 +45,10 @@ async fn main() -> Result<(), pulsar::Error> { .with_name("my-producer2".to_string()) .with_options(producer::ProducerOptions { batch_size: Some(4), - // compression: Some(compression::Compression::Lz4(compression::CompressionLz4::default())), - // compression: Some(compression::Compression::Zlib(compression::CompressionZlib::default())), - // compression: Some(compression::Compression::Zstd(compression::CompressionZstd::default())), - compression: Some(compression::Compression::Snappy(compression::CompressionSnappy::default())), + // compression: Some(Compression::Lz4(CompressionLz4::default())), + // compression: Some(Compression::Zlib(CompressionZlib::default())), + // compression: Some(Compression::Zstd(CompressionZstd::default())), + compression: Some(Compression::Snappy(CompressionSnappy::default())), ..Default::default() }) .build() diff --git a/examples/consumer.rs b/examples/consumer.rs index 192f7b7..5e3a329 100644 --- a/examples/consumer.rs +++ b/examples/consumer.rs @@ -1,11 +1,12 @@ #[macro_use] extern crate serde; +use std::env; + use futures::TryStreamExt; -use pulsar::authentication::oauth2::OAuth2Authentication; use pulsar::{ - Authentication, Consumer, DeserializeMessage, Payload, Pulsar, SubType, TokioExecutor, + authentication::oauth2::OAuth2Authentication, Authentication, Consumer, DeserializeMessage, + Payload, Pulsar, SubType, TokioExecutor, }; -use std::env; #[derive(Serialize, Deserialize)] struct TestData { diff --git a/examples/producer.rs b/examples/producer.rs index fb7be4c..e11ed30 100644 --- a/examples/producer.rs +++ b/examples/producer.rs @@ -1,11 +1,11 @@ #[macro_use] extern crate serde; -use pulsar::authentication::oauth2::OAuth2Authentication; +use std::env; + use pulsar::{ - message::proto, producer, Authentication, Error as PulsarError, Pulsar, SerializeMessage, - TokioExecutor, + authentication::oauth2::OAuth2Authentication, message::proto, producer, Authentication, + Error as PulsarError, Pulsar, SerializeMessage, TokioExecutor, }; -use std::env; #[derive(Serialize, Deserialize)] struct TestData { diff --git a/examples/reader.rs b/examples/reader.rs index 71e80fe..40744c8 100644 --- a/examples/reader.rs +++ b/examples/reader.rs @@ -1,11 +1,12 @@ #[macro_use] extern crate serde; +use std::env; + use futures::TryStreamExt; use pulsar::{ consumer::ConsumerOptions, proto::Schema, reader::Reader, Authentication, DeserializeMessage, Payload, Pulsar, TokioExecutor, }; -use std::env; #[derive(Serialize, Deserialize)] struct TestData { diff --git a/examples/round_trip.rs b/examples/round_trip.rs index 0998a12..550de14 100644 --- a/examples/round_trip.rs +++ b/examples/round_trip.rs @@ -3,8 +3,9 @@ extern crate serde; use futures::TryStreamExt; use pulsar::{ - message::proto, message::proto::command_subscribe::SubType, message::Payload, producer, - Consumer, DeserializeMessage, Error as PulsarError, Pulsar, SerializeMessage, TokioExecutor, + message::{proto, proto::command_subscribe::SubType, Payload}, + producer, Consumer, DeserializeMessage, Error as PulsarError, Pulsar, SerializeMessage, + TokioExecutor, }; #[derive(Serialize, Deserialize)] diff --git a/rustfmt.toml b/rustfmt.toml new file mode 100644 index 0000000..ae9b181 --- /dev/null +++ b/rustfmt.toml @@ -0,0 +1,5 @@ +imports_granularity = "Crate" +group_imports = "StdExternalCrate" +comment_width = 100 +wrap_comments = true +format_code_in_doc_comments = true diff --git a/src/authentication.rs b/src/authentication.rs index 4947b5f..4a974b9 100644 --- a/src/authentication.rs +++ b/src/authentication.rs @@ -16,8 +16,7 @@ pub mod token { use async_trait::async_trait; - use crate::authentication::Authentication; - use crate::error::AuthenticationError; + use crate::{authentication::Authentication, error::AuthenticationError}; pub struct TokenAuthentication { token: Vec, @@ -54,24 +53,26 @@ pub mod token { #[cfg(feature = "auth-oauth2")] pub mod oauth2 { - use std::fmt::{Display, Formatter}; - use std::fs; - use std::time::Instant; + use std::{ + fmt::{Display, Formatter}, + fs, + time::Instant, + }; use async_trait::async_trait; use data_url::DataUrl; use nom::lib::std::ops::Add; - use oauth2::basic::{BasicClient, BasicTokenResponse}; - use oauth2::reqwest::async_http_client; - use oauth2::AuthType::RequestBody; - use oauth2::{AuthUrl, ClientId, ClientSecret, Scope, TokenResponse, TokenUrl}; - use openidconnect::core::CoreProviderMetadata; - use openidconnect::IssuerUrl; + use oauth2::{ + basic::{BasicClient, BasicTokenResponse}, + reqwest::async_http_client, + AuthType::RequestBody, + AuthUrl, ClientId, ClientSecret, Scope, TokenResponse, TokenUrl, + }; + use openidconnect::{core::CoreProviderMetadata, IssuerUrl}; use serde::Deserialize; use url::Url; - use crate::authentication::Authentication; - use crate::error::AuthenticationError; + use crate::{authentication::Authentication, error::AuthenticationError}; #[derive(Deserialize, Debug)] struct OAuth2PrivateParams { diff --git a/src/client.rs b/src/client.rs index ceae420..3a11581 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,26 +1,28 @@ -use std::string::FromUtf8Error; -use std::sync::Arc; +use std::{string::FromUtf8Error, sync::Arc}; -use futures::channel::{mpsc, oneshot}; - -use crate::connection::Authentication; -use crate::connection_manager::{ - BrokerAddress, ConnectionManager, ConnectionRetryOptions, OperationRetryOptions, TlsOptions, -}; -use crate::consumer::{ConsumerBuilder, ConsumerOptions, InitialPosition}; -use crate::error::ConnectionError; -use crate::error::Error; -use crate::executor::Executor; -use crate::message::proto::{self, CommandSendReceipt}; -use crate::message::Payload; -use crate::producer::{self, ProducerBuilder, SendFuture}; -use crate::service_discovery::ServiceDiscovery; use futures::{ + channel::{mpsc, oneshot}, future::{select, Either}, lock::Mutex, pin_mut, StreamExt, }; +use crate::{ + connection::Authentication, + connection_manager::{ + BrokerAddress, ConnectionManager, ConnectionRetryOptions, OperationRetryOptions, TlsOptions, + }, + consumer::{ConsumerBuilder, ConsumerOptions, InitialPosition}, + error::{ConnectionError, Error}, + executor::Executor, + message::{ + proto::{self, CommandSendReceipt}, + Payload, + }, + producer::{self, ProducerBuilder, SendFuture}, + service_discovery::ServiceDiscovery, +}; + /// Helper trait for consumer deserialization pub trait DeserializeMessage { /// type produced from the message @@ -238,9 +240,7 @@ impl Pulsar { /// # async fn run() -> Result<(), pulsar::Error> { /// let addr = "pulsar://127.0.0.1:6650"; /// // you can indicate which executor you use as the return type of client creation - /// let pulsar: Pulsar<_> = Pulsar::builder(addr, TokioExecutor) - /// .build() - /// .await?; + /// let pulsar: Pulsar<_> = Pulsar::builder(addr, TokioExecutor).build().await?; /// # Ok(()) /// # } /// ``` diff --git a/src/compression.rs b/src/compression.rs index c67fdbc..c6e5333 100644 --- a/src/compression.rs +++ b/src/compression.rs @@ -44,7 +44,7 @@ impl Clone for CompressionLz4 { CompressionMode::HIGHCOMPRESSION(i) => CompressionMode::HIGHCOMPRESSION(i), CompressionMode::FAST(i) => CompressionMode::FAST(i), CompressionMode::DEFAULT => CompressionMode::DEFAULT, - } + }, } } } diff --git a/src/connection.rs b/src/connection.rs index 6b7679d..ee629d7 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -1,39 +1,41 @@ -use std::collections::BTreeMap; -use std::fmt::Debug; -use std::net::SocketAddr; -use std::pin::Pin; -use std::sync::{ - Arc, - atomic::{AtomicUsize, Ordering}, +use std::{ + collections::BTreeMap, + fmt::Debug, + net::SocketAddr, + pin::Pin, + sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, + }, + time::Duration, }; -use std::time::Duration; use async_trait::async_trait; use futures::{ self, channel::{mpsc, oneshot}, - future::{Either, select}, - Future, - FutureExt, - pin_mut, Sink, SinkExt, Stream, StreamExt, task::{Context, Poll}, + future::{select, Either}, + lock::Mutex, + pin_mut, + task::{Context, Poll}, + Future, FutureExt, Sink, SinkExt, Stream, StreamExt, }; -use futures::lock::Mutex; use native_tls::Certificate; -use rand::thread_rng; -use rand::seq::SliceRandom; +use proto::MessageIdData; +use rand::{seq::SliceRandom, thread_rng}; use url::Url; use uuid::Uuid; -use proto::MessageIdData; - -use crate::consumer::ConsumerOptions; -use crate::error::{AuthenticationError, ConnectionError, SharedError}; -use crate::executor::{Executor, ExecutorKind}; -use crate::message::{ - BaseCommand, - Codec, Message, proto::{self, command_subscribe::SubType}, +use crate::{ + consumer::ConsumerOptions, + error::{AuthenticationError, ConnectionError, SharedError}, + executor::{Executor, ExecutorKind}, + message::{ + proto::{self, command_subscribe::SubType}, + BaseCommand, Codec, Message, + }, + producer::{self, ProducerOptions}, }; -use crate::producer::{self, ProducerOptions}; pub(crate) enum Register { Request { @@ -743,7 +745,7 @@ impl Connection { .await { Some(Some(addresses)) if !addresses.is_empty() => addresses, - _ => return Err(ConnectionError::NotFound) + _ => return Err(ConnectionError::NotFound), }; let id = Uuid::new_v4(); @@ -769,9 +771,8 @@ impl Connection { pin_mut!(sender_prepare); pin_mut!(delay_f); - let sender; - match select(sender_prepare, delay_f).await { - Either::Left((Ok(res), _)) => sender = res, + let sender = match select(sender_prepare, delay_f).await { + Either::Left((Ok(res), _)) => res, Either::Left((Err(err), _)) => { errors.push(err); continue; @@ -785,7 +786,7 @@ impl Connection { } }; - return Ok(Connection { id, url, sender }) + return Ok(Connection { id, url, sender }); } let mut fatal_errors = vec![]; @@ -798,7 +799,7 @@ impl Connection { } } - return if retryable_errors.is_empty() { + if retryable_errors.is_empty() { error!("connection error, not retryable: {:?}", fatal_errors); Err(ConnectionError::Io(std::io::Error::new( std::io::ErrorKind::Other, @@ -1099,16 +1100,17 @@ where pub(crate) mod messages { use chrono::Utc; - use proto::MessageIdData; - use crate::connection::Authentication; - use crate::consumer::ConsumerOptions; - use crate::message::{ - Message, - Payload, proto::{self, base_command::Type as CommandType, command_subscribe::SubType}, + use crate::{ + connection::Authentication, + consumer::ConsumerOptions, + message::{ + proto::{self, base_command::Type as CommandType, command_subscribe::SubType}, + Message, Payload, + }, + producer::{self, ProducerOptions}, }; - use crate::producer::{self, ProducerOptions}; #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))] pub fn connect(auth: Option, proxy_to_broker_url: Option) -> Message { diff --git a/src/connection_manager.rs b/src/connection_manager.rs index bff8643..d3cf56c 100644 --- a/src/connection_manager.rs +++ b/src/connection_manager.rs @@ -1,15 +1,12 @@ -use crate::connection::Connection; -use crate::error::ConnectionError; -use crate::executor::Executor; -use std::collections::HashMap; -use std::sync::Arc; -use std::time::Duration; +use std::{collections::HashMap, sync::Arc, time::Duration}; use futures::{channel::oneshot, lock::Mutex}; use native_tls::Certificate; use rand::Rng; use url::Url; +use crate::{connection::Connection, error::ConnectionError, executor::Executor}; + /// holds connection information for a broker #[derive(Debug, Clone, Hash, PartialEq, Eq)] pub struct BrokerAddress { @@ -37,7 +34,7 @@ pub struct ConnectionRetryOptions { pub keep_alive: Duration, } -impl std::default::Default for ConnectionRetryOptions { +impl Default for ConnectionRetryOptions { #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))] fn default() -> Self { ConnectionRetryOptions { @@ -61,7 +58,7 @@ pub struct OperationRetryOptions { pub max_retries: Option, } -impl std::default::Default for OperationRetryOptions { +impl Default for OperationRetryOptions { #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))] fn default() -> Self { OperationRetryOptions { @@ -119,7 +116,7 @@ pub struct ConnectionManager { connection_retry_options: ConnectionRetryOptions, pub(crate) operation_retry_options: OperationRetryOptions, tls_options: TlsOptions, - certificate_chain: Vec, + certificate_chain: Vec, } impl ConnectionManager { diff --git a/src/consumer.rs b/src/consumer.rs index cf84ca1..6ec74af 100644 --- a/src/consumer.rs +++ b/src/consumer.rs @@ -1,38 +1,42 @@ //! Topic subscriptions -use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet, VecDeque}; -use std::fmt::Debug; -use std::marker::PhantomData; -use std::pin::Pin; -use std::sync::Arc; -use std::time::{Duration, Instant}; +use core::iter; +use std::{ + collections::{BTreeMap, BTreeSet, HashMap, HashSet, VecDeque}, + convert::TryFrom, + fmt::Debug, + marker::PhantomData, + pin::Pin, + sync::Arc, + time::{Duration, Instant}, +}; use chrono::{DateTime, Utc}; -use futures::channel::mpsc::unbounded; -use futures::task::{Context, Poll}; use futures::{ channel::{mpsc, oneshot}, future::try_join_all, + task::{Context, Poll}, Future, FutureExt, SinkExt, Stream, StreamExt, }; +use rand::{distributions::Alphanumeric, Rng}; use regex::Regex; +use url::Url; -use crate::connection::Connection; -use crate::error::{ConnectionError, ConsumerError, Error}; -use crate::executor::Executor; -use crate::message::proto::CommandMessage; -use crate::message::{ - parse_batched_message, - proto::{self, command_subscribe::SubType, MessageIdData, MessageMetadata, Schema}, - BatchedMessage, Message as RawMessage, Metadata, Payload, +use crate::{ + connection::Connection, + error::{ConnectionError, ConsumerError, Error}, + executor::Executor, + message::{ + parse_batched_message, + proto::{ + self, command_subscribe::SubType, CommandMessage, MessageIdData, MessageMetadata, + Schema, + }, + BatchedMessage, Message as RawMessage, Metadata, Payload, + }, + proto::{BaseCommand, CommandCloseConsumer, CommandConsumerStatsResponse}, + reader::{Reader, State}, + BrokerAddress, DeserializeMessage, Pulsar, }; -use crate::proto::{BaseCommand, CommandCloseConsumer, CommandConsumerStatsResponse}; -use crate::reader::{Reader, State}; -use crate::{BrokerAddress, DeserializeMessage, Pulsar}; -use core::iter; -use rand::distributions::Alphanumeric; -use rand::Rng; -use std::convert::TryFrom; -use url::Url; /// Configuration options for consumers #[derive(Clone, Default, Debug)] @@ -109,7 +113,8 @@ impl ConsumerOptions { #[derive(Debug, Clone)] pub struct DeadLetterPolicy { - /// Maximum number of times that a message will be redelivered before being sent to the dead letter queue. + /// Maximum number of times that a message will be redelivered before being sent to the dead + /// letter queue. pub max_redeliver_count: usize, /// Name of the dead topic where the failing messages will be sent. pub dead_letter_topic: String, @@ -268,8 +273,10 @@ impl Consumer { })) .await?; - let consumers: BTreeMap<_, _> = - consumers.into_iter().map(|c| (c.topic(), Box::pin(c))).collect(); + let consumers: BTreeMap<_, _> = consumers + .into_iter() + .map(|c| (c.topic(), Box::pin(c))) + .collect(); let topics: VecDeque = consumers.keys().cloned().collect(); let existing_topics = topics.clone(); let topic_refresh = Duration::from_secs(30); @@ -328,7 +335,11 @@ impl Consumer { match &mut self.inner { InnerConsumer::Single(c) => Ok(vec![c.connection().await?.url().clone()]), InnerConsumer::Multi(c) => { - let v = c.consumers.values_mut().map(|c| c.connection()).collect::>(); + let v = c + .consumers + .values_mut() + .map(|c| c.connection()) + .collect::>(); let mut connections = try_join_all(v).await?; Ok(connections @@ -455,7 +466,7 @@ enum InnerConsumer { Multi(MultiTopicConsumer), } -type MessageIdDataReceiver = mpsc::Receiver>; +type MessageIdDataReceiver = mpsc::Receiver>; // this is entirely public for use in reader.rs pub(crate) struct TopicConsumer { @@ -495,7 +506,7 @@ impl TopicConsumer { let mut connection = client.manager.get_connection(&addr).await?; let mut current_retries = 0u32; - let start = std::time::Instant::now(); + let start = Instant::now(); let operation_retry_options = client.operation_retry_options.clone(); loop { @@ -514,7 +525,7 @@ impl TopicConsumer { { Ok(_) => { if current_retries > 0 { - let dur = (std::time::Instant::now() - start).as_secs(); + let dur = (Instant::now() - start).as_secs(); log::info!( "subscribe({}) success after {} retries over {} seconds", topic, @@ -536,13 +547,19 @@ impl TopicConsumer { operation_retry_options.max_retries, text.unwrap_or_default()); current_retries += 1; - client.executor.delay(operation_retry_options.retry_delay).await; + client + .executor + .delay(operation_retry_options.retry_delay) + .await; // we need to look up again the topic's address let prev = addr; addr = client.lookup_topic(&topic).await?; if prev != addr { - info!("topic {} moved: previous = {:?}, new = {:?}", topic, prev, addr); + info!( + "topic {} moved: previous = {:?}, new = {:?}", + topic, prev, addr + ); } connection = client.manager.get_connection(&addr).await?; @@ -570,12 +587,11 @@ impl TopicConsumer { }) .map_err(|e| Error::Consumer(ConsumerError::Connection(e)))?; - let (engine_tx, engine_rx) = unbounded(); + let (engine_tx, engine_rx) = mpsc::unbounded(); // drop_signal will be dropped when Consumer is dropped, then // drop_receiver will return, and we can close the consumer let (_drop_signal, drop_receiver) = oneshot::channel::<()>(); let conn = connection.clone(); - //let ack_sender = nack_handler.clone(); let name = consumer_name.clone(); let topic_name = topic.clone(); let _ = client.executor.spawn(Box::pin(async move { @@ -596,7 +612,11 @@ impl TopicConsumer { let mut interval = client.executor.interval(Duration::from_millis(500)); let res = client.executor.spawn(Box::pin(async move { while interval.next().await.is_some() { - if redelivery_tx.send(EngineMessage::UnackedRedelivery).await.is_err() { + if redelivery_tx + .send(EngineMessage::UnackedRedelivery) + .await + .is_err() + { // Consumer shut down - stop ticker break; } @@ -685,7 +705,9 @@ impl TopicConsumer { #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))] pub async fn ack(&mut self, msg: &Message) -> Result<(), ConsumerError> { - self.engine_tx.send(EngineMessage::Ack(msg.message_id.clone(), false)).await?; + self.engine_tx + .send(EngineMessage::Ack(msg.message_id.clone(), false)) + .await?; Ok(()) } @@ -696,13 +718,17 @@ impl TopicConsumer { #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))] async fn cumulative_ack(&mut self, msg: &Message) -> Result<(), ConsumerError> { - self.engine_tx.send(EngineMessage::Ack(msg.message_id.clone(), true)).await?; + self.engine_tx + .send(EngineMessage::Ack(msg.message_id.clone(), true)) + .await?; Ok(()) } #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))] async fn nack(&mut self, msg: &Message) -> Result<(), ConsumerError> { - self.engine_tx.send(EngineMessage::Nack(msg.message_id.clone())).await?; + self.engine_tx + .send(EngineMessage::Nack(msg.message_id.clone())) + .await?; Ok(()) } @@ -713,14 +739,22 @@ impl TopicConsumer { timestamp: Option, ) -> Result<(), Error> { let consumer_id = self.consumer_id; - self.connection().await?.sender().seek(consumer_id, message_id, timestamp).await?; + self.connection() + .await? + .sender() + .seek(consumer_id, message_id, timestamp) + .await?; Ok(()) } #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))] pub async fn unsubscribe(&mut self) -> Result<(), Error> { let consumer_id = self.consumer_id; - self.connection().await?.sender().unsubscribe(consumer_id).await?; + self.connection() + .await? + .sender() + .unsubscribe(consumer_id) + .await?; Ok(()) } @@ -748,7 +782,7 @@ impl TopicConsumer { } #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))] - fn create_message(&self, message_id: proto::MessageIdData, payload: Payload) -> Message { + fn create_message(&self, message_id: MessageIdData, payload: Payload) -> Message { Message { topic: self.topic.clone(), message_id: MessageData { @@ -792,7 +826,7 @@ struct ConsumerEngine { sub_type: SubType, id: u64, name: Option, - tx: mpsc::Sender>, + tx: mpsc::Sender>, messages_rx: Option>, engine_rx: Option>>, event_rx: mpsc::UnboundedReceiver>, @@ -823,7 +857,7 @@ impl ConsumerEngine { sub_type: SubType, id: u64, name: Option, - tx: mpsc::Sender>, + tx: mpsc::Sender>, messages_rx: mpsc::UnboundedReceiver, engine_rx: mpsc::UnboundedReceiver>, batch_size: u32, @@ -857,8 +891,9 @@ impl ConsumerEngine { } fn register_source(&self, mut rx: mpsc::UnboundedReceiver, mapper: M) -> Result<(), ()> - where E: Send + 'static, - M: Fn(Option) -> EngineEvent + Send + Sync + 'static, + where + E: Send + 'static, + M: Fn(Option) -> EngineEvent + Send + Sync + 'static, { let mut event_tx = self.event_tx.clone(); @@ -884,19 +919,27 @@ impl ConsumerEngine { loop { if !self.connection.is_valid() { if let Some(err) = self.connection.error() { - error!("Consumer: connection {} is not valid: {:?}", self.connection.id(), err); + error!( + "Consumer: connection {} is not valid: {:?}", + self.connection.id(), + err + ); self.reconnect().await?; } } if let Some(messages_rx) = self.messages_rx.take() { self.register_source(messages_rx, |msg| EngineEvent::Message(msg)) - .map_err(|_| Error::Custom(String::from("Error registering messages_rx source")))?; + .map_err(|_| { + Error::Custom(String::from("Error registering messages_rx source")) + })?; } if let Some(engine_rx) = self.engine_rx.take() { self.register_source(engine_rx, |msg| EngineEvent::EngineMessage(msg)) - .map_err(|_| Error::Custom(String::from("Error registering engine_rx source")))?; + .map_err(|_| { + Error::Custom(String::from("Error registering engine_rx source")) + })?; } if self.remaining_messages < self.batch_size / 2 { @@ -937,110 +980,106 @@ impl ConsumerEngine { } } } - } - + } - async fn handle_message_opt(&mut self, message_opt: Option) -> Option> { - match message_opt { - None => { - error!("Consumer: messages::next: returning Disconnected"); + async fn handle_message_opt( + &mut self, + message_opt: Option, + ) -> Option> { + match message_opt { + None => { + error!("Consumer: messages::next: returning Disconnected"); if let Err(err) = self.reconnect().await { Some(Err(err)) } else { None } - //return Err(Error::Consumer(ConsumerError::Connection(ConnectionError::Disconnected)).into()); - } - Some(message) => { - self.remaining_messages -= message - .payload - .as_ref() - .and_then(|payload| payload.metadata.num_messages_in_batch) - .unwrap_or(1i32) - as u32; - - match self.process_message(message).await { - // Continue - Ok(true) => { - None - } - // End of Topic - Ok(false) => { - Some(Ok(())) - } - Err(e) => { - if let Err(e) = self.tx.send(Err(e)).await { - error!("cannot send a message from the consumer engine to the consumer({}), stopping the engine", self.id); + //return Err(Error::Consumer(ConsumerError::Connection(ConnectionError::Disconnected)).into()); + } + Some(message) => { + self.remaining_messages -= message + .payload + .as_ref() + .and_then(|payload| payload.metadata.num_messages_in_batch) + .unwrap_or(1i32) as u32; + + match self.process_message(message).await { + // Continue + Ok(true) => None, + // End of Topic + Ok(false) => Some(Ok(())), + Err(e) => { + if let Err(e) = self.tx.send(Err(e)).await { + error!("cannot send a message from the consumer engine to the consumer({}), stopping the engine", self.id); Some(Err(Error::Consumer(e.into()))) } else { None - } - } - } } } } + } + } + } fn handle_ack_opt(&mut self, ack_opt: Option>) -> bool { - match ack_opt { - None => { - trace!("ack channel was closed"); + match ack_opt { + None => { + trace!("ack channel was closed"); false - } - Some(EngineMessage::Ack(message_id, cumulative)) => { - self.ack(message_id, cumulative); + } + Some(EngineMessage::Ack(message_id, cumulative)) => { + self.ack(message_id, cumulative); true - } - Some(EngineMessage::Nack(message_id)) => { - if let Err(e) = - self.connection.sender().send_redeliver_unacknowleged_messages( - self.id, - vec![message_id.id.clone()], - ) - { - error!( - "could not ask for redelivery for message {:?}: {:?}", - message_id, e - ); - } + } + Some(EngineMessage::Nack(message_id)) => { + if let Err(e) = self + .connection + .sender() + .send_redeliver_unacknowleged_messages(self.id, vec![message_id.id.clone()]) + { + error!( + "could not ask for redelivery for message {:?}: {:?}", + message_id, e + ); + } true + } + Some(EngineMessage::UnackedRedelivery) => { + let mut h = HashSet::new(); + let now = Instant::now(); + // info!("unacked messages length: {}", self.unacked_messages.len()); + for (id, t) in self.unacked_messages.iter() { + if *t < now { + h.insert(id.clone()); + } + } + + let ids: Vec<_> = h.iter().cloned().collect(); + if !ids.is_empty() { + // info!("will unack ids: {:?}", ids); + if let Err(e) = self + .connection + .sender() + .send_redeliver_unacknowleged_messages(self.id, ids) + { + error!("could not ask for redelivery: {:?}", e); + } else { + for i in h.iter() { + self.unacked_messages.remove(i); } - Some(EngineMessage::UnackedRedelivery) => { - let mut h = HashSet::new(); - let now = Instant::now(); - // info!("unacked messages length: {}", self.unacked_messages.len()); - for (id, t) in self.unacked_messages.iter() { - if *t < now { - h.insert(id.clone()); - } - } - - let ids: Vec<_> = h.iter().cloned().collect(); - if !ids.is_empty() { - // info!("will unack ids: {:?}", ids); - if let Err(e) = self - .connection - .sender() - .send_redeliver_unacknowleged_messages(self.id, ids) - { - error!("could not ask for redelivery: {:?}", e); - } else { - for i in h.iter() { - self.unacked_messages.remove(i); - } - } - } + } + } true - } - Some(EngineMessage::GetConnection(sender)) => { - let _ = sender.send(self.connection.clone()).map_err(|_| { - error!( - "consumer requested the engine's connection but dropped the \ + } + Some(EngineMessage::GetConnection(sender)) => { + let _ = sender.send(self.connection.clone()).map_err(|_| { + error!( + "consumer requested the engine's connection but dropped the \ channel before receiving" - ); - }); + ); + }); true - } + } } } @@ -1048,7 +1087,10 @@ impl ConsumerEngine { fn ack(&mut self, message_id: MessageData, cumulative: bool) { // FIXME: this does not handle cumulative acks self.unacked_messages.remove(&message_id.id); - let res = self.connection.sender().send_ack(self.id, vec![message_id.id], cumulative); + let res = self + .connection + .sender() + .send_ack(self.id, vec![message_id.id], cumulative); if res.is_err() { error!("ack error: {:?}", res); } @@ -1058,11 +1100,22 @@ impl ConsumerEngine { #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))] async fn process_message(&mut self, message: RawMessage) -> Result { match message { - RawMessage { command: BaseCommand { reached_end_of_topic: Some(_), .. }, .. } => { + RawMessage { + command: + BaseCommand { + reached_end_of_topic: Some(_), + .. + }, + .. + } => { return Ok(false); } RawMessage { - command: BaseCommand { active_consumer_change: Some(active_consumer_change), .. }, + command: + BaseCommand { + active_consumer_change: Some(active_consumer_change), + .. + }, .. } => { // TODO: Communicate this status to the Consumer and expose it @@ -1073,18 +1126,31 @@ impl ConsumerEngine { ); } RawMessage { - command: BaseCommand { message: Some(message), .. }, + command: + BaseCommand { + message: Some(message), + .. + }, payload: Some(payload), } => { self.process_payload(message, payload).await?; } - RawMessage { command: BaseCommand { message: Some(_), .. }, payload: None } => { - error!("Consumer {} received message without payload", self.debug_format()); + RawMessage { + command: BaseCommand { + message: Some(_), .. + }, + payload: None, + } => { + error!( + "Consumer {} received message without payload", + self.debug_format() + ); } RawMessage { command: BaseCommand { - close_consumer: Some(CommandCloseConsumer { consumer_id, .. }), .. + close_consumer: Some(CommandCloseConsumer { consumer_id, .. }), + .. }, .. } => { @@ -1118,14 +1184,13 @@ impl ConsumerEngine { let compression = match payload.metadata.compression { None => proto::CompressionType::None, Some(compression) => { - proto::CompressionType::from_i32(compression) - .ok_or_else(|| { - error!("unknown compression type: {}", compression); - Error::Consumer(ConsumerError::Io(std::io::Error::new( - std::io::ErrorKind::Other, - format!("unknown compression type: {}", compression), - ))) - })? + proto::CompressionType::from_i32(compression).ok_or_else(|| { + error!("unknown compression type: {}", compression); + Error::Consumer(ConsumerError::Io(std::io::Error::new( + std::io::ErrorKind::Other, + format!("unknown compression type: {}", compression), + ))) + })? } }; @@ -1165,12 +1230,14 @@ impl ConsumerEngine { #[cfg(feature = "flate2")] { - use flate2::read::ZlibDecoder; use std::io::Read; + use flate2::read::ZlibDecoder; + let mut d = ZlibDecoder::new(&payload.data[..]); let mut decompressed_payload = Vec::new(); - d.read_to_end(&mut decompressed_payload).map_err(ConsumerError::Io)?; + d.read_to_end(&mut decompressed_payload) + .map_err(ConsumerError::Io)?; payload.data = decompressed_payload; payload @@ -1211,7 +1278,9 @@ impl ConsumerEngine { let mut decompressed_payload = Vec::new(); let mut decoder = snap::read::FrameDecoder::new(&payload.data[..]); - decoder.read_to_end(&mut decompressed_payload).map_err(ConsumerError::Io)?; + decoder + .read_to_end(&mut decompressed_payload) + .map_err(ConsumerError::Io)?; payload.data = decompressed_payload; payload @@ -1240,7 +1309,13 @@ impl ConsumerEngine { Error::Custom("DLQ send error".to_string()) })?; - self.ack(MessageData { id: message.message_id, batch_size: None }, false); + self.ack( + MessageData { + id: message.message_id, + batch_size: None, + }, + false, + ); } else { self.send_to_consumer(message.message_id, payload).await? } @@ -1258,10 +1333,13 @@ impl ConsumerEngine { payload: Payload, ) -> Result<(), Error> { let now = Instant::now(); - self.tx.send(Ok((message_id.clone(), payload))).await.map_err(|e| { - error!("tx returned {:?}", e); - Error::Custom("tx closed".to_string()) - })?; + self.tx + .send(Ok((message_id.clone(), payload))) + .await + .map_err(|e| { + error!("tx returned {:?}", e); + Error::Custom("tx closed".to_string()) + })?; if let Some(duration) = self.unacked_message_redelivery_delay { self.unacked_messages.insert(message_id, now + duration); } @@ -1321,7 +1399,10 @@ impl ConsumerEngine { })); let old_signal = std::mem::replace(&mut self._drop_signal, _drop_signal); if let Err(e) = old_signal.send(()) { - error!("could not send the drop signal to the old consumer(id={}): {:?}", id, e); + error!( + "could not send the drop signal to the old consumer(id={}): {:?}", + id, e + ); } Ok(()) @@ -1333,32 +1414,42 @@ impl ConsumerEngine { "[{id} - {subscription}{name}: {topic}]", id = self.id, subscription = &self.subscription, - name = self.name.as_ref().map(|s| format!("({})", s)).unwrap_or_default(), + name = self + .name + .as_ref() + .map(|s| format!("({})", s)) + .unwrap_or_default(), topic = &self.topic ) } #[cfg(feature = "async-std")] - async fn timeout, O>(fut: F, dur: Duration) -> Result { + async fn timeout, O>( + fut: F, + dur: Duration, + ) -> Result { use async_std::prelude::FutureExt; fut.timeout(dur).await } #[cfg(all(not(feature = "async-std"), feature = "tokio"))] - async fn timeout, O>(fut: F, dur: Duration) -> Result { + async fn timeout, O>( + fut: F, + dur: Duration, + ) -> Result { tokio::time::timeout_at(tokio::time::Instant::now() + dur, fut).await } } #[derive(Clone, Debug, PartialEq, Eq, Hash)] pub struct MessageData { - pub id: proto::MessageIdData, + pub id: MessageIdData, batch_size: Option, } struct BatchedMessageIterator { messages: std::vec::IntoIter, - message_id: proto::MessageIdData, + message_id: MessageIdData, metadata: Metadata, total_messages: u32, current_index: u32, @@ -1366,9 +1457,11 @@ struct BatchedMessageIterator { impl BatchedMessageIterator { #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))] - fn new(message_id: proto::MessageIdData, payload: Payload) -> Result { - let total_messages = - payload.metadata.num_messages_in_batch.expect("expected batched message") as u32; + fn new(message_id: MessageIdData, payload: Payload) -> Result { + let total_messages = payload + .metadata + .num_messages_in_batch + .expect("expected batched message") as u32; let messages = parse_batched_message(total_messages, &payload.data)?; Ok(Self { @@ -1382,7 +1475,7 @@ impl BatchedMessageIterator { } impl Iterator for BatchedMessageIterator { - type Item = (proto::MessageIdData, Payload); + type Item = (MessageIdData, Payload); #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))] fn next(&mut self) -> Option { @@ -1393,8 +1486,10 @@ impl Iterator for BatchedMessageIterator { let index = self.current_index; self.current_index += 1; if let Some(batched_message) = self.messages.next() { - let id = - proto::MessageIdData { batch_index: Some(index as i32), ..self.message_id.clone() }; + let id = MessageIdData { + batch_index: Some(index as i32), + ..self.message_id.clone() + }; let metadata = Metadata { properties: batched_message.metadata.properties, @@ -1403,7 +1498,10 @@ impl Iterator for BatchedMessageIterator { ..self.metadata.clone() }; - let payload = Payload { metadata, data: batched_message.payload }; + let payload = Payload { + metadata, + data: batched_message.payload, + }; Some((id, payload)) } else { @@ -1606,7 +1704,10 @@ impl ConsumerBuilder { } let topics: Vec<(String, BrokerAddress)> = try_join_all( - topics.into_iter().flatten().map(|topic| pulsar.lookup_partitioned_topic(topic)), + topics + .into_iter() + .flatten() + .map(|topic| pulsar.lookup_partitioned_topic(topic)), ) .await? .into_iter() @@ -1614,7 +1715,9 @@ impl ConsumerBuilder { .collect(); if topics.is_empty() && topic_regex.is_none() { - return Err(Error::Custom("Unable to create consumer - topic not found".to_string())); + return Err(Error::Custom( + "Unable to create consumer - topic not found".to_string(), + )); } let consumer_id = match (consumer_id, topics.len()) { @@ -1634,7 +1737,10 @@ impl ConsumerBuilder { .map(|c| c as char) .collect(); let subscription = format!("sub_{}", s); - warn!("Subscription not specified. Using new subscription `{}`.", subscription); + warn!( + "Subscription not specified. Using new subscription `{}`.", + subscription + ); subscription }); let sub_type = subscription_type.unwrap_or_else(|| { @@ -1670,14 +1776,20 @@ impl ConsumerBuilder { let consumer = consumers.into_iter().next().unwrap(); InnerConsumer::Single(consumer) } else { - let consumers: BTreeMap<_, _> = - consumers.into_iter().map(|c| (c.topic(), Box::pin(c))).collect(); + let consumers: BTreeMap<_, _> = consumers + .into_iter() + .map(|c| (c.topic(), Box::pin(c))) + .collect(); let topics: VecDeque = consumers.keys().cloned().collect(); let existing_topics = topics.clone(); - let topic_refresh = self.topic_refresh.unwrap_or_else(|| Duration::from_secs(30)); + let topic_refresh = self + .topic_refresh + .unwrap_or_else(|| Duration::from_secs(30)); let refresh = Box::pin(self.pulsar.executor.interval(topic_refresh).map(drop)); let mut consumer = MultiTopicConsumer { - namespace: self.namespace.unwrap_or_else(|| "public/default".to_string()), + namespace: self + .namespace + .unwrap_or_else(|| "public/default".to_string()), topic_regex: self.topic_regex, pulsar: self.pulsar, consumers, @@ -1711,13 +1823,18 @@ impl ConsumerBuilder { config.sub_type = SubType::Exclusive; if self.topics.unwrap().len() > 1 { - return Err(Error::Custom("Unable to create a reader - one topic max".to_string())); + return Err(Error::Custom( + "Unable to create a reader - one topic max".to_string(), + )); } let (topic, addr) = joined_topics.pop().unwrap(); let consumer = TopicConsumer::new(self.pulsar.clone(), topic, addr, config.clone()).await?; - Ok(Reader { consumer, state: Some(State::PollingConsumer) }) + Ok(Reader { + consumer, + state: Some(State::PollingConsumer), + }) } } @@ -1802,7 +1919,13 @@ impl MultiTopicConsumer { #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))] async fn check_connections(&mut self) -> Result<(), Error> { - self.pulsar.manager.get_base_connection().await?.sender().send_ping().await?; + self.pulsar + .manager + .get_base_connection() + .await? + .sender() + .send_ping() + .await?; for consumer in self.consumers.values_mut() { consumer.connection().await?.sender().send_ping().await?; @@ -1874,8 +1997,10 @@ impl MultiTopicConsumer { .await?; trace!("fetched topics {:?}", topics); - let mut matched_topics = - all_topics.into_iter().filter(|t| regex.is_match(t)).collect(); + let mut matched_topics = all_topics + .into_iter() + .filter(|t| regex.is_match(t)) + .collect(); trace!("matched topics {:?} (regex: {})", matched_topics, ®ex); @@ -1883,22 +2008,25 @@ impl MultiTopicConsumer { } // 2. lookup partitioned topic - let topics: Vec<_> = try_join_all( - topics.into_iter().map(|topic| pulsar.lookup_partitioned_topic(topic)), + let topics = try_join_all( + topics + .into_iter() + .map(|topic| pulsar.lookup_partitioned_topic(topic)), ) .await? .into_iter() - .flatten() - .collect(); + .flatten(); // 3. create consumers - let consumers = - try_join_all(topics.into_iter().filter(|(t, _)| !existing_topics.contains(t)).map( - |(topic, addr)| { + let consumers = try_join_all( + topics + .into_iter() + .filter(|(t, _)| !existing_topics.contains(t)) + .map(|(topic, addr)| { TopicConsumer::new(pulsar.clone(), topic, addr, consumer_config.clone()) - }, - )) - .await?; + }), + ) + .await?; trace!("created {} consumers", consumers.len()); Ok(consumers) })); @@ -1995,7 +2123,7 @@ impl Message { /// Get Pulsar message id for the message #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))] - pub fn message_id(&self) -> &proto::MessageIdData { + pub fn message_id(&self) -> &MessageIdData { &self.message_id.id } @@ -2016,7 +2144,11 @@ impl Message { impl Debug for MultiTopicConsumer { #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))] fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - write!(f, "MultiTopicConsumer({:?}, {:?})", &self.namespace, &self.topic_regex) + write!( + f, + "MultiTopicConsumer({:?}, {:?})", + &self.namespace, &self.topic_regex + ) } } @@ -2053,7 +2185,11 @@ impl Stream for MultiTopicConsum break; } let topic = self.existing_topics.pop_front().unwrap(); - if let Some(item) = self.consumers.get_mut(&topic).map(|c| c.as_mut().poll_next(cx)) { + if let Some(item) = self + .consumers + .get_mut(&topic) + .map(|c| c.as_mut().poll_next(cx)) + { match item { Poll::Pending => {} Poll::Ready(Some(Ok(msg))) => result = Some(msg), @@ -2062,7 +2198,10 @@ impl Stream for MultiTopicConsum topics_to_remove.push(topic.clone()); } Poll::Ready(Some(Err(e))) => { - error!("Unexpected error consuming from pulsar topic {}: {}", &topic, e); + error!( + "Unexpected error consuming from pulsar topic {}: {}", + &topic, e + ); topics_to_remove.push(topic.clone()); } } @@ -2084,19 +2223,20 @@ impl Stream for MultiTopicConsum mod tests { use std::time::{SystemTime, UNIX_EPOCH}; - use futures::{StreamExt, TryStreamExt}; + use futures::{ + future::{select, Either}, + StreamExt, TryStreamExt, + }; use log::LevelFilter; use regex::Regex; #[cfg(feature = "tokio-runtime")] use tokio::time::timeout; + use super::*; #[cfg(feature = "tokio-runtime")] use crate::executor::TokioExecutor; use crate::{producer, tests::TEST_LOGGER, Pulsar, SerializeMessage}; - use super::*; - use futures::future::{select, Either}; - #[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Hash)] pub struct TestData { topic: String, @@ -2106,7 +2246,10 @@ mod tests { impl<'a> SerializeMessage for &'a TestData { fn serialize_message(input: Self) -> Result { let payload = serde_json::to_vec(&input).map_err(|e| Error::Custom(e.to_string()))?; - Ok(producer::Message { payload, ..Default::default() }) + Ok(producer::Message { + payload, + ..Default::default() + }) } } @@ -2118,8 +2261,9 @@ mod tests { } } - pub static MULTI_LOGGER: crate::tests::SimpleLogger = - crate::tests::SimpleLogger { tag: "multi_consumer" }; + pub static MULTI_LOGGER: crate::tests::SimpleLogger = crate::tests::SimpleLogger { + tag: "multi_consumer", + }; #[tokio::test] #[cfg(feature = "tokio-runtime")] async fn multi_consumer() { @@ -2131,10 +2275,22 @@ mod tests { let topic1 = format!("multi_consumer_a_{}", topic_n); let topic2 = format!("multi_consumer_b_{}", topic_n); - let data1 = TestData { topic: "a".to_owned(), msg: 1 }; - let data2 = TestData { topic: "a".to_owned(), msg: 2 }; - let data3 = TestData { topic: "b".to_owned(), msg: 3 }; - let data4 = TestData { topic: "b".to_owned(), msg: 4 }; + let data1 = TestData { + topic: "a".to_owned(), + msg: 1, + }; + let data2 = TestData { + topic: "a".to_owned(), + msg: 2, + }; + let data3 = TestData { + topic: "b".to_owned(), + msg: 3, + }; + let data4 = TestData { + topic: "b".to_owned(), + msg: 4, + }; let client: Pulsar<_> = Pulsar::builder(addr, TokioExecutor).build().await.unwrap(); @@ -2174,14 +2330,19 @@ mod tests { let expected: HashSet<_> = vec![data1, data2, data3, data4].into_iter().collect(); for consumer in [consumer_1, consumer_2].iter_mut() { let connected_topics = consumer.topics(); - debug!("connected topics for {}: {:?}", consumer.subscription(), &connected_topics); + debug!( + "connected topics for {}: {:?}", + consumer.subscription(), + &connected_topics + ); assert_eq!(connected_topics.len(), 2); assert!(connected_topics.iter().any(|t| t.ends_with(&topic1))); assert!(connected_topics.iter().any(|t| t.ends_with(&topic2))); let mut received = HashSet::new(); - while let Some(message) = - timeout(Duration::from_secs(1), consumer.next()).await.unwrap() + while let Some(message) = timeout(Duration::from_secs(1), consumer.next()) + .await + .unwrap() { received.insert(message.unwrap().deserialize().unwrap()); if received.len() == 4 { @@ -2202,12 +2363,15 @@ mod tests { log::set_max_level(LevelFilter::Debug); let addr = "pulsar://127.0.0.1:6650"; - let topic = format!("consumer_dropped_with_lingering_acks_{}", rand::random::()); + let topic = format!( + "consumer_dropped_with_lingering_acks_{}", + rand::random::() + ); let client: Pulsar<_> = Pulsar::builder(addr, TokioExecutor).build().await.unwrap(); let message = TestData { - topic: std::iter::repeat(()) + topic: iter::repeat(()) .map(|()| rand::thread_rng().sample(Alphanumeric) as char) .take(8) .map(|c| c as char) @@ -2237,8 +2401,11 @@ mod tests { println!("created consumer"); // consumer.next().await - let msg: Message = - timeout(Duration::from_secs(1), consumer.next()).await.unwrap().unwrap().unwrap(); + let msg: Message = timeout(Duration::from_secs(1), consumer.next()) + .await + .unwrap() + .unwrap() + .unwrap(); println!("got message: {:?}", msg.payload); assert_eq!( message, @@ -2297,11 +2464,14 @@ mod tests { let topic = format!("dead_letter_queue_test_{}", test_id); let test_msg: u32 = rand::random(); - let message = TestData { topic: topic.clone(), msg: test_msg }; + let message = TestData { + topic: topic.clone(), + msg: test_msg, + }; let dead_letter_topic = format!("{}_dlq", topic); - let dead_letter_policy = crate::consumer::DeadLetterPolicy { + let dead_letter_policy = DeadLetterPolicy { max_redeliver_count: 1, dead_letter_topic: dead_letter_topic.clone(), }; @@ -2367,7 +2537,9 @@ mod tests { let client: Pulsar<_> = Pulsar::builder(addr, TokioExecutor).build().await.unwrap(); let msg_count = 100_u32; - try_join_all((0..msg_count).map(|i| client.send(&topic, i.to_string()))).await.unwrap(); + try_join_all((0..msg_count).map(|i| client.send(&topic, i.to_string()))) + .await + .unwrap(); let builder = client .consumer() @@ -2429,14 +2601,18 @@ mod tests { // send 100 messages and record the starting time let msg_count = 100_u32; - let start_time: u64 = - SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis() as u64; + let start_time: u64 = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_millis() as u64; std::thread::sleep(Duration::from_secs(2)); println!("this is the starting time: {}", start_time); - try_join_all((0..msg_count).map(|i| client.send(&topic, i.to_string()))).await.unwrap(); + try_join_all((0..msg_count).map(|i| client.send(&topic, i.to_string()))) + .await + .unwrap(); log::info!("sent all messages"); let mut consumer_1: Consumer = client @@ -2480,7 +2656,10 @@ mod tests { // // call seek(timestamp), roll back the consumer to start_time log::info!("calling seek method"); - consumer_1.seek(None, None, Some(start_time), client).await.unwrap(); + consumer_1 + .seek(None, None, Some(start_time), client) + .await + .unwrap(); // let mut consumer_2: Consumer = client // .consumer() diff --git a/src/error.rs b/src/error.rs index b34975d..8a5200d 100644 --- a/src/error.rs +++ b/src/error.rs @@ -1,9 +1,13 @@ //! Error types -use std::sync::{ - atomic::{AtomicBool, Ordering}, - Arc, Mutex, +use std::{ + fmt, io, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, Mutex, + }, }; -use std::{fmt, io}; + +use crate::{message::proto::ServerError, producer::SendFuture}; #[derive(Debug)] pub enum Error { @@ -94,9 +98,9 @@ pub enum ConnectionError { impl ConnectionError { pub fn establish_retryable(&self) -> bool { match self { - ConnectionError::Io(e) => - e.kind() == io::ErrorKind::ConnectionRefused || - e.kind() == io::ErrorKind::TimedOut, + ConnectionError::Io(e) => { + e.kind() == io::ErrorKind::ConnectionRefused || e.kind() == io::ErrorKind::TimedOut + } _ => false, } } @@ -230,7 +234,8 @@ pub enum ProducerError { PartialSend(Vec>), /// Indiciates the error was part of sending a batch, and thus shared across the batch Batch(Arc), - /// Indicates this producer has lost exclusive access to the topic. Client can decided whether to recreate or not + /// Indicates this producer has lost exclusive access to the topic. Client can decided whether + /// to recreate or not Fenced, } @@ -424,9 +429,6 @@ impl SharedError { } } -use crate::message::proto::ServerError; -use crate::producer::SendFuture; - #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))] pub(crate) fn server_error(i: i32) -> Option { match i { diff --git a/src/executor.rs b/src/executor.rs index 6bebcb7..3273f30 100644 --- a/src/executor.rs +++ b/src/executor.rs @@ -2,8 +2,9 @@ //! //! this crate is compatible with Tokio and async-std, by assembling them //! under the [Executor] trait +use std::{ops::Deref, pin::Pin, sync::Arc, task::Poll}; + use futures::{Future, Stream}; -use std::{ops::Deref, pin::Pin, sync::Arc}; /// indicates which executor is used pub enum ExecutorKind { @@ -159,7 +160,6 @@ pub enum JoinHandle { PlaceHolder(T), } -use std::task::Poll; impl Future for JoinHandle { type Output = Option; diff --git a/src/lib.rs b/src/lib.rs index 68328ab..6df45ad 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -42,7 +42,8 @@ //! //! impl SerializeMessage for TestData { //! fn serialize_message(input: Self) -> Result { -//! let payload = serde_json::to_vec(&input).map_err(|e| PulsarError::Custom(e.to_string()))?; +//! let payload = +//! serde_json::to_vec(&input).map_err(|e| PulsarError::Custom(e.to_string()))?; //! Ok(producer::Message { //! payload, //! ..Default::default() @@ -89,8 +90,8 @@ //! ```rust,no_run //! use futures::TryStreamExt; //! use pulsar::{ -//! message::proto::command_subscribe::SubType, message::Payload, Consumer, DeserializeMessage, -//! Pulsar, TokioExecutor, +//! message::{proto::command_subscribe::SubType, Payload}, +//! Consumer, DeserializeMessage, Pulsar, TokioExecutor, //! }; //! use serde::{Deserialize, Serialize}; //! @@ -171,9 +172,8 @@ pub use executor::AsyncStdExecutor; pub use executor::Executor; #[cfg(feature = "tokio-runtime")] pub use executor::TokioExecutor; -pub use message::proto::command_subscribe::SubType; pub use message::{ - proto::{self, CommandSendReceipt}, + proto::{self, command_subscribe::SubType, CommandSendReceipt}, Payload, }; pub use producer::{MultiTopicProducer, Producer, ProducerOptions}; @@ -193,24 +193,25 @@ mod service_discovery; #[cfg(test)] mod tests { + use std::{ + collections::BTreeSet, + time::{Duration, Instant}, + }; + use futures::{future::try_join_all, StreamExt}; use log::{LevelFilter, Metadata, Record}; - use std::collections::BTreeSet; - use std::time::{Duration, Instant}; - #[cfg(feature = "tokio-runtime")] use tokio::time::timeout; + use super::*; #[cfg(feature = "tokio-runtime")] use crate::executor::TokioExecutor; - - use crate::client::SerializeMessage; - use crate::consumer::{InitialPosition, Message}; - use crate::message::proto::command_subscribe::SubType; - use crate::message::Payload; - use crate::Error as PulsarError; - - use super::*; + use crate::{ + client::SerializeMessage, + consumer::{InitialPosition, Message}, + message::{proto::command_subscribe::SubType, Payload}, + Error as PulsarError, + }; #[derive(Debug, Serialize, Deserialize)] struct TestData { diff --git a/src/message.rs b/src/message.rs index afb281c..3c9fdf8 100644 --- a/src/message.rs +++ b/src/message.rs @@ -1,6 +1,6 @@ //! low level structures used to send and process raw messages -use crate::connection::RequestKey; -use crate::error::ConnectionError; +use std::{convert::TryFrom, io::Cursor}; + use bytes::{Buf, BufMut, BytesMut}; use nom::{ bytes::streaming::take, @@ -9,15 +9,12 @@ use nom::{ IResult, }; use prost::{self, Message as ImplProtobuf}; -use std::convert::TryFrom; -use std::io::Cursor; - -const CRC_CASTAGNOLI: crc::Crc = crc::Crc::::new(&crc::CRC_32_ISCSI); - -pub use self::proto::BaseCommand; -pub use self::proto::MessageMetadata as Metadata; use self::proto::*; +pub use self::proto::{BaseCommand, MessageMetadata as Metadata}; +use crate::{connection::RequestKey, error::ConnectionError}; + +const CRC_CASTAGNOLI: crc::Crc = crc::Crc::::new(&crc::CRC_32_ISCSI); /// Pulsar binary message /// @@ -278,7 +275,8 @@ impl tokio_util::codec::Decoder for Codec { trace!("Decoder received {} bytes", src.len()); if src.len() >= 4 { let mut buf = Cursor::new(src); - // `messageSize` refers only to _remaining_ message size, so we add 4 to get total frame size + // `messageSize` refers only to _remaining_ message size, so we add 4 to get total frame + // size let message_size = buf.get_u32() as usize + 4; let src = buf.into_inner(); if src.len() >= message_size { @@ -384,7 +382,8 @@ impl asynchronous_codec::Decoder for Codec { trace!("Decoder received {} bytes", src.len()); if src.len() >= 4 { let mut buf = Cursor::new(src); - // `messageSize` refers only to _remaining_ message size, so we add 4 to get total frame size + // `messageSize` refers only to _remaining_ message size, so we add 4 to get total frame + // size let message_size = buf.get_u32() as usize + 4; let src = buf.into_inner(); if src.len() >= message_size { @@ -492,7 +491,7 @@ fn payload_frame(i: &[u8]) -> IResult<&[u8], PayloadFrame> { } pub(crate) struct BatchedMessage { - pub metadata: proto::SingleMessageMetadata, + pub metadata: SingleMessageMetadata, pub payload: Vec, } @@ -500,7 +499,7 @@ pub(crate) struct BatchedMessage { fn batched_message(i: &[u8]) -> IResult<&[u8], BatchedMessage> { let (i, metadata_size) = be_u32(i)?; let (i, metadata) = verify( - map_res(take(metadata_size), proto::SingleMessageMetadata::decode), + map_res(take(metadata_size), SingleMessageMetadata::decode), // payload_size is defined as i32 in protobuf |metadata| metadata.payload_size >= 0, )(i)?; @@ -538,10 +537,11 @@ impl BatchedMessage { } pub mod proto { + #![allow(clippy::all)] include!(concat!(env!("OUT_DIR"), "/pulsar.proto.rs")); //trait implementations used in Consumer::unacked_messages - impl std::cmp::Eq for MessageIdData {} + impl Eq for MessageIdData {} #[allow(clippy::derive_hash_xor_eq)] impl std::hash::Hash for MessageIdData { @@ -557,46 +557,46 @@ pub mod proto { } } -impl TryFrom for proto::base_command::Type { +impl TryFrom for base_command::Type { type Error = (); #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))] fn try_from(value: i32) -> Result { match value { - 2 => Ok(proto::base_command::Type::Connect), - 3 => Ok(proto::base_command::Type::Connected), - 4 => Ok(proto::base_command::Type::Subscribe), - 5 => Ok(proto::base_command::Type::Producer), - 6 => Ok(proto::base_command::Type::Send), - 7 => Ok(proto::base_command::Type::SendReceipt), - 8 => Ok(proto::base_command::Type::SendError), - 9 => Ok(proto::base_command::Type::Message), - 10 => Ok(proto::base_command::Type::Ack), - 11 => Ok(proto::base_command::Type::Flow), - 12 => Ok(proto::base_command::Type::Unsubscribe), - 13 => Ok(proto::base_command::Type::Success), - 14 => Ok(proto::base_command::Type::Error), - 15 => Ok(proto::base_command::Type::CloseProducer), - 16 => Ok(proto::base_command::Type::CloseConsumer), - 17 => Ok(proto::base_command::Type::ProducerSuccess), - 18 => Ok(proto::base_command::Type::Ping), - 19 => Ok(proto::base_command::Type::Pong), - 20 => Ok(proto::base_command::Type::RedeliverUnacknowledgedMessages), - 21 => Ok(proto::base_command::Type::PartitionedMetadata), - 22 => Ok(proto::base_command::Type::PartitionedMetadataResponse), - 23 => Ok(proto::base_command::Type::Lookup), - 24 => Ok(proto::base_command::Type::LookupResponse), - 25 => Ok(proto::base_command::Type::ConsumerStats), - 26 => Ok(proto::base_command::Type::ConsumerStatsResponse), - 27 => Ok(proto::base_command::Type::ReachedEndOfTopic), - 28 => Ok(proto::base_command::Type::Seek), - 29 => Ok(proto::base_command::Type::GetLastMessageId), - 30 => Ok(proto::base_command::Type::GetLastMessageIdResponse), - 31 => Ok(proto::base_command::Type::ActiveConsumerChange), - 32 => Ok(proto::base_command::Type::GetTopicsOfNamespace), - 33 => Ok(proto::base_command::Type::GetTopicsOfNamespaceResponse), - 34 => Ok(proto::base_command::Type::GetSchema), - 35 => Ok(proto::base_command::Type::GetSchemaResponse), + 2 => Ok(base_command::Type::Connect), + 3 => Ok(base_command::Type::Connected), + 4 => Ok(base_command::Type::Subscribe), + 5 => Ok(base_command::Type::Producer), + 6 => Ok(base_command::Type::Send), + 7 => Ok(base_command::Type::SendReceipt), + 8 => Ok(base_command::Type::SendError), + 9 => Ok(base_command::Type::Message), + 10 => Ok(base_command::Type::Ack), + 11 => Ok(base_command::Type::Flow), + 12 => Ok(base_command::Type::Unsubscribe), + 13 => Ok(base_command::Type::Success), + 14 => Ok(base_command::Type::Error), + 15 => Ok(base_command::Type::CloseProducer), + 16 => Ok(base_command::Type::CloseConsumer), + 17 => Ok(base_command::Type::ProducerSuccess), + 18 => Ok(base_command::Type::Ping), + 19 => Ok(base_command::Type::Pong), + 20 => Ok(base_command::Type::RedeliverUnacknowledgedMessages), + 21 => Ok(base_command::Type::PartitionedMetadata), + 22 => Ok(base_command::Type::PartitionedMetadataResponse), + 23 => Ok(base_command::Type::Lookup), + 24 => Ok(base_command::Type::LookupResponse), + 25 => Ok(base_command::Type::ConsumerStats), + 26 => Ok(base_command::Type::ConsumerStatsResponse), + 27 => Ok(base_command::Type::ReachedEndOfTopic), + 28 => Ok(base_command::Type::Seek), + 29 => Ok(base_command::Type::GetLastMessageId), + 30 => Ok(base_command::Type::GetLastMessageIdResponse), + 31 => Ok(base_command::Type::ActiveConsumerChange), + 32 => Ok(base_command::Type::GetTopicsOfNamespace), + 33 => Ok(base_command::Type::GetTopicsOfNamespaceResponse), + 34 => Ok(base_command::Type::GetSchema), + 35 => Ok(base_command::Type::GetSchemaResponse), _ => Err(()), } } @@ -618,11 +618,13 @@ impl From for ConnectionError { #[cfg(test)] mod tests { - use crate::message::Codec; - use bytes::BytesMut; use std::convert::TryFrom; + + use bytes::BytesMut; use tokio_util::codec::{Decoder, Encoder}; + use crate::message::Codec; + #[test] fn parse_simple_command() { let input: &[u8] = &[ @@ -676,7 +678,7 @@ mod tests { #[test] fn base_command_type_parsing() { - use super::proto::base_command::Type; + use super::base_command::Type; let mut successes = 0; for i in 0..40 { if let Ok(type_) = Type::try_from(i) { diff --git a/src/producer.rs b/src/producer.rs index 22c557a..e649ef5 100644 --- a/src/producer.rs +++ b/src/producer.rs @@ -1,21 +1,32 @@ //! Message publication -use futures::{channel::oneshot, future::try_join_all, lock::Mutex}; -use std::collections::{BTreeMap, HashMap, VecDeque}; -use std::io::Write; -use std::pin::Pin; -use std::sync::Arc; -use std::time::{Duration, SystemTime, UNIX_EPOCH}; - -use crate::client::SerializeMessage; -use crate::connection::{Connection, SerialId}; -use crate::error::{ConnectionError, ProducerError}; -use crate::executor::Executor; -use crate::message::proto::{self, CommandSendReceipt, EncryptionKeys, Schema}; -use crate::message::BatchedMessage; -use crate::{Error, Pulsar}; -use futures::task::{Context, Poll}; -use futures::Future; -use crate::compression::{Compression}; +use std::{ + collections::{BTreeMap, HashMap, VecDeque}, + io::Write, + pin::Pin, + sync::Arc, + time::{Duration, SystemTime, UNIX_EPOCH}, +}; + +use futures::{ + channel::oneshot, + future::try_join_all, + lock::Mutex, + task::{Context, Poll}, + Future, +}; + +use crate::{ + client::SerializeMessage, + compression::Compression, + connection::{Connection, SerialId}, + error::{ConnectionError, ProducerError}, + executor::Executor, + message::{ + proto::{self, CommandSendReceipt, EncryptionKeys, Schema}, + BatchedMessage, + }, + Error, Pulsar, +}; type ProducerId = u64; type ProducerName = String; @@ -129,7 +140,8 @@ pub struct ProducerOptions { pub batch_size: Option, /// algorithm used to compress the messages pub compression: Option, - /// producer access mode: shared = 0, exclusive = 1, waitforexclusive =2, exclusivewithoutfencing =3 + /// producer access mode: shared = 0, exclusive = 1, waitforexclusive =2, + /// exclusivewithoutfencing =3 pub access_mode: Option, } @@ -142,9 +154,7 @@ pub struct ProducerOptions { /// # let topic = "topic"; /// # let message = "data".to_owned(); /// let pulsar: Pulsar<_> = Pulsar::builder(addr, TokioExecutor).build().await?; -/// let mut producer = pulsar.producer() -/// .with_name("name") -/// .build_multi_topic(); +/// let mut producer = pulsar.producer().with_name("name").build_multi_topic(); /// let send_1 = producer.send(topic, &message).await?; /// let send_2 = producer.send(topic, &message).await?; /// send_1.await?; @@ -226,8 +236,9 @@ impl MultiTopicProducer { for msg in messages { sends.push(self.send(&topic, msg).await); } - // TODO determine whether to keep this approach or go with the partial send, but more mem friendly lazy approach. - // serialize all messages before sending to avoid a partial send + // TODO determine whether to keep this approach or go with the partial send, but more mem + // friendly lazy approach. serialize all messages before sending to avoid a partial + // send if sends.iter().all(|s| s.is_ok()) { Ok(sends.into_iter().map(|s| s.unwrap()).collect()) } else { @@ -721,8 +732,7 @@ impl TopicProducer { } #[cfg(feature = "flate2")] Some(Compression::Zlib(compression)) => { - let mut e = - flate2::write::ZlibEncoder::new(Vec::new(), compression.level); + let mut e = flate2::write::ZlibEncoder::new(Vec::new(), compression.level); e.write_all(&message.payload[..]) .map_err(ProducerError::Io)?; let compressed_payload = e.finish().map_err(ProducerError::Io)?; @@ -734,8 +744,8 @@ impl TopicProducer { } #[cfg(feature = "zstd")] Some(Compression::Zstd(compression)) => { - let compressed_payload = - zstd::encode_all(&message.payload[..], compression.level).map_err(ProducerError::Io)?; + let compressed_payload = zstd::encode_all(&message.payload[..], compression.level) + .map_err(ProducerError::Io)?; message.uncompressed_size = Some(message.payload.len() as u32); message.payload = compressed_payload; message.compression = Some(proto::CompressionType::Zstd.into()); diff --git a/src/reader.rs b/src/reader.rs index ffcb77c..7db0e20 100644 --- a/src/reader.rs +++ b/src/reader.rs @@ -1,15 +1,21 @@ -use crate::client::DeserializeMessage; -use crate::consumer::{ConsumerOptions, DeadLetterPolicy, EngineMessage, Message, TopicConsumer}; -use crate::error::Error; -use crate::executor::Executor; -use crate::message::proto::{command_subscribe::SubType, MessageIdData}; -use chrono::{DateTime, Utc}; -use futures::channel::mpsc::SendError; -use futures::task::{Context, Poll}; -use futures::{Future, SinkExt, Stream}; use std::pin::Pin; + +use chrono::{DateTime, Utc}; +use futures::{ + channel::mpsc::SendError, + task::{Context, Poll}, + Future, SinkExt, Stream, +}; use url::Url; +use crate::{ + client::DeserializeMessage, + consumer::{ConsumerOptions, DeadLetterPolicy, EngineMessage, Message, TopicConsumer}, + error::Error, + executor::Executor, + message::proto::{command_subscribe::SubType, MessageIdData}, +}; + /// A client that acknowledges messages systematically pub struct Reader { pub(crate) consumer: TopicConsumer, diff --git a/src/service_discovery.rs b/src/service_discovery.rs index ae9502c..3bd4ad3 100644 --- a/src/service_discovery.rs +++ b/src/service_discovery.rs @@ -1,14 +1,18 @@ -use crate::connection_manager::{BrokerAddress, ConnectionManager}; -use crate::error::{ConnectionError, ServiceDiscoveryError}; -use crate::executor::Executor; -use crate::message::proto::{ - command_lookup_topic_response, command_partitioned_topic_metadata_response, - CommandLookupTopicResponse, -}; -use futures::{future::try_join_all, FutureExt}; use std::sync::Arc; + +use futures::{future::try_join_all, FutureExt}; use url::Url; +use crate::{ + connection_manager::{BrokerAddress, ConnectionManager}, + error::{ConnectionError, ServiceDiscoveryError}, + executor::Executor, + message::proto::{ + command_lookup_topic_response, command_partitioned_topic_metadata_response, + CommandLookupTopicResponse, + }, +}; + /// Look up broker addresses for topics and partitioned topics /// /// The ServiceDiscovery object provides a single interface to start