Skip to content

Commit

Permalink
Remove EnsureSubscription Rule
Browse files Browse the repository at this point in the history
AB#8172
  • Loading branch information
eccles committed Aug 21, 2024
1 parent 8724f18 commit a4b90d4
Show file tree
Hide file tree
Showing 5 changed files with 12 additions and 161 deletions.
158 changes: 10 additions & 148 deletions azbus/azadmin.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,38 +4,33 @@ import (
"context"
"errors"
"fmt"
"net/http"

"github.com/Azure/azure-sdk-for-go/sdk/azcore"
azadmin "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/admin"
)

var (
// lots of docs by MS on how these limits are set to various values here
// https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-quotas but
// https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-quotas
defaultMaxMessageSize = int64(256 * 1024)
ErrMessageOversized = errors.New("message is too large")
)

// AZAdminClient provides access to the administrative client for the message
// bus. Services that self manage subscriptions are the exceptional case and
// co-ordination with devops is required before using this mechanism.
type AZAdminClient struct {
type azAdminClient struct {
ConnectionString string
log Logger
admin *azadmin.Client
}

func NewAZAdminClient(log Logger, connectionString string) AZAdminClient {
return AZAdminClient{
func newazAdminClient(log Logger, connectionString string) azAdminClient {
return azAdminClient{
ConnectionString: connectionString,
log: log,
}
}

// Open - connects and returns the azure admin Client interface that allows creation of topics etc.
// open - connects and returns the azure admin Client interface that allows creation of topics etc.
// Note that creation is cached
func (c *AZAdminClient) Open() (*azadmin.Client, error) {
func (c *azAdminClient) open() (*azadmin.Client, error) {

if c.admin != nil {
return c.admin, nil
Expand All @@ -57,8 +52,8 @@ func (c *AZAdminClient) Open() (*azadmin.Client, error) {
return c.admin, nil
}

func (c *AZAdminClient) GetQueueMaxMessageSize(queueName string) (int64, error) {
admin, err := c.Open()
func (c *azAdminClient) getQueueMaxMessageSize(queueName string) (int64, error) {
admin, err := c.open()
if err != nil {
return 0, err
}
Expand All @@ -75,8 +70,8 @@ func (c *AZAdminClient) GetQueueMaxMessageSize(queueName string) (int64, error)
return defaultMaxMessageSize, nil
}

func (c *AZAdminClient) GetTopicMaxMessageSize(topicName string) (int64, error) {
admin, err := c.Open()
func (c *azAdminClient) getTopicMaxMessageSize(topicName string) (int64, error) {
admin, err := c.open()
if err != nil {
return 0, err
}
Expand All @@ -92,136 +87,3 @@ func (c *AZAdminClient) GetTopicMaxMessageSize(topicName string) (int64, error)
// For non-Premium accounts the default is 256KiB and is not returned by GetQueue
return defaultMaxMessageSize, nil
}

// EnsuresubscriptionRule ensures the named rule is set on the subscription and
// creates it from the supplied filter if not. Note: When the ruleName exists,
// we do not attempt check the supplied filter matches the existing filter.
func (c *AZAdminClient) EnsureSubscriptionRule(
ctx context.Context,
topicName, subscriptionName string,
ruleName string,
ruleString string,
) error {

admin, err := c.Open()
if err != nil {
return err
}
// The default rule matches everything. If its not removed all the other
// filters are effectively ignored. Removal is idempotent. So we always
// remove it.
if _, err = admin.DeleteRule(ctx, topicName, subscriptionName, "$Default", nil); err != nil {
var respError *azcore.ResponseError
if !errors.As(err, &respError) {
c.log.Infof(
"DeleteRule failed for topicname=%s subname=%s, rulename=%s: %v",
topicName,
subscriptionName,
"$Default",
err.Error(),
)
return err
}
if respError.StatusCode != http.StatusNotFound {
c.log.Infof(
"DeleteRule failed for topicname=%s subname=%s, rulename=%s: %v",
topicName,
subscriptionName,
"$Default",
err.Error(),
)
return err
}
}
c.log.Debugf(
"DeleteRule no longer exists for topicname=%s subname=%s, rulename=%s",
topicName,
subscriptionName,
"$Default",
)

// Attempt to get the rule - strangely an error is not generated for a 404
// Found this by reading the unittests...
response, err := admin.GetRule(ctx, topicName, subscriptionName, ruleName, nil)
if err != nil {
c.log.Infof(
"GetRule failed for topicname=%s subname=%s, rulename=%s: %v",
topicName,
subscriptionName,
ruleName,
err.Error(),
)
return err
}

// Rule does not exist so create it
if response == nil {
c.log.Debugf(
"Rule does not exist for topicname=%s subname=%s, rulename=%s",
topicName,
subscriptionName,
ruleName,
)
_, err = admin.CreateRule(
ctx,
topicName,
subscriptionName,
&azadmin.CreateRuleOptions{
Name: &ruleName,
Filter: &azadmin.SQLFilter{
Expression: ruleString,
},
},
)
if err != nil {
c.log.Infof(
"CreateRule failed for topicname=%s subname=%s, rulename=%s: %v",
topicName,
subscriptionName,
ruleName,
err.Error(),
)
return err
}
return nil
}

c.log.Debugf(
"UpdateRule for topicname=%s subname=%s, rulename=%s, ruleString=%q",
topicName,
subscriptionName,
ruleName,
ruleString,
)
_, err = admin.UpdateRule(
ctx,
topicName,
subscriptionName,
azadmin.RuleProperties{
Name: ruleName,
Filter: &azadmin.SQLFilter{
Expression: ruleString,
},
},
)
if err != nil {
c.log.Infof(
"UpdateRule failed for topicname=%s subname=%s, rulename=%s, ruleString=%q: %v",
topicName,
subscriptionName,
ruleName,
ruleString,
err.Error(),
)
return err
}

c.log.Debugf(
"Rule exists for topicname=%s subname=%s, rulename=%s, ruleString=%q",
topicName,
subscriptionName,
ruleName,
ruleString,
)
return nil
}
1 change: 0 additions & 1 deletion azbus/msgreceiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,5 @@ type MsgReceiver interface {
Listen() error
Shutdown(context.Context) error

GetAZClient() AZClient
String() string
}
2 changes: 0 additions & 2 deletions azbus/msgsender.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,4 @@ type MsgSender interface {

Send(context.Context, *OutMessage) error
String() string

GetAZClient() AZClient
}
4 changes: 0 additions & 4 deletions azbus/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,10 +144,6 @@ func newReceiver(r *Receiver, log Logger, cfg ReceiverConfig, opts ...ReceiverOp
return r
}

func (r *Receiver) GetAZClient() AZClient {
return r.azClient
}

// String - returns string representation of receiver.
func (r *Receiver) String() string {
// No log function calls in this method please.
Expand Down
8 changes: 2 additions & 6 deletions azbus/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,6 @@ func (s *Sender) String() string {
return s.Cfg.TopicOrQueueName
}

func (s *Sender) GetAZClient() AZClient {
return s.azClient
}

func (s *Sender) Close(ctx context.Context) {

var err error
Expand Down Expand Up @@ -80,8 +76,8 @@ func (s *Sender) Open() error {
return err
}

azadmin := NewAZAdminClient(s.log, s.Cfg.ConnectionString)
s.maxMessageSizeInBytes, err = azadmin.GetQueueMaxMessageSize(s.Cfg.TopicOrQueueName)
azadmin := newazAdminClient(s.log, s.Cfg.ConnectionString)
s.maxMessageSizeInBytes, err = azadmin.getQueueMaxMessageSize(s.Cfg.TopicOrQueueName)
if err != nil {
azerr := fmt.Errorf("%s: failed to get sender properties: %w", s, NewAzbusError(err))
s.log.Infof("%s", azerr)
Expand Down

0 comments on commit a4b90d4

Please sign in to comment.