diff --git a/canal/event.go b/canal/event.go index e75d9ffe..458b4981 100644 --- a/canal/event.go +++ b/canal/event.go @@ -109,6 +109,7 @@ func (e *eventHandler) onMessage(key, value []byte) error { var p Payload if err := json.Unmarshal(value, &p); err != nil { + e.log.Warn("failed to parse kafka value", zap.String("table", p.Source.Table), zap.Error(err)) return nil } diff --git a/internal/search/client.go b/internal/search/client.go index 0cc8f1de..dc79300c 100644 --- a/internal/search/client.go +++ b/internal/search/client.go @@ -161,7 +161,7 @@ func (c *client) OnSubjectUpdate(ctx context.Context, id model.SubjectID) error } if s.Redirect != 0 || s.Ban != 0 { - return c.DeleteSubject(ctx, id) + return c.OnSubjectDelete(ctx, id) } extracted := extractSubject(&s) @@ -172,8 +172,8 @@ func (c *client) OnSubjectUpdate(ctx context.Context, id model.SubjectID) error } // OnSubjectDelete is the hook called by canal. -func (c *client) OnSubjectDelete(_ context.Context, id model.SubjectID) error { - _, err := c.subjectIndex.DeleteDocument(strconv.FormatUint(uint64(id), 10)) +func (c *client) OnSubjectDelete(ctx context.Context, id model.SubjectID) error { + _, err := c.subjectIndex.DeleteDocumentWithContext(ctx, strconv.FormatUint(uint64(id), 10)) return errgo.Wrap(err, "search") } @@ -204,12 +204,6 @@ func (c *client) sendBatch(items []subjectIndex) { } } -func (c *client) DeleteSubject(_ context.Context, id model.SubjectID) error { - _, err := c.subjectIndex.Delete(strconv.FormatUint(uint64(id), 10)) - - return errgo.Wrap(err, "delete") -} - func (c *client) needFirstRun() (bool, error) { if os.Getenv("CHII_SEARCH_INIT") == "true" { return true, nil