以下分析基于gh-ost 1.0.35
包 | 描述 | 备注 |
---|---|---|
base | 定义了整个过程的中间变量和方法,utils.go提供检查文件是否存在、字符串包含等 |
| | mysql | binlog文件坐标处理方法和dsn处理方法,utils.go提供获取binlog坐标、获取主库实例等方法 |
| | sql | 用于解析sql语句和构造sql语句的工具 |
| | binlog | 获取binlog的入口 |
| | cmd | 程序入口 |
| | logic | inspect.go: replicat库上能否进行数据迁移的检查和校验 applier.go: 被数据迁移库上:数据迁移和cutover的方法 server.go: gh-ost控制数据迁移的交互接口 streamer.go: 流式读取binlog和events事件通知器 migrator.go: 逻辑入口:prepare、migrate、cut-over throttler.go: 节流入口:库上超负载 |
|
gh-ost中主从复制以及解析binlog的库使用的是一个第三方的库https://github.com/siddontang/go-mysql (由PingCAP的siddontang开发)
- inspector:监听拉取mysql binlog。其连接的mysql根据工作模式的不同,既可以是master也可以是slave
- applier:rowcopy和binblog应用回mysql。其连接的mysql根据工作模式不同,即可以是master也可以是replica
- streamer: binlog解析成BinlogEntry
- hooksExecutor:在各个阶段(start-up, copy-completed, ...)执行一些外部可执行文件/脚本,默认disabled,https://github.com/github/gh-ost/blob/master/doc/hooks.md
主流程都在这里,实现各个流程的协调和数据传输。
// Migrator is the main schema migration flow manager.
type Migrator struct {
parser *sql.Parser
inspector *Inspector
applier *Applier
replicator *replica.Replicator
eventsStreamer *EventsStreamer
server *Server
throttler *Throttler
hooksExecutor *HooksExecutor
migrationContext *base.MigrationContext
firstThrottlingCollected chan bool
ghostTableMigrated chan bool
rowCopyComplete chan error
allEventsUpToLockProcessed chan string
rowCopyCompleteFlag int64
// copyRowsQueue should not be buffered; if buffered some non-damaging but
// excessive work happens at the end of the iteration as new copy-jobs arrive befroe realizing the copy is complete
copyRowsQueue chan tableWriteFunc
// binlog event 队列,由addDMLEventsListener(Migrator的方法)向EventsStream注册的DMLEventsListener向其中put
// applyEventsQueue的大小为100
// applyEventStruct实际上是包含两个field:tableWriteFunc和BinlogDMLEvent
// 正常数据只会有BinlogDMLEvent,tableWriteFunc是nil
// todo
applyEventsQueue chan *applyEventStruct
handledChangelogStates map[string]bool
}
// EventsStreamer reads data from binary logs and streams it on. It acts as a publisher,
// and interested parties may subscribe for per-table events.
type EventsStreamer struct {
connectionConfig *mysql.ConnectionConfig
db *gosql.DB
migrationContext *base.MigrationContext
initialBinlogCoordinates *mysql.BinlogCoordinates
listeners [](*BinlogEventListener)
listenersMutex *sync.Mutex
eventsChannel chan *binlog.BinlogEntry // 由StreamEvents从GoMySQLReader.binlogStreamer中读取数据并put到其中
binlogReader *binlog.GoMySQLReader
}
type GoMySQLReader struct {
connectionConfig *mysql.ConnectionConfig
binlogSyncer *replication.BinlogSyncer
binlogStreamer *replication.BinlogStreamer
currentCoordinates mysql.BinlogCoordinates
currentCoordinatesMutex *sync.Mutex
LastAppliedRowsEventHint mysql.BinlogCoordinates
MigrationContext *base.MigrationContext
}
// Inspector reads data from the read-MySQL-server (typically a replica, but can be the master)
// It is used for gaining initial status and structure, and later also follow up on progress and changelog
type Inspector struct {
connectionConfig *mysql.ConnectionConfig
db *gosql.DB
migrationContext *base.MigrationContext
generator *replica.Generator
}
// Applier connects and writes the the applier-server, which is the server where migration
// happens. This is typically the master, but could be a replica when `--test-on-replica` or
// `--execute-on-replica` are given.
// Applier is the one to actually write row data and apply binlog events onto the ghost table.
// It is where the ghost & changelog tables get created. It is where the cut-over phase happens.
type Applier struct {
connectionConfig *mysql.ConnectionConfig
db *gosql.DB
singletonDB *gosql.DB
migrationContext *base.MigrationContext
}
流程逻辑入口migrator.Migrate()
_**ParseAlterStatement, validateStatement**_
_:解析和验证alter语句,找出rename和drop的列(MigrationContext.ColumnRenameMap_和_MigrationContext.DroppedColumnsMap)。是否允许rename受参数_approve-renamed-columns 和 _skip-renamed-columns_控制,如果不允许skip-renamed-columns,那么必须approve-renamed-columns ,否则报错。- 初始化inspector
**_initiateInspector_**
:- _Inspector._InitDBConnections:初始化并验证数据库连接(Inspector.validateConnection),验证权限(_Inspector.validateGrants _didi gh-ost在该commit中有修改逻辑),验证binlog格式(Inspector.validateBinlogs),设置binlog为row格式(Inspector.applyBinlogFormat)
- ValidateOriginalTable: 检查原表,获取Engine、Rows信息(show /* gh-ost */ table status from %s like '%s'),检查外键,检查触发器,explain获取行数
**_InspectOriginalTable_**
:获取原表**所有列(MigrationContext.OriginalTableColumns)和唯一键(包括主键)(MigrationContext.OriginalTableUniqueKeys)**信息。- 检查--test-on-replica 和 --migrate-on-replica
_**initiateStreaming**_
(确定binlog apply的起始位点并开始dump binlog)- InitDBConnections,初始化DB连接:判断是否有hadZombieBinlogDump;获取inspector上当前binlog的位点(show /* gh-ost readCurrentBinlogCoordinates */ master status),初始化binlogStreamer(发送BINLOG DUMP开始在新的协程中dump binlog(go b.onStream(s)))
- 向EventsStreamer中添加Changelog表的监听onChangelogStateEvent(通过binlog event)
- StreamEvents:开启goroutine** 从EventsStreamer.eventsChannel中读取event,然后调用**Listener。目前就changelog和dml两个listener,分别负责处理changeLog表的更新和put event到Migrator.applyEventsQueue中
- StreamEvents:开启goroutine从binlogStreamer中读取event并put到EventsStreamer.eventsChannel中
**_initiateApplier_**
,初始化applyer。- 检查ghost表是否存在以及是否drop(didi gh-ost 修改了drop逻辑)
- 创建changelog表(记录任务元信息)和ghost表
- alter ghost表
- 更新changelog表内记录的 status为
GhostTableMigrated
- 等待ghostTableMigrated消息(上一步中更新changelog表的binlog中取得,在onChangelogStateEvent中处理)
**_inspectOriginalAndGhostTables_**
:验证原表和ghost表- 获取ghost表的所有列信息(MigrationContext.GhostTabeColumns)和唯一键(包括主键)信息(MigrationContext.GhostTableUniqueKeys)
- 找到原表和ghost表都有的unique key(migrationContext.UniqueKey)。gh-ost需要使用唯一键或主键确定rowcopy的范围
- 找到原表和ghost表都有的列(MigrationContext.SharedColumns)和rename后的映射关系(MigrationContext.MappedSharedColumns.inspectOriginalAndGhostTables),如果有虚拟列就去掉虚拟列(didi-gh-ost新增)
- 计原表行数,在goroutine中计算
- 向EventsStreamer中添加DMLEventsListener,该listener的功能是把从EventsStreamer.eventsChannel中读取的BinlogDMLEvent添加到Migrator.applyEventsQueue中
**ReadMigrationRangeValues**
:reads min/max values that will be used for rowcopy- initiateThrottler:初始化限流模块,//todo
- executeWriteFuncs:goroutine中开始rowcopy和binlog apply
- iterateChunks:rowcopy
- consumeRowCopyComplete:等待rowcopy结束
- cutover
入口函数:go/logic/migrator.go:: executeWriteFuncs 、 iterateChunks,两个函数都在goroutine中执行,然后由consumeRowCopyComplete等待数据拷贝结束
for {
// We give higher priority to event processing, then secondary priority to rowcopy
select {
case eventStruct := <-this.applyEventsQueue: // 优先binlog apply,applyEventsQueue大小为100
{
this.onApplyEventStruct(eventStruct)
}
default: // 如果applyEventsQueue中没数据,会进入default
{
select {
case copyRowsFunc := <-this.copyRowsQueue:
{
// Retries are handled within the copyRowsFunc
copyRowsFunc()
// 根据设置的nice值sleep一定时间
}
default: // 如果copyRowsQueue中没数据,会进入default。进入该defalult表示applyEventsQueue和copyRowsQueue中都没数据
{
// Hmmmmm... nothing in the queue; no events, but also no row copy.
// This is possible upon load. Let's just sleep it over.
time.Sleep(time.Second)
}
}
}
}
}
binlog apply的函数调用关系是onApplyEventStruct → ApplyDMLEventQueries → buildDMLEventQuery。
- onApplyEventStruct函数处理DMLEvent和tableWriteFunc,会根据_dml-batch-size_参数对applyEventsQueue中的数据做batch;
- ApplyDMLEventQueries是真正执行sql语句的函数;
- buildDMLEventQuery根据DeleteDML、InsertDML、UpdateDML生成对应的SQL语句。
Q&A Q: binlog apply何时结束? A: 在cutover流程锁表并且消费完所有ddl表的binlog后,参考cutover流程
func (this *Migrator) iterateChunks() error {
// 结束rowcopy回调,把rowcopy结束标志写到rowCopyComplete中
terminateRowIteration := func(err error) error {
// 注意这里的err是terminateRowIteration传进来的,可以是nil
this.rowCopyComplete <- err
return log.Errore(err)
}
var hasNoFurtherRangeFlag int64
// Iterate per chunk:
for {
if atomic.LoadInt64(&this.rowCopyCompleteFlag) == 1 || atomic.LoadInt64(&hasNoFurtherRangeFlag) == 1 {
// Done
// There's another such check down the line
return nil
}
copyRowsFunc := func() error {
if atomic.LoadInt64(&this.rowCopyCompleteFlag) == 1 || atomic.LoadInt64(&hasNoFurtherRangeFlag) == 1 {
// Done.
// There's another such check down the line
return nil
}
// When hasFurtherRange is false, original table might be write locked and CalculateNextIterationRangeEndValues would hangs forever
hasFurtherRange := false
if err := this.retryOperation(func() (e error) {
hasFurtherRange, e = this.applier.CalculateNextIterationRangeEndValues() //检测是否还有数据要迁移
return e
}); err != nil {
return terminateRowIteration(err)
}
if !hasFurtherRange {
atomic.StoreInt64(&hasNoFurtherRangeFlag, 1)
return terminateRowIteration(nil) // 没有数据需要rowcopy了,这里的参数是nil
}
// Copy task:
applyCopyRowsFunc := func() error {
if atomic.LoadInt64(&this.rowCopyCompleteFlag) == 1 {
return nil
}
_, rowsAffected, _, err := this.applier.ApplyIterationInsertQuery() //真正执行rowcopy的函数,insert ignore (select ...) ...
if err != nil {
return err // wrapping call will retry
}
return nil
}
if err := this.retryOperation(applyCopyRowsFunc); err != nil { //重试执行rowcopy函数
return terminateRowIteration(err)
}
return nil
}
// Enqueue copy operation; to be executed by executeWriteFuncs()
this.copyRowsQueue <- copyRowsFunc // 把rowcopy闭包put到copyRowsQueue中
}
return nil
}
rowcopy直接调用copyRowsQueue中取出来的copyRowsFunc闭包,copyRowsQueue是一个不带缓冲的chan,其调用关系:copyRowsFunc → ApplyIterationInsertQuery → BuildRangeInsertPreparedQuery。
BuildRangeInsertPreparedQuery:生成insert ignore into ... (select ... from ... force index (xxx) where (xxx and xxx) lock in share mode) 语句
consumeRowCopyComplete会阻塞直到Migratoe.rowCopyComplete中有数据。如果rowcopy是正常结束的,会写nil到rowCopyComplete中,读到的数据是nil,然后把Migratoe.rowCopyCompleteFlag置为true。
// consumeRowCopyComplete blocks on the rowCopyComplete channel once, and then
// consumes and drops any further incoming events that may be left hanging.
func (this *Migrator) consumeRowCopyComplete() {
if err := <-this.rowCopyComplete; err != nil {
this.migrationContext.PanicAbort <- err
} else {
atomic.StoreInt64(&this.rowCopyCompleteFlag, 1)
this.migrationContext.MarkRowCopyEndTime()
}
go func() {
for err := range this.rowCopyComplete {
if err != nil {
this.migrationContext.PanicAbort <- err
}
}
}()
}
Q&A Q: rowcopy的范围如何确定? A: ReadMigrationRangeValues 就确定了
Q: 何时结束rowcopy?CalculateNextIterationRangeEndValues A: copy 完所有最开始确定要copy的数据
Q: row copy对隔离级别有要求吗? A: 没有,不论是什么隔离级别都行。只要是在binlog dump后计算的rowcopy范围,等到rowcopy和binlog apply完成后数据一定是正确的。注意必须先开始binlog dump或记录binlog位点然后再计算rowcopy范围,因为如果先计算了rowcopy范围,然后再记录binlog位点,在两步之间写入的数据可能永远不会被appply(rowcopy没有该行记录,binlog记录的位点之后也没有该记录)
在rowcopy完成后(阻塞在函数_consumeRowCopyComplete_,等待_Migratoe.rowCopyComplete_信号),就可以开始cutOver流程了。 入口函数**cutOver**,分为_atomicCutOver_和_TwoStepCutOver_两种方式。cutOver会一直retry(使用retryOperation,该函数在重试一定次数都失败后,就返回失败),次数受参数“default-retries”控制,默认值30
首先是cutover前的各种检查:
- cut-over postpone的检查(postpone-cut-over-flag-file),如果设置了该参数,会一直阻塞直到postpone文件被删除
- test-on-replica the replication is first stopped; then the cut-over is executed just as on master, but then reverted (tables rename forth then back again).
接下来调用atomicCutOver或TwoStepCutOver
gh-ost也是异步模式,利用Mysql一个特性,就是在所有被blocked的请求中,rename请求是永远最优先的。 一条连接对原表加锁,另一条连接进行rename操作,此时rename会被blocked掉,当unlock后,rename请求会优先被处理,其他的请求会应用到新表上。
主要流程如下,包括3个goroutine,其中括号中的数字表示实际执行的顺序,有相同的数字表示可能是同时执行的;括号中的->表示是一个阻塞操作,阻塞前后的状态。
主goroutinue
- (1)_ _AtomicCutOverMagicLock:创建锁表和解锁表的goruntine。
- (2 → 8) <- tableLocked:等待lock表完成。
- (8)** waitForEventsUpToLock**:等待原表的binlog apply完成。到这里表示原表已经lock,所有原表新的DML操作已经被block了,验证binlog apply完成的方法是向changeLog表写入一条“AllEventsUpToLockProcessed” stat hint数据,并等待binlog消费到该数据。
- (9)** AtomicCutoverRename**:创建rename goroutine
- (10 → 11) 获取renameSessionId,会阻塞等待
- (11) waitForRename → ExpectProcess:检查renameSession存活且被阻塞住(show processlist)
- (12)** ExpectUsedLock**:检查lockOriginalSession存活(select is_used_lock(lockOriginalSession)检查voluntary lock存在)
- (13)_ okToUnlockTable_ <- true:通知AtomicCutOverMagicLock释放锁。到这里表示已经确认了RENAME blocking并且locking connectiing still alive,下一步就是unlock tables,然后rename就会自动执行。
- (14 → 15) <-tableUnlocked:等待lockOriginalSession unlock table 完成
- (16) <-tablesRenamed:等待renameSession rename完成
AtomicCutOverMagicLock
- (3) 开启新连接和事务(lockOriginalSession)
- (4) Grabbing voluntary lock: _select get_lock(lockOriginalSession, 0)。voluntary lock_是为了让主线程验证lockOriginalSession仍然存活,如果lockOriginalSession连接断开,该锁会自动释放。
- (5) 创建Sentry Table,后缀为 _[timestamp]_gho_old (根据_timestamp-old-table_参数决定是否添加timestamp)。Sentry Table是为了防止lockOriginalSession异常退出后rename被执行。
- (6) 对原表和SentryTable加write锁,超时时间为_cut-over-lock-timeout-seconds_,默认3s。完成这一步后对原表的所有DML操作都被阻塞,rename也被阻塞
- (7) 通知主goroutine加锁结果,如果加锁成功,向_tableLocked_写入_nil_标识通知主goroutine,否则写入_err_
- (8 → 14) 等待_okToUnlockTable_通知(正常是在主goroutine检查完rename连接和lock连接后)
- **(14) drop SentryTable and **unlock tables(这一步完成后,blocking的rename会被立刻执行)
- (15) 通知主goroutine unlock的结果,tableUnlocked ← err 或 nil
AtomicCutoverRename
- (10) 开启新连接和事务(renameSession)并设置session lock_wait_timeou = cut-over-lock-timeout-seconds,默认值3s
- (11 → 15) rename:originalTable → OldTable(_gho_old),ghostTable(_gho_new) → originalTable,一条语句中执行(这里会阻塞住,直到lockOriginalSession drop SentryTable并释放锁)
- (15) 通知主goroutine rename结果,tablesRenamed <- err 或 nil
- (16) 如果rename出错,需要unlockTable,okToUnlockTable <- true,通知AtomicCutOverMagicLock解锁
注意:加锁解锁(AtomicCutOverMagicLock)、rename(AtomicCutoverRename)在不同的session中执行