Skip to content

Commit

Permalink
feat: augment get schema RPC to also have table and all type args
Browse files Browse the repository at this point in the history
Signed-off-by: Manan Gupta <[email protected]>
  • Loading branch information
GuptaManan100 committed May 31, 2023
1 parent bb9a521 commit af82d62
Show file tree
Hide file tree
Showing 7 changed files with 282 additions and 32 deletions.
6 changes: 4 additions & 2 deletions go/vt/vttablet/endtoend/framework/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -415,9 +415,11 @@ func (client *QueryClient) UpdateContext(ctx context.Context) {
}

func (client *QueryClient) GetSchema(tableType querypb.SchemaTableType, tableNames ...string) (map[string]string, error) {
schemaDef := map[string]string{}
schemaDef := make(map[string]string)
err := client.server.GetSchema(client.ctx, client.target, tableType, tableNames, func(schemaRes *querypb.GetSchemaResponse) error {
schemaDef = schemaRes.TableDefinition
for tableName, schemaDefinition := range schemaRes.TableDefinition {
schemaDef[tableName] = schemaDefinition
}
return nil
})
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vttablet/endtoend/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ var tableACLConfig = `{
},
{
"name": "vitess",
"table_names_or_prefixes": ["vitess_a", "vitess_b", "vitess_c", "dual", "vitess_d", "vitess_temp", "vitess_e", "vitess_f", "vitess_mixed_case", "upsert_test", "vitess_strings", "vitess_fracts", "vitess_ints", "vitess_misc", "vitess_bit_default", "vitess_big", "vitess_stress", "vitess_view", "vitess_json", "vitess_bool", "vitess_autoinc_seq"],
"table_names_or_prefixes": ["vitess_a", "vitess_b", "vitess_c", "dual", "vitess_d", "vitess_temp", "vitess_temp1", "vitess_temp2", "vitess_temp3", "vitess_e", "vitess_f", "vitess_mixed_case", "upsert_test", "vitess_strings", "vitess_fracts", "vitess_ints", "vitess_misc", "vitess_bit_default", "vitess_big", "vitess_stress", "vitess_view", "vitess_json", "vitess_bool", "vitess_autoinc_seq"],
"readers": ["dev"],
"writers": ["dev"],
"admins": ["dev"]
Expand Down
202 changes: 202 additions & 0 deletions go/vt/vttablet/endtoend/rpc_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
package endtoend

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/require"

"vitess.io/vitess/go/vt/callerid"
querypb "vitess.io/vitess/go/vt/proto/query"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/vttablet/endtoend/framework"
)

// TestGetSchemaRPC will validate GetSchema RPC.
func TestGetSchemaRPC(t *testing.T) {
testcases := []struct {
name string
queries []string
deferQueries []string
getSchemaQueryType querypb.SchemaTableType
getSchemaTables []string
mapToExpect map[string]string
}{
{
name: "All views",
queries: []string{
"create view vitess_view1 as select id from vitess_a",
"create view vitess_view2 as select id from vitess_b",
},
deferQueries: []string{
"drop view vitess_view1",
"drop view vitess_view2",
},
mapToExpect: map[string]string{
"vitess_view1": "CREATE ALGORITHM=UNDEFINED DEFINER=`vt_dba`@`localhost` SQL SECURITY DEFINER VIEW `vitess_view1` AS select `vitess_a`.`id` AS `id` from `vitess_a`",
"vitess_view2": "CREATE ALGORITHM=UNDEFINED DEFINER=`vt_dba`@`localhost` SQL SECURITY DEFINER VIEW `vitess_view2` AS select `vitess_b`.`id` AS `id` from `vitess_b`",
},
getSchemaQueryType: querypb.SchemaTableType_VIEWS,
}, {
name: "Views listed",
queries: []string{
"create view vitess_view1 as select eid from vitess_a",
"create view vitess_view2 as select eid from vitess_b",
"create view vitess_view3 as select eid from vitess_c",
},
deferQueries: []string{
"drop view vitess_view1",
"drop view vitess_view2",
"drop view vitess_view3",
},
mapToExpect: map[string]string{
"vitess_view3": "CREATE ALGORITHM=UNDEFINED DEFINER=`vt_dba`@`localhost` SQL SECURITY DEFINER VIEW `vitess_view3` AS select `vitess_c`.`eid` AS `eid` from `vitess_c`",
"vitess_view2": "CREATE ALGORITHM=UNDEFINED DEFINER=`vt_dba`@`localhost` SQL SECURITY DEFINER VIEW `vitess_view2` AS select `vitess_b`.`eid` AS `eid` from `vitess_b`",
// These shouldn't be part of the result so we verify it is empty.
"vitess_view1": "",
"unknown_view": "",
},
getSchemaTables: []string{"vitess_view3", "vitess_view2", "unknown_view"},
getSchemaQueryType: querypb.SchemaTableType_VIEWS,
}, {
name: "All tables",
queries: []string{
"create table vitess_temp1 (id int);",
"create table vitess_temp2 (id int);",
"create table vitess_temp3 (id int);",
},
deferQueries: []string{
"drop table vitess_temp1",
"drop table vitess_temp2",
"drop table vitess_temp3",
},
mapToExpect: map[string]string{
"vitess_temp1": "CREATE TABLE `vitess_temp1` (\n `id` int DEFAULT NULL\n) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci",
"vitess_temp2": "CREATE TABLE `vitess_temp2` (\n `id` int DEFAULT NULL\n) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci",
"vitess_temp3": "CREATE TABLE `vitess_temp3` (\n `id` int DEFAULT NULL\n) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci",
},
getSchemaQueryType: querypb.SchemaTableType_TABLES,
}, {
name: "Tables listed",
queries: []string{
"create table vitess_temp1 (eid int);",
"create table vitess_temp2 (eid int);",
"create table vitess_temp3 (eid int);",
},
deferQueries: []string{
"drop table vitess_temp1",
"drop table vitess_temp2",
"drop table vitess_temp3",
},
mapToExpect: map[string]string{
"vitess_temp1": "CREATE TABLE `vitess_temp1` (\n `eid` int DEFAULT NULL\n) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci",
"vitess_temp3": "CREATE TABLE `vitess_temp3` (\n `eid` int DEFAULT NULL\n) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci",
// These shouldn't be part of the result so we verify it is empty.
"vitess_temp2": "",
"unknown_table": "",
},
getSchemaQueryType: querypb.SchemaTableType_TABLES,
getSchemaTables: []string{"vitess_temp1", "vitess_temp3", "unknown_table"},
}, {
name: "All tables and views",
queries: []string{
"create table vitess_temp1 (id int);",
"create table vitess_temp2 (id int);",
"create table vitess_temp3 (id int);",
"create view vitess_view1 as select id from vitess_a",
"create view vitess_view2 as select id from vitess_b",
},
deferQueries: []string{
"drop table vitess_temp1",
"drop table vitess_temp2",
"drop table vitess_temp3",
"drop view vitess_view1",
"drop view vitess_view2",
},
mapToExpect: map[string]string{
"vitess_view1": "CREATE ALGORITHM=UNDEFINED DEFINER=`vt_dba`@`localhost` SQL SECURITY DEFINER VIEW `vitess_view1` AS select `vitess_a`.`id` AS `id` from `vitess_a`",
"vitess_view2": "CREATE ALGORITHM=UNDEFINED DEFINER=`vt_dba`@`localhost` SQL SECURITY DEFINER VIEW `vitess_view2` AS select `vitess_b`.`id` AS `id` from `vitess_b`",
"vitess_temp1": "CREATE TABLE `vitess_temp1` (\n `id` int DEFAULT NULL\n) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci",
"vitess_temp2": "CREATE TABLE `vitess_temp2` (\n `id` int DEFAULT NULL\n) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci",
"vitess_temp3": "CREATE TABLE `vitess_temp3` (\n `id` int DEFAULT NULL\n) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci",
},
getSchemaQueryType: querypb.SchemaTableType_ALL,
}, {
name: "Listed tables and views",
queries: []string{
"create table vitess_temp1 (eid int);",
"create table vitess_temp2 (eid int);",
"create table vitess_temp3 (eid int);",
"create view vitess_view1 as select eid from vitess_a",
"create view vitess_view2 as select eid from vitess_b",
"create view vitess_view3 as select eid from vitess_c",
},
deferQueries: []string{
"drop table vitess_temp1",
"drop table vitess_temp2",
"drop table vitess_temp3",
"drop view vitess_view1",
"drop view vitess_view2",
"drop view vitess_view3",
},
mapToExpect: map[string]string{
"vitess_view1": "CREATE ALGORITHM=UNDEFINED DEFINER=`vt_dba`@`localhost` SQL SECURITY DEFINER VIEW `vitess_view1` AS select `vitess_a`.`eid` AS `eid` from `vitess_a`",
"vitess_view3": "CREATE ALGORITHM=UNDEFINED DEFINER=`vt_dba`@`localhost` SQL SECURITY DEFINER VIEW `vitess_view3` AS select `vitess_c`.`eid` AS `eid` from `vitess_c`",
"vitess_temp1": "CREATE TABLE `vitess_temp1` (\n `eid` int DEFAULT NULL\n) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci",
"vitess_temp3": "CREATE TABLE `vitess_temp3` (\n `eid` int DEFAULT NULL\n) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci",
// These shouldn't be part of the result so we verify it is empty.
"vitess_temp2": "",
"vitess_view2": "",
"unknown_view": "",
"unknown_table": "",
},
getSchemaQueryType: querypb.SchemaTableType_ALL,
getSchemaTables: []string{"vitess_temp1", "vitess_temp3", "unknown_table", "vitess_view3", "vitess_view1", "unknown_view"},
},
}

for _, testcase := range testcases {
t.Run(testcase.name, func(t *testing.T) {
client := framework.NewClient()
client.UpdateContext(callerid.NewContext(
context.Background(),
&vtrpcpb.CallerID{},
&querypb.VTGateCallerID{Username: "dev"}))

for _, query := range testcase.queries {
_, err := client.Execute(query, nil)
require.NoError(t, err)
}
defer func() {
for _, query := range testcase.deferQueries {
_, err := client.Execute(query, nil)
require.NoError(t, err)
}
}()

timeout := 1 * time.Minute
wait := time.After(timeout)
for {
select {
case <-wait:
t.Errorf("Schema tracking hasn't caught up")
return
case <-time.After(1 * time.Second):
schemaDefs, err := client.GetSchema(testcase.getSchemaQueryType, testcase.getSchemaTables...)
require.NoError(t, err)
success := true
for tableName, expectedCreateStatement := range testcase.mapToExpect {
if schemaDefs[tableName] != expectedCreateStatement {
success = false
break
}
}
if success {
return
}
}
}
})
}
}
29 changes: 0 additions & 29 deletions go/vt/vttablet/endtoend/views_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,35 +271,6 @@ func TestViewAndTableUnique(t *testing.T) {
require.ErrorContains(t, err, "Table 'vitess_view' already exists")
}

// TestGetSchemaRPC will validate GetSchema rpc..
func TestGetSchemaRPC(t *testing.T) {
client := framework.NewClient()

client.Execute("delete from _vt.views", nil)
viewSchemaDef, err := client.GetSchema(querypb.SchemaTableType_VIEWS)
require.NoError(t, err)
require.Zero(t, len(viewSchemaDef))

client.UpdateContext(callerid.NewContext(
context.Background(),
&vtrpcpb.CallerID{},
&querypb.VTGateCallerID{Username: "dev"}))

defer client.Execute("drop view vitess_view", nil)

_, err = client.Execute("create view vitess_view as select 1 from vitess_a", nil)
require.NoError(t, err)
waitForResult(t, client, 1, 1*time.Minute)

viewSchemaDef, err = client.GetSchema(querypb.SchemaTableType_VIEWS)
require.NoError(t, err)
require.Equal(t, "CREATE ALGORITHM=UNDEFINED DEFINER=`vt_dba`@`localhost` SQL SECURITY DEFINER VIEW `vitess_view` AS select 1 AS `1` from `vitess_a`", viewSchemaDef["vitess_view"])

viewSchemaDef, err = client.GetSchema(querypb.SchemaTableType_VIEWS, "vitess_view")
require.NoError(t, err)
require.Equal(t, "CREATE ALGORITHM=UNDEFINED DEFINER=`vt_dba`@`localhost` SQL SECURITY DEFINER VIEW `vitess_view` AS select 1 AS `1` from `vitess_a`", viewSchemaDef["vitess_view"])
}

func waitForResult(t *testing.T, client *framework.QueryClient, rowCount int, timeout time.Duration) {
t.Helper()
wait := time.After(timeout)
Expand Down
19 changes: 19 additions & 0 deletions go/vt/vttablet/tabletserver/query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1124,6 +1124,13 @@ func (qre *QueryExecutor) GetSchemaDefinitions(tableType querypb.SchemaTableType
switch tableType {
case querypb.SchemaTableType_VIEWS:
return qre.getViewDefinitions(tableNames, callback)
case querypb.SchemaTableType_TABLES:
return qre.getTableDefinitions(tableNames, callback)
case querypb.SchemaTableType_ALL:
if err := qre.getViewDefinitions(tableNames, callback); err != nil {
return err
}
return qre.getTableDefinitions(tableNames, callback)
}
return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "invalid table type %v", tableType)
}
Expand All @@ -1133,6 +1140,18 @@ func (qre *QueryExecutor) getViewDefinitions(viewNames []string, callback func(s
if err != nil {
return err
}
return qre.executeGetSchemaQuery(query, callback)
}

func (qre *QueryExecutor) getTableDefinitions(tableNames []string, callback func(schemaRes *querypb.GetSchemaResponse) error) error {
query, err := eschema.GetFetchTableQuery(tableNames)
if err != nil {
return err
}
return qre.executeGetSchemaQuery(query, callback)
}

func (qre *QueryExecutor) executeGetSchemaQuery(query string, callback func(schemaRes *querypb.GetSchemaResponse) error) error {
conn, err := qre.getStreamConn()
if err != nil {
return err
Expand Down
29 changes: 29 additions & 0 deletions go/vt/vttablet/tabletserver/schema/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,12 @@ values (database(), :table_name, :create_statement, :create_time)`
// readTableCreateTimes reads the tables create times
readTableCreateTimes = "SELECT TABLE_NAME, CREATE_TIME FROM %s.`tables`"

// fetchUpdatedTables queries fetches information about updated tables
fetchUpdatedTables = `select table_name, create_statement from %s.tables where table_schema = database() and table_name in ::tableNames`

// fetchTables queries fetches all information about tables
fetchTables = `select table_name, create_statement from %s.tables where table_schema = database()`

// detectViewChange query detects if there is any view change from previous copy.
detectViewChange = `
SELECT distinct table_name
Expand Down Expand Up @@ -401,3 +407,26 @@ func GetFetchViewQuery(viewNames []string) (string, error) {
}
return parsedQuery.GenerateQuery(bv, nil)
}

// GetFetchTableQuery gets the fetch query to run for getting the listed tables. If no tables are provided, then all the tables are fetched.
func GetFetchTableQuery(tableNames []string) (string, error) {
if len(tableNames) == 0 {
parsedQuery, err := generateFullQuery(fetchTables)
if err != nil {
return "", err
}
return parsedQuery.Query, nil
}

tablesBV, err := sqltypes.BuildBindVariable(tableNames)
if err != nil {
return "", err
}
bv := map[string]*querypb.BindVariable{"tableNames": tablesBV}

parsedQuery, err := generateFullQuery(fetchUpdatedTables)
if err != nil {
return "", err
}
return parsedQuery.GenerateQuery(bv, nil)
}
27 changes: 27 additions & 0 deletions go/vt/vttablet/tabletserver/schema/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -921,3 +921,30 @@ func TestGetFetchViewQuery(t *testing.T) {
})
}
}

// TestGetFetchTableQuery tests the functionality for getting the fetch query to retrieve tables.
func TestGetFetchTableQuery(t *testing.T) {
testcases := []struct {
name string
tableNames []string
expectedQuery string
}{
{
name: "No tables provided",
tableNames: []string{},
expectedQuery: "select table_name, create_statement from _vt.`tables` where table_schema = database()",
}, {
name: "Few tables provided",
tableNames: []string{"v1", "v2", "lead"},
expectedQuery: "select table_name, create_statement from _vt.`tables` where table_schema = database() and table_name in ('v1', 'v2', 'lead')",
},
}

for _, testcase := range testcases {
t.Run(testcase.name, func(t *testing.T) {
query, err := GetFetchTableQuery(testcase.tableNames)
require.NoError(t, err)
require.Equal(t, testcase.expectedQuery, query)
})
}
}

0 comments on commit af82d62

Please sign in to comment.