-
Notifications
You must be signed in to change notification settings - Fork 14
/
Copy pathsubstrate.go
124 lines (111 loc) · 4.78 KB
/
substrate.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
package substrate
import (
"context"
"fmt"
)
// Message is the single type that represents all messages in substrate.
type Message interface {
Data() []byte
}
// Some brokers have the notion of keyed messages. Callers may optionally
// implement this interface in their message types for the benefiy of those
// brokers.
type KeyedMessage interface {
Message
Key() []byte
}
// DiscardableMessage allows a consumer to discard the payload after use (but
// before acking) in order to release memory earlier. This can be useful in
// cases where a consumer reads a very large number of messages before acking
// any of them.
// Since not all backends implement this, a checked type assertion is recommended.
type DiscardableMessage interface {
Message
// DiscardPayload discards the payload of the message. After calling this,
// Calls to Data() will panic.
// Calling this a second or subsequent time has no effect.
DiscardPayload()
}
// AsyncMessageSink represents a message sink that allows publishing messages,
// and multiple messages can be in flight before any acks are recieved,
// depending upon the configuration of the underlying message sink.
type AsyncMessageSink interface {
// PublishMessages publishes any messages found on the `messages`
// channel and returns them on the `acks` channel once they have
// been published. This function will block until `ctx` is done,
// or until an error occurs. Messages will always be processed
// and acknowledged in order.
// Normal termination is achieved when the passed Context is done,
// and will return the associated Context error.
PublishMessages(ctx context.Context, acks chan<- Message, messages <-chan Message) error
// Close permanently closes the AsyncMessageSink and frees underlying resources
Close() error
Statuser
}
// AsyncMessageSource represents a message source that allows consuming
// messages, and multiple messages can be in flight before any acks are sent,
// depending upon the configuration of the underlying message source.
type AsyncMessageSource interface {
// ConsumeMessages provides messages to the caller on the `messages`
// channel and expects them to be sent back to the `acks` channel once
// that have been handled properly. This function will block until
// `ctx` is done, or until an error occurs.
// Normal termination is achieved when the passed Context is done,
// and will return the associated Context error.
ConsumeMessages(ctx context.Context, messages chan<- Message, acks <-chan Message) error
// Close permanently closes the AsyncMessageSource and frees underlying resources
Close() error
Statuser
}
// ConsumerMessageHandler is the callback function type that synchronous
// message consumers must implement.
type ConsumerMessageHandler func(context.Context, Message) error
// SynchronousMessageSource represents a message source that allows "message
// at a time" consumption and relieves the consumer from having to deal with
// acknowledgements themselves.
type SynchronousMessageSource interface {
// Close closed the SynchronousMessageSource, freeing underlying
// resources.
Close() error
// ConsumeMessages calls the `handler` function for each messages
// available to consume. If the handler returns no error, an
// acknowledgement will be sent to the broker. If an error is returned
// by the handler, it will be propogated and returned from this
// function. This function will block until `ctx` is done or until an
// error occurs.
ConsumeMessages(ctx context.Context, handler ConsumerMessageHandler) error
Statuser
}
// SynchronousMessageSink represents a message source that allows "message at
// a time" publishing and relieves the consumer from having to deal with
// acknowledgements themselves.
type SynchronousMessageSink interface {
// Close closed the SynchronousMessageSink, freeing underlying
// resources.
Close() error
// PublishMessage publishes messages to the broker, waiting for
// confirmation from the broker before returning.
PublishMessage(context.Context, Message) error
Statuser
}
// InvalidAckError means that a message acknowledgement was not as expected.
// This is possilbly from mis-use of the asynchronous APIs, for example acking
// out of order.
type InvalidAckError struct {
Acked Message
Expected Message
}
func (e InvalidAckError) Error() string {
return fmt.Sprintf("Message ack was out of order. Expected message '%s' but got '%s'", e.Expected, e.Acked)
}
// Statuser is the interface that wraps the Status method.
type Statuser interface {
Status() (*Status, error)
}
// Status represents a snapshot of the state of a source or sink.
type Status struct {
// Working indicates whether the source or sink is in a working state
Working bool
// Problems indicates and problems with the source or sink, whether or not they prevent it working.
Problems []string
}