Skip to content

Commit

Permalink
Merge pull request IBM#2006 from faillefer/fix-1912-delete-offsets
Browse files Browse the repository at this point in the history
Add support for DeleteOffsets operation
  • Loading branch information
bai authored Sep 9, 2021
2 parents 1e3101e + e14db2e commit bd4c972
Show file tree
Hide file tree
Showing 12 changed files with 550 additions and 0 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

#### Unreleased

# Improvements

- #1912 - @faillefer Support for --delete-offsets for consumer group topic

#### Version 1.28.0 (2021-02-15)

**Note that with this release we change `RoundRobinBalancer` strategy to match Java client behavior. See #1788 for details.**
Expand Down
31 changes: 31 additions & 0 deletions admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,9 @@ type ClusterAdmin interface {
// List the consumer group offsets available in the cluster.
ListConsumerGroupOffsets(group string, topicPartitions map[string][]int32) (*OffsetFetchResponse, error)

// Deletes a consumer group offset
DeleteConsumerGroupOffset(group string, topic string, partition int32) error

// Delete a consumer group.
DeleteConsumerGroup(group string) error

Expand Down Expand Up @@ -875,6 +878,34 @@ func (ca *clusterAdmin) ListConsumerGroupOffsets(group string, topicPartitions m
return coordinator.FetchOffset(request)
}

func (ca *clusterAdmin) DeleteConsumerGroupOffset(group string, topic string, partition int32) error {
coordinator, err := ca.client.Coordinator(group)
if err != nil {
return err
}

request := &DeleteOffsetsRequest{
Group: group,
partitions: map[string][]int32{
topic: {partition},
},
}

resp, err := coordinator.DeleteOffsets(request)
if err != nil {
return err
}

if resp.ErrorCode != ErrNoError {
return resp.ErrorCode
}

if resp.Errors[topic][partition] != ErrNoError {
return resp.Errors[topic][partition]
}
return nil
}

func (ca *clusterAdmin) DeleteConsumerGroup(group string) error {
coordinator, err := ca.client.Coordinator(group)
if err != nil {
Expand Down
46 changes: 46 additions & 0 deletions admin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1326,6 +1326,52 @@ func TestDeleteConsumerGroup(t *testing.T) {
}
}

func TestDeleteOffset(t *testing.T) {
seedBroker := NewMockBroker(t, 1)
defer seedBroker.Close()

group := "group-delete-offset"
topic := "topic-delete-offset"
partition := int32(0)

handlerMap := map[string]MockResponse{
"MetadataRequest": NewMockMetadataResponse(t).
SetController(seedBroker.BrokerID()).
SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
"FindCoordinatorRequest": NewMockFindCoordinatorResponse(t).SetCoordinator(CoordinatorGroup, group, seedBroker),
}
seedBroker.SetHandlerByMap(handlerMap)

config := NewTestConfig()
config.Version = V2_4_0_0

admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
if err != nil {
t.Fatal(err)
}

// Test NoError
handlerMap["DeleteOffsetsRequest"] = NewMockDeleteOffsetRequest(t).SetDeletedOffset(ErrNoError, topic, partition, ErrNoError)
err = admin.DeleteConsumerGroupOffset(group, topic, partition)
if err != nil {
t.Fatalf("DeleteConsumerGroupOffset failed with error %v", err)
}

// Test Error
handlerMap["DeleteOffsetsRequest"] = NewMockDeleteOffsetRequest(t).SetDeletedOffset(ErrNotCoordinatorForConsumer, topic, partition, ErrNoError)
err = admin.DeleteConsumerGroupOffset(group, topic, partition)
if err != ErrNotCoordinatorForConsumer {
t.Fatalf("DeleteConsumerGroupOffset should have failed with error %v", ErrNotCoordinatorForConsumer)
}

// Test Error for partition
handlerMap["DeleteOffsetsRequest"] = NewMockDeleteOffsetRequest(t).SetDeletedOffset(ErrNoError, topic, partition, ErrGroupSubscribedToTopic)
err = admin.DeleteConsumerGroupOffset(group, topic, partition)
if err != ErrGroupSubscribedToTopic {
t.Fatalf("DeleteConsumerGroupOffset should have failed with error %v", ErrGroupSubscribedToTopic)
}
}

// TestRefreshMetaDataWithDifferentController ensures that the cached
// controller can be forcibly updated from Metadata by the admin client
func TestRefreshMetaDataWithDifferentController(t *testing.T) {
Expand Down
11 changes: 11 additions & 0 deletions broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -689,6 +689,17 @@ func (b *Broker) DeleteGroups(request *DeleteGroupsRequest) (*DeleteGroupsRespon
return response, nil
}

// DeleteOffsets sends a request to delete group offsets and returns a response or error
func (b *Broker) DeleteOffsets(request *DeleteOffsetsRequest) (*DeleteOffsetsResponse, error) {
response := new(DeleteOffsetsResponse)

if err := b.sendAndReceive(request, response); err != nil {
return nil, err
}

return response, nil
}

// DescribeLogDirs sends a request to get the broker's log dir paths and sizes
func (b *Broker) DescribeLogDirs(request *DescribeLogDirsRequest) (*DescribeLogDirsResponse, error) {
response := new(DescribeLogDirsResponse)
Expand Down
16 changes: 16 additions & 0 deletions broker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1011,6 +1011,22 @@ var brokerTestTable = []struct {
}
},
},

{
V2_4_0_0,
"DeleteOffsetsRequest",
[]byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
func(t *testing.T, broker *Broker) {
request := DeleteOffsetsRequest{}
response, err := broker.DeleteOffsets(&request)
if err != nil {
t.Error(err)
}
if response == nil {
t.Error("DeleteGroups request got no response!")
}
},
},
}

func validateBrokerMetrics(t *testing.T, broker *Broker, mockBrokerMetrics brokerMetrics) {
Expand Down
92 changes: 92 additions & 0 deletions delete_offsets_request.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package sarama

type DeleteOffsetsRequest struct {
Group string
partitions map[string][]int32
}

func (r *DeleteOffsetsRequest) encode(pe packetEncoder) (err error) {
err = pe.putString(r.Group)
if err != nil {
return err
}

if r.partitions == nil {
pe.putInt32(0)
} else {
if err = pe.putArrayLength(len(r.partitions)); err != nil {
return err
}
}
for topic, partitions := range r.partitions {
err = pe.putString(topic)
if err != nil {
return err
}
err = pe.putInt32Array(partitions)
if err != nil {
return err
}
}
return
}

func (r *DeleteOffsetsRequest) decode(pd packetDecoder, version int16) (err error) {
r.Group, err = pd.getString()
if err != nil {
return err
}
var partitionCount int

partitionCount, err = pd.getArrayLength()
if err != nil {
return err
}

if (partitionCount == 0 && version < 2) || partitionCount < 0 {
return nil
}

r.partitions = make(map[string][]int32, partitionCount)
for i := 0; i < partitionCount; i++ {
var topic string
topic, err = pd.getString()
if err != nil {
return err
}

var partitions []int32
partitions, err = pd.getInt32Array()
if err != nil {
return err
}

r.partitions[topic] = partitions
}

return nil
}

func (r *DeleteOffsetsRequest) key() int16 {
return 47
}

func (r *DeleteOffsetsRequest) version() int16 {
return 0
}

func (r *DeleteOffsetsRequest) headerVersion() int16 {
return 1
}

func (r *DeleteOffsetsRequest) requiredVersion() KafkaVersion {
return V2_4_0_0
}

func (r *DeleteOffsetsRequest) AddPartition(topic string, partitionID int32) {
if r.partitions == nil {
r.partitions = make(map[string][]int32)
}

r.partitions[topic] = append(r.partitions[topic], partitionID)
}
53 changes: 53 additions & 0 deletions delete_offsets_request_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package sarama

import "testing"

var (
emptyDeleteOffsetsRequest = []byte{
0, 3, 'f', 'o', 'o', // group name: foo
0, 0, 0, 0, // 0 partition
}

doubleDeleteOffsetsRequest = []byte{
0, 3, 'f', 'o', 'o', // group name: foo
0, 0, 0, 1, // 1 topic
0, 3, 'b', 'a', 'r', // topic name: bar
0, 0, 0, 2, // 2 partitions
0, 0, 0, 6, // partition 6
0, 0, 0, 7, // partition 7
}

tripleDeleteOffsetsRequest = []byte{
0, 3, 'f', 'o', 'o', // group name: foo
0, 0, 0, 2, // 2 topics
0, 3, 'b', 'a', 'r', // topic name: bar
0, 0, 0, 2, // 2 partitions
0, 0, 0, 6, // partition 6
0, 0, 0, 7, // partition 7
0, 3, 'b', 'a', 'z', // topic name: baz
0, 0, 0, 1, // 1 partition
0, 0, 0, 0, // partition 0
}
)

func TestDeleteOffsetsRequest(t *testing.T) {
var request *DeleteOffsetsRequest

request = new(DeleteOffsetsRequest)
request.Group = "foo"

testRequest(t, "no offset", request, emptyDeleteOffsetsRequest)

request = new(DeleteOffsetsRequest)
request.Group = "foo"
request.AddPartition("bar", 6)
request.AddPartition("bar", 7)
testRequest(t, "two offsets on one topic", request, doubleDeleteOffsetsRequest)

request = new(DeleteOffsetsRequest)
request.Group = "foo"
request.AddPartition("bar", 6)
request.AddPartition("bar", 7)
request.AddPartition("baz", 0)
testRequest(t, "three offsets on two topics", request, tripleDeleteOffsetsRequest)
}
Loading

0 comments on commit bd4c972

Please sign in to comment.