diff --git a/publisher.go b/publisher.go new file mode 100644 index 0000000..2e2a1af --- /dev/null +++ b/publisher.go @@ -0,0 +1,64 @@ +package main + +import ( + "github.com/cyverse-de/messaging" +) + +// JobUpdatePublisher is the interface for types that need to publish a job +// update. +type JobUpdatePublisher interface { + PublishJobUpdate(m *messaging.UpdateMessage) error + Reconnect() error + Close() +} + +type DefaultJobUpdatePublisher struct { + uri string + exchange string + client *messaging.Client +} + +func newMessagingClient(uri, exchange string, reconnect bool) (*messaging.Client, error) { + client, err := messaging.NewClient(uri, reconnect) + if err != nil { + return nil, err + } + + client.SetupPublishing(exchange) + + return client, nil +} + +func NewDefaultJobUpdatePublisher(uri, exchange string) (*DefaultJobUpdatePublisher, error) { + client, err := newMessagingClient(uri, exchange, true) + if err != nil { + return nil, err + } + + publisher := &DefaultJobUpdatePublisher{ + uri: uri, + exchange: exchange, + client: client, + } + return publisher, nil +} + +func (c *DefaultJobUpdatePublisher) PublishJobUpdate(m *messaging.UpdateMessage) error { + return c.client.PublishJobUpdate(m) +} + +func (c *DefaultJobUpdatePublisher) Reconnect() error { + c.client.Close() + + client, err := newMessagingClient(c.uri, c.exchange, false) + if err != nil { + return err + } + + c.client = client + return nil +} + +func (c *DefaultJobUpdatePublisher) Close() { + c.client.Close() +}