-
Notifications
You must be signed in to change notification settings - Fork 0
/
lock.go
126 lines (110 loc) · 2.96 KB
/
lock.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
package mvcc
import (
"bytes"
"encoding/binary"
"fmt"
"reflect"
"github.com/pingcap-incubator/tinykv/kv/util/engine_util"
"github.com/pingcap-incubator/tinykv/proto/pkg/kvrpcpb"
)
// TsMax is the largest value a uint64 timestamp can hold (all bits set).
// IsLockedFor treats a txnStartTs equal to TsMax as a special case.
const TsMax uint64 = ^uint64(0)
// Lock is the in-memory form of a lock record. ToBytes serializes it and
// ParseLock decodes it back.
type Lock struct {
// Primary is the primary key of the lock; Info reports it as PrimaryLock.
Primary []byte
// Ts is the lock's timestamp; it is compared against transaction start
// timestamps in IsLockedFor, AllLocksForTxn and AllLocks.
Ts uint64
// Ttl is the time-to-live; IsExpired adds it to PhysicalTime(Ts).
Ttl uint64
// Kind is the write kind; it is stored as a single byte by ToBytes.
Kind WriteKind
}
// KlPair couples a key with the Lock found for it. It is the element type
// returned by AllLocksForTxn and AllLocks.
type KlPair struct {
// Key is the key as returned by the lock-CF iterator item.
Key []byte
// Lock is the parsed lock stored under Key.
Lock *Lock
}
// Info builds the protobuf LockInfo describing this lock as held on key.
//
// The lock's Ts, Primary and Ttl are copied into LockVersion, PrimaryLock and
// LockTtl respectively; key is stored as given (no copy is made).
func (lock *Lock) Info(key []byte) *kvrpcpb.LockInfo {
	return &kvrpcpb.LockInfo{
		Key:         key,
		LockVersion: lock.Ts,
		PrimaryLock: lock.Primary,
		LockTtl:     lock.Ttl,
	}
}
// ToBytes serializes the lock into the layout that ParseLock reads back:
//
//	Primary || Kind (1 byte) || Ts (8 bytes, big-endian) || Ttl (8 bytes, big-endian)
//
// The result is always a freshly allocated slice. Appending directly to
// lock.Primary would write into Primary's backing array whenever it has spare
// capacity (for example when Primary is a prefix of a larger buffer), silently
// corrupting memory owned by someone else.
func (lock *Lock) ToBytes() []byte {
	// 1 byte of kind followed by two 8-byte integers.
	const trailerLen = 1 + 8 + 8

	n := len(lock.Primary)
	buf := make([]byte, n+trailerLen)
	copy(buf, lock.Primary)
	buf[n] = byte(lock.Kind)
	binary.BigEndian.PutUint64(buf[n+1:], lock.Ts)
	binary.BigEndian.PutUint64(buf[n+9:], lock.Ttl)
	return buf
}
// ParseLock decodes a Lock from the layout produced by Lock.ToBytes:
//
//	Primary || Kind (1 byte) || Ts (8 bytes, big-endian) || Ttl (8 bytes, big-endian)
//
// It returns an error when input is shorter than the 17-byte trailer.
//
// The returned Lock's Primary shares memory with input (it is not copied), but
// its capacity is clipped to its length so that appending to it allocates a
// new array instead of overwriting the trailer bytes of input.
func ParseLock(input []byte) (*Lock, error) {
	// 1 byte of kind followed by two 8-byte integers.
	const trailerLen = 1 + 8 + 8

	if len(input) < trailerLen {
		return nil, fmt.Errorf("mvcc: error parsing lock, not enough input, found %d bytes", len(input))
	}

	primaryLen := len(input) - trailerLen
	trailer := input[primaryLen:]
	return &Lock{
		// Full slice expression: cap == len, so append cannot stomp on input.
		Primary: input[:primaryLen:primaryLen],
		Kind:    WriteKind(trailer[0]),
		Ts:      binary.BigEndian.Uint64(trailer[1:9]),
		Ttl:     binary.BigEndian.Uint64(trailer[9:17]),
	}, nil
}
// IsLockedFor reports whether this lock blocks a transaction that started at
// txnStartTs from accessing key.
//
// It returns false when lock is nil, when txnStartTs is TsMax and key differs
// from the lock's primary, or when the lock's Ts is greater than txnStartTs.
// Otherwise it returns true and, as a side effect, stores a *kvrpcpb.KeyError
// carrying lock.Info(key) into the field named "Error" of resp.
//
// resp is accessed through reflection: it is dereferenced if it is a pointer
// and its "Error" field is assigned. NOTE(review): reflect will panic if resp
// has no settable field of that name and type — confirm all callers pass a
// pointer to a response struct with such a field.
func (lock *Lock) IsLockedFor(key []byte, txnStartTs uint64, resp interface{}) bool {
	switch {
	case lock == nil:
		return false
	case txnStartTs == TsMax && !bytes.Equal(key, lock.Primary):
		return false
	case lock.Ts > txnStartTs:
		return false
	}

	keyErr := &kvrpcpb.KeyError{Locked: lock.Info(key)}
	target := reflect.Indirect(reflect.ValueOf(resp))
	target.FieldByName("Error").Set(reflect.ValueOf(keyErr))
	return true
}
// AllLocksForTxn scans the lock column family through txn.Reader and returns
// every key/lock pair whose lock Ts equals txn.StartTS.
//
// The first error from reading or parsing a value aborts the scan and is
// returned with a nil slice. The iterator is closed before returning.
func AllLocksForTxn(txn *MvccTxn) ([]KlPair, error) {
	it := txn.Reader.IterCF(engine_util.CfLock)
	defer it.Close()

	var pairs []KlPair
	for ; it.Valid(); it.Next() {
		entry := it.Item()
		raw, err := entry.Value()
		if err != nil {
			return nil, err
		}
		parsed, err := ParseLock(raw)
		if err != nil {
			return nil, err
		}
		if parsed.Ts != txn.StartTS {
			continue
		}
		// NOTE(review): the key is stored exactly as the iterator returns it;
		// if the iterator reuses key buffers across Next calls, a copy would be
		// needed here — confirm against engine_util.
		pairs = append(pairs, KlPair{Key: entry.Key(), Lock: parsed})
	}
	return pairs, nil
}
// AllLocks scans the lock column family through txn.Reader and returns every
// key/lock pair whose lock Ts lies in the inclusive range [startTs, endTs].
//
// The first error from reading or parsing a value aborts the scan and is
// returned with a nil slice. The iterator is closed before returning.
func AllLocks(txn *MvccTxn, startTs, endTs uint64) ([]KlPair, error) {
	it := txn.Reader.IterCF(engine_util.CfLock)
	defer it.Close()

	var pairs []KlPair
	for ; it.Valid(); it.Next() {
		entry := it.Item()
		raw, err := entry.Value()
		if err != nil {
			return nil, err
		}
		parsed, err := ParseLock(raw)
		if err != nil {
			return nil, err
		}
		if parsed.Ts < startTs || parsed.Ts > endTs {
			continue
		}
		// NOTE(review): the key is stored exactly as the iterator returns it;
		// if the iterator reuses key buffers across Next calls, a copy would be
		// needed here — confirm against engine_util.
		pairs = append(pairs, KlPair{Key: entry.Key(), Lock: parsed})
	}
	return pairs, nil
}
// IsExpired reports whether the lock has outlived its time-to-live as of
// nextTs: it is true exactly when PhysicalTime(nextTs) is strictly greater
// than PhysicalTime(lock.Ts) + lock.Ttl.
func (lock *Lock) IsExpired(nextTs uint64) bool {
	now := PhysicalTime(nextTs)
	deadline := PhysicalTime(lock.Ts) + lock.Ttl
	return deadline < now
}