-
Notifications
You must be signed in to change notification settings - Fork 16
/
Copy pathserver_sink.go
140 lines (122 loc) · 3.04 KB
/
server_sink.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
package proximo
import (
"context"
"io"
"strings"
"github.com/pkg/errors"
"github.com/uw-labs/proximo/proto"
"github.com/uw-labs/substrate"
"github.com/uw-labs/sync/gogroup"
)
// SinkServer serves the Publish stream of the proximo message sink service.
// For each stream it asks SinkFactory for an asynchronous sink and forwards
// the client's messages to it.
type SinkServer struct {
// SinkFactory creates the asynchronous sink used for a stream, based on the
// client's start request (see Publish).
SinkFactory AsyncSinkFactory
}
// Publish handles a single client publish stream.
//
// It runs three goroutines in a gogroup built on the stream's context:
//   - a reader that receives client requests (receiveMessages),
//   - a writer that sends confirmations back (sendConfirmations),
//   - a publisher that waits for the start request, creates a sink through
//     SinkFactory and pumps messages into it until it returns.
//
// The first non-nil error reported by the group is returned; otherwise the
// stream context's error (nil if the context is still live) is returned.
//
// NOTE(review): this presumably relies on gogroup cancelling the shared
// context once any goroutine returns, so that the others unblock — confirm
// against the gogroup documentation.
func (s *SinkServer) Publish(stream proto.MessageSink_PublishServer) error {
	streamCtx := stream.Context()
	group, groupCtx := gogroup.New(streamCtx)

	var (
		startCh   = make(chan *proto.StartPublishRequest)
		toSink    = make(chan substrate.Message)
		confirmed = make(chan substrate.Message)
	)

	// Reader: client requests -> startCh / toSink.
	group.Go(func() error {
		return s.receiveMessages(groupCtx, stream, startCh, toSink)
	})

	// Writer: confirmed -> client confirmations.
	group.Go(func() error {
		return s.sendConfirmations(groupCtx, stream, confirmed)
	})

	// Publisher: toSink -> backend sink -> confirmed.
	group.Go(func() error {
		// The sink cannot be created until the client says where to publish.
		var startReq *proto.StartPublishRequest
		select {
		case <-groupCtx.Done():
			return nil
		case startReq = <-startCh:
		}

		sink, err := s.SinkFactory.NewAsyncSink(groupCtx, startReq)
		if err != nil {
			return err
		}
		defer sink.Close()

		return sink.PublishMessages(groupCtx, confirmed, toSink)
	})

	if waitErr := group.Wait(); waitErr != nil {
		return waitErr
	}
	return streamCtx.Err()
}
// receiveSinkStream is the subset of proto.MessageSink_PublishServer that
// receiveMessages needs: it exposes only the receive method.
type receiveSinkStream interface {
// Recv returns the next request sent by the client, or an error.
Recv() (*proto.PublisherRequest, error)
}
// receiveMessages reads requests from the client stream until it ends.
//
// A start request is forwarded on startRequest and must arrive exactly once,
// before any message: a second one yields errStartedTwice and a message seen
// before it yields errNotConnected. Message requests are wrapped in
// proximoMsg and forwarded on messages. A request carrying neither yields
// errInvalidRequest.
//
// It returns nil when the stream reports io.EOF, when the receive error text
// ends in "context canceled", or when ctx is done while forwarding; any other
// receive error is returned unchanged.
func (s *SinkServer) receiveMessages(ctx context.Context, stream receiveSinkStream, startRequest chan<- *proto.StartPublishRequest, messages chan<- substrate.Message) error {
	var started bool
	for {
		req, err := stream.Recv()
		switch {
		case err == nil:
			// Got a request; handle it below.
		case err == io.EOF, strings.HasSuffix(err.Error(), "context canceled"):
			// Client finished or the stream was cancelled: not a failure.
			return nil
		default:
			return err
		}

		if start := req.GetStartRequest(); start != nil {
			if started {
				return errStartedTwice
			}
			select {
			case <-ctx.Done():
				return nil
			case startRequest <- start:
			}
			started = true
			continue
		}

		payload := req.GetMsg()
		if payload == nil {
			return errInvalidRequest
		}
		if !started {
			return errNotConnected
		}
		select {
		case <-ctx.Done():
			return nil
		case messages <- &proximoMsg{msg: payload}:
		}
	}
}
// sendSinkStream is the subset of proto.MessageSink_PublishServer that
// sendConfirmations needs: it exposes only the send method.
type sendSinkStream interface {
// Send writes one confirmation to the client.
Send(*proto.Confirmation) error
}
// sendConfirmations relays acknowledgements back to the client.
//
// Each message received on forClient must be a *proximoMsg; a Confirmation
// carrying that message's Id is sent on stream. The function returns:
//   - nil once ctx is done,
//   - the error from stream.Send if sending fails,
//   - an "unexpected message" error if a value of any other type arrives.
func (s *SinkServer) sendConfirmations(ctx context.Context, stream sendSinkStream, forClient <-chan substrate.Message) error {
	for {
		select {
		case <-ctx.Done():
			return nil
		case msg := <-forClient:
			pMsg, ok := msg.(*proximoMsg)
			if !ok {
				// Report the value actually received: after a failed
				// assertion pMsg is always a nil *proximoMsg and would hide
				// what went wrong.
				return errors.Errorf("unexpected message: %v", msg)
			}
			if err := stream.Send(&proto.Confirmation{MsgID: pMsg.msg.Id}); err != nil {
				return err
			}
		}
	}
}
// proximoMsg wraps a proto.Message received from a client so that it can be
// passed along substrate.Message channels and matched back to its Id when the
// confirmation is sent.
type proximoMsg struct {
// msg is the wrapped client message.
msg *proto.Message
}
// Data returns the data bytes of the wrapped proto message.
func (m *proximoMsg) Data() []byte {
return m.msg.GetData()
}
// Key returns the key of the wrapped proto message. When the message carries
// no key (nil or empty), the message data is returned instead; per the
// original author's note this mirrors substrate's own default behaviour.
func (m *proximoMsg) Key() []byte {
	if key := m.msg.GetKey(); len(key) > 0 {
		return key
	}
	return m.msg.GetData()
}