diff --git a/java/KinesisAggregator/src/test/java/com/amazonaws/kinesis/agg/RecordAggregatorTest.java b/java/KinesisAggregator/src/test/java/com/amazonaws/kinesis/agg/RecordAggregatorTest.java index fe814a7..a70593e 100644 --- a/java/KinesisAggregator/src/test/java/com/amazonaws/kinesis/agg/RecordAggregatorTest.java +++ b/java/KinesisAggregator/src/test/java/com/amazonaws/kinesis/agg/RecordAggregatorTest.java @@ -18,37 +18,55 @@ package com.amazonaws.kinesis.agg; import java.nio.charset.StandardCharsets; -import com.amazonaws.kinesis.agg.RecordAggregator; -import com.amazonaws.kinesis.agg.AggRecord; +import java.util.Base64; +import java.util.Random; + import org.junit.Assert; import org.junit.Test; -public class RecordAggregatorTest -{ - protected final String ALPHABET = "abcdefghijklmnopqrstuvwxyz"; - - @Test - public void testSingleUserRecord() - { - RecordAggregator aggregator = new RecordAggregator(); - - Assert.assertEquals(0, aggregator.getNumUserRecords()); - - try - { - aggregator.addUserRecord("partition_key", ALPHABET.getBytes(StandardCharsets.UTF_8)); - } - catch (Exception e) - { - e.printStackTrace(); - Assert.fail("Encountered unexpected exception: " + e.getMessage()); - } - Assert.assertEquals(1, aggregator.getNumUserRecords()); - - AggRecord record = aggregator.clearAndGet(); - Assert.assertNotNull(record); - Assert.assertEquals(0, aggregator.getNumUserRecords()); - - Assert.assertEquals(1, record.getNumUserRecords()); - } +public class RecordAggregatorTest { + protected final String ALPHABET = "abcdefghijklmnopqrstuvwxyz"; + + @Test + public void testSingleUserRecord() { + RecordAggregator aggregator = new RecordAggregator(); + + Assert.assertEquals(0, aggregator.getNumUserRecords()); + + try { + aggregator.addUserRecord("partition_key", ALPHABET.getBytes(StandardCharsets.UTF_8)); + } catch (Exception e) { + e.printStackTrace(); + Assert.fail("Encountered unexpected exception: " + e.getMessage()); + } + Assert.assertEquals(1, aggregator.getNumUserRecords()); + + AggRecord record = aggregator.clearAndGet(); + Assert.assertNotNull(record); + Assert.assertEquals(0, aggregator.getNumUserRecords()); + + Assert.assertEquals(1, record.getNumUserRecords()); + } + + @Test + public void testMultiRecord() throws Exception { + RecordAggregator aggregator = new RecordAggregator(); + Random rand = new Random(); + String key = "abc"; + int c = 100; + String encodedTargetValue = "84mawgoDYWJjEicxOTE0MTU2NTgzNDQxNTg3NjYxNjgwMzE0NzMyNzc5MjI4MDM1NzAaIAgAEAAaGmFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6GiAIABAAGhp6eXh3dnV0c3JxcG9ubWxramloZ2ZlZGNiYRogCAAQABoaYWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXoaIAgAEAAaGnp5eHd2dXRzcnFwb25tbGtqaWhnZmVkY2JhGiAIABAAGhphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5ehogCAAQABoaenl4d3Z1dHNycXBvbm1sa2ppaGdmZWRjYmEaIAgAEAAaGmFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6GiAIABAAGhp6eXh3dnV0c3JxcG9ubWxramloZ2ZlZGNiYRogCAAQABoaYWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXoaIAgAEAAaGnp5eHd2dXRzcnFwb25tbGtqaWhnZmVkY2JhGiAIABAAGhphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5ehogCAAQABoaenl4d3Z1dHNycXBvbm1sa2ppaGdmZWRjYmEaIAgAEAAaGmFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6GiAIABAAGhp6eXh3dnV0c3JxcG9ubWxramloZ2ZlZGNiYRogCAAQABoaYWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXoaIAgAEAAaGnp5eHd2dXRzcnFwb25tbGtqaWhnZmVkY2JhGiAIABAAGhphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5ehogCAAQABoaenl4d3Z1dHNycXBvbm1sa2ppaGdmZWRjYmEaIAgAEAAaGmFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6GiAIABAAGhp6eXh3dnV0c3JxcG9ubWxramloZ2ZlZGNiYRogCAAQABoaYWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXoaIAgAEAAaGnp5eHd2dXRzcnFwb25tbGtqaWhnZmVkY2JhGiAIABAAGhphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5ehogCAAQABoaenl4d3Z1dHNycXBvbm1sa2ppaGdmZWRjYmEaIAgAEAAaGmFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6GiAIABAAGhp6eXh3dnV0c3JxcG9ubWxramloZ2ZlZGNiYRogCAAQABoaYWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXoaIAgAEAAaGnp5eHd2dXRzcnFwb25tbGtqaWhnZmVkY2JhGiAIABAAGhphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5ehogCAAQABoaenl4d3Z1dHNycXBvbm1sa2ppaGdmZWRjYmEaIAgAEAAaGmFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6GiAIABAAGhp6eXh3dnV0c3JxcG9ubWxramloZ2ZlZGNiYRogCAAQABoaYWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXoaIAgAEAAaGnp5eHd2dXRzcnFwb25tbGtqaWhnZmVkY2JhGiAIABAAGhphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5ehogCAAQABoaenl4d3Z1dHNycXBvbm1sa2ppaGdmZWRjYmEaIAgAEAAaGmFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6GiAIABAAGhp6eXh3dnV0c3JxcG9ubWxramloZ2ZlZGNiYRogCAAQABoaYWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXoaIAgAEAAaGnp5eHd2dXRzcnFwb25tbGtqaWhnZmVkY2JhGiAIABAAGhphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5ehogCAAQABoaenl4d3Z1dHNycXBvbm1sa2ppaGdmZWRjYmEaIAgAEAAaGmFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6GiAIABAAGhp6eXh3dnV0c3JxcG9ubWxramloZ2ZlZGNiYRogCAAQABoaYWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXoaIAgAEAAaGnp5eHd2dXRzcnFwb25tbGtqaWhnZmVkY2JhGiAIABAAGhphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5ehogCAAQABoaenl4d3Z1dHNycXBvbm1sa2ppaGdmZWRjYmEaIAgAEAAaGmFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6GiAIABAAGhp6eXh3dnV0c3JxcG9ubWxramloZ2ZlZGNiYRogCAAQABoaYWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXoaIAgAEAAaGnp5eHd2dXRzcnFwb25tbGtqaWhnZmVkY2JhGiAIABAAGhphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5ehogCAAQABoaenl4d3Z1dHNycXBvbm1sa2ppaGdmZWRjYmEaIAgAEAAaGmFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6GiAIABAAGhp6eXh3dnV0c3JxcG9ubWxramloZ2ZlZGNiYRogCAAQABoaYWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXoaIAgAEAAaGnp5eHd2dXRzcnFwb25tbGtqaWhnZmVkY2JhGiAIABAAGhphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5ehogCAAQABoaenl4d3Z1dHNycXBvbm1sa2ppaGdmZWRjYmEaIAgAEAAaGmFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6GiAIABAAGhp6eXh3dnV0c3JxcG9ubWxramloZ2ZlZGNiYRogCAAQABoaYWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXoaIAgAEAAaGnp5eHd2dXRzcnFwb25tbGtqaWhnZmVkY2JhGiAIABAAGhphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5ehogCAAQABoaenl4d3Z1dHNycXBvbm1sa2ppaGdmZWRjYmEaIAgAEAAaGmFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6GiAIABAAGhp6eXh3dnV0c3JxcG9ubWxramloZ2ZlZGNiYRogCAAQABoaYWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXoaIAgAEAAaGnp5eHd2dXRzcnFwb25tbGtqaWhnZmVkY2JhGiAIABAAGhphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5ehogCAAQABoaenl4d3Z1dHNycXBvbm1sa2ppaGdmZWRjYmEaIAgAEAAaGmFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6GiAIABAAGhp6eXh3dnV0c3JxcG9ubWxramloZ2ZlZGNiYRogCAAQABoaYWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXoaIAgAEAAaGnp5eHd2dXRzcnFwb25tbGtqaWhnZmVkY2JhGiAIABAAGhphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5ehogCAAQABoaenl4d3Z1dHNycXBvbm1sa2ppaGdmZWRjYmEaIAgAEAAaGmFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6GiAIABAAGhp6eXh3dnV0c3JxcG9ubWxramloZ2ZlZGNiYRogCAAQABoaYWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXoaIAgAEAAaGnp5eHd2dXRzcnFwb25tbGtqaWhnZmVkY2JhGiAIABAAGhphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5ehogCAAQABoaenl4d3Z1dHNycXBvbm1sa2ppaGdmZWRjYmEaIAgAEAAaGmFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6GiAIABAAGhp6eXh3dnV0c3JxcG9ubWxramloZ2ZlZGNiYRogCAAQABoaYWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXoaIAgAEAAaGnp5eHd2dXRzcnFwb25tbGtqaWhnZmVkY2JhGiAIABAAGhphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5ehogCAAQABoaenl4d3Z1dHNycXBvbm1sa2ppaGdmZWRjYmEaIAgAEAAaGmFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6GiAIABAAGhp6eXh3dnV0c3JxcG9ubWxramloZ2ZlZGNiYRogCAAQABoaYWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXoaIAgAEAAaGnp5eHd2dXRzcnFwb25tbGtqaWhnZmVkY2JhGiAIABAAGhphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5ehogCAAQABoaenl4d3Z1dHNycXBvbm1sa2ppaGdmZWRjYmEaIAgAEAAaGmFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6GiAIABAAGhp6eXh3dnV0c3JxcG9ubWxramloZ2ZlZGNiYRogCAAQABoaYWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXoaIAgAEAAaGnp5eHd2dXRzcnFwb25tbGtqaWhnZmVkY2Jh0I8WvwEDJiGD4YsiKIfUOw=="; + Assert.assertEquals(0, aggregator.getNumUserRecords()); + String flip = new StringBuilder(ALPHABET).reverse().toString(); + + // add 100 random records all with the same partition key + for (int i = 0; i < c; i++) { + String pattern = i % 2 == 0 ? ALPHABET : flip; + aggregator.addUserRecord(key, pattern.getBytes(StandardCharsets.UTF_8)); + } + + AggRecord r = aggregator.clearAndGet(); + Assert.assertEquals(c, r.getNumUserRecords()); + String encodedString = Base64.getEncoder().encodeToString(r.toRecordBytes()); + Assert.assertEquals(encodedTargetValue, encodedString); + } } diff --git a/java/KinesisDeaggregator/README.md b/java/KinesisDeaggregator/README.md index 6bcff6d..082cda8 100644 --- a/java/KinesisDeaggregator/README.md +++ b/java/KinesisDeaggregator/README.md @@ -1,7 +1,9 @@ -# Kinesis Java Record Deaggregator +# Kinesis Java Record Deaggregator for AWS V1 SDK's This library provides a set of convenience functions to perform in-memory record deaggregation that is compatible with the [Kinesis Aggregated Record Format](https://github.com/awslabs/amazon-kinesis-producer/blob/master/aggregation-format.md) used by the Kinesis Producer Library (KPL) and the KinesisAggregator module. This module can be used in any Java-based application that receives aggregated Kinesis records, including applications running on AWS Lambda. +This module is only compatible with version 1.x AWS SDK's. + ## Record Deaggregation The `RecordDeaggregator` is the class that does the work of extracting individual Kinesis user records from aggregated Kinesis Records received by AWS Lambda or directly through the Kinesis Java SDK. This class provide multiple ways to deaggregate records: stream-based, list-based, batch-based and single record. diff --git a/java/KinesisDeaggregator/pom.xml b/java/KinesisDeaggregator/pom.xml index c0c4c7c..3673de8 100644 --- a/java/KinesisDeaggregator/pom.xml +++ b/java/KinesisDeaggregator/pom.xml @@ -105,6 +105,14 @@ + + maven-assembly-plugin + + + jar-with-dependencies + + + diff --git a/java/KinesisDeaggregatorV2/README.md b/java/KinesisDeaggregatorV2/README.md new file mode 100644 index 0000000..100eb67 --- /dev/null +++ b/java/KinesisDeaggregatorV2/README.md @@ -0,0 +1,219 @@ +# Kinesis Java Record Deaggregator for AWS V2 SDK's + +This library provides a set of convenience functions to perform in-memory record deaggregation that is compatible with the [Kinesis Aggregated Record Format](https://github.com/awslabs/amazon-kinesis-producer/blob/master/aggregation-format.md) used by the Kinesis Producer Library (KPL) and the KinesisAggregator module. This module can be used in any Java-based application that receives aggregated Kinesis records, including applications running on AWS Lambda. + +This module is compatible with the V2 AWS SDK's. + +## Record Deaggregation + +The `RecordDeaggregator` is the class that does the work of extracting individual Kinesis user records from aggregated Kinesis Records received by AWS Lambda or directly through the Kinesis Java SDK. This class provide multiple ways to deaggregate records: stream-based, list-based, batch-based and single record. + +### Creating a Deaggregator + +There are two supported base classes that can be used for Deaggregation, `com.amazonaws.services.lambda.runtime.events.KinesisEvent.KinesisEventRecord` and `software.amazon.awssdk.services.kinesis.model.Record`. These support Lambda based access, and Kinesis V2 SDK access respectively. Use of any other base class will throw an `InvalidArgumentsException`. + +This project uses Java Generics to handle these different types correctly. To create a Lambda compliant Deaggregator, use: + +``` +import com.amazonaws.services.lambda.runtime.events.KinesisEvent.KinesisEventRecord; +... +RecordDeaggregator deaggregator = new RecordDeaggregator<>(); +``` + +and for the Kinesis SDK: + +``` +import software.amazon.awssdk.services.kinesis.model.Record; +... +RecordDeaggregator deaggregator = new RecordDeaggregator<>(); +``` + +### Stream-based Deaggregation + +The following examples demonstrate functions to create a new instance of the `RecordDeaggregator` class and then provide it code to run on each extracted UserRecord. For example, using Java 8 Streams: + +``` +deaggregator.stream( + event.getRecords().stream(), + userRecord -> { + // Your User Record Processing Code Here! + logger.log(String.format("Processing UserRecord %s (%s:%s)", + userRecord.partitionKey(), + userRecord.sequenceNumber(), + userRecord.subSequenceNumber())); + } +); +``` + +In this invocation, we are extracting the KinesisEventRecords from the Event provided by AWS Lambda, and converting them to a Stream. We then provide a lambda function which iterates over the extracted user records. You should provide your own application-specific logic in place of the provided `logger.log()` call. + +### List-based Deaggregation + +You can also achieve the same functionality using Lists rather than Java Streams via the `RecordDeaggregator.KinesisUserRecordProcessor` interface: + +``` +try { + // process the user records with an anonymous record processor + // instance + deaggregator.processRecords(event.getRecords(), + new RecordDeaggregator.KinesisUserRecordProcessor() { + public Void process(List userRecords) { + for (KinesisClientRecord userRecord : userRecords) { + // Your User Record Processing Code Here! + logger.log(String.format( + "Processing UserRecord %s (%s:%s)", + userRecord.partitionKey(), + userRecord.sequenceNumber(), + userRecord.subSequenceNumber())); + } + + return null; + } + }); +} catch (Exception e) { + logger.log(e.getMessage()); +} +``` + +As with the previous example, you should provide your own application-specific logic in place of the provided `logger.log()` call. + +### Batch-based Deaggregation + +For those whole prefer simple method call and response mechanisms, the `RecordDeaggregator` provides a `deaggregate` method that takes in a list of aggregated Kinesis records and deaggregates them synchronously in bulk. For example: + +``` +try { + List userRecords = deaggregator.deaggregate(event.getRecords()); + for (KinesisClientRecord userRecord : userRecords) { + // Your User Record Processing Code Here! + logger.log(String.format("Processing KinesisClientRecord %s (%s:%s)", + userRecord.partitionKey(), + userRecord.sequenceNumber(), + userRecord.subSequenceNumber())); + } +} catch (Exception e) { + logger.log(e.getMessage()); +} +``` + +As with the previous example, you should provide your own application-specific logic in place of the provided `logger.log()` call. + +### Single Record Deaggregation + +In some cases, it can also be beneficial to be able to deaggregate a single Kinesis aggregated record at a time. The `RecordDeaggregator` provides a single static `deaggregate` method that takes in a single aggregated Kinesis record, deaggregates it and returns one or more Kinesis user records as a result. For example: + +``` +KinesisEventRecord singleRecord = ...; +try { + List userRecords = deaggregator.deaggregate(singleRecord); + for (KinesisClientRecord userRecord : userRecords) { + // Your User Record Processing Code Here! + logger.log(String.format("Processing UserRecord %s (%s:%s)", + userRecord.partitionKey(), + userRecord.pequenceNumber(), + userRecord.subSequenceNumber())); + } +} catch (Exception e) { + logger.log(e.getMessage()); +} +``` + +As with the previous example, you should provide your own application-specific logic in place of the provided `logger.log()` call. + +### Handling Non-Aggregated Records + +The record deaggregation methods in `RecordDeaggregator` can handle both records in the standard Kinesis aggregated record format as well as Kinesis records in arbitrary user-defined formats. If you pass records to the `RecordDeaggregator` that follow the [Kinesis Aggregated Record Format](https://github.com/awslabs/amazon-kinesis-producer/blob/master/aggregation-format.md), they will be deaggregated into one or more Kinesis user records per the encoding rules. If you pass records to the `RecordDeaggregator` that are not actually aggregated records, they will be returned unchanged as Kinesis user records. You may also mix aggregated and non-aggregated records in the same deaggregation call. + +## Sample Code + +This project includes a set of sample code to help you create a Lambda function that leverages deaggregation. Both of the below contents are provided in the `src/sample/java` folder. + +### EchoHandler.java + +``` +package com.amazonaws.kinesis.deagg; + +import java.util.List; + +import com.amazonaws.services.lambda.runtime.Context; +import com.amazonaws.services.lambda.runtime.LambdaLogger; +import com.amazonaws.services.lambda.runtime.RequestHandler; +import com.amazonaws.services.lambda.runtime.events.KinesisEvent; +import com.amazonaws.services.lambda.runtime.events.KinesisEvent.KinesisEventRecord; + +import software.amazon.kinesis.retrieval.KinesisClientRecord; + +public class EchoHandler implements RequestHandler { + + @Override + public Void handleRequest(KinesisEvent event, Context context) { + LambdaLogger logger = context.getLogger(); + + // extract the records from the event + List records = event.getRecords(); + + logger.log(String.format("Recieved %s Raw Records", records.size())); + + try { + // now deaggregate the message contents + List deaggregated = new RecordDeaggregator().deaggregate(records); + logger.log(String.format("Received %s Deaggregated User Records", deaggregated.size())); + + deaggregated.stream().forEachOrdered(rec -> { + logger.log(rec.partitionKey()); + }); + } catch (Exception e) { + logger.log(e.getMessage()); + } + + return null; + } +} +``` + +This class will output the size of the received batch from Kinesis, and then deaggregate the user records and output the count of those records, along with each Partition Key recieved. + +If you would like to test this functionality, create a new Java 8 Lambda function with the above code and required dependencies. You can then use the below TestEvent to show the functionality of the deaggregating Lambda: + +### SampleLambdaEvent.json + +``` +{ + "Records": [ + { + "kinesis": { + "partitionKey": "partitionKey-03", + "kinesisSchemaVersion": "1.0", + "data": "84mawgoDYWJjEicxOTE0MTU2NTgzNDQxNTg3NjYxNjgwMzE0NzMyNzc5MjI4MDM1NzAaIAgAEAAaGmFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6GiAIABAAGhp6eXh3dnV0c3JxcG9ubWxramloZ2ZlZGNiYRogCAAQABoaYWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXoaIAgAEAAaGnp5eHd2dXRzcnFwb25tbGtqaWhnZmVkY2JhGiAIABAAGhphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5ehogCAAQABoaenl4d3Z1dHNycXBvbm1sa2ppaGdmZWRjYmEaIAgAEAAaGmFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6GiAIABAAGhp6eXh3dnV0c3JxcG9ubWxramloZ2ZlZGNiYRogCAAQABoaYWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXoaIAgAEAAaGnp5eHd2dXRzcnFwb25tbGtqaWhnZmVkY2JhGiAIABAAGhphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5ehogCAAQABoaenl4d3Z1dHNycXBvbm1sa2ppaGdmZWRjYmEaIAgAEAAaGmFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6GiAIABAAGhp6eXh3dnV0c3JxcG9ubWxramloZ2ZlZGNiYRogCAAQABoaYWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXoaIAgAEAAaGnp5eHd2dXRzcnFwb25tbGtqaWhnZmVkY2JhGiAIABAAGhphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5ehogCAAQABoaenl4d3Z1dHNycXBvbm1sa2ppaGdmZWRjYmEaIAgAEAAaGmFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6GiAIABAAGhp6eXh3dnV0c3JxcG9ubWxramloZ2ZlZGNiYRogCAAQABoaYWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXoaIAgAEAAaGnp5eHd2dXRzcnFwb25tbGtqaWhnZmVkY2JhGiAIABAAGhphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5ehogCAAQABoaenl4d3Z1dHNycXBvbm1sa2ppaGdmZWRjYmEaIAgAEAAaGmFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6GiAIABAAGhp6eXh3dnV0c3JxcG9ubWxramloZ2ZlZGNiYRogCAAQABoaYWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXoaIAgAEAAaGnp5eHd2dXRzcnFwb25tbGtqaWhnZmVkY2JhGiAIABAAGhphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5ehogCAAQABoaenl4d3Z1dHNycXBvbm1sa2ppaGdmZWRjYmEaIAgAEAAaGmFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6GiAIABAAGhp6eXh3dnV0c3JxcG9ubWxramloZ2ZlZGNiYRogCAAQABoaYWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXoaIAgAEAAaGnp5eHd2dXRzcnFwb25tbGtqaWhnZmVkY2JhGiAIABAAGhphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5ehogCAAQABoaenl4d3Z1dHNycXBvbm1sa2ppaGdmZWRjYmEaIAgAEAAaGmFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6GiAIABAAGhp6eXh3dnV0c3JxcG9ubWxramloZ2ZlZGNiYRogCAAQABoaYWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXoaIAgAEAAaGnp5eHd2dXRzcnFwb25tbGtqaWhnZmVkY2JhGiAIABAAGhphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5ehogCAAQABoaenl4d3Z1dHNycXBvbm1sa2ppaGdmZWRjYmEaIAgAEAAaGmFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6GiAIABAAGhp6eXh3dnV0c3JxcG9ubWxramloZ2ZlZGNiYRogCAAQABoaYWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXoaIAgAEAAaGnp5eHd2dXRzcnFwb25tbGtqaWhnZmVkY2JhGiAIABAAGhphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5ehogCAAQABoaenl4d3Z1dHNycXBvbm1sa2ppaGdmZWRjYmEaIAgAEAAaGmFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6GiAIABAAGhp6eXh3dnV0c3JxcG9ubWxramloZ2ZlZGNiYRogCAAQABoaYWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXoaIAgAEAAaGnp5eHd2dXRzcnFwb25tbGtqaWhnZmVkY2JhGiAIABAAGhphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5ehogCAAQABoaenl4d3Z1dHNycXBvbm1sa2ppaGdmZWRjYmEaIAgAEAAaGmFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6GiAIABAAGhp6eXh3dnV0c3JxcG9ubWxramloZ2ZlZGNiYRogCAAQABoaYWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXoaIAgAEAAaGnp5eHd2dXRzcnFwb25tbGtqaWhnZmVkY2JhGiAIABAAGhphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5ehogCAAQABoaenl4d3Z1dHNycXBvbm1sa2ppaGdmZWRjYmEaIAgAEAAaGmFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6GiAIABAAGhp6eXh3dnV0c3JxcG9ubWxramloZ2ZlZGNiYRogCAAQABoaYWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXoaIAgAEAAaGnp5eHd2dXRzcnFwb25tbGtqaWhnZmVkY2JhGiAIABAAGhphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5ehogCAAQABoaenl4d3Z1dHNycXBvbm1sa2ppaGdmZWRjYmEaIAgAEAAaGmFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6GiAIABAAGhp6eXh3dnV0c3JxcG9ubWxramloZ2ZlZGNiYRogCAAQABoaYWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXoaIAgAEAAaGnp5eHd2dXRzcnFwb25tbGtqaWhnZmVkY2JhGiAIABAAGhphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5ehogCAAQABoaenl4d3Z1dHNycXBvbm1sa2ppaGdmZWRjYmEaIAgAEAAaGmFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6GiAIABAAGhp6eXh3dnV0c3JxcG9ubWxramloZ2ZlZGNiYRogCAAQABoaYWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXoaIAgAEAAaGnp5eHd2dXRzcnFwb25tbGtqaWhnZmVkY2JhGiAIABAAGhphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5ehogCAAQABoaenl4d3Z1dHNycXBvbm1sa2ppaGdmZWRjYmEaIAgAEAAaGmFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6GiAIABAAGhp6eXh3dnV0c3JxcG9ubWxramloZ2ZlZGNiYRogCAAQABoaYWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXoaIAgAEAAaGnp5eHd2dXRzcnFwb25tbGtqaWhnZmVkY2JhGiAIABAAGhphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5ehogCAAQABoaenl4d3Z1dHNycXBvbm1sa2ppaGdmZWRjYmEaIAgAEAAaGmFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6GiAIABAAGhp6eXh3dnV0c3JxcG9ubWxramloZ2ZlZGNiYRogCAAQABoaYWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXoaIAgAEAAaGnp5eHd2dXRzcnFwb25tbGtqaWhnZmVkY2JhGiAIABAAGhphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5ehogCAAQABoaenl4d3Z1dHNycXBvbm1sa2ppaGdmZWRjYmEaIAgAEAAaGmFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6GiAIABAAGhp6eXh3dnV0c3JxcG9ubWxramloZ2ZlZGNiYRogCAAQABoaYWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXoaIAgAEAAaGnp5eHd2dXRzcnFwb25tbGtqaWhnZmVkY2JhGiAIABAAGhphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5ehogCAAQABoaenl4d3Z1dHNycXBvbm1sa2ppaGdmZWRjYmEaIAgAEAAaGmFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6GiAIABAAGhp6eXh3dnV0c3JxcG9ubWxramloZ2ZlZGNiYRogCAAQABoaYWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXoaIAgAEAAaGnp5eHd2dXRzcnFwb25tbGtqaWhnZmVkY2Jh0I8WvwEDJiGD4YsiKIfUOw==", + "sequenceNumber": "49545115243490985018280067714973144582180062593244200961", + "approximateArrivalTimestamp": 1428537600 + }, + "eventSource": "aws:kinesis", + "eventID": "shardId-000000000000:49545115243490985018280067714973144582180062593244200961", + "invokeIdentityArn": "arn:aws:iam::EXAMPLE", + "eventVersion": "1.0", + "eventName": "aws:kinesis:record", + "eventSourceARN": "arn:aws:kinesis:EXAMPLE", + "awsRegion": "us-east-1" + } + ] +} +``` + +This file contains an event that simulates an Aggregated Kinesis Event enclosing 100 User Records. The payload of this message is alternating lower case alpha in forward, then backward order. + +---- + +Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. diff --git a/java/KinesisDeaggregatorV2/dist/amazon-kinesis-deaggregator-2.0.0-javadoc.jar b/java/KinesisDeaggregatorV2/dist/amazon-kinesis-deaggregator-2.0.0-javadoc.jar new file mode 100644 index 0000000..c508cd7 Binary files /dev/null and b/java/KinesisDeaggregatorV2/dist/amazon-kinesis-deaggregator-2.0.0-javadoc.jar differ diff --git a/java/KinesisDeaggregatorV2/dist/amazon-kinesis-deaggregator-2.0.0-sources.jar b/java/KinesisDeaggregatorV2/dist/amazon-kinesis-deaggregator-2.0.0-sources.jar new file mode 100644 index 0000000..2fabb7a Binary files /dev/null and b/java/KinesisDeaggregatorV2/dist/amazon-kinesis-deaggregator-2.0.0-sources.jar differ diff --git a/java/KinesisDeaggregatorV2/dist/amazon-kinesis-deaggregator-2.0.0.jar b/java/KinesisDeaggregatorV2/dist/amazon-kinesis-deaggregator-2.0.0.jar new file mode 100644 index 0000000..353d4ae Binary files /dev/null and b/java/KinesisDeaggregatorV2/dist/amazon-kinesis-deaggregator-2.0.0.jar differ diff --git a/java/KinesisDeaggregatorV2/license/apache-2.0/header.txt b/java/KinesisDeaggregatorV2/license/apache-2.0/header.txt new file mode 100644 index 0000000..3217b3e --- /dev/null +++ b/java/KinesisDeaggregatorV2/license/apache-2.0/header.txt @@ -0,0 +1,15 @@ +Kinesis Aggregation/Deaggregation Libraries for Java + +Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. diff --git a/java/KinesisDeaggregatorV2/pom.xml b/java/KinesisDeaggregatorV2/pom.xml new file mode 100644 index 0000000..338ca30 --- /dev/null +++ b/java/KinesisDeaggregatorV2/pom.xml @@ -0,0 +1,163 @@ + + 4.0.0 + + amazon-kinesis-deaggregator + A library for performing in-memory deaggregation of Kinesis aggregated stream records. + + com.amazonaws + amazon-kinesis-deaggregator + 2.0.0 + + jar + + + UTF-8 + + + https://aws.amazon.com/kinesis + + scm:git:git://github.com/awslabs/kinesis-aggregation.git + https://github.com/awslabs/kinesis-aggregation + + + + + Apache License, Version 2.0 + https://www.apache.org/licenses/LICENSE-2.0.txt + repo + + + + + + amazonwebservices + Amazon Web Services + https://aws.amazon.com + + developer + + + + + + clean compile + src/main/java + + + org.apache.maven.plugins + maven-compiler-plugin + 3.7.0 + + 1.8 + 1.8 + + + + org.apache.maven.plugins + maven-javadoc-plugin + 3.0.0 + + + attach-javadocs + + jar + + + + + public + true + false + + + + org.apache.maven.plugins + maven-source-plugin + 3.0.1 + + + attach-sources + + jar + + + + + + org.apache.maven.plugins + maven-antrun-plugin + 1.8 + + + copy + + + + + + package + + run + + + + + + maven-assembly-plugin + + + jar-with-dependencies + + + + + + + + com.amazonaws + amazon-kinesis-aggregator + 1.1.0 + test + + + software.amazon.kinesis + amazon-kinesis-client + 2.2.11 + + + com.fasterxml.jackson.core + jackson-databind + 2.8.11.3 + + + com.amazonaws + aws-lambda-java-events + 3.1.0 + + + org.apache.commons + commons-lang3 + 3.10 + + + com.amazonaws. + amazon-kinesis-aggregator + 1.1.0 + test + + + junit + junit + 4.13 + test + + + com.amazonaws + aws-lambda-java-core + 1.2.1 + + + diff --git a/java/KinesisDeaggregatorV2/src/main/java/com/amazonaws/kinesis/deagg/RecordDeaggregator.java b/java/KinesisDeaggregatorV2/src/main/java/com/amazonaws/kinesis/deagg/RecordDeaggregator.java new file mode 100644 index 0000000..3e0fb55 --- /dev/null +++ b/java/KinesisDeaggregatorV2/src/main/java/com/amazonaws/kinesis/deagg/RecordDeaggregator.java @@ -0,0 +1,160 @@ +/** + * Kinesis Aggregation/Deaggregation Libraries for Java + * + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.amazonaws.kinesis.deagg; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.LinkedList; +import java.util.List; +import java.util.function.Consumer; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import com.amazonaws.services.lambda.runtime.events.KinesisEvent; +import com.amazonaws.services.lambda.runtime.events.KinesisEvent.KinesisEventRecord; + +import software.amazon.awssdk.core.SdkBytes; +import software.amazon.awssdk.services.kinesis.model.Record; +import software.amazon.kinesis.retrieval.AggregatorUtil; +import software.amazon.kinesis.retrieval.KinesisClientRecord; + +/** + * A Kinesis deaggregator convenience class. This class contains a number of + * static methods that provide different interfaces for deaggregating user + * records from an existing aggregated Kinesis record. This class is oriented + * towards deaggregating Kinesis records as provided by AWS Lambda, or through + * the Kinesis SDK. Parameterise the instance with the required types + * (supporting + * com.amazonaws.services.lambda.runtime.events.KinesisEvent.KinesisEventRecord + * or com.amazonaws.services.kinesis.model.Record only) + * + * NOTE: Any non-aggregated records passed to any deaggregation methods will be + * returned unchanged. + * + */ +public class RecordDeaggregator { + /** + * Interface used by a calling method to call the process function + * + */ + public interface KinesisUserRecordProcessor { + public Void process(List userRecords); + } + + private Record convertOne(KinesisEventRecord record) { + KinesisEvent.Record r = record.getKinesis(); + Record out = Record.builder().partitionKey(r.getPartitionKey()).encryptionType(r.getEncryptionType()) + .approximateArrivalTimestamp(r.getApproximateArrivalTimestamp().toInstant()) + .sequenceNumber(r.getSequenceNumber()).data(SdkBytes.fromByteBuffer(r.getData())).build(); + + return out; + + } + + private List convertToKinesis(List inputRecords) { + List response = new ArrayList<>(); + + inputRecords.stream().forEachOrdered(record -> { + response.add(KinesisClientRecord.fromRecord(convertOne(record))); + }); + + return response; + + } + + @SuppressWarnings("unchecked") + private List convertType(List inputRecords) throws Exception { + List records = null; + + if (inputRecords.size() > 0 && inputRecords.get(0) instanceof KinesisEventRecord) { + records = convertToKinesis((List) inputRecords); + } else if (inputRecords.size() > 0 && inputRecords.get(0) instanceof Record) { + records = new ArrayList<>(); + for (Record rec : (List) inputRecords) { + records.add(KinesisClientRecord.fromRecord((Record) rec)); + } + } else { + if (inputRecords.size() == 0) { + return new ArrayList(); + } else { + throw new Exception("Input Types must be Kinesis Event or Model Records"); + } + } + + return records; + } + + /** + * Method to process a set of Kinesis user records from a Stream of Kinesis + * Event Records using the Java 8 Streams API + * + * @param inputStream The Kinesis Records provided by AWS Lambda or the + * Kinesis SDK + * @param streamConsumer Instance implementing the Consumer interface to process + * the deaggregated UserRecords + * @return Void + */ + public Void stream(Stream inputStream, Consumer streamConsumer) throws Exception { + // deaggregate UserRecords from the Kinesis Records + + List streamList = inputStream.collect(Collectors.toList()); + List deaggregatedRecords = new AggregatorUtil().deaggregate(convertType(streamList)); + deaggregatedRecords.stream().forEachOrdered(streamConsumer); + + return null; + } + + /** + * Method to process a set of Kinesis user records from a list of Kinesis + * Records using pre-Streams style API + * + * @param inputRecords The Kinesis Records provided by AWS Lambda + * @param processor Instance implementing KinesisUserRecordProcessor + * @return Void + */ + public Void processRecords(List inputRecords, KinesisUserRecordProcessor processor) throws Exception { + // invoke provided processor + return processor.process(new AggregatorUtil().deaggregate(convertType(inputRecords))); + } + + /** + * Method to bulk deaggregate a set of Kinesis user records from a list of + * Kinesis Event Records. + * + * @param inputRecords The Kinesis Records provided by AWS Lambda + * @return A list of Kinesis UserRecord objects obtained by deaggregating the + * input list of KinesisEventRecords + */ + public List deaggregate(List inputRecords) throws Exception { + List outputRecords = new LinkedList<>(); + outputRecords.addAll(new AggregatorUtil().deaggregate(convertType(inputRecords))); + + return outputRecords; + } + + /** + * Method to deaggregate a single Kinesis record into a List of UserRecords + * + * @param inputRecord The Kinesis Record provided by AWS Lambda or Kinesis SDK + * @return A list of Kinesis UserRecord objects obtained by deaggregating the + * input list of KinesisEventRecords + */ + public List deaggregate(T inputRecord) throws Exception { + return new AggregatorUtil().deaggregate(convertType(Arrays.asList(inputRecord))); + } +} diff --git a/java/KinesisDeaggregatorV2/src/main/java/com/amazonaws/kinesis/deagg/util/DeaggregationUtils.java b/java/KinesisDeaggregatorV2/src/main/java/com/amazonaws/kinesis/deagg/util/DeaggregationUtils.java new file mode 100644 index 0000000..ed8cf8d --- /dev/null +++ b/java/KinesisDeaggregatorV2/src/main/java/com/amazonaws/kinesis/deagg/util/DeaggregationUtils.java @@ -0,0 +1,33 @@ +package com.amazonaws.kinesis.deagg.util; + +import java.util.ArrayList; +import java.util.List; + +import com.amazonaws.services.lambda.runtime.events.KinesisEvent; +import com.amazonaws.services.lambda.runtime.events.KinesisEvent.KinesisEventRecord; + +import software.amazon.awssdk.core.SdkBytes; + +public class DeaggregationUtils { + public static software.amazon.awssdk.services.kinesis.model.Record convertOne(KinesisEventRecord record) { + KinesisEvent.Record r = record.getKinesis(); + software.amazon.awssdk.services.kinesis.model.Record out = software.amazon.awssdk.services.kinesis.model.Record + .builder().partitionKey(r.getPartitionKey()).encryptionType(r.getEncryptionType()) + .approximateArrivalTimestamp(r.getApproximateArrivalTimestamp().toInstant()) + .sequenceNumber(r.getSequenceNumber()).data(SdkBytes.fromByteBuffer(r.getData())).build(); + + return out; + } + + public static List convertToKinesis( + List inputRecords) { + List response = new ArrayList<>(); + + inputRecords.stream().forEachOrdered(record -> { + response.add(convertOne(record)); + }); + + return response; + + } +} diff --git a/java/KinesisDeaggregatorV2/src/sample/java/SampleLambdaEvent.json b/java/KinesisDeaggregatorV2/src/sample/java/SampleLambdaEvent.json new file mode 100644 index 0000000..b1171cc --- /dev/null +++ b/java/KinesisDeaggregatorV2/src/sample/java/SampleLambdaEvent.json @@ -0,0 +1,20 @@ +{ + "Records": [ + { + "kinesis": { + "partitionKey": "partitionKey-03", + "kinesisSchemaVersion": "1.0", + "data": "84mawgoDYWJjEicxOTE0MTU2NTgzNDQxNTg3NjYxNjgwMzE0NzMyNzc5MjI4MDM1NzAaIAgAEAAaGmFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6GiAIABAAGhp6eXh3dnV0c3JxcG9ubWxramloZ2ZlZGNiYRogCAAQABoaYWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXoaIAgAEAAaGnp5eHd2dXRzcnFwb25tbGtqaWhnZmVkY2JhGiAIABAAGhphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5ehogCAAQABoaenl4d3Z1dHNycXBvbm1sa2ppaGdmZWRjYmEaIAgAEAAaGmFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6GiAIABAAGhp6eXh3dnV0c3JxcG9ubWxramloZ2ZlZGNiYRogCAAQABoaYWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXoaIAgAEAAaGnp5eHd2dXRzcnFwb25tbGtqaWhnZmVkY2JhGiAIABAAGhphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5ehogCAAQABoaenl4d3Z1dHNycXBvbm1sa2ppaGdmZWRjYmEaIAgAEAAaGmFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6GiAIABAAGhp6eXh3dnV0c3JxcG9ubWxramloZ2ZlZGNiYRogCAAQABoaYWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXoaIAgAEAAaGnp5eHd2dXRzcnFwb25tbGtqaWhnZmVkY2JhGiAIABAAGhphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5ehogCAAQABoaenl4d3Z1dHNycXBvbm1sa2ppaGdmZWRjYmEaIAgAEAAaGmFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6GiAIABAAGhp6eXh3dnV0c3JxcG9ubWxramloZ2ZlZGNiYRogCAAQABoaYWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXoaIAgAEAAaGnp5eHd2dXRzcnFwb25tbGtqaWhnZmVkY2JhGiAIABAAGhphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5ehogCAAQABoaenl4d3Z1dHNycXBvbm1sa2ppaGdmZWRjYmEaIAgAEAAaGmFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6GiAIABAAGhp6eXh3dnV0c3JxcG9ubWxramloZ2ZlZGNiYRogCAAQABoaYWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXoaIAgAEAAaGnp5eHd2dXRzcnFwb25tbGtqaWhnZmVkY2JhGiAIABAAGhphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5ehogCAAQABoaenl4d3Z1dHNycXBvbm1sa2ppaGdmZWRjYmEaIAgAEAAaGmFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6GiAIABAAGhp6eXh3dnV0c3JxcG9ubWxramloZ2ZlZGNiYRogCAAQABoaYWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXoaIAgAEAAaGnp5eHd2dXRzcnFwb25tbGtqaWhnZmVkY2JhGiAIABAAGhphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5ehogCAAQABoaenl4d3Z1dHNycXBvbm1sa2ppaGdmZWRjYmEaIAgAEAAaGmFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6GiAIABAAGhp6eXh3dnV0c3JxcG9ubWxramloZ2ZlZGNiYRogCAAQABoaYWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXoaIAgAEAAaGnp5eHd2dXRzcnFwb25tbGtqaWhnZmVkY2JhGiAIABAAGhphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5ehogCAAQABoaenl4d3Z1dHNycXBvbm1sa2ppaGdmZWRjYmEaIAgAEAAaGmFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6GiAIABAAGhp6eXh3dnV0c3JxcG9ubWxramloZ2ZlZGNiYRogCAAQABoaYWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXoaIAgAEAAaGnp5eHd2dXRzcnFwb25tbGtqaWhnZmVkY2JhGiAIABAAGhphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5ehogCAAQABoaenl4d3Z1dHNycXBvbm1sa2ppaGdmZWRjYmEaIAgAEAAaGmFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6GiAIABAAGhp6eXh3dnV0c3JxcG9ubWxramloZ2ZlZGNiYRogCAAQABoaYWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXoaIAgAEAAaGnp5eHd2dXRzcnFwb25tbGtqaWhnZmVkY2JhGiAIABAAGhphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5ehogCAAQABoaenl4d3Z1dHNycXBvbm1sa2ppaGdmZWRjYmEaIAgAEAAaGmFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6GiAIABAAGhp6eXh3dnV0c3JxcG9ubWxramloZ2ZlZGNiYRogCAAQABoaYWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXoaIAgAEAAaGnp5eHd2dXRzcnFwb25tbGtqaWhnZmVkY2JhGiAIABAAGhphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5ehogCAAQABoaenl4d3Z1dHNycXBvbm1sa2ppaGdmZWRjYmEaIAgAEAAaGmFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6GiAIABAAGhp6eXh3dnV0c3JxcG9ubWxramloZ2ZlZGNiYRogCAAQABoaYWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXoaIAgAEAAaGnp5eHd2dXRzcnFwb25tbGtqaWhnZmVkY2JhGiAIABAAGhphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5ehogCAAQABoaenl4d3Z1dHNycXBvbm1sa2ppaGdmZWRjYmEaIAgAEAAaGmFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6GiAIABAAGhp6eXh3dnV0c3JxcG9ubWxramloZ2ZlZGNiYRogCAAQABoaYWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXoaIAgAEAAaGnp5eHd2dXRzcnFwb25tbGtqaWhnZmVkY2JhGiAIABAAGhphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5ehogCAAQABoaenl4d3Z1dHNycXBvbm1sa2ppaGdmZWRjYmEaIAgAEAAaGmFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6GiAIABAAGhp6eXh3dnV0c3JxcG9ubWxramloZ2ZlZGNiYRogCAAQABoaYWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXoaIAgAEAAaGnp5eHd2dXRzcnFwb25tbGtqaWhnZmVkY2JhGiAIABAAGhphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5ehogCAAQABoaenl4d3Z1dHNycXBvbm1sa2ppaGdmZWRjYmEaIAgAEAAaGmFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6GiAIABAAGhp6eXh3dnV0c3JxcG9ubWxramloZ2ZlZGNiYRogCAAQABoaYWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXoaIAgAEAAaGnp5eHd2dXRzcnFwb25tbGtqaWhnZmVkY2JhGiAIABAAGhphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5ehogCAAQABoaenl4d3Z1dHNycXBvbm1sa2ppaGdmZWRjYmEaIAgAEAAaGmFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6GiAIABAAGhp6eXh3dnV0c3JxcG9ubWxramloZ2ZlZGNiYRogCAAQABoaYWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXoaIAgAEAAaGnp5eHd2dXRzcnFwb25tbGtqaWhnZmVkY2JhGiAIABAAGhphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5ehogCAAQABoaenl4d3Z1dHNycXBvbm1sa2ppaGdmZWRjYmEaIAgAEAAaGmFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6GiAIABAAGhp6eXh3dnV0c3JxcG9ubWxramloZ2ZlZGNiYRogCAAQABoaYWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXoaIAgAEAAaGnp5eHd2dXRzcnFwb25tbGtqaWhnZmVkY2JhGiAIABAAGhphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5ehogCAAQABoaenl4d3Z1dHNycXBvbm1sa2ppaGdmZWRjYmEaIAgAEAAaGmFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6GiAIABAAGhp6eXh3dnV0c3JxcG9ubWxramloZ2ZlZGNiYRogCAAQABoaYWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXoaIAgAEAAaGnp5eHd2dXRzcnFwb25tbGtqaWhnZmVkY2Jh0I8WvwEDJiGD4YsiKIfUOw==", + "sequenceNumber": "49545115243490985018280067714973144582180062593244200961", + "approximateArrivalTimestamp": 1428537600 + }, + "eventSource": "aws:kinesis", + "eventID": "shardId-000000000000:49545115243490985018280067714973144582180062593244200961", + "invokeIdentityArn": "arn:aws:iam::EXAMPLE", + "eventVersion": "1.0", + "eventName": "aws:kinesis:record", + "eventSourceARN": "arn:aws:kinesis:EXAMPLE", + "awsRegion": "us-east-1" + } + ] +} \ No newline at end of file diff --git a/java/KinesisDeaggregatorV2/src/sample/java/com/amazonaws/kinesis/deagg/EchoHandler.java b/java/KinesisDeaggregatorV2/src/sample/java/com/amazonaws/kinesis/deagg/EchoHandler.java new file mode 100644 index 0000000..3dc778f --- /dev/null +++ b/java/KinesisDeaggregatorV2/src/sample/java/com/amazonaws/kinesis/deagg/EchoHandler.java @@ -0,0 +1,38 @@ +package com.amazonaws.kinesis.deagg; + +import java.util.List; + +import com.amazonaws.services.lambda.runtime.Context; +import com.amazonaws.services.lambda.runtime.LambdaLogger; +import com.amazonaws.services.lambda.runtime.RequestHandler; +import com.amazonaws.services.lambda.runtime.events.KinesisEvent; +import com.amazonaws.services.lambda.runtime.events.KinesisEvent.KinesisEventRecord; + +import software.amazon.kinesis.retrieval.KinesisClientRecord; + +public class EchoHandler implements RequestHandler { + + @Override + public Void handleRequest(KinesisEvent event, Context context) { + LambdaLogger logger = context.getLogger(); + + // extract the records from the event + List records = event.getRecords(); + + logger.log(String.format("Recieved %s Raw Records", records.size())); + + try { + // now deaggregate the message contents + List deaggregated = new RecordDeaggregator().deaggregate(records); + logger.log(String.format("Received %s Deaggregated User Records", deaggregated.size())); + + deaggregated.stream().forEachOrdered(rec -> { + logger.log(rec.partitionKey()); + }); + } catch (Exception e) { + logger.log(e.getMessage()); + } + + return null; + } +} diff --git a/java/KinesisDeaggregatorV2/src/test/java/TestDirectDeaggregation.java b/java/KinesisDeaggregatorV2/src/test/java/TestDirectDeaggregation.java new file mode 100644 index 0000000..f449977 --- /dev/null +++ b/java/KinesisDeaggregatorV2/src/test/java/TestDirectDeaggregation.java @@ -0,0 +1,165 @@ +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Date; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.function.Consumer; + +import org.apache.commons.lang3.RandomStringUtils; +import org.junit.BeforeClass; +import org.junit.Test; + +import com.amazonaws.kinesis.agg.AggRecord; +import com.amazonaws.kinesis.agg.RecordAggregator; +import com.amazonaws.kinesis.deagg.RecordDeaggregator; +import com.amazonaws.kinesis.deagg.RecordDeaggregator.KinesisUserRecordProcessor; + +import software.amazon.awssdk.core.SdkBytes; +import software.amazon.awssdk.services.kinesis.model.Record; +import software.amazon.kinesis.retrieval.KinesisClientRecord; + +public class TestDirectDeaggregation { + private static final int c = 10; + private static Map checkset = new HashMap<>(); + private static List recordList = null; + private static final RecordDeaggregator deaggregator = new RecordDeaggregator<>(); + private static RecordAggregator aggregator = null; + private static AggRecord aggregated = null; + + private final class TestKinesisUserRecordProcessor + implements Consumer, KinesisUserRecordProcessor { + private int recordsProcessed = 0; + + public int getCount() { + return this.recordsProcessed; + } + + @Override + public void accept(KinesisClientRecord t) { + recordsProcessed += 1; + } + + @Override + public Void process(List userRecords) { + recordsProcessed += userRecords.size(); + + return null; + } + + } + + /* Verify that a provided set of UserRecords map 1:1 to the original checkset */ + private void verifyOneToOneMapping(List userRecords) { + userRecords.stream().forEachOrdered(userRecord -> { + // get the original checkset record by ID + Record toCheck = checkset.get(userRecord.partitionKey()); + + // confirm that toCheck is not null + assertNotNull("Found Original CheckSet Record", toCheck); + + // confirm that the data is the same + assertTrue("Data Correct", userRecord.data().compareTo(toCheck.data().asByteBuffer()) == 0); + }); + } + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + aggregator = new RecordAggregator(); + + recordList = new LinkedList<>(); + + // create 10 random records for testing + for (int i = 0; i < c; i++) { + // create trackable id + String id = UUID.randomUUID().toString(); + + // create a kinesis model record + byte[] data = RandomStringUtils.randomAlphabetic(20).getBytes(); + + Record r = Record.builder().partitionKey(id) + .approximateArrivalTimestamp(new Date(System.currentTimeMillis()).toInstant()) + .data(SdkBytes.fromByteArray(data)).build(); + recordList.add(r); + + // add the record to the check set + checkset.put(id, r); + + // add the record to the aggregated AggRecord // create an aggregated set of + aggregator.addUserRecord(id, data); + } + + // get the aggregated data + aggregated = aggregator.clearAndGet(); + assertEquals("Aggregated Record Count Correct", aggregated.getNumUserRecords(), c); + } + + @Test + public void testProcessor() throws Exception { + // create a counting record processor + TestKinesisUserRecordProcessor p = new TestKinesisUserRecordProcessor(); + + // invoke deaggregation on the static records with this processor + deaggregator.processRecords(recordList, p); + + assertEquals("Processed Record Count Correct", p.getCount(), recordList.size()); + } + + @Test + public void testStream() throws Exception { + // create a counting record processor + TestKinesisUserRecordProcessor p = new TestKinesisUserRecordProcessor(); + + // invoke deaggregation on the static records with this processor + deaggregator.stream(recordList.stream(), p); + + assertEquals("Processed Record Count Correct", p.getCount(), recordList.size()); + } + + @Test + public void testList() throws Exception { + // invoke deaggregation on the static records, returning a List of UserRecord + List records = deaggregator.deaggregate(recordList); + + assertEquals("Processed Record Count Correct", records.size(), recordList.size()); + verifyOneToOneMapping(records); + } + + @Test + public void testEmpty() throws Exception { + // invoke deaggregation on the static records, returning a List of UserRecord + List records = deaggregator.deaggregate(new ArrayList()); + + assertEquals("Processed Record Count Correct", records.size(), 0); + verifyOneToOneMapping(records); + } + + @Test + public void testOne() throws Exception { + // invoke deaggregation on the static records, returning a List of UserRecord + List records = deaggregator.deaggregate(recordList.get(0)); + + assertEquals("Processed Record Count Correct", records.size(), 1); + verifyOneToOneMapping(records); + } + + @Test + public void testAggregatedRecord() throws Exception { + // create a new KinesisEvent.Record from the aggregated data + Record r = Record.builder().partitionKey(aggregated.getPartitionKey()) + .approximateArrivalTimestamp(new Date(System.currentTimeMillis()).toInstant()) + .data(SdkBytes.fromByteArray(aggregated.toRecordBytes())).build(); + + // deaggregate the record + List userRecords = deaggregator.deaggregate(Arrays.asList(r)); + + assertEquals("Deaggregated Count Matches", aggregated.getNumUserRecords(), userRecords.size()); + verifyOneToOneMapping(userRecords); + } +} diff --git a/java/KinesisDeaggregatorV2/src/test/java/TestLambdaDeaggregation.java b/java/KinesisDeaggregatorV2/src/test/java/TestLambdaDeaggregation.java new file mode 100644 index 0000000..8082677 --- /dev/null +++ b/java/KinesisDeaggregatorV2/src/test/java/TestLambdaDeaggregation.java @@ -0,0 +1,172 @@ +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Date; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.function.Consumer; + +import org.apache.commons.lang3.RandomStringUtils; +import org.junit.BeforeClass; +import org.junit.Test; + +import com.amazonaws.kinesis.agg.AggRecord; +import com.amazonaws.kinesis.agg.RecordAggregator; +import com.amazonaws.kinesis.deagg.RecordDeaggregator; +import com.amazonaws.kinesis.deagg.RecordDeaggregator.KinesisUserRecordProcessor; +import com.amazonaws.services.lambda.runtime.events.KinesisEvent; +import com.amazonaws.services.lambda.runtime.events.KinesisEvent.KinesisEventRecord; + +import software.amazon.kinesis.retrieval.KinesisClientRecord; + +public class TestLambdaDeaggregation { + private static final int c = 10; + private static Map checkset = new HashMap<>(); + private static List recordList = null; + private static final RecordDeaggregator deaggregator = new RecordDeaggregator<>(); + private static RecordAggregator aggregator = null; + private static AggRecord aggregated = null; + + private final class TestKinesisUserRecordProcessor + implements Consumer, KinesisUserRecordProcessor { + private int recordsProcessed = 0; + + public int getCount() { + return this.recordsProcessed; + } + + @Override + public void accept(KinesisClientRecord t) { + recordsProcessed += 1; + } + + @Override + public Void process(List userRecords) { + recordsProcessed += userRecords.size(); + + return null; + } + + } + + /* Verify that a provided set of UserRecords map 1:1 to the original checkset */ + private void verifyOneToOneMapping(List userRecords) { + userRecords.stream().forEachOrdered(userRecord -> { + // get the original checkset record by ID + KinesisEventRecord toCheck = checkset.get(userRecord.partitionKey()); + + // confirm that toCheck is not null + assertNotNull("Found Original CheckSet Record", toCheck); + + // confirm that the data is the same + assertTrue("Data Correct", userRecord.data().compareTo(toCheck.getKinesis().getData()) == 0); + }); + } + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + aggregator = new RecordAggregator(); + + recordList = new LinkedList<>(); + + // create 10 random records for testing + for (int i = 0; i < c; i++) { + // create trackable id + String id = UUID.randomUUID().toString(); + + // create a kinesis model record + byte[] data = RandomStringUtils.randomAlphabetic(20).getBytes(); + + KinesisEvent.Record r = new KinesisEvent.Record(); + r.withPartitionKey(id).withApproximateArrivalTimestamp(new Date(System.currentTimeMillis())) + .withData(ByteBuffer.wrap(data)); + KinesisEventRecord ker = new KinesisEventRecord(); + ker.setKinesis(r); + recordList.add(ker); + + // add the record to the check set + checkset.put(id, ker); + + // add the record to the aggregated AggRecord // create an aggregated set of + aggregator.addUserRecord(id, data); + } + + // get the aggregated data + aggregated = aggregator.clearAndGet(); + assertEquals("Aggregated Record Count Correct", aggregated.getNumUserRecords(), c); + } + + @Test + public void testProcessor() throws Exception { + // create a counting record processor + TestKinesisUserRecordProcessor p = new TestKinesisUserRecordProcessor(); + + // invoke deaggregation on the static records with this processor + deaggregator.processRecords(recordList, p); + + assertEquals("Processed Record Count Correct", p.getCount(), recordList.size()); + } + + @Test + public void testStream() throws Exception { + // create a counting record processor + TestKinesisUserRecordProcessor p = new TestKinesisUserRecordProcessor(); + + // invoke deaggregation on the static records with this processor + deaggregator.stream(recordList.stream(), p); + + assertEquals("Processed Record Count Correct", p.getCount(), recordList.size()); + } + + @Test + public void testList() throws Exception { + // invoke deaggregation on the static records, returning a List of UserRecord + List records = deaggregator.deaggregate(recordList); + + assertEquals("Processed Record Count Correct", records.size(), recordList.size()); + verifyOneToOneMapping(records); + } + + @Test + public void testAggregatedRecord() throws Exception { + // create a new KinesisEvent.Record from the aggregated data + KinesisEvent.Record r = new KinesisEvent.Record(); + r.setPartitionKey(aggregated.getPartitionKey()); + r.setApproximateArrivalTimestamp(new Date(System.currentTimeMillis())); + r.setData(ByteBuffer.wrap(aggregated.toRecordBytes())); + r.setKinesisSchemaVersion("1.0"); + KinesisEventRecord ker = new KinesisEventRecord(); + ker.setKinesis(r); + + // deaggregate the record + List userRecords = deaggregator.deaggregate(Arrays.asList(ker)); + + assertEquals("Deaggregated Count Matches", aggregated.getNumUserRecords(), userRecords.size()); + verifyOneToOneMapping(userRecords); + } + + @Test + public void testEmpty() throws Exception { + // invoke deaggregation on the static records, returning a List of UserRecord + List records = deaggregator.deaggregate(new ArrayList()); + + assertEquals("Processed Record Count Correct", records.size(), 0); + verifyOneToOneMapping(records); + } + + @Test + public void testOne() throws Exception { + // invoke deaggregation on the static records, returning a List of UserRecord + List records = deaggregator.deaggregate(recordList.get(0)); + + assertEquals("Processed Record Count Correct", records.size(), 1); + verifyOneToOneMapping(records); + } +} diff --git a/java/README.md b/java/README.md index a96e795..527c0b3 100644 --- a/java/README.md +++ b/java/README.md @@ -8,7 +8,14 @@ The [KinesisAggregator](KinesisAggregator) subproject contains Java classes that ## KinesisDeaggregator -The [KinesisDeaggregator](KinesisDeaggregator) subproject contains Java classes that allow you to deaggregate records that were transmitted using the [Kinesis Aggregated Record Format](https://github.com/awslabs/amazon-kinesis-producer/blob/master/aggregation-format.md), including those transmitted by the Kinesis Producer Library. This library will allow you to deaggregate aggregated records in any Java environment, including AWS Lambda. +The Deaggregation subprojects contain Java classes that allow you to deaggregate records that were transmitted using the [Kinesis Aggregated Record Format](https://github.com/awslabs/amazon-kinesis-producer/blob/master/aggregation-format.md), including those transmitted by the Kinesis Producer Library. This library will allow you to deaggregate aggregated records in any Java environment, including AWS Lambda. + +There are 2 versions of Deaggregator modules, based upon the AWS SDK version you are using: + +| SDK | Project | +| --- | ------- | +|Version 1 | [KinesisDeaggregator](KinesisDeaggregator) | +|Version 2 | [KinesisDeaggregatorV2](KinesisDeaggregatorV2) | ## KinesisTestConsumers