-
Notifications
You must be signed in to change notification settings - Fork 0
/
data.go
146 lines (113 loc) · 3.01 KB
/
data.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
package qube
import (
"bufio"
"bytes"
"errors"
"fmt"
"io"
"os"
"github.com/valyala/fastjson"
"github.com/winebarrel/qube/util"
)
var (
// End of data
EOD = errors.New("EOD")
)
type DataOptions struct {
DataFiles []string `kong:"short='f',required,help='JSON Lines file list of queries to execute.'"`
Key string `kong:"default='q',help='Key name of the query field in the test data. e.g. {\"q\":\"SELECT ...\"}'"`
Loop bool `kong:"negatable,default='true',help='Return to the beginning after reading the test data. (default: enabled)'"`
Random bool `kong:"negatable,default='false',help='Randomize the starting position of the test data. (default: disabled)'"`
CommitRate uint `kong:"help='Number of queries to execute \"COMMIT\".'"`
}
type Data struct {
*DataOptions
file *os.File
reader *bufio.Reader
count uint
inTxn bool
}
func NewData(options *Options, agentNum uint64) (*Data, error) {
dataFile := options.DataFiles[agentNum%uint64(len(options.DataFiles))]
file, err := os.OpenFile(dataFile, os.O_RDONLY, 0)
if err != nil {
return nil, fmt.Errorf("failed to open test data - %s (%w)", dataFile, err)
}
fileInfo, err := file.Stat()
if err != nil {
return nil, fmt.Errorf("failed to get test data file info - %s (%w)", dataFile, err)
}
if fileInfo.Size() == 0 {
return nil, fmt.Errorf("test data is empty - %s", dataFile)
}
if options.Random {
err = util.RandSeek(file)
if err != nil {
file.Close()
return nil, fmt.Errorf("failed to seek test data (%w)", err)
}
}
reader := bufio.NewReader(file)
if options.Random {
// If it is random, skip one line
_, err = util.ReadLine(reader)
if err == io.EOF {
_, err = file.Seek(0, io.SeekStart)
if err != nil {
file.Close()
return nil, fmt.Errorf("failed to rewind test data (%w)", err)
}
} else if err != nil {
file.Close()
return nil, fmt.Errorf("failed to read test data (%w)", err)
}
}
data := &Data{
DataOptions: &options.DataOptions,
file: file,
reader: reader,
}
return data, nil
}
func (data *Data) Next() (string, error) {
data.count++
if data.CommitRate > 0 && !data.inTxn {
data.inTxn = true
return "begin", nil
}
if data.CommitRate > 0 && data.count%(data.CommitRate+2) == 0 {
data.inTxn = false
return "commit", nil
}
for {
line, err := util.ReadLine(data.reader)
if err == io.EOF {
if !data.Loop {
return "", EOD
}
_, err = data.file.Seek(0, io.SeekStart)
if err != nil {
return "", fmt.Errorf("failed to rewind test data (%w)", err)
}
data.reader.Reset(data.file)
continue
}
if err != nil {
return "", fmt.Errorf("failed to read test data (%w)", err)
}
if len(line) == 0 {
continue
}
if bytes.HasPrefix(line, []byte("//")) {
continue
}
query := fastjson.GetString(line, data.Key)
if query == "" {
return "", fmt.Errorf(`failed to get query field "%s" from '%s'`, data.Key, line)
}
return query, nil
}
}
func (data *Data) Close() error {
return data.file.Close()
}