From a4b90d40d085f1ad753b69590b6e67b14f60c5cf Mon Sep 17 00:00:00 2001 From: Paul Hewlett Date: Tue, 20 Aug 2024 09:06:07 +0100 Subject: [PATCH] Remove EnsureSubscription Rule AB#8172 --- azbus/azadmin.go | 158 +++---------------------------------------- azbus/msgreceiver.go | 1 - azbus/msgsender.go | 2 - azbus/receiver.go | 4 -- azbus/sender.go | 8 +-- 5 files changed, 12 insertions(+), 161 deletions(-) diff --git a/azbus/azadmin.go b/azbus/azadmin.go index 6d1f696..ded40b7 100644 --- a/azbus/azadmin.go +++ b/azbus/azadmin.go @@ -4,38 +4,33 @@ import ( "context" "errors" "fmt" - "net/http" - "github.com/Azure/azure-sdk-for-go/sdk/azcore" azadmin "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/admin" ) var ( // lots of docs by MS on how these limits are set to various values here - // https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-quotas but + // https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-quotas defaultMaxMessageSize = int64(256 * 1024) ErrMessageOversized = errors.New("message is too large") ) -// AZAdminClient provides access to the administrative client for the message -// bus. Services that self manage subscriptions are the exceptional case and -// co-ordination with devops is required before using this mechanism. -type AZAdminClient struct { +type azAdminClient struct { ConnectionString string log Logger admin *azadmin.Client } -func NewAZAdminClient(log Logger, connectionString string) AZAdminClient { - return AZAdminClient{ +func newazAdminClient(log Logger, connectionString string) azAdminClient { + return azAdminClient{ ConnectionString: connectionString, log: log, } } -// Open - connects and returns the azure admin Client interface that allows creation of topics etc. +// open - connects and returns the azure admin Client interface that allows creation of topics etc. // Note that creation is cached -func (c *AZAdminClient) Open() (*azadmin.Client, error) { +func (c *azAdminClient) open() (*azadmin.Client, error) { if c.admin != nil { return c.admin, nil @@ -57,8 +52,8 @@ func (c *AZAdminClient) Open() (*azadmin.Client, error) { return c.admin, nil } -func (c *AZAdminClient) GetQueueMaxMessageSize(queueName string) (int64, error) { - admin, err := c.Open() +func (c *azAdminClient) getQueueMaxMessageSize(queueName string) (int64, error) { + admin, err := c.open() if err != nil { return 0, err } @@ -75,8 +70,8 @@ func (c *AZAdminClient) GetQueueMaxMessageSize(queueName string) (int64, error) return defaultMaxMessageSize, nil } -func (c *AZAdminClient) GetTopicMaxMessageSize(topicName string) (int64, error) { - admin, err := c.Open() +func (c *azAdminClient) getTopicMaxMessageSize(topicName string) (int64, error) { + admin, err := c.open() if err != nil { return 0, err } @@ -92,136 +87,3 @@ func (c *AZAdminClient) GetTopicMaxMessageSize(topicName string) (int64, error) // For non-Premium accounts the default is 256KiB and is not returned by GetQueue return defaultMaxMessageSize, nil } - -// EnsuresubscriptionRule ensures the named rule is set on the subscription and -// creates it from the supplied filter if not. Note: When the ruleName exists, -// we do not attempt check the supplied filter matches the existing filter. -func (c *AZAdminClient) EnsureSubscriptionRule( - ctx context.Context, - topicName, subscriptionName string, - ruleName string, - ruleString string, -) error { - - admin, err := c.Open() - if err != nil { - return err - } - // The default rule matches everything. If its not removed all the other - // filters are effectively ignored. Removal is idempotent. So we always - // remove it. - if _, err = admin.DeleteRule(ctx, topicName, subscriptionName, "$Default", nil); err != nil { - var respError *azcore.ResponseError - if !errors.As(err, &respError) { - c.log.Infof( - "DeleteRule failed for topicname=%s subname=%s, rulename=%s: %v", - topicName, - subscriptionName, - "$Default", - err.Error(), - ) - return err - } - if respError.StatusCode != http.StatusNotFound { - c.log.Infof( - "DeleteRule failed for topicname=%s subname=%s, rulename=%s: %v", - topicName, - subscriptionName, - "$Default", - err.Error(), - ) - return err - } - } - c.log.Debugf( - "DeleteRule no longer exists for topicname=%s subname=%s, rulename=%s", - topicName, - subscriptionName, - "$Default", - ) - - // Attempt to get the rule - strangely an error is not generated for a 404 - // Found this by reading the unittests... - response, err := admin.GetRule(ctx, topicName, subscriptionName, ruleName, nil) - if err != nil { - c.log.Infof( - "GetRule failed for topicname=%s subname=%s, rulename=%s: %v", - topicName, - subscriptionName, - ruleName, - err.Error(), - ) - return err - } - - // Rule does not exist so create it - if response == nil { - c.log.Debugf( - "Rule does not exist for topicname=%s subname=%s, rulename=%s", - topicName, - subscriptionName, - ruleName, - ) - _, err = admin.CreateRule( - ctx, - topicName, - subscriptionName, - &azadmin.CreateRuleOptions{ - Name: &ruleName, - Filter: &azadmin.SQLFilter{ - Expression: ruleString, - }, - }, - ) - if err != nil { - c.log.Infof( - "CreateRule failed for topicname=%s subname=%s, rulename=%s: %v", - topicName, - subscriptionName, - ruleName, - err.Error(), - ) - return err - } - return nil - } - - c.log.Debugf( - "UpdateRule for topicname=%s subname=%s, rulename=%s, ruleString=%q", - topicName, - subscriptionName, - ruleName, - ruleString, - ) - _, err = admin.UpdateRule( - ctx, - topicName, - subscriptionName, - azadmin.RuleProperties{ - Name: ruleName, - Filter: &azadmin.SQLFilter{ - Expression: ruleString, - }, - }, - ) - if err != nil { - c.log.Infof( - "UpdateRule failed for topicname=%s subname=%s, rulename=%s, ruleString=%q: %v", - topicName, - subscriptionName, - ruleName, - ruleString, - err.Error(), - ) - return err - } - - c.log.Debugf( - "Rule exists for topicname=%s subname=%s, rulename=%s, ruleString=%q", - topicName, - subscriptionName, - ruleName, - ruleString, - ) - return nil -} diff --git a/azbus/msgreceiver.go b/azbus/msgreceiver.go index 1f464e3..82defdd 100644 --- a/azbus/msgreceiver.go +++ b/azbus/msgreceiver.go @@ -9,6 +9,5 @@ type MsgReceiver interface { Listen() error Shutdown(context.Context) error - GetAZClient() AZClient String() string } diff --git a/azbus/msgsender.go b/azbus/msgsender.go index 7b54ac1..953b7f7 100644 --- a/azbus/msgsender.go +++ b/azbus/msgsender.go @@ -10,6 +10,4 @@ type MsgSender interface { Send(context.Context, *OutMessage) error String() string - - GetAZClient() AZClient } diff --git a/azbus/receiver.go b/azbus/receiver.go index 4679784..41fb6c3 100644 --- a/azbus/receiver.go +++ b/azbus/receiver.go @@ -144,10 +144,6 @@ func newReceiver(r *Receiver, log Logger, cfg ReceiverConfig, opts ...ReceiverOp return r } -func (r *Receiver) GetAZClient() AZClient { - return r.azClient -} - // String - returns string representation of receiver. func (r *Receiver) String() string { // No log function calls in this method please. diff --git a/azbus/sender.go b/azbus/sender.go index cbaea5d..765bc61 100644 --- a/azbus/sender.go +++ b/azbus/sender.go @@ -48,10 +48,6 @@ func (s *Sender) String() string { return s.Cfg.TopicOrQueueName } -func (s *Sender) GetAZClient() AZClient { - return s.azClient -} - func (s *Sender) Close(ctx context.Context) { var err error @@ -80,8 +76,8 @@ func (s *Sender) Open() error { return err } - azadmin := NewAZAdminClient(s.log, s.Cfg.ConnectionString) - s.maxMessageSizeInBytes, err = azadmin.GetQueueMaxMessageSize(s.Cfg.TopicOrQueueName) + azadmin := newazAdminClient(s.log, s.Cfg.ConnectionString) + s.maxMessageSizeInBytes, err = azadmin.getQueueMaxMessageSize(s.Cfg.TopicOrQueueName) if err != nil { azerr := fmt.Errorf("%s: failed to get sender properties: %w", s, NewAzbusError(err)) s.log.Infof("%s", azerr)