-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtypes.go
245 lines (195 loc) · 6.69 KB
/
types.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
package datatransfer
import (
"fmt"
"time"
"github.com/ipfs/go-cid"
"github.com/ipld/go-ipld-prime"
"github.com/libp2p/go-libp2p-core/peer"
cbg "github.com/whyrusleeping/cbor-gen"
"github.com/filecoin-project/go-data-transfer/encoding"
)
//go:generate cbor-gen-for ChannelID ChannelStages ChannelStage Log
// TypeIdentifier is the unique string name that identifies one kind of
// encodable object inside a registry.
type TypeIdentifier string

// EmptyTypeIdentifier is the empty TypeIdentifier; it means there is no
// voucher present.
const EmptyTypeIdentifier TypeIdentifier = ""
// Registerable is a type of object that can be placed in a registry. It must
// be encodable (it embeds encoding.Encodable) and must expose a single method,
// Type, that uniquely identifies its type.
type Registerable interface {
encoding.Encodable
// Type returns the unique string identifier for this kind of object.
Type() TypeIdentifier
}
// Voucher is used to validate a data transfer request against the underlying
// storage or retrieval deal that precipitated it. The only requirements are
// that a voucher can be read from and written to bytes, and that it has a
// string type identifier (see Registerable).
type Voucher Registerable
// VoucherResult is used to provide optional additional information about a
// voucher being rejected or accepted. Like Voucher, it is declared with
// Registerable as its underlying type.
type VoucherResult Registerable
// TransferID is an identifier for a data transfer, shared between the
// requester and the responder, and unique to the requester.
type TransferID uint64
// ChannelID is a unique identifier for a channel, made distinct by the peer
// IDs of both parties plus the transfer ID.
//
// NOTE(review): ChannelID is named in this file's cbor-gen-for directive, so
// the generated encoding presumably depends on these fields — confirm against
// the generated code before renaming or reordering them.
type ChannelID struct {
// Initiator is the peer ID of the channel's initiator.
Initiator peer.ID
// Responder is the peer ID of the channel's responder.
Responder peer.ID
// ID is the transfer ID for this channel.
ID TransferID
}
// String formats the channel ID as "<initiator>-<responder>-<transfer id>",
// with the two peer IDs rendered via %s and the transfer ID via %d.
func (c ChannelID) String() string {
	initiator, responder, transfer := c.Initiator, c.Responder, c.ID
	return fmt.Sprintf("%s-%s-%d", initiator, responder, transfer)
}
// OtherParty returns the peer on the opposite side of the channel from
// thisPeer: the responder when thisPeer is the initiator, and the initiator
// for any other value of thisPeer.
func (c ChannelID) OtherParty(thisPeer peer.ID) peer.ID {
	if thisPeer != c.Initiator {
		return c.Initiator
	}
	return c.Responder
}
// Channel represents all the parameters for a single data transfer
type Channel interface {
// TransferID returns the transfer id for this channel
TransferID() TransferID
// BaseCID returns the CID that is at the root of this data transfer
BaseCID() cid.Cid
// Selector returns the IPLD selector for this data transfer (represented as
// an IPLD node)
Selector() ipld.Node
// Voucher returns the voucher for this data transfer
Voucher() Voucher
// Sender returns the peer id for the node that is sending data
Sender() peer.ID
// Recipient returns the peer id for the node that is receiving data
Recipient() peer.ID
// TotalSize returns the total size for the data being transferred
TotalSize() uint64
// IsPull returns whether this is a pull request
IsPull() bool
// ChannelID returns the ChannelID for this request
ChannelID() ChannelID
// OtherPeer returns the counter party peer for this channel
OtherPeer() peer.ID
}
// ChannelState is the channel parameters plus the channel's current state.
type ChannelState interface {
	Channel

	// SelfPeer returns the peer this channel belongs to.
	SelfPeer() peer.ID

	// Status and its explanation.

	// Status returns the current status of this channel.
	Status() Status

	// Message offers additional information about the current status.
	Message() string

	// Byte counters.

	// Queued returns the number of bytes read from the node and queued for
	// sending.
	Queued() uint64

	// Sent returns the number of bytes sent.
	Sent() uint64

	// Received returns the number of bytes received.
	Received() uint64

	// Received CIDs.

	// ReceivedCids returns the CIDs received so far on the channel.
	ReceivedCids() []cid.Cid

	// ReceivedCidsLen returns the number of CIDs received so far on the
	// channel.
	ReceivedCidsLen() int

	// Vouchers and their results.

	// Vouchers returns all vouchers sent on this channel.
	Vouchers() []Voucher

	// LastVoucher returns the last voucher sent on the channel.
	LastVoucher() Voucher

	// VoucherResults returns the results of vouchers sent on the channel.
	VoucherResults() []VoucherResult

	// LastVoucherResult returns the last voucher result sent on the channel.
	LastVoucherResult() VoucherResult

	// Stages returns the timeline of events this data transfer has gone
	// through, for observability purposes.
	//
	// The returned value should be treated as immutable: it is unsafe for
	// the caller to modify it, and changes may not be persisted.
	Stages() *ChannelStages
}
// ChannelStages captures a timeline of the progress of a data transfer channel,
// grouped by stages.
//
// Stages are looked up by name with GetStage and are created on demand by
// AddLog, which appends each new stage to the end of Stages.
//
// NOTE(review): ChannelStages is named in this file's cbor-gen-for directive,
// so the generated encoding presumably depends on this field — confirm against
// the generated code before renaming it.
//
// EXPERIMENTAL; subject to change.
type ChannelStages struct {
// Stages contains an entry for every stage the channel has gone through.
// Each stage in turn contains its own logs.
Stages []*ChannelStage
}
// ChannelStage traces the execution of a data transfer channel stage.
//
// NOTE(review): ChannelStage is named in this file's cbor-gen-for directive,
// so the generated encoding presumably depends on these fields — confirm
// against the generated code before renaming or reordering them.
//
// EXPERIMENTAL; subject to change.
type ChannelStage struct {
// Human-readable fields.
// TODO: these _will_ need to be converted to canonical representations, so
// they are machine readable.
//
// Name is the key GetStage matches against; AddLog sets it to the stage
// name it was called with. Description is not written by AddLog.
Name string
Description string
// Timestamps.
// TODO: may be worth adding an exit timestamp. It _could_ be inferred from
// the start of the next stage, or from the timestamp of the last log line
// if this is a terminal stage. But that's non-deterministic and it relies on
// assumptions.
//
// CreatedTime is set once, when AddLog creates the stage. UpdatedTime is
// refreshed on every AddLog call for the stage, including calls that do not
// append a log entry.
CreatedTime cbg.CborTime
UpdatedTime cbg.CborTime
// Logs contains a detailed timeline of events that occurred inside
// this stage. AddLog appends to it, skipping empty messages and messages
// identical to the most recent entry.
Logs []*Log
}
// Log represents a point-in-time event that occurred inside a channel stage.
//
// NOTE(review): Log is named in this file's cbor-gen-for directive, so the
// generated encoding presumably depends on these fields — confirm against the
// generated code before renaming or reordering them.
//
// EXPERIMENTAL; subject to change.
type Log struct {
// Log is a human-readable message.
//
// TODO: this _may_ need to be converted to a canonical data model so it
// is machine-readable.
Log string
// UpdatedTime is the timestamp AddLog assigns when it appends this entry.
UpdatedTime cbg.CborTime
}
// AddLog records msg against the stage named stage, creating that stage and
// appending it to cs.Stages first if it does not exist yet.
//
// The stage's UpdatedTime is refreshed on every call. The message itself is
// only appended when it is non-empty and differs from the stage's most recent
// log entry. Calling AddLog on a nil receiver is a no-op.
//
// EXPERIMENTAL; subject to change.
func (cs *ChannelStages) AddLog(stage, msg string) {
	if cs == nil {
		return
	}

	// One timestamp is shared by the stage and the log entry.
	ts := curTime()

	entry := cs.GetStage(stage)
	if entry == nil {
		entry = &ChannelStage{CreatedTime: ts}
		cs.Stages = append(cs.Stages, entry)
	}
	entry.Name = stage
	entry.UpdatedTime = ts

	// Nothing to log.
	if msg == "" {
		return
	}
	// Skip a message that merely repeats the latest entry.
	if n := len(entry.Logs); n > 0 && entry.Logs[n-1].Log == msg {
		return
	}
	entry.Logs = append(entry.Logs, &Log{Log: msg, UpdatedTime: ts})
}
// GetStage looks up a stage by name and returns the first ChannelStage whose
// Name equals stage. It returns nil when no stage matches or when the
// receiver is nil.
//
// TODO: the input should be a strongly-typed enum instead of a free-form string.
// TODO: drop Get from GetStage to make this code more idiomatic. Return a
// second ok boolean to make it even more idiomatic.
//
// EXPERIMENTAL; subject to change.
func (cs *ChannelStages) GetStage(stage string) *ChannelStage {
	if cs == nil {
		return nil
	}
	for i := range cs.Stages {
		if candidate := cs.Stages[i]; candidate.Name == stage {
			return candidate
		}
	}
	return nil
}
// curTime returns the current time in UTC as a cbg.CborTime. The value is
// rebuilt from the Unix nanosecond count of time.Now rather than converted
// directly.
func curTime() cbg.CborTime {
	nanos := time.Now().UnixNano()
	utc := time.Unix(0, nanos).UTC()
	return cbg.CborTime(utc)
}