Skip to content

Commit

Permalink
New encoding/decoding options for sqs, kafka and kafacgo (Vonage#235)
Browse files Browse the repository at this point in the history
* Add message encoding and decoding options for SQS

* Add byte-slice encode/decode functions

* New kafka options to encode/decode data.

* Add kafkacgo encoding/decoding options

* Add Encrypt and Decrypt functions

* Update dependencies
  • Loading branch information
nicolaasuni-vonage authored Apr 18, 2024
1 parent 3549e13 commit 6dc8f50
Show file tree
Hide file tree
Showing 29 changed files with 1,012 additions and 42 deletions.
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1.84.2
1.85.0
8 changes: 5 additions & 3 deletions examples/service/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ toolchain go1.22.2
replace github.com/Vonage/gosrvlib => ../..

require (
github.com/Vonage/gosrvlib v1.84.2
github.com/Vonage/gosrvlib v1.85.0
github.com/golang/mock v1.6.0
github.com/jstemmer/go-junit-report v1.0.0
github.com/prometheus/client_golang v1.19.0
Expand All @@ -20,6 +20,8 @@ require (

require (
cloud.google.com/go v0.112.2 // indirect
cloud.google.com/go/auth v0.2.0 // indirect
cloud.google.com/go/auth/oauth2adapt v0.2.0 // indirect
cloud.google.com/go/compute/metadata v0.3.0 // indirect
cloud.google.com/go/firestore v1.15.0 // indirect
cloud.google.com/go/longrunning v0.5.6 // indirect
Expand Down Expand Up @@ -96,7 +98,7 @@ require (
go.opentelemetry.io/otel/trace v1.25.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/crypto v0.22.0 // indirect
golang.org/x/exp v0.0.0-20240409090435-93d18d7e34b8 // indirect
golang.org/x/exp v0.0.0-20240416160154-fe59bbe5cc7f // indirect
golang.org/x/mod v0.17.0 // indirect
golang.org/x/net v0.24.0 // indirect
golang.org/x/oauth2 v0.19.0 // indirect
Expand All @@ -105,7 +107,7 @@ require (
golang.org/x/text v0.14.0 // indirect
golang.org/x/time v0.5.0 // indirect
golang.org/x/tools v0.20.0 // indirect
google.golang.org/api v0.172.0 // indirect
google.golang.org/api v0.174.0 // indirect
google.golang.org/genproto v0.0.0-20240415180920-8c6c420018be // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240415180920-8c6c420018be // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240415180920-8c6c420018be // indirect
Expand Down
16 changes: 10 additions & 6 deletions examples/service/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ cloud.google.com/go/asset v1.18.1 h1:+NpxL5L53VY91EoJTHeGGXSWEUllf2hhXpCyTnSrd3Q
cloud.google.com/go/asset v1.18.1/go.mod h1:QXivw0mVqwrhZyuX6iqFbyfCdzYE9AFCJVG47Eh5dMM=
cloud.google.com/go/assuredworkloads v1.11.6 h1:3NlUes0xLN2kcSU24qQADFYsOaetCPg0HSA302AyV5s=
cloud.google.com/go/assuredworkloads v1.11.6/go.mod h1:1dlhWKocQorGYkspt+scx11kQCI9qVHOi1Au6Rw9srg=
cloud.google.com/go/auth v0.2.0 h1:y6oTcpMSbOcXbwYgUUrvI+mrQ2xbrcdpPgtVbCGTLTk=
cloud.google.com/go/auth v0.2.0/go.mod h1:+yb+oy3/P0geX6DLKlqiGHARGR6EX2GRtYCzWOCQSbU=
cloud.google.com/go/auth/oauth2adapt v0.2.0 h1:FR8zevgQwu+8CqiOT5r6xCmJa3pJC/wdXEEPF1OkNhA=
cloud.google.com/go/auth/oauth2adapt v0.2.0/go.mod h1:AfqujpDAlTfLfeCIl/HJZZlIxD8+nJoZ5e0x1IxGq5k=
cloud.google.com/go/automl v1.13.6 h1:NHBO5cjo2IgwaJ5qlez/iA35XI1db87PPlOB0Kjt5RM=
cloud.google.com/go/automl v1.13.6/go.mod h1:/0VtkKis6KhFJuPzi45e0E+e9AdQE09SNieChjJqU18=
cloud.google.com/go/baremetalsolution v1.2.5 h1:jCR4rnVsG6ocK6ngFr2Z6ugKZfTENmMZkiV6Ma2tEeE=
Expand Down Expand Up @@ -722,8 +726,8 @@ golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPh
golang.org/x/crypto v0.22.0 h1:g1v0xeRhjcugydODzvb3mEM9SQ0HGp9s/nh3COQ/C30=
golang.org/x/crypto v0.22.0/go.mod h1:vr6Su+7cTlO45qkww3VDJlzDn0ctJvRgYbC2NvXHt+M=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20240409090435-93d18d7e34b8 h1:ESSUROHIBHg7USnszlcdmjBEwdMj9VUvU+OPk4yl2mc=
golang.org/x/exp v0.0.0-20240409090435-93d18d7e34b8/go.mod h1:/lliqkxwWAhPjf5oSOIJup2XcqJaw8RGS6k3TGEc7GI=
golang.org/x/exp v0.0.0-20240416160154-fe59bbe5cc7f h1:99ci1mjWVBWwJiEKYY6jWa4d2nTQVIEhZIptnrVb1XY=
golang.org/x/exp v0.0.0-20240416160154-fe59bbe5cc7f/go.mod h1:/lliqkxwWAhPjf5oSOIJup2XcqJaw8RGS6k3TGEc7GI=
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU=
golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3 h1:XQyxROzUlZH+WIQwySDgnISgOivlhjIEwaQaJEJrrN0=
Expand Down Expand Up @@ -821,8 +825,8 @@ golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8T
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 h1:H2TDz8ibqkAF6YGhCdN3jS9O0/s90v0rJh3X/OLHEUk=
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2/go.mod h1:K8+ghG5WaK9qNqU5K3HdILfMLy1f3aNYFI/wnl100a8=
google.golang.org/api v0.172.0 h1:/1OcMZGPmW1rX2LCu2CmGUD1KXK1+pfzxotxyRUCCdk=
google.golang.org/api v0.172.0/go.mod h1:+fJZq6QXWfa9pXhnIzsjx4yI22d4aI9ZpLb58gvXjis=
google.golang.org/api v0.174.0 h1:zB1BWl7ocxfTea2aQ9mgdzXjnfPySllpPOskdnO+q34=
google.golang.org/api v0.174.0/go.mod h1:aC7tB6j0HR1Nl0ni5ghpx6iLasmAX78Zkh/wgxAAjLg=
google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
google.golang.org/appengine v1.6.8 h1:IhEN5q69dyKagZPYMSdIjS2HqprW324FRQZJcGqPAsM=
Expand All @@ -834,8 +838,8 @@ google.golang.org/genproto v0.0.0-20240415180920-8c6c420018be h1:g4aX8SUFA8V5F4L
google.golang.org/genproto v0.0.0-20240415180920-8c6c420018be/go.mod h1:FeSdT5fk+lkxatqJP38MsUicGqHax5cLtmy/6TAuxO4=
google.golang.org/genproto/googleapis/api v0.0.0-20240415180920-8c6c420018be h1:Zz7rLWqp0ApfsR/l7+zSHhY3PMiH2xqgxlfYfAfNpoU=
google.golang.org/genproto/googleapis/api v0.0.0-20240415180920-8c6c420018be/go.mod h1:dvdCTIoAGbkWbcIKBniID56/7XHTt6WfxXNMxuziJ+w=
google.golang.org/genproto/googleapis/bytestream v0.0.0-20240318140521-94a12d6c2237 h1:BGtl5+MtFriTFllRl3QPEPWZrD8nVhSTONzTkSin3+c=
google.golang.org/genproto/googleapis/bytestream v0.0.0-20240318140521-94a12d6c2237/go.mod h1:IN9OQUXZ0xT+26MDwZL8fJcYw+y99b0eYPA2U15Jt8o=
google.golang.org/genproto/googleapis/bytestream v0.0.0-20240325203815-454cdb8f5daa h1:wBkzraZsSqhj1M4L/nMrljUU6XasJkgHvUsq8oRGwF0=
google.golang.org/genproto/googleapis/bytestream v0.0.0-20240325203815-454cdb8f5daa/go.mod h1:IN9OQUXZ0xT+26MDwZL8fJcYw+y99b0eYPA2U15Jt8o=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240415180920-8c6c420018be h1:LG9vZxsWGOmUKieR8wPAUR3u3MpnYFQZROPIMaXh7/A=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240415180920-8c6c420018be/go.mod h1:WtryC6hu0hhx87FDGxWCDptyssuo68sk10vYjF+T9fY=
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
Expand Down
6 changes: 4 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ require (

require (
cloud.google.com/go v0.112.2 // indirect
cloud.google.com/go/auth v0.2.0 // indirect
cloud.google.com/go/auth/oauth2adapt v0.2.0 // indirect
cloud.google.com/go/compute/metadata v0.3.0 // indirect
cloud.google.com/go/firestore v1.15.0 // indirect
cloud.google.com/go/longrunning v0.5.6 // indirect
Expand Down Expand Up @@ -122,14 +124,14 @@ require (
go.opentelemetry.io/otel v1.25.0 // indirect
go.opentelemetry.io/otel/metric v1.25.0 // indirect
go.opentelemetry.io/otel/trace v1.25.0 // indirect
golang.org/x/exp v0.0.0-20240409090435-93d18d7e34b8 // indirect
golang.org/x/exp v0.0.0-20240416160154-fe59bbe5cc7f // indirect
golang.org/x/mod v0.17.0 // indirect
golang.org/x/oauth2 v0.19.0 // indirect
golang.org/x/sync v0.7.0 // indirect
golang.org/x/sys v0.19.0 // indirect
golang.org/x/time v0.5.0 // indirect
golang.org/x/tools v0.20.0 // indirect
google.golang.org/api v0.172.0 // indirect
google.golang.org/api v0.174.0 // indirect
google.golang.org/genproto v0.0.0-20240415180920-8c6c420018be // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240415180920-8c6c420018be // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240415180920-8c6c420018be // indirect
Expand Down
16 changes: 10 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ cloud.google.com/go/asset v1.18.1 h1:+NpxL5L53VY91EoJTHeGGXSWEUllf2hhXpCyTnSrd3Q
cloud.google.com/go/asset v1.18.1/go.mod h1:QXivw0mVqwrhZyuX6iqFbyfCdzYE9AFCJVG47Eh5dMM=
cloud.google.com/go/assuredworkloads v1.11.6 h1:3NlUes0xLN2kcSU24qQADFYsOaetCPg0HSA302AyV5s=
cloud.google.com/go/assuredworkloads v1.11.6/go.mod h1:1dlhWKocQorGYkspt+scx11kQCI9qVHOi1Au6Rw9srg=
cloud.google.com/go/auth v0.2.0 h1:y6oTcpMSbOcXbwYgUUrvI+mrQ2xbrcdpPgtVbCGTLTk=
cloud.google.com/go/auth v0.2.0/go.mod h1:+yb+oy3/P0geX6DLKlqiGHARGR6EX2GRtYCzWOCQSbU=
cloud.google.com/go/auth/oauth2adapt v0.2.0 h1:FR8zevgQwu+8CqiOT5r6xCmJa3pJC/wdXEEPF1OkNhA=
cloud.google.com/go/auth/oauth2adapt v0.2.0/go.mod h1:AfqujpDAlTfLfeCIl/HJZZlIxD8+nJoZ5e0x1IxGq5k=
cloud.google.com/go/automl v1.13.6 h1:NHBO5cjo2IgwaJ5qlez/iA35XI1db87PPlOB0Kjt5RM=
cloud.google.com/go/automl v1.13.6/go.mod h1:/0VtkKis6KhFJuPzi45e0E+e9AdQE09SNieChjJqU18=
cloud.google.com/go/baremetalsolution v1.2.5 h1:jCR4rnVsG6ocK6ngFr2Z6ugKZfTENmMZkiV6Ma2tEeE=
Expand Down Expand Up @@ -820,8 +824,8 @@ golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf
golang.org/x/crypto v0.22.0 h1:g1v0xeRhjcugydODzvb3mEM9SQ0HGp9s/nh3COQ/C30=
golang.org/x/crypto v0.22.0/go.mod h1:vr6Su+7cTlO45qkww3VDJlzDn0ctJvRgYbC2NvXHt+M=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20240409090435-93d18d7e34b8 h1:ESSUROHIBHg7USnszlcdmjBEwdMj9VUvU+OPk4yl2mc=
golang.org/x/exp v0.0.0-20240409090435-93d18d7e34b8/go.mod h1:/lliqkxwWAhPjf5oSOIJup2XcqJaw8RGS6k3TGEc7GI=
golang.org/x/exp v0.0.0-20240416160154-fe59bbe5cc7f h1:99ci1mjWVBWwJiEKYY6jWa4d2nTQVIEhZIptnrVb1XY=
golang.org/x/exp v0.0.0-20240416160154-fe59bbe5cc7f/go.mod h1:/lliqkxwWAhPjf5oSOIJup2XcqJaw8RGS6k3TGEc7GI=
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU=
golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3 h1:XQyxROzUlZH+WIQwySDgnISgOivlhjIEwaQaJEJrrN0=
Expand Down Expand Up @@ -954,8 +958,8 @@ golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8T
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 h1:H2TDz8ibqkAF6YGhCdN3jS9O0/s90v0rJh3X/OLHEUk=
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2/go.mod h1:K8+ghG5WaK9qNqU5K3HdILfMLy1f3aNYFI/wnl100a8=
google.golang.org/api v0.172.0 h1:/1OcMZGPmW1rX2LCu2CmGUD1KXK1+pfzxotxyRUCCdk=
google.golang.org/api v0.172.0/go.mod h1:+fJZq6QXWfa9pXhnIzsjx4yI22d4aI9ZpLb58gvXjis=
google.golang.org/api v0.174.0 h1:zB1BWl7ocxfTea2aQ9mgdzXjnfPySllpPOskdnO+q34=
google.golang.org/api v0.174.0/go.mod h1:aC7tB6j0HR1Nl0ni5ghpx6iLasmAX78Zkh/wgxAAjLg=
google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
google.golang.org/appengine v1.6.8 h1:IhEN5q69dyKagZPYMSdIjS2HqprW324FRQZJcGqPAsM=
Expand All @@ -969,8 +973,8 @@ google.golang.org/genproto v0.0.0-20240415180920-8c6c420018be h1:g4aX8SUFA8V5F4L
google.golang.org/genproto v0.0.0-20240415180920-8c6c420018be/go.mod h1:FeSdT5fk+lkxatqJP38MsUicGqHax5cLtmy/6TAuxO4=
google.golang.org/genproto/googleapis/api v0.0.0-20240415180920-8c6c420018be h1:Zz7rLWqp0ApfsR/l7+zSHhY3PMiH2xqgxlfYfAfNpoU=
google.golang.org/genproto/googleapis/api v0.0.0-20240415180920-8c6c420018be/go.mod h1:dvdCTIoAGbkWbcIKBniID56/7XHTt6WfxXNMxuziJ+w=
google.golang.org/genproto/googleapis/bytestream v0.0.0-20240318140521-94a12d6c2237 h1:BGtl5+MtFriTFllRl3QPEPWZrD8nVhSTONzTkSin3+c=
google.golang.org/genproto/googleapis/bytestream v0.0.0-20240318140521-94a12d6c2237/go.mod h1:IN9OQUXZ0xT+26MDwZL8fJcYw+y99b0eYPA2U15Jt8o=
google.golang.org/genproto/googleapis/bytestream v0.0.0-20240325203815-454cdb8f5daa h1:wBkzraZsSqhj1M4L/nMrljUU6XasJkgHvUsq8oRGwF0=
google.golang.org/genproto/googleapis/bytestream v0.0.0-20240325203815-454cdb8f5daa/go.mod h1:IN9OQUXZ0xT+26MDwZL8fJcYw+y99b0eYPA2U15Jt8o=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240415180920-8c6c420018be h1:LG9vZxsWGOmUKieR8wPAUR3u3MpnYFQZROPIMaXh7/A=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240415180920-8c6c420018be/go.mod h1:WtryC6hu0hhx87FDGxWCDptyssuo68sk10vYjF+T9fY=
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
Expand Down
12 changes: 8 additions & 4 deletions pkg/kafka/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,17 @@ const (
)

type config struct {
sessionTimeout time.Duration
startOffset int64
sessionTimeout time.Duration
startOffset int64
messageEncodeFunc TEncodeFunc
messageDecodeFunc TDecodeFunc
}

func defaultConfig() *config {
return &config{
sessionTimeout: defaultSessionTimeout,
startOffset: kafka.LastOffset,
sessionTimeout: defaultSessionTimeout,
startOffset: kafka.LastOffset,
messageEncodeFunc: DefaultMessageEncodeFunc,
messageDecodeFunc: DefaultMessageDecodeFunc,
}
}
2 changes: 2 additions & 0 deletions pkg/kafka/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,6 @@ func Test_defaultConfig(t *testing.T) {
require.NotNil(t, cfg)
require.NotEmpty(t, cfg.sessionTimeout)
require.Equal(t, int64(-1), cfg.startOffset)
require.NotNil(t, cfg.messageEncodeFunc)
require.NotNil(t, cfg.messageDecodeFunc)
}
25 changes: 25 additions & 0 deletions pkg/kafka/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ package kafka

import (
"context"
"errors"
"fmt"

"github.com/Vonage/gosrvlib/pkg/typeutil"
"github.com/segmentio/kafka-go"
"go.uber.org/multierr"
)
Expand All @@ -12,6 +14,9 @@ const (
network = "tcp"
)

// TDecodeFunc is the type of function used to replace the default message decoding function used by ReceiveData().
type TDecodeFunc func(ctx context.Context, msg []byte, data any) error

type consumerClient interface {
ReadMessage(ctx context.Context) (kafka.Message, error)
Close() error
Expand All @@ -34,6 +39,10 @@ func NewConsumer(brokers []string, topic, groupID string, opts ...Option) (*Cons
applyOpt(cfg)
}

if cfg.messageDecodeFunc == nil {
return nil, errors.New("missing message decoding function")
}

params := kafka.ReaderConfig{
Brokers: brokers,
Topic: topic,
Expand Down Expand Up @@ -96,3 +105,19 @@ func (c *Consumer) HealthCheck(ctx context.Context) error {

return fmt.Errorf("unable to connect to Kafka: %w", errors)
}

// DefaultMessageDecodeFunc is the default function to decode a message for ReceiveData().
// The value underlying data must be a pointer to the correct type for the next data item received.
func DefaultMessageDecodeFunc(_ context.Context, msg []byte, data any) error {
return typeutil.ByteDecode(msg, data) //nolint:wrapcheck
}

// ReceiveData retrieves a message from the queue and extract its content in the data.
func (c *Consumer) ReceiveData(ctx context.Context, data any) error {
message, err := c.Receive(ctx)
if err != nil {
return err
}

return c.cfg.messageDecodeFunc(ctx, message, data)
}
109 changes: 109 additions & 0 deletions pkg/kafka/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,16 @@ func Test_NewConsumer(t *testing.T) {
groupID: "three",
wantErr: true,
},
{
name: "missing decoding function",
brokers: []string{"url1", "url2"},
topic: "topic1",
groupID: "one",
options: []Option{
WithMessageDecodeFunc(nil),
},
wantErr: true,
},
}

for _, tt := range testCases {
Expand Down Expand Up @@ -144,3 +154,102 @@ func Test_Consumer_HealthCheck(t *testing.T) {
err = consumer.HealthCheck(ctx)
require.NoError(t, err)
}

type consumerMock struct {
readMessage func(ctx context.Context) (kafka.Message, error)
close func() error
}

func (c consumerMock) ReadMessage(ctx context.Context) (kafka.Message, error) {
return c.readMessage(ctx)
}

func (c consumerMock) Close() error {
return c.close()
}

func TestReceiveData(t *testing.T) {
t.Parallel()

type TestData struct {
Alpha string
Beta int
}

tests := []struct {
name string
mock consumerClient
data TestData
wantErr bool
}{
{
name: "success",
mock: consumerMock{
readMessage: func(_ context.Context) (kafka.Message, error) {
return kafka.Message{
Value: []byte("Kf+BAwEBCFRlc3REYXRhAf+CAAECAQVBbHBoYQEMAAEEQmV0YQEEAAAAD/+CAQZhYmMxMjMB/gLtAA=="),
}, nil
},
close: func() error { return nil },
},
data: TestData{Alpha: "abc123", Beta: -375},
wantErr: false,
},
{
name: "empty",
mock: consumerMock{
readMessage: func(_ context.Context) (kafka.Message, error) { return kafka.Message{Value: []byte{}}, nil },
close: func() error { return nil },
},
wantErr: true,
},
{
name: "error",
mock: consumerMock{
readMessage: func(_ context.Context) (kafka.Message, error) { return kafka.Message{}, errors.New("error") },
close: func() error { return nil },
},
wantErr: true,
},
{
name: "invalid message",
mock: consumerMock{
readMessage: func(_ context.Context) (kafka.Message, error) {
return kafka.Message{
Value: []byte("你好世界"),
}, nil
},
close: func() error { return nil },
},
wantErr: true,
},
}

for _, tt := range tests {
tt := tt

t.Run(tt.name, func(t *testing.T) {
t.Parallel()

ctx := context.TODO()
cli, err := NewConsumer([]string{"url1", "url2"}, "topic", "groupID")
require.NoError(t, err)
require.NotNil(t, cli)

cli.client = tt.mock

var data TestData

err = cli.ReceiveData(ctx, &data)
if tt.wantErr {
require.Error(t, err)

return
}

require.NoError(t, err)
require.Equal(t, tt.data.Alpha, data.Alpha)
require.Equal(t, tt.data.Beta, data.Beta)
})
}
}
17 changes: 17 additions & 0 deletions pkg/kafka/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,20 @@ func WithFirstOffset() Option {
c.startOffset = kafka.FirstOffset
}
}

// WithMessageEncodeFunc allow to replace DefaultMessageEncodeFunc.
// This function used by SendData() to encode the input data.
func WithMessageEncodeFunc(f TEncodeFunc) Option {
return func(c *config) {
c.messageEncodeFunc = f
}
}

// WithMessageDecodeFunc allow to replace DefaultMessageDecodeFunc().
// This function used by ReceiveData() to decode a message encoded with messageEncodeFunc to the provided data object.
// The value underlying data must be a pointer to the correct type for the next data item received.
func WithMessageDecodeFunc(f TDecodeFunc) Option {
return func(c *config) {
c.messageDecodeFunc = f
}
}
Loading

0 comments on commit 6dc8f50

Please sign in to comment.