Skip to content

Commit

Permalink
Merge pull request IBM#2035 from Shopify/dnwe/add-missing-version-con…
Browse files Browse the repository at this point in the history
…stants

chore: populate the missing kafka versions
  • Loading branch information
dnwe authored Sep 22, 2021
2 parents 94c8136 + 8a2c8ba commit 461b931
Show file tree
Hide file tree
Showing 7 changed files with 113 additions and 26 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ default: fmt get update test lint

GO := go
GOBUILD := CGO_ENABLED=0 $(GO) build $(BUILD_FLAG)
GOTEST := $(GO) test -gcflags='-l' -p 3 -v -race -timeout 6m -coverprofile=profile.out -covermode=atomic
GOTEST := $(GO) test -gcflags='-l' -p 3 -v -race -timeout 10m -coverprofile=profile.out -covermode=atomic

FILES := $(shell find . -name '*.go' -type f -not -name '*.pb.go' -not -name '*_generated.go' -not -name '*_test.go')
TESTS := $(shell find . -name '*.go' -type f -not -name '*.pb.go' -not -name '*_generated.go' -name '*_test.go')
Expand Down
3 changes: 3 additions & 0 deletions admin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1357,20 +1357,23 @@ func TestDeleteOffset(t *testing.T) {

// Test NoError
handlerMap["DeleteOffsetsRequest"] = NewMockDeleteOffsetRequest(t).SetDeletedOffset(ErrNoError, topic, partition, ErrNoError)
seedBroker.SetHandlerByMap(handlerMap)
err = admin.DeleteConsumerGroupOffset(group, topic, partition)
if err != nil {
t.Fatalf("DeleteConsumerGroupOffset failed with error %v", err)
}

// Test Error
handlerMap["DeleteOffsetsRequest"] = NewMockDeleteOffsetRequest(t).SetDeletedOffset(ErrNotCoordinatorForConsumer, topic, partition, ErrNoError)
seedBroker.SetHandlerByMap(handlerMap)
err = admin.DeleteConsumerGroupOffset(group, topic, partition)
if err != ErrNotCoordinatorForConsumer {
t.Fatalf("DeleteConsumerGroupOffset should have failed with error %v", ErrNotCoordinatorForConsumer)
}

// Test Error for partition
handlerMap["DeleteOffsetsRequest"] = NewMockDeleteOffsetRequest(t).SetDeletedOffset(ErrNoError, topic, partition, ErrGroupSubscribedToTopic)
seedBroker.SetHandlerByMap(handlerMap)
err = admin.DeleteConsumerGroupOffset(group, topic, partition)
if err != ErrGroupSubscribedToTopic {
t.Fatalf("DeleteConsumerGroupOffset should have failed with error %v", ErrGroupSubscribedToTopic)
Expand Down
4 changes: 3 additions & 1 deletion broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,8 @@ func (b *Broker) Open(conf *Config) error {
return err
}

usingApiVersionsRequests := conf.Version.IsAtLeast(V2_4_0_0) && conf.ApiVersionsRequest

b.lock.Lock()

go withRecover(func() {
Expand All @@ -159,7 +161,7 @@ func (b *Broker) Open(conf *Config) error {
// Send an ApiVersionsRequest to identify the client (KIP-511).
// Ideally Sarama would use the response to control protocol versions,
// but for now just fire-and-forget just to send
if conf.Version.IsAtLeast(V2_4_0_0) && conf.ApiVersionsRequest {
if usingApiVersionsRequests {
_, err = b.ApiVersions(&ApiVersionsRequest{
Version: 3,
ClientSoftwareName: defaultClientSoftwareName,
Expand Down
48 changes: 26 additions & 22 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -687,30 +687,34 @@ func TestClientController(t *testing.T) {
cfg := NewTestConfig()

// test kafka version greater than 0.10.0.0
cfg.Version = V0_10_0_0
client1, err := NewClient([]string{seedBroker.Addr()}, cfg)
if err != nil {
t.Fatal(err)
}
defer safeClose(t, client1)
broker, err := client1.Controller()
if err != nil {
t.Fatal(err)
}
if broker.Addr() != controllerBroker.Addr() {
t.Errorf("Expected controller to have address %s, found %s", controllerBroker.Addr(), broker.Addr())
}
t.Run("V0_10_0_0", func(t *testing.T) {
cfg.Version = V0_10_0_0
client1, err := NewClient([]string{seedBroker.Addr()}, cfg)
if err != nil {
t.Fatal(err)
}
defer safeClose(t, client1)
broker, err := client1.Controller()
if err != nil {
t.Fatal(err)
}
if broker.Addr() != controllerBroker.Addr() {
t.Errorf("Expected controller to have address %s, found %s", controllerBroker.Addr(), broker.Addr())
}
})

// test kafka version earlier than 0.10.0.0
cfg.Version = V0_9_0_1
client2, err := NewClient([]string{seedBroker.Addr()}, cfg)
if err != nil {
t.Fatal(err)
}
defer safeClose(t, client2)
if _, err = client2.Controller(); err != ErrUnsupportedVersion {
t.Errorf("Expected Controller() to return %s, found %s", ErrUnsupportedVersion, err)
}
t.Run("V0_9_0_1", func(t *testing.T) {
cfg.Version = V0_9_0_1
client2, err := NewClient([]string{seedBroker.Addr()}, cfg)
if err != nil {
t.Fatal(err)
}
defer safeClose(t, client2)
if _, err = client2.Controller(); err != ErrUnsupportedVersion {
t.Errorf("Expected Controller() to return %s, found %s", ErrUnsupportedVersion, err)
}
})
}

func TestClientMetadataTimeout(t *testing.T) {
Expand Down
6 changes: 5 additions & 1 deletion mockbroker.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,13 @@ func (b *MockBroker) SetLatency(latency time.Duration) {
// and uses the found MockResponse instance to generate an appropriate reply.
// If the request type is not found in the map then nothing is sent.
func (b *MockBroker) SetHandlerByMap(handlerMap map[string]MockResponse) {
fnMap := make(map[string]MockResponse)
for k, v := range handlerMap {
fnMap[k] = v
}
b.setHandler(func(req *request) (res encoderWithHeader) {
reqTypeName := reflect.TypeOf(req.body).Elem().Name()
mockResponse := handlerMap[reqTypeName]
mockResponse := fnMap[reqTypeName]
if mockResponse == nil {
return nil
}
Expand Down
26 changes: 26 additions & 0 deletions utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,22 +148,35 @@ var (
V0_10_1_1 = newKafkaVersion(0, 10, 1, 1)
V0_10_2_0 = newKafkaVersion(0, 10, 2, 0)
V0_10_2_1 = newKafkaVersion(0, 10, 2, 1)
V0_10_2_2 = newKafkaVersion(0, 10, 2, 2)
V0_11_0_0 = newKafkaVersion(0, 11, 0, 0)
V0_11_0_1 = newKafkaVersion(0, 11, 0, 1)
V0_11_0_2 = newKafkaVersion(0, 11, 0, 2)
V1_0_0_0 = newKafkaVersion(1, 0, 0, 0)
V1_0_1_0 = newKafkaVersion(1, 0, 1, 0)
V1_0_2_0 = newKafkaVersion(1, 0, 2, 0)
V1_1_0_0 = newKafkaVersion(1, 1, 0, 0)
V1_1_1_0 = newKafkaVersion(1, 1, 1, 0)
V2_0_0_0 = newKafkaVersion(2, 0, 0, 0)
V2_0_1_0 = newKafkaVersion(2, 0, 1, 0)
V2_1_0_0 = newKafkaVersion(2, 1, 0, 0)
V2_1_1_0 = newKafkaVersion(2, 1, 1, 0)
V2_2_0_0 = newKafkaVersion(2, 2, 0, 0)
V2_2_1_0 = newKafkaVersion(2, 2, 1, 0)
V2_2_2_0 = newKafkaVersion(2, 2, 2, 0)
V2_3_0_0 = newKafkaVersion(2, 3, 0, 0)
V2_3_1_0 = newKafkaVersion(2, 3, 1, 0)
V2_4_0_0 = newKafkaVersion(2, 4, 0, 0)
V2_4_1_0 = newKafkaVersion(2, 4, 1, 0)
V2_5_0_0 = newKafkaVersion(2, 5, 0, 0)
V2_5_1_0 = newKafkaVersion(2, 5, 1, 0)
V2_6_0_0 = newKafkaVersion(2, 6, 0, 0)
V2_6_1_0 = newKafkaVersion(2, 6, 1, 0)
V2_6_2_0 = newKafkaVersion(2, 6, 2, 0)
V2_7_0_0 = newKafkaVersion(2, 7, 0, 0)
V2_7_1_0 = newKafkaVersion(2, 7, 1, 0)
V2_8_0_0 = newKafkaVersion(2, 8, 0, 0)
V2_8_1_0 = newKafkaVersion(2, 8, 1, 0)
V3_0_0_0 = newKafkaVersion(3, 0, 0, 0)

SupportedVersions = []KafkaVersion{
Expand All @@ -178,22 +191,35 @@ var (
V0_10_1_1,
V0_10_2_0,
V0_10_2_1,
V0_10_2_2,
V0_11_0_0,
V0_11_0_1,
V0_11_0_2,
V1_0_0_0,
V1_0_1_0,
V1_0_2_0,
V1_1_0_0,
V1_1_1_0,
V2_0_0_0,
V2_0_1_0,
V2_1_0_0,
V2_1_1_0,
V2_2_0_0,
V2_2_1_0,
V2_2_2_0,
V2_3_0_0,
V2_3_1_0,
V2_4_0_0,
V2_4_1_0,
V2_5_0_0,
V2_5_1_0,
V2_6_0_0,
V2_6_1_0,
V2_6_2_0,
V2_7_0_0,
V2_7_1_0,
V2_8_0_0,
V2_8_1_0,
V3_0_0_0,
}
MinVersion = V0_8_2_0
Expand Down
50 changes: 49 additions & 1 deletion utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,58 @@ func TestVersionCompare(t *testing.T) {
if V0_8_2_1.IsAtLeast(V0_10_0_0) {
t.Error("0.8.2.1 >= 0.10.0.0")
}
if !V1_0_0_0.IsAtLeast(V0_9_0_0) {
t.Error("! 1.0.0.0 >= 0.9.0.0")
}
if V0_9_0_0.IsAtLeast(V1_0_0_0) {
t.Error("0.9.0.0 >= 1.0.0.0")
}
}

func TestVersionParsing(t *testing.T) {
validVersions := []string{"0.8.2.0", "0.8.2.1", "0.9.0.0", "0.10.2.0", "1.0.0"}
validVersions := []string{
"0.8.2.0",
"0.8.2.1",
"0.8.2.2",
"0.9.0.0",
"0.9.0.1",
"0.10.0.0",
"0.10.0.1",
"0.10.1.0",
"0.10.1.1",
"0.10.2.0",
"0.10.2.1",
"0.10.2.2",
"0.11.0.0",
"0.11.0.1",
"0.11.0.2",
"1.0.0",
"1.0.1",
"1.0.2",
"1.1.0",
"1.1.1",
"2.0.0",
"2.0.1",
"2.1.0",
"2.1.1",
"2.2.0",
"2.2.1",
"2.2.2",
"2.3.0",
"2.3.1",
"2.4.0",
"2.4.1",
"2.5.0",
"2.5.1",
"2.6.0",
"2.6.1",
"2.6.2",
"2.7.0",
"2.7.1",
"2.8.0",
"2.8.1",
"3.0.0",
}
for _, s := range validVersions {
v, err := ParseKafkaVersion(s)
if err != nil {
Expand Down

0 comments on commit 461b931

Please sign in to comment.