Skip to content

Commit

Permalink
Add gzip compression (#204)
Browse files Browse the repository at this point in the history
This also adds a new summary for deserialization timings, together with
summaries for compression and decompression timings.

Fixes #200

Co-authored-by: Florian Grieskamp <[email protected]>
Co-authored-by: Phillip Kemkes <[email protected]>

Co-authored-by: Florian Hockmann <[email protected]>
Co-authored-by: Phillip Kemkes <[email protected]>
  • Loading branch information
3 people authored May 19, 2021
1 parent 15d9372 commit 5765e00
Show file tree
Hide file tree
Showing 35 changed files with 718 additions and 63 deletions.
60 changes: 60 additions & 0 deletions Motor.NET.sln
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,14 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Motor.Extensions.Utilities_
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Motor.Extensions.Diagnostics.Telemetry", "src\Motor.Extensions.Diagnostics.Telemetry\Motor.Extensions.Diagnostics.Telemetry.csproj", "{6BCF181C-1BAA-4B9C-8E32-EB46409CDF01}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Motor.Extensions.Compression.Abstractions", "src\Motor.Extensions.Compression.Abstractions\Motor.Extensions.Compression.Abstractions.csproj", "{EDCD789B-FD17-449E-80D5-70E6E93F35F1}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Motor.Extensions.Compression.Gzip", "src\Motor.Extensions.Compression.Gzip\Motor.Extensions.Compression.Gzip.csproj", "{ADAEFE79-278A-4945-9FBD-4D0DA81237FC}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Motor.Extensions.Compression.Gzip_UnitTest", "test\Motor.Extensions.Compression.Gzip_UnitTest\Motor.Extensions.Compression.Gzip_UnitTest.csproj", "{7A219941-ABC1-4586-9C6E-8D268D13F96F}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Motor.Extensions.Compression.Abstractions_UnitTest", "test\Motor.Extensions.Compression.Abstractions_UnitTest\Motor.Extensions.Compression.Abstractions_UnitTest.csproj", "{C1385EB8-4F54-4869-83A0-72D85A5E986C}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -671,6 +679,54 @@ Global
{6BCF181C-1BAA-4B9C-8E32-EB46409CDF01}.Release|x64.Build.0 = Release|Any CPU
{6BCF181C-1BAA-4B9C-8E32-EB46409CDF01}.Release|x86.ActiveCfg = Release|Any CPU
{6BCF181C-1BAA-4B9C-8E32-EB46409CDF01}.Release|x86.Build.0 = Release|Any CPU
{EDCD789B-FD17-449E-80D5-70E6E93F35F1}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{EDCD789B-FD17-449E-80D5-70E6E93F35F1}.Debug|Any CPU.Build.0 = Debug|Any CPU
{EDCD789B-FD17-449E-80D5-70E6E93F35F1}.Debug|x64.ActiveCfg = Debug|Any CPU
{EDCD789B-FD17-449E-80D5-70E6E93F35F1}.Debug|x64.Build.0 = Debug|Any CPU
{EDCD789B-FD17-449E-80D5-70E6E93F35F1}.Debug|x86.ActiveCfg = Debug|Any CPU
{EDCD789B-FD17-449E-80D5-70E6E93F35F1}.Debug|x86.Build.0 = Debug|Any CPU
{EDCD789B-FD17-449E-80D5-70E6E93F35F1}.Release|Any CPU.ActiveCfg = Release|Any CPU
{EDCD789B-FD17-449E-80D5-70E6E93F35F1}.Release|Any CPU.Build.0 = Release|Any CPU
{EDCD789B-FD17-449E-80D5-70E6E93F35F1}.Release|x64.ActiveCfg = Release|Any CPU
{EDCD789B-FD17-449E-80D5-70E6E93F35F1}.Release|x64.Build.0 = Release|Any CPU
{EDCD789B-FD17-449E-80D5-70E6E93F35F1}.Release|x86.ActiveCfg = Release|Any CPU
{EDCD789B-FD17-449E-80D5-70E6E93F35F1}.Release|x86.Build.0 = Release|Any CPU
{ADAEFE79-278A-4945-9FBD-4D0DA81237FC}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{ADAEFE79-278A-4945-9FBD-4D0DA81237FC}.Debug|Any CPU.Build.0 = Debug|Any CPU
{ADAEFE79-278A-4945-9FBD-4D0DA81237FC}.Debug|x64.ActiveCfg = Debug|Any CPU
{ADAEFE79-278A-4945-9FBD-4D0DA81237FC}.Debug|x64.Build.0 = Debug|Any CPU
{ADAEFE79-278A-4945-9FBD-4D0DA81237FC}.Debug|x86.ActiveCfg = Debug|Any CPU
{ADAEFE79-278A-4945-9FBD-4D0DA81237FC}.Debug|x86.Build.0 = Debug|Any CPU
{ADAEFE79-278A-4945-9FBD-4D0DA81237FC}.Release|Any CPU.ActiveCfg = Release|Any CPU
{ADAEFE79-278A-4945-9FBD-4D0DA81237FC}.Release|Any CPU.Build.0 = Release|Any CPU
{ADAEFE79-278A-4945-9FBD-4D0DA81237FC}.Release|x64.ActiveCfg = Release|Any CPU
{ADAEFE79-278A-4945-9FBD-4D0DA81237FC}.Release|x64.Build.0 = Release|Any CPU
{ADAEFE79-278A-4945-9FBD-4D0DA81237FC}.Release|x86.ActiveCfg = Release|Any CPU
{ADAEFE79-278A-4945-9FBD-4D0DA81237FC}.Release|x86.Build.0 = Release|Any CPU
{7A219941-ABC1-4586-9C6E-8D268D13F96F}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{7A219941-ABC1-4586-9C6E-8D268D13F96F}.Debug|Any CPU.Build.0 = Debug|Any CPU
{7A219941-ABC1-4586-9C6E-8D268D13F96F}.Debug|x64.ActiveCfg = Debug|Any CPU
{7A219941-ABC1-4586-9C6E-8D268D13F96F}.Debug|x64.Build.0 = Debug|Any CPU
{7A219941-ABC1-4586-9C6E-8D268D13F96F}.Debug|x86.ActiveCfg = Debug|Any CPU
{7A219941-ABC1-4586-9C6E-8D268D13F96F}.Debug|x86.Build.0 = Debug|Any CPU
{7A219941-ABC1-4586-9C6E-8D268D13F96F}.Release|Any CPU.ActiveCfg = Release|Any CPU
{7A219941-ABC1-4586-9C6E-8D268D13F96F}.Release|Any CPU.Build.0 = Release|Any CPU
{7A219941-ABC1-4586-9C6E-8D268D13F96F}.Release|x64.ActiveCfg = Release|Any CPU
{7A219941-ABC1-4586-9C6E-8D268D13F96F}.Release|x64.Build.0 = Release|Any CPU
{7A219941-ABC1-4586-9C6E-8D268D13F96F}.Release|x86.ActiveCfg = Release|Any CPU
{7A219941-ABC1-4586-9C6E-8D268D13F96F}.Release|x86.Build.0 = Release|Any CPU
{C1385EB8-4F54-4869-83A0-72D85A5E986C}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{C1385EB8-4F54-4869-83A0-72D85A5E986C}.Debug|Any CPU.Build.0 = Debug|Any CPU
{C1385EB8-4F54-4869-83A0-72D85A5E986C}.Debug|x64.ActiveCfg = Debug|Any CPU
{C1385EB8-4F54-4869-83A0-72D85A5E986C}.Debug|x64.Build.0 = Debug|Any CPU
{C1385EB8-4F54-4869-83A0-72D85A5E986C}.Debug|x86.ActiveCfg = Debug|Any CPU
{C1385EB8-4F54-4869-83A0-72D85A5E986C}.Debug|x86.Build.0 = Debug|Any CPU
{C1385EB8-4F54-4869-83A0-72D85A5E986C}.Release|Any CPU.ActiveCfg = Release|Any CPU
{C1385EB8-4F54-4869-83A0-72D85A5E986C}.Release|Any CPU.Build.0 = Release|Any CPU
{C1385EB8-4F54-4869-83A0-72D85A5E986C}.Release|x64.ActiveCfg = Release|Any CPU
{C1385EB8-4F54-4869-83A0-72D85A5E986C}.Release|x64.Build.0 = Release|Any CPU
{C1385EB8-4F54-4869-83A0-72D85A5E986C}.Release|x86.ActiveCfg = Release|Any CPU
{C1385EB8-4F54-4869-83A0-72D85A5E986C}.Release|x86.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down Expand Up @@ -722,6 +778,10 @@ Global
{62234AB3-F8B0-41A0-B3AE-988D7A26F91C} = {749B1421-3177-4C7A-A66B-541BD4E925B0}
{B51A4631-F1C4-44A6-9F3C-F0AD0AE65697} = {ADD2EBBA-A839-4E4A-9253-CDE29A372F07}
{6BCF181C-1BAA-4B9C-8E32-EB46409CDF01} = {749B1421-3177-4C7A-A66B-541BD4E925B0}
{EDCD789B-FD17-449E-80D5-70E6E93F35F1} = {749B1421-3177-4C7A-A66B-541BD4E925B0}
{ADAEFE79-278A-4945-9FBD-4D0DA81237FC} = {749B1421-3177-4C7A-A66B-541BD4E925B0}
{7A219941-ABC1-4586-9C6E-8D268D13F96F} = {ADD2EBBA-A839-4E4A-9253-CDE29A372F07}
{C1385EB8-4F54-4869-83A0-72D85A5E986C} = {ADD2EBBA-A839-4E4A-9253-CDE29A372F07}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {5E91C34C-3AEC-4084-BA02-753C9236AA34}
Expand Down
24 changes: 17 additions & 7 deletions Readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,13 @@ You find working examples for different use-cases under the [examples](./example

## Support Matrix

| Component | Consume | Publish | CloudEvents | Metrics | Custom |
| --- | --- | --- | --- | --- | --- |
| RabbitMQ | :heavy_check_mark: | :heavy_check_mark: | :heavy_check_mark: | :heavy_check_mark: | priority, dynamic routing |
| Kafka | :heavy_check_mark: | :heavy_check_mark: | :heavy_check_mark: | :heavy_check_mark: | partitioning key, dynamic topic |
| Http | (:heavy_check_mark:) | (:heavy_check_mark:) | :x: |:heavy_check_mark:| |
| Timer | (:heavy_check_mark:) | - | :x: | :x:| |
| SQS | (:heavy_check_mark:) | - | :x: | :x:| |
| Component | Consume | Publish | CloudEvents | Metrics | Compression | Custom |
| --- | --- | --- | --- | --- | --- | --- |
| RabbitMQ | :heavy_check_mark: | :heavy_check_mark: | :heavy_check_mark: | :heavy_check_mark: | :heavy_check_mark: | priority, dynamic routing |
| Kafka | :heavy_check_mark: | :heavy_check_mark: | :heavy_check_mark: | :heavy_check_mark: | :heavy_check_mark: | partitioning key, dynamic topic |
| Http | (:heavy_check_mark:) | (:heavy_check_mark:) | :x: |:heavy_check_mark: | :x: | |
| Timer | (:heavy_check_mark:) | - | :x: | :x: | :x: | |
| SQS | (:heavy_check_mark:) | - | :x: | :x: | :x: | |

## Health Checks

Expand All @@ -39,6 +39,16 @@ Motor.NET comes by default already with two health checks for message processing
- `MessageProcessingHealthCheck`: Fails when no messages were consumed in a certain time frame from the Motor.NET internal queue although it has at least some messages.
- `TooManyTemporaryFailuresHealthCheck`: Fails when too many messages led to a failure since the last message was correctly handled (either successful or as invalid input).

## Compression (Optional)

[Gzip][gzip] compression can optionally be enabled for consumers and publishers. By enabling it for a publisher, the payload of
all published messages will be compressed. Enabling it for a consumer will allow the consumer to decompress these
messages. Consumers can however still consume uncompressed messages. This should make it easy to enable compression in
an existing environment that does not use compression yet. It just needs to be enabled first for the consumers and
afterwards for the publishers.

## License

Motor.NET is provided under the [MIT](./LICENSE) license.

[gzip]: https://www.gnu.org/software/gzip/
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
</PropertyGroup>

<ItemGroup>
<ProjectReference Include="..\..\src\Motor.Extensions.Compression.Gzip\Motor.Extensions.Compression.Gzip.csproj" />
<ProjectReference Include="..\..\src\Motor.Extensions.Conversion.SystemJson\Motor.Extensions.Conversion.SystemJson.csproj" />
<ProjectReference Include="..\..\src\Motor.Extensions.Hosting.RabbitMQ\Motor.Extensions.Hosting.RabbitMQ.csproj" />
<ProjectReference Include="..\..\src\Motor.Extensions.Utilities\Motor.Extensions.Utilities.csproj" />
Expand Down
6 changes: 6 additions & 0 deletions examples/ConsumeAndPublishWithRabbitMQ/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using ConsumeAndPublishWithRabbitMQ.Model;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Motor.Extensions.Compression.Gzip;
using Motor.Extensions.Conversion.SystemJson;
using Motor.Extensions.Hosting.Abstractions;
using Motor.Extensions.Hosting.Consumer;
Expand All @@ -25,6 +26,9 @@ await MotorHost.CreateDefaultBuilder()
builder.AddRabbitMQ();
// The encoding of the incoming message, such that the handler is able to deserialize the message
builder.AddSystemJson();
// (Optional) Enable support for incoming messages that are gzip compressed. Uncompressed messages will still
// work to make the migration to compression backwards-compatible.
builder.AddGzipDecompression();
})
// Add the outgoing communication module.
.ConfigurePublisher<OutputMessage>((_, builder) =>
Expand All @@ -33,5 +37,7 @@ await MotorHost.CreateDefaultBuilder()
builder.AddRabbitMQ();
// The encoding of the outgoing message, such that the handler is able to serialize the message
builder.AddSystemJson();
// (Optional) Compress the serialized data of the outgoing message with gzip.
builder.AddGzipCompression();
})
.RunConsoleAsync();
2 changes: 1 addition & 1 deletion shared.csproj
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<Project>

<PropertyGroup>
<Version>0.6.13</Version>
<Version>0.6.14</Version>
<LangVersion>9</LangVersion>
<Nullable>enable</Nullable>
<WarningsAsErrors>CS8600;CS8602;CS8625;CS8618;CS8604;CS8601</WarningsAsErrors>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
using System;
using System.Collections.Generic;
using CloudNative.CloudEvents;

namespace Motor.Extensions.Compression.Abstractions
{
public class CompressionTypeExtension : ICloudEventExtension
{
private const string CompressionTypeAttributeName = "compressionType";
private IDictionary<string, object> _attributes = new Dictionary<string, object>();

public CompressionTypeExtension(string compressionType)
{
CompressionType = compressionType;
}

public string? CompressionType
{
get => (string?)_attributes[CompressionTypeAttributeName];
private init => _attributes[CompressionTypeAttributeName] =
value ?? throw new ArgumentNullException(nameof(CompressionType));
}

public void Attach(CloudEvent cloudEvent)
{
var eventAttributes = cloudEvent.GetAttributes();
if (_attributes == eventAttributes)
{
// already done
return;
}

foreach (var (key, value) in _attributes)
{
eventAttributes[key] = value;
}

_attributes = eventAttributes;
}

public bool ValidateAndNormalize(string key, ref object value)
{
if (key is CompressionTypeAttributeName)
{
return value switch
{
null => true,
string _ => true,
_ => throw new InvalidOperationException("ErrorCompressionTypeValueIsNotAString")
};
}

return false;
}

// Disabled null check because CloudEvent SDK doesn't
// implement null-checks
#pragma warning disable CS8603
public Type GetAttributeType(string name)
{
return name is CompressionTypeAttributeName ? typeof(string) : null;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
using System.Threading;
using System.Threading.Tasks;

namespace Motor.Extensions.Compression.Abstractions
{
public interface IMessageCompressor
{
string CompressionType { get; }
Task<byte[]> CompressAsync(byte[] rawMessage, CancellationToken cancellationToken);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
using System.Threading;
using System.Threading.Tasks;

namespace Motor.Extensions.Compression.Abstractions
{
public interface IMessageDecompressor
{
string CompressionType { get; }
Task<byte[]> DecompressAsync(byte[] compressedMessage, CancellationToken cancellationToken);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>net5.0</TargetFramework>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="CloudNative.CloudEvents" Version="1.3.80" />
</ItemGroup>

<Import Project="$(MSBuildThisFileDirectory)../../shared.csproj" />

</Project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
using System.Threading;
using System.Threading.Tasks;

namespace Motor.Extensions.Compression.Abstractions
{

public class NoOpMessageCompressor : IMessageCompressor
{
public string CompressionType => NoOpCompressionType;

public const string NoOpCompressionType = "uncompressed";

public Task<byte[]> CompressAsync(byte[] rawMessage, CancellationToken cancellationToken)
{
return Task.FromResult(rawMessage);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
using System.Threading;
using System.Threading.Tasks;

namespace Motor.Extensions.Compression.Abstractions
{
public class NoOpMessageDecompressor : IMessageDecompressor
{
public string CompressionType => NoOpMessageCompressor.NoOpCompressionType;

public Task<byte[]> DecompressAsync(byte[] compressedMessage, CancellationToken cancellationToken)
{
return Task.FromResult(compressedMessage);
}
}
}
21 changes: 21 additions & 0 deletions src/Motor.Extensions.Compression.Gzip/GzipHostBuilderExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
using Motor.Extensions.Hosting.Abstractions;

namespace Motor.Extensions.Compression.Gzip
{
public static class GzipHostBuilderExtensions
{
public static IPublisherBuilder<TOut> AddGzipCompression<TOut>(this IPublisherBuilder<TOut> hostBuilder)
where TOut : notnull
{
hostBuilder.AddCompressor<GzipMessageCompressor>();
return hostBuilder;
}

public static IConsumerBuilder<TOut> AddGzipDecompression<TOut>(this IConsumerBuilder<TOut> hostBuilder)
where TOut : notnull
{
hostBuilder.AddDecompressor<GzipMessageDecompressor>();
return hostBuilder;
}
}
}
26 changes: 26 additions & 0 deletions src/Motor.Extensions.Compression.Gzip/GzipMessageCompressor.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
using System.IO;
using System.IO.Compression;
using System.Threading;
using System.Threading.Tasks;
using Motor.Extensions.Compression.Abstractions;

namespace Motor.Extensions.Compression.Gzip
{
public class GzipMessageCompressor : IMessageCompressor
{
public string CompressionType => GzipCompressionType;

public const string GzipCompressionType = "gzip";

public async Task<byte[]> CompressAsync(byte[] rawMessage, CancellationToken cancellationToken)
{
await using var compressedStream = new MemoryStream();
//use using with explicit scope to close/flush the GZipStream before using the outputstream
await using (var zipStream = new GZipStream(compressedStream, CompressionMode.Compress))
{
await zipStream.WriteAsync(rawMessage, cancellationToken);
}
return compressedStream.ToArray();
}
}
}
27 changes: 27 additions & 0 deletions src/Motor.Extensions.Compression.Gzip/GzipMessageDecompressor.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
using System.IO;
using System.IO.Compression;
using System.Threading;
using System.Threading.Tasks;
using Motor.Extensions.Compression.Abstractions;

namespace Motor.Extensions.Compression.Gzip
{
public class GzipMessageDecompressor : IMessageDecompressor
{
public string CompressionType => GzipMessageCompressor.GzipCompressionType;

public async Task<byte[]> DecompressAsync(byte[] compressedMessage, CancellationToken cancellationToken)
{
var compressedStream = new MemoryStream(compressedMessage);

var output = new MemoryStream();
//use using with explicit scope to close/flush the GZipStream before using the outputstream
await using (var decompressionStream = new GZipStream(compressedStream, CompressionMode.Decompress))
{
await decompressionStream.CopyToAsync(output, cancellationToken);
}

return output.ToArray();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>net5.0</TargetFramework>
</PropertyGroup>

<ItemGroup>
<ProjectReference Include="..\Motor.Extensions.Compression.Abstractions\Motor.Extensions.Compression.Abstractions.csproj" />
<ProjectReference Include="..\Motor.Extensions.Hosting.Abstractions\Motor.Extensions.Hosting.Abstractions.csproj" />
</ItemGroup>

<Import Project="$(MSBuildThisFileDirectory)../../shared.csproj" />

</Project>
Loading

0 comments on commit 5765e00

Please sign in to comment.