Skip to content

Commit

Permalink
Enable eventhubs binding to read all message properties (#3615)
Browse files Browse the repository at this point in the history
Signed-off-by: yaron2 <[email protected]>
  • Loading branch information
yaron2 authored Nov 26, 2024
1 parent 913ba4c commit 85cbbf1
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 1 deletion.
9 changes: 9 additions & 0 deletions bindings/azure/eventhubs/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -161,3 +161,12 @@ metadata:
description: |
Storage container name.
example: '"myeventhubstoragecontainer"'
- name: getAllMessageProperties
required: false
default: "false"
example: "false"
binding:
input: true
output: false
description: |
When set to true, will retrieve all message properties and include them in the returned event metadata
7 changes: 6 additions & 1 deletion common/component/azure/eventhubs/eventhubs.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,11 @@ func (aeh *AzureEventHubs) EventHubName() string {
return aeh.metadata.hubName
}

// GetAllMessageProperties returns a boolean to indicate whether to return all properties for an event hubs message.
func (aeh *AzureEventHubs) GetAllMessageProperties() bool {
return aeh.metadata.GetAllMessageProperties
}

// Publish a batch of messages.
func (aeh *AzureEventHubs) Publish(ctx context.Context, topic string, messages []*azeventhubs.EventData, batchOpts *azeventhubs.EventDataBatchOptions) error {
// Get the producer client
Expand Down Expand Up @@ -165,7 +170,7 @@ func (aeh *AzureEventHubs) GetBindingsHandlerFunc(topic string, getAllProperties
return nil, fmt.Errorf("expected 1 message, got %d", len(messages))
}

bindingsMsg, err := NewBindingsReadResponseFromEventData(messages[0], topic, getAllProperties)
bindingsMsg, err := NewBindingsReadResponseFromEventData(messages[0], topic, aeh.GetAllMessageProperties())
if err != nil {
return nil, fmt.Errorf("failed to get bindings read response from azure eventhubs message: %w", err)
}
Expand Down
1 change: 1 addition & 0 deletions common/component/azure/eventhubs/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type AzureEventHubsMetadata struct {
SubscriptionID string `json:"subscriptionID" mapstructure:"subscriptionID"`
ResourceGroupName string `json:"resourceGroupName" mapstructure:"resourceGroupName"`
EnableInOrderMessageDelivery bool `json:"enableInOrderMessageDelivery,string" mapstructure:"enableInOrderMessageDelivery"`
GetAllMessageProperties bool `json:"getAllMessageProperties,string" mapstructure:"getAllMessageProperties"`

// Binding only
EventHub string `json:"eventHub" mapstructure:"eventHub" mdonly:"bindings"`
Expand Down
7 changes: 7 additions & 0 deletions pubsub/azure/eventhubs/eventhubs.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,10 @@ func (aeh *AzureEventHubs) Subscribe(ctx context.Context, req pubsub.SubscribeRe

// Check if requireAllProperties is set and is truthy
getAllProperties := utils.IsTruthy(req.Metadata["requireAllProperties"])
if !getAllProperties {
getAllProperties = aeh.GetAllMessageProperties()
}

checkPointFrequencyPerPartition := commonutils.GetIntValFromString(req.Metadata["checkPointFrequencyPerPartition"], impl.DefaultCheckpointFrequencyPerPartition)

pubsubHandler := aeh.GetPubSubHandlerFunc(topic, getAllProperties, handler)
Expand All @@ -155,6 +159,9 @@ func (aeh *AzureEventHubs) BulkSubscribe(ctx context.Context, req pubsub.Subscri

// Check if requireAllProperties is set and is truthy
getAllProperties := utils.IsTruthy(req.Metadata["requireAllProperties"])
if !getAllProperties {
getAllProperties = aeh.GetAllMessageProperties()
}
checkPointFrequencyPerPartition := commonutils.GetIntValFromString(req.Metadata["checkPointFrequencyPerPartition"], impl.DefaultCheckpointFrequencyPerPartition)
maxBulkSubCount := commonutils.GetIntValOrDefault(req.BulkSubscribeConfig.MaxMessagesCount, impl.DefaultMaxBulkSubCount)
maxBulkSubAwaitDurationMs := commonutils.GetIntValOrDefault(req.BulkSubscribeConfig.MaxAwaitDurationMs, impl.DefaultMaxBulkSubAwaitDurationMs)
Expand Down
9 changes: 9 additions & 0 deletions pubsub/azure/eventhubs/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -110,3 +110,12 @@ metadata:
description: |
The name of the Event Hubs Consumer Group to listen on.
example: '"group1"'
- name: getAllMessageProperties
required: false
default: "false"
example: "false"
binding:
input: true
output: false
description: |
When set to true, will retrieve all message properties and include them in the returned event metadata

0 comments on commit 85cbbf1

Please sign in to comment.