From f1ca3ecaedf4c259902bbe6d04e8c0173a72435e Mon Sep 17 00:00:00 2001 From: Ruben Bartelink Date: Fri, 12 Apr 2019 10:11:00 +0100 Subject: [PATCH] Fix CompressionType logging --- src/Jet.ConfluentKafka.FSharp/ConfluentKafka.fs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/Jet.ConfluentKafka.FSharp/ConfluentKafka.fs b/src/Jet.ConfluentKafka.FSharp/ConfluentKafka.fs index a4bfd43..f2d8168 100644 --- a/src/Jet.ConfluentKafka.FSharp/ConfluentKafka.fs +++ b/src/Jet.ConfluentKafka.FSharp/ConfluentKafka.fs @@ -22,7 +22,7 @@ module private Config = /// See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md for documentation on the implications of specfic settings [] -type KafkaProducerConfig private (conf, cfgs, broker : Uri, compression, acks : Acks) = +type KafkaProducerConfig private (conf, cfgs, broker : Uri, compression : CompressionType, acks : Acks) = member val Conf : ProducerConfig = conf member val Acks = acks member val Broker = broker @@ -50,13 +50,14 @@ type KafkaProducerConfig private (conf, cfgs, broker : Uri, compression, acks : ?partitioner, /// Misc configuration parameter to be passed to the underlying CK producer. ?custom) = + let compression = defaultArg compression CompressionType.None let c = ProducerConfig( ClientId = clientId, BootstrapServers = Config.validateBrokerUri broker, RetryBackoffMs = Nullable (match retryBackoff with Some (t : TimeSpan) -> int t.TotalMilliseconds | None -> 1000), MessageSendMaxRetries = Nullable (defaultArg retries 60), Acks = Nullable acks, - CompressionType = Nullable (defaultArg compression CompressionType.None), + CompressionType = Nullable compression, LingerMs = Nullable (match linger with Some t -> int t.TotalMilliseconds | None -> 10), SocketKeepaliveEnable = Nullable (defaultArg socketKeepAlive true), Partitioner = Nullable (defaultArg partitioner Partitioner.ConsistentRandom),