Skip to content
This repository has been archived by the owner on Nov 20, 2020. It is now read-only.

Commit

Permalink
fix partial decompressed record batch response
Browse files Browse the repository at this point in the history
  • Loading branch information
eulerfx committed Mar 19, 2018
1 parent 859eaad commit db40171
Showing 1 changed file with 9 additions and 7 deletions.
16 changes: 9 additions & 7 deletions src/kafunk/Protocol.fs
Original file line number Diff line number Diff line change
Expand Up @@ -704,8 +704,7 @@ module Protocol =
MessageSet.WriteRecords (ms,BinaryZipper(value)) |> ignore
value
let compressedValue = CompressionCodec.compress compression value
if compressedValue.Count > value.Count then
//failwithf "compressed_value_larged|compressed=%i uncompressed=%i" compressedValue.Count value.Count
if compressedValue.Count > value.Count then
compression <- CompressionCodec.None
MessageSet.WriteRecords (ms,buf)
else
Expand Down Expand Up @@ -742,7 +741,7 @@ module Protocol =
let offsetDelta = buf.ReadVarint()
let key = buf.ReadVarintBytes()
let value = buf.ReadVarintBytes()
let headersLength = buf.ReadVarint()
let headersLength = buf.ReadVarint()
if headersLength < 0 then failwithf "invalid_headers_length=%i" headersLength else
let headers = Array.zeroCreate headersLength
for i = 0 to headers.Length - 1 do
Expand Down Expand Up @@ -800,10 +799,13 @@ module Protocol =
MessageSet.ReadRecords (buf,magicByte,numRecords,firstOffset,timestampType,firstTimestamp,maxTimestamp,mss)
| compression ->
let recordsLength = sizeInBytes - RecordBatch.RECORD_BATCH_OVERHEAD
let compressedValue = buf.Slice recordsLength
let decompressedValue = CompressionCodec.decompress compression compressedValue
MessageSet.ReadRecords (BinaryZipper(decompressedValue),magicByte,numRecords,firstOffset,timestampType,firstTimestamp,maxTimestamp,mss)
buf.ShiftOffset recordsLength
if buf.Buffer.Count < recordsLength then
buf.ShiftOffset buf.Buffer.Count
else
let compressedValue = buf.Slice recordsLength
let decompressedValue = CompressionCodec.decompress compression compressedValue
MessageSet.ReadRecords (BinaryZipper(decompressedValue),magicByte,numRecords,firstOffset,timestampType,firstTimestamp,maxTimestamp,mss)
buf.ShiftOffset recordsLength
if checkCrc then
let crcCount = buf.Buffer.Count - attributesOffset
let crc = Crc.crc32C buf.Buffer.Array attributesOffset crcCount
Expand Down

0 comments on commit db40171

Please sign in to comment.