From a454ad2796c35c386c112ddf00f7a56b6c70c3ff Mon Sep 17 00:00:00 2001 From: Andrei Olar Date: Wed, 28 Feb 2024 17:24:39 +0200 Subject: [PATCH 1/3] feature: Allow configuring partition key from a MessageAttribute Add a configuration value for the source connector that enables the user to configure the name of a message attribute to use as the partitioning key instead of the message ID. Caveats: * Only String-valued message attributes are supported. * Message attributes must be enabled in the config => Kafka messages will have headers --- .../kafka/connect/sqs/SqsConnectorConfig.java | 7 +++ .../connect/sqs/SqsConnectorConfigKeys.java | 1 + .../connect/sqs/SqsSourceConnectorTask.java | 62 ++++++++++++++----- 3 files changed, 54 insertions(+), 16 deletions(-) diff --git a/src/main/java/com/nordstrom/kafka/connect/sqs/SqsConnectorConfig.java b/src/main/java/com/nordstrom/kafka/connect/sqs/SqsConnectorConfig.java index 26e700a..e6124e3 100644 --- a/src/main/java/com/nordstrom/kafka/connect/sqs/SqsConnectorConfig.java +++ b/src/main/java/com/nordstrom/kafka/connect/sqs/SqsConnectorConfig.java @@ -18,6 +18,8 @@ abstract class SqsConnectorConfig extends AbstractConfig { private final List messageAttributesList; + private final String messageAttributePartitionKey; + public SqsConnectorConfig(ConfigDef configDef, Map originals) { super(configDef, originals); queueUrl = getString(SqsConnectorConfigKeys.SQS_QUEUE_URL.getValue()); @@ -28,6 +30,7 @@ public SqsConnectorConfig(ConfigDef configDef, Map originals) { List csMessageAttributesList = getList(SqsConnectorConfigKeys.SQS_MESSAGE_ATTRIBUTES_INCLUDE_LIST.getValue()); messageAttributesList = messageAttributesEnabled ? csMessageAttributesList : new ArrayList<>(); + messageAttributePartitionKey = getString(SqsConnectorConfigKeys.SQS_MESSAGE_ATTRIBUTE_PARTITION_KEY.getValue()); } public String getQueueUrl() { @@ -53,4 +56,8 @@ public Boolean getMessageAttributesEnabled() { public List getMessageAttributesList() { return messageAttributesList; } + + public String getMessageAttributePartitionKey() { + return messageAttributePartitionKey; + } } diff --git a/src/main/java/com/nordstrom/kafka/connect/sqs/SqsConnectorConfigKeys.java b/src/main/java/com/nordstrom/kafka/connect/sqs/SqsConnectorConfigKeys.java index c68bfc6..0d3dbfe 100644 --- a/src/main/java/com/nordstrom/kafka/connect/sqs/SqsConnectorConfigKeys.java +++ b/src/main/java/com/nordstrom/kafka/connect/sqs/SqsConnectorConfigKeys.java @@ -28,6 +28,7 @@ public enum SqsConnectorConfigKeys { SQS_ENDPOINT_URL("sqs.endpoint.url"), SQS_MESSAGE_ATTRIBUTES_ENABLED("sqs.message.attributes.enabled"), SQS_MESSAGE_ATTRIBUTES_INCLUDE_LIST("sqs.message.attributes.include.list"), + SQS_MESSAGE_ATTRIBUTE_PARTITION_KEY("sqs.message.attributes.partition.key"), // These are not part of the connector configuration proper, but just a convenient // place to define the constants. diff --git a/src/main/java/com/nordstrom/kafka/connect/sqs/SqsSourceConnectorTask.java b/src/main/java/com/nordstrom/kafka/connect/sqs/SqsSourceConnectorTask.java index a89bb6a..7844ba0 100644 --- a/src/main/java/com/nordstrom/kafka/connect/sqs/SqsSourceConnectorTask.java +++ b/src/main/java/com/nordstrom/kafka/connect/sqs/SqsSourceConnectorTask.java @@ -20,6 +20,7 @@ import java.util.stream.Collectors ; import com.amazonaws.services.sqs.model.MessageAttributeValue; +import com.nordstrom.kafka.connect.utils.StringUtils; import org.apache.kafka.connect.data.Schema ; import org.apache.kafka.connect.data.SchemaAndValue; import org.apache.kafka.connect.header.ConnectHeaders; @@ -63,6 +64,47 @@ public void start( Map props ) { log.info( "task.start.OK, sqs.queue.url={}, topics={}", config.getQueueUrl(), config.getTopics() ) ; } + private String getPartitionKey(Message message) { + String messageId = message.getMessageId(); + if (!config.getMessageAttributesEnabled()) { + return messageId; + } + String messageAttributePartitionKey = config.getMessageAttributePartitionKey(); + if (StringUtils.isBlank(messageAttributePartitionKey)) { + return messageId; + } + + // search for the String message attribute with the same name as the configured partition key + Map attributes = message.getMessageAttributes(); + for(String attributeKey: attributes.keySet()) { + if (!Objects.equals(attributeKey, messageAttributePartitionKey)) { + continue; + } + MessageAttributeValue attrValue = attributes.get(attributeKey); + if (!attrValue.getDataType().equals("String")) { + continue; + } + return attrValue.getStringValue(); + } + return messageId; + } + + private ConnectHeaders getConnectHeaders(Message message) { + ConnectHeaders headers = new ConnectHeaders(); + if (config.getMessageAttributesEnabled()) { + Map attributes = message.getMessageAttributes(); + // sqs api should return only the fields specified in the list + for(String attributeKey: attributes.keySet()) { + MessageAttributeValue attrValue = attributes.get(attributeKey); + if (attrValue.getDataType().equals("String")) { + SchemaAndValue schemaAndValue = new SchemaAndValue(Schema.STRING_SCHEMA, attrValue.getStringValue()); + headers.add(attributeKey, schemaAndValue); + } + } + } + return headers; + } + /* * (non-Javadoc) * @@ -95,24 +137,12 @@ public List poll() throws InterruptedException { log.trace( ".poll:source-partition={}", sourcePartition ) ; log.trace( ".poll:source-offset={}", sourceOffset ) ; - final String body = message.getBody() ; - final String key = message.getMessageId() ; + final String body = message.getBody(); + final String key = getPartitionKey(message); final String topic = config.getTopics() ; + final ConnectHeaders headers = getConnectHeaders(message); - ConnectHeaders headers = new ConnectHeaders(); - if (config.getMessageAttributesEnabled()) { - Map attributes = message.getMessageAttributes(); - // sqs api should return only the fields specified in the list - for(String attributeKey: attributes.keySet()) { - MessageAttributeValue attrValue = attributes.get(attributeKey); - if (attrValue.getDataType().equals("String")) { - SchemaAndValue schemaAndValue = new SchemaAndValue(Schema.STRING_SCHEMA, attrValue.getStringValue()); - headers.add(attributeKey, schemaAndValue); - } - } - } - - return new SourceRecord( sourcePartition, sourceOffset, topic, null, Schema.STRING_SCHEMA, key, Schema.STRING_SCHEMA, + return new SourceRecord(sourcePartition, sourceOffset, topic, null, Schema.STRING_SCHEMA, key, Schema.STRING_SCHEMA, body, null, headers) ; } ).collect( Collectors.toList() ) ; } From 32c6542838ba97a8f5c472077c4b0ed6a28ce7a2 Mon Sep 17 00:00:00 2001 From: Andrei Olar Date: Wed, 28 Feb 2024 17:30:55 +0200 Subject: [PATCH 2/3] Revert extract method --- .../connect/sqs/SqsSourceConnectorTask.java | 30 ++++++++----------- 1 file changed, 13 insertions(+), 17 deletions(-) diff --git a/src/main/java/com/nordstrom/kafka/connect/sqs/SqsSourceConnectorTask.java b/src/main/java/com/nordstrom/kafka/connect/sqs/SqsSourceConnectorTask.java index 7844ba0..c2a44a5 100644 --- a/src/main/java/com/nordstrom/kafka/connect/sqs/SqsSourceConnectorTask.java +++ b/src/main/java/com/nordstrom/kafka/connect/sqs/SqsSourceConnectorTask.java @@ -89,22 +89,6 @@ private String getPartitionKey(Message message) { return messageId; } - private ConnectHeaders getConnectHeaders(Message message) { - ConnectHeaders headers = new ConnectHeaders(); - if (config.getMessageAttributesEnabled()) { - Map attributes = message.getMessageAttributes(); - // sqs api should return only the fields specified in the list - for(String attributeKey: attributes.keySet()) { - MessageAttributeValue attrValue = attributes.get(attributeKey); - if (attrValue.getDataType().equals("String")) { - SchemaAndValue schemaAndValue = new SchemaAndValue(Schema.STRING_SCHEMA, attrValue.getStringValue()); - headers.add(attributeKey, schemaAndValue); - } - } - } - return headers; - } - /* * (non-Javadoc) * @@ -140,7 +124,19 @@ public List poll() throws InterruptedException { final String body = message.getBody(); final String key = getPartitionKey(message); final String topic = config.getTopics() ; - final ConnectHeaders headers = getConnectHeaders(message); + + final ConnectHeaders headers = new ConnectHeaders(); + if (config.getMessageAttributesEnabled()) { + Map attributes = message.getMessageAttributes(); + // sqs api should return only the fields specified in the list + for(String attributeKey: attributes.keySet()) { + MessageAttributeValue attrValue = attributes.get(attributeKey); + if (attrValue.getDataType().equals("String")) { + SchemaAndValue schemaAndValue = new SchemaAndValue(Schema.STRING_SCHEMA, attrValue.getStringValue()); + headers.add(attributeKey, schemaAndValue); + } + } + } return new SourceRecord(sourcePartition, sourceOffset, topic, null, Schema.STRING_SCHEMA, key, Schema.STRING_SCHEMA, body, null, headers) ; From ab5d113a12ace38d1ffadf320cd60f99af95837c Mon Sep 17 00:00:00 2001 From: Andrei Olar Date: Thu, 29 Feb 2024 16:59:19 +0200 Subject: [PATCH 3/3] Update README.md with new option description Describe the sqs.message.attributes.partition.key optional config setting. --- README.md | 1 + 1 file changed, 1 insertion(+) diff --git a/README.md b/README.md index 2d2ac05..44300fb 100644 --- a/README.md +++ b/README.md @@ -66,6 +66,7 @@ Optional properties: * `sqs.endpoint.url`: Override value for the AWS region specific endpoint. * `sqs.message.attributes.enabled`: If true, it gets the Kafka Headers and inserts them as SQS MessageAttributes (only string headers are currently supported). Default is false. * `sqs.message.attributes.include.list`: The comma separated list of Header names to be included, if empty it includes all the Headers. Default is the empty string. +* `sqs.message.attributes.partition.key`: The name of a single AWS SQS MessageAttribute to use as the partition key. If this is not specified, default to the SQS message ID as the partition key. ### Sample Configuration ```json