Skip to content

Commit

Permalink
[MySQL] Extending our TableAdapters to understand more DDL Events (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 authored Dec 19, 2024
1 parent 3ba635e commit 37ae2a7
Showing 1 changed file with 29 additions and 1 deletion.
30 changes: 29 additions & 1 deletion sources/mysql/streaming/ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func (s *SchemaAdapter) ApplyDDL(unixTs int64, query string) error {
}

func (s *SchemaAdapter) applyDDL(unixTs int64, result antlr.Event) error {
switch result.(type) {
switch castedResult := result.(type) {
case antlr.DropTableEvent:
delete(s.adapters, result.GetTable())
return nil
Expand All @@ -76,6 +76,34 @@ func (s *SchemaAdapter) applyDDL(unixTs int64, result antlr.Event) error {

s.adapters[result.GetTable()] = tblAdapter
return nil
case antlr.CopyTableEvent:
existingTableAdapter, ok := s.adapters[castedResult.GetCopyFromTableName()]
if !ok {
return fmt.Errorf("table not found: %q", castedResult.GetTable())
}

tblAdapter, err := NewTableAdapter(s.dbName, s.tableCfgMap[result.GetTable()], existingTableAdapter.columns, unixTs, s.sqlMode)
if err != nil {
return err
}

s.adapters[result.GetTable()] = tblAdapter
return nil
case antlr.RenameTableEvent:
tblAdapter, ok := s.adapters[castedResult.GetTable()]
if !ok {
return fmt.Errorf("table not found: %q", result.GetTable())
}

newTableAdapter, err := NewTableAdapter(s.dbName, s.tableCfgMap[castedResult.GetNewTableName()], tblAdapter.columns, unixTs, s.sqlMode)
if err != nil {
return err
}

// Delete the old table adapter and create a new one
delete(s.adapters, result.GetTable())
s.adapters[castedResult.GetNewTableName()] = newTableAdapter
return nil
}

tblAdapter, ok := s.adapters[result.GetTable()]
Expand Down

0 comments on commit 37ae2a7

Please sign in to comment.