Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Separate out ILineReader abstraction #1504

Merged
merged 2 commits into from
Feb 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions pkg/input/constants.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
package input

const CSV_BOM = "\xef\xbb\xbf"
126 changes: 126 additions & 0 deletions pkg/input/line_reader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
// This file contains the interface for file-format-specific record-readers, as
// well as a collection of utility functions.

package input

import (
"bufio"
"container/list"
"io"
)

type ILineReader interface {
Scan() bool
Text() string
}

type TLineReader struct {
scanner *bufio.Scanner
}

// NewLineReader handles reading lines which may be delimited by multi-line separators,
// e.g. "\xe2\x90\x9e" for USV.
func NewLineReader(handle io.Reader, irs string) *TLineReader {
scanner := bufio.NewScanner(handle)

if irs == "\n" || irs == "\r\n" {
// Handled by default scanner.
} else {
irsbytes := []byte(irs)
irslen := len(irsbytes)

// Custom splitter
recordSplitter := func(
data []byte,
atEOF bool,
) (
advance int,
token []byte,
err error,
) {
datalen := len(data)
end := datalen - irslen
for i := 0; i <= end; i++ {
if data[i] == irsbytes[0] {
match := true
for j := 1; j < irslen; j++ {
if data[i+j] != irsbytes[j] {
match = false
break
}
}
if match {
return i + irslen, data[:i], nil
}
}
}
if !atEOF {
return 0, nil, nil
}
// There is one final token to be delivered, which may be the empty string.
// Returning bufio.ErrFinalToken here tells Scan there are no more tokens after this
// but does not trigger an error to be returned from Scan itself.
return 0, data, bufio.ErrFinalToken
}

scanner.Split(recordSplitter)
}

return &TLineReader{
scanner: scanner,
}
}

func (r *TLineReader) Scan() bool {
return r.scanner.Scan()
}

func (r *TLineReader) Text() string {
return r.scanner.Text()
}

// TODO: comment copiously
//
// Lines are written to the channel with their trailing newline (or whatever
// IRS) stripped off. So, callers get "a=1,b=2" rather than "a=1,b=2\n".
func channelizedLineReader(
lineReader ILineReader,
linesChannel chan<- *list.List,
downstreamDoneChannel <-chan bool, // for mlr head
recordsPerBatch int64,
) {
i := int64(0)
done := false

lines := list.New()

for lineReader.Scan() {
i++

lines.PushBack(lineReader.Text())

// See if downstream processors will be ignoring further data (e.g. mlr
// head). If so, stop reading. This makes 'mlr head hugefile' exit
// quickly, as it should.
if i%recordsPerBatch == 0 {
select {
case _ = <-downstreamDoneChannel:
done = true
break
default:
break
}
if done {
break
}
linesChannel <- lines
lines = list.New()
}

if done {
break
}
}
linesChannel <- lines
close(linesChannel) // end-of-stream marker
}
171 changes: 0 additions & 171 deletions pkg/input/record_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,11 @@
package input

import (
"bufio"
"container/list"
"io"
"regexp"
"strings"

"github.com/johnkerl/miller/pkg/cli"
"github.com/johnkerl/miller/pkg/lib"
"github.com/johnkerl/miller/pkg/types"
)

const CSV_BOM = "\xef\xbb\xbf"

// Since Go is concurrent, the context struct (AWK-like variables such as
// FILENAME, NF, NF, FNR, etc.) needs to be duplicated and passed through the
// channels along with each record. Hence the initial context, which readers
Expand All @@ -32,166 +24,3 @@ type IRecordReader interface {
downstreamDoneChannel <-chan bool, // for mlr head
)
}

// NewLineScanner handles read lines which may be delimited by multi-line separators,
// e.g. "\xe2\x90\x9e" for USV.
func NewLineScanner(handle io.Reader, irs string) *bufio.Scanner {
scanner := bufio.NewScanner(handle)

// Handled by default scanner.
if irs == "\n" || irs == "\r\n" {
return scanner
}

irsbytes := []byte(irs)
irslen := len(irsbytes)

// Custom splitter
recordSplitter := func(
data []byte,
atEOF bool,
) (
advance int,
token []byte,
err error,
) {
datalen := len(data)
end := datalen - irslen
for i := 0; i <= end; i++ {
if data[i] == irsbytes[0] {
match := true
for j := 1; j < irslen; j++ {
if data[i+j] != irsbytes[j] {
match = false
break
}
}
if match {
return i + irslen, data[:i], nil
}
}
}
if !atEOF {
return 0, nil, nil
}
// There is one final token to be delivered, which may be the empty string.
// Returning bufio.ErrFinalToken here tells Scan there are no more tokens after this
// but does not trigger an error to be returned from Scan itself.
return 0, data, bufio.ErrFinalToken
}

scanner.Split(recordSplitter)

return scanner
}

// TODO: comment copiously
//
// Lines are written to the channel with their trailing newline (or whatever
// IRS) stripped off. So, callers get "a=1,b=2" rather than "a=1,b=2\n".
func channelizedLineScanner(
lineScanner *bufio.Scanner,
linesChannel chan<- *list.List,
downstreamDoneChannel <-chan bool, // for mlr head
recordsPerBatch int64,
) {
i := int64(0)
done := false

lines := list.New()

for lineScanner.Scan() {
i++

lines.PushBack(lineScanner.Text())

// See if downstream processors will be ignoring further data (e.g. mlr
// head). If so, stop reading. This makes 'mlr head hugefile' exit
// quickly, as it should.
if i%recordsPerBatch == 0 {
select {
case _ = <-downstreamDoneChannel:
done = true
break
default:
break
}
if done {
break
}
linesChannel <- lines
lines = list.New()
}

if done {
break
}
}
linesChannel <- lines
close(linesChannel) // end-of-stream marker
}

// IPairSplitter splits a string into left and right, e.g. for IPS.
// This helps us reuse code for splitting by IPS string, or IPS regex.
type iPairSplitter interface {
Split(input string) []string
}

func newPairSplitter(options *cli.TReaderOptions) iPairSplitter {
if options.IPSRegex == nil {
return &tIPSSplitter{ips: options.IPS}
} else {
return &tIPSRegexSplitter{ipsRegex: options.IPSRegex}
}
}

type tIPSSplitter struct {
ips string
}

func (s *tIPSSplitter) Split(input string) []string {
return strings.SplitN(input, s.ips, 2)
}

type tIPSRegexSplitter struct {
ipsRegex *regexp.Regexp
}

func (s *tIPSRegexSplitter) Split(input string) []string {
return lib.RegexCompiledSplitString(s.ipsRegex, input, 2)
}

// IFieldSplitter splits a string into pieces, e.g. for IFS.
// This helps us reuse code for splitting by IFS string, or IFS regex.
type iFieldSplitter interface {
Split(input string) []string
}

func newFieldSplitter(options *cli.TReaderOptions) iFieldSplitter {
if options.IFSRegex == nil {
return &tIFSSplitter{ifs: options.IFS, allowRepeatIFS: options.AllowRepeatIFS}
} else {
return &tIFSRegexSplitter{ifsRegex: options.IFSRegex}
}
}

type tIFSSplitter struct {
ifs string
allowRepeatIFS bool
}

func (s *tIFSSplitter) Split(input string) []string {
fields := lib.SplitString(input, s.ifs)
if s.allowRepeatIFS {
fields = lib.StripEmpties(fields) // left/right trim
}
return fields
}

type tIFSRegexSplitter struct {
ifsRegex *regexp.Regexp
}

func (s *tIFSRegexSplitter) Split(input string) []string {
return lib.RegexCompiledSplitString(s.ifsRegex, input, -1)
}
4 changes: 2 additions & 2 deletions pkg/input/record_reader_csvlite.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,9 +144,9 @@ func (reader *RecordReaderCSVLite) processHandle(
reader.headerStrings = nil

recordsPerBatch := reader.recordsPerBatch
lineScanner := NewLineScanner(handle, reader.readerOptions.IRS)
lineReader := NewLineReader(handle, reader.readerOptions.IRS)
linesChannel := make(chan *list.List, recordsPerBatch)
go channelizedLineScanner(lineScanner, linesChannel, downstreamDoneChannel, recordsPerBatch)
go channelizedLineReader(lineReader, linesChannel, downstreamDoneChannel, recordsPerBatch)

for {
recordsAndContexts, eof := reader.recordBatchGetter(reader, linesChannel, filename, context, errorChannel)
Expand Down
4 changes: 2 additions & 2 deletions pkg/input/record_reader_dkvp_nidx.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,9 @@ func (reader *RecordReaderDKVPNIDX) processHandle(
context.UpdateForStartOfFile(filename)
recordsPerBatch := reader.recordsPerBatch

lineScanner := NewLineScanner(handle, reader.readerOptions.IRS)
lineReader := NewLineReader(handle, reader.readerOptions.IRS)
linesChannel := make(chan *list.List, recordsPerBatch)
go channelizedLineScanner(lineScanner, linesChannel, downstreamDoneChannel, recordsPerBatch)
go channelizedLineReader(lineReader, linesChannel, downstreamDoneChannel, recordsPerBatch)

for {
recordsAndContexts, eof := reader.getRecordBatch(linesChannel, errorChannel, context)
Expand Down
9 changes: 4 additions & 5 deletions pkg/input/record_reader_json.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package input

import (
"bufio"
"container/list"
"fmt"
"io"
Expand Down Expand Up @@ -203,7 +202,7 @@ func (reader *RecordReaderJSON) processHandle(
// JSONCommentEnabledReader implements io.Reader to strip comment lines
// off of CSV data.
type JSONCommentEnabledReader struct {
lineScanner *bufio.Scanner
lineReader ILineReader
readerOptions *cli.TReaderOptions
context *types.Context // Needed for channelized stdout-printing logic
readerChannel chan<- *list.List // list of *types.RecordAndContext
Expand All @@ -220,7 +219,7 @@ func NewJSONCommentEnabledReader(
readerChannel chan<- *list.List, // list of *types.RecordAndContext
) *JSONCommentEnabledReader {
return &JSONCommentEnabledReader{
lineScanner: bufio.NewScanner(underlying),
lineReader: NewLineReader(underlying, "\n"),
readerOptions: readerOptions,
context: types.NewNilContext(),
readerChannel: readerChannel,
Expand All @@ -237,10 +236,10 @@ func (bsr *JSONCommentEnabledReader) Read(p []byte) (n int, err error) {
// Loop until we can get a non-comment line to pass on, or end of file.
for {
// EOF
if !bsr.lineScanner.Scan() {
if !bsr.lineReader.Scan() {
return 0, io.EOF
}
line := bsr.lineScanner.Text()
line := bsr.lineReader.Text()

// Non-comment line
if !strings.HasPrefix(line, bsr.readerOptions.CommentString) {
Expand Down
4 changes: 2 additions & 2 deletions pkg/input/record_reader_pprint.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,9 +148,9 @@ func (reader *RecordReaderPprintBarredOrMarkdown) processHandle(
reader.headerStrings = nil

recordsPerBatch := reader.recordsPerBatch
lineScanner := NewLineScanner(handle, reader.readerOptions.IRS)
lineReader := NewLineReader(handle, reader.readerOptions.IRS)
linesChannel := make(chan *list.List, recordsPerBatch)
go channelizedLineScanner(lineScanner, linesChannel, downstreamDoneChannel, recordsPerBatch)
go channelizedLineReader(lineReader, linesChannel, downstreamDoneChannel, recordsPerBatch)

for {
recordsAndContexts, eof := reader.recordBatchGetter(reader, linesChannel, filename, context, errorChannel)
Expand Down
Loading
Loading