Skip to content

Commit

Permalink
vstreamclient: framework for robust + simple usage
Browse files Browse the repository at this point in the history
Signed-off-by: Derek Perkins <[email protected]>
  • Loading branch information
derekperkins committed Nov 13, 2024
1 parent fb6115e commit 829c980
Show file tree
Hide file tree
Showing 6 changed files with 1,424 additions and 0 deletions.
35 changes: 35 additions & 0 deletions go/vt/vstreamclient/chunk.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package vstreamclient

import (
"iter"
"reflect"
)

// Chunk returns an iterator over consecutive sub-slices of up to n elements of s.
// All but the last sub-slice will have size n.
// All sub-slices are clipped to have no capacity beyond the length.
// If s is empty, the sequence is empty: there is no empty slice in the sequence.
// Chunk panics if n is less than 1.
func reflectChunk(s reflect.Value, n int) iter.Seq[reflect.Value] {
if n < 1 {
panic("cannot be less than 1")
}

if s.Kind() != reflect.Slice {
panic("must be a slice")
}

return func(yield func(s reflect.Value) bool) {
for i := 0; i < s.Len(); i += n {
// Clamp the last chunk to the slice bound as necessary.
// end := min(n, len(s[i:]))
end := min(n, s.Slice(i, s.Len()).Len())

// Set the capacity of each chunk so that appending to a chunk does
// not modify the original slice.
if !yield(s.Slice3(i, i+end, i+end)) {
return
}
}
}
}
134 changes: 134 additions & 0 deletions go/vt/vstreamclient/chunk_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
// Chunk test code is modified from slices/iter_test.go
//
// Copyright 2024 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package vstreamclient

import (
"reflect"
"slices"
"testing"
)

func TestChunk(t *testing.T) {
cases := []struct {
name string
s []int
n int
chunks [][]int
}{
{
name: "nil",
s: nil,
n: 1,
chunks: nil,
},
{
name: "empty",
s: []int{},
n: 1,
chunks: nil,
},
{
name: "short",
s: []int{1, 2},
n: 3,
chunks: [][]int{{1, 2}},
},
{
name: "one",
s: []int{1, 2},
n: 2,
chunks: [][]int{{1, 2}},
},
{
name: "even",
s: []int{1, 2, 3, 4},
n: 2,
chunks: [][]int{{1, 2}, {3, 4}},
},
{
name: "odd",
s: []int{1, 2, 3, 4, 5},
n: 2,
chunks: [][]int{{1, 2}, {3, 4}, {5}},
},
}

for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
var chunks [][]int
for c := range reflectChunk(reflect.ValueOf(tc.s), tc.n) {
typedC := c.Interface().([]int)
chunks = append(chunks, typedC)
}

if !chunkEqual(chunks, tc.chunks) {
t.Errorf("Chunk(%v, %d) = %v, want %v", tc.s, tc.n, chunks, tc.chunks)
}

if len(chunks) == 0 {
return
}

// Verify that appending to the end of the first chunk does not
// clobber the beginning of the next chunk.
s := slices.Clone(tc.s)
chunks[0] = append(chunks[0], -1)
if !slices.Equal(s, tc.s) {
t.Errorf("slice was clobbered: %v, want %v", s, tc.s)
}
})
}
}

func TestChunkPanics(t *testing.T) {
for _, test := range []struct {
name string
x []struct{}
n int
}{
{
name: "cannot be less than 1",
x: make([]struct{}, 0),
n: 0,
},
} {
if !panics(func() { _ = reflectChunk(reflect.ValueOf(test.x), test.n) }) {
t.Errorf("Chunk %s: got no panic, want panic", test.name)
}
}
}

func TestChunkRange(t *testing.T) {
// Verify Chunk iteration can be stopped.
var got [][]int
for c := range reflectChunk(reflect.ValueOf([]int{1, 2, 3, 4, -100}), 2) {
if len(got) == 2 {
// Found enough values, break early.
break
}

typedC := c.Interface().([]int)
got = append(got, typedC)
}

if want := [][]int{{1, 2}, {3, 4}}; !chunkEqual(got, want) {
t.Errorf("Chunk iteration did not stop, got %v, want %v", got, want)
}
}

func chunkEqual[Slice ~[]E, E comparable](s1, s2 []Slice) bool {
return slices.EqualFunc(s1, s2, slices.Equal[Slice])
}

func panics(f func()) (b bool) {
defer func() {
if x := recover(); x != nil {
b = true
}
}()
f()
return false
}
115 changes: 115 additions & 0 deletions go/vt/vstreamclient/options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
package vstreamclient

import (
"fmt"
"time"

"vitess.io/vitess/go/sqlescape"
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
vtgatepb "vitess.io/vitess/go/vt/proto/vtgate"
)

var (
// DefaultMinFlushDuration is the default minimum duration between flushes, used if not explicitly
// set using WithMinFlushDuration. This can be safely modified if needed before calling New.
DefaultMinFlushDuration = 5 * time.Second

// DefaultMaxRowsPerFlush is the default number of rows to buffer per table, used if not explicitly
// set in the table configuration. This same number is also used to chunk rows when calling flush.
// This can be safely modified if needed before calling New.
DefaultMaxRowsPerFlush = 1000
)

// Option is a function that can be used to configure a VStreamer
type Option func(v *VStreamer) error

// WithMinFlushDuration sets the minimum duration between flushes. This is useful for ensuring that data
// isn't flushed too often, which can be inefficient. The default is 30 seconds.
func WithMinFlushDuration(d time.Duration) Option {
return func(v *VStreamer) error {
if d <= 0 {
return fmt.Errorf("vstreamclient: minimum flush duration must be positive, got %s", d.String())
}

v.minFlushDuration = d
return nil
}
}

func WithHeartbeatSeconds(seconds int) Option {
return func(v *VStreamer) error {
if seconds <= 0 {
return fmt.Errorf("vstreamclient: heartbeat seconds must be positive, got %d", seconds)
}

v.heartbeatSeconds = seconds
return nil
}
}

func WithStateTable(keyspace, table string) Option {
return func(v *VStreamer) error {
shards, ok := v.shardsByKeyspace[keyspace]
if !ok {
return fmt.Errorf("vstreamclient: keyspace %s not found", keyspace)
}

// this could allow for shard pinning, but we can support that if it becomes useful
if len(shards) > 1 {
return fmt.Errorf("vstreamclient: keyspace %s is sharded, only unsharded keyspaces are supported", keyspace)
}

v.vgtidStateKeyspace = sqlescape.EscapeID(keyspace)
v.vgtidStateTable = sqlescape.EscapeID(table)
return nil
}
}

// DefaultFlags returns a default set of flags for a VStreamer, safe to use in most cases, but can be customized
func DefaultFlags() *vtgatepb.VStreamFlags {
return &vtgatepb.VStreamFlags{
HeartbeatInterval: 1,
}
}

// WithFlags lets you manually control all the flag options, instead of using helper functions
func WithFlags(flags *vtgatepb.VStreamFlags) Option {
return func(v *VStreamer) error {
v.flags = flags
return nil
}
}

// WithEventFunc provides for custom event handling functions for specific event types. Only one function
// can be registered per event type, and it is called before the default event handling function. Returning
// an error from the custom function will exit the stream before the default function is called.
func WithEventFunc(fn EventFunc, eventTypes ...binlogdatapb.VEventType) Option {
return func(v *VStreamer) error {
if len(eventTypes) == 0 {
return fmt.Errorf("vstreamclient: no event types provided")
}

if v.eventFuncs == nil {
v.eventFuncs = make(map[binlogdatapb.VEventType]EventFunc)
}

for _, eventType := range eventTypes {
if _, ok := v.eventFuncs[eventType]; ok {
return fmt.Errorf("vstreamclient: event type %s already has a function", eventType.String())
}

v.eventFuncs[eventType] = fn
}

return nil
}
}

// WithStartingVGtid sets the starting VGtid for the VStreamer. This is useful for resuming a stream from a
// specific point, vs what might be stored in the state table.
func WithStartingVGtid(vgtid *binlogdatapb.VGtid) Option {
return func(v *VStreamer) error {
v.latestVgtid = vgtid
return nil
}
}
Loading

0 comments on commit 829c980

Please sign in to comment.