Skip to content

Commit

Permalink
add Describe/Create/DeleteAcls
Browse files Browse the repository at this point in the history
  • Loading branch information
Robin committed Dec 21, 2017
1 parent 1fcddd9 commit 86d6f39
Show file tree
Hide file tree
Showing 18 changed files with 997 additions and 0 deletions.
119 changes: 119 additions & 0 deletions acl_bindings.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
package sarama

type Resource struct {
ResourceType AclResourceType
ResourceName string
}

func (r *Resource) encode(pe packetEncoder) error {
pe.putInt8(int8(r.ResourceType))

if err := pe.putString(r.ResourceName); err != nil {
return err
}

return nil
}

func (r *Resource) decode(pd packetDecoder, version int16) (err error) {
resourceType, err := pd.getInt8()
if err != nil {
return err
}
r.ResourceType = AclResourceType(resourceType)

if r.ResourceName, err = pd.getString(); err != nil {
return err
}

return nil
}

type Acl struct {
Principal string
Host string
Operation AclOperation
PermissionType AclPermissionType
}

func (a *Acl) encode(pe packetEncoder) error {
if err := pe.putString(a.Principal); err != nil {
return err
}

if err := pe.putString(a.Host); err != nil {
return err
}

pe.putInt8(int8(a.Operation))
pe.putInt8(int8(a.PermissionType))

return nil
}

func (a *Acl) decode(pd packetDecoder, version int16) (err error) {
if a.Principal, err = pd.getString(); err != nil {
return err
}

if a.Host, err = pd.getString(); err != nil {
return err
}

operation, err := pd.getInt8()
if err != nil {
return err
}
a.Operation = AclOperation(operation)

permissionType, err := pd.getInt8()
if err != nil {
return err
}
a.PermissionType = AclPermissionType(permissionType)

return nil
}

type ResourceAcls struct {
Resource
Acls []*Acl
}

func (r *ResourceAcls) encode(pe packetEncoder) error {
if err := r.Resource.encode(pe); err != nil {
return err
}

if err := pe.putArrayLength(len(r.Acls)); err != nil {
return err
}
for _, acl := range r.Acls {
if err := acl.encode(pe); err != nil {
return err
}
}

return nil
}

func (r *ResourceAcls) decode(pd packetDecoder, version int16) error {
if err := r.Resource.decode(pd, version); err != nil {
return err
}

n, err := pd.getArrayLength()
if err != nil {
return err
}

r.Acls = make([]*Acl, n)
for i := 0; i < n; i++ {
r.Acls[i] = new(Acl)
if err := r.Acls[i].decode(pd, version); err != nil {
return err
}
}

return nil
}
76 changes: 76 additions & 0 deletions acl_create_request.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package sarama

type CreateAclsRequest struct {
AclCreations []*AclCreation
}

func (c *CreateAclsRequest) encode(pe packetEncoder) error {
if err := pe.putArrayLength(len(c.AclCreations)); err != nil {
return err
}

for _, aclCreation := range c.AclCreations {
if err := aclCreation.encode(pe); err != nil {
return err
}
}

return nil
}

func (c *CreateAclsRequest) decode(pd packetDecoder, version int16) (err error) {
n, err := pd.getArrayLength()
if err != nil {
return err
}

c.AclCreations = make([]*AclCreation, n)

for i := 0; i < n; i++ {
c.AclCreations[i] = new(AclCreation)
if err := c.AclCreations[i].decode(pd, version); err != nil {
return err
}
}

return nil
}

func (d *CreateAclsRequest) key() int16 {
return 30
}

func (d *CreateAclsRequest) version() int16 {
return 0
}

func (d *CreateAclsRequest) requiredVersion() KafkaVersion {
return V0_11_0_0
}

type AclCreation struct {
Resource
Acl
}

func (a *AclCreation) encode(pe packetEncoder) error {
if err := a.Resource.encode(pe); err != nil {
return err
}
if err := a.Acl.encode(pe); err != nil {
return err
}

return nil
}

func (a *AclCreation) decode(pd packetDecoder, version int16) (err error) {
if err := a.Resource.decode(pd, version); err != nil {
return err
}
if err := a.Acl.decode(pd, version); err != nil {
return err
}

return nil
}
34 changes: 34 additions & 0 deletions acl_create_request_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package sarama

import "testing"

var (
aclCreateRequest = []byte{
0, 0, 0, 1,
3, // resource type = group
0, 5, 'g', 'r', 'o', 'u', 'p',
0, 9, 'p', 'r', 'i', 'n', 'c', 'i', 'p', 'a', 'l',
0, 4, 'h', 'o', 's', 't',
2, // all
2, // deny
}
)

func TestCreateAclsRequest(t *testing.T) {
req := &CreateAclsRequest{
AclCreations: []*AclCreation{{
Resource: Resource{
ResourceType: AclResourceGroup,
ResourceName: "group",
},
Acl: Acl{
Principal: "principal",
Host: "host",
Operation: AclOperationAll,
PermissionType: AclPermissionDeny,
}},
},
}

testRequest(t, "create request", req, aclCreateRequest)
}
88 changes: 88 additions & 0 deletions acl_create_response.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package sarama

import "time"

type CreateAclsResponse struct {
ThrottleTime time.Duration
AclCreationResponses []*AclCreationResponse
}

func (c *CreateAclsResponse) encode(pe packetEncoder) error {
pe.putInt32(int32(c.ThrottleTime / time.Millisecond))

if err := pe.putArrayLength(len(c.AclCreationResponses)); err != nil {
return err
}

for _, aclCreationResponse := range c.AclCreationResponses {
if err := aclCreationResponse.encode(pe); err != nil {
return err
}
}

return nil
}

func (c *CreateAclsResponse) decode(pd packetDecoder, version int16) (err error) {
throttleTime, err := pd.getInt32()
if err != nil {
return err
}
c.ThrottleTime = time.Duration(throttleTime) * time.Millisecond

n, err := pd.getArrayLength()
if err != nil {
return err
}

c.AclCreationResponses = make([]*AclCreationResponse, n)
for i := 0; i < n; i++ {
c.AclCreationResponses[i] = new(AclCreationResponse)
if err := c.AclCreationResponses[i].decode(pd, version); err != nil {
return err
}
}

return nil
}

func (d *CreateAclsResponse) key() int16 {
return 30
}

func (d *CreateAclsResponse) version() int16 {
return 0
}

func (d *CreateAclsResponse) requiredVersion() KafkaVersion {
return V0_11_0_0
}

type AclCreationResponse struct {
Err KError
ErrMsg *string
}

func (a *AclCreationResponse) encode(pe packetEncoder) error {
pe.putInt16(int16(a.Err))

if err := pe.putNullableString(a.ErrMsg); err != nil {
return err
}

return nil
}

func (a *AclCreationResponse) decode(pd packetDecoder, version int16) (err error) {
kerr, err := pd.getInt16()
if err != nil {
return err
}
a.Err = KError(kerr)

if a.ErrMsg, err = pd.getNullableString(); err != nil {
return err
}

return nil
}
41 changes: 41 additions & 0 deletions acl_create_response_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package sarama

import (
"testing"
"time"
)

var (
createResponseWithError = []byte{
0, 0, 0, 100,
0, 0, 0, 1,
0, 42,
0, 5, 'e', 'r', 'r', 'o', 'r',
}

createResponseArray = []byte{
0, 0, 0, 100,
0, 0, 0, 2,
0, 42,
0, 5, 'e', 'r', 'r', 'o', 'r',
0, 0,
255, 255,
}
)

func TestCreateAclsResponse(t *testing.T) {
errmsg := "error"
resp := &CreateAclsResponse{
ThrottleTime: 100 * time.Millisecond,
AclCreationResponses: []*AclCreationResponse{{
Err: ErrInvalidRequest,
ErrMsg: &errmsg,
}},
}

testResponse(t, "response with error", resp, createResponseWithError)

resp.AclCreationResponses = append(resp.AclCreationResponses, new(AclCreationResponse))

testResponse(t, "response array", resp, createResponseArray)
}
Loading

0 comments on commit 86d6f39

Please sign in to comment.