Skip to content

Commit

Permalink
Merge pull request #714 from jackchenjc/issue-713
Browse files Browse the repository at this point in the history
subscription: add ModifySubscription functionality
  • Loading branch information
magiconair authored Jan 14, 2025
2 parents de14132 + ac1bb48 commit bf2d68f
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 30 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ Here is the current set of supported services. For low-level access use the clie
| | SetMonitoringMode | Yes | Yes | |
| | SetTriggering | | | |
| Subscription Service Set | CreateSubscription | Yes | Yes | |
| | ModifySubscription | | | |
| | ModifySubscription | Yes | | |
| | SetPublishingMode | | | |
| | Publish | Yes | Yes | |
| | Republish | | | |
Expand Down
17 changes: 10 additions & 7 deletions client_sub.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func (c *Client) Subscribe(ctx context.Context, params *SubscriptionParameters,
}

c.subs[sub.SubscriptionID] = sub
c.updatePublishTimeout_NeedsSubMuxRLock()
c.updatePublishTimeout_NeedsSubMuxLock()
return sub, nil
}

Expand All @@ -89,7 +89,7 @@ func (c *Client) SubscriptionIDs() []uint32 {
}

// recreateSubscriptions creates new subscriptions
// with the same parameters to replace the previous ones
// with the same parameters to replace the previous one
func (c *Client) recreateSubscription(ctx context.Context, id uint32) error {
c.subMux.Lock()
defer c.subMux.Unlock()
Expand All @@ -98,7 +98,10 @@ func (c *Client) recreateSubscription(ctx context.Context, id uint32) error {
if !ok {
return ua.StatusBadSubscriptionIDInvalid
}
return sub.recreate_NeedsSubMuxLock(ctx)

sub.recreate_delete(ctx)
c.forgetSubscription_NeedsSubMuxLock(ctx, id)
return sub.recreate_create(ctx)
}

// transferSubscriptions ask the server to transfer the given subscriptions
Expand Down Expand Up @@ -230,7 +233,7 @@ func (c *Client) forgetSubscription(ctx context.Context, id uint32) {

func (c *Client) forgetSubscription_NeedsSubMuxLock(ctx context.Context, id uint32) {
delete(c.subs, id)
c.updatePublishTimeout_NeedsSubMuxRLock()
c.updatePublishTimeout_NeedsSubMuxLock()
stats.Subscription().Add("Count", -1)

if len(c.subs) == 0 {
Expand All @@ -240,7 +243,7 @@ func (c *Client) forgetSubscription_NeedsSubMuxLock(ctx context.Context, id uint
}
}

func (c *Client) updatePublishTimeout_NeedsSubMuxRLock() {
func (c *Client) updatePublishTimeout_NeedsSubMuxLock() {
maxTimeout := uasc.MaxTimeout
for _, s := range c.subs {
if d := s.publishTimeout(); d < maxTimeout {
Expand Down Expand Up @@ -470,7 +473,7 @@ func (c *Client) publish(ctx context.Context) error {
}

// handle the publish response for a specific subscription
c.handleNotification_NeedsSubMuxLock(ctx, sub, res)
c.handleNotification_NeedsSubMuxLock(sub, res)
c.subMux.Unlock()

c.notifySubscription(ctx, sub, res.NotificationMessage)
Expand Down Expand Up @@ -513,7 +516,7 @@ func (c *Client) handleAcks_NeedsSubMuxLock(res []ua.StatusCode) {
dlog.Printf("notAcked=%v", notAcked)
}

func (c *Client) handleNotification_NeedsSubMuxLock(ctx context.Context, sub *Subscription, res *ua.PublishResponse) {
func (c *Client) handleNotification_NeedsSubMuxLock(sub *Subscription, res *ua.PublishResponse) {
dlog := debug.NewPrefixLogger("publish: sub %d: ", res.SubscriptionID)

// keep-alive message
Expand Down
8 changes: 8 additions & 0 deletions examples/subscribe/subscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,14 @@ func main() {
log.Fatal(err)
}

// Uncomment the following to try modifying the subscription
//
// var params opcua.SubscriptionParameters
// params.Interval = time.Millisecond * 2000
// if _, err := sub.ModifySubscription(ctx, params); err != nil {
// log.Fatal(err)
// }

// read from subscription's notification channel until ctx is cancelled
for {
select {
Expand Down
79 changes: 57 additions & 22 deletions subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,14 @@ const (
DefaultSubscriptionPriority = 0
)

const terminatedSubscriptionID uint32 = 0xC0CAC01B

type Subscription struct {
SubscriptionID uint32
RevisedPublishingInterval time.Duration
RevisedLifetimeCount uint32
RevisedMaxKeepAliveCount uint32
Notifs chan<- *PublishNotificationData
params *SubscriptionParameters
paramsMu sync.Mutex
items map[uint32]*monitoredItem
itemsMu sync.Mutex
lastSeq uint32
Expand Down Expand Up @@ -111,6 +110,40 @@ func (s *Subscription) delete(ctx context.Context) error {
}
}

func (s *Subscription) ModifySubscription(ctx context.Context, params SubscriptionParameters) (*ua.ModifySubscriptionResponse, error) {
stats.Subscription().Add("ModifySubscription", 1)

params.setDefaults()
req := &ua.ModifySubscriptionRequest{
SubscriptionID: s.SubscriptionID,
RequestedPublishingInterval: float64(params.Interval.Milliseconds()),
RequestedLifetimeCount: params.LifetimeCount,
RequestedMaxKeepAliveCount: params.MaxKeepAliveCount,
MaxNotificationsPerPublish: params.MaxNotificationsPerPublish,
Priority: params.Priority,
}

var res *ua.ModifySubscriptionResponse
err := s.c.Send(ctx, req, func(v ua.Response) error {
return safeAssign(v, &res)
})

if err != nil {
return nil, err
}

// update subscription parameters
s.paramsMu.Lock()
s.params = &params
s.paramsMu.Unlock()
// update revised subscription parameters
s.RevisedPublishingInterval = time.Duration(res.RevisedPublishingInterval) * time.Millisecond
s.RevisedLifetimeCount = res.RevisedLifetimeCount
s.RevisedMaxKeepAliveCount = res.RevisedMaxKeepAliveCount

return res, nil
}

func (s *Subscription) Monitor(ctx context.Context, ts ua.TimestampsToReturn, items ...*ua.MonitoredItemCreateRequest) (*ua.CreateMonitoredItemsResponse, error) {
stats.Subscription().Add("Monitor", 1)
stats.Subscription().Add("MonitoredItems", int64(len(items)))
Expand Down Expand Up @@ -358,29 +391,31 @@ func (p *SubscriptionParameters) setDefaults() {
}
}

// recreate_NeedsSubMuxLock creates a new subscription based on the previous subscription
// parameters and monitored items.
func (s *Subscription) recreate_NeedsSubMuxLock(ctx context.Context) error {
dlog := debug.NewPrefixLogger("sub %d: recreate: ", s.SubscriptionID)

if s.SubscriptionID == terminatedSubscriptionID {
dlog.Printf("subscription is not in a valid state")
return nil
// recreate_delete is called by the client when it is trying to
// recreate an existing subscription. This function deletes the
// existing subscription from the server.
func (s *Subscription) recreate_delete(ctx context.Context) error {
dlog := debug.NewPrefixLogger("sub %d: recreate_delete: ", s.SubscriptionID)
req := &ua.DeleteSubscriptionsRequest{
SubscriptionIDs: []uint32{s.SubscriptionID},
}
var res *ua.DeleteSubscriptionsResponse
_ = s.c.Send(ctx, req, func(v ua.Response) error {
return safeAssign(v, &res)
})
dlog.Print("subscription deleted")
return nil
}

// recreate_create is called by the client when it is trying to
// recreate an existing subscription. This function creates a
// new subscription with the same parameters as the previous one.
func (s *Subscription) recreate_create(ctx context.Context) error {
dlog := debug.NewPrefixLogger("sub %d: recreate_create: ", s.SubscriptionID)

s.paramsMu.Lock()
params := s.params
{
req := &ua.DeleteSubscriptionsRequest{
SubscriptionIDs: []uint32{s.SubscriptionID},
}
var res *ua.DeleteSubscriptionsResponse
_ = s.c.Send(ctx, req, func(v ua.Response) error {
return safeAssign(v, &res)
})
dlog.Print("subscription deleted")
}
s.c.forgetSubscription_NeedsSubMuxLock(ctx, s.SubscriptionID)
dlog.Printf("subscription forgotton")
s.paramsMu.Unlock()

req := &ua.CreateSubscriptionRequest{
RequestedPublishingInterval: float64(params.Interval / time.Millisecond),
Expand Down

0 comments on commit bf2d68f

Please sign in to comment.