From c49a6fb8b29f02774a7780bc85bc146be4d3bf17 Mon Sep 17 00:00:00 2001 From: David Faillefer <23032941+faillefer@users.noreply.github.com> Date: Sat, 4 Sep 2021 10:20:35 +0200 Subject: [PATCH 1/7] Implementation of DeleteOffsetsRequest --- delete_offsets_request.go | 92 ++++++++++++++++++++++++++++++++++ delete_offsets_request_test.go | 53 ++++++++++++++++++++ request.go | 2 + 3 files changed, 147 insertions(+) create mode 100644 delete_offsets_request.go create mode 100644 delete_offsets_request_test.go diff --git a/delete_offsets_request.go b/delete_offsets_request.go new file mode 100644 index 000000000..339c7857c --- /dev/null +++ b/delete_offsets_request.go @@ -0,0 +1,92 @@ +package sarama + +type DeleteOffsetsRequest struct { + Group string + partitions map[string][]int32 +} + +func (r *DeleteOffsetsRequest) encode(pe packetEncoder) (err error) { + err = pe.putString(r.Group) + if err != nil { + return err + } + + if r.partitions == nil { + pe.putInt32(0) + } else { + if err = pe.putArrayLength(len(r.partitions)); err != nil { + return err + } + } + for topic, partitions := range r.partitions { + err = pe.putString(topic) + if err != nil { + return err + } + err = pe.putInt32Array(partitions) + if err != nil { + return err + } + } + return +} + +func (r *DeleteOffsetsRequest) decode(pd packetDecoder, version int16) (err error) { + r.Group, err = pd.getString() + if err != nil { + return err + } + var partitionCount int + + partitionCount, err = pd.getArrayLength() + if err != nil { + return err + } + + if (partitionCount == 0 && version < 2) || partitionCount < 0 { + return nil + } + + r.partitions = make(map[string][]int32, partitionCount) + for i := 0; i < partitionCount; i++ { + var topic string + topic, err = pd.getString() + if err != nil { + return err + } + + var partitions []int32 + partitions, err = pd.getInt32Array() + if err != nil { + return err + } + + r.partitions[topic] = partitions + } + + return nil +} + +func (r *DeleteOffsetsRequest) key() int16 { + return 47 +} + +func (r *DeleteOffsetsRequest) version() int16 { + return 0 +} + +func (r *DeleteOffsetsRequest) headerVersion() int16 { + return 1 +} + +func (r *DeleteOffsetsRequest) requiredVersion() KafkaVersion { + return V2_4_0_0 +} + +func (r *DeleteOffsetsRequest) AddPartition(topic string, partitionID int32) { + if r.partitions == nil { + r.partitions = make(map[string][]int32) + } + + r.partitions[topic] = append(r.partitions[topic], partitionID) +} diff --git a/delete_offsets_request_test.go b/delete_offsets_request_test.go new file mode 100644 index 000000000..1b0b20a71 --- /dev/null +++ b/delete_offsets_request_test.go @@ -0,0 +1,53 @@ +package sarama + +import "testing" + +var ( + emptyDeleteOffsetsRequest = []byte{ + 0, 3,'f', 'o', 'o', // group name: foo + 0, 0, 0, 0, // 0 partition + } + + doubleDeleteOffsetsRequest = []byte{ + 0, 3,'f', 'o', 'o', // group name: foo + 0, 0, 0, 1, // 1 topic + 0, 3, 'b', 'a', 'r', // topic name: bar + 0, 0, 0, 2, // 2 partitions + 0, 0, 0, 6, // partition 6 + 0, 0, 0, 7, // partition 7 + } + + tripleDeleteOffsetsRequest = []byte{ + 0, 3,'f', 'o', 'o', // group name: foo + 0, 0, 0, 2, // 2 topics + 0, 3, 'b', 'a', 'r', // topic name: bar + 0, 0, 0, 2, // 2 partitions + 0, 0, 0, 6, // partition 6 + 0, 0, 0, 7, // partition 7 + 0, 3, 'b', 'a', 'z', // topic name: baz + 0, 0, 0, 1, // 1 partition + 0, 0, 0, 0, // partition 0 + } +) + +func TestDeleteOffsetsRequest(t *testing.T) { + var request *DeleteOffsetsRequest + + request = new(DeleteOffsetsRequest) + request.Group = "foo" + + testRequest(t, "no offset", request, emptyDeleteOffsetsRequest) + + request = new(DeleteOffsetsRequest) + request.Group = "foo" + request.AddPartition("bar", 6) + request.AddPartition("bar", 7) + testRequest(t, "two offsets on one topic", request, doubleDeleteOffsetsRequest) + + request = new(DeleteOffsetsRequest) + request.Group = "foo" + request.AddPartition("bar", 6) + request.AddPartition("bar", 7) + request.AddPartition("baz", 0) + testRequest(t, "three offsets on two topics", request, tripleDeleteOffsetsRequest) +} diff --git a/request.go b/request.go index d899df534..08c2b6741 100644 --- a/request.go +++ b/request.go @@ -188,6 +188,8 @@ func allocateBody(key, version int16) protocolBody { return &AlterPartitionReassignmentsRequest{} case 46: return &ListPartitionReassignmentsRequest{} + case 47: + return &DeleteOffsetsRequest{} case 50: return &DescribeUserScramCredentialsRequest{} case 51: From f8739df4f0688948ea61c0c8ecc38a9307c4d488 Mon Sep 17 00:00:00 2001 From: David Faillefer <23032941+faillefer@users.noreply.github.com> Date: Sat, 4 Sep 2021 10:34:51 +0200 Subject: [PATCH 2/7] Implementation of DeleteOffsetsResponse --- delete_offsets_response.go | 112 ++++++++++++++++++++++++++++++++ delete_offsets_response_test.go | 85 ++++++++++++++++++++++++ 2 files changed, 197 insertions(+) create mode 100644 delete_offsets_response.go create mode 100644 delete_offsets_response_test.go diff --git a/delete_offsets_response.go b/delete_offsets_response.go new file mode 100644 index 000000000..00c3eaf90 --- /dev/null +++ b/delete_offsets_response.go @@ -0,0 +1,112 @@ +package sarama + +import ( + "time" +) + +type DeleteOffsetsResponse struct { + //The top-level error code, or 0 if there was no error. + ErrorCode KError + ThrottleTime time.Duration + //The responses for each partition of the topics. + Errors map[string]map[int32]KError +} + +func (r *DeleteOffsetsResponse) AddError(topic string, partition int32, errorCode KError) { + if r.Errors == nil { + r.Errors = make(map[string]map[int32]KError) + } + partitions := r.Errors[topic] + if partitions == nil { + partitions = make(map[int32]KError) + r.Errors[topic] = partitions + } + partitions[partition] = errorCode +} + +func (r *DeleteOffsetsResponse) encode(pe packetEncoder) error { + pe.putInt16(int16(r.ErrorCode)) + pe.putInt32(int32(r.ThrottleTime / time.Millisecond)) + + if err := pe.putArrayLength(len(r.Errors)); err != nil { + return err + } + for topic, partitions := range r.Errors { + if err := pe.putString(topic); err != nil { + return err + } + if err := pe.putArrayLength(len(partitions)); err != nil { + return err + } + for partition, errorCode := range partitions { + pe.putInt32(partition) + pe.putInt16(int16(errorCode)) + } + } + return nil +} + +func (r *DeleteOffsetsResponse) decode(pd packetDecoder, version int16) error { + tmpErr, err := pd.getInt16() + if err != nil { + return err + } + r.ErrorCode = KError(tmpErr) + + throttleTime, err := pd.getInt32() + if err != nil { + return err + } + r.ThrottleTime = time.Duration(throttleTime) * time.Millisecond + + numTopics, err := pd.getArrayLength() + if err != nil || numTopics == 0 { + return err + } + + r.Errors = make(map[string]map[int32]KError, numTopics) + for i := 0; i < numTopics; i++ { + name, err := pd.getString() + if err != nil { + return err + } + + numErrors, err := pd.getArrayLength() + if err != nil { + return err + } + + r.Errors[name] = make(map[int32]KError, numErrors) + + for j := 0; j < numErrors; j++ { + id, err := pd.getInt32() + if err != nil { + return err + } + + tmp, err := pd.getInt16() + if err != nil { + return err + } + r.Errors[name][id] = KError(tmp) + } + } + + return nil +} + +func (r *DeleteOffsetsResponse) key() int16 { + return 47 +} + +func (r *DeleteOffsetsResponse) version() int16 { + return 0 +} + +func (r *DeleteOffsetsResponse) headerVersion() int16 { + return 0 +} + +func (r *DeleteOffsetsResponse) requiredVersion() KafkaVersion { + return V2_4_0_0 +} diff --git a/delete_offsets_response_test.go b/delete_offsets_response_test.go new file mode 100644 index 000000000..2a7cb6f0d --- /dev/null +++ b/delete_offsets_response_test.go @@ -0,0 +1,85 @@ +package sarama + +import ( + "testing" +) + +var ( + emptyDeleteOffsetsResponse = []byte{ + 0, 0, // no error + 0, 0, 0, 0, // 0 throttle + 0, 0, 0, 0, // 0 topics + } + + noErrorDeleteOffsetsResponse = []byte{ + 0, 0, // no error + 0, 0, 0, 0, // 0 throttle + 0, 0, 0, 1, // 1 topic + 0, 3, 'b', 'a', 'r', // topic name: bar + 0, 0, 0, 2, // 2 partitions + 0, 0, 0, 6, // partition 6 + 0, 0, // no error + 0, 0, 0, 7, // partition 7 + 0, 0, // no error + } + + errorDeleteOffsetsResponse = []byte{ + 0, 16, // error 16 : ErrNotCoordinatorForConsumer + 0, 0, 0, 0, // 0 throttle + 0, 0, 0, 1, // 1 topic + 0, 3, 'b', 'a', 'r', // topic name: bar + 0, 0, 0, 1, // 1 partition + 0, 0, 0, 6, // partition 6 + 0, 0, // no error + } + + errorOnPartitionResponse = []byte{ + 0, 0, // no error + 0, 0, 0, 0, // 0 throttle + 0, 0, 0, 1, // 1 topic + 0, 3, 'b', 'a', 'r', // topic name: bar + 0, 0, 0, 1, // 1 partition + 0, 0, 0, 6, // partition 6 + 0, 86, // error ErrGroupSubscribedToTopic=86 + } +) + +func TestDeleteOffsetsResponse(t *testing.T) { + var response *DeleteOffsetsResponse + + response = &DeleteOffsetsResponse{ + ErrorCode: 0, + ThrottleTime: 0, + } + testResponse(t, "empty no error", response, emptyDeleteOffsetsResponse) + + response = &DeleteOffsetsResponse { + ErrorCode: 0, + ThrottleTime: 0, + Errors: map[string]map[int32]KError { + "bar": map[int32]KError { + 6: 0, + 7: 0, + }, + }, + } + testResponse(t, "no error", response, noErrorDeleteOffsetsResponse) + + response = &DeleteOffsetsResponse { + ErrorCode: 16, + ThrottleTime: 0, + Errors: map[string]map[int32]KError { + "bar": map[int32]KError { + 6: 0, + }, + }, + } + testResponse(t, "error global", response, errorDeleteOffsetsResponse) + + response = &DeleteOffsetsResponse { + ErrorCode: 0, + ThrottleTime: 0, + } + response.AddError("bar", 6, ErrGroupSubscribedToTopic) + testResponse(t, "error partition", response, errorOnPartitionResponse) +} From b85a4d59a9923370bf36ed205c64ba88bc1e0dfe Mon Sep 17 00:00:00 2001 From: David Faillefer <23032941+faillefer@users.noreply.github.com> Date: Sun, 5 Sep 2021 10:49:20 +0200 Subject: [PATCH 3/7] Add DeleteOffsets operation on broker --- broker.go | 11 +++++++++++ broker_test.go | 16 ++++++++++++++++ 2 files changed, 27 insertions(+) diff --git a/broker.go b/broker.go index dd01e4ef1..6c5d5aac7 100644 --- a/broker.go +++ b/broker.go @@ -689,6 +689,17 @@ func (b *Broker) DeleteGroups(request *DeleteGroupsRequest) (*DeleteGroupsRespon return response, nil } +// DeleteOffsets sends a request to delete group offsets and returns a response or error +func (b *Broker) DeleteOffsets(request *DeleteOffsetsRequest) (*DeleteOffsetsResponse, error) { + response := new(DeleteOffsetsResponse) + + if err := b.sendAndReceive(request, response); err != nil { + return nil, err + } + + return response, nil +} + // DescribeLogDirs sends a request to get the broker's log dir paths and sizes func (b *Broker) DescribeLogDirs(request *DescribeLogDirsRequest) (*DescribeLogDirsResponse, error) { response := new(DescribeLogDirsResponse) diff --git a/broker_test.go b/broker_test.go index 2fa40ceb4..e895287e6 100644 --- a/broker_test.go +++ b/broker_test.go @@ -1009,6 +1009,22 @@ var brokerTestTable = []struct { } }, }, + + { + V2_4_0_0, + "DeleteOffsetsRequest", + []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}, + func(t *testing.T, broker *Broker) { + request := DeleteOffsetsRequest{} + response, err := broker.DeleteOffsets(&request) + if err != nil { + t.Error(err) + } + if response == nil { + t.Error("DeleteGroups request got no response!") + } + }, + }, } func validateBrokerMetrics(t *testing.T, broker *Broker, mockBrokerMetrics brokerMetrics) { From bb4ad59989aa33d72f7442d7429274487c34815d Mon Sep 17 00:00:00 2001 From: David Faillefer <23032941+faillefer@users.noreply.github.com> Date: Sun, 5 Sep 2021 10:52:40 +0200 Subject: [PATCH 4/7] Add DeleteConsumerGroupOffset operation on clusterAdmin --- admin.go | 31 +++++++++++++++++++++++++++++++ admin_test.go | 46 ++++++++++++++++++++++++++++++++++++++++++++++ mockresponses.go | 29 +++++++++++++++++++++++++++++ 3 files changed, 106 insertions(+) diff --git a/admin.go b/admin.go index 4187614f7..cee52bdc2 100644 --- a/admin.go +++ b/admin.go @@ -95,6 +95,9 @@ type ClusterAdmin interface { // List the consumer group offsets available in the cluster. ListConsumerGroupOffsets(group string, topicPartitions map[string][]int32) (*OffsetFetchResponse, error) + // Deletes a consumer group offset + DeleteConsumerGroupOffset(group string, topic string, partition int32) error + // Delete a consumer group. DeleteConsumerGroup(group string) error @@ -875,6 +878,34 @@ func (ca *clusterAdmin) ListConsumerGroupOffsets(group string, topicPartitions m return coordinator.FetchOffset(request) } +func (ca *clusterAdmin) DeleteConsumerGroupOffset(group string, topic string, partition int32) error { + coordinator, err := ca.client.Coordinator(group) + if err != nil { + return err + } + + request := &DeleteOffsetsRequest{ + Group: group, + partitions: map[string][]int32{ + topic: []int32{partition}, + }, + } + + resp, err := coordinator.DeleteOffsets(request) + if err != nil { + return err + } + + if resp.ErrorCode != ErrNoError { + return resp.ErrorCode + } + + if resp.Errors[topic][partition] != ErrNoError { + return resp.Errors[topic][partition] + } + return nil +} + func (ca *clusterAdmin) DeleteConsumerGroup(group string) error { coordinator, err := ca.client.Coordinator(group) if err != nil { diff --git a/admin_test.go b/admin_test.go index 4ab91c05a..36e4d4726 100644 --- a/admin_test.go +++ b/admin_test.go @@ -1326,6 +1326,52 @@ func TestDeleteConsumerGroup(t *testing.T) { } } +func TestDeleteOffset(t *testing.T) { + seedBroker := NewMockBroker(t, 1) + defer seedBroker.Close() + + group := "group-delete-offset" + topic := "topic-delete-offset" + partition := int32(0) + + handlerMap := map[string]MockResponse{ + "MetadataRequest": NewMockMetadataResponse(t). + SetController(seedBroker.BrokerID()). + SetBroker(seedBroker.Addr(), seedBroker.BrokerID()), + "FindCoordinatorRequest": NewMockFindCoordinatorResponse(t).SetCoordinator(CoordinatorGroup, group, seedBroker), + } + seedBroker.SetHandlerByMap(handlerMap) + + config := NewTestConfig() + config.Version = V2_4_0_0 + + admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) + if err != nil { + t.Fatal(err) + } + + // Test NoError + handlerMap["DeleteOffsetsRequest"] = NewMockDeleteOffsetRequest(t).SetDeletedOffset(ErrNoError, topic, partition, ErrNoError) + err = admin.DeleteConsumerGroupOffset(group, topic, partition) + if err != nil { + t.Fatalf("DeleteConsumerGroupOffset failed with error %v", err) + } + + // Test Error + handlerMap["DeleteOffsetsRequest"] = NewMockDeleteOffsetRequest(t).SetDeletedOffset(ErrNotCoordinatorForConsumer, topic, partition, ErrNoError) + err = admin.DeleteConsumerGroupOffset(group, topic, partition) + if err != ErrNotCoordinatorForConsumer { + t.Fatalf("DeleteConsumerGroupOffset should have failed with error %v", ErrNotCoordinatorForConsumer) + } + + // Test Error for partition + handlerMap["DeleteOffsetsRequest"] = NewMockDeleteOffsetRequest(t).SetDeletedOffset(ErrNoError, topic, partition, ErrGroupSubscribedToTopic) + err = admin.DeleteConsumerGroupOffset(group, topic, partition) + if err != ErrGroupSubscribedToTopic { + t.Fatalf("DeleteConsumerGroupOffset should have failed with error %v", ErrGroupSubscribedToTopic) + } +} + // TestRefreshMetaDataWithDifferentController ensures that the cached // controller can be forcibly updated from Metadata by the admin client func TestRefreshMetaDataWithDifferentController(t *testing.T) { diff --git a/mockresponses.go b/mockresponses.go index 6654ed07c..280c9064c 100644 --- a/mockresponses.go +++ b/mockresponses.go @@ -1082,6 +1082,35 @@ func (m *MockDeleteGroupsResponse) For(reqBody versionedDecoder) encoderWithHead return resp } +type MockDeleteOffsetResponse struct { + errorCode KError + topic string + partition int32 + errorPartition KError +} + +func NewMockDeleteOffsetRequest(t TestReporter) *MockDeleteOffsetResponse { + return &MockDeleteOffsetResponse{} +} + +func (m *MockDeleteOffsetResponse) SetDeletedOffset(errorCode KError, topic string, partition int32, errorPartition KError) *MockDeleteOffsetResponse { + m.errorCode = errorCode + m.topic = topic + m.partition = partition + m.errorPartition = errorPartition + return m +} + +func (m *MockDeleteOffsetResponse) For(reqBody versionedDecoder) encoderWithHeader { + resp := &DeleteOffsetsResponse{ + ErrorCode: m.errorCode, + Errors: map[string]map[int32]KError{ + m.topic : {m.partition : m.errorPartition}, + }, + } + return resp +} + type MockJoinGroupResponse struct { t TestReporter From 6e4ad144e1220a7d62bc67013151a7ad6b932fe0 Mon Sep 17 00:00:00 2001 From: David Faillefer <23032941+faillefer@users.noreply.github.com> Date: Mon, 6 Sep 2021 06:01:07 +0200 Subject: [PATCH 5/7] Add functional test for DeleteOffsetsRequest --- functional_consumer_group_test.go | 69 +++++++++++++++++++++++++++++++ 1 file changed, 69 insertions(+) diff --git a/functional_consumer_group_test.go b/functional_consumer_group_test.go index 5fb063bf5..8ab672614 100644 --- a/functional_consumer_group_test.go +++ b/functional_consumer_group_test.go @@ -146,12 +146,81 @@ func TestFuncConsumerGroupFuzzy(t *testing.T) { } } +func TestFuncConsumerGroupOffsetDeletion(t *testing.T) { + checkKafkaVersion(t, "2.4.0") + setupFunctionalTest(t) + defer teardownFunctionalTest(t) + // create a client with 2.4.0 version as it is the minimal version + // that supports DeleteOffsets request + config := NewTestConfig() + config.Version = V2_4_0_0 + client, err := NewClient(FunctionalTestEnv.KafkaBrokerAddrs, config) + defer safeClose(t, client) + if err != nil { + t.Fatal(err) + } + + // create a consumer group with offsets on + // - topic test.1 partition 0 + // - topic test.4 partition 0 + groupID := testFuncConsumerGroupID(t) + consumerGroup, err := NewConsumerGroupFromClient(groupID, client) + if err != nil { + t.Fatal(err) + } + defer safeClose(t, consumerGroup) + + offsetMgr, _ := NewOffsetManagerFromClient(groupID, client) + defer safeClose(t, offsetMgr) + markOffset(t, offsetMgr, "test.1", 0, 1) + markOffset(t, offsetMgr, "test.4", 0, 2) + offsetMgr.Commit() + + admin, err := NewClusterAdminFromClient(client) + if err != nil { + t.Fatal(err) + } + offsetFetch, err := admin.ListConsumerGroupOffsets(groupID, nil) + if err != nil { + t.Fatal(err) + } + if len(offsetFetch.Blocks) != 2 { + t.Fatal("Expected offsets on two topics. Found offsets on ", len(offsetFetch.Blocks), "topics.") + } + + // Delete offset for partition topic test.4 partition 0 + err = admin.DeleteConsumerGroupOffset(groupID, "test.4", 0) + if err != nil { + t.Fatal(err) + } + + offsetFetch, err = admin.ListConsumerGroupOffsets(groupID, nil) + if err != nil { + t.Fatal(err) + } + if len(offsetFetch.Blocks) != 1 { + t.Fatal("Expected offsets on one topic. Found offsets on ", len(offsetFetch.Blocks), "topics.") + } + if offsetFetch.Blocks["test.4"] != nil { + t.Fatal("Offset still exists for topic 'topic.4'. It should have been deleted.") + } +} + // -------------------------------------------------------------------- func testFuncConsumerGroupID(t *testing.T) string { return fmt.Sprintf("sarama.%s%d", t.Name(), time.Now().UnixNano()) } +func markOffset(t *testing.T, offsetMgr OffsetManager, topic string, partition int32, offset int64) { + partitionOffsetManager, err := offsetMgr.ManagePartition(topic, partition) + defer safeClose(t, partitionOffsetManager) + if err != nil { + t.Fatal(err) + } + partitionOffsetManager.MarkOffset(offset, "") +} + func testFuncConsumerGroupFuzzySeed(topic string) error { client, err := NewClient(FunctionalTestEnv.KafkaBrokerAddrs, NewTestConfig()) if err != nil { From aa15a3e29920bd1f9fa65ee07886f2c501c068d9 Mon Sep 17 00:00:00 2001 From: David Faillefer <23032941+faillefer@users.noreply.github.com> Date: Mon, 6 Sep 2021 06:05:32 +0200 Subject: [PATCH 6/7] Update CHANGELOG.md --- CHANGELOG.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 59ccd1de5..dd826e273 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,10 @@ #### Unreleased +# Improvements + +- #1912 - @faillefer Support for --delete-offsets for consumer group topic + #### Version 1.28.0 (2021-02-15) **Note that with this release we change `RoundRobinBalancer` strategy to match Java client behavior. See #1788 for details.** From e14db2e8abf9b6cccd9b8649476978426923adc3 Mon Sep 17 00:00:00 2001 From: David Faillefer <23032941+faillefer@users.noreply.github.com> Date: Wed, 8 Sep 2021 14:12:13 +0200 Subject: [PATCH 7/7] Fix format --- admin.go | 4 ++-- delete_offsets_request_test.go | 12 ++++++------ delete_offsets_response_test.go | 32 ++++++++++++++++---------------- mockresponses.go | 8 ++++---- 4 files changed, 28 insertions(+), 28 deletions(-) diff --git a/admin.go b/admin.go index cee52bdc2..3bac6208e 100644 --- a/admin.go +++ b/admin.go @@ -885,9 +885,9 @@ func (ca *clusterAdmin) DeleteConsumerGroupOffset(group string, topic string, pa } request := &DeleteOffsetsRequest{ - Group: group, + Group: group, partitions: map[string][]int32{ - topic: []int32{partition}, + topic: {partition}, }, } diff --git a/delete_offsets_request_test.go b/delete_offsets_request_test.go index 1b0b20a71..f0214e1e7 100644 --- a/delete_offsets_request_test.go +++ b/delete_offsets_request_test.go @@ -4,27 +4,27 @@ import "testing" var ( emptyDeleteOffsetsRequest = []byte{ - 0, 3,'f', 'o', 'o', // group name: foo + 0, 3, 'f', 'o', 'o', // group name: foo 0, 0, 0, 0, // 0 partition } doubleDeleteOffsetsRequest = []byte{ - 0, 3,'f', 'o', 'o', // group name: foo + 0, 3, 'f', 'o', 'o', // group name: foo 0, 0, 0, 1, // 1 topic - 0, 3, 'b', 'a', 'r', // topic name: bar + 0, 3, 'b', 'a', 'r', // topic name: bar 0, 0, 0, 2, // 2 partitions 0, 0, 0, 6, // partition 6 0, 0, 0, 7, // partition 7 } tripleDeleteOffsetsRequest = []byte{ - 0, 3,'f', 'o', 'o', // group name: foo + 0, 3, 'f', 'o', 'o', // group name: foo 0, 0, 0, 2, // 2 topics - 0, 3, 'b', 'a', 'r', // topic name: bar + 0, 3, 'b', 'a', 'r', // topic name: bar 0, 0, 0, 2, // 2 partitions 0, 0, 0, 6, // partition 6 0, 0, 0, 7, // partition 7 - 0, 3, 'b', 'a', 'z', // topic name: baz + 0, 3, 'b', 'a', 'z', // topic name: baz 0, 0, 0, 1, // 1 partition 0, 0, 0, 0, // partition 0 } diff --git a/delete_offsets_response_test.go b/delete_offsets_response_test.go index 2a7cb6f0d..a6a9d8bc9 100644 --- a/delete_offsets_response_test.go +++ b/delete_offsets_response_test.go @@ -12,15 +12,15 @@ var ( } noErrorDeleteOffsetsResponse = []byte{ - 0, 0, // no error + 0, 0, // no error 0, 0, 0, 0, // 0 throttle 0, 0, 0, 1, // 1 topic 0, 3, 'b', 'a', 'r', // topic name: bar 0, 0, 0, 2, // 2 partitions 0, 0, 0, 6, // partition 6 - 0, 0, // no error + 0, 0, // no error 0, 0, 0, 7, // partition 7 - 0, 0, // no error + 0, 0, // no error } errorDeleteOffsetsResponse = []byte{ @@ -34,13 +34,13 @@ var ( } errorOnPartitionResponse = []byte{ - 0, 0, // no error + 0, 0, // no error 0, 0, 0, 0, // 0 throttle 0, 0, 0, 1, // 1 topic 0, 3, 'b', 'a', 'r', // topic name: bar 0, 0, 0, 1, // 1 partition 0, 0, 0, 6, // partition 6 - 0, 86, // error ErrGroupSubscribedToTopic=86 + 0, 86, // error ErrGroupSubscribedToTopic=86 } ) @@ -48,16 +48,16 @@ func TestDeleteOffsetsResponse(t *testing.T) { var response *DeleteOffsetsResponse response = &DeleteOffsetsResponse{ - ErrorCode: 0, + ErrorCode: 0, ThrottleTime: 0, } testResponse(t, "empty no error", response, emptyDeleteOffsetsResponse) - response = &DeleteOffsetsResponse { - ErrorCode: 0, + response = &DeleteOffsetsResponse{ + ErrorCode: 0, ThrottleTime: 0, - Errors: map[string]map[int32]KError { - "bar": map[int32]KError { + Errors: map[string]map[int32]KError{ + "bar": { 6: 0, 7: 0, }, @@ -65,19 +65,19 @@ func TestDeleteOffsetsResponse(t *testing.T) { } testResponse(t, "no error", response, noErrorDeleteOffsetsResponse) - response = &DeleteOffsetsResponse { - ErrorCode: 16, + response = &DeleteOffsetsResponse{ + ErrorCode: 16, ThrottleTime: 0, - Errors: map[string]map[int32]KError { - "bar": map[int32]KError { + Errors: map[string]map[int32]KError{ + "bar": { 6: 0, }, }, } testResponse(t, "error global", response, errorDeleteOffsetsResponse) - response = &DeleteOffsetsResponse { - ErrorCode: 0, + response = &DeleteOffsetsResponse{ + ErrorCode: 0, ThrottleTime: 0, } response.AddError("bar", 6, ErrGroupSubscribedToTopic) diff --git a/mockresponses.go b/mockresponses.go index 280c9064c..d9785753c 100644 --- a/mockresponses.go +++ b/mockresponses.go @@ -1083,9 +1083,9 @@ func (m *MockDeleteGroupsResponse) For(reqBody versionedDecoder) encoderWithHead } type MockDeleteOffsetResponse struct { - errorCode KError - topic string - partition int32 + errorCode KError + topic string + partition int32 errorPartition KError } @@ -1105,7 +1105,7 @@ func (m *MockDeleteOffsetResponse) For(reqBody versionedDecoder) encoderWithHead resp := &DeleteOffsetsResponse{ ErrorCode: m.errorCode, Errors: map[string]map[int32]KError{ - m.topic : {m.partition : m.errorPartition}, + m.topic: {m.partition: m.errorPartition}, }, } return resp