From acedfdef02cd0e6e17979fe121115ab2858c5f1c Mon Sep 17 00:00:00 2001 From: Noble Mittal Date: Mon, 16 Dec 2024 12:48:56 +0530 Subject: [PATCH] refac: use lookup instead of workflowFetcher for lookup vindex actions Signed-off-by: Noble Mittal --- go/vt/vtctl/workflow/lookup.go | 40 +++++++++++++++-------- go/vt/vtctl/workflow/materializer_test.go | 30 ++++++++--------- go/vt/vtctl/workflow/server.go | 4 +-- 3 files changed, 43 insertions(+), 31 deletions(-) diff --git a/go/vt/vtctl/workflow/lookup.go b/go/vt/vtctl/workflow/lookup.go index bd4527328b1..a8b8343428b 100644 --- a/go/vt/vtctl/workflow/lookup.go +++ b/go/vt/vtctl/workflow/lookup.go @@ -26,11 +26,14 @@ import ( "google.golang.org/protobuf/proto" "vitess.io/vitess/go/sqlescape" + "vitess.io/vitess/go/vt/logutil" "vitess.io/vitess/go/vt/schema" "vitess.io/vitess/go/vt/sqlparser" + "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/vtctl/schematools" "vitess.io/vitess/go/vt/vterrors" "vitess.io/vitess/go/vt/vtgate/vindexes" + "vitess.io/vitess/go/vt/vttablet/tmclient" tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata" vschemapb "vitess.io/vitess/go/vt/proto/vschema" @@ -38,9 +41,18 @@ import ( vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" ) +// lookup is responsible for performing actions related to lookup vindexes. +type lookup struct { + ts *topo.Server + tmc tmclient.TabletManagerClient + + logger logutil.Logger + parser *sqlparser.Parser +} + // prepareCreateLookup performs the preparatory steps for creating a // Lookup Vindex. -func (wf *workflowFetcher) prepareCreateLookup(ctx context.Context, workflow, keyspace string, specs *vschemapb.Keyspace, continueAfterCopyWithOwner bool) ( +func (l *lookup) prepareCreateLookup(ctx context.Context, workflow, keyspace string, specs *vschemapb.Keyspace, continueAfterCopyWithOwner bool) ( ms *vtctldatapb.MaterializeSettings, sourceVSchema, targetVSchema *vschemapb.Keyspace, cancelFunc func() error, err error) { var ( // sourceVSchemaTable is the table info present in the vschema. @@ -54,7 +66,7 @@ func (wf *workflowFetcher) prepareCreateLookup(ctx context.Context, workflow, ke ) // Validate input vindex. - vindex, vInfo, err := wf.validateAndGetVindex(specs) + vindex, vInfo, err := l.validateAndGetVindex(specs) if err != nil { return nil, nil, nil, nil, err } @@ -69,7 +81,7 @@ func (wf *workflowFetcher) prepareCreateLookup(ctx context.Context, workflow, ke return nil, nil, nil, nil, err } - sourceVSchema, targetVSchema, err = wf.getTargetAndSourceVSchema(ctx, keyspace, vInfo.targetKeyspace) + sourceVSchema, targetVSchema, err = l.getTargetAndSourceVSchema(ctx, keyspace, vInfo.targetKeyspace) if err != nil { return nil, nil, nil, nil, err } @@ -91,7 +103,7 @@ func (wf *workflowFetcher) prepareCreateLookup(ctx context.Context, workflow, ke } // Validate against source schema. - sourceShards, err := wf.ts.GetServingShards(ctx, keyspace) + sourceShards, err := l.ts.GetServingShards(ctx, keyspace) if err != nil { return nil, nil, nil, nil, err } @@ -102,7 +114,7 @@ func (wf *workflowFetcher) prepareCreateLookup(ctx context.Context, workflow, ke } req := &tabletmanagerdatapb.GetSchemaRequest{Tables: []string{vInfo.sourceTableName}} - tableSchema, err := schematools.GetSchema(ctx, wf.ts, wf.tmc, onesource.PrimaryAlias, req) + tableSchema, err := schematools.GetSchema(ctx, l.ts, l.tmc, onesource.PrimaryAlias, req) if err != nil { return nil, nil, nil, nil, err } @@ -113,7 +125,7 @@ func (wf *workflowFetcher) prepareCreateLookup(ctx context.Context, workflow, ke } // Generate "create table" statement. - createDDL, err = wf.generateCreateDDLStatement(tableSchema, sourceVindexColumns, vInfo, vindex) + createDDL, err = l.generateCreateDDLStatement(tableSchema, sourceVindexColumns, vInfo, vindex) if err != nil { return nil, nil, nil, nil, err } @@ -191,7 +203,7 @@ func (wf *workflowFetcher) prepareCreateLookup(ctx context.Context, workflow, ke if targetChanged { cancelFunc = func() error { // Restore the original target vschema. - return wf.ts.SaveVSchema(ctx, vInfo.targetKeyspace, ogTargetVSchema) + return l.ts.SaveVSchema(ctx, vInfo.targetKeyspace, ogTargetVSchema) } } @@ -230,7 +242,7 @@ type vindexInfo struct { } // validateAndGetVindex validates and extracts vindex configuration -func (wf *workflowFetcher) validateAndGetVindex(specs *vschemapb.Keyspace) (*vschemapb.Vindex, *vindexInfo, error) { +func (l *lookup) validateAndGetVindex(specs *vschemapb.Keyspace) (*vschemapb.Vindex, *vindexInfo, error) { if specs == nil { return nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "no vindex provided") } @@ -245,7 +257,7 @@ func (wf *workflowFetcher) validateAndGetVindex(specs *vschemapb.Keyspace) (*vsc return nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "vindex %s is not a lookup type", vindex.Type) } - targetKeyspace, targetTableName, err := wf.parser.ParseTable(vindex.Params["table"]) + targetKeyspace, targetTableName, err := l.parser.ParseTable(vindex.Params["table"]) if err != nil || targetKeyspace == "" { return nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "vindex table name (%s) must be in the form .", vindex.Params["table"]) @@ -306,8 +318,8 @@ func (wf *workflowFetcher) validateAndGetVindex(specs *vschemapb.Keyspace) (*vsc }, nil } -func (wf *workflowFetcher) getTargetAndSourceVSchema(ctx context.Context, sourceKeyspace string, targetKeyspace string) (sourceVSchema *vschemapb.Keyspace, targetVSchema *vschemapb.Keyspace, err error) { - sourceVSchema, err = wf.ts.GetVSchema(ctx, sourceKeyspace) +func (l *lookup) getTargetAndSourceVSchema(ctx context.Context, sourceKeyspace string, targetKeyspace string) (sourceVSchema *vschemapb.Keyspace, targetVSchema *vschemapb.Keyspace, err error) { + sourceVSchema, err = l.ts.GetVSchema(ctx, sourceKeyspace) if err != nil { return nil, nil, err } @@ -319,7 +331,7 @@ func (wf *workflowFetcher) getTargetAndSourceVSchema(ctx context.Context, source if sourceKeyspace == targetKeyspace { targetVSchema = sourceVSchema } else { - targetVSchema, err = wf.ts.GetVSchema(ctx, targetKeyspace) + targetVSchema, err = l.ts.GetVSchema(ctx, targetKeyspace) if err != nil { return nil, nil, err } @@ -367,7 +379,7 @@ func getSourceTable(specs *vschemapb.Keyspace, targetTableName string, fromCols return sourceTable, sourceTableName, nil } -func (wf *workflowFetcher) generateCreateDDLStatement(tableSchema *tabletmanagerdatapb.SchemaDefinition, sourceVindexColumns []string, vInfo *vindexInfo, vindex *vschemapb.Vindex) (string, error) { +func (l *lookup) generateCreateDDLStatement(tableSchema *tabletmanagerdatapb.SchemaDefinition, sourceVindexColumns []string, vInfo *vindexInfo, vindex *vschemapb.Vindex) (string, error) { lines := strings.Split(tableSchema.TableDefinitions[0].Schema, "\n") if len(lines) < 3 { // Should never happen. @@ -405,7 +417,7 @@ func (wf *workflowFetcher) generateCreateDDLStatement(tableSchema *tabletmanager createDDL := strings.Join(modified, "\n") // Confirm that our DDL is valid before we create anything. - if _, err := wf.parser.ParseStrictDDL(createDDL); err != nil { + if _, err := l.parser.ParseStrictDDL(createDDL); err != nil { return "", vterrors.Errorf(vtrpcpb.Code_INTERNAL, "error: %v; invalid lookup table definition generated: %s", err, createDDL) } diff --git a/go/vt/vtctl/workflow/materializer_test.go b/go/vt/vtctl/workflow/materializer_test.go index f8e32d2ec39..7c71d79bb7b 100644 --- a/go/vt/vtctl/workflow/materializer_test.go +++ b/go/vt/vtctl/workflow/materializer_test.go @@ -1515,13 +1515,13 @@ func TestCreateLookupVindexCreateDDL(t *testing.T) { setStartingVschema() }() } - w := &workflowFetcher{ + l := &lookup{ ts: env.ws.ts, tmc: env.ws.tmc, logger: env.ws.Logger(), parser: env.ws.SQLParser(), } - outms, _, _, cancelFunc, err := w.prepareCreateLookup(ctx, "workflow", ms.SourceKeyspace, tcase.specs, false) + outms, _, _, cancelFunc, err := l.prepareCreateLookup(ctx, "workflow", ms.SourceKeyspace, tcase.specs, false) if tcase.err != "" { require.Error(t, err) require.Contains(t, err.Error(), tcase.err, "prepareCreateLookup(%s) err: %v, does not contain %v", tcase.description, err, tcase.err) @@ -1769,13 +1769,13 @@ func TestCreateLookupVindexSourceVSchema(t *testing.T) { t.Fatal(err) } - w := &workflowFetcher{ + l := &lookup{ ts: env.ws.ts, tmc: env.ws.tmc, logger: env.ws.Logger(), parser: env.ws.SQLParser(), } - _, got, _, _, err := w.prepareCreateLookup(ctx, "workflow", ms.SourceKeyspace, specs, false) + _, got, _, _, err := l.prepareCreateLookup(ctx, "workflow", ms.SourceKeyspace, specs, false) require.NoError(t, err) if !proto.Equal(got, tcase.out) { t.Errorf("%s: got:\n%v, want\n%v", tcase.description, got, tcase.out) @@ -2011,13 +2011,13 @@ func TestCreateLookupVindexTargetVSchema(t *testing.T) { t.Fatal(err) } - w := &workflowFetcher{ + l := &lookup{ ts: env.ws.ts, tmc: env.ws.tmc, logger: env.ws.Logger(), parser: env.ws.SQLParser(), } - _, _, got, cancelFunc, err := w.prepareCreateLookup(ctx, "workflow", ms.SourceKeyspace, specs, false) + _, _, got, cancelFunc, err := l.prepareCreateLookup(ctx, "workflow", ms.SourceKeyspace, specs, false) if tcase.err != "" { if err == nil || !strings.Contains(err.Error(), tcase.err) { t.Errorf("prepareCreateLookup(%s) err: %v, must contain %v", tcase.description, err, tcase.err) @@ -2139,13 +2139,13 @@ func TestCreateLookupVindexSameKeyspace(t *testing.T) { t.Fatal(err) } - w := &workflowFetcher{ + l := &lookup{ ts: env.ws.ts, tmc: env.ws.tmc, logger: env.ws.Logger(), parser: env.ws.SQLParser(), } - _, got, _, _, err := w.prepareCreateLookup(ctx, "keyspace", ms.TargetKeyspace, specs, false) + _, got, _, _, err := l.prepareCreateLookup(ctx, "keyspace", ms.TargetKeyspace, specs, false) require.NoError(t, err) if !proto.Equal(got, want) { t.Errorf("same keyspace: got:\n%v, want\n%v", got, want) @@ -2271,13 +2271,13 @@ func TestCreateCustomizedVindex(t *testing.T) { t.Fatal(err) } - w := &workflowFetcher{ + l := &lookup{ ts: env.ws.ts, tmc: env.ws.tmc, logger: env.ws.Logger(), parser: env.ws.SQLParser(), } - _, got, _, _, err := w.prepareCreateLookup(ctx, "workflow", ms.TargetKeyspace, specs, false) + _, got, _, _, err := l.prepareCreateLookup(ctx, "workflow", ms.TargetKeyspace, specs, false) require.NoError(t, err) if !proto.Equal(got, want) { t.Errorf("customize create lookup error same: got:\n%v, want\n%v", got, want) @@ -2395,13 +2395,13 @@ func TestCreateLookupVindexIgnoreNulls(t *testing.T) { t.Fatal(err) } - w := &workflowFetcher{ + l := &lookup{ ts: env.ws.ts, tmc: env.ws.tmc, logger: env.ws.Logger(), parser: env.ws.SQLParser(), } - ms, ks, _, _, err := w.prepareCreateLookup(ctx, "workflow", ms.TargetKeyspace, specs, false) + ms, ks, _, _, err := l.prepareCreateLookup(ctx, "workflow", ms.TargetKeyspace, specs, false) require.NoError(t, err) if !proto.Equal(wantKs, ks) { t.Errorf("unexpected keyspace value: got:\n%v, want\n%v", ks, wantKs) @@ -2481,17 +2481,17 @@ func TestStopAfterCopyFlag(t *testing.T) { t.Fatal(err) } - w := &workflowFetcher{ + l := &lookup{ ts: env.ws.ts, tmc: env.ws.tmc, logger: env.ws.Logger(), parser: env.ws.SQLParser(), } - ms1, _, _, _, err := w.prepareCreateLookup(ctx, "workflow", ms.TargetKeyspace, specs, false) + ms1, _, _, _, err := l.prepareCreateLookup(ctx, "workflow", ms.TargetKeyspace, specs, false) require.NoError(t, err) require.Equal(t, ms1.StopAfterCopy, true) - ms2, _, _, _, err := w.prepareCreateLookup(ctx, "workflow", ms.TargetKeyspace, specs, true) + ms2, _, _, _, err := l.prepareCreateLookup(ctx, "workflow", ms.TargetKeyspace, specs, true) require.NoError(t, err) require.Equal(t, ms2.StopAfterCopy, false) } diff --git a/go/vt/vtctl/workflow/server.go b/go/vt/vtctl/workflow/server.go index 6f9c413e1fb..afabea11b7b 100644 --- a/go/vt/vtctl/workflow/server.go +++ b/go/vt/vtctl/workflow/server.go @@ -567,13 +567,13 @@ func (s *Server) LookupVindexCreate(ctx context.Context, req *vtctldatapb.Lookup span.Annotate("cells", req.Cells) span.Annotate("tablet_types", req.TabletTypes) - w := &workflowFetcher{ + l := &lookup{ ts: s.ts, tmc: s.tmc, logger: s.Logger(), parser: s.SQLParser(), } - ms, sourceVSchema, targetVSchema, cancelFunc, err := w.prepareCreateLookup(ctx, req.Workflow, req.Keyspace, req.Vindex, req.ContinueAfterCopyWithOwner) + ms, sourceVSchema, targetVSchema, cancelFunc, err := l.prepareCreateLookup(ctx, req.Workflow, req.Keyspace, req.Vindex, req.ContinueAfterCopyWithOwner) if err != nil { return nil, err }