forked from tarantool/go-tarantool
-
Notifications
You must be signed in to change notification settings - Fork 0
/
stream.go
208 lines (178 loc) · 5.54 KB
/
stream.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
package tarantool
import (
"context"
"errors"
"time"
"github.com/tarantool/go-iproto"
"github.com/vmihailenco/msgpack/v5"
)
// TxnIsolationLevel is the transaction isolation level that can be requested
// with BeginRequest.TxnIsolation.
type TxnIsolationLevel uint
const (
	// DefaultIsolationLevel is the zero value; with it no isolation level is
	// encoded into the begin request body.
	// By default, the isolation level of Tarantool is serializable.
	DefaultIsolationLevel TxnIsolationLevel = 0
	// ReadCommittedLevel makes visible all transactions that started commit
	// (stream.Do(NewCommitRequest()) was called).
	ReadCommittedLevel TxnIsolationLevel = 1
	// ReadConfirmedLevel makes visible all transactions that finished the
	// commit (stream.Do(NewCommitRequest()) was returned).
	ReadConfirmedLevel TxnIsolationLevel = 2
	// BestEffortLevel is the best-effort (serializable) level: if this
	// isolation level becomes unreachable, the transaction is marked as
	// «conflicted» and can no longer be committed.
	BestEffortLevel TxnIsolationLevel = 3
)
// errUnknownStreamRequest is set on the future returned by Stream.Do when a
// ConnectedRequest is bound to a connection other than the stream's one.
var errUnknownStreamRequest = errors.New(
	"the passed connected request doesn't belong " +
		"to the current connection or connection pool")
// Stream sends requests through a connection together with a stream id.
type Stream struct {
	// Id is the stream identifier passed along with every request sent by Do.
	Id uint64
	// Conn is the connection used to send the requests of this stream.
	Conn *Connection
}
// fillBegin encodes the body of a begin request as a msgpack map.
//
// The map contains only the options that were actually set: IPROTO_TIMEOUT
// (in seconds) when timeout is positive, and IPROTO_TXN_ISOLATION when
// txnIsolation differs from DefaultIsolationLevel. The first encoding error
// is returned as is.
func fillBegin(enc *msgpack.Encoder, txnIsolation TxnIsolationLevel, timeout time.Duration) error {
	withTimeout := timeout > 0
	withIsolation := txnIsolation != DefaultIsolationLevel

	// The map header carries the number of entries, so count them up front.
	fields := 0
	if withTimeout {
		fields++
	}
	if withIsolation {
		fields++
	}
	if err := enc.EncodeMapLen(fields); err != nil {
		return err
	}

	if withTimeout {
		if err := enc.EncodeUint(uint64(iproto.IPROTO_TIMEOUT)); err != nil {
			return err
		}
		if err := enc.Encode(timeout.Seconds()); err != nil {
			return err
		}
	}

	if withIsolation {
		if err := enc.EncodeUint(uint64(iproto.IPROTO_TXN_ISOLATION)); err != nil {
			return err
		}
		if err := enc.EncodeUint(uint64(txnIsolation)); err != nil {
			return err
		}
	}

	return nil
}
// fillCommit encodes the body of a commit request, which is an empty
// msgpack map.
func fillCommit(enc *msgpack.Encoder) error {
	return enc.EncodeMapLen(0)
}
// fillRollback encodes the body of a rollback request, which is an empty
// msgpack map.
func fillRollback(enc *msgpack.Encoder) error {
	return enc.EncodeMapLen(0)
}
// BeginRequest helps you to create a begin request object for execution
// by a Stream.
// Begin request can not be processed out of stream.
type BeginRequest struct {
	baseRequest
	// txnIsolation is the requested isolation level. DefaultIsolationLevel
	// is not encoded into the request body.
	txnIsolation TxnIsolationLevel
	// timeout is encoded into the request body in seconds, and only when it
	// is positive.
	timeout time.Duration
}
// NewBeginRequest returns a new BeginRequest with the IPROTO_BEGIN request
// type and the default isolation level.
func NewBeginRequest() *BeginRequest {
	req := &BeginRequest{txnIsolation: DefaultIsolationLevel}
	req.rtype = iproto.IPROTO_BEGIN
	return req
}
// TxnIsolation sets the transaction isolation level for the transaction
// manager and returns the request to allow call chaining.
// By default, the isolation level of Tarantool is serializable.
func (req *BeginRequest) TxnIsolation(txnIsolation TxnIsolationLevel) *BeginRequest {
	req.txnIsolation = txnIsolation
	return req
}
// Timeout sets the timeout that is sent in the begin request body and
// returns the request to allow call chaining.
//
// The value is encoded in seconds under the IPROTO_TIMEOUT key; a zero or
// negative duration is not sent at all.
func (req *BeginRequest) Timeout(timeout time.Duration) *BeginRequest {
	req.timeout = timeout
	return req
}
// Body fills an msgpack.Encoder with the begin request body built from the
// configured isolation level and timeout. The schema resolver is not used.
func (req *BeginRequest) Body(res SchemaResolver, enc *msgpack.Encoder) error {
	return fillBegin(enc, req.txnIsolation, req.timeout)
}
// Context sets a passed context to the request and returns the request to
// allow call chaining.
//
// Pay attention that when using context with request objects,
// the timeout option for Connection does not affect the lifetime
// of the request. For those purposes use context.WithTimeout() as
// the root context.
func (req *BeginRequest) Context(ctx context.Context) *BeginRequest {
	req.ctx = ctx
	return req
}
// CommitRequest helps you to create a commit request object for execution
// by a Stream.
// Commit request can not be processed out of stream.
//
// It carries no options of its own beyond the embedded base request.
type CommitRequest struct {
	baseRequest
}
// NewCommitRequest returns a new CommitRequest with the IPROTO_COMMIT
// request type.
func NewCommitRequest() *CommitRequest {
	req := &CommitRequest{}
	req.rtype = iproto.IPROTO_COMMIT
	return req
}
// Body fills an msgpack.Encoder with the commit request body, which is an
// empty map. The schema resolver is not used.
func (req *CommitRequest) Body(res SchemaResolver, enc *msgpack.Encoder) error {
	return fillCommit(enc)
}
// Context sets a passed context to the request and returns the request to
// allow call chaining.
//
// Pay attention that when using context with request objects,
// the timeout option for Connection does not affect the lifetime
// of the request. For those purposes use context.WithTimeout() as
// the root context.
func (req *CommitRequest) Context(ctx context.Context) *CommitRequest {
	req.ctx = ctx
	return req
}
// RollbackRequest helps you to create a rollback request object for execution
// by a Stream.
// Rollback request can not be processed out of stream.
//
// It carries no options of its own beyond the embedded base request.
type RollbackRequest struct {
	baseRequest
}
// NewRollbackRequest returns a new RollbackRequest with the IPROTO_ROLLBACK
// request type.
func NewRollbackRequest() *RollbackRequest {
	req := &RollbackRequest{}
	req.rtype = iproto.IPROTO_ROLLBACK
	return req
}
// Body fills an msgpack.Encoder with the rollback request body, which is an
// empty map. The schema resolver is not used.
func (req *RollbackRequest) Body(res SchemaResolver, enc *msgpack.Encoder) error {
	return fillRollback(enc)
}
// Context sets a passed context to the request and returns the request to
// allow call chaining.
//
// Pay attention that when using context with request objects,
// the timeout option for Connection does not affect the lifetime
// of the request. For those purposes use context.WithTimeout() as
// the root context.
func (req *RollbackRequest) Context(ctx context.Context) *RollbackRequest {
	req.ctx = ctx
	return req
}
// Do verifies the request, sends it with the stream id and returns a future.
//
// If req is a ConnectedRequest bound to a connection other than s.Conn,
// nothing is sent: the returned future already carries
// errUnknownStreamRequest. Any other request is handed to the connection
// as is.
func (s *Stream) Do(req Request) *Future {
	bound, isConnected := req.(ConnectedRequest)
	if isConnected && bound.Conn() != s.Conn {
		// The request belongs to a foreign connection: fail it right away.
		failed := NewFuture(req)
		failed.SetError(errUnknownStreamRequest)
		return failed
	}
	return s.Conn.send(req, s.Id)
}