Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed no responder issue when producing a message #111

Merged
merged 1 commit into from
Aug 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,10 @@ public async Task GivenProducerOptions_WhenProduceAsync_ThenMessageIsProduced(
var producer = await client.CreateProducer(producerOptions);

await producer.ProduceAsync(message, _fixture.CommonHeaders);

await producer.DestroyAsync();
await station.DestroyAsync();

Assert.NotNull(station);
Assert.NotNull(producer);
}
Expand Down Expand Up @@ -82,5 +82,33 @@ public async Task GivenProducerOptions_WhenDestroyAsync_ThenProducerIsDestroyed(
Assert.NotNull(producer);
}


// [Theory]
// [InlineData("infinite_st", "infinite_produce", true)]
// public async Task ProduceInfinitely(
// string stationName, string producerName, bool generateUniqueSuffix)
// {
// using var client = await MemphisClientFactory.CreateClient(_fixture.MemphisClientOptions);

// var producerOptions = new MemphisProducerOptions
// {
// StationName = stationName,
// ProducerName = producerName,
// GenerateUniqueSuffix = generateUniqueSuffix
// };
// var producer = await client.CreateProducer(producerOptions);

// int counter = 0;
// while (true)
// {
// string message = $"Hello, World! {counter}";
// await producer.ProduceAsync(message, _fixture.CommonHeaders);
// Console.WriteLine(message);
// await Task.Delay(TimeSpan.FromSeconds(1));
// counter+=1;
// }

// await producer.DestroyAsync();

// Assert.NotNull(producer);
// }
}
153 changes: 110 additions & 43 deletions src/Memphis.Client/MemphisClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Collections.Specialized;
using System.Data;
using System.IO;
using System.Text;
using System.Text.RegularExpressions;
Expand Down Expand Up @@ -56,6 +55,10 @@ public sealed class MemphisClient : IMemphisClient
private readonly ConcurrentDictionary<string, bool> _stationSchemaVerseToDlsMap;
private readonly ConcurrentDictionary<string, bool> _clusterConfigurations;


private readonly SemaphoreSlim _schemaUpdateSemaphore = new(1, 1);
private readonly SemaphoreSlim _sdkClientUpdateSemaphore = new(1, 1);

public MemphisClient(Options brokerConnOptions, IConnection brokerConnection,
IJetStream jetStreamContext, string connectionId)
{
Expand Down Expand Up @@ -155,7 +158,10 @@ public async Task<MemphisProducer> CreateProducer(MemphisProducerOptions produce

await ListenForSchemaUpdate(internalStationName, respAsObject.SchemaUpdate);

return new MemphisProducer(this, producerName, stationName, producerName.ToLower());
var producer = new MemphisProducer(this, producerName, stationName, producerName.ToLower());
var producerKey = $"{internalStationName}_{producerName.ToLower()}";
_producerCache.AddOrUpdate(producerKey, producer, (_, _) => producer);
return producer;
}
catch (MemphisException)
{
Expand Down Expand Up @@ -220,12 +226,27 @@ public async Task ProduceAsync(
producer = cacheProducer;
}

if (producer is null)
producer ??= await CreateProducer(options);

await producer.ProduceToBrokerAsync(message, headers, options.MaxAckTimeMs, messageId);
}

internal async Task ProduceAsync(
MemphisProducer producer,
byte[] message,
NameValueCollection headers,
int ackWaitMs,
string? messageId = default)
{
MemphisProducerOptions options = new()
{
producer = await CreateProducer(options);
}
StationName = producer.StationName,
ProducerName = producer.ProducerName,
GenerateUniqueSuffix = false,
MaxAckTimeMs = ackWaitMs
};

await producer.ProduceAsync(message, headers, options.MaxAckTimeMs, messageId);
await ProduceAsync(options, message, headers, messageId);
}

/// <summary>
Expand Down Expand Up @@ -589,7 +610,7 @@ static void HandleSchemaCreationErrorResponse(byte[] responseBytes)
if (!string.IsNullOrWhiteSpace(response.Error))
throw new MemphisException(response.Error);
}
catch(System.Exception e) when (e is not MemphisException)
catch (System.Exception e) when (e is not MemphisException)
{
if (!string.IsNullOrWhiteSpace(responseStr))
throw new MemphisException(responseStr);
Expand Down Expand Up @@ -905,39 +926,67 @@ private void RegisterSchemaValidators()

private async Task ListenForSchemaUpdate(string internalStationName, ProducerSchemaUpdateInit schemaUpdateInit)
{
var schemaUpdateSubject = MemphisSubjects.MEMPHIS_SCHEMA_UPDATE + internalStationName;

if (!string.IsNullOrEmpty(schemaUpdateInit.SchemaName))
try
{
_schemaUpdateDictionary.TryAdd(internalStationName, schemaUpdateInit);
}
await _schemaUpdateSemaphore.WaitAsync();


if (_subscriptionPerSchema.TryGetValue(internalStationName, out ISyncSubscription schemaSub))
{
_producerPerStations.AddOrUpdate(internalStationName, 1, (key, val) => val + 1);
return;
}
var schemaUpdateSubject = MemphisSubjects.MEMPHIS_SCHEMA_UPDATE + internalStationName;

if (!string.IsNullOrEmpty(schemaUpdateInit.SchemaName))
{
_schemaUpdateDictionary.TryAdd(internalStationName, schemaUpdateInit);
}


if (_subscriptionPerSchema.TryGetValue(internalStationName, out ISyncSubscription schemaSub))
{
_producerPerStations.AddOrUpdate(internalStationName, 1, (key, val) => val + 1);
return;
}

var subscription = _brokerConnection.SubscribeSync(schemaUpdateSubject);
var subscription = _brokerConnection.SubscribeSync(schemaUpdateSubject);

if (!_subscriptionPerSchema.TryAdd(internalStationName, subscription))
if (!_subscriptionPerSchema.TryAdd(internalStationName, subscription))
{
throw new MemphisException("Unable to add subscription of schema updates for station");
}

await ProcessAndStoreSchemaUpdate(internalStationName, schemaUpdateInit);

Task.Run(async () =>
{
while (!_cancellationTokenSource.IsCancellationRequested)
{
var schemaUpdateMsg = subscription.NextMessage();
await ProcessAndStoreSchemaUpdate(internalStationName, schemaUpdateMsg);
}
}, _cancellationTokenSource.Token);

_producerPerStations.AddOrUpdate(internalStationName, 1, (key, val) => val + 1);
}
finally
{
throw new MemphisException("Unable to add subscription of schema updates for station");
_schemaUpdateSemaphore.Release();
}
}

await ProcessAndStoreSchemaUpdate(internalStationName, schemaUpdateInit);

Task.Run(async () =>
private async Task RemoveSchemaUpdateListener(string stationName)
{
try
{
while (!_cancellationTokenSource.IsCancellationRequested)
await _schemaUpdateSemaphore.WaitAsync();
var internalStationName = MemphisUtil.GetInternalName(stationName);
if (_subscriptionPerSchema.TryRemove(internalStationName, out ISyncSubscription subscription))
{
var schemaUpdateMsg = subscription.NextMessage();
await ProcessAndStoreSchemaUpdate(internalStationName, schemaUpdateMsg);
subscription.Unsubscribe();
}
}, _cancellationTokenSource.Token);

_producerPerStations.AddOrUpdate(internalStationName, 1, (key, val) => val + 1);
}
finally
{
_schemaUpdateSemaphore.Release();
}
}

internal Task ListenForSdkClientUpdate()
Expand All @@ -956,23 +1005,41 @@ void SyncSdkClientUpdate()
if (updateMsg is null)
continue;
string respAsJson = Encoding.UTF8.GetString(updateMsg.Data);
var sdkClientUpdate =
(SdkClientsUpdate)JsonSerDes.PrepareObjectFromString<SdkClientsUpdate>(respAsJson);

switch (sdkClientUpdate.Type)
SdkClientsUpdate sdkClientUpdate = default;
try
{
sdkClientUpdate = JsonConvert.DeserializeObject<SdkClientsUpdate>(respAsJson);
}
catch (System.Exception exc)
{
throw new MemphisException($"Unable to deserialize sdk client update: {respAsJson}", exc);
}

try
{
_sdkClientUpdateSemaphore.WaitAsync();
bool sdkClientShouldUpdate = sdkClientUpdate.Update ?? false;
switch (sdkClientUpdate.Type)
{
case MemphisSdkClientUpdateTypes.SEND_NOTIFICATION:
_clusterConfigurations.AddOrUpdate(sdkClientUpdate.Type, sdkClientShouldUpdate, (key, _) => sdkClientShouldUpdate);
break;
case MemphisSdkClientUpdateTypes.SCHEMA_VERSE_TO_DLS:
_stationSchemaVerseToDlsMap.AddOrUpdate(sdkClientUpdate.StationName, sdkClientShouldUpdate, (key, _) => sdkClientShouldUpdate);
break;
case MemphisSdkClientUpdateTypes.REMOVE_STATION:
RemoveStationProducers(sdkClientUpdate.StationName);
RemoveStationConsumers(sdkClientUpdate.StationName);
RemoveSchemaUpdateListener(sdkClientUpdate.StationName);
break;
default:
break;
}
}
finally
{
case MemphisSdkClientUpdateTypes.SEND_NOTIFICATION:
_clusterConfigurations.AddOrUpdate(sdkClientUpdate.Type, sdkClientUpdate.Update, (key, _) => sdkClientUpdate.Update);
break;
case MemphisSdkClientUpdateTypes.SCHEMA_VERSE_TO_DLS:
_stationSchemaVerseToDlsMap.AddOrUpdate(sdkClientUpdate.StationName, sdkClientUpdate.Update, (key, _) => sdkClientUpdate.Update);
break;
case MemphisSdkClientUpdateTypes.REMOVE_STATION:
RemoveStationProducers(sdkClientUpdate.StationName);
RemoveStationConsumers(sdkClientUpdate.StationName);
break;
default:
break;
_sdkClientUpdateSemaphore.Release();
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/Memphis.Client/Models/Response/SdkClientsUpdate.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,6 @@ internal class SdkClientsUpdate
public string Type { get; set; }

[DataMember(Name = "update")]
public bool Update { get; set; }
public bool? Update { get; set; }
}
}
37 changes: 36 additions & 1 deletion src/Memphis.Client/Producer/MemphisProducer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,22 @@

namespace Memphis.Client.Producer;

public sealed class MemphisProducer : IMemphisProducer
public sealed class MemphisProducer : IMemphisProducer
{
internal string Key => $"{_internalStationName}_{_realName}";
internal string InternalStationName { get => _internalStationName; }

internal string StationName { get => _stationName; }
internal string ProducerName { get => _producerName; }


private readonly string _realName;
private readonly string _producerName;
private readonly string _stationName;
private readonly string _internalStationName;
private readonly MemphisClient _memphisClient;


public MemphisProducer(MemphisClient memphisClient, string producerName, string stationName, string realName)
{
_realName = realName ?? throw new ArgumentNullException(nameof(realName));
Expand All @@ -47,6 +52,21 @@ public MemphisProducer(MemphisClient memphisClient, string producerName, string
/// <returns></returns>
public async Task ProduceAsync(byte[] message, NameValueCollection headers, int ackWaitMs = 15_000,
string? messageId = default)
{
await _memphisClient.ProduceAsync(this, message, headers, ackWaitMs, messageId);
}


/// <summary>
/// Produce messages into station
/// </summary>
/// <param name="message">message to produce</param>
/// <param name="headers">headers used to send data in the form of key and value</param>
/// <param name="ackWaitMs">duration of time in milliseconds for acknowledgement</param>
/// <param name="messageId">Message ID - for idempotent message production</param>
/// <returns></returns>
internal async Task ProduceToBrokerAsync(byte[] message, NameValueCollection headers, int ackWaitMs = 15_000,
string? messageId = default)
{
await EnsureMessageIsValid(message, headers);

Expand Down Expand Up @@ -86,6 +106,14 @@ public async Task ProduceAsync(byte[] message, NameValueCollection headers, int
throw new MemphisException(publishAck.ErrorDescription);
}
}
catch (NATS.Client.NATSNoRespondersException)
{
/// <summary>
/// This exception is thrown when there are no station available to produce the message.
/// The ReInitializeProducerAndRetry method will try to recreate the producer (which will also create the station) and retry to produce the message.
/// </summary>
await ReInitializeProducerAndRetry(message, headers, ackWaitMs, messageId);
}
catch (MemphisException)
{
throw;
Expand All @@ -94,8 +122,15 @@ public async Task ProduceAsync(byte[] message, NameValueCollection headers, int
{
throw new MemphisException(ex.Message);
}

async Task ReInitializeProducerAndRetry(byte[] message, NameValueCollection headers, int ackWaitMs = 15_000,
string? messageId = default)
{
await _memphisClient.ProduceAsync(this, message, headers, ackWaitMs, messageId);
}
}


/// <summary>
/// Produce messages into station
/// </summary>
Expand Down
Loading