-
Notifications
You must be signed in to change notification settings - Fork 48
/
scanner.go
148 lines (126 loc) · 4.41 KB
/
scanner.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
package osmpbf
import (
"context"
"io"
"sync/atomic"
"github.com/paulmach/osm"
)
// Compile-time check that *Scanner satisfies the osm.Scanner interface.
var _ osm.Scanner = (*Scanner)(nil)
// Scanner provides a convenient interface for reading a stream of osm data
// from a file or url. Successive calls to the Scan method will step through the data.
//
// Scanning stops unrecoverably at EOF, the first I/O error, the first error
// returned by the decoder, the Scanner being closed or the context being
// cancelled. When a scan stops, the reader may have advanced arbitrarily far
// past the last token.
//
// The Scanner API is based on bufio.Scanner
// https://golang.org/pkg/bufio/#Scanner
type Scanner struct {
// Skip element types that are not needed. The data is skipped
// at the encoded protobuf level, but each block still needs to be decompressed.
SkipNodes bool
SkipWays bool
SkipRelations bool
// If a Filter function returns false, the element will be skipped
// at the decoding level. The functions should be fast: they block the
// decoder, and there are `procs` number of concurrent decoders.
// Elements can be stored if the function returns true. Memory is
// reused if the filter returns false.
FilterNode func(*osm.Node) bool
FilterWay func(*osm.Way) bool
FilterRelation func(*osm.Relation) bool
// ctx is the context given to New; Scan stops once it reports an error.
ctx context.Context
// closed is set by Close and makes every later Scan return false.
closed bool
// decoder does the actual reading; it is created in New.
decoder *decoder
// started records that decoder.Start has been called, which happens on
// the first call to either Header or Scan.
started bool
// procs is the parallelism passed to decoder.Start.
procs int
// next is the object produced by the most recent call to decoder.Next.
next osm.Object
// err is the most recent error from decoder.Start or decoder.Next.
err error
}
// New creates a Scanner that reads osm pbf data from r.
// procs is the amount of parallelism to use when reading blocks, so that
// the unzipping/decoding can be offloaded to multiple cpus.
// A nil ctx is replaced with context.Background().
func New(ctx context.Context, r io.Reader, procs int) *Scanner {
	if ctx == nil {
		ctx = context.Background()
	}

	scanner := new(Scanner)
	scanner.ctx = ctx
	scanner.procs = procs
	// The decoder is handed the scanner itself along with the context and reader.
	scanner.decoder = newDecoder(ctx, scanner, r)

	return scanner
}
// FullyScannedBytes reports how many bytes have been read and fully
// scanned. OSM protobuf files contain data blocks with 8000 nodes each;
// the value covers the bytes of the blocks that have been fully scanned.
//
// A user can use this number to seek forward in a file and begin reading
// mid-data. Note that while elements are usually sorted by Type, ID, Version
// in OSM protobuf files, the versions of a given element may span blocks.
func (s *Scanner) FullyScannedBytes() int64 {
	// Read atomically from the decoder's offset counter.
	offset := &s.decoder.cOffset
	return atomic.LoadInt64(offset)
}
// PreviousFullyScannedBytes reports the value FullyScannedBytes had before
// its current one. This is useful because it is not always clear whether a
// feature spans a block. For example, if one quits after finding the first
// relation, upon restarting there is no way of knowing if that relation is
// complete, so it has to be skipped. But if it is the first relation in the
// file, a full relation would be skipped.
func (s *Scanner) PreviousFullyScannedBytes() int64 {
	// Read atomically from the decoder's previous-offset counter.
	offset := &s.decoder.pOffset
	return atomic.LoadInt64(offset)
}
// Close cleans up all the reading goroutines by closing the decoder and
// returns the decoder's error. It does not close the underlying reader.
func (s *Scanner) Close() error {
	// Mark the scanner closed first so later Scan calls return false.
	s.closed = true

	err := s.decoder.Close()
	return err
}
// Header returns the pbf file header, which has interesting information
// about how the file was created, together with the scanner's current error.
// If the decoder has not been started yet, it is started here.
func (s *Scanner) Header() (*Header, error) {
	if s.started {
		return s.decoder.header, s.err
	}

	s.started = true
	// The header gets read before Start returns.
	s.err = s.decoder.Start(s.procs)

	return s.decoder.header, s.err
}
// Scan moves the Scanner forward to the next element, which is then available
// through the Object method. It reports false once the scan has stopped:
// at the end of the input, on an I/O or decoding error, after Close has been
// called or when the context is cancelled. After Scan returns false, the Err
// method will return any error that occurred during scanning, except that if
// it was io.EOF, Err will return nil.
func (s *Scanner) Scan() bool {
	// The decoder is started lazily, on the first call that needs it.
	if !s.started {
		s.started = true
		s.err = s.decoder.Start(s.procs)
	}

	// Any one of these conditions stops the scan; they are checked in order.
	switch {
	case s.err != nil, s.closed, s.ctx.Err() != nil:
		return false
	}

	obj, err := s.decoder.Next()
	s.next, s.err = obj, err

	return err == nil
}
// Object returns the most recent token generated by a call to Scan
// as an osm.Object. Currently osm.pbf files only contain nodes, ways and
// relations. This method returns an object to match the osm.Scanner interface,
// which allows this Scanner to share an interface with osmxml.Scanner.
func (s *Scanner) Object() osm.Object {
	current := s.next
	return current
}
// Err returns the first non-EOF error that was encountered by the Scanner.
// io.EOF is reported as nil. When no error has been recorded, it returns
// osm.ErrScannerClosed if the Scanner was closed, and otherwise the
// context's error.
func (s *Scanner) Err() error {
	switch {
	case s.err == io.EOF:
		// Reaching the end of the input is not an error.
		return nil
	case s.err != nil:
		return s.err
	case s.closed:
		return osm.ErrScannerClosed
	default:
		return s.ctx.Err()
	}
}