-
Notifications
You must be signed in to change notification settings - Fork 4.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
grpc: fix receiving empty messages when compression is enabled and maxReceiveMessageSize is maxInt64 #7753
grpc: fix receiving empty messages when compression is enabled and maxReceiveMessageSize is maxInt64 #7753
Conversation
@misvivek please keep the description and pr title in the format as I have updated |
noted @purnesh42H |
rpc_util.go
Outdated
if err = checkReceiveMessageOverflow(int64(out.Len()), int64(maxReceiveMessageSize), dcReader); err != nil { | ||
return nil, out.Len() + 1, err | ||
} | ||
return out, len(out), err |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
return out.Len()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
rpc_util.go
Outdated
if readBytes == maxReceiveMessageSize { | ||
b := make([]byte, 1) | ||
if n, err := dcReader.Read(b); n > 0 || err != io.EOF { | ||
return fmt.Errorf("overflow: message larger than max size receivable by client (%d bytes)", maxReceiveMessageSize) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"overflow: received message size is larger than the allowed maxReceiveMessageSize (%d bytes)."
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
rpc_util_test.go
Outdated
@@ -290,3 +292,130 @@ func BenchmarkGZIPCompressor512KiB(b *testing.B) { | |||
func BenchmarkGZIPCompressor1MiB(b *testing.B) { | |||
bmCompressor(b, 1024*1024, NewGZIPCompressor()) | |||
} | |||
|
|||
type mockCompressor struct { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
s/mockCompressor/testCompressor
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
rpc_util_test.go
Outdated
@@ -290,3 +292,130 @@ func BenchmarkGZIPCompressor512KiB(b *testing.B) { | |||
func BenchmarkGZIPCompressor1MiB(b *testing.B) { | |||
bmCompressor(b, 1024*1024, NewGZIPCompressor()) | |||
} | |||
|
|||
type mockCompressor struct { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
top level docstring explaining the use of this testCompressor and top level comments for all the functions
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
rpc_util_test.go
Outdated
t.Errorf("decompress() size = %d, want %d", size, tt.size) | ||
} | ||
if len(tt.want) != len(output) { | ||
t.Errorf("decompress() output length = %d, want %d", output, tt.want) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
decompress() output length, got = %d, want = %d
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
rpc_util_test.go
Outdated
name: "Successful decompression", | ||
compressor: &mockCompressor{ | ||
DecompressedData: []byte("decompressed data"), | ||
Size: 17, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why do you need "Size" as different attribute, can't you calculate from DecompressedData
?
rpc_util_test.go
Outdated
@@ -290,3 +292,130 @@ func BenchmarkGZIPCompressor512KiB(b *testing.B) { | |||
func BenchmarkGZIPCompressor1MiB(b *testing.B) { | |||
bmCompressor(b, 1024*1024, NewGZIPCompressor()) | |||
} | |||
|
|||
type mockCompressor struct { | |||
DecompressedData []byte |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why are these attributes upper case? We are not exporting them
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
changed to lowercase
rpc_util_test.go
Outdated
input mem.BufferSlice | ||
maxReceiveMessageSize int | ||
want mem.BufferSlice | ||
size int |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what is this size for? Please give more meaningful name
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
rpc_util_test.go
Outdated
}{ | ||
{ | ||
name: "Successful decompression", | ||
compressor: &mockCompressor{ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do we really need to test compressor for all cases? Can't we just use a regular compressor?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Part of this test is to also verify that inputs work with regular compressor
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Something like this @purnesh42H
``compressor := gzip.NewCompressor()
originalData := []byte("test data")
// Compress
compressedData, err := compressor.Compress(originalData)
if err != nil {
t.Fatalf("Failed to compress data: %v", err)
}
// Decompress
decompressedData, err := compressor.Decompress(compressedData)
if err != nil {
t.Fatalf("Failed to decompress data: %v", err)
}
if !bytes.Equal(decompressedData, originalData) {
t.Errorf("Decompressed data does not match original")
}``
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah i think we should try to get the regular test cases working with regular compressor. Only use testCompressor when we want to simulate or enforce an error
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can you check this and see how are different compressors are tested https://github.com/grpc/grpc-go/blob/56df169480cdb4928a24a50b5289f909f0d81ba7/test/compressor_test.go ?
We should try to do something similar. There are test compressors as well regular compressors based on test cases. Compressor can be part of test case struct
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One more example
Line 151 in 56df169
func (s) TestCompress(t *testing.T) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's try to make as many as tests as we can using regular compressor and then we can review the rest
rpc_util_test.go
Outdated
// used for testing compression and decompression functionality. | ||
// Compress is a placeholder for the compression logic. | ||
// It currently returns a nil WriteCloser and no error, as the actual compression is not implemented. | ||
// DecompressedSize returns the pre-configured size of the decompressed data. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
these comments should be against the attributes of struct, not in docstring
rpc_util_test.go
Outdated
}{ | ||
{ | ||
name: "Successful decompression", | ||
compressor: &mockCompressor{ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah i think we should try to get the regular test cases working with regular compressor. Only use testCompressor when we want to simulate or enforce an error
Codecov ReportAll modified and coverable lines are covered by tests ✅
Additional details and impacted files@@ Coverage Diff @@
## master #7753 +/- ##
==========================================
+ Coverage 81.80% 81.98% +0.17%
==========================================
Files 375 377 +2
Lines 37978 38132 +154
==========================================
+ Hits 31068 31262 +194
+ Misses 5609 5564 -45
- Partials 1301 1306 +5
|
rpc_util_test.go
Outdated
t.Fatalf("Could not initialize gzip compressor with level 5 compression.") | ||
} | ||
|
||
for _, test := range []struct { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@misvivek i don't understand what this test loop is doing and what is it testing. This seems totally unnecessary for the change (line 349-373)
rpc_util_test.go
Outdated
}{ | ||
{ | ||
name: "Successful decompression", | ||
compressor: &testCompressor{ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you have to use real compressor here. I have mentioned this so many times. Why do you keep using test compressor here?
rpc_util_test.go
Outdated
@@ -290,3 +293,129 @@ func BenchmarkGZIPCompressor512KiB(b *testing.B) { | |||
func BenchmarkGZIPCompressor1MiB(b *testing.B) { | |||
bmCompressor(b, 1024*1024, NewGZIPCompressor()) | |||
} | |||
|
|||
type testCompressor struct { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this testCompressor should not be used ideally. If at all we are using it, we should not use it for test cases involving max receive size and actual receive message size
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
rpc_util_test.go
Outdated
error error | ||
}{ | ||
{ | ||
name: "Successful decompression", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
add more context to name. For example, in this case actual message size is less than allowed receive size. Keep it short though.
rpc_util_test.go
Outdated
error: nil, | ||
}, | ||
{ | ||
name: "Error during decompression", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same here. name should be meaningful in terms of input and output
rpc_util_test.go
Outdated
error: errors.New("decompression error"), | ||
}, | ||
{ | ||
name: "Buffer overflow", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is a better name
rpc_util_test.go
Outdated
}, | ||
{ | ||
name: "Buffer overflow", | ||
compressor: &testCompressor{ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
testCompressor should not be required here
rpc_util_test.go
Outdated
}, | ||
{ | ||
name: "MaxInt64 receive size with small data", | ||
compressor: &testCompressor{ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
again, testCompressor should not be required here. We should use a regular compressor and simulate a scenario where receiveMaxSize is Int64 and actual data is small and that should pass.
3589636
to
3603069
Compare
rpc_util_test.go
Outdated
error error | ||
}{ | ||
{ | ||
name: "Decompresses successfully with sufficient buffer size", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
success, receive message within maxReceiveMessageSize
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
rpc_util_test.go
Outdated
error: errors.New("decompression error"), | ||
}, | ||
{ | ||
name: "Fails with overflow error when decompressed size exceeds maxReceiveMessageSize", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
overflow failure, receive message exceeds maxReceiveMessageSize
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
rpc_util_test.go
Outdated
error: nil, | ||
}, | ||
{ | ||
name: "Fails with io.Copy error during decompression", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what is this test case? Do we need it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done removed
rpc_util_test.go
Outdated
error: errors.New("overflow: received message size is larger than the allowed maxReceiveMessageSize (5 bytes)."), | ||
}, | ||
{ | ||
name: "Returns EOF error when maxReceiveMessageSize is MaxInt64 but data ends prematurely", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
EOF is not really failure. This test case is similar to case 1) except maxReceiveMessageSize is math.MAXInt64
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
compressedSize: 0, | ||
error: errors.New("simulated io.Copy read error"), | ||
}, | ||
{ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is redundant. Case 1) already test it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done remove
rpc_util_test.go
Outdated
{ | ||
name: "Decompresses successfully with sufficient buffer size", | ||
compressor: c, | ||
input: func() mem.BufferSlice { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As suggested, change this to []byte and do mem.BufferSlice
when calling decompress
rpc_util_test.go
Outdated
return mem.BufferSlice{mem.NewBuffer(&compressedData, nil)} | ||
}(), | ||
maxReceiveMessageSize: 100, | ||
want: func() mem.BufferSlice { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same here. this should be just []byte. When comparing you should compare to this in []byte format
rpc_util_test.go
Outdated
if size != tt.compressedSize { | ||
t.Errorf("decompress() size, got = %d, want = %d", size, tt.compressedSize) | ||
} | ||
if len(tt.want) != len(output) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you should compare the actual bytes instead of just comparing the length.
rpc_util_test.go
Outdated
output, size, err := decompress(tt.compressor, tt.input, tt.maxReceiveMessageSize, nil) | ||
// Check if both err and tt.error are either nil or non-nil | ||
if (err != nil) != (tt.error != nil) { | ||
t.Errorf("decompress() error, got err=%v, want err=%v", err, tt.error) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
all should change to t.Fatalf because we don't want to continue once detecting an error. Like you won't compare compressed size if decompression has returned error
rpc_util_test.go
Outdated
input mem.BufferSlice | ||
maxReceiveMessageSize int | ||
want mem.BufferSlice | ||
compressedSize int |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why do you need compressedSize as parameter here. It can be easily calculated from compressed data you are providing.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
compressedSize := 0 for _, buf := range tt.input { compressedSize += len(buf.Bytes()) }
your are suggesting like this
rpc_util.go
Outdated
@@ -900,12 +899,33 @@ func decompress(compressor encoding.Compressor, d mem.BufferSlice, maxReceiveMes | |||
//} | |||
|
|||
var out mem.BufferSlice | |||
_, err = io.Copy(mem.NewWriter(&out, pool), io.LimitReader(dcReader, int64(maxReceiveMessageSize)+1)) | |||
_, err = io.Copy(mem.NewWriter(&out, pool), io.LimitReader(dcReader, int64(maxReceiveMessageSize))) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@misvivek can you rebase with latest main branch. This has changed to mem.ReadAll
. Can you verify if bug still exist?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please verify if the bug still exist first with me.ReadAll. Don't change that
rpc_util_test.go
Outdated
error: errors.New("overflow: received message size is larger than the allowed maxReceiveMessageSize (5 bytes)"), | ||
}, | ||
{ | ||
name: "Succeeds when message size is much smaller than maxReceiveMessageSize", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is similar to case 1. We don't need it
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
rpc_util_test.go
Outdated
|
||
for _, tt := range tests { | ||
t.Run(tt.name, func(t *testing.T) { | ||
message := func() mem.BufferSlice { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is a strange way of initializing. It should be done something like below
mem.BufferSlice{mem.NewBuffer(&input, nil)}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we have to send the compressed message, so we are doing the above way,
we tried your suggestion then we are getting like gzip: invalid header,
rpc_util_test.go
Outdated
return mem.BufferSlice{mem.NewBuffer(&compressedData, nil)} | ||
}() | ||
output, size, err := decompress(tt.compressor, message, tt.maxReceiveMessageSize, nil) | ||
wantMsg := func() mem.BufferSlice { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same here. Initialize as suggested above.
decompressed := tt.want | ||
return mem.BufferSlice{mem.NewBuffer(&decompressed, nil)} | ||
}() | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no need of new lines when checking different things. Remove them please.
}, | ||
} | ||
|
||
for _, tt := range tests { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
s/tt/test
rpc_util_test.go
Outdated
}() | ||
|
||
// Check for expected error | ||
if (err != nil) != (tt.error != nil) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it should be more clear. Something like if test.error != nil && err == nil
then throw error that we got nil error when we want non-nil err
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
rpc_util_test.go
Outdated
|
||
} | ||
// to check when we are passing empty bytes output is "nil" failure, empty receive message case | ||
if wantMsg != nil && output != nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
there should be separate condition to check if wanMsg is non-nil and output is nil. Then it should throw error that got nil when we want non-nil
47003c3
to
fad8892
Compare
rpc_util_test.go
Outdated
if wantMsg != nil && output == nil { | ||
t.Fatalf("decompress() got = nil, want = non nil") | ||
} | ||
if wantMsg != nil && output != nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no, need of checking non nil again for output as you already checked that
Fixes #4552
RELEASE NOTES: