Update to CK1.0 RC3; use CK CompressionType (#24)
bartelink authored Apr 2, 2019
1 parent dac8a0b commit b5541fb
Showing 3 changed files with 16 additions and 13 deletions.
10 changes: 9 additions & 1 deletion CHANGELOG.md
@@ -13,6 +13,13 @@ The `Unreleased` section name is replaced by the expected version of next releas
### Removed
### Fixed

<a name="1.0.0-rc2"></a>
## [1.0.0-rc2] - 2019-04-02

### Changed

- Updated to target `Confluent.Kafka 1.0.0-RC3` [#24](https://github.com/jet/Jet.ConfluentKafka.FSharp/pull/24)

<a name="1.0.0-rc1"></a>
## [1.0.0-rc1] - 2019-03-27

@@ -43,7 +50,8 @@ The `Unreleased` section name is replaced by the expected version of next releas

(Stripped down repo for history purposes, see [`v0` branch](tree/v0) for implementation targeting `Confluent.Kafka` v `0.9.4`)

[Unreleased]: https://github.com/jet/Jet.ConfluentKafka.FSharp/compare/1.0.0-rc1...HEAD
[Unreleased]: https://github.com/jet/Jet.ConfluentKafka.FSharp/compare/1.0.0-rc2...HEAD
[1.0.0-rc2]: https://github.com/jet/Jet.ConfluentKafka.FSharp/compare/1.0.0-rc1...1.0.0-rc2
[1.0.0-rc1]: https://github.com/jet/Jet.ConfluentKafka.FSharp/compare/1.0.0-preview2...1.0.0-rc1
[1.0.0-preview2]: https://github.com/jet/Jet.ConfluentKafka.FSharp/compare/1.0.0-preview1...1.0.0-preview2
[1.0.0-preview1]: https://github.com/jet/Jet.ConfluentKafka.FSharp/compare/1.0.0-bare...1.0.0-preview1
17 changes: 6 additions & 11 deletions src/Jet.ConfluentKafka.FSharp/ConfluentKafka.fs
@@ -20,11 +20,9 @@ module private Config =

else u.Authority

type Compression = Uncompressed | GZip | Snappy | LZ4 // as soon as CK provides such an Enum, this can go

/// See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md for documentation on the implications of specific settings
[<NoComparison>]
type KafkaProducerConfig private (conf, cfgs, broker : Uri, compression : Compression, acks : Acks) =
type KafkaProducerConfig private (conf, cfgs, broker : Uri, compression, acks : Acks) =
member val Conf : ProducerConfig = conf
member val Acks = acks
member val Broker = broker
@@ -33,8 +31,8 @@ type KafkaProducerConfig private (conf, cfgs, broker : Uri, compression : Compre

/// Creates a Kafka producer instance with supplied configuration
static member Create
( clientId : string, broker : Uri, acks : Acks,
/// Message compression. Defaults to Uncompressed/'none'.
( clientId : string, broker : Uri, acks,
/// Message compression. Defaults to None
?compression,
/// Maximum in-flight requests; <> 1 implies potential reordering of writes should a batch fail and then succeed in a subsequent retry. Defaults to 1.
?maxInFlight,
@@ -52,23 +50,20 @@ type KafkaProducerConfig private (conf, cfgs, broker : Uri, compression : Compre
?partitioner,
/// Misc configuration parameter to be passed to the underlying CK producer.
?custom) =
let compression = defaultArg compression Uncompressed
let cfgs = seq {
yield KeyValuePair("compression.codec", match compression with Uncompressed -> "none" | GZip -> "gzip" | Snappy -> "snappy" | LZ4 -> "lz4")
match custom with None -> () | Some miscConfig -> yield! miscConfig }
let c =
ProducerConfig(
ClientId = clientId, BootstrapServers = Config.validateBrokerUri broker,
RetryBackoffMs = Nullable (match retryBackoff with Some (t : TimeSpan) -> int t.TotalMilliseconds | None -> 1000),
MessageSendMaxRetries = Nullable (defaultArg retries 60),
Acks = Nullable acks,
CompressionType = Nullable (defaultArg compression CompressionType.None),
LingerMs = Nullable (match linger with Some t -> int t.TotalMilliseconds | None -> 10),
SocketKeepaliveEnable = Nullable (defaultArg socketKeepAlive true),
Partitioner = Nullable (defaultArg partitioner Partitioner.ConsistentRandom),
MaxInFlight = Nullable (defaultArg maxInFlight 1000000),
LogConnectionClose = Nullable false) // https://github.com/confluentinc/confluent-kafka-dotnet/issues/124#issuecomment-289727017
statisticsInterval |> Option.iter (fun (i : TimeSpan) -> c.StatisticsIntervalMs <- Nullable (int i.TotalMilliseconds))
KafkaProducerConfig(c, cfgs, broker, compression, acks)
KafkaProducerConfig(c, defaultArg custom Seq.empty, broker, compression, acks)

type KafkaProducer private (log: ILogger, producer : IProducer<string, string>, topic : string) =
member __.Topic = topic
@@ -104,7 +99,7 @@ type KafkaProducer private (log: ILogger, producer : IProducer<string, string>,

static member Create(log : ILogger, config : KafkaProducerConfig, topic : string) =
if String.IsNullOrEmpty topic then nullArg "topic"
log.Information("Producing... {broker} / {topic} compression={compression:l} acks={acks}", config.Broker, topic, config.Compression, config.Acks)
log.Information("Producing... {broker} / {topic} compression={compression} acks={acks}", config.Broker, topic, config.Compression, config.Acks)
let producer =
ProducerBuilder<string, string>(config.Kvps)
.SetLogHandler(fun _p m -> log.Information("{message} level={level} name={name} facility={facility}", m.Message, m.Level, m.Name, m.Facility))
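For orientation, here is a minimal usage sketch of the revised surface (not part of the commit): compression is now supplied as Confluent.Kafka's `CompressionType` enum rather than the removed local `Compression` DU. The namespace, broker address, client id and topic name below are assumptions, not taken from the repository.

```fsharp
open System
open Confluent.Kafka
open Serilog
open Jet.ConfluentKafka.FSharp // assumed namespace for the module shown above

// Compression is passed straight through as CK's CompressionType enum;
// the old Uncompressed/GZip/Snappy/LZ4 DU and the hand-built
// "compression.codec" KeyValuePair are gone.
let producerConfig =
    KafkaProducerConfig.Create(
        "my-app",                           // clientId (placeholder)
        Uri "kafka://broker:9092",          // broker (placeholder address)
        Acks.All,
        compression = CompressionType.Lz4,  // was `LZ4` in the removed DU; defaults to CompressionType.None
        maxInFlight = 1)                    // <> 1 risks reordering if a batch fails and is retried

// KafkaProducer.Create(log, config, topic) then builds the underlying IProducer<string, string>
let log : ILogger = upcast LoggerConfiguration().CreateLogger()
let producer = KafkaProducer.Create(log, producerConfig, "my-topic")
```

Dropping the local DU also removes the hand-rolled `cfgs` sequence, which existed only to splice `compression.codec` in alongside any `?custom` settings; the constructor now forwards `defaultArg custom Seq.empty` directly.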
2 changes: 1 addition & 1 deletion src/Jet.ConfluentKafka.FSharp/Jet.ConfluentKafka.FSharp.fsproj
@@ -22,7 +22,7 @@
<PackageReference Include="FSharp.Core" Version="4.3.4" Condition=" '$(TargetFramework)' == 'netstandard2.0' " />

<PackageReference Include="Newtonsoft.Json" Version="11.0.2" />
<PackageReference Include="Confluent.Kafka" Version="[1.0.0-RC2]" />
<PackageReference Include="Confluent.Kafka" Version="[1.0.0-RC3]" />
<PackageReference Include="librdkafka.redist" Version="[1.0.0]" />
<PackageReference Include="Serilog" Version="2.7.1" />
</ItemGroup>
