diff --git a/binlog_streamer.go b/binlog_streamer.go index c6d1564b..2e1f83c9 100644 --- a/binlog_streamer.go +++ b/binlog_streamer.go @@ -188,7 +188,6 @@ func (s *BinlogStreamer) Run() { isEventPositionResumable := false isEventPositionValid := true - skipEvent := false switch e := ev.Event.(type) { case *replication.RotateEvent: @@ -268,31 +267,14 @@ func (s *BinlogStreamer) Run() { // or the end of the current/next transaction. As such, the query will be // reset following the next RowsQueryEvent before the corresponding RowsEvent(s) query = nil - case *replication.TableMapEvent: - skipEvent = true//we can skip this event case *replication.GenericEvent: - if ev.Header.Flags == replication.LOG_EVENT_IGNORABLE_F { - skipEvent = true//we can skip this event + if ev.RawData[4] == 164 && ev.Header.Flags == replication.LOG_EVENT_IGNORABLE_F { + // This event is published in binlog when encrypt_binlog is enabled for MariaDB + // event 164 shouldn't be published to replica but actually is, as it is not relevant + // since we receive binlogs unencrypted we can skip this event + // https://mariadb.com/kb/en/start_encryption_event/ + isEventPositionValid = false } - case *replication.QueryEvent: - // This event is published in binlog when not using ROW binlog_format - // or when MariaDB sees that replica can't handle annotations - // these always contain "BEGIN" or "# Dummy event replacing event type %u that slave cannot handle." - // this means that we can check for # as this is a comment in a query - queryEventQuery := ev.Event.(*replication.QueryEvent).Query - queryString := string(queryEventQuery) - if queryString == "BEGIN" { - skipEvent = true//we can skip this event - } else if queryEventQuery[0] == 35 {//35 is # - skipEvent = true//we can skip this event - } else { - err := fmt.Errorf("failed to handle query event query: %s", queryString) - s.logger.WithError(err).Error("failed to handle query event") - s.ErrorHandler.Fatal("binlog_streamer", err) - } - } - if skipEvent { - continue } if isEventPositionValid {