From 86d6f390cf66ac0125394fe919a9a34da0e44223 Mon Sep 17 00:00:00 2001 From: Robin Date: Sat, 18 Nov 2017 03:51:46 +0100 Subject: [PATCH] add Describe/Create/DeleteAcls --- acl_bindings.go | 119 ++++++++++++++++++++++++++ acl_create_request.go | 76 +++++++++++++++++ acl_create_request_test.go | 34 ++++++++ acl_create_response.go | 88 +++++++++++++++++++ acl_create_response_test.go | 41 +++++++++ acl_delete_request.go | 48 +++++++++++ acl_delete_request_test.go | 69 +++++++++++++++ acl_delete_response.go | 155 ++++++++++++++++++++++++++++++++++ acl_delete_response_test.go | 38 +++++++++ acl_describe_request.go | 25 ++++++ acl_describe_request_test.go | 35 ++++++++ acl_describe_response.go | 80 ++++++++++++++++++ acl_describe_response_test.go | 45 ++++++++++ acl_filter.go | 61 +++++++++++++ acl_types.go | 42 +++++++++ broker.go | 33 ++++++++ request.go | 6 ++ request_test.go | 2 + 18 files changed, 997 insertions(+) create mode 100644 acl_bindings.go create mode 100644 acl_create_request.go create mode 100644 acl_create_request_test.go create mode 100644 acl_create_response.go create mode 100644 acl_create_response_test.go create mode 100644 acl_delete_request.go create mode 100644 acl_delete_request_test.go create mode 100644 acl_delete_response.go create mode 100644 acl_delete_response_test.go create mode 100644 acl_describe_request.go create mode 100644 acl_describe_request_test.go create mode 100644 acl_describe_response.go create mode 100644 acl_describe_response_test.go create mode 100644 acl_filter.go create mode 100644 acl_types.go diff --git a/acl_bindings.go b/acl_bindings.go new file mode 100644 index 000000000..51517359a --- /dev/null +++ b/acl_bindings.go @@ -0,0 +1,119 @@ +package sarama + +type Resource struct { + ResourceType AclResourceType + ResourceName string +} + +func (r *Resource) encode(pe packetEncoder) error { + pe.putInt8(int8(r.ResourceType)) + + if err := pe.putString(r.ResourceName); err != nil { + return err + } + + return nil +} + +func (r *Resource) decode(pd packetDecoder, version int16) (err error) { + resourceType, err := pd.getInt8() + if err != nil { + return err + } + r.ResourceType = AclResourceType(resourceType) + + if r.ResourceName, err = pd.getString(); err != nil { + return err + } + + return nil +} + +type Acl struct { + Principal string + Host string + Operation AclOperation + PermissionType AclPermissionType +} + +func (a *Acl) encode(pe packetEncoder) error { + if err := pe.putString(a.Principal); err != nil { + return err + } + + if err := pe.putString(a.Host); err != nil { + return err + } + + pe.putInt8(int8(a.Operation)) + pe.putInt8(int8(a.PermissionType)) + + return nil +} + +func (a *Acl) decode(pd packetDecoder, version int16) (err error) { + if a.Principal, err = pd.getString(); err != nil { + return err + } + + if a.Host, err = pd.getString(); err != nil { + return err + } + + operation, err := pd.getInt8() + if err != nil { + return err + } + a.Operation = AclOperation(operation) + + permissionType, err := pd.getInt8() + if err != nil { + return err + } + a.PermissionType = AclPermissionType(permissionType) + + return nil +} + +type ResourceAcls struct { + Resource + Acls []*Acl +} + +func (r *ResourceAcls) encode(pe packetEncoder) error { + if err := r.Resource.encode(pe); err != nil { + return err + } + + if err := pe.putArrayLength(len(r.Acls)); err != nil { + return err + } + for _, acl := range r.Acls { + if err := acl.encode(pe); err != nil { + return err + } + } + + return nil +} + +func (r *ResourceAcls) decode(pd packetDecoder, version int16) error { + if err := r.Resource.decode(pd, version); err != nil { + return err + } + + n, err := pd.getArrayLength() + if err != nil { + return err + } + + r.Acls = make([]*Acl, n) + for i := 0; i < n; i++ { + r.Acls[i] = new(Acl) + if err := r.Acls[i].decode(pd, version); err != nil { + return err + } + } + + return nil +} diff --git a/acl_create_request.go b/acl_create_request.go new file mode 100644 index 000000000..0b6ecbec3 --- /dev/null +++ b/acl_create_request.go @@ -0,0 +1,76 @@ +package sarama + +type CreateAclsRequest struct { + AclCreations []*AclCreation +} + +func (c *CreateAclsRequest) encode(pe packetEncoder) error { + if err := pe.putArrayLength(len(c.AclCreations)); err != nil { + return err + } + + for _, aclCreation := range c.AclCreations { + if err := aclCreation.encode(pe); err != nil { + return err + } + } + + return nil +} + +func (c *CreateAclsRequest) decode(pd packetDecoder, version int16) (err error) { + n, err := pd.getArrayLength() + if err != nil { + return err + } + + c.AclCreations = make([]*AclCreation, n) + + for i := 0; i < n; i++ { + c.AclCreations[i] = new(AclCreation) + if err := c.AclCreations[i].decode(pd, version); err != nil { + return err + } + } + + return nil +} + +func (d *CreateAclsRequest) key() int16 { + return 30 +} + +func (d *CreateAclsRequest) version() int16 { + return 0 +} + +func (d *CreateAclsRequest) requiredVersion() KafkaVersion { + return V0_11_0_0 +} + +type AclCreation struct { + Resource + Acl +} + +func (a *AclCreation) encode(pe packetEncoder) error { + if err := a.Resource.encode(pe); err != nil { + return err + } + if err := a.Acl.encode(pe); err != nil { + return err + } + + return nil +} + +func (a *AclCreation) decode(pd packetDecoder, version int16) (err error) { + if err := a.Resource.decode(pd, version); err != nil { + return err + } + if err := a.Acl.decode(pd, version); err != nil { + return err + } + + return nil +} diff --git a/acl_create_request_test.go b/acl_create_request_test.go new file mode 100644 index 000000000..fb4b35c16 --- /dev/null +++ b/acl_create_request_test.go @@ -0,0 +1,34 @@ +package sarama + +import "testing" + +var ( + aclCreateRequest = []byte{ + 0, 0, 0, 1, + 3, // resource type = group + 0, 5, 'g', 'r', 'o', 'u', 'p', + 0, 9, 'p', 'r', 'i', 'n', 'c', 'i', 'p', 'a', 'l', + 0, 4, 'h', 'o', 's', 't', + 2, // all + 2, // deny + } +) + +func TestCreateAclsRequest(t *testing.T) { + req := &CreateAclsRequest{ + AclCreations: []*AclCreation{{ + Resource: Resource{ + ResourceType: AclResourceGroup, + ResourceName: "group", + }, + Acl: Acl{ + Principal: "principal", + Host: "host", + Operation: AclOperationAll, + PermissionType: AclPermissionDeny, + }}, + }, + } + + testRequest(t, "create request", req, aclCreateRequest) +} diff --git a/acl_create_response.go b/acl_create_response.go new file mode 100644 index 000000000..8a56f3573 --- /dev/null +++ b/acl_create_response.go @@ -0,0 +1,88 @@ +package sarama + +import "time" + +type CreateAclsResponse struct { + ThrottleTime time.Duration + AclCreationResponses []*AclCreationResponse +} + +func (c *CreateAclsResponse) encode(pe packetEncoder) error { + pe.putInt32(int32(c.ThrottleTime / time.Millisecond)) + + if err := pe.putArrayLength(len(c.AclCreationResponses)); err != nil { + return err + } + + for _, aclCreationResponse := range c.AclCreationResponses { + if err := aclCreationResponse.encode(pe); err != nil { + return err + } + } + + return nil +} + +func (c *CreateAclsResponse) decode(pd packetDecoder, version int16) (err error) { + throttleTime, err := pd.getInt32() + if err != nil { + return err + } + c.ThrottleTime = time.Duration(throttleTime) * time.Millisecond + + n, err := pd.getArrayLength() + if err != nil { + return err + } + + c.AclCreationResponses = make([]*AclCreationResponse, n) + for i := 0; i < n; i++ { + c.AclCreationResponses[i] = new(AclCreationResponse) + if err := c.AclCreationResponses[i].decode(pd, version); err != nil { + return err + } + } + + return nil +} + +func (d *CreateAclsResponse) key() int16 { + return 30 +} + +func (d *CreateAclsResponse) version() int16 { + return 0 +} + +func (d *CreateAclsResponse) requiredVersion() KafkaVersion { + return V0_11_0_0 +} + +type AclCreationResponse struct { + Err KError + ErrMsg *string +} + +func (a *AclCreationResponse) encode(pe packetEncoder) error { + pe.putInt16(int16(a.Err)) + + if err := pe.putNullableString(a.ErrMsg); err != nil { + return err + } + + return nil +} + +func (a *AclCreationResponse) decode(pd packetDecoder, version int16) (err error) { + kerr, err := pd.getInt16() + if err != nil { + return err + } + a.Err = KError(kerr) + + if a.ErrMsg, err = pd.getNullableString(); err != nil { + return err + } + + return nil +} diff --git a/acl_create_response_test.go b/acl_create_response_test.go new file mode 100644 index 000000000..65b934d9a --- /dev/null +++ b/acl_create_response_test.go @@ -0,0 +1,41 @@ +package sarama + +import ( + "testing" + "time" +) + +var ( + createResponseWithError = []byte{ + 0, 0, 0, 100, + 0, 0, 0, 1, + 0, 42, + 0, 5, 'e', 'r', 'r', 'o', 'r', + } + + createResponseArray = []byte{ + 0, 0, 0, 100, + 0, 0, 0, 2, + 0, 42, + 0, 5, 'e', 'r', 'r', 'o', 'r', + 0, 0, + 255, 255, + } +) + +func TestCreateAclsResponse(t *testing.T) { + errmsg := "error" + resp := &CreateAclsResponse{ + ThrottleTime: 100 * time.Millisecond, + AclCreationResponses: []*AclCreationResponse{{ + Err: ErrInvalidRequest, + ErrMsg: &errmsg, + }}, + } + + testResponse(t, "response with error", resp, createResponseWithError) + + resp.AclCreationResponses = append(resp.AclCreationResponses, new(AclCreationResponse)) + + testResponse(t, "response array", resp, createResponseArray) +} diff --git a/acl_delete_request.go b/acl_delete_request.go new file mode 100644 index 000000000..4133dceab --- /dev/null +++ b/acl_delete_request.go @@ -0,0 +1,48 @@ +package sarama + +type DeleteAclsRequest struct { + Filters []*AclFilter +} + +func (d *DeleteAclsRequest) encode(pe packetEncoder) error { + if err := pe.putArrayLength(len(d.Filters)); err != nil { + return err + } + + for _, filter := range d.Filters { + if err := filter.encode(pe); err != nil { + return err + } + } + + return nil +} + +func (d *DeleteAclsRequest) decode(pd packetDecoder, version int16) (err error) { + n, err := pd.getArrayLength() + if err != nil { + return err + } + + d.Filters = make([]*AclFilter, n) + for i := 0; i < n; i++ { + d.Filters[i] = new(AclFilter) + if err := d.Filters[i].decode(pd, version); err != nil { + return err + } + } + + return nil +} + +func (d *DeleteAclsRequest) key() int16 { + return 31 +} + +func (d *DeleteAclsRequest) version() int16 { + return 0 +} + +func (d *DeleteAclsRequest) requiredVersion() KafkaVersion { + return V0_11_0_0 +} diff --git a/acl_delete_request_test.go b/acl_delete_request_test.go new file mode 100644 index 000000000..2efdcb48e --- /dev/null +++ b/acl_delete_request_test.go @@ -0,0 +1,69 @@ +package sarama + +import "testing" + +var ( + aclDeleteRequestNulls = []byte{ + 0, 0, 0, 1, + 1, + 255, 255, + 255, 255, + 255, 255, + 11, + 3, + } + + aclDeleteRequest = []byte{ + 0, 0, 0, 1, + 1, // any + 0, 6, 'f', 'i', 'l', 't', 'e', 'r', + 0, 9, 'p', 'r', 'i', 'n', 'c', 'i', 'p', 'a', 'l', + 0, 4, 'h', 'o', 's', 't', + 4, // write + 3, // allow + } + + aclDeleteRequestArray = []byte{ + 0, 0, 0, 2, + 1, + 0, 6, 'f', 'i', 'l', 't', 'e', 'r', + 0, 9, 'p', 'r', 'i', 'n', 'c', 'i', 'p', 'a', 'l', + 0, 4, 'h', 'o', 's', 't', + 4, // write + 3, // allow + 2, + 0, 5, 't', 'o', 'p', 'i', 'c', + 255, 255, + 255, 255, + 6, + 2, + } +) + +func TestDeleteAclsRequest(t *testing.T) { + req := &DeleteAclsRequest{ + Filters: []*AclFilter{{ + ResourceType: AclResourceAny, + Operation: AclOperationAlterConfigs, + PermissionType: AclPermissionAllow, + }}, + } + + testRequest(t, "delete request nulls", req, aclDeleteRequestNulls) + + req.Filters[0].ResourceName = nullString("filter") + req.Filters[0].Principal = nullString("principal") + req.Filters[0].Host = nullString("host") + req.Filters[0].Operation = AclOperationWrite + + testRequest(t, "delete request", req, aclDeleteRequest) + + req.Filters = append(req.Filters, &AclFilter{ + ResourceType: AclResourceTopic, + ResourceName: nullString("topic"), + Operation: AclOperationDelete, + PermissionType: AclPermissionDeny, + }) + + testRequest(t, "delete request array", req, aclDeleteRequestArray) +} diff --git a/acl_delete_response.go b/acl_delete_response.go new file mode 100644 index 000000000..b5e1c45eb --- /dev/null +++ b/acl_delete_response.go @@ -0,0 +1,155 @@ +package sarama + +import "time" + +type DeleteAclsResponse struct { + ThrottleTime time.Duration + FilterResponses []*FilterResponse +} + +func (a *DeleteAclsResponse) encode(pe packetEncoder) error { + pe.putInt32(int32(a.ThrottleTime / time.Millisecond)) + + if err := pe.putArrayLength(len(a.FilterResponses)); err != nil { + return err + } + + for _, filterResponse := range a.FilterResponses { + if err := filterResponse.encode(pe); err != nil { + return err + } + } + + return nil +} + +func (a *DeleteAclsResponse) decode(pd packetDecoder, version int16) (err error) { + throttleTime, err := pd.getInt32() + if err != nil { + return err + } + a.ThrottleTime = time.Duration(throttleTime) * time.Millisecond + + n, err := pd.getArrayLength() + if err != nil { + return err + } + a.FilterResponses = make([]*FilterResponse, n) + + for i := 0; i < n; i++ { + a.FilterResponses[i] = new(FilterResponse) + if err := a.FilterResponses[i].decode(pd, version); err != nil { + return err + } + } + + return nil +} + +func (d *DeleteAclsResponse) key() int16 { + return 31 +} + +func (d *DeleteAclsResponse) version() int16 { + return 0 +} + +func (d *DeleteAclsResponse) requiredVersion() KafkaVersion { + return V0_11_0_0 +} + +type FilterResponse struct { + Err KError + ErrMsg *string + MatchingAcls []*MatchingAcl +} + +func (f *FilterResponse) encode(pe packetEncoder) error { + pe.putInt16(int16(f.Err)) + if err := pe.putNullableString(f.ErrMsg); err != nil { + return err + } + + if err := pe.putArrayLength(len(f.MatchingAcls)); err != nil { + return err + } + for _, matchingAcl := range f.MatchingAcls { + if err := matchingAcl.encode(pe); err != nil { + return err + } + } + + return nil +} + +func (f *FilterResponse) decode(pd packetDecoder, version int16) (err error) { + kerr, err := pd.getInt16() + if err != nil { + return err + } + f.Err = KError(kerr) + + if f.ErrMsg, err = pd.getNullableString(); err != nil { + return err + } + + n, err := pd.getArrayLength() + if err != nil { + return err + } + f.MatchingAcls = make([]*MatchingAcl, n) + for i := 0; i < n; i++ { + f.MatchingAcls[i] = new(MatchingAcl) + if err := f.MatchingAcls[i].decode(pd, version); err != nil { + return err + } + } + + return nil +} + +type MatchingAcl struct { + Err KError + ErrMsg *string + Resource + Acl +} + +func (m *MatchingAcl) encode(pe packetEncoder) error { + pe.putInt16(int16(m.Err)) + if err := pe.putNullableString(m.ErrMsg); err != nil { + return err + } + + if err := m.Resource.encode(pe); err != nil { + return err + } + + if err := m.Acl.encode(pe); err != nil { + return err + } + + return nil +} + +func (m *MatchingAcl) decode(pd packetDecoder, version int16) (err error) { + kerr, err := pd.getInt16() + if err != nil { + return err + } + m.Err = KError(kerr) + + if m.ErrMsg, err = pd.getNullableString(); err != nil { + return err + } + + if err := m.Resource.decode(pd, version); err != nil { + return err + } + + if err := m.Acl.decode(pd, version); err != nil { + return err + } + + return nil +} diff --git a/acl_delete_response_test.go b/acl_delete_response_test.go new file mode 100644 index 000000000..0d9dea684 --- /dev/null +++ b/acl_delete_response_test.go @@ -0,0 +1,38 @@ +package sarama + +import ( + "testing" + "time" +) + +var ( + deleteAclsResponse = []byte{ + 0, 0, 0, 100, + 0, 0, 0, 1, + 0, 0, // no error + 255, 255, // no error message + 0, 0, 0, 1, // 1 matching acl + 0, 0, // no error + 255, 255, // no error message + 2, // resource type + 0, 5, 't', 'o', 'p', 'i', 'c', + 0, 9, 'p', 'r', 'i', 'n', 'c', 'i', 'p', 'a', 'l', + 0, 4, 'h', 'o', 's', 't', + 4, + 3, + } +) + +func TestDeleteAclsResponse(t *testing.T) { + resp := &DeleteAclsResponse{ + ThrottleTime: 100 * time.Millisecond, + FilterResponses: []*FilterResponse{{ + MatchingAcls: []*MatchingAcl{{ + Resource: Resource{ResourceType: AclResourceTopic, ResourceName: "topic"}, + Acl: Acl{Principal: "principal", Host: "host", Operation: AclOperationWrite, PermissionType: AclPermissionAllow}, + }}, + }}, + } + + testResponse(t, "", resp, deleteAclsResponse) +} diff --git a/acl_describe_request.go b/acl_describe_request.go new file mode 100644 index 000000000..02a5a1f0e --- /dev/null +++ b/acl_describe_request.go @@ -0,0 +1,25 @@ +package sarama + +type DescribeAclsRequest struct { + AclFilter +} + +func (d *DescribeAclsRequest) encode(pe packetEncoder) error { + return d.AclFilter.encode(pe) +} + +func (d *DescribeAclsRequest) decode(pd packetDecoder, version int16) (err error) { + return d.AclFilter.decode(pd, version) +} + +func (d *DescribeAclsRequest) key() int16 { + return 29 +} + +func (d *DescribeAclsRequest) version() int16 { + return 0 +} + +func (d *DescribeAclsRequest) requiredVersion() KafkaVersion { + return V0_11_0_0 +} diff --git a/acl_describe_request_test.go b/acl_describe_request_test.go new file mode 100644 index 000000000..3af14c616 --- /dev/null +++ b/acl_describe_request_test.go @@ -0,0 +1,35 @@ +package sarama + +import ( + "testing" +) + +var ( + aclDescribeRequest = []byte{ + 2, // resource type + 0, 5, 't', 'o', 'p', 'i', 'c', + 0, 9, 'p', 'r', 'i', 'n', 'c', 'i', 'p', 'a', 'l', + 0, 4, 'h', 'o', 's', 't', + 5, // acl operation + 3, // acl permission type + } +) + +func TestAclDescribeRequest(t *testing.T) { + resourcename := "topic" + principal := "principal" + host := "host" + + req := &DescribeAclsRequest{ + AclFilter{ + ResourceType: AclResourceTopic, + ResourceName: &resourcename, + Principal: &principal, + Host: &host, + Operation: AclOperationCreate, + PermissionType: AclPermissionAllow, + }, + } + + testRequest(t, "", req, aclDescribeRequest) +} diff --git a/acl_describe_response.go b/acl_describe_response.go new file mode 100644 index 000000000..5bc9497f4 --- /dev/null +++ b/acl_describe_response.go @@ -0,0 +1,80 @@ +package sarama + +import "time" + +type DescribeAclsResponse struct { + ThrottleTime time.Duration + Err KError + ErrMsg *string + ResourceAcls []*ResourceAcls +} + +func (d *DescribeAclsResponse) encode(pe packetEncoder) error { + pe.putInt32(int32(d.ThrottleTime / time.Millisecond)) + pe.putInt16(int16(d.Err)) + + if err := pe.putNullableString(d.ErrMsg); err != nil { + return err + } + + if err := pe.putArrayLength(len(d.ResourceAcls)); err != nil { + return err + } + + for _, resourceAcl := range d.ResourceAcls { + if err := resourceAcl.encode(pe); err != nil { + return err + } + } + + return nil +} + +func (d *DescribeAclsResponse) decode(pd packetDecoder, version int16) (err error) { + throttleTime, err := pd.getInt32() + if err != nil { + return err + } + d.ThrottleTime = time.Duration(throttleTime) * time.Millisecond + + kerr, err := pd.getInt16() + if err != nil { + return err + } + d.Err = KError(kerr) + + errmsg, err := pd.getString() + if err != nil { + return err + } + if errmsg != "" { + d.ErrMsg = &errmsg + } + + n, err := pd.getArrayLength() + if err != nil { + return err + } + d.ResourceAcls = make([]*ResourceAcls, n) + + for i := 0; i < n; i++ { + d.ResourceAcls[i] = new(ResourceAcls) + if err := d.ResourceAcls[i].decode(pd, version); err != nil { + return err + } + } + + return nil +} + +func (d *DescribeAclsResponse) key() int16 { + return 29 +} + +func (d *DescribeAclsResponse) version() int16 { + return 0 +} + +func (d *DescribeAclsResponse) requiredVersion() KafkaVersion { + return V0_11_0_0 +} diff --git a/acl_describe_response_test.go b/acl_describe_response_test.go new file mode 100644 index 000000000..f0652cfee --- /dev/null +++ b/acl_describe_response_test.go @@ -0,0 +1,45 @@ +package sarama + +import ( + "testing" + "time" +) + +var aclDescribeResponseError = []byte{ + 0, 0, 0, 100, + 0, 8, // error + 0, 5, 'e', 'r', 'r', 'o', 'r', + 0, 0, 0, 1, // 1 resource + 2, // cluster type + 0, 5, 't', 'o', 'p', 'i', 'c', + 0, 0, 0, 1, // 1 acl + 0, 9, 'p', 'r', 'i', 'n', 'c', 'i', 'p', 'a', 'l', + 0, 4, 'h', 'o', 's', 't', + 4, // write + 3, // allow +} + +func TestAclDescribeResponse(t *testing.T) { + errmsg := "error" + resp := &DescribeAclsResponse{ + ThrottleTime: 100 * time.Millisecond, + Err: ErrBrokerNotAvailable, + ErrMsg: &errmsg, + ResourceAcls: []*ResourceAcls{{ + Resource: Resource{ + ResourceName: "topic", + ResourceType: AclResourceTopic, + }, + Acls: []*Acl{ + { + Principal: "principal", + Host: "host", + Operation: AclOperationWrite, + PermissionType: AclPermissionAllow, + }, + }, + }}, + } + + testResponse(t, "describe", resp, aclDescribeResponseError) +} diff --git a/acl_filter.go b/acl_filter.go new file mode 100644 index 000000000..970635421 --- /dev/null +++ b/acl_filter.go @@ -0,0 +1,61 @@ +package sarama + +type AclFilter struct { + ResourceType AclResourceType + ResourceName *string + Principal *string + Host *string + Operation AclOperation + PermissionType AclPermissionType +} + +func (a *AclFilter) encode(pe packetEncoder) error { + pe.putInt8(int8(a.ResourceType)) + if err := pe.putNullableString(a.ResourceName); err != nil { + return err + } + if err := pe.putNullableString(a.Principal); err != nil { + return err + } + if err := pe.putNullableString(a.Host); err != nil { + return err + } + pe.putInt8(int8(a.Operation)) + pe.putInt8(int8(a.PermissionType)) + + return nil +} + +func (a *AclFilter) decode(pd packetDecoder, version int16) (err error) { + resourceType, err := pd.getInt8() + if err != nil { + return err + } + a.ResourceType = AclResourceType(resourceType) + + if a.ResourceName, err = pd.getNullableString(); err != nil { + return err + } + + if a.Principal, err = pd.getNullableString(); err != nil { + return err + } + + if a.Host, err = pd.getNullableString(); err != nil { + return err + } + + operation, err := pd.getInt8() + if err != nil { + return err + } + a.Operation = AclOperation(operation) + + permissionType, err := pd.getInt8() + if err != nil { + return err + } + a.PermissionType = AclPermissionType(permissionType) + + return nil +} diff --git a/acl_types.go b/acl_types.go new file mode 100644 index 000000000..19da6f2f4 --- /dev/null +++ b/acl_types.go @@ -0,0 +1,42 @@ +package sarama + +type AclOperation int + +// ref: https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/acl/AclOperation.java +const ( + AclOperationUnknown AclOperation = 0 + AclOperationAny AclOperation = 1 + AclOperationAll AclOperation = 2 + AclOperationRead AclOperation = 3 + AclOperationWrite AclOperation = 4 + AclOperationCreate AclOperation = 5 + AclOperationDelete AclOperation = 6 + AclOperationAlter AclOperation = 7 + AclOperationDescribe AclOperation = 8 + AclOperationClusterAction AclOperation = 9 + AclOperationDescribeConfigs AclOperation = 10 + AclOperationAlterConfigs AclOperation = 11 + AclOperationIdempotentWrite AclOperation = 12 +) + +type AclPermissionType int + +// ref: https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/acl/AclPermissionType.java +const ( + AclPermissionUnknown AclPermissionType = 0 + AclPermissionAny AclPermissionType = 1 + AclPermissionDeny AclPermissionType = 2 + AclPermissionAllow AclPermissionType = 3 +) + +type AclResourceType int + +// ref: https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/resource/ResourceType.java +const ( + AclResourceUnknown AclResourceType = 0 + AclResourceAny AclResourceType = 1 + AclResourceTopic AclResourceType = 2 + AclResourceGroup AclResourceType = 3 + AclResourceCluster AclResourceType = 4 + AclResourceTransactionalID AclResourceType = 5 +) diff --git a/broker.go b/broker.go index e04c7ddc2..93c28ba5e 100644 --- a/broker.go +++ b/broker.go @@ -395,6 +395,39 @@ func (b *Broker) DeleteTopics(request *DeleteTopicsRequest) (*DeleteTopicsRespon return response, nil } +func (b *Broker) DescribeAcls(request *DescribeAclsRequest) (*DescribeAclsResponse, error) { + response := new(DescribeAclsResponse) + + err := b.sendAndReceive(request, response) + if err != nil { + return nil, err + } + + return response, nil +} + +func (b *Broker) CreateAcls(request *CreateAclsRequest) (*CreateAclsResponse, error) { + response := new(CreateAclsResponse) + + err := b.sendAndReceive(request, response) + if err != nil { + return nil, err + } + + return response, nil +} + +func (b *Broker) DeleteAcls(request *DeleteAclsRequest) (*DeleteAclsResponse, error) { + response := new(DeleteAclsResponse) + + err := b.sendAndReceive(request, response) + if err != nil { + return nil, err + } + + return response, nil +} + func (b *Broker) send(rb protocolBody, promiseResponse bool) (*responsePromise, error) { b.lock.Lock() defer b.lock.Unlock() diff --git a/request.go b/request.go index 282932ed7..1f5d57fae 100644 --- a/request.go +++ b/request.go @@ -118,6 +118,12 @@ func allocateBody(key, version int16) protocolBody { return &CreateTopicsRequest{} case 20: return &DeleteTopicsRequest{} + case 29: + return &DescribeAclsRequest{} + case 30: + return &CreateAclsRequest{} + case 31: + return &DeleteAclsRequest{} case 37: return &CreatePartitionsRequest{} } diff --git a/request_test.go b/request_test.go index bd9cef4eb..ba830d613 100644 --- a/request_test.go +++ b/request_test.go @@ -96,3 +96,5 @@ func testResponse(t *testing.T, name string, res protocolBody, expected []byte) t.Errorf("Decoded response does not match the encoded one\nencoded: %#v\ndecoded: %#v", res, decoded) } } + +func nullString(s string) *string { return &s }