diff --git a/go/test/endtoend/utils/utils.go b/go/test/endtoend/utils/utils.go index fa270ba30a..0231cae7ba 100644 --- a/go/test/endtoend/utils/utils.go +++ b/go/test/endtoend/utils/utils.go @@ -236,16 +236,17 @@ func WaitForAuthoritative(t *testing.T, ks, tbl string, readVSchema func() (*int case <-timeout: return fmt.Errorf("schema tracking didn't mark table t2 as authoritative until timeout") default: - time.Sleep(1 * time.Second) res, err := readVSchema() require.NoError(t, err, res) t2Map := getTableT2Map(res, ks, tbl) authoritative, fieldPresent := t2Map["column_list_authoritative"] if !fieldPresent { + time.Sleep(100 * time.Millisecond) continue } authoritativeBool, isBool := authoritative.(bool) if !isBool || !authoritativeBool { + time.Sleep(100 * time.Millisecond) continue } return nil @@ -262,24 +263,46 @@ func WaitForKsError(t *testing.T, vtgateProcess cluster.VtgateProcess, ks string t.Fatalf("schema tracking did not find error in '%s'", ks) return "" default: - time.Sleep(1 * time.Second) res, err := vtgateProcess.ReadVSchema() require.NoError(t, err, res) kss := convertToMap(*res)["keyspaces"] ksMap := convertToMap(convertToMap(kss)[ks]) ksErr, fieldPresent := ksMap["error"] if !fieldPresent { - break + time.Sleep(100 * time.Millisecond) + continue } errString, isErr := ksErr.(string) if !isErr { - break + time.Sleep(100 * time.Millisecond) + continue } return errString } } } +// WaitForTableDeletions waits for a table to be deleted +func WaitForTableDeletions(ctx context.Context, t *testing.T, vtgateProcess cluster.VtgateProcess, ks, tbl string) error { + for { + select { + case <-ctx.Done(): + return fmt.Errorf("schema tracking still found the table '%s'", tbl) + default: + res, err := vtgateProcess.ReadVSchema() + require.NoError(t, err, res) + keyspacesMap := convertToMap(*res)["keyspaces"] + ksMap := convertToMap(keyspacesMap)[ks] + tablesMap := convertToMap(ksMap)["tables"] + _, isPresent := convertToMap(tablesMap)[tbl] + if !isPresent { + return nil + } + time.Sleep(100 * time.Millisecond) + } + } +} + // WaitForColumn waits for a table's column to be present func WaitForColumn(t *testing.T, vtgateProcess cluster.VtgateProcess, ks, tbl, col string) error { timeout := time.After(60 * time.Second) @@ -288,25 +311,28 @@ func WaitForColumn(t *testing.T, vtgateProcess cluster.VtgateProcess, ks, tbl, c case <-timeout: return fmt.Errorf("schema tracking did not find column '%s' in table '%s'", col, tbl) default: - time.Sleep(1 * time.Second) res, err := vtgateProcess.ReadVSchema() require.NoError(t, err, res) t2Map := getTableT2Map(res, ks, tbl) authoritative, fieldPresent := t2Map["column_list_authoritative"] if !fieldPresent { - break + time.Sleep(100 * time.Millisecond) + continue } authoritativeBool, isBool := authoritative.(bool) if !isBool || !authoritativeBool { - break + time.Sleep(100 * time.Millisecond) + continue } colMap, exists := t2Map["columns"] if !exists { - break + time.Sleep(100 * time.Millisecond) + continue } colList, isSlice := colMap.([]interface{}) if !isSlice { - break + time.Sleep(100 * time.Millisecond) + continue } for _, c := range colList { colDef, isMap := c.(map[string]interface{}) @@ -317,6 +343,7 @@ func WaitForColumn(t *testing.T, vtgateProcess cluster.VtgateProcess, ks, tbl, c return nil } } + time.Sleep(100 * time.Millisecond) } } } @@ -330,7 +357,10 @@ func getTableT2Map(res *interface{}, ks, tbl string) map[string]interface{} { } func convertToMap(input interface{}) map[string]interface{} { - output := input.(map[string]interface{}) + output, ok := input.(map[string]interface{}) + if !ok { + return make(map[string]interface{}) + } return output } diff --git a/go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go b/go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go index e9f0602d23..91e64e67d1 100644 --- a/go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go +++ b/go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go @@ -704,6 +704,14 @@ func createInitialSchema(t *testing.T, tcase *testCase) { require.NoError(t, err) } }) + t.Run("waiting for vschema deletions to apply", func(t *testing.T) { + timeoutCtx, cancel := context.WithTimeout(ctx, 1*time.Minute) + defer cancel() + for _, tableName := range tableNames { + err := utils.WaitForTableDeletions(timeoutCtx, t, clusterInstance.VtgateProcess, keyspaceName, tableName) + require.NoError(t, err) + } + }) t.Run("creating tables", func(t *testing.T) { // Create the stress tables var b strings.Builder