-
Notifications
You must be signed in to change notification settings - Fork 216
/
channel_writer.go
130 lines (118 loc) · 3.32 KB
/
channel_writer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
package output
import (
"bufio"
"container/list"
"fmt"
"os"
"github.com/johnkerl/miller/v6/pkg/cli"
"github.com/johnkerl/miller/v6/pkg/types"
)
// ChannelWriter repeatedly receives batches from writerChannel and hands each
// one to channelWriterHandleBatch until that helper reports either
// end-of-stream or an error.
//
// On error it sends true on dataProcessingErrorChannel and then true on
// doneChannel. On a normal end of stream it sends true on doneChannel only.
// In both cases it returns after signaling.
func ChannelWriter(
	writerChannel <-chan *list.List, // list of *types.RecordAndContext
	recordWriter IRecordWriter,
	writerOptions *cli.TWriterOptions,
	doneChannel chan<- bool,
	dataProcessingErrorChannel chan<- bool,
	bufferedOutputStream *bufio.Writer,
	outputIsStdout bool,
) {
	for {
		batch := <-writerChannel

		finished, failed := channelWriterHandleBatch(
			batch,
			recordWriter,
			writerOptions,
			dataProcessingErrorChannel,
			bufferedOutputStream,
			outputIsStdout,
		)

		// Nothing terminal happened in this batch: wait for the next one.
		if !failed && !finished {
			continue
		}

		// The error signal, when there is one, always precedes the done signal.
		if failed {
			dataProcessingErrorChannel <- true
		}
		doneChannel <- true
		return
	}
}
// channelWriterHandleBatch writes out one batch of items. Every element of
// recordsAndContexts must be a *types.RecordAndContext.
//
// Return values:
//   - done is true when the caller should stop processing batches: either the
//     end-of-stream marker was seen, or an error occurred.
//   - errored is true when a data error was found (only checked when
//     writerOptions.FailOnDataError is set) or when the record-writer returned
//     an error. A diagnostic has already been printed to stderr in that case.
//
// If the batch is exhausted without seeing the end-of-stream marker, the
// return value is (false, false).
//
// dataProcessingErrorChannel is accepted but not used inside this function.
func channelWriterHandleBatch(
	recordsAndContexts *list.List,
	recordWriter IRecordWriter,
	writerOptions *cli.TWriterOptions,
	dataProcessingErrorChannel chan<- bool,
	bufferedOutputStream *bufio.Writer,
	outputIsStdout bool,
) (done bool, errored bool) {
	for e := recordsAndContexts.Front(); e != nil; e = e.Next() {
		recordAndContext := e.Value.(*types.RecordAndContext)
		context := &recordAndContext.Context

		// Three things can come through:
		// * End-of-stream marker
		// * Non-nil records to be printed
		// * Strings to be printed from put/filter DSL print/dump/etc
		//   statements. They are handled here rather than fmt.Println directly
		//   in the put/filter handlers since we want all print statements and
		//   record-output to be in the same goroutine, for deterministic
		//   output ordering.

		if recordAndContext.EndOfStream {
			// Let the record-writers drain their output, if they have any
			// queued up. For example, PPRINT needs to see all same-schema
			// records before printing any, since it needs to compute max width
			// down columns.
			err := recordWriter.Write(nil, context, bufferedOutputStream, outputIsStdout)
			if err != nil {
				fmt.Fprintf(os.Stderr, "mlr: %v\n", err)
				return true, true
			}
			return true, false
		}

		record := recordAndContext.Record

		// Items carrying only an output string have a nil record, so both the
		// data-error scan and the record write must be skipped for them.
		// Scanning record.Head on a nil record would be a nil-pointer
		// dereference.
		if record != nil {
			// XXX more
			// XXX also make sure this results in exit 1 & goroutine cleanup
			if writerOptions.FailOnDataError {
				ok := true
				for pe := record.Head; pe != nil; pe = pe.Next {
					if !pe.Value.IsError() {
						continue
					}
					fmt.Fprintf(os.Stderr, "mlr: data error at NR=%d FNR=%d FILENAME=%s\n",
						context.NR, context.FNR, context.FILENAME,
					)
					is, err := pe.Value.GetError()
					if is {
						if err != nil {
							fmt.Fprintf(os.Stderr, "mlr: field %s: %v\n", pe.Key, err)
						} else {
							fmt.Fprintf(os.Stderr, "mlr: field %s\n", pe.Key)
						}
						ok = false
					}
				}
				if !ok {
					return true, true
				}
			}

			err := recordWriter.Write(record, context, bufferedOutputStream, outputIsStdout)
			if err != nil {
				fmt.Fprintf(os.Stderr, "mlr: %v\n", err)
				return true, true
			}
		}

		// NOTE(review): the results of WriteString and Flush are not checked
		// here; confirm whether write failures should be surfaced.
		outputString := recordAndContext.OutputString
		if outputString != "" {
			bufferedOutputStream.WriteString(outputString)
		}

		if writerOptions.FlushOnEveryRecord {
			bufferedOutputStream.Flush()
		}
	}
	return false, false
}