Skip to content

Commit

Permalink
final touches on the trace engine
Browse files Browse the repository at this point in the history
Signed-off-by: Andres Taylor <[email protected]>
  • Loading branch information
systay committed Sep 15, 2024
1 parent 8a61b2f commit 60f2212
Show file tree
Hide file tree
Showing 6 changed files with 72 additions and 65 deletions.
2 changes: 1 addition & 1 deletion go/vt/vtgate/engine/plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func (p *Plan) Stats() (execCount uint64, execTime time.Duration, shardQueries,
func (p *Plan) MarshalJSON() ([]byte, error) {
var instructions *PrimitiveDescription
if p.Instructions != nil {
description := PrimitiveToPlanDescription(p.Instructions)
description := PrimitiveToPlanDescription(p.Instructions, nil)
instructions = &description
}

Expand Down
40 changes: 33 additions & 7 deletions go/vt/vtgate/engine/plan_description.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ type PrimitiveDescription struct {
TargetTabletType topodatapb.TabletType
Other map[string]any

ID PrimitiveID
InputName string
Inputs []PrimitiveDescription

Expand Down Expand Up @@ -97,7 +96,11 @@ func (pd PrimitiveDescription) MarshalJSON() ([]byte, error) {
if err := marshalAdd(prepend, buf, "NoOfCalls", len(pd.Stats)); err != nil {
return nil, err
}
if err := marshalAdd(prepend, buf, "Rows", pd.Stats); err != nil {

if err := marshalAdd(prepend, buf, "AvgRowSize", average(pd.Stats)); err != nil {
return nil, err
}
if err := marshalAdd(prepend, buf, "MedianRowSize", median(pd.Stats)); err != nil {
return nil, err
}
}
Expand All @@ -117,6 +120,28 @@ func (pd PrimitiveDescription) MarshalJSON() ([]byte, error) {
return buf.Bytes(), nil
}

func average(nums []int) float64 {
total := 0
for _, num := range nums {
total += num
}
return float64(total) / float64(len(nums))
}

func median(nums []int) float64 {
sortedNums := make([]int, len(nums))
copy(sortedNums, nums)
sort.Ints(sortedNums)

n := len(sortedNums)
if n%2 == 0 {
mid1 := sortedNums[n/2-1]
mid2 := sortedNums[n/2]
return float64(mid1+mid2) / 2.0
}
return float64(sortedNums[n/2])
}

func (pd PrimitiveDescription) addToGraph(g *graphviz.Graph) (*graphviz.Node, error) {
var nodes []*graphviz.Node
for _, input := range pd.Inputs {
Expand Down Expand Up @@ -157,7 +182,7 @@ func (pd PrimitiveDescription) addToGraph(g *graphviz.Graph) (*graphviz.Node, er

func GraphViz(p Primitive) (*graphviz.Graph, error) {
g := graphviz.New()
description := PrimitiveToPlanDescription(p)
description := PrimitiveToPlanDescription(p, nil)
_, err := description.addToGraph(g)
if err != nil {
return nil, err
Expand Down Expand Up @@ -193,15 +218,16 @@ func marshalAdd(prepend string, buf *bytes.Buffer, name string, obj any) error {
}

// PrimitiveToPlanDescription transforms a primitive tree into a corresponding PlanDescription tree
func PrimitiveToPlanDescription(in Primitive) PrimitiveDescription {
// If stats is not nil, it will be used to populate the stats field of the PlanDescription
func PrimitiveToPlanDescription(in Primitive, stats map[int]RowsReceived) PrimitiveDescription {
this := in.description()
if id := in.GetID(); id > 0 {
this.ID = id
if id := in.GetID(); stats != nil && id > 0 {
this.Stats = stats[int(id)]
}

inputs, infos := in.Inputs()
for idx, input := range inputs {
pd := PrimitiveToPlanDescription(input)
pd := PrimitiveToPlanDescription(input, stats)
if infos != nil {
for k, v := range infos[idx] {
if k == inputName {
Expand Down
4 changes: 2 additions & 2 deletions go/vt/vtgate/engine/plan_description_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
func TestCreateRoutePlanDescription(t *testing.T) {
route := createRoute()

planDescription := PrimitiveToPlanDescription(route)
planDescription := PrimitiveToPlanDescription(route, nil)

expected := PrimitiveDescription{
OperatorType: "Route",
Expand Down Expand Up @@ -76,7 +76,7 @@ func TestPlanDescriptionWithInputs(t *testing.T) {
Input: route,
}

planDescription := PrimitiveToPlanDescription(limit)
planDescription := PrimitiveToPlanDescription(limit, nil)

expected := PrimitiveDescription{
OperatorType: "Limit",
Expand Down
23 changes: 3 additions & 20 deletions go/vt/vtgate/engine/trace.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,16 +64,9 @@ func (t *Trace) NeedsTransaction() bool {
return t.Inner.NeedsTransaction()
}

func preWalk(desc *PrimitiveDescription, f func(*PrimitiveDescription)) {
f(desc)
for _, input := range desc.Inputs {
preWalk(&input, f)
}
}

func (t *Trace) TryExecute(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool) (*sqltypes.Result, error) {
getOpStats := vcursor.StartPrimitiveTrace()
_, err := t.Inner.TryExecute(ctx, vcursor, bindVars, wantfields)
_, err := vcursor.ExecutePrimitive(ctx, t.Inner, bindVars, wantfields)
if err != nil {
return nil, err
}
Expand All @@ -84,7 +77,7 @@ func (t *Trace) TryExecute(ctx context.Context, vcursor VCursor, bindVars map[st
func (t *Trace) TryStreamExecute(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool, callback func(*sqltypes.Result) error) error {
getOpsStats := vcursor.StartPrimitiveTrace()
noop := func(result *sqltypes.Result) error { return nil }
err := t.Inner.TryStreamExecute(ctx, vcursor, bindVars, wantfields, noop)
err := vcursor.StreamExecutePrimitive(ctx, t.Inner, bindVars, wantfields, noop)
if err != nil {
return err
}
Expand All @@ -98,17 +91,7 @@ func (t *Trace) TryStreamExecute(ctx context.Context, vcursor VCursor, bindVars
}

func (t *Trace) getExplainTraceOutput(getOpStats func() map[int]RowsReceived) (*sqltypes.Result, error) {
description := PrimitiveToPlanDescription(t.Inner)
statsMap := getOpStats()

// let's add the stats to the description
preWalk(&description, func(desc *PrimitiveDescription) {
stats, found := statsMap[int(desc.ID)]
if !found {
return
}
desc.Stats = stats
})
description := PrimitiveToPlanDescription(t.Inner, getOpStats())

output, err := json.MarshalIndent(description, "", "\t")
if err != nil {
Expand Down
66 changes: 32 additions & 34 deletions go/vt/vtgate/executor_vexplain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (

"github.com/stretchr/testify/require"

"vitess.io/vitess/go/mysql/collations"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/test/utils"
"vitess.io/vitess/go/vt/discovery"
Expand All @@ -48,63 +47,62 @@ func TestSimpleVexplainTrace(t *testing.T) {
sbc := hc.AddTestTablet(cell, shard, 1, "TestExecutor", shard, topodatapb.TabletType_PRIMARY, true, 1, nil)
sbc.SetResults([]*sqltypes.Result{{
Fields: []*querypb.Field{
{Name: "col1", Type: sqltypes.Int32, Charset: collations.CollationBinaryID, Flags: uint32(querypb.MySqlFlag_NUM_FLAG)},
{Name: "col2", Type: sqltypes.Int32, Charset: collations.CollationBinaryID, Flags: uint32(querypb.MySqlFlag_NUM_FLAG)},
{Name: "weight_string(col2)", Type: sqltypes.VarBinary, Charset: collations.CollationBinaryID, Flags: uint32(querypb.MySqlFlag_BINARY_FLAG)},
{Name: "col1", Type: sqltypes.Int32},
{Name: "col2", Type: sqltypes.Int32},
{Name: "weight_string(col2)"},
},
InsertID: 0,
Rows: [][]sqltypes.Value{{
sqltypes.NewInt32(1),
// i%4 ensures that there are duplicates across shards.
// This will allow us to test that cross-shard ordering
// still works correctly.
sqltypes.NewInt32(int32(i % 4)),
sqltypes.NULL,
}, {
sqltypes.NewInt32(1),
// i%4 ensures that there are duplicates across shards.
// This will allow us to test that cross-shard ordering
// still works correctly.
sqltypes.NewInt32(int32(i % 4)),
sqltypes.NULL,
}},
Rows: [][]sqltypes.Value{
{sqltypes.NewInt32(1), sqltypes.NewInt32(int32(i % 4)), sqltypes.NULL},
{sqltypes.NewInt32(2), sqltypes.NewInt32(int32(i % 4)), sqltypes.NULL},
},
}})
conns = append(conns, sbc)
}
executor := createExecutor(ctx, serv, cell, resolver)
defer executor.Close()

query := "vexplain trace select col1, col2 from music order by col2 desc"
query := "vexplain trace select count(*), col2 from music group by col2"
session := &vtgatepb.Session{
TargetString: "@primary",
}
gotResult, err := executorExec(ctx, executor, session, query, nil)
require.NoError(t, err)

wantQueries := []*querypb.BoundQuery{{
Sql: "select col1, col2, weight_string(col2) from music order by music.col2 desc",
Sql: "select count(*), col2, weight_string(col2) from music group by col2, weight_string(col2) order by col2 asc",
BindVariables: map[string]*querypb.BindVariable{},
}}
for _, conn := range conns {
utils.MustMatch(t, wantQueries, conn.Queries)
}

expectedRowString := `{
"OperatorType": "Route",
"Variant": "Scatter",
"Keyspace": {
"Name": "TestExecutor",
"Sharded": true
},
"OperatorType": "Aggregate",
"Variant": "Ordered",
"NoOfCalls": 1,
"Rows": [
16
],
"FieldQuery": "select col1, col2, weight_string(col2) from music where 1 != 1",
"OrderBy": "(1|2) DESC",
"Query": "select col1, col2, weight_string(col2) from music order by music.col2 desc",
"AvgRowSize": 4,
"MedianRowSize": 4,
"Aggregates": "sum_count_star(0) AS count(*)",
"GroupBy": "(1|2)",
"ResultColumns": 2,
"Table": "music"
"Inputs": [
{
"OperatorType": "Route",
"Variant": "Scatter",
"Keyspace": {
"Name": "TestExecutor",
"Sharded": true
},
"NoOfCalls": 2,
"AvgRowSize": 16,
"MedianRowSize": 16,
"FieldQuery": "select count(*), col2, weight_string(col2) from music where 1 != 1 group by col2, weight_string(col2)",
"OrderBy": "(1|2) ASC",
"Query": "select count(*), col2, weight_string(col2) from music group by col2, weight_string(col2) order by col2 asc",
"Table": "music"
}
]
}`

gotRowString := gotResult.Rows[0][0].ToString()
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtgate/planbuilder/vexplain.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func buildVExplainVtgatePlan(ctx context.Context, explainStatement sqlparser.Sta
if err != nil {
return nil, err
}
description := engine.PrimitiveToPlanDescription(innerInstruction.primitive)
description := engine.PrimitiveToPlanDescription(innerInstruction.primitive, nil)
output, err := json.MarshalIndent(description, "", "\t")
if err != nil {
return nil, err
Expand Down

0 comments on commit 60f2212

Please sign in to comment.