Skip to content

Commit

Permalink
Add more logging to sharding key comparison parsing
Browse files Browse the repository at this point in the history
  • Loading branch information
mtaner committed Nov 28, 2024
1 parent 08b4172 commit ca3b837
Showing 1 changed file with 29 additions and 1 deletion.
30 changes: 29 additions & 1 deletion sharding/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,11 +162,39 @@ func (f *ShardedCopyFilter) ApplicableEvent(event ghostferry.DMLEvent) (bool, er

oldShardingValue, oldExists, err := parseShardingValue(oldValues, idx)
if err != nil {

sql, err := event.AsSQLString(event.Database(), event.Table())
if err != nil {
sql = ""
}

log.WithFields(log.Fields{
"tag": "sharding",
"table": event.Table(),
"position": event.BinlogPosition(),
"sqlStatement": sql,
"event": fmt.Sprintf("%T", event),
}).WithError(err).Error("parsing old sharding key failed")

return false, fmt.Errorf("parsing old sharding key: %s", err)
}

newShardingValue, newExists, err := parseShardingValue(newValues, idx)
if err != nil {

sql, err := event.AsSQLString(event.Database(), event.Table())
if err != nil {
sql = ""
}

log.WithFields(log.Fields{
"tag": "sharding",
"table": event.Table(),
"position": event.BinlogPosition(),
"sqlStatement": sql,
"event": fmt.Sprintf("%T", event),
}).WithError(err).Error("parsing new sharding key failed")

return false, fmt.Errorf("parsing new sharding key: %s", err)
}

Expand Down Expand Up @@ -215,7 +243,7 @@ func (s *ShardedTableFilter) ApplicableDatabases(dbs []string) ([]string, error)

func (s *ShardedTableFilter) ApplicableTables(tables []*ghostferry.TableSchema) (applicable []*ghostferry.TableSchema, err error) {
for _, table := range tables {
if ((s.isIgnoreFilter() && s.isPresent(table)) || (s.isIncludeFilter() && !s.isPresent(table))) {
if (s.isIgnoreFilter() && s.isPresent(table)) || (s.isIncludeFilter() && !s.isPresent(table)) {
continue
}

Expand Down

0 comments on commit ca3b837

Please sign in to comment.