From baaddb9d1a78a71ebc8b36b29c08529d8c8d8757 Mon Sep 17 00:00:00 2001 From: David Jensen Date: Mon, 29 Jan 2024 14:50:17 +0100 Subject: [PATCH] feat: Add Apache Pulsar module --- Directory.Packages.props | 3 +- Testcontainers.sln | 14 +++ src/Testcontainers.Pulsar/.editorconfig | 1 + src/Testcontainers.Pulsar/PulsarBuilder.cs | 113 ++++++++++++++++++ .../PulsarConfiguration.cs | 68 +++++++++++ src/Testcontainers.Pulsar/PulsarContainer.cs | 58 +++++++++ .../Testcontainers.Pulsar.csproj | 14 +++ src/Testcontainers.Pulsar/Usings.cs | 8 ++ .../PulsarContainerTest.cs | 80 +++++++++++++ ...sarContainerWithTokenAuthenticationTest.cs | 82 +++++++++++++ .../Testcontainers.Pulsar.Tests.csproj | 19 +++ tests/Testcontainers.Pulsar.Tests/Usings.cs | 4 + 12 files changed, 463 insertions(+), 1 deletion(-) create mode 100644 src/Testcontainers.Pulsar/.editorconfig create mode 100644 src/Testcontainers.Pulsar/PulsarBuilder.cs create mode 100644 src/Testcontainers.Pulsar/PulsarConfiguration.cs create mode 100644 src/Testcontainers.Pulsar/PulsarContainer.cs create mode 100644 src/Testcontainers.Pulsar/Testcontainers.Pulsar.csproj create mode 100644 src/Testcontainers.Pulsar/Usings.cs create mode 100644 tests/Testcontainers.Pulsar.Tests/PulsarContainerTest.cs create mode 100644 tests/Testcontainers.Pulsar.Tests/PulsarContainerWithTokenAuthenticationTest.cs create mode 100644 tests/Testcontainers.Pulsar.Tests/Testcontainers.Pulsar.Tests.csproj create mode 100644 tests/Testcontainers.Pulsar.Tests/Usings.cs diff --git a/Directory.Packages.props b/Directory.Packages.props index dd5655c7f..4d07bd71a 100644 --- a/Directory.Packages.props +++ b/Directory.Packages.props @@ -32,6 +32,7 @@ + @@ -59,4 +60,4 @@ - \ No newline at end of file + diff --git a/Testcontainers.sln b/Testcontainers.sln index 94af5e2c3..999678b2b 100644 --- a/Testcontainers.sln +++ b/Testcontainers.sln @@ -191,6 +191,10 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Testcontainers.Tests", "tes EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Testcontainers.WebDriver.Tests", "tests\Testcontainers.WebDriver.Tests\Testcontainers.WebDriver.Tests.csproj", "{EBA72C3B-57D5-43FF-A5B4-3D55B3B6D4C2}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Testcontainers.Pulsar", "src\Testcontainers.Pulsar\Testcontainers.Pulsar.csproj", "{27D46863-65B9-4934-B3C8-2383B217A477}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Testcontainers.Pulsar.Tests", "tests\Testcontainers.Pulsar.Tests\Testcontainers.Pulsar.Tests.csproj", "{D05FCB31-793E-43E0-BD6C-077013AE9113}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -556,6 +560,14 @@ Global {EBA72C3B-57D5-43FF-A5B4-3D55B3B6D4C2}.Debug|Any CPU.Build.0 = Debug|Any CPU {EBA72C3B-57D5-43FF-A5B4-3D55B3B6D4C2}.Release|Any CPU.ActiveCfg = Release|Any CPU {EBA72C3B-57D5-43FF-A5B4-3D55B3B6D4C2}.Release|Any CPU.Build.0 = Release|Any CPU + {27D46863-65B9-4934-B3C8-2383B217A477}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {27D46863-65B9-4934-B3C8-2383B217A477}.Debug|Any CPU.Build.0 = Debug|Any CPU + {27D46863-65B9-4934-B3C8-2383B217A477}.Release|Any CPU.ActiveCfg = Release|Any CPU + {27D46863-65B9-4934-B3C8-2383B217A477}.Release|Any CPU.Build.0 = Release|Any CPU + {D05FCB31-793E-43E0-BD6C-077013AE9113}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {D05FCB31-793E-43E0-BD6C-077013AE9113}.Debug|Any CPU.Build.0 = Debug|Any CPU + {D05FCB31-793E-43E0-BD6C-077013AE9113}.Release|Any CPU.ActiveCfg = Release|Any CPU + {D05FCB31-793E-43E0-BD6C-077013AE9113}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(NestedProjects) = preSolution {5365F780-0E6C-41F0-B1B9-7DC34368F80C} = {673F23AE-7694-4BB9-ABD4-136D6C13634E} @@ -647,5 +659,7 @@ Global {1A1983E6-5297-435F-B467-E8E1F11277D6} = {7164F1FB-7F24-444A-ACD2-2C329C2B3CCF} {27CDB869-A150-4593-958F-6F26E5391E7C} = {7164F1FB-7F24-444A-ACD2-2C329C2B3CCF} {EBA72C3B-57D5-43FF-A5B4-3D55B3B6D4C2} = {7164F1FB-7F24-444A-ACD2-2C329C2B3CCF} + {27D46863-65B9-4934-B3C8-2383B217A477} = {673F23AE-7694-4BB9-ABD4-136D6C13634E} + {D05FCB31-793E-43E0-BD6C-077013AE9113} = {7164F1FB-7F24-444A-ACD2-2C329C2B3CCF} EndGlobalSection EndGlobal diff --git a/src/Testcontainers.Pulsar/.editorconfig b/src/Testcontainers.Pulsar/.editorconfig new file mode 100644 index 000000000..6f066619d --- /dev/null +++ b/src/Testcontainers.Pulsar/.editorconfig @@ -0,0 +1 @@ +root = true \ No newline at end of file diff --git a/src/Testcontainers.Pulsar/PulsarBuilder.cs b/src/Testcontainers.Pulsar/PulsarBuilder.cs new file mode 100644 index 000000000..3480e3ba7 --- /dev/null +++ b/src/Testcontainers.Pulsar/PulsarBuilder.cs @@ -0,0 +1,113 @@ +using System.Text; +using Testcontainers.Pulsar; + +/// +[PublicAPI] +public sealed class PulsarBuilder : ContainerBuilder +{ + private const string AuthenticationPlugin = "org.apache.pulsar.client.impl.auth.AuthenticationToken"; + private const string SecretKeyPath = "/pulsar/secret.key"; + private const string UserName = "test-user"; + private const string PulsarImage = "apachepulsar/pulsar:3.0.2"; + private const string AdminClustersEndpoint = "/admin/v2/clusters"; + internal const string Enabled = "Enabled"; + + private Dictionary _environmentVariables = new Dictionary + { + { "PULSAR_PREFIX_tokenSecretKey", $"file://{SecretKeyPath}" }, + { "PULSAR_PREFIX_authenticationRefreshCheckSeconds", "5" }, + { "superUserRoles", UserName }, + { "authenticationEnabled", "true" }, + { "authorizationEnabled", "true" }, + { "authenticationProviders", "org.apache.pulsar.broker.authentication.AuthenticationProviderToken" }, + { "authenticateOriginalAuthData", "false" }, + { "brokerClientAuthenticationPlugin", AuthenticationPlugin }, + { "CLIENT_PREFIX_authPlugin", AuthenticationPlugin } + }; + + public const ushort PulsarBrokerPort = 6650; + public const ushort PulsarBrokerHttpPort = 8080; + + /// + /// Initializes a new instance of the class. + /// + public PulsarBuilder() + : this(new PulsarConfiguration()) + { + DockerResourceConfiguration = Init().DockerResourceConfiguration; + } + + /// + /// Initializes a new instance of the class. + /// + /// The Docker resource configuration. + private PulsarBuilder(PulsarConfiguration resourceConfiguration) + : base(resourceConfiguration) + { + DockerResourceConfiguration = resourceConfiguration; + } + + /// + protected override PulsarConfiguration DockerResourceConfiguration { get; } + + /// + public override PulsarContainer Build() + { + Validate(); + var pulsarStartupCommands = String.Empty; + if (DockerResourceConfiguration.Authentication == Enabled) + { + pulsarStartupCommands = $"bin/pulsar tokens create-secret-key --output {SecretKeyPath} && " + + $"export brokerClientAuthenticationParameters=token:$(bin/pulsar tokens create --secret-key {SecretKeyPath} --subject {UserName}) && " + + $"export CLIENT_PREFIX_authParams=$brokerClientAuthenticationParameters && bin/apply-config-from-env.py conf/standalone.conf && " + + $"bin/apply-config-from-env-with-prefix.py CLIENT_PREFIX_ conf/client.conf && "; + } + pulsarStartupCommands += "bin/pulsar standalone"; + + if (DockerResourceConfiguration.Functions != Enabled) + pulsarStartupCommands += " --no-functions-worker"; + + var pulsarBuilder = WithCommand("/bin/bash", "-c",pulsarStartupCommands); + return new PulsarContainer(pulsarBuilder.DockerResourceConfiguration, TestcontainersSettings.Logger); + } + + /// + protected override PulsarBuilder Init() + { + return base.Init() + .WithImage(PulsarImage) + .WithPortBinding(PulsarBrokerPort, true) + .WithPortBinding(PulsarBrokerHttpPort, true) + .WithWaitStrategy(Wait.ForUnixContainer() + .UntilCommandIsCompleted(["/bin/bash", "-c", "bin/pulsar-admin clusters list"])); + } + + public PulsarBuilder WithTokenAuthentication() + { + return Merge(DockerResourceConfiguration, new PulsarConfiguration(authentication: Enabled)) + .WithEnvironment(_environmentVariables); + } + + public PulsarBuilder WithFunctions() + { + return Merge(DockerResourceConfiguration, new PulsarConfiguration(functions: Enabled)); + } + + /// + protected override PulsarBuilder Clone(IResourceConfiguration resourceConfiguration) + { + return Merge(DockerResourceConfiguration, new PulsarConfiguration(resourceConfiguration)); + } + + /// + protected override PulsarBuilder Clone(IContainerConfiguration resourceConfiguration) + { + return Merge(DockerResourceConfiguration, new PulsarConfiguration(resourceConfiguration)); + } + + /// + protected override PulsarBuilder Merge(PulsarConfiguration oldValue, PulsarConfiguration newValue) + { + return new PulsarBuilder(new PulsarConfiguration(oldValue, newValue)); + } +} \ No newline at end of file diff --git a/src/Testcontainers.Pulsar/PulsarConfiguration.cs b/src/Testcontainers.Pulsar/PulsarConfiguration.cs new file mode 100644 index 000000000..87e591aa1 --- /dev/null +++ b/src/Testcontainers.Pulsar/PulsarConfiguration.cs @@ -0,0 +1,68 @@ +namespace Testcontainers.Pulsar; + +/// +[PublicAPI] +public sealed class PulsarConfiguration : ContainerConfiguration +{ + /// + /// Initializes a new instance of the class. + /// + public PulsarConfiguration(string authentication = null, + string functions = null) + { + Authentication = authentication; + Functions = functions; + } + + /// + /// Initializes a new instance of the class. + /// + /// The Docker resource configuration. + public PulsarConfiguration(IResourceConfiguration resourceConfiguration) + : base(resourceConfiguration) + { + // Passes the configuration upwards to the base implementations to create an updated immutable copy. + } + + /// + /// Initializes a new instance of the class. + /// + /// The Docker resource configuration. + public PulsarConfiguration(IContainerConfiguration resourceConfiguration) + : base(resourceConfiguration) + { + // Passes the configuration upwards to the base implementations to create an updated immutable copy. + } + + /// + /// Initializes a new instance of the class. + /// + /// The Docker resource configuration. + public PulsarConfiguration(PulsarConfiguration resourceConfiguration) + : this(new PulsarConfiguration(), resourceConfiguration) + { + // Passes the configuration upwards to the base implementations to create an updated immutable copy. + } + + /// + /// Initializes a new instance of the class. + /// + /// The old Docker resource configuration. + /// The new Docker resource configuration. + public PulsarConfiguration(PulsarConfiguration oldValue, PulsarConfiguration newValue) + : base(oldValue, newValue) + { + Authentication = BuildConfiguration.Combine(oldValue.Authentication, newValue.Authentication); + Functions = BuildConfiguration.Combine(oldValue.Functions, newValue.Functions); + } + + /// + /// Gets authentication. + /// + public string Authentication { get; } + + /// + /// Gets functions. + /// + public string Functions { get; } +} \ No newline at end of file diff --git a/src/Testcontainers.Pulsar/PulsarContainer.cs b/src/Testcontainers.Pulsar/PulsarContainer.cs new file mode 100644 index 000000000..a91ad7490 --- /dev/null +++ b/src/Testcontainers.Pulsar/PulsarContainer.cs @@ -0,0 +1,58 @@ +namespace Testcontainers.Pulsar; + +/// +[PublicAPI] +public sealed class PulsarContainer : DockerContainer +{ + private readonly PulsarConfiguration _configuration; + + /// + /// Initializes a new instance of the class. + /// + /// The container configuration. + /// The logger. + public PulsarContainer(PulsarConfiguration configuration, ILogger logger) + : base(configuration, logger) + { + _configuration = configuration; + } + + /// + /// Gets the Pulsar broker url. + /// + /// The Pulsar broker url. + public string GetPulsarBrokerUrl() => + new UriBuilder("pulsar://", Hostname, GetMappedPublicPort(PulsarBuilder.PulsarBrokerPort)).ToString(); + + /// + /// Gets the Pulsar service url. + /// + /// The Pulsar service url. + public string GetHttpServiceUrl() => + new UriBuilder("http", Hostname, GetMappedPublicPort(PulsarBuilder.PulsarBrokerHttpPort)).ToString(); + + /// + /// Creates Authentication token + /// + /// Relative expiry time for the token (eg: 1h, 3d, 10y) + /// + /// Authentication token + /// + public async Task CreateToken(TimeSpan expiryTime, CancellationToken cancellationToken = default) + { + if (_configuration.Authentication != PulsarBuilder.Enabled) + throw new Exception($"Could not create the token, because WithAuthentication is not used."); + + var arguments = $"bin/pulsar tokens create --secret-key /pulsar/secret.key --subject test-user"; + + if (expiryTime != Timeout.InfiniteTimeSpan) + arguments += $" --expiry-time {expiryTime.TotalSeconds}s"; + + var result = await ExecAsync(new[] { "/bin/bash", "-c", arguments }, cancellationToken); + + if (result.ExitCode != 0) + throw new Exception($"Could not create the token: {result.Stderr}"); + + return result.Stdout; + } +} \ No newline at end of file diff --git a/src/Testcontainers.Pulsar/Testcontainers.Pulsar.csproj b/src/Testcontainers.Pulsar/Testcontainers.Pulsar.csproj new file mode 100644 index 000000000..22c201030 --- /dev/null +++ b/src/Testcontainers.Pulsar/Testcontainers.Pulsar.csproj @@ -0,0 +1,14 @@ + + + enable + enable + netstandard2.0;netstandard2.1 + latest + + + + + + + + \ No newline at end of file diff --git a/src/Testcontainers.Pulsar/Usings.cs b/src/Testcontainers.Pulsar/Usings.cs new file mode 100644 index 000000000..ca872e8d7 --- /dev/null +++ b/src/Testcontainers.Pulsar/Usings.cs @@ -0,0 +1,8 @@ +global using System; +global using Docker.DotNet.Models; +global using DotNet.Testcontainers; +global using DotNet.Testcontainers.Builders; +global using DotNet.Testcontainers.Configurations; +global using DotNet.Testcontainers.Containers; +global using JetBrains.Annotations; +global using Microsoft.Extensions.Logging; diff --git a/tests/Testcontainers.Pulsar.Tests/PulsarContainerTest.cs b/tests/Testcontainers.Pulsar.Tests/PulsarContainerTest.cs new file mode 100644 index 000000000..85649a982 --- /dev/null +++ b/tests/Testcontainers.Pulsar.Tests/PulsarContainerTest.cs @@ -0,0 +1,80 @@ +using System.Collections.Generic; +using System.Threading; +using DotPulsar; +using DotPulsar.Abstractions; +using DotPulsar.Extensions; +using Xunit.Abstractions; + +namespace Testcontainers.Pulsar.Tests; + +public sealed class PulsarContainerTest : IAsyncLifetime +{ + private readonly CancellationTokenSource _cts; + private readonly PulsarContainer _pulsarContainer; + private readonly ITestOutputHelper _testOutputHelper; + + public PulsarContainerTest(ITestOutputHelper testOutputHelper) + { + _testOutputHelper = testOutputHelper; + _pulsarContainer = new PulsarBuilder().Build(); + _cts = new CancellationTokenSource(TimeSpan.FromMinutes(1)); + } + + public Task InitializeAsync() + { + return _pulsarContainer.StartAsync(); + } + + public Task DisposeAsync() + { + return _pulsarContainer.DisposeAsync().AsTask(); + } + + [Fact] + [Trait(nameof(DockerCli.DockerPlatform), nameof(DockerCli.DockerPlatform.Linux))] + public async Task PulsarContainer_WhenBrokerIsStarted_ShouldConnect() + { + // Given + await using var client = CreateClient(); + var expected = new List { MessageId.Earliest }; + await using var reader = CreateReader(client, MessageId.Earliest, await CreateTopic(_cts.Token)); + + // When + var actual = await reader.GetLastMessageIds(_cts.Token); + + // Then + Assert.Equal(expected,actual); + } + + private IReader CreateReader(IPulsarClient pulsarClient, MessageId messageId, string topicName) + => pulsarClient.NewReader(Schema.String) + .StartMessageId(messageId) + .Topic(topicName) + .Create(); + + private static string CreateTopicName() => $"persistent://public/default/{Guid.NewGuid():N}"; + + private async Task CreateTopic(string topic, CancellationToken cancellationToken) + { + var arguments = $"bin/pulsar-admin topics create {topic}"; + + var result = await _pulsarContainer.ExecAsync(new[] { "/bin/bash", "-c", arguments }, cancellationToken); + + if (result.ExitCode != 0) + throw new Exception($"Could not create the topic: {result.Stderr}"); + } + + private async Task CreateTopic(CancellationToken cancellationToken) + { + var topic = CreateTopicName(); + await CreateTopic(topic, cancellationToken); + return topic; + } + + private IPulsarClient CreateClient() + => PulsarClient + .Builder() + .ExceptionHandler(context => _testOutputHelper.WriteLine($"PulsarClient got an exception: {context.Exception}")) + .ServiceUrl(new Uri(_pulsarContainer.GetPulsarBrokerUrl())) + .Build(); +} diff --git a/tests/Testcontainers.Pulsar.Tests/PulsarContainerWithTokenAuthenticationTest.cs b/tests/Testcontainers.Pulsar.Tests/PulsarContainerWithTokenAuthenticationTest.cs new file mode 100644 index 000000000..ced220890 --- /dev/null +++ b/tests/Testcontainers.Pulsar.Tests/PulsarContainerWithTokenAuthenticationTest.cs @@ -0,0 +1,82 @@ +using System.Collections.Generic; +using System.Threading; +using DotPulsar; +using DotPulsar.Abstractions; +using DotPulsar.Extensions; +using DotPulsar.Internal; +using Xunit.Abstractions; + +namespace Testcontainers.Pulsar.Tests; + +public sealed class PulsarContainerWithTokenAuthenticationTest : IAsyncLifetime +{ + private readonly CancellationTokenSource _cts; + private readonly PulsarContainer _pulsarContainer; + private readonly ITestOutputHelper _testOutputHelper; + + public PulsarContainerWithTokenAuthenticationTest(ITestOutputHelper testOutputHelper) + { + _testOutputHelper = testOutputHelper; + _pulsarContainer = new PulsarBuilder().WithTokenAuthentication().WithFunctions().Build(); + _cts = new CancellationTokenSource(TimeSpan.FromMinutes(1)); + } + + public Task InitializeAsync() + { + return _pulsarContainer.StartAsync(); + } + + public Task DisposeAsync() + { + return _pulsarContainer.DisposeAsync().AsTask(); + } + + [Fact] + [Trait(nameof(DockerCli.DockerPlatform), nameof(DockerCli.DockerPlatform.Linux))] + public async Task PulsarContainer_WhenBrokerWithTokenAuthenticationIsStarted_ShouldConnect() + { + // Given + await using var client = CreateClient(); + var expected = new List { MessageId.Earliest }; + await using var reader = CreateReader(client, MessageId.Earliest, await CreateTopic(_cts.Token)); + + // When + var actual = await reader.GetLastMessageIds(_cts.Token); + + // Then + Assert.Equal(expected,actual); + } + + private IReader CreateReader(IPulsarClient pulsarClient, MessageId messageId, string topicName) + => pulsarClient.NewReader(Schema.String) + .StartMessageId(messageId) + .Topic(topicName) + .Create(); + + private static string CreateTopicName() => $"persistent://public/default/{Guid.NewGuid():N}"; + + private async Task CreateTopic(string topic, CancellationToken cancellationToken) + { + var arguments = $"bin/pulsar-admin topics create {topic}"; + + var result = await _pulsarContainer.ExecAsync(new[] { "/bin/bash", "-c", arguments }, cancellationToken); + + if (result.ExitCode != 0) + throw new Exception($"Could not create the topic: {result.Stderr}"); + } + + private async Task CreateTopic(CancellationToken cancellationToken) + { + var topic = CreateTopicName(); + await CreateTopic(topic, cancellationToken); + return topic; + } + + private IPulsarClient CreateClient() + => PulsarClient + .Builder() + .Authentication(new TokenAuthentication(_pulsarContainer.CreateToken(Timeout.InfiniteTimeSpan).Result)) + .ExceptionHandler(context => _testOutputHelper.WriteLine($"PulsarClient got an exception: {context.Exception}")) + .ServiceUrl(new Uri(_pulsarContainer.GetPulsarBrokerUrl())) + .Build(); +} diff --git a/tests/Testcontainers.Pulsar.Tests/Testcontainers.Pulsar.Tests.csproj b/tests/Testcontainers.Pulsar.Tests/Testcontainers.Pulsar.Tests.csproj new file mode 100644 index 000000000..e0f99e2c4 --- /dev/null +++ b/tests/Testcontainers.Pulsar.Tests/Testcontainers.Pulsar.Tests.csproj @@ -0,0 +1,19 @@ + + + net8.0 + false + false + true + + + + + + + + + + + + + diff --git a/tests/Testcontainers.Pulsar.Tests/Usings.cs b/tests/Testcontainers.Pulsar.Tests/Usings.cs new file mode 100644 index 000000000..76e137860 --- /dev/null +++ b/tests/Testcontainers.Pulsar.Tests/Usings.cs @@ -0,0 +1,4 @@ +global using DotNet.Testcontainers.Commons; +global using Xunit; +global using System; +global using System.Threading.Tasks;