Skip to content

Commit

Permalink
Initial Kafka Messaging Transport. Closes GH-390
Browse files Browse the repository at this point in the history
Barebones KafkaTransport

Roughed in more of the Kafka transport pieces

Roughed in Kafka compliance tests

All the compliance tests for kafka are working

tests for the stateful resource model in kafka

Basics of the Kafka transport are done

Last organzation and XML API comments on Kafka code
  • Loading branch information
jeremydmiller committed Oct 18, 2023
1 parent d7ba191 commit 94fd060
Show file tree
Hide file tree
Showing 27 changed files with 1,109 additions and 13 deletions.
1 change: 1 addition & 0 deletions build/build.cs
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ private static void Main(string[] args)
"./src/Transports/Azure/Wolverine.AzureServiceBus",
"./src/Transports/AWS/Wolverine.AmazonSqs",
"./src/Transports/MQTT/Wolverine.MQTT",
"./src/Transports/Kafka/Wolverine.Kafka",
"./src/Persistence/Wolverine.RDBMS",
"./src/Persistence/Wolverine.Postgresql",
"./src/Persistence/Wolverine.EntityFrameworkCore",
Expand Down
29 changes: 28 additions & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
version: '3'

networks:
app-tier:
driver: bridge

services:
postgresql:
image: "postgres:latest"
Expand Down Expand Up @@ -43,4 +48,26 @@ services:
- DOCKER_HOST=unix:///var/run/docker.sock
volumes:
- "${LOCALSTACK_VOLUME_DIR:-./volume}:/var/lib/localstack"
- "/var/run/docker.sock:/var/run/docker.sock"
- "/var/run/docker.sock:/var/run/docker.sock"

zookeeper:
image: confluentinc/cp-zookeeper:latest
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ports:
- 22181:2181

kafka:
image: confluentinc/cp-kafka:latest
depends_on:
- zookeeper
ports:
- 29092:29092
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ public virtual async Task can_apply_requeue_mechanics()
var session = await theSender.TrackActivity(Fixture.DefaultTimeout)
.AlsoTrack(theReceiver)
.DoNotAssertOnExceptionsDetected()
.Timeout(60.Seconds())
.Timeout(30.Seconds())
.ExecuteAndWaitAsync(c => c.EndpointFor(theOutboundAddress).SendAsync(new Message2()));

session.FindSingleTrackedMessageOfType<Message2>(MessageEventType.MessageSucceeded)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

namespace Wolverine.AmazonSqs.Internal;

internal class SqsSenderProtocol : ISenderProtocol
internal class SqsSenderProtocol :ISenderProtocol
{
private readonly ILogger _logger;
private readonly AmazonSqsQueue _queue;
Expand Down
35 changes: 35 additions & 0 deletions src/Transports/Kafka/Wolverine.Kafka.Tests/KafkaTransportTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
using Shouldly;
using Wolverine.Configuration;
using Wolverine.Kafka.Internals;

namespace Wolverine.Kafka.Tests;

public class KafkaTransportTests
{
[Theory]
[InlineData("kafka://topic/one", "one")]
[InlineData("kafka://topic/one.two", "one.two")]
[InlineData("kafka://topic/one.two/", "one.two")]
[InlineData("kafka://topic/one.two.three", "one.two.three")]
public void get_topic_name_from_uri(string uriString, string expected)
{
KafkaTopic.TopicNameForUri(new Uri(uriString))
.ShouldBe(expected);
}

[Fact]
public void build_uri_for_endpoint()
{
var transport = new KafkaTransport();
new KafkaTopic(transport, "one.two", EndpointRole.Application)
.Uri.ShouldBe(new Uri("Kafka://topic/one.two"));
}

[Fact]
public void endpoint_name_is_topic_name()
{
var transport = new KafkaTransport();
new KafkaTopic(transport, "one.two", EndpointRole.Application)
.EndpointName.ShouldBe("one.two");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
using Xunit;

[assembly: CollectionBehavior(CollectionBehavior.CollectionPerAssembly)]
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
using JasperFx.Core;
using Microsoft.Extensions.Hosting;
using Oakton;
using Shouldly;

namespace Wolverine.Kafka.Tests;

public class StatefulResourceSmokeTests
{
private IHostBuilder ConfigureBuilder(bool autoProvision, int starting = 1)
{
return Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
if (autoProvision)
{
opts.UseKafka("localhost:29092").AutoProvision();
}
else
{
opts.UseKafka("localhost:29092");;
}

opts.PublishMessage<SRMessage1>()
.ToKafkaTopic("sr" + starting++);

opts.PublishMessage<SRMessage2>()
.ToKafkaTopic("sr" + starting++);

opts.PublishMessage<SRMessage3>()
.ToKafkaTopic("sr" + starting++);

opts.PublishMessage<SRMessage3>()
.ToKafkaTopic("sr" + starting++);
});
}

[Fact]
public async Task run_setup()
{
var result = await ConfigureBuilder(false)
.RunOaktonCommands(new[] { "resources", "setup" });
result.ShouldBe(0);
}


[Fact]
public async Task statistics()
{
(await ConfigureBuilder(false)
.RunOaktonCommands(new[] { "resources", "setup" })).ShouldBe(0);

var result = await ConfigureBuilder(false)
.RunOaktonCommands(new[] { "resources", "statistics" });

result.ShouldBe(0);
}

[Fact]
public async Task check_positive()
{
(await ConfigureBuilder(false)
.RunOaktonCommands(new[] { "resources", "setup" })).ShouldBe(0);

var result = await ConfigureBuilder(false)
.RunOaktonCommands(new[] { "resources", "check" });

result.ShouldBe(0);
}

// [Fact]
// public async Task check_negative()
// {
// var result = await ConfigureBuilder(false, 10)
// .RunOaktonCommands(new[] { "resources", "check" });
//
// result.ShouldBe(1);
// }

[Fact]
public async Task clear_state()
{
(await ConfigureBuilder(false, 20)
.RunOaktonCommands(new[] { "resources", "setup" })).ShouldBe(0);

(await ConfigureBuilder(false, 20)
.RunOaktonCommands(new[] { "resources", "clear" })).ShouldBe(0);
}

[Fact]
public async Task teardown()
{
(await ConfigureBuilder(false, 30)
.RunOaktonCommands(new[] { "resources", "setup" })).ShouldBe(0);

(await ConfigureBuilder(false, 30)
.RunOaktonCommands(new[] { "resources", "teardown" })).ShouldBe(0);
}
}

public class SRMessage1
{
}

public class SRMessage2
{
}

public class SRMessage3
{
}

public class SRMessage4
{
}

public class SRMessageHandlers
{
public Task Handle(SRMessage1 message)
{
return Task.Delay(100.Milliseconds());
}

public Task Handle(SRMessage2 message)
{
return Task.Delay(100.Milliseconds());
}

public Task Handle(SRMessage3 message)
{
return Task.Delay(100.Milliseconds());
}

public Task Handle(SRMessage4 message)
{
return Task.Delay(100.Milliseconds());
}
}
9 changes: 0 additions & 9 deletions src/Transports/Kafka/Wolverine.Kafka.Tests/UnitTest1.cs

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
using System.Diagnostics;
using JasperFx.Core;
using Microsoft.Extensions.Hosting;
using Oakton.Resources;
using Shouldly;
using Wolverine.Tracking;
using Xunit.Abstractions;

namespace Wolverine.Kafka.Tests;

public class broadcast_to_topic_async : IAsyncLifetime
{
private readonly ITestOutputHelper _output;
private IHost _sender;
private IHost _receiver;

public broadcast_to_topic_async(ITestOutputHelper output)
{
_output = output;
}

public async Task InitializeAsync()
{

_sender = await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
opts.UseKafka("localhost:29092").AutoProvision();
opts.Policies.DisableConventionalLocalRouting();

opts.Services.AddResourceSetupOnStartup();
}).StartAsync();

_receiver = await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
opts.UseKafka("localhost:29092").AutoProvision();
opts.ListenToKafkaTopic("incoming.one");

opts.Services.AddResourceSetupOnStartup();
}).StartAsync();

}

[Fact]
public async Task broadcast()
{
var session = await _sender.TrackActivity()
.AlsoTrack(_receiver)
.Timeout(30.Seconds())
.ExecuteAndWaitAsync(m => m.BroadcastToTopicAsync("incoming.one", new ColorMessage("blue")));

var received = session.Received.SingleMessage<ColorMessage>();
received.Color.ShouldBe("blue");
}


public async Task DisposeAsync()
{
await _sender.StopAsync();
await _receiver.StopAsync();
}
}

public class ColorMessage
{
public ColorMessage()
{
}

public ColorMessage(string color)
{
Color = color;
}

public string Color { get; set; }
}

public static class ColorMessageHandler
{
public static void Handle(ColorMessage message)
{
Debug.WriteLine("Got " + message.Color);
}
}
Loading

0 comments on commit 94fd060

Please sign in to comment.