From ca3b8373476cae130244b0f7d699a42456477db1 Mon Sep 17 00:00:00 2001 From: Merve Taner Date: Thu, 28 Nov 2024 12:29:15 +0000 Subject: [PATCH] Add more logging to sharding key comparison parsing --- sharding/filter.go | 30 +++++++++++++++++++++++++++++- 1 file changed, 29 insertions(+), 1 deletion(-) diff --git a/sharding/filter.go b/sharding/filter.go index e94a7622..44889839 100644 --- a/sharding/filter.go +++ b/sharding/filter.go @@ -162,11 +162,39 @@ func (f *ShardedCopyFilter) ApplicableEvent(event ghostferry.DMLEvent) (bool, er oldShardingValue, oldExists, err := parseShardingValue(oldValues, idx) if err != nil { + + sql, err := event.AsSQLString(event.Database(), event.Table()) + if err != nil { + sql = "" + } + + log.WithFields(log.Fields{ + "tag": "sharding", + "table": event.Table(), + "position": event.BinlogPosition(), + "sqlStatement": sql, + "event": fmt.Sprintf("%T", event), + }).WithError(err).Error("parsing old sharding key failed") + return false, fmt.Errorf("parsing old sharding key: %s", err) } newShardingValue, newExists, err := parseShardingValue(newValues, idx) if err != nil { + + sql, err := event.AsSQLString(event.Database(), event.Table()) + if err != nil { + sql = "" + } + + log.WithFields(log.Fields{ + "tag": "sharding", + "table": event.Table(), + "position": event.BinlogPosition(), + "sqlStatement": sql, + "event": fmt.Sprintf("%T", event), + }).WithError(err).Error("parsing new sharding key failed") + return false, fmt.Errorf("parsing new sharding key: %s", err) } @@ -215,7 +243,7 @@ func (s *ShardedTableFilter) ApplicableDatabases(dbs []string) ([]string, error) func (s *ShardedTableFilter) ApplicableTables(tables []*ghostferry.TableSchema) (applicable []*ghostferry.TableSchema, err error) { for _, table := range tables { - if ((s.isIgnoreFilter() && s.isPresent(table)) || (s.isIncludeFilter() && !s.isPresent(table))) { + if (s.isIgnoreFilter() && s.isPresent(table)) || (s.isIncludeFilter() && !s.isPresent(table)) { continue }