-
Notifications
You must be signed in to change notification settings - Fork 72
/
Copy pathbinlog_writer.go
123 lines (97 loc) · 3.02 KB
/
binlog_writer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
package ghostferry
import (
"fmt"
"time"
sql "github.com/Shopify/ghostferry/sqlwrapper"
"github.com/sirupsen/logrus"
)
// BinlogWriter receives DML events through an internal buffered channel,
// groups them into batches and applies each batch to the database behind DB.
type BinlogWriter struct {
// DB is the connection on which the batched statements are executed. Its
// Marginalia value is used to annotate every generated statement.
DB *sql.DB
// DatabaseRewrites maps an event's database name to the name that should be
// used instead when generating SQL. Names without an entry are used as-is.
DatabaseRewrites map[string]string
// TableRewrites maps an event's table name to the name that should be used
// instead when generating SQL. Names without an entry are used as-is.
TableRewrites map[string]string
// Throttler is passed to WaitForThrottle before every batch write.
Throttler Throttler
// BatchSize is both the capacity of the event buffer channel and the
// upper bound that Run uses when topping up a batch.
BatchSize int
// WriteRetries is handed to WithRetries when writing a batch.
WriteRetries int
// ErrorHandler.Fatal is invoked when a batch still fails after retrying.
ErrorHandler ErrorHandler
// StateTracker is optional. When non-nil it is updated with the resumable
// binlog position of the last event of each successfully written batch.
StateTracker *StateTracker
// binlogEventBuffer carries events from BufferBinlogEvents to Run. It is
// created by Run and closed by Stop.
binlogEventBuffer chan DMLEvent
// Useful for debugging binlog writer lag, if diverged from binlog streamer lag
// Holds the timestamp of the last event in the most recently written batch.
lastProcessedEventTime time.Time
// logger is set by Run and tagged with "binlog_writer".
logger *logrus.Entry
}
// Run sets up the logger and the event buffer, then consumes buffered events
// until a nil event is received (which is what a closed channel yields).
//
// Each iteration blocks for one event, then opportunistically drains whatever
// else is already buffered, up to BatchSize events in total, without waiting
// for more to arrive. The resulting batch is written via writeEvents under
// WithRetries; if that still fails, ErrorHandler.Fatal is called.
func (b *BinlogWriter) Run() {
	b.logger = logrus.WithField("tag", "binlog_writer")
	b.binlogEventBuffer = make(chan DMLEvent, b.BatchSize)

	pending := make([]DMLEvent, 0, b.BatchSize)
	for {
		// Block until there is at least one event to write.
		head := <-b.binlogEventBuffer
		if head == nil {
			// Channel is closed, no more events to write.
			return
		}
		pending = append(pending, head)

	fill:
		for len(pending) < b.BatchSize {
			select {
			case next := <-b.binlogEventBuffer:
				if next == nil {
					// Channel is closed, finish writing what we have.
					break fill
				}
				pending = append(pending, next)
			default:
				// Nothing else is buffered right now, so write the batch.
				break fill
			}
		}

		writeErr := WithRetries(b.WriteRetries, 0, b.logger, "write events to target", func() error {
			return b.writeEvents(pending)
		})
		if writeErr != nil {
			b.ErrorHandler.Fatal("binlog_writer", writeErr)
		}

		// Start the next batch with a fresh slice.
		pending = make([]DMLEvent, 0, b.BatchSize)
	}
}
// Stop closes the event buffer channel. Run treats the nil value received
// from the closed channel as its signal to finish the current batch and exit.
//
// NOTE: the channel is allocated inside Run, so calling Stop before Run has
// created it closes a nil channel, which panics. Calling Stop twice panics
// as well, because a channel may only be closed once.
func (b *BinlogWriter) Stop() {
close(b.binlogEventBuffer)
}
// BufferBinlogEvents queues the given events, in order, on the internal event
// buffer for Run to pick up. A send blocks while the buffer is full. The
// returned error is always nil.
func (b *BinlogWriter) BufferBinlogEvents(events []DMLEvent) error {
	for i := range events {
		b.binlogEventBuffer <- events[i]
	}
	return nil
}
// writeEvents applies one batch of DML events to the database behind b.DB as
// a single multi-statement query.
//
// After waiting on the configured Throttler it builds a query of the form
// "BEGIN; <stmt>; ... COMMIT". Each statement is produced by the event's
// AsSQLString, using the event's database and table names after applying
// DatabaseRewrites and TableRewrites, and is passed through sql.AnnotateStmt
// with the DB's Marginalia.
//
// On success the last event's resumable binlog position is reported to the
// StateTracker (when one is configured) and that event's timestamp is stored
// in lastProcessedEventTime. An empty batch is a no-op and returns nil.
//
// An error is returned if an event cannot be rendered as SQL or if executing
// the query fails; the message includes the binlog position(s) involved.
func (b *BinlogWriter) writeEvents(events []DMLEvent) error {
	// Nothing to write. Without this guard the events[0] lookup further down
	// would panic with an index-out-of-range error.
	if len(events) == 0 {
		return nil
	}

	WaitForThrottle(b.Throttler)

	queryBuffer := []byte(sql.AnnotateStmt("BEGIN;\n", b.DB.Marginalia))

	for _, ev := range events {
		// Apply the optional database/table name rewrites before
		// generating the statement.
		eventDatabaseName := ev.Database()
		if targetDatabaseName, exists := b.DatabaseRewrites[eventDatabaseName]; exists {
			eventDatabaseName = targetDatabaseName
		}

		eventTableName := ev.Table()
		if targetTableName, exists := b.TableRewrites[eventTableName]; exists {
			eventTableName = targetTableName
		}

		sqlStmt, err := ev.AsSQLString(eventDatabaseName, eventTableName)
		if err != nil {
			return fmt.Errorf("generating sql query at pos %v: %v", ev.BinlogPosition(), err)
		}

		queryBuffer = append(queryBuffer, sql.AnnotateStmt(sqlStmt, b.DB.Marginalia)...)
		queryBuffer = append(queryBuffer, ";\n"...)
	}

	queryBuffer = append(queryBuffer, "COMMIT"...)

	startEv := events[0]
	endEv := events[len(events)-1]
	query := string(queryBuffer)
	if _, err := b.DB.Exec(query); err != nil {
		return fmt.Errorf("exec query at pos %v -> %v (%d bytes): %v", startEv.BinlogPosition(), endEv.BinlogPosition(), len(query), err)
	}

	// Only record progress once the whole batch has been executed.
	if b.StateTracker != nil {
		b.StateTracker.UpdateLastResumableSourceBinlogPosition(endEv.ResumableBinlogPosition())
	}
	b.lastProcessedEventTime = endEv.Timestamp()

	return nil
}