This pattern creates an AWS Lambda function that consumes messages from an Amazon Kinesis Data Stream and writes them to Amazon DynamoDB, using AWS SAM and .NET.
Important: This application uses various AWS services and there are costs associated with these services after the Free Tier usage. Please see the AWS Pricing page for details. You are responsible for any AWS costs incurred.
- Runtime: .NET 8
- The framework used to deploy the infrastructure is AWS SAM.
- The AWS services used in this pattern are: Amazon Kinesis → AWS Lambda → Amazon DynamoDB. Amazon DynamoDB is not part of the SUT in this pattern.
The SUT is a streaming data processing system. A Lambda function has an Event Source Mapping (ESM) to a Kinesis Data Stream. The ESM polls the Kinesis Data Stream and then synchronously invokes the Lambda function with a batch of messages. The Lambda function processes each batch and writes the results to a DynamoDB table.
The goal of this example is to show how to test Lambda functions that are part of a streaming data processing application. In streaming workloads the number of messages that are sent to Lambda in a batch can change with the rate of messages being published to the stream, so we show testing with different sized batches.
In this pattern you will deploy a streaming workload where a Lambda function is triggered by messages in a Kinesis Data Stream. This project demonstrates several testing techniques, including running the Lambda function locally with a simulated payload as well as integration tests against resources deployed in the cloud.
This example contains an Amazon Kinesis Data Stream, an AWS Lambda function, and an Amazon DynamoDB table as core resources.
The Amazon Kinesis Data Stream can carry arbitrary data, but the AWS Lambda function in this example expects the Kinesis stream event data to contain a JSON object with 6 properties:
{
  "employee_id": "string",
  "email": "string",
  "first_name": "string",
  "last_name": "string",
  "dob": "DateTime",
  "doh": "DateTime"
}
- employee_id: unique identifier for each individual record. Each record should have a unique id property value.
- email: email address of the employee
- first_name: first name of the employee
- last_name: last name of the employee
- dob: date of birth of the employee
- doh: date of hire of the employee
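For illustration, a record conforming to this schema might look like the following (all values here are made up):

```json
{
  "employee_id": "emp-00123",
  "email": "jane.doe@example.com",
  "first_name": "Jane",
  "last_name": "Doe",
  "dob": "1990-04-15T00:00:00Z",
  "doh": "2021-09-01T00:00:00Z"
}
```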
The AWS Lambda function converts the incoming event data into the processed record JSON, setting the employee_id
to be the DynamoDB Partition Key.
The SAM template contains all the information needed to deploy the AWS resources (an Amazon Kinesis Data Stream, an AWS Lambda function, and an Amazon DynamoDB table) as well as the permissions required for these services to communicate.
You will be able to create and delete the CloudFormation stack using the SAM CLI.
The solution is split into the following projects:

- KinesisEventHandler.Infrastructure: contains code for bootstrapping the ServiceProvider and extensions.
- KinesisEventHandler.Repositories: contains code for the persistence layer, in this case DynamoDB.
- Function project(s)
- Test project(s)
The AWS SAM CLI is used to deploy the application. When working through the sam deploy --guided prompts, take note of the stack name used.
sam build
sam deploy --guided
To test the application, you need to write a record to the Kinesis Data Stream. This can be done in the following ways:

- PutRecord API: refer to the Amazon Kinesis Streams API Reference.
Sample Request:
POST / HTTP/1.1
Host: kinesis.<region>.<domain>
Content-Length: <PayloadSizeBytes>
User-Agent: <UserAgentString>
Content-Type: application/x-amz-json-1.1
Authorization: <AuthParams>
Connection: Keep-Alive
X-Amz-Date: <Date>
X-Amz-Target: Kinesis_20131202.PutRecord

{
  "StreamName": "exampleStreamName",
  "Data": "XzxkYXRhPl8x",
  "PartitionKey": "partitionKey"
}
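Equivalently, a record can be published with the AWS CLI. The sketch below is illustrative: the stream name EmployeeRecordsStream and the record values are assumptions, so look up the actual stream name in your CloudFormation stack outputs after deploying.

```shell
# Sample record matching the schema shown earlier (values are made up).
PAYLOAD='{"employee_id":"emp-00123","email":"jane.doe@example.com","first_name":"Jane","last_name":"Doe","dob":"1990-04-15T00:00:00Z","doh":"2021-09-01T00:00:00Z"}'

# The Kinesis API expects the Data field to be base64-encoded.
DATA=$(printf '%s' "$PAYLOAD" | base64 | tr -d '\n')

# Hypothetical stream name; replace with the stream created by your stack.
# The call is skipped when the AWS CLI is not installed, and a failure
# (e.g. missing credentials) is reported rather than aborting the script.
if command -v aws >/dev/null 2>&1; then
  aws kinesis put-record \
    --stream-name EmployeeRecordsStream \
    --partition-key emp-00123 \
    --data "$DATA" \
    || echo "put-record failed: check credentials and stream name"
fi
```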
The source code for this sample includes automated unit and integration tests. xUnit is the primary test framework used to write these tests. A few other libraries and frameworks are used depending on the test case pattern. Please see below.
The goal of these tests is to run a unit test on the ProcessKinesisRecord method, which is called by the handler method of the Lambda function.
The system under test here is completely abstracted from any cloud resources.
[Fact]
public async Task ProcessEmployeeFunction_With_ValidEmployeeRecord_Should_ProcessKinesisRecordSuccessfully()
{
    // Arrange
    var fakeRepository = A.Fake<IDynamoDbRepository<EmployeeDto>>();

    A.CallTo(() => fakeRepository.PutItemAsync(A<EmployeeDto>._, A<CancellationToken>._))
        .Returns(Task.FromResult(UpsertResult.Inserted));

    var sut = new ProcessEmployeeFunction(fakeRepository);
    var employee = new EmployeeBuilder().Build();
    var context = new TestLambdaContext();

    // Act
    var taskResult = sut.ProcessKinesisRecord(employee, context);
    await taskResult;

    // Assert
    Assert.True(taskResult.IsCompleted);
}
The goal of these tests is to run a unit test on the KinesisEventHandler, which implements the handler method of the Lambda function. It uses FakeItEasy as the mocking framework; the PutItemAsync method in IDynamoDbRepository is faked.
[Fact]
public async Task KinesisEventHandler_With_N_Records_Should_CallProcessKinesisRecord_N_Times()
{
    // Arrange
    var randomNumber = new Random().Next(2, 20);
    var employees = new List<Employee>();

    for (var i = 0; i < randomNumber; i++)
    {
        employees.Add(new EmployeeBuilder().Build());
    }

    var kinesisEvent = new KinesisEventBuilder().WithEmployees(employees).Build();
    var lambdaContext = new TestLambdaContext();

    // Act
    var result = await _mockKinesisEventTrigger.Handler(kinesisEvent, lambdaContext);

    // Assert
    result.BatchItemFailures.Should().BeEmpty();

    A.CallTo(() => _mockKinesisEventTrigger.ProcessKinesisRecord(
            A<Employee>._,
            A<ILambdaContext>._))
        .MustHaveHappened(randomNumber, Times.Exactly);
}
To execute the tests:
PowerShell
dotnet test tests\KinesisEventHandler.UnitTests\KinesisEventHandler.UnitTests.csproj
Bash
dotnet test tests/KinesisEventHandler.UnitTests/KinesisEventHandler.UnitTests.csproj
The goal of this test is to demonstrate running the Lambda function's code against deployed resources. The tests interact with the Kinesis Data Stream directly using the AmazonKinesisClient and validate the responses returned.
[Fact]
public async Task WriteToEmployeeRecordsStream_Should_Return_HTTP_OK()
{
    // Arrange
    var employee = new EmployeeBuilder().Build();

    // Act
    var response = await _processEmployeeFixture.StreamRecordAsync(employee);

    // Assert
    response.Should().NotBeNull();
    response.HttpStatusCode.Should().Be(HttpStatusCode.OK);

    // Dispose
    _processEmployeeFixture.CreatedEmployeeIds.Add(employee.EmployeeId);
}
[Fact]
public async Task WriteToEmployeeRecordsStream_Should_Upsert_To_EmployeeStreamTable()
{
    // Arrange
    var employee = new EmployeeBuilder().Build();

    // Act
    using var cts = new CancellationTokenSource();
    await _processEmployeeFixture.StreamRecordAsync(employee);
    var response = await _processEmployeeFixture.PollForProcessedMessage(employee, cts.Token);

    // Assert
    response.Should().NotBeNull().And.BeEquivalentTo(employee);

    // Dispose
    _processEmployeeFixture.CreatedEmployeeIds.Add(employee.EmployeeId);
}
Before running these tests, resources will need to be deployed using the steps in the Deployment Commands section above. Tests exist for both happy and sad paths.
To execute the tests:
PowerShell
$env:AWS_SAM_STACK_NAME = <STACK_NAME_USED_IN_SAM_DEPLOY>
$env:AWS_SAM_REGION_NAME = <REGION_NAME_USED_IN_SAM_DEPLOY>
dotnet test ./tests/KinesisEventHandler.IntegrationTests/KinesisEventHandler.IntegrationTests.csproj
Bash
export AWS_SAM_STACK_NAME=<STACK_NAME_USED_IN_SAM_DEPLOY>
export AWS_SAM_REGION_NAME=<REGION_NAME_USED_IN_SAM_DEPLOY>
dotnet test ./tests/KinesisEventHandler.IntegrationTests/KinesisEventHandler.IntegrationTests.csproj
Run the following command to delete the resources that were created. It might take some time for the CloudFormation stack to be deleted.
sam delete
- Create an AWS account if you do not already have one and log in. The IAM user that you use must have sufficient permissions to make necessary AWS service calls and manage AWS resources.
- AWS CLI installed and configured
- Git installed
- AWS Serverless Application Model (AWS SAM) installed
- .NET 8 installed