Skip to content

Commit

Permalink
add udfs to vschema on update (#15771)
Browse files Browse the repository at this point in the history
Signed-off-by: Harshit Gangal <[email protected]>
  • Loading branch information
harshit-gangal authored Apr 22, 2024
1 parent 14b36d0 commit c1bddd7
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 4 deletions.
7 changes: 3 additions & 4 deletions go/vt/vtgate/schema/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,23 +164,22 @@ func (t *Tracker) loadUDFs(conn queryservice.QueryService, target *querypb.Targe
t.mu.Lock()
defer t.mu.Unlock()

var udfs []string
err := conn.GetSchema(t.ctx, target, querypb.SchemaTableType_UDFS, nil, func(schemaRes *querypb.GetSchemaResponse) error {
var udfs []string
for _, udf := range schemaRes.Udfs {
if !udf.Aggregating {
continue
}
udfs = append(udfs, udf.Name)
}

t.udfs[target.Keyspace] = udfs
return nil
})
if err != nil {
log.Errorf("error fetching new UDFs for %v: %w", target.Keyspace, err)
return err
}
log.Infof("finished loading UDFs for keyspace %s", target.Keyspace)
t.udfs[target.Keyspace] = udfs
log.Infof("finished loading %d UDFs for keyspace %s", len(udfs), target.Keyspace)
return nil
}

Expand Down
6 changes: 6 additions & 0 deletions go/vt/vtgate/vschema_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ func (vm *VSchemaManager) updateFromSchema(vschema *vindexes.VSchema) {
for ksName, ks := range vschema.Keyspaces {
vm.updateTableInfo(vschema, ks, ksName)
vm.updateViewInfo(ks, ksName)
vm.updateUDFsInfo(ks, ksName)
}
}

Expand Down Expand Up @@ -267,6 +268,11 @@ func (vm *VSchemaManager) updateTableInfo(vschema *vindexes.VSchema, ks *vindexe
}
}

// updateUDFsInfo updates the aggregate UDFs in the Vschema.
func (vm *VSchemaManager) updateUDFsInfo(ks *vindexes.KeyspaceSchema, ksName string) {
ks.AggregateUDFs = vm.schema.UDFs(ksName)
}

func markErrorIfCyclesInFk(vschema *vindexes.VSchema) {
for ksName, ks := range vschema.Keyspaces {
// Only check cyclic foreign keys for keyspaces that have
Expand Down
32 changes: 32 additions & 0 deletions go/vt/vtgate/vschema_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,38 @@ func TestRebuildVSchema(t *testing.T) {
}
}

// TestVSchemaUDFsUpdate tests that the UDFs are updated in the VSchema.
func TestVSchemaUDFsUpdate(t *testing.T) {
ks := &vindexes.Keyspace{Name: "ks", Sharded: true}

vm := &VSchemaManager{}
var vs *vindexes.VSchema
vm.subscriber = func(vschema *vindexes.VSchema, _ *VSchemaStats) {
vs = vschema
vs.ResetCreated()
}
vm.schema = &fakeSchema{udfs: []string{"udf1", "udf2"}}
vm.VSchemaUpdate(&vschemapb.SrvVSchema{
Keyspaces: map[string]*vschemapb.Keyspace{
"ks": {Sharded: true},
},
}, nil)

utils.MustMatchFn(".globalTables", ".uniqueVindexes")(t, &vindexes.VSchema{
RoutingRules: map[string]*vindexes.RoutingRule{},
Keyspaces: map[string]*vindexes.KeyspaceSchema{
"ks": {
Keyspace: ks,
ForeignKeyMode: vschemapb.Keyspace_unmanaged,
Tables: map[string]*vindexes.Table{},
Vindexes: map[string]vindexes.Vindex{},
AggregateUDFs: []string{"udf1", "udf2"},
},
},
}, vs)
utils.MustMatch(t, vs, vm.currentVschema, "currentVschema does not match Vschema")
}

func TestMarkErrorIfCyclesInFk(t *testing.T) {
ksName := "ks"
keyspace := &vindexes.Keyspace{
Expand Down

0 comments on commit c1bddd7

Please sign in to comment.