Skip to content

Commit

Permalink
Merge branch 'main' into fix/pulsar
Browse files Browse the repository at this point in the history
  • Loading branch information
artursouza authored Nov 19, 2024
2 parents 2a7780f + 1a6a75a commit 7efaf44
Show file tree
Hide file tree
Showing 5 changed files with 45 additions and 13 deletions.
8 changes: 8 additions & 0 deletions bindings/azure/eventhubs/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,15 @@ builtinAuthenticationProfiles:
default: "false"
example: "false"
description: |
Allow management of the Event Hub namespace and storage account.
- name: enableInOrderMessageDelivery
type: bool
required: false
default: "false"
example: "false"
description: |
Enable in order processing of messages within a partition.
- name: resourceGroupName
type: string
required: false
Expand Down
6 changes: 5 additions & 1 deletion common/component/azure/eventhubs/eventhubs.go
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,11 @@ func (aeh *AzureEventHubs) processEvents(subscribeCtx context.Context, partition

if len(events) != 0 {
// Handle received message
go aeh.handleAsync(subscribeCtx, config.Topic, events, config.Handler)
if aeh.metadata.EnableInOrderMessageDelivery {
aeh.handleAsync(subscribeCtx, config.Topic, events, config.Handler)
} else {
go aeh.handleAsync(subscribeCtx, config.Topic, events, config.Handler)
}

// Checkpointing disabled for CheckPointFrequencyPerPartition == 0
if config.CheckPointFrequencyPerPartition > 0 {
Expand Down
12 changes: 12 additions & 0 deletions common/component/azure/eventhubs/eventhubs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,18 @@ func TestParseEventHubsMetadata(t *testing.T) {
require.Error(t, err)
require.ErrorContains(t, err, "one of connectionString or eventHubNamespace is required")
})

t.Run("test in order delivery", func(t *testing.T) {
metadata := map[string]string{
"enableInOrderMessageDelivery": "true",
"connectionString": "fake",
}

m, err := parseEventHubsMetadata(metadata, false, testLogger)

require.NoError(t, err)
require.True(t, m.EnableInOrderMessageDelivery)
})
}

func TestConstructConnectionStringFromTopic(t *testing.T) {
Expand Down
25 changes: 13 additions & 12 deletions common/component/azure/eventhubs/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,19 @@ import (
)

type AzureEventHubsMetadata struct {
ConnectionString string `json:"connectionString" mapstructure:"connectionString"`
EventHubNamespace string `json:"eventHubNamespace" mapstructure:"eventHubNamespace"`
ConsumerID string `json:"consumerID" mapstructure:"consumerID"`
StorageConnectionString string `json:"storageConnectionString" mapstructure:"storageConnectionString"`
StorageAccountName string `json:"storageAccountName" mapstructure:"storageAccountName"`
StorageAccountKey string `json:"storageAccountKey" mapstructure:"storageAccountKey"`
StorageContainerName string `json:"storageContainerName" mapstructure:"storageContainerName"`
EnableEntityManagement bool `json:"enableEntityManagement,string" mapstructure:"enableEntityManagement"`
MessageRetentionInDays int32 `json:"messageRetentionInDays,string" mapstructure:"messageRetentionInDays"`
PartitionCount int32 `json:"partitionCount,string" mapstructure:"partitionCount"`
SubscriptionID string `json:"subscriptionID" mapstructure:"subscriptionID"`
ResourceGroupName string `json:"resourceGroupName" mapstructure:"resourceGroupName"`
ConnectionString string `json:"connectionString" mapstructure:"connectionString"`
EventHubNamespace string `json:"eventHubNamespace" mapstructure:"eventHubNamespace"`
ConsumerID string `json:"consumerID" mapstructure:"consumerID"`
StorageConnectionString string `json:"storageConnectionString" mapstructure:"storageConnectionString"`
StorageAccountName string `json:"storageAccountName" mapstructure:"storageAccountName"`
StorageAccountKey string `json:"storageAccountKey" mapstructure:"storageAccountKey"`
StorageContainerName string `json:"storageContainerName" mapstructure:"storageContainerName"`
EnableEntityManagement bool `json:"enableEntityManagement,string" mapstructure:"enableEntityManagement"`
MessageRetentionInDays int32 `json:"messageRetentionInDays,string" mapstructure:"messageRetentionInDays"`
PartitionCount int32 `json:"partitionCount,string" mapstructure:"partitionCount"`
SubscriptionID string `json:"subscriptionID" mapstructure:"subscriptionID"`
ResourceGroupName string `json:"resourceGroupName" mapstructure:"resourceGroupName"`
EnableInOrderMessageDelivery bool `json:"enableInOrderMessageDelivery,string" mapstructure:"enableInOrderMessageDelivery"`

// Binding only
EventHub string `json:"eventHub" mapstructure:"eventHub" mdonly:"bindings"`
Expand Down
7 changes: 7 additions & 0 deletions pubsub/azure/eventhubs/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,13 @@ builtinAuthenticationProfiles:
example: "false"
description: |
Allow management of the Event Hub namespace and storage account.
- name: enableInOrderMessageDelivery
type: bool
required: false
default: "false"
example: "false"
description: |
Enable in order processing of messages within a partition.
# The following four properties are needed only if enableEntityManagement is set to true
- name: resourceGroupName
Expand Down

0 comments on commit 7efaf44

Please sign in to comment.