Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for partial batch responses #29

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions AWSLambdaSharpTemplate.sln
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "SqsEventFunction", "samples
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "SqsEventFunctionWithParallelism", "samples\SqsEventFunctionWithParallelism\SqsEventFunctionWithParallelism.csproj", "{EC8A27C9-363C-434A-BEBE-0955E1BB40FC}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "SqsBatchResponseFunction", "samples\SqsBatchResponseFunction\SqsBatchResponseFunction.csproj", "{968317B0-6204-414B-8CC0-6B7D0B616236}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "src", "src", "{AF4300F6-9636-4925-B8D5-F98E1BDC5EF5}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Kralizek.Lambda.Template", "src\Kralizek.Lambda.Template\Kralizek.Lambda.Template.csproj", "{D092A193-823E-4205-872B-E3060BC3CF1A}"
Expand Down Expand Up @@ -62,6 +64,10 @@ Global
{EC8A27C9-363C-434A-BEBE-0955E1BB40FC}.Debug|Any CPU.Build.0 = Debug|Any CPU
{EC8A27C9-363C-434A-BEBE-0955E1BB40FC}.Release|Any CPU.ActiveCfg = Release|Any CPU
{EC8A27C9-363C-434A-BEBE-0955E1BB40FC}.Release|Any CPU.Build.0 = Release|Any CPU
{968317B0-6204-414B-8CC0-6B7D0B616236}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{968317B0-6204-414B-8CC0-6B7D0B616236}.Debug|Any CPU.Build.0 = Debug|Any CPU
{968317B0-6204-414B-8CC0-6B7D0B616236}.Release|Any CPU.ActiveCfg = Release|Any CPU
{968317B0-6204-414B-8CC0-6B7D0B616236}.Release|Any CPU.Build.0 = Release|Any CPU
{D092A193-823E-4205-872B-E3060BC3CF1A}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{D092A193-823E-4205-872B-E3060BC3CF1A}.Debug|Any CPU.Build.0 = Debug|Any CPU
{D092A193-823E-4205-872B-E3060BC3CF1A}.Release|Any CPU.ActiveCfg = Release|Any CPU
Expand All @@ -86,6 +92,7 @@ Global
{E93EC39F-7934-46CE-8FD1-6D882576D66D} = {3D496921-7E86-44BE-AF72-9786A924052A}
{DF86D670-8E2F-40B8-899E-E544B50C4024} = {3D496921-7E86-44BE-AF72-9786A924052A}
{EC8A27C9-363C-434A-BEBE-0955E1BB40FC} = {3D496921-7E86-44BE-AF72-9786A924052A}
{968317B0-6204-414B-8CC0-6B7D0B616236} = {3D496921-7E86-44BE-AF72-9786A924052A}
{D092A193-823E-4205-872B-E3060BC3CF1A} = {AF4300F6-9636-4925-B8D5-F98E1BDC5EF5}
{5574076F-7D6D-4AD2-9231-76A18D2A85D7} = {AF4300F6-9636-4925-B8D5-F98E1BDC5EF5}
{F9D3C337-5936-4BCB-BA2F-03A9E7C148DD} = {AF4300F6-9636-4925-B8D5-F98E1BDC5EF5}
Expand Down
12 changes: 12 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,17 @@ protected override void ConfigureServices(IServiceCollection services, IExecutio
}
```

### Handling SQS messages with partial batch responses

The [SQS Event Source](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html) can be configured to send batches of more than one message to the Lambda. When a class that derives from `EventFunction<SQSEvent>` is used as the Lambda entry point, any exceptions thrown from the message handler will propagate to the Lambda runtime, causing the entire batch that's being processed by that Lambda invocation to fail. All of the messages in the failed batch will be retried (subject to SQS configuration).

As an alternative to this default behavior, [partial batch support](https://aws.amazon.com/about-aws/whats-new/2021/11/aws-lambda-partial-batch-response-sqs-event-source/) can be enabled by deriving the entry point class from `RequestResponseFunction<SQSEvent, SQSBatchResponse>` instead, and configuring the SQS Event Source to look in the Lambda response body for batch item failure information. When partial batch support is enabled, exceptions thrown from the handler are caught, and failed messages are reported to Lambda in the response payload. Only failed messages will be retried (subject to SQS configuration).

The Event Source configuration can be done using [CloudFormation](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-lambda-eventsourcemapping.html#cfn-lambda-eventsourcemapping-functionresponsetypes), or by turning on "Report batch item failures" in the AWS console (trigger configuration), or by some other means such as the AWS CLI.

Configuring the SQS Event Source is critical to partial batch support working properly. If this step is omitted, yet an entry point class deriving from `RequestResponseFunction<SQSEvent, SQSBatchResponse>` is used, exceptions thrown from the message handler will be logged but then ignored (not retried).


# Creating a new function

The best way to create a new AWS Lambda that uses this structure is to use the `dotnet new` template provided via NuGet.
Expand All @@ -224,6 +235,7 @@ Here is a list of all the available templates
|Lambda Boilerplate RequestResponse Function|lambda-template-requestresponse-boilerplate|Creates a RequestResponse function with some boilerplate added|
|Lambda SNS Handler Function|lambda-template-sns-event|Creates a function to handle SNS notifications|
|Lambda SQS Handler Function|lambda-template-sqs-event|Creates a function to handle SQS messages|
|Lambda SQS Partial Batch Response Function|lambda-template-sqs-partial-batch|Creates a function to handle SQS messages, with partial batch responses|

All the templates support the following parameters

Expand Down
79 changes: 79 additions & 0 deletions samples/SqsBatchResponseFunction/Function.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
using System;
using System.Text.Json.Serialization;
using System.Threading.Tasks;
using Amazon.Lambda.Core;
using Amazon.Lambda.SQSEvents;
using Kralizek.Lambda;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;

[assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))]

namespace SqsBatchResponseFunction;

/// <summary>
/// Deriving the entry point class from <see cref="RequestResponseFunction{SQSEvent, SQSBatchResponse}" />,
/// rather than <see cref="EventFunction{SQSEvent}" />, facilitates partial batch support by catching
/// exceptions thrown from the <see cref="IMessageHandler{TMessage}" />, and reporting them to Lambda in
/// the response payload.
/// </summary>
/// <remarks>
/// Configuring the SQS Event Source (on the AWS side) is critical to partial batch support working properly.
/// Please refer to the README for further information.
/// </remarks>
public class Function : RequestResponseFunction<SQSEvent, SQSBatchResponse>
{
protected override void Configure(IConfigurationBuilder builder)
{
builder.AddEnvironmentVariables();
}

protected override void ConfigureLogging(ILoggingBuilder logging, IExecutionEnvironment executionEnvironment)
{
logging.AddConfiguration(Configuration.GetSection("Logging"));

logging.AddLambdaLogger(new LambdaLoggerOptions
{
IncludeCategory = true,
IncludeLogLevel = true,
IncludeNewline = true
});
}

protected override void ConfigureServices(IServiceCollection services, IExecutionEnvironment executionEnvironment)
{
services.UseQueueMessageHandler<TestMessage, TestMessageHandler>();
}
}

public class TestMessage
{
[JsonPropertyName("message")]
public string? Message { get; set; }
}

public class TestMessageHandler : IMessageHandler<TestMessage>
{
private readonly ILogger<TestMessageHandler> _logger;

public TestMessageHandler(ILogger<TestMessageHandler> logger)
{
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}

public Task HandleAsync(TestMessage? message, ILambdaContext context)
{
_logger.LogInformation("Received notification: {Message}", message?.Message);

if (message is { Message.Length: > 0 })
{
if (message.Message.Contains("bad message", StringComparison.OrdinalIgnoreCase))
{
throw new ArgumentException("message supplied was a bad message", nameof(message));
}
}

return Task.CompletedTask;
}
}
19 changes: 19 additions & 0 deletions samples/SqsBatchResponseFunction/SqsBatchResponseFunction.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>net6.0</TargetFramework>
<GenerateRuntimeConfigurationFiles>true</GenerateRuntimeConfigurationFiles>
<Nullable>enable</Nullable>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Amazon.Lambda.Logging.AspNetCore" Version="3.1.0" />
<PackageReference Include="Microsoft.Extensions.Logging.Configuration" Version="6.0.0" />
<PackageReference Include="Microsoft.Extensions.Configuration.EnvironmentVariables" Version="6.0.0" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\..\src\Kralizek.Lambda.Template.Sqs\Kralizek.Lambda.Template.Sqs.csproj" />
</ItemGroup>

</Project>
21 changes: 21 additions & 0 deletions samples/SqsBatchResponseFunction/aws-lambda-tools-defaults.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
{
"Information": [
"This file provides default values for the deployment wizard inside Visual Studio and the AWS Lambda commands added to the .NET Core CLI.",
"To learn more about the Lambda commands with the .NET Core CLI execute the following command at the command line in the project root directory.",

"dotnet lambda help",

"All the command line options for the Lambda command can be specified in this file."
],

"profile": "",
"region": "",
"configuration": "Release",
"framework": "net6.0",
"function-name": "SqsBatchResponseFunction",
"function-role": "",
"function-runtime": "dotnet6",
"function-memory-size": 128,
"function-timeout": 30,
"function-handler": "SqsBatchResponseFunction::SqsBatchResponseFunction.Function::FunctionHandlerAsync"
}
42 changes: 38 additions & 4 deletions src/Kralizek.Lambda.Template.Sqs/ParallelSqsEventHandler.cs
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Text.Json;
using System.Threading.Tasks;
using Amazon.Lambda.Core;
using Amazon.Lambda.SQSEvents;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using static Amazon.Lambda.SQSEvents.SQSBatchResponse;

namespace Kralizek.Lambda;

Expand All @@ -23,9 +24,11 @@ public class ParallelSqsExecutionOptions

/// <summary>
/// An implementation of <see cref="IEventHandler{TInput}"/> specialized for <see cref="SQSEvent"/> that processes all the records in parallel.
/// Also implements <see cref="IRequestResponseHandler{TInput, TOutput}"/>, specialized for <see cref="SQSEvent"/> and <see cref="SQSBatchResponse"/>,
/// to support parallel processing with partial batch responses.
/// </summary>
/// <typeparam name="TMessage">The internal type of the SQS message.</typeparam>
public class ParallelSqsEventHandler<TMessage>: IEventHandler<SQSEvent> where TMessage : class
public class ParallelSqsEventHandler<TMessage> : IEventHandler<SQSEvent>, IRequestResponseHandler<SQSEvent, SQSBatchResponse> where TMessage : class
{
private readonly ILogger _logger;
private readonly IServiceProvider _serviceProvider;
Expand All @@ -44,7 +47,30 @@ public ParallelSqsEventHandler(IServiceProvider serviceProvider, ILoggerFactory
/// <param name="input">The incoming event.</param>
/// <param name="context">The execution context.</param>
/// <exception cref="InvalidOperationException">Thrown if there is no registered implementation of <see cref="IMessageHandler{TMessage}"/>.</exception>
public async Task HandleAsync(SQSEvent? input, ILambdaContext context)
public async Task HandleAsync(SQSEvent? input, ILambdaContext context) =>
await ((IEventHandler<SQSEvent>)this).HandleAsync(input, context);

/// <inheritdoc cref="HandleAsync(SQSEvent?, ILambdaContext)"/>
async Task IEventHandler<SQSEvent>.HandleAsync(SQSEvent? input, ILambdaContext context) =>
await HandleAsync(input, context, null);

/// <summary>
/// Handles the <see cref="SQSEvent"/> by processing each record in parallel.
/// Catches any exceptions thrown by the <see cref="IMessageHandler{TMessage}"/>, logs them, and reports them as batch response item failures.
/// </summary>
/// <param name="input">The incoming event.</param>
/// <param name="context">The execution context.</param>
/// <returns>Object conveying SQS message item failures.</returns>
/// <exception cref="InvalidOperationException">Thrown if there is no registered implementation of <see cref="IMessageHandler{TMessage}"/>.</exception>
/// <seealso href="https://aws.amazon.com/about-aws/whats-new/2021/11/aws-lambda-partial-batch-response-sqs-event-source/"/>
async Task<SQSBatchResponse> IRequestResponseHandler<SQSEvent, SQSBatchResponse>.HandleAsync(SQSEvent? input, ILambdaContext context)
{
var batchItemFailures = new ConcurrentBag<BatchItemFailure>();
await HandleAsync(input, context, batchItemFailures);
return new(batchItemFailures.ToList());
}

private async Task HandleAsync(SQSEvent? input, ILambdaContext context, ConcurrentBag<BatchItemFailure>? batchItemFailures)
{
if (input is { Records.Count: > 0 })
{
Expand All @@ -68,7 +94,15 @@ await input.Records.ForEachAsync(_options.MaxDegreeOfParallelism, async singleSq
throw new InvalidOperationException($"No IMessageHandler<{typeof(TMessage).Name}> could be found.");
}

await messageHandler.HandleAsync(message, context);
try
{
await messageHandler.HandleAsync(message, context).ConfigureAwait(false);
}
catch (Exception exc) when (batchItemFailures is not null)
{
_logger.LogError(exc, "Recording batch item failure for message {MessageId}", singleSqsMessage.MessageId);
batchItemFailures.Add(new() { ItemIdentifier = singleSqsMessage.MessageId });
}
});
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ public static IMessageHandlerConfigurator<TMessage> WithParallelExecution<TMessa
if (maxDegreeOfParallelism <= 1) throw new ArgumentOutOfRangeException(nameof(maxDegreeOfParallelism), $"{nameof(maxDegreeOfParallelism)} must be greater than 1");

configurator.Services.AddTransient<IEventHandler<SQSEvent>, ParallelSqsEventHandler<TMessage>>();
configurator.Services.AddTransient<IRequestResponseHandler<SQSEvent, SQSBatchResponse>, ParallelSqsEventHandler<TMessage>>();

if (maxDegreeOfParallelism.HasValue)
{
Expand Down Expand Up @@ -87,7 +88,8 @@ public static IMessageHandlerConfigurator<TMessage> UseQueueMessageHandler<TMess
services.AddOptions();

services.AddTransient<IEventHandler<SQSEvent>, SqsEventHandler<TMessage>>();

services.AddTransient<IRequestResponseHandler<SQSEvent, SQSBatchResponse>, SqsEventHandler<TMessage>>();

services.TryAddSingleton<IMessageSerializer, DefaultJsonMessageSerializer>();

services.Add(ServiceDescriptor.Describe(typeof(IMessageHandler<TMessage>), typeof(THandler), lifetime));
Expand Down
41 changes: 38 additions & 3 deletions src/Kralizek.Lambda.Template.Sqs/SqsEventHandler.cs
Original file line number Diff line number Diff line change
@@ -1,17 +1,21 @@
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Amazon.Lambda.Core;
using Amazon.Lambda.SQSEvents;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using static Amazon.Lambda.SQSEvents.SQSBatchResponse;

namespace Kralizek.Lambda;

/// <summary>
/// An implementation of <see cref="IEventHandler{TInput}"/> specialized for <see cref="SQSEvent"/> that processes all the records in sequence.
/// Also implements <see cref="IRequestResponseHandler{TInput, TOutput}"/>, specialized for <see cref="SQSEvent"/> and <see cref="SQSBatchResponse"/>,
/// to support sequential processing with partial batch responses.
/// </summary>
/// <typeparam name="TMessage">The internal type of the SQS message.</typeparam>
public class SqsEventHandler<TMessage> : IEventHandler<SQSEvent> where TMessage : class
public class SqsEventHandler<TMessage> : IEventHandler<SQSEvent>, IRequestResponseHandler<SQSEvent, SQSBatchResponse> where TMessage : class
{
private readonly ILogger _logger;
private readonly IServiceProvider _serviceProvider;
Expand All @@ -28,7 +32,30 @@ public SqsEventHandler(IServiceProvider serviceProvider, ILoggerFactory loggerFa
/// <param name="input">The incoming event.</param>
/// <param name="context">The execution context.</param>
/// <exception cref="InvalidOperationException">Thrown if there is no registered implementation of <see cref="IMessageHandler{TMessage}"/>.</exception>
public async Task HandleAsync(SQSEvent? input, ILambdaContext context)
public async Task HandleAsync(SQSEvent? input, ILambdaContext context) =>
await ((IEventHandler<SQSEvent>)this).HandleAsync(input, context);

/// <inheritdoc cref="HandleAsync(SQSEvent?, ILambdaContext)"/>
async Task IEventHandler<SQSEvent>.HandleAsync(SQSEvent? input, ILambdaContext context) =>
await HandleAsync(input, context, null);

/// <summary>
/// Handles the <see cref="SQSEvent"/> by processing each record in sequence.
/// Catches any exceptions thrown by the <see cref="IMessageHandler{TMessage}"/>, logs them, and reports them as batch response item failures.
/// </summary>
/// <param name="input">The incoming event.</param>
/// <param name="context">The execution context.</param>
/// <returns>Object conveying SQS message item failures.</returns>
/// <exception cref="InvalidOperationException">Thrown if there is no registered implementation of <see cref="IMessageHandler{TMessage}"/>.</exception>
/// <seealso href="https://aws.amazon.com/about-aws/whats-new/2021/11/aws-lambda-partial-batch-response-sqs-event-source/"/>
async Task<SQSBatchResponse> IRequestResponseHandler<SQSEvent, SQSBatchResponse>.HandleAsync(SQSEvent? input, ILambdaContext context)
{
var batchItemFailures = new List<BatchItemFailure>();
await HandleAsync(input, context, batchItemFailures);
return new(batchItemFailures);
}

private async Task HandleAsync(SQSEvent? input, ILambdaContext context, List<BatchItemFailure>? batchItemFailures)
{
if (input is { Records: { } })
{
Expand All @@ -52,7 +79,15 @@ public async Task HandleAsync(SQSEvent? input, ILambdaContext context)
}

_logger.LogInformation("Invoking notification handler");
await handler.HandleAsync(message, context).ConfigureAwait(false);
try
{
await handler.HandleAsync(message, context).ConfigureAwait(false);
}
catch (Exception exc) when (batchItemFailures is not null)
{
_logger.LogError(exc, "Recording batch item failure for message {MessageId}", record.MessageId);
batchItemFailures.Add(new() { ItemIdentifier = record.MessageId });
}
}
}
}
Expand Down
Loading