-
Notifications
You must be signed in to change notification settings - Fork 9
/
subscribe_all_events_test.go
114 lines (100 loc) · 3.27 KB
/
subscribe_all_events_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
package rangedbserver_test
import (
"context"
"encoding/json"
"fmt"
"net"
"sync"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/test/bufconn"
"github.com/inklabs/rangedb"
"github.com/inklabs/rangedb/pkg/clock/provider/sequentialclock"
"github.com/inklabs/rangedb/pkg/grpc/rangedbpb"
"github.com/inklabs/rangedb/pkg/grpc/rangedbserver"
"github.com/inklabs/rangedb/pkg/jsontools"
"github.com/inklabs/rangedb/provider/inmemorystore"
"github.com/inklabs/rangedb/rangedbtest"
)
func ExampleRangeDBServer_SubscribeToEvents() {
// Given
rangedbtest.SetRand(100)
inMemoryStore := inmemorystore.New(
inmemorystore.WithClock(sequentialclock.New()),
inmemorystore.WithUUIDGenerator(rangedbtest.NewSeededUUIDGenerator()),
)
rangedbtest.BindEvents(inMemoryStore)
// Setup gRPC server
bufListener := bufconn.Listen(7)
server := grpc.NewServer()
defer server.Stop()
rangeDBServer, err := rangedbserver.New(rangedbserver.WithStore(inMemoryStore))
PrintError(err)
defer rangeDBServer.Stop()
rangedbpb.RegisterRangeDBServer(server, rangeDBServer)
go func() {
PrintError(server.Serve(bufListener))
}()
// Setup gRPC connection
dialer := grpc.WithContextDialer(func(context.Context, string) (net.Conn, error) {
return bufListener.Dial()
})
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
conn, err := grpc.DialContext(ctx, "bufnet", dialer, grpc.WithInsecure(), grpc.WithBlock())
PrintError(err)
defer Close(conn)
// Setup gRPC client
rangeDBClient := rangedbpb.NewRangeDBClient(conn)
request := &rangedbpb.SubscribeToEventsRequest{
GlobalSequenceNumber: 1,
}
// When
events, err := rangeDBClient.SubscribeToEvents(ctx, request)
PrintError(err)
var wg sync.WaitGroup
wg.Add(1)
go func() {
for i := 0; i < 2; i++ {
record, err := events.Recv()
PrintError(err)
body, err := json.Marshal(record)
PrintError(err)
fmt.Println(jsontools.PrettyJSON(body))
}
wg.Done()
}()
streamNameA := "thing-52e247a7c0a54a65906e006dac9be108"
PrintError(IgnoreFirstNumber(inMemoryStore.Save(ctx, streamNameA,
&rangedb.EventRecord{Event: rangedbtest.ThingWasDone{ID: "52e247a7c0a54a65906e006dac9be108", Number: 100}},
)))
streamNameB := "another-a3d9faa7614a46b388c6dce9984b6620"
PrintError(IgnoreFirstNumber(inMemoryStore.Save(ctx, streamNameB,
&rangedb.EventRecord{Event: rangedbtest.AnotherWasComplete{ID: "a3d9faa7614a46b388c6dce9984b6620"}},
)))
wg.Wait()
// Output:
// {
// "StreamName": "thing-52e247a7c0a54a65906e006dac9be108",
// "AggregateType": "thing",
// "AggregateID": "52e247a7c0a54a65906e006dac9be108",
// "GlobalSequenceNumber": 1,
// "StreamSequenceNumber": 1,
// "EventID": "d2ba8e70072943388203c438d4e94bf3",
// "EventType": "ThingWasDone",
// "Data": "{\"id\":\"52e247a7c0a54a65906e006dac9be108\",\"number\":100}",
// "Metadata": "null"
// }
// {
// "StreamName": "another-a3d9faa7614a46b388c6dce9984b6620",
// "AggregateType": "another",
// "AggregateID": "a3d9faa7614a46b388c6dce9984b6620",
// "GlobalSequenceNumber": 2,
// "StreamSequenceNumber": 1,
// "InsertTimestamp": 1,
// "EventID": "99cbd88bbcaf482ba1cc96ed12541707",
// "EventType": "AnotherWasComplete",
// "Data": "{\"id\":\"a3d9faa7614a46b388c6dce9984b6620\"}",
// "Metadata": "null"
// }
}