Skip to content

Commit

Permalink
slack-vitess-r15.0.5: misc required backports, pt. 1 (#361)
Browse files Browse the repository at this point in the history
* Add `VStreamerCount` stat to `vttablet` (vitessio#11978)

* Add `VStreamersActive` stat to `vttablet`

Signed-off-by: Tim Vaillancourt <[email protected]>

* Improve desc

Signed-off-by: Tim Vaillancourt <[email protected]>

* Add PR suggestions

Signed-off-by: Tim Vaillancourt <[email protected]>

* Move to *stats.Gauge

Signed-off-by: Tim Vaillancourt <[email protected]>

* Single defer

Signed-off-by: Tim Vaillancourt <[email protected]>

Signed-off-by: Tim Vaillancourt <[email protected]>

* Add `Uptime` metric (vitessio#12712)

* Add `Uptime` metric to `vtgate`+`vttablet`

Signed-off-by: Tim Vaillancourt <[email protected]>

* move to go/vt/servenv/status.go

Signed-off-by: Tim Vaillancourt <[email protected]>

* Use nanoseconds for uptime

Signed-off-by: Tim Vaillancourt <[email protected]>

* Move Uptime metrics to servenv.go, remove dupe start time.Time

Signed-off-by: Tim Vaillancourt <[email protected]>

* Use serverStart time.Time

Signed-off-by: Tim Vaillancourt <[email protected]>

---------

Signed-off-by: Tim Vaillancourt <[email protected]>

* Implement the RowsColumnTypeScanType interface in the go sql driver for Vitess in order to get the column types (vitessio#12007)

Signed-off-by: Johan Oskarsson <[email protected]>

Signed-off-by: Johan Oskarsson <[email protected]>
Co-authored-by: Johan Oskarsson <[email protected]>

* Add vstream metrics to vtgate (vitessio#13098)

* Add vstream metrics to vtgate

Signed-off-by: twthorn <[email protected]>

* Update unit test name and use cell variable

Signed-off-by: twthorn <[email protected]>

* Reset metrics for TestVStreamsCreatedAndLagMetrics, fix data race issue

Signed-off-by: twthorn <[email protected]>

---------

Signed-off-by: twthorn <[email protected]>

* add lock to flaky TestValidateVersionShard test

Signed-off-by: Tim Vaillancourt <[email protected]>

* typo

Signed-off-by: Tim Vaillancourt <[email protected]>

---------

Signed-off-by: Tim Vaillancourt <[email protected]>
Signed-off-by: Johan Oskarsson <[email protected]>
Signed-off-by: twthorn <[email protected]>
Co-authored-by: Johan Oskarsson <[email protected]>
Co-authored-by: Johan Oskarsson <[email protected]>
Co-authored-by: Thomas Thornton <[email protected]>
  • Loading branch information
4 people authored May 22, 2024
1 parent a912583 commit e9875f2
Show file tree
Hide file tree
Showing 8 changed files with 253 additions and 6 deletions.
5 changes: 5 additions & 0 deletions go/vt/servenv/servenv.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,11 @@ func Init() {
mu.Lock()
defer mu.Unlock()

// Uptime metric
_ = stats.NewGaugeFunc("Uptime", "Uptime in nanoseconds", func() int64 {
return int64(time.Since(serverStart).Nanoseconds())
})

// Ignore SIGPIPE if specified
// The Go runtime catches SIGPIPE for us on all fds except stdout/stderr
// See https://golang.org/pkg/os/signal/#hdr-SIGPIPE
Expand Down
61 changes: 61 additions & 0 deletions go/vt/vitessdriver/rows.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,14 @@ limitations under the License.
package vitessdriver

import (
"database/sql"
"database/sql/driver"
"io"
"reflect"
"time"

"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/proto/query"
)

// rows creates a database/sql/driver compliant Row iterator
Expand Down Expand Up @@ -58,3 +62,60 @@ func (ri *rows) Next(dest []driver.Value) error {
ri.index++
return nil
}

var (
typeInt8 = reflect.TypeOf(int8(0))
typeUint8 = reflect.TypeOf(uint8(0))
typeInt16 = reflect.TypeOf(int16(0))
typeUint16 = reflect.TypeOf(uint16(0))
typeInt32 = reflect.TypeOf(int32(0))
typeUint32 = reflect.TypeOf(uint32(0))
typeInt64 = reflect.TypeOf(int64(0))
typeUint64 = reflect.TypeOf(uint64(0))
typeFloat32 = reflect.TypeOf(float32(0))
typeFloat64 = reflect.TypeOf(float64(0))
typeRawBytes = reflect.TypeOf(sql.RawBytes{})
typeTime = reflect.TypeOf(time.Time{})
typeUnknown = reflect.TypeOf(new(interface{}))
)

// Implements the RowsColumnTypeScanType interface
func (ri *rows) ColumnTypeScanType(index int) reflect.Type {
field := ri.qr.Fields[index]
switch field.GetType() {
case query.Type_INT8:
return typeInt8
case query.Type_UINT8:
return typeUint8
case query.Type_INT16, query.Type_YEAR:
return typeInt16
case query.Type_UINT16:
return typeUint16
case query.Type_INT24:
return typeInt32
case query.Type_UINT24: // no 24 bit type, using 32 instead
return typeUint32
case query.Type_INT32:
return typeInt32
case query.Type_UINT32:
return typeUint32
case query.Type_INT64:
return typeInt64
case query.Type_UINT64:
return typeUint64
case query.Type_FLOAT32:
return typeFloat32
case query.Type_FLOAT64:
return typeFloat64
case query.Type_TIMESTAMP, query.Type_DECIMAL, query.Type_VARCHAR, query.Type_TEXT,
query.Type_BLOB, query.Type_VARBINARY, query.Type_CHAR, query.Type_BINARY, query.Type_BIT,
query.Type_ENUM, query.Type_SET, query.Type_TUPLE, query.Type_GEOMETRY, query.Type_JSON,
query.Type_HEXNUM, query.Type_HEXVAL, query.Type_BITNUM:

return typeRawBytes
case query.Type_DATE, query.Type_TIME, query.Type_DATETIME:
return typeTime
default:
return typeUnknown
}
}
91 changes: 91 additions & 0 deletions go/vt/vitessdriver/rows_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@ package vitessdriver

import (
"database/sql/driver"
"fmt"
"io"
"reflect"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"vitess.io/vitess/go/sqltypes"
Expand Down Expand Up @@ -135,3 +137,92 @@ func TestRows(t *testing.T) {

_ = ri.Close()
}

// Test that the ColumnTypeScanType function returns the correct reflection type for each
// sql type. The sql type in turn comes from a table column's type.
func TestColumnTypeScanType(t *testing.T) {
var r = sqltypes.Result{
Fields: []*querypb.Field{
{
Name: "field1",
Type: sqltypes.Int8,
},
{
Name: "field2",
Type: sqltypes.Uint8,
},
{
Name: "field3",
Type: sqltypes.Int16,
},
{
Name: "field4",
Type: sqltypes.Uint16,
},
{
Name: "field5",
Type: sqltypes.Int24,
},
{
Name: "field6",
Type: sqltypes.Uint24,
},
{
Name: "field7",
Type: sqltypes.Int32,
},
{
Name: "field8",
Type: sqltypes.Uint32,
},
{
Name: "field9",
Type: sqltypes.Int64,
},
{
Name: "field10",
Type: sqltypes.Uint64,
},
{
Name: "field11",
Type: sqltypes.Float32,
},
{
Name: "field12",
Type: sqltypes.Float64,
},
{
Name: "field13",
Type: sqltypes.VarBinary,
},
{
Name: "field14",
Type: sqltypes.Datetime,
},
},
}

ri := newRows(&r, &converter{}).(driver.RowsColumnTypeScanType)
defer ri.Close()

wantTypes := []reflect.Type{
typeInt8,
typeUint8,
typeInt16,
typeUint16,
typeInt32,
typeUint32,
typeInt32,
typeUint32,
typeInt64,
typeUint64,
typeFloat32,
typeFloat64,
typeRawBytes,
typeTime,
}

for i := 0; i < len(wantTypes); i++ {
assert.Equal(t, ri.ColumnTypeScanType(i), wantTypes[i], fmt.Sprintf("unexpected type %v, wanted %v", ri.ColumnTypeScanType(i), wantTypes[i]))
}
}
14 changes: 10 additions & 4 deletions go/vt/vtctl/grpcvtctldserver/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"fmt"
"io"
"sort"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -11322,7 +11323,7 @@ func TestValidateVersionShard(t *testing.T) {
name string
req *vtctldatapb.ValidateVersionShardRequest
expected *vtctldatapb.ValidateVersionShardResponse
setup func()
setup func(*sync.Mutex)
shouldErr bool
}{
{
Expand All @@ -11334,7 +11335,9 @@ func TestValidateVersionShard(t *testing.T) {
expected: &vtctldatapb.ValidateVersionShardResponse{
Results: []string{},
},
setup: func() {
setup: func(testSetupMu *sync.Mutex) {
testSetupMu.Lock()
defer testSetupMu.Unlock()
addrVersionMap := map[string]string{
"primary:0": "version1",
"replica:0": "version1",
Expand All @@ -11352,7 +11355,9 @@ func TestValidateVersionShard(t *testing.T) {
expected: &vtctldatapb.ValidateVersionShardResponse{
Results: []string{"primary zone1-0000000100 version version1 is different than replica zone1-0000000101 version version:\"version2\""},
},
setup: func() {
setup: func(testSetupMu *sync.Mutex) {
testSetupMu.Lock()
defer testSetupMu.Unlock()
addrVersionMap := map[string]string{
"primary:0": "version1",
"replica:0": "version2",
Expand All @@ -11363,12 +11368,13 @@ func TestValidateVersionShard(t *testing.T) {
},
}

var testSetupMu sync.Mutex
for _, tt := range tests {
curT := tt
t.Run(tt.name, func(t *testing.T) {
t.Parallel()

curT.setup()
curT.setup(&testSetupMu)
resp, err := vtctld.ValidateVersionShard(ctx, curT.req)
if curT.shouldErr {
assert.Error(t, err)
Expand Down
24 changes: 24 additions & 0 deletions go/vt/vtgate/vstream_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@ import (
"sync"
"time"

"vitess.io/vitess/go/stats"
"vitess.io/vitess/go/vt/discovery"
querypb "vitess.io/vitess/go/vt/proto/query"
"vitess.io/vitess/go/vt/servenv"
"vitess.io/vitess/go/vt/topo"

vtgatepb "vitess.io/vitess/go/vt/proto/vtgate"
Expand All @@ -45,6 +47,9 @@ type vstreamManager struct {
resolver *srvtopo.Resolver
toposerv srvtopo.Server
cell string

vstreamsCreated *stats.CountersWithMultiLabels
vstreamsLag *stats.GaugesWithMultiLabels
}

// maxSkewTimeoutSeconds is the maximum allowed skew between two streams when the MinimizeSkew flag is set
Expand Down Expand Up @@ -115,10 +120,19 @@ type journalEvent struct {
}

func newVStreamManager(resolver *srvtopo.Resolver, serv srvtopo.Server, cell string) *vstreamManager {
exporter := servenv.NewExporter(cell, "VStreamManager")
return &vstreamManager{
resolver: resolver,
toposerv: serv,
cell: cell,
vstreamsCreated: exporter.NewCountersWithMultiLabels(
"VStreamsCreated",
"Number of vstreams created",
[]string{"Keyspace", "ShardName", "TabletType"}),
vstreamsLag: exporter.NewGaugesWithMultiLabels(
"VStreamsLag",
"Difference between event current time and the binlog event timestamp",
[]string{"Keyspace", "ShardName", "TabletType"}),
}
}

Expand Down Expand Up @@ -503,10 +517,17 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
Filter: vs.filter,
TableLastPKs: sgtid.TablePKs,
}
var vstreamCreatedOnce sync.Once
err = tabletConn.VStream(ctx, req, func(events []*binlogdatapb.VEvent) error {
// We received a valid event. Reset error count.
errCount = 0

labels := []string{sgtid.Keyspace, sgtid.Shard, req.Target.TabletType.String()}

vstreamCreatedOnce.Do(func() {
vs.vsm.vstreamsCreated.Add(labels, 1)
})

select {
case <-ctx.Done():
return ctx.Err()
Expand Down Expand Up @@ -586,6 +607,9 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
default:
sendevents = append(sendevents, event)
}
lag := event.CurrentTime/1e9 - event.Timestamp
vs.vsm.vstreamsLag.Set(labels, lag)

}
if len(sendevents) != 0 {
eventss = append(eventss, sendevents)
Expand Down
56 changes: 55 additions & 1 deletion go/vt/vtgate/vstream_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,59 @@ func TestVStreamMulti(t *testing.T) {
}
}

func TestVStreamsCreatedAndLagMetrics(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
cell := "aa"
ks := "TestVStream"
_ = createSandbox(ks)
hc := discovery.NewFakeHealthCheck(nil)
st := getSandboxTopo(ctx, cell, ks, []string{"-20", "20-40"})
vsm := newTestVStreamManager(hc, st, cell)
vsm.vstreamsCreated.ResetAll()
vsm.vstreamsLag.ResetAll()
sbc0 := hc.AddTestTablet(cell, "1.1.1.1", 1001, ks, "-20", topodatapb.TabletType_PRIMARY, true, 1, nil)
addTabletToSandboxTopo(t, st, ks, "-20", sbc0.Tablet())
sbc1 := hc.AddTestTablet(cell, "1.1.1.1", 1002, ks, "20-40", topodatapb.TabletType_PRIMARY, true, 1, nil)
addTabletToSandboxTopo(t, st, ks, "20-40", sbc1.Tablet())

send0 := []*binlogdatapb.VEvent{
{Type: binlogdatapb.VEventType_GTID, Gtid: "gtid01"},
{Type: binlogdatapb.VEventType_COMMIT, Timestamp: 10, CurrentTime: 15 * 1e9},
}
sbc0.AddVStreamEvents(send0, nil)

send1 := []*binlogdatapb.VEvent{
{Type: binlogdatapb.VEventType_GTID, Gtid: "gtid02"},
{Type: binlogdatapb.VEventType_COMMIT, Timestamp: 10, CurrentTime: 17 * 1e9},
}
sbc1.AddVStreamEvents(send1, nil)

vgtid := &binlogdatapb.VGtid{
ShardGtids: []*binlogdatapb.ShardGtid{{
Keyspace: ks,
Shard: "-20",
Gtid: "pos",
}, {
Keyspace: ks,
Shard: "20-40",
Gtid: "pos",
}},
}
ch := startVStream(ctx, t, vsm, vgtid, nil)
<-ch
<-ch
wantVStreamsCreated := make(map[string]int64)
wantVStreamsCreated["TestVStream.-20.PRIMARY"] = 1
wantVStreamsCreated["TestVStream.20-40.PRIMARY"] = 1
assert.Equal(t, wantVStreamsCreated, vsm.vstreamsCreated.Counts(), "vstreamsCreated matches")

wantVStreamsLag := make(map[string]int64)
wantVStreamsLag["TestVStream.-20.PRIMARY"] = 5
wantVStreamsLag["TestVStream.20-40.PRIMARY"] = 7
assert.Equal(t, wantVStreamsLag, vsm.vstreamsLag.Counts(), "vstreamsLag matches")
}

func TestVStreamRetry(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down Expand Up @@ -1094,7 +1147,8 @@ func startVStream(ctx context.Context, t *testing.T, vsm *vstreamManager, vgtid
func verifyEvents(t *testing.T, ch <-chan *binlogdatapb.VStreamResponse, wants ...*binlogdatapb.VStreamResponse) {
t.Helper()
for i, want := range wants {
got := <-ch
val := <-ch
got := proto.Clone(val).(*binlogdatapb.VStreamResponse)
require.NotNil(t, got)
for _, event := range got.Events {
event.Timestamp = 0
Expand Down
Loading

0 comments on commit e9875f2

Please sign in to comment.