forked from parquet-go/parquet-go
-
Notifications
You must be signed in to change notification settings - Fork 0
/
filter.go
90 lines (74 loc) · 1.71 KB
/
filter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
package parquet
// FilterRowReader wraps reader in a RowReader that only yields the rows for
// which predicate returns true; every other row read from reader is dropped.
//
// The returned reader owns a fixed-size batch buffer. Each buffered row starts
// out as an empty slice of capacity one over the matching element of the
// values array.
func FilterRowReader(reader RowReader, predicate func(Row) bool) RowReader {
	filter := new(filterRowReader)
	filter.reader = reader
	filter.predicate = predicate

	for idx := 0; idx < len(filter.rows); idx++ {
		// Zero length, capacity one: the first value appended to the row
		// is stored in filter.values[idx].
		filter.rows[idx] = filter.values[idx : idx : idx+1]
	}
	return filter
}
// filterRowReader is the RowReader returned by FilterRowReader. It reads
// batches from an underlying reader into an internal buffer and keeps only the
// rows accepted by the predicate.
type filterRowReader struct {
	// reader is the underlying source of rows.
	reader RowReader
	// predicate reports whether a row should be kept.
	predicate func(Row) bool
	// rows is the batch buffer handed to reader.ReadRows.
	rows [defaultRowBufferSize]Row
	// values provides the initial one-value backing storage for each entry
	// of rows (set up by FilterRowReader).
	values [defaultRowBufferSize]Value
}
// ReadRows fills rows with rows from the underlying reader that satisfy the
// predicate and returns how many entries of rows were written.
//
// Rows are pulled from the underlying reader in batches of at most
// len(f.rows), never asking for more than the space left in rows. Accepted
// rows are copied into rows[n], reusing that entry's existing capacity;
// rejected rows are dropped. Reading stops once rows is full or the
// underlying reader reports an error, which is returned alongside the rows
// accepted so far (including those from the failing batch).
func (f *filterRowReader) ReadRows(rows []Row) (n int, err error) {
	for err == nil && n < len(rows) {
		// Never request more rows than could still fit in the output.
		limit := len(f.rows)
		if remaining := len(rows) - n; remaining < limit {
			limit = remaining
		}

		var got int
		got, err = f.reader.ReadRows(f.rows[:limit])

		// Rows returned together with an error are still filtered.
		for i := 0; i < got; i++ {
			candidate := f.rows[i]
			if !f.predicate(candidate) {
				continue
			}
			rows[n] = append(rows[n][:0], candidate...)
			n++
		}
	}
	return n, err
}
// FilterRowWriter wraps writer in a RowWriter that forwards only the rows for
// which predicate returns true; every other row is dropped instead of being
// passed to writer.
func FilterRowWriter(writer RowWriter, predicate func(Row) bool) RowWriter {
	filter := new(filterRowWriter)
	filter.writer = writer
	filter.predicate = predicate
	return filter
}
// filterRowWriter is the RowWriter returned by FilterRowWriter. It collects
// the rows accepted by the predicate into an internal batch buffer and passes
// them on to an underlying writer.
type filterRowWriter struct {
	// writer is the underlying destination for accepted rows.
	writer RowWriter
	// predicate reports whether a row should be written.
	predicate func(Row) bool
	// rows is the batch buffer holding the accepted rows handed to writer.
	rows [defaultRowBufferSize]Row
}
// WriteRows passes the rows accepted by the predicate to the underlying
// writer and drops the rest.
//
// The input is consumed in batches of at most len(f.rows) rows. For each
// batch the accepted rows are gathered into f.rows and, if there is at least
// one, written with a single call to the underlying writer.
//
// The returned count n is the number of input rows consumed, counting both
// written and filtered-out rows. If the underlying writer fails, its error is
// returned and n covers only the batches before the failing one.
func (f *filterRowWriter) WriteRows(rows []Row) (n int, err error) {
	defer func() {
		// NOTE(review): clearValues is defined elsewhere. The entries of
		// f.rows are the caller's row slices (assigned below, not copied),
		// so whatever clearValues does to a row's values also applies to
		// the caller's data — confirm this is intended.
		for i := range f.rows {
			clearValues(f.rows[i])
		}
	}()

	for n < len(rows) {
		i := 0
		j := len(rows) - n
		if j > len(f.rows) {
			j = len(f.rows)
		}

		for _, row := range rows[n : n+j] {
			if f.predicate(row) {
				f.rows[i] = row
				i++
			}
		}

		if i > 0 {
			// Assign to the named result rather than declaring a new err
			// with := here; a shadowed err would be lost at the return
			// below and the write failure silently reported as success.
			if _, err = f.writer.WriteRows(f.rows[:i]); err != nil {
				return n, err
			}
		}

		n += j
	}

	return n, err
}