Skip to content

Commit

Permalink
Resolved producer cache #127, and producer handle single partition st…
Browse files Browse the repository at this point in the history
…ation #128
  • Loading branch information
tbazen committed Aug 17, 2023
1 parent 2efbc36 commit 73ad457
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 14 deletions.
20 changes: 14 additions & 6 deletions src/Memphis.Client/MemphisClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ public async Task<MemphisProducer> CreateProducer(MemphisProducerOptions produce
string stationName = producerOptions.StationName;
string producerName = producerOptions.ProducerName.ToLower();
bool generateRandomSuffix = producerOptions.GenerateUniqueSuffix;
string internalStationName = MemphisUtil.GetInternalName(stationName);

if (_brokerConnection.IsClosed())
{
Expand All @@ -131,6 +132,14 @@ public async Task<MemphisProducer> CreateProducer(MemphisProducerOptions produce
{
producerName = $"{producerName}_{MemphisUtil.GetUniqueKey(8)}";
}
else
{
var producerCacheKey = $"{internalStationName}_{producerName}";
if (_producerCache.TryGetValue(producerCacheKey, out MemphisProducer producer))
{
return producer;
}
}

try
{
Expand Down Expand Up @@ -158,14 +167,13 @@ public async Task<MemphisProducer> CreateProducer(MemphisProducerOptions produce
throw new MemphisException(createProducerResponse.Error);
}

string internalStationName = MemphisUtil.GetInternalName(stationName);

_stationSchemaVerseToDlsMap.AddOrUpdate(internalStationName, createProducerResponse.SchemaVerseToDls, (_, _) => createProducerResponse.SchemaVerseToDls);
_clusterConfigurations.AddOrUpdate(MemphisSdkClientUpdateTypes.SEND_NOTIFICATION, createProducerResponse.SendNotification, (_, _) => createProducerResponse.SendNotification);

await ListenForSchemaUpdate(internalStationName, createProducerResponse.SchemaUpdate);

if(createProducerResponse.PartitionsUpdate is not null)
if (createProducerResponse.PartitionsUpdate is not null)
{
_stationPartitions.AddOrUpdate(internalStationName, createProducerResponse.PartitionsUpdate, (_, _) => createProducerResponse.PartitionsUpdate);
}
Expand All @@ -176,14 +184,14 @@ public async Task<MemphisProducer> CreateProducer(MemphisProducerOptions produce
if (partitionsUpdate.PartitionsList == null)
{
producer.PartitionResolver = new(1);
}
else
}
else
{
producer.PartitionResolver = new(partitionsUpdate.PartitionsList);
}
}
var producerKey = $"{internalStationName}_{producerName.ToLower()}";
_producerCache.AddOrUpdate(producerKey, producer, (_, _) => producer);
var producerCacheKey = $"{internalStationName}_{producerName.ToLower()}";
_producerCache.AddOrUpdate(producerCacheKey, producer, (_, _) => producer);
return producer;
}
catch (MemphisException)
Expand Down
17 changes: 9 additions & 8 deletions src/Memphis.Client/Producer/MemphisProducer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -75,17 +75,18 @@ internal async Task ProduceToBrokerAsync(byte[] message, NameValueCollection hea
await EnsureMessageIsValid(message, headers);

string streamName = _internalStationName;
if(_memphisClient.StationPartitions.TryGetValue(_stationName, out var partitions))
if (_memphisClient.StationPartitions.TryGetValue(_stationName, out var partitions))
{
if (partitions != null)
if (partitions != null && partitions.PartitionsList != null)
{
if (partitions.PartitionsList != null)
if(partitions.PartitionsList.Length == 1)
{
if (partitions.PartitionsList.Length > 0)
{
var partition = PartitionResolver.Resolve();
streamName = $"{streamName}${partition}";
}
streamName = $"{_internalStationName}${partitions.PartitionsList[0]}";
}
else if (partitions.PartitionsList.Length > 1)
{
var partition = PartitionResolver.Resolve();
streamName = $"{streamName}${partition}";
}
}
}
Expand Down

0 comments on commit 73ad457

Please sign in to comment.