diff --git a/README.md b/README.md index d8cbae5..5aeba80 100644 --- a/README.md +++ b/README.md @@ -1,11 +1,53 @@ ## Description -This is a commandline utility to send and receive messages to Azure servicebus queues. -Supports connection only via connection string. +Simple commandline utility to send and receive messages to/from +[Azure servicebus](https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-messaging-overview) +queues. + +Limitations: + - Only queues are supported. + - Authentication is supported only with connection string. ## Download Download a Linux/Windows binary here: https://github.com/jakubneubauer/azure-sb-cli/releases -## Usage + +## Example + +### Using a queue without sessions support +```shell + +# Connection string and session-less queue name - variables used in the +# rest of the example +$ CONN_STR="*** YOUR CONNECTION STRING ***" +$ QUEUE_NAME = "your queue name" + +# Send a message +$ echo "Hello world!" | ./azure-sb-cli send -c "$CONN_STR" -q "$QUEUE_NAME" + +# Receive message +$ azure-sb-cli receive -c "$CONN_STR" -q $QUEUE_NAME +Hello world! +``` + +### Communicating with queue using sessions +```shell + +# Connection string and session-less queue name - variables used in the +# rest of the example +$ CONN_STR="*** YOUR CONNECTION STRING ***" +$ QUEUE_NAME = "your queue name" + +# Send a message +$ echo "Hello world!" | ./azure-sb-cli send -c "$CONN_STR" -q "$QUEUE_NAME" -s "session-12345" + +# Receive message: +# - using empty string as a session ID will receive message from any session. +# - The message is prefixed with session ID in the output +$ azure-sb-cli receive -c "$CONN_STR" -q $QUEUE_NAME -p -s "" +session-12345:Hello world! +``` + +## Documentation ``` ./azure-sb-cli -h Usage: ./azure-sb-cli @@ -32,3 +74,9 @@ Receive options: Send option: -i Correlation ID for sent messages ``` + +# Further development +Ideas to implement: +- Support Correlation ID for `receive` operation. +- Support for servicebus topics. +- Batch send/receive for better performance. diff --git a/build.sh b/build.sh index 91e73d1..eb7751a 100755 --- a/build.sh +++ b/build.sh @@ -1,10 +1,14 @@ #!/bin/bash -VERSION="0.4-preview" +VERSION="0.4" DATE=`date -u '+%Y-%m-%d_%H:%M:%S'` build() { go build -o azure-sb-cli$EXT -ldflags "-X main.buildVersion=$VERSION -X main.buildDate=$DATE" main.go } -build + +# I use linux => this is the Linux build ;-) +GOOS=linux GOARCH=amd64 EXT="" build + +# Windows binary build GOOS=windows GOARCH=386 EXT=.exe build diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..f764762 --- /dev/null +++ b/go.mod @@ -0,0 +1,12 @@ +module github.com/jakubneubauer/azure-sb-cli + +go 1.18 + +require github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus v1.1.0 + +require ( + github.com/Azure/azure-sdk-for-go/sdk/azcore v1.0.0 // indirect + github.com/Azure/azure-sdk-for-go/sdk/internal v1.0.0 // indirect + golang.org/x/net v0.0.0-20220425223048-2871e0cb64e4 // indirect + golang.org/x/text v0.3.7 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..b8903df --- /dev/null +++ b/go.sum @@ -0,0 +1,25 @@ +github.com/Azure/azure-sdk-for-go/sdk/azcore v1.0.0 h1:sVPhtT2qjO86rTUaWMr4WoES4TkjGnzcioXcnHV9s5k= +github.com/Azure/azure-sdk-for-go/sdk/azcore v1.0.0/go.mod h1:uGG2W01BaETf0Ozp+QxxKJdMBNRWPdstHG0Fmdwn1/U= +github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.0.0 h1:Yoicul8bnVdQrhDMTHxdEckRGX01XvwXDHUT9zYZ3k0= +github.com/Azure/azure-sdk-for-go/sdk/internal v1.0.0 h1:jp0dGvZ7ZK0mgqnTSClMxa5xuRL7NZgHameVYF6BurY= +github.com/Azure/azure-sdk-for-go/sdk/internal v1.0.0/go.mod h1:eWRD7oawr1Mu1sLCawqVc0CUiF43ia3qQMxLscsKQ9w= +github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus v1.1.0 h1:ebO2jmZyctLSMBTvjsxZv/Ml3rGsvnJHUImVWotBl7I= +github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus v1.1.0/go.mod h1:LH9XQnMr2ZYxQdVdCrzLO9mxeDyrDFa6wbSI3x5zCZk= +github.com/AzureAD/microsoft-authentication-library-for-go v0.4.0 h1:WVsrXCnHlDDX8ls+tootqRE87/hL9S/g4ewig9RsD/c= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/golang-jwt/jwt v3.2.1+incompatible h1:73Z+4BJcrTC+KczS6WvTPvRGOp1WmfEP4Q1lOd9Z/+c= +github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY= +github.com/joho/godotenv v1.3.0 h1:Zjp+RcGpHhGlrMbJzXTrZZPrWj+1vfm90La1wgB6Bhc= +github.com/klauspost/compress v1.10.3 h1:OP96hzwJVBIHYU52pVTI6CczrxPvrGfgqF9N5eTO0Q8= +github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc= +github.com/pkg/browser v0.0.0-20210115035449-ce105d075bb4 h1:Qj1ukM4GlMWXNdMBuXcXfz/Kw9s1qm0CLY32QxuSImI= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= +golang.org/x/crypto v0.0.0-20220511200225-c6db032c6c88 h1:Tgea0cVUD0ivh5ADBX4WwuI12DUd2to3nCYe2eayMIw= +golang.org/x/net v0.0.0-20220425223048-2871e0cb64e4 h1:HVyaeDAYux4pnY+D/SiwmLOR36ewZ4iGQIIrtnuCjFA= +golang.org/x/net v0.0.0-20220425223048-2871e0cb64e4/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= +golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e h1:fLOSk5Q00efkSvAm+4xcoXD+RRmLmmulPn5I3Y9F2EM= +golang.org/x/text v0.3.7 h1:olpwvP2KacW1ZWvsR7uQhoyTYvKAupfQrRGBFM352Gk= +golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b h1:h8qDotaEPuJATrMmW04NCwg7v22aHH28wwpauUhK9Oo= +nhooyr.io/websocket v1.8.6 h1:s+C3xAMLwGmlI31Nyn/eAehUlZPwfYZu2JXM621Q5/k= diff --git a/main.go b/main.go index d1cba0a..0504ad3 100644 --- a/main.go +++ b/main.go @@ -3,10 +3,10 @@ package main import ( "bufio" "context" + "errors" "flag" "fmt" - "github.com/Azure/azure-service-bus-go" - "github.com/Azure/go-amqp" + "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus" "log" "os" ) @@ -20,77 +20,28 @@ var logDebug = false var prefixMsgWithSessionId = false var correlationId = "" -type MySessionHandler struct { - messageSession *servicebus.MessageSession - received bool -} - -// Start is called when a new session is started -func (sh *MySessionHandler) Start(ms *servicebus.MessageSession) error { - sh.messageSession = ms - debug("Begin message session:", strPtroToString(ms.SessionID())) - return nil -} - -// Handle is called when a new session message is received -func (sh *MySessionHandler) Handle(ctx context.Context, msg *servicebus.Message) error { - if msg.SessionID != nil && prefixMsgWithSessionId { - fmt.Print(*msg.SessionID + ":") - } - fmt.Println(string(msg.Data)) - - // If we wouldn't close the message session here, the call of queueSession.receiveOne would be blocked - // and we would receive another message during it. It would be different way of using the servicebus API. - if sh.messageSession != nil { - debug("Closing message session: " + strPtroToString(sh.messageSession.SessionID())) - sh.messageSession.Close() - } - sh.received = true - return msg.Complete(ctx) -} - -// End is called when the message session is closed. Service Bus will not automatically end your message session. Be -// sure to know when to terminate your own session. -func (sh *MySessionHandler) End() { - debug("End message session:", strPtroToString(sh.messageSession.SessionID())) -} - -func send(ctx context.Context, q *servicebus.Queue, sessionId *string) { - if *sessionId == NullStr { - sendNoSession(ctx, q) - } else { - if *sessionId == "" { - sessionId = nil - } - sendSession(ctx, q, sessionId) - } -} - -func sendNoSession(ctx context.Context, q *servicebus.Queue) { - scanner := bufio.NewScanner(os.Stdin) - for scanner.Scan() { - err := q.Send(ctx, createMsgFromString(scanner.Text())) - if err != nil { - fatal("Cannot send message:", err) - } +func send(ctx context.Context, client *azservicebus.Client, sessionId *string, queueName *string) { + if *sessionId == "" { + sessionId = nil } - if err := scanner.Err(); err != nil { - fatal("Cannot read standard input:", err) + debug("Opening sender") + sender, err := client.NewSender(*queueName, nil) + if err != nil { + fatal("Cannot create sender:", err) } -} - -func sendSession(ctx context.Context, q *servicebus.Queue, sessionId *string) { - debug("Opening session", strPtroToString(sessionId)) - session := q.NewSession(sessionId) defer func() { - debug("Closing session:", strPtroToString(sessionId)) - if err := session.Close(ctx); err != nil { - debug("Cannot close session:", err) - } + debug("Closing sender") + sender.Close(ctx) }() + scanner := bufio.NewScanner(os.Stdin) for scanner.Scan() { - err := session.Send(ctx, createMsgFromString(scanner.Text())) + msg := createMsgFromString(scanner.Text()) + if sessionId != nil && *sessionId != NullStr { + msg.SessionID = sessionId + } + debug("Sending message") + err := sender.SendMessage(ctx, msg, nil) if err != nil { fatal("Cannot send message:", err) } @@ -100,103 +51,147 @@ func sendSession(ctx context.Context, q *servicebus.Queue, sessionId *string) { } } -func receive(ctx context.Context, q *servicebus.Queue, sessionId *string, count int) { +func receive(ctx context.Context, client *azservicebus.Client, sessionId *string, count int, queueName *string) { if sessionId != nil && *sessionId != NullStr && *sessionId != "" { - receiveMoreSession(ctx, q, sessionId, count) + receiveMoreSession(ctx, client, sessionId, queueName, count) } else { - for i := 0; (count < 0 || i < count) && receiveOne(ctx, q, sessionId); i++ { + for i := 0; (count < 0 || i < count) && receiveOne(ctx, client, sessionId, queueName); i++ { } } } -func receiveOne(ctx context.Context, q *servicebus.Queue, sessionId *string) bool { +func receiveOne(ctx context.Context, client *azservicebus.Client, sessionId *string, queueName *string) bool { if *sessionId == NullStr { - return receiveOneNoSession(ctx, q) + return receiveOneNoSession(ctx, client, queueName) } else { if *sessionId == "" { sessionId = nil } - return receiveOneSession(ctx, q, sessionId) + return receiveMoreSession(ctx, client, sessionId, queueName, 1) } } -func receiveOneNoSession(ctx context.Context, q *servicebus.Queue) bool { - sh := new(MySessionHandler) - err := q.ReceiveOne(ctx, sh) +func receiveOneNoSession(ctx context.Context, client *azservicebus.Client, queueName *string) bool { + debug("Opening receiver") + receiver, err := client.NewReceiverForQueue( + *queueName, + nil, + ) + defer func() { + debug("Closing receiver") + receiver.Close(ctx) + }() + if err != nil { - if amqpErr, ok := err.(*amqp.Error); ok { - if amqpErr.Condition == "com.microsoft:timeout" { - debug("Timeout receiving message", err) - return true - } - } - fatal("Cannot receive:", err) + fatal("Cannot create receiver:", err) } - return sh.received -} -func receiveOneSession(ctx context.Context, q *servicebus.Queue, sessionId *string) bool { - debug("Opening queue session", strPtroToString(sessionId)) - queueSession := q.NewSession(sessionId) - defer func() { - debug("Closing queue session", strPtroToString(sessionId)) - if err := queueSession.Close(ctx); err != nil { - debug("Cannot close queue session:", err) + messages, err := receiver.ReceiveMessages(ctx, 1, nil) + + for _, msg := range messages { + if msg.SessionID != nil && prefixMsgWithSessionId { + fmt.Print(*msg.SessionID + ":") } - }() - sh := new(MySessionHandler) - err := queueSession.ReceiveOne(ctx, sh) - if err != nil { - if amqpErr, ok := err.(*amqp.Error); ok { - if amqpErr.Condition == "com.microsoft:timeout" { - debug("Timeout receiving message", err) - return true + fmt.Println(string(msg.Body)) + + err = receiver.CompleteMessage(ctx, msg, nil) + + if err != nil { + var sbErr *azservicebus.Error + + if errors.As(err, &sbErr) && sbErr.Code == azservicebus.CodeLockLost { + // The message lock has expired. This isn't fatal for the client, but it does mean + // that this message can be received by another Receiver (or potentially this one!). + debug("Message lock expired\n") + continue } + + fatal("Cannot receive message", err) } - fatal("Cannot receive:", err) + return true } - return sh.received + return false } -func receiveMoreSession(ctx context.Context, q *servicebus.Queue, sessionId *string, count int) bool { +func receiveMoreSession(ctx context.Context, client *azservicebus.Client, sessionId *string, queueName *string, count int) bool { + var receiver *azservicebus.SessionReceiver + var err error + debug("Opening queue session", strPtroToString(sessionId)) - queueSession := q.NewSession(sessionId) - defer func() { - debug("Closing queue session", strPtroToString(sessionId)) - if err := queueSession.Close(ctx); err != nil { - debug("Cannot close queue session:", err) + if sessionId != nil && *sessionId != "" { + receiver, err = client.AcceptSessionForQueue(ctx, *queueName, *sessionId, nil) + defer func() { + debug("Closing queue session", strPtroToString(sessionId)) + receiver.Close(ctx) + }() + } else { + receiver, err = client.AcceptNextSessionForQueue(ctx, *queueName, nil) + defer func() { + debug("Closing queue session", strPtroToString(sessionId)) + receiver.Close(ctx) + }() + } + if err != nil { + fatal("Cannot open session:", err) + } + + for i := 0; count < 0 || i < count; { + remains := 100 + if count > 0 { + remains = count - i } - }() - sh := new(MySessionHandler) - for i := 0; count < 0 || i < count; i++ { - err := queueSession.ReceiveOne(ctx, sh) - if err != nil { - if amqpErr, ok := err.(*amqp.Error); ok { - if amqpErr.Condition == "com.microsoft:timeout" { - debug("Timeout receiving message", err) - return true + debug("Calling receive", remains) + messages, err := receiver.ReceiveMessages(ctx, remains, nil) + + for _, msg := range messages { + if msg.SessionID != nil && prefixMsgWithSessionId { + fmt.Print(*msg.SessionID + ":") + } + fmt.Println(string(msg.Body)) + + err = receiver.CompleteMessage(ctx, msg, nil) + + if err != nil { + var sbErr *azservicebus.Error + + if errors.As(err, &sbErr) && sbErr.Code == azservicebus.CodeLockLost { + // The message lock has expired. This isn't fatal for the client, but it does mean + // that this message can be received by another Receiver (or potentially this one!). + debug("Message lock expired\n") + continue } + + fatal("Cannot receive message", err) } - fatal("Cannot receive:", err) + i++ } } - return sh.received + return true } -func peek(ctx context.Context, q *servicebus.Queue, count int) { - subject, err := q.Peek(ctx) +func peek(ctx context.Context, client *azservicebus.Client, queueName *string, count int) { + receiver, err := client.NewReceiverForQueue( + *queueName, + nil, + ) if err != nil { - fatal("Cannot peek:", err) + fatal("Cannot create receiver:", err) } - for i := 0; count < 0 || i < count; i++ { - cursor, err := subject.Next(ctx) + defer receiver.Close(ctx) + + for i := 0; count < 0 || i < count; { + messages, err := receiver.PeekMessages(ctx, 1, nil) if err != nil { - if _, ok := err.(servicebus.ErrNoMessages); ok { - return + fatal("Cannot peek message:", err) + } + + for _, msg := range messages { + if msg.SessionID != nil && prefixMsgWithSessionId { + fmt.Print(*msg.SessionID + ":") } - fatal("Cannot iterate cursor:", err) + fmt.Println(string(msg.Body)) + i++ } - fmt.Println(string(cursor.Data)) } } @@ -217,11 +212,11 @@ Common options: -q Queue name -s Session ID. If the queue is not session-enabled, do not set this option. - If the queue is session-enabled, must be specified for receive. The 'send' works without it. + If the queue is session-enabled, must be specified. If set to empty string for receive, will receive message from any session. Receive options: - -p Prefix every message with session id, separated with ':' + -p Prefix every message with session id, separated with ':'. Useful if receiving all sessions messages. Send option: -i Correlation ID for sent messages `) @@ -276,24 +271,19 @@ func main() { //defer cancel() ctx := context.Background() - ns, err := servicebus.NewNamespace(servicebus.NamespaceWithConnectionString(*connStrPtr)) - if err != nil { - fatal("Cannot connect to servicebus:", err) - } + client, err := azservicebus.NewClientFromConnectionString(*connStrPtr, nil) - // Create a client to communicate with the queue. - q, err := ns.NewQueue(*queueNamePtr) if err != nil { - fatal("Cannot connect to queue:", err) + fatal("Cannot connect to servicebus:", err) } switch os.Args[1] { case "send": - send(ctx, q, sessionIdPtr) + send(ctx, client, sessionIdPtr, queueNamePtr) case "receive": - receive(ctx, q, sessionIdPtr, *msgCountPtr) + receive(ctx, client, sessionIdPtr, *msgCountPtr, queueNamePtr) case "peek": - peek(ctx, q, *msgCountPtr) + peek(ctx, client, queueNamePtr, *msgCountPtr) } } @@ -315,10 +305,12 @@ func fatal(msg string, args ...interface{}) { log.Fatal(append([]interface{}{"ERROR: " + msg}, args...)...) } -func createMsgFromString(s string) *servicebus.Message { - msg := servicebus.NewMessageFromString(s) +func createMsgFromString(s string) *azservicebus.Message { + msg := &azservicebus.Message{ + Body: []byte(s), + } if correlationId != "" { - msg.CorrelationID = correlationId + msg.CorrelationID = &correlationId } return msg }