From a3f8b095b2137f1fc949223c9128634d31e77009 Mon Sep 17 00:00:00 2001 From: shay23b Date: Tue, 15 Aug 2023 14:16:58 +0300 Subject: [PATCH] fix producer to old station --- src/Memphis.Client/MemphisClient.cs | 9 ++++++++- src/Memphis.Client/Producer/MemphisProducer.cs | 15 ++++++++++++--- 2 files changed, 20 insertions(+), 4 deletions(-) diff --git a/src/Memphis.Client/MemphisClient.cs b/src/Memphis.Client/MemphisClient.cs index aa6ac73..6adde56 100644 --- a/src/Memphis.Client/MemphisClient.cs +++ b/src/Memphis.Client/MemphisClient.cs @@ -173,7 +173,14 @@ public async Task CreateProducer(MemphisProducerOptions produce var producer = new MemphisProducer(this, producerName, stationName, producerName.ToLower()); if (_stationPartitions.TryGetValue(internalStationName, out PartitionsUpdate partitionsUpdate)) { - producer.PartitionResolver = new(partitionsUpdate.PartitionsList); + if (partitionsUpdate.PartitionsList == null) + { + producer.PartitionResolver = new(1); + } + else + { + producer.PartitionResolver = new(partitionsUpdate.PartitionsList); + } } var producerKey = $"{internalStationName}_{producerName.ToLower()}"; _producerCache.AddOrUpdate(producerKey, producer, (_, _) => producer); diff --git a/src/Memphis.Client/Producer/MemphisProducer.cs b/src/Memphis.Client/Producer/MemphisProducer.cs index 69583d3..2ac2abe 100644 --- a/src/Memphis.Client/Producer/MemphisProducer.cs +++ b/src/Memphis.Client/Producer/MemphisProducer.cs @@ -75,10 +75,19 @@ internal async Task ProduceToBrokerAsync(byte[] message, NameValueCollection hea await EnsureMessageIsValid(message, headers); string streamName = _internalStationName; - if(_memphisClient.StationPartitions.TryGetValue(_stationName, out var partitions) && partitions.PartitionsList.Length > 0) + if(_memphisClient.StationPartitions.TryGetValue(_stationName, out var partitions)) { - var partition = PartitionResolver.Resolve(); - streamName = $"{streamName}${partition}"; + if (partitions != null) + { + if (partitions.PartitionsList != null) + { + if (partitions.PartitionsList.Length > 0) + { + var partition = PartitionResolver.Resolve(); + streamName = $"{streamName}${partition}"; + } + } + } } var msg = new Msg