diff --git a/batch_writer.go b/batch_writer.go
index 9ae9d13d..5ed22cd8 100644
--- a/batch_writer.go
+++ b/batch_writer.go
@@ -19,7 +19,7 @@ func init() {
 }
 
 type BatchWriterVerificationFailed struct {
-	mismatchedPaginationKeys []uint64
+	mismatchedPaginationKeys []InlineVerifierMismatches
 	table                    string
 }
 
diff --git a/copydb/test/copydb_test.go b/copydb/test/copydb_test.go
index 7fbf86a6..cdae6e2d 100644
--- a/copydb/test/copydb_test.go
+++ b/copydb/test/copydb_test.go
@@ -2,6 +2,7 @@ package test
 
 import (
 	"fmt"
+	"os"
 	"testing"
 
 	"github.com/Shopify/ghostferry"
@@ -138,8 +139,14 @@ func (t *CopydbTestSuite) TestCreateDatabaseCopiesTheRightCollation() {
 	row := t.ferry.TargetDB.QueryRow(query)
 	err = row.Scan(&characterSet, &collation)
 	t.Require().Nil(err)
-	t.Require().Equal(characterSet, "utf8")
-	t.Require().Equal(collation, "utf8_general_ci")
+
+	if os.Getenv("MYSQL_VERSION") == "8.0" {
+		t.Require().Equal(characterSet, "utf8mb3")
+		t.Require().Equal(collation, "utf8mb3_general_ci")
+	} else {
+		t.Require().Equal(characterSet, "utf8")
+		t.Require().Equal(collation, "utf8_general_ci")
+	}
 
 	query = "SELECT table_collation FROM information_schema.tables WHERE table_schema = \"%s\" AND table_name = \"%s\""
 	query = fmt.Sprintf(query, renamedSchemaName, renamedTableName)
diff --git a/inline_verifier.go b/inline_verifier.go
index d221479a..80d56bab 100644
--- a/inline_verifier.go
+++ b/inline_verifier.go
@@ -232,6 +232,12 @@ func (s *BinlogVerifyStore) Serialize() BinlogVerifySerializedStore {
 	return s.store.Copy()
 }
 
+type InlineVerifierMismatches struct {
+	Pk             uint64
+	SourceChecksum string
+	TargetChecksum string
+}
+
 type InlineVerifier struct {
 	SourceDB *sql.DB
 	TargetDB *sql.DB
@@ -304,7 +310,7 @@ func (v *InlineVerifier) Result() (VerificationResultAndStatus, error) {
 	return v.backgroundVerificationResultAndStatus, v.backgroundVerificationErr
 }
 
-func (v *InlineVerifier) CheckFingerprintInline(tx *sql.Tx, targetSchema, targetTable string, sourceBatch *RowBatch, enforceInlineVerification bool) ([]uint64, error) {
+func (v *InlineVerifier) CheckFingerprintInline(tx *sql.Tx, targetSchema, targetTable string, sourceBatch *RowBatch, enforceInlineVerification bool) ([]InlineVerifierMismatches, error) {
 	table := sourceBatch.TableSchema()
 
 	paginationKeys := make([]uint64, len(sourceBatch.Values()))
@@ -353,8 +359,8 @@ func (v *InlineVerifier) CheckFingerprintInline(tx *sql.Tx, targetSchema, target
 	mismatches := v.compareHashesAndData(sourceFingerprints, targetFingerprints, sourceDecompressedData, targetDecompressedData)
 
 	if !enforceInlineVerification {
-		for _, mismatchedPk := range mismatches {
-			v.reverifyStore.Add(table, mismatchedPk)
+		for _, mismatch := range mismatches {
+			v.reverifyStore.Add(table, mismatch.Pk)
 		}
 
 		if len(mismatches) > 0 {
@@ -446,15 +452,19 @@ func (v *InlineVerifier) VerifyDuringCutover() (VerificationResult, error) {
 	messageBuf.WriteString("cutover verification failed for: ")
 	incorrectTables := make([]string, 0)
 	for schemaName, _ := range mismatches {
-		for tableName, paginationKeys := range mismatches[schemaName] {
+		for tableName, mismatches := range mismatches[schemaName] {
 			tableName = fmt.Sprintf("%s.%s", schemaName, tableName)
 			incorrectTables = append(incorrectTables, tableName)
 
 			messageBuf.WriteString(tableName)
 			messageBuf.WriteString(" [paginationKeys: ")
-			for _, paginationKey := range paginationKeys {
-				messageBuf.WriteString(strconv.FormatUint(paginationKey, 10))
-				messageBuf.WriteString(" ")
+			for _, mismatch := range mismatches {
+				messageBuf.WriteString(strconv.FormatUint(mismatch.Pk, 10))
+				messageBuf.WriteString(" (source: ")
+				messageBuf.WriteString(mismatch.SourceChecksum)
+				messageBuf.WriteString(", target: ")
+				messageBuf.WriteString(mismatch.TargetChecksum)
+				messageBuf.WriteString(") ")
 			}
 			messageBuf.WriteString("] ")
 		}
@@ -555,20 +565,28 @@ func (v *InlineVerifier) decompressData(table *TableSchema, column string, compr
 	}
 }
 
-func (v *InlineVerifier) compareHashes(source, target map[uint64][]byte) map[uint64]struct{} {
-	mismatchSet := map[uint64]struct{}{}
+func (v *InlineVerifier) compareHashes(source, target map[uint64][]byte) map[uint64]InlineVerifierMismatches {
+	mismatchSet := map[uint64]InlineVerifierMismatches{}
 
 	for paginationKey, targetHash := range target {
 		sourceHash, exists := source[paginationKey]
 		if !bytes.Equal(sourceHash, targetHash) || !exists {
-			mismatchSet[paginationKey] = struct{}{}
+			mismatchSet[paginationKey] = InlineVerifierMismatches{
+				Pk:             paginationKey,
+				SourceChecksum: string(sourceHash),
+				TargetChecksum: string(targetHash),
+			}
 		}
 	}
 
 	for paginationKey, sourceHash := range source {
 		targetHash, exists := target[paginationKey]
 		if !bytes.Equal(sourceHash, targetHash) || !exists {
-			mismatchSet[paginationKey] = struct{}{}
+			mismatchSet[paginationKey] = InlineVerifierMismatches{
+				Pk:             paginationKey,
+				SourceChecksum: string(sourceHash),
+				TargetChecksum: string(targetHash),
+			}
 		}
 	}
 
@@ -613,17 +631,21 @@ func (v *InlineVerifier) compareDecompressedData(source, target map[uint64]map[s
 	return mismatchSet
 }
 
-func (v *InlineVerifier) compareHashesAndData(sourceHashes, targetHashes map[uint64][]byte, sourceData, targetData map[uint64]map[string][]byte) []uint64 {
+func (v *InlineVerifier) compareHashesAndData(sourceHashes, targetHashes map[uint64][]byte, sourceData, targetData map[uint64]map[string][]byte) []InlineVerifierMismatches {
 	mismatches := v.compareHashes(sourceHashes, targetHashes)
 	compressedMismatch := v.compareDecompressedData(sourceData, targetData)
 	for paginationKey, _ := range compressedMismatch {
-		mismatches[paginationKey] = struct{}{}
+		mismatches[paginationKey] = InlineVerifierMismatches{
+			Pk:             paginationKey,
+			SourceChecksum: "compressed-data-mismatch", // TODO: compute the hash of the compressed data and put it here
+			TargetChecksum: "compressed-data-mismatch",
+		}
 	}
 
-	mismatchList := make([]uint64, 0, len(mismatches))
+	mismatchList := make([]InlineVerifierMismatches, 0, len(mismatches))
 
-	for paginationKey, _ := range mismatches {
-		mismatchList = append(mismatchList, paginationKey)
+	for _, mismatch := range mismatches {
+		mismatchList = append(mismatchList, mismatch)
 	}
 
 	return mismatchList
@@ -650,21 +672,21 @@ func (v *InlineVerifier) binlogEventListener(evs []DMLEvent) error {
 	return nil
 }
 
-func (v *InlineVerifier) readdMismatchedPaginationKeysToBeVerifiedAgain(mismatches map[string]map[string][]uint64) {
+func (v *InlineVerifier) readdMismatchedPaginationKeysToBeVerifiedAgain(mismatches map[string]map[string][]InlineVerifierMismatches) {
 	for schemaName, _ := range mismatches {
-		for tableName, paginationKeys := range mismatches[schemaName] {
+		for tableName, mismatches := range mismatches[schemaName] {
 			table := v.TableSchemaCache.Get(schemaName, tableName)
-			for _, paginationKey := range paginationKeys {
-				v.reverifyStore.Add(table, paginationKey)
+			for _, mismatch := range mismatches {
+				v.reverifyStore.Add(table, mismatch.Pk)
 			}
 		}
 	}
 }
 
 // Returns mismatches in the form of db -> table -> paginationKeys
-func (v *InlineVerifier) verifyAllEventsInStore() (bool, map[string]map[string][]uint64, error) {
+func (v *InlineVerifier) verifyAllEventsInStore() (bool, map[string]map[string][]InlineVerifierMismatches, error) {
 	mismatchFound := false
-	mismatches := make(map[string]map[string][]uint64)
+	mismatches := make(map[string]map[string][]InlineVerifierMismatches)
 	allBatches := v.reverifyStore.Batches(v.BatchSize)
 
 	if len(allBatches) == 0 {
@@ -684,11 +706,11 @@ func (v *InlineVerifier) verifyAllEventsInStore() (bool, map[string]map[string][
 			mismatchFound = true
 
 			if _, exists := mismatches[batch.SchemaName]; !exists {
-				mismatches[batch.SchemaName] = make(map[string][]uint64)
+				mismatches[batch.SchemaName] = make(map[string][]InlineVerifierMismatches)
 			}
 
 			if _, exists := mismatches[batch.SchemaName][batch.TableName]; !exists {
-				mismatches[batch.SchemaName][batch.TableName] = make([]uint64, 0)
+				mismatches[batch.SchemaName][batch.TableName] = make([]InlineVerifierMismatches, 0)
 			}
 
 			mismatches[batch.SchemaName][batch.TableName] = append(mismatches[batch.SchemaName][batch.TableName], batchMismatches...)
@@ -702,7 +724,7 @@ func (v *InlineVerifier) verifyAllEventsInStore() (bool, map[string]map[string][
 // Since the mismatches gets re-added to the reverify store, this must return
 // a union of mismatches of fingerprints and mismatches due to decompressed
 // data.
-func (v *InlineVerifier) verifyBinlogBatch(batch BinlogVerifyBatch) ([]uint64, error) {
+func (v *InlineVerifier) verifyBinlogBatch(batch BinlogVerifyBatch) ([]InlineVerifierMismatches, error) {
 	targetSchema := batch.SchemaName
 	if targetSchemaName, exists := v.DatabaseRewrites[targetSchema]; exists {
 		targetSchema = targetSchemaName
@@ -715,7 +737,7 @@ func (v *InlineVerifier) verifyBinlogBatch(batch BinlogVerifyBatch) ([]uint64, e
 
 	sourceTableSchema := v.TableSchemaCache.Get(batch.SchemaName, batch.TableName)
 	if sourceTableSchema == nil {
-		return []uint64{}, fmt.Errorf("programming error? %s.%s is not found in TableSchemaCache but is being reverified", batch.SchemaName, batch.TableName)
+		return []InlineVerifierMismatches{}, fmt.Errorf("programming error? %s.%s is not found in TableSchemaCache but is being reverified", batch.SchemaName, batch.TableName)
 	}
 
 	wg := &sync.WaitGroup{}
diff --git a/sharding/test/primary_key_table_test.go b/sharding/test/primary_key_table_test.go
index 21a01a5b..c6228f43 100644
--- a/sharding/test/primary_key_table_test.go
+++ b/sharding/test/primary_key_table_test.go
@@ -3,6 +3,7 @@ package test
 import (
 	"fmt"
 	"net/http"
+	"strings"
 	"testing"
 	"time"
 
@@ -90,7 +91,9 @@ func (t *PrimaryKeyTableTestSuite) TestPrimaryKeyTableVerificationFailure() {
 	t.Ferry.Run()
 
 	t.Require().NotNil(errHandler.LastError)
-	t.Require().Equal("row fingerprints for paginationKeys [2] on gftest1.tenants_table do not match", errHandler.LastError.Error())
+
+	t.Require().True(strings.HasPrefix(errHandler.LastError.Error(), "row fingerprints for paginationKeys [{2"))
+	t.Require().True(strings.HasSuffix(errHandler.LastError.Error(), "on gftest1.tenants_table do not match"))
 }
 
 func TestPrimaryKeyTableTestSuite(t *testing.T) {
diff --git a/test/integration/inline_verifier_test.rb b/test/integration/inline_verifier_test.rb
index a12ed0e1..26c3a34d 100644
--- a/test/integration/inline_verifier_test.rb
+++ b/test/integration/inline_verifier_test.rb
@@ -40,7 +40,7 @@ def test_corrupted_insert_is_detected_inline_with_batch_writer
     assert verification_ran
     assert_equal ["#{DEFAULT_DB}.#{DEFAULT_TABLE}"], incorrect_tables
 
-    assert_equal "cutover verification failed for: gftest.test_table_1 [paginationKeys: #{corrupting_id} ] ", ghostferry.error_lines.last["msg"]
+    assert ghostferry.error_lines.last["msg"].start_with?("cutover verification failed for: gftest.test_table_1 [paginationKeys: #{corrupting_id}")
   end
 
   def test_different_compressed_data_is_detected_inline_with_batch_writer
@@ -68,7 +68,7 @@ def test_different_compressed_data_is_detected_inline_with_batch_writer
     assert verification_ran
     assert_equal ["#{DEFAULT_DB}.#{DEFAULT_TABLE}"], incorrect_tables
 
-    assert_equal "cutover verification failed for: gftest.test_table_1 [paginationKeys: 1 ] ", ghostferry.error_lines.last["msg"]
+    assert ghostferry.error_lines.last["msg"].start_with?("cutover verification failed for: gftest.test_table_1 [paginationKeys: 1")
   end
 
   def test_same_decompressed_data_different_compressed_test_passes_inline_verification
@@ -163,7 +163,7 @@ def test_catches_binlog_streamer_corruption
     ghostferry.run
 
     assert verification_ran
-    assert_equal "cutover verification failed for: gftest.test_table_1 [paginationKeys: #{corrupting_id} ] ", ghostferry.error_lines.last["msg"]
+    assert ghostferry.error_lines.last["msg"].start_with?("cutover verification failed for: gftest.test_table_1 [paginationKeys: #{corrupting_id}")
   end
 
   def test_target_corruption_is_ignored_if_skip_target_verification
@@ -399,7 +399,7 @@ def test_catches_binlog_streamer_corruption_with_composite_pk
     ghostferry.run
     assert verification_ran
     assert incorrect_tables_found, "verification did not catch corrupted table"
-    assert_equal "cutover verification failed for: #{DEFAULT_DB}.#{DEFAULT_TABLE} [paginationKeys: #{corrupting_id} ] ", ghostferry.error_lines.last["msg"]
+    assert ghostferry.error_lines.last["msg"].start_with?("cutover verification failed for: #{DEFAULT_DB}.#{DEFAULT_TABLE} [paginationKeys: #{corrupting_id}")
   end
 
   def test_positive_negative_zero
@@ -430,7 +430,7 @@ def test_positive_negative_zero
     assert verification_ran
     assert_equal ["#{DEFAULT_DB}.#{DEFAULT_TABLE}"], incorrect_tables
 
-    assert_equal "cutover verification failed for: #{DEFAULT_DB}.#{DEFAULT_TABLE} [paginationKeys: 1 ] ", ghostferry.error_lines.last["msg"]
+    assert ghostferry.error_lines.last["msg"].start_with?("cutover verification failed for: #{DEFAULT_DB}.#{DEFAULT_TABLE} [paginationKeys: 1")
 
     # Now we run the real test case.
     target_db.query("UPDATE #{DEFAULT_FULL_TABLE_NAME} SET data = -0.0 WHERE id = 1")
@@ -484,7 +484,7 @@ def test_null_vs_empty_string
     assert verification_ran
     assert_equal ["#{DEFAULT_DB}.#{DEFAULT_TABLE}"], incorrect_tables
 
-    assert_equal "cutover verification failed for: gftest.test_table_1 [paginationKeys: 1 ] ", ghostferry.error_lines.last["msg"]
+    assert ghostferry.error_lines.last["msg"].start_with?("cutover verification failed for: gftest.test_table_1 [paginationKeys: 1")
   end
 
   def test_null_vs_null_string
@@ -507,7 +507,7 @@ def test_null_vs_null_string
     assert verification_ran
     assert_equal ["#{DEFAULT_DB}.#{DEFAULT_TABLE}"], incorrect_tables
 
-    assert_equal "cutover verification failed for: gftest.test_table_1 [paginationKeys: 1 ] ", ghostferry.error_lines.last["msg"]
+    assert ghostferry.error_lines.last["msg"].start_with?("cutover verification failed for: gftest.test_table_1 [paginationKeys: 1")
   end
 
   def test_null_in_different_order
@@ -533,7 +533,7 @@ def test_null_in_different_order
     assert verification_ran
     assert_equal ["#{DEFAULT_DB}.#{DEFAULT_TABLE}"], incorrect_tables
 
-    assert_equal "cutover verification failed for: gftest.test_table_1 [paginationKeys: 1 ] ", ghostferry.error_lines.last["msg"]
+    assert ghostferry.error_lines.last["msg"].start_with?("cutover verification failed for: gftest.test_table_1 [paginationKeys: 1")
   end
 
   ###################
@@ -605,7 +605,7 @@ def run_collation_test(data, source_charset, target_charset, identical:)
       assert verify_during_cutover_ran
       assert_equal ["#{DEFAULT_DB}.#{DEFAULT_TABLE}"], incorrect_tables
 
-      assert_equal "cutover verification failed for: gftest.test_table_1 [paginationKeys: 1 ] ", ghostferry.error_lines.last["msg"]
+      assert ghostferry.error_lines.last["msg"].start_with?("cutover verification failed for: gftest.test_table_1 [paginationKeys: 1")
     end
   end
diff --git a/test/integration/interrupt_resume_test.rb b/test/integration/interrupt_resume_test.rb
index 08cf3eeb..4af99294 100644
--- a/test/integration/interrupt_resume_test.rb
+++ b/test/integration/interrupt_resume_test.rb
@@ -262,7 +262,7 @@ def test_interrupt_resume_inline_verifier_will_verify_entries_in_reverify_store
     assert_equal "gftest.test_table_1", incorrect_tables.first
 
     error_line = ghostferry.error_lines.last
-    assert_equal "cutover verification failed for: gftest.test_table_1 [paginationKeys: #{chosen_id} ] ", error_line["msg"]
+    assert error_line["msg"].start_with?("cutover verification failed for: gftest.test_table_1 [paginationKeys: #{chosen_id}")
   end
 
   def test_interrupt_resume_inline_verifier_will_verify_additional_rows_changed_on_source_during_interrupt
@@ -306,7 +306,7 @@ def test_interrupt_resume_inline_verifier_will_verify_additional_rows_changed_on
    assert_equal "gftest.test_table_1", incorrect_tables.first
 
     error_line = ghostferry.error_lines.last
-    assert_equal "cutover verification failed for: gftest.test_table_1 [paginationKeys: #{chosen_id} ] ", error_line["msg"]
+    assert error_line["msg"].start_with?("cutover verification failed for: gftest.test_table_1 [paginationKeys: #{chosen_id}")
  end
 
   # originally taken from @kolbitsch-lastline in https://github.com/Shopify/ghostferry/pull/160
@@ -671,6 +671,6 @@ def test_issue_149_corrupted
 
     assert verification_ran
     assert_equal ["#{DEFAULT_DB}.#{DEFAULT_TABLE}"], incorrect_tables
-    assert_equal "cutover verification failed for: gftest.test_table_1 [paginationKeys: #{id_to_change} ] ", ghostferry.error_lines.last["msg"]
+    assert ghostferry.error_lines.last["msg"].start_with?("cutover verification failed for: gftest.test_table_1 [paginationKeys: #{id_to_change}")
   end
 end