diff --git a/sources/mysql/streaming/ddl/ddl.go b/sources/mysql/streaming/ddl/ddl.go index c346517f..eac09189 100644 --- a/sources/mysql/streaming/ddl/ddl.go +++ b/sources/mysql/streaming/ddl/ddl.go @@ -55,7 +55,7 @@ func (s *SchemaAdapter) ApplyDDL(unixTs int64, query string) error { } func (s *SchemaAdapter) applyDDL(unixTs int64, result antlr.Event) error { - switch result.(type) { + switch castedResult := result.(type) { case antlr.DropTableEvent: delete(s.adapters, result.GetTable()) return nil @@ -76,6 +76,34 @@ func (s *SchemaAdapter) applyDDL(unixTs int64, result antlr.Event) error { s.adapters[result.GetTable()] = tblAdapter return nil + case antlr.CopyTableEvent: + existingTableAdapter, ok := s.adapters[castedResult.GetCopyFromTableName()] + if !ok { + return fmt.Errorf("table not found: %q", castedResult.GetTable()) + } + + tblAdapter, err := NewTableAdapter(s.dbName, s.tableCfgMap[result.GetTable()], existingTableAdapter.columns, unixTs, s.sqlMode) + if err != nil { + return err + } + + s.adapters[result.GetTable()] = tblAdapter + return nil + case antlr.RenameTableEvent: + tblAdapter, ok := s.adapters[castedResult.GetTable()] + if !ok { + return fmt.Errorf("table not found: %q", result.GetTable()) + } + + newTableAdapter, err := NewTableAdapter(s.dbName, s.tableCfgMap[castedResult.GetNewTableName()], tblAdapter.columns, unixTs, s.sqlMode) + if err != nil { + return err + } + + // Delete the old table adapter and create a new one + delete(s.adapters, result.GetTable()) + s.adapters[castedResult.GetNewTableName()] = newTableAdapter + return nil } tblAdapter, ok := s.adapters[result.GetTable()]