Skip to content
This repository has been archived by the owner on Nov 20, 2020. It is now read-only.

Commit

Permalink
fix #160
Browse files Browse the repository at this point in the history
  • Loading branch information
eulerfx committed Aug 3, 2017
1 parent e001da3 commit 65ebbee
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 11 deletions.
4 changes: 4 additions & 0 deletions RELEASE_NOTES.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
### 0.1.4 - 03.08.2017

* Fixed `FetchResponse` `MessageTooBigException` when a message set has been compressed. (#160)

### 0.1.4-beta - 25.07.2017

* Fixed v0.10.1 protocol bug for `Offset` API.
Expand Down
4 changes: 2 additions & 2 deletions src/kafunk/Compression.fs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ module internal Stream =
use compStream = makeStream inputStream
compStream.CopyTo(outputStream)
let buf = Binary.Segment(outputStream.GetBuffer(), 0, int outputStream.Length)
MessageSet.Read (messageVer, 0, 0s, buf.Count, BinaryZipper(buf))
MessageSet.Read (messageVer, 0, 0s, buf.Count, true, BinaryZipper(buf))

[<Compile(Module)>]
module GZip =
Expand Down Expand Up @@ -151,7 +151,7 @@ module Snappy =

let decompress (messageVer:ApiVersion) (m:Message) =
let buf = CompressedMessage.decompress m.value
MessageSet.Read (messageVer, 0, 0s, buf.Count, BinaryZipper(buf))
MessageSet.Read (messageVer, 0, 0s, buf.Count, true, BinaryZipper(buf))

let compress (messageVer:int16) (compression:byte) (ms:MessageSet) =
match compression with
Expand Down
21 changes: 18 additions & 3 deletions src/kafunk/Protocol.fs
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,9 @@ module Protocol =
buf.WriteInt32 x.messageSize
Message.Write (messageVer, x.message, buf)

static member internal Read (messageVer:ApiVersion, partition:Partition, ec:ErrorCode, messageSetSize:int, buf:BinaryZipper) =
// NB: skipTooLarge=true is for scenarios where decompression is involved and a message set is being decoded from an individual message
// which was itself too small.
static member internal Read (messageVer:ApiVersion, partition:Partition, ec:ErrorCode, messageSetSize:int, skipTooLarge:bool, buf:BinaryZipper) =
let mutable consumed = 0
let arr = ResizeArray<_>()
while consumed < messageSetSize && buf.Buffer.Count > 0 do
Expand All @@ -445,7 +447,20 @@ module Protocol =
let (messageSize:MessageSize) = buf.ReadInt32 ()
let messageSetRemainder = messageSetRemainder - 12 // (Offset + MessageSize)
if messageSize > messageSetSize then
raise (MessageTooBigException(sprintf "partition=%i offset=%i message_set_size=%i message_size=%i" partition offset messageSetSize messageSize))
let errMsg = sprintf "partition=%i offset=%i message_set_size=%i message_size=%i consumed_bytes=%i consumed_count=%i"
partition offset messageSetSize messageSize consumed arr.Count
if not skipTooLarge then
raise (MessageTooBigException(errMsg))
else
// let payload = Binary.toString buf.Buffer
// printfn "|WARN|MessageTooBig|%s" errMsg
// printfn "|WARN|MessageTooBig|payload=%s" payload
// try
// let message = Message.Read (messageVer,buf)
// printfn "|WARN|MessageTooBig|payload=%s" (Binary.toString message.value)
// with ex ->
// printfn "ERROR DECODING MESSAGE|%O" ex
()
try
if messageSetRemainder >= messageSize && buf.Buffer.Count >= messageSize then
let message = Message.Read (messageVer,buf)
Expand Down Expand Up @@ -740,7 +755,7 @@ module Protocol =
let errorCode = buf.ReadInt16 ()
let hwo = buf.ReadInt64 ()
let mss = buf.ReadInt32 ()
let ms = MessageSet.Read (MessageVersions.fetchResMessage ver,partition,errorCode,mss,buf)
let ms = MessageSet.Read (MessageVersions.fetchResMessage ver,partition,errorCode,mss,false,buf)
ps.[j] <- partition, errorCode, hwo, mss, ms
topics.[i] <- (t,ps)
let res = FetchResponse(throttleTime, topics)
Expand Down
12 changes: 6 additions & 6 deletions tests/kafunk.Tests/Consumer.fsx
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ open FSharp.Control
open Kafunk
open System

Log.MinLevel <- LogLevel.Trace
//Log.MinLevel <- LogLevel.Trace
let Log = Log.create __SOURCE_FILE__

let argiDefault i def = fsi.CommandLineArgs |> Seq.tryItem i |> Option.getOr def
Expand All @@ -20,18 +20,18 @@ let go = async {
let connConfig =
let chanConfig =
ChanConfig.create (
requestTimeout = TimeSpan.FromSeconds 30.0,
receiveBufferSize = 8192 * 20,
sendBufferSize = 8192 * 10,
requestTimeout = TimeSpan.FromSeconds 60.0,
receiveBufferSize = 8192 * 50,
sendBufferSize = 8192 * 50,
connectRetryPolicy = ChanConfig.DefaultConnectRetryPolicy,
requestRetryPolicy = ChanConfig.DefaultRequestRetryPolicy)
KafkaConfig.create (
[KafkaUri.parse host],
//[KafkaUri.parse "localhost:9092" ; KafkaUri.parse "localhost:9093" ; KafkaUri.parse "localhost:9094"],
tcpConfig = chanConfig,
requestRetryPolicy = KafkaConfig.DefaultRequestRetryPolicy,
version = Versions.V_0_10_1,
autoApiVersions = true)
version = Versions.V_0_9_0,
autoApiVersions = false)
Kafka.connAsync connConfig
let consumerConfig =
ConsumerConfig.create (
Expand Down

0 comments on commit 65ebbee

Please sign in to comment.