Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
Signed-off-by: Brendan Dougherty <[email protected]>
  • Loading branch information
brendar committed Feb 16, 2024
1 parent e1151b9 commit eecec66
Show file tree
Hide file tree
Showing 7 changed files with 127 additions and 3 deletions.
4 changes: 3 additions & 1 deletion examples/local/101_initial_cluster.sh
Original file line number Diff line number Diff line change
Expand Up @@ -69,5 +69,7 @@ vtctldclient ApplyVSchema --vschema-file vschema_commerce_initial.json commerce
CELL=zone1 ./scripts/vtgate-up.sh

# start vtadmin
./scripts/vtadmin-up.sh
#./scripts/vtadmin-up.sh

echo "Making 100 the primary"
vtctlclient PlannedReparentShard -- --keyspace_shard=commerce/0 --new_primary=zone1-0000000100
2 changes: 1 addition & 1 deletion examples/local/401_teardown.sh
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

source ./env.sh

./scripts/vtadmin-down.sh
#./scripts/vtadmin-down.sh

./scripts/vtorc-down.sh

Expand Down
75 changes: 75 additions & 0 deletions examples/local/run_queries.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package main

import (
"context"
"fmt"
"log"
"os"
"sync"
"time"

_ "vitess.io/vitess/go/vt/vtctl/grpcvtctlclient"
_ "vitess.io/vitess/go/vt/vtgate/grpcvtgateconn"
"vitess.io/vitess/go/vt/vtgate/vtgateconn"
)

const NumQueries = 10000
const Concurrency = 100
const ShouldDelay = false
const Delay = 100 * time.Millisecond

func main() {
connectCtx, connectCancel := context.WithTimeout(context.Background(), 1*time.Second)
defer connectCancel()

fmt.Println("Connecting")
conn, err := vtgateconn.Dial(connectCtx, "localhost:15991")
if err != nil {
log.Fatal(err)
}
defer conn.Close()

session := conn.Session("commerce", nil)

var wg sync.WaitGroup
wg.Add(Concurrency)

tasks := make(chan int, NumQueries)

for i := 0; i < Concurrency; i++ {
go func(id int) {
defer wg.Done()
for range tasks {
duration, err := runQuery(session)
if err != nil {
fmt.Printf("%v %v\n", duration, err)
os.Exit(1)
}
//fmt.Println(duration)
if ShouldDelay {
time.Sleep(Delay)
}
}
}(i)
}

// Enqueue X tasks
for i := 0; i < NumQueries; i++ {
tasks <- i
}

// Close the channel to signal to the goroutines that no more tasks will be added
close(tasks)

// Wait for all goroutines to finish
wg.Wait()
}

func runQuery(session *vtgateconn.VTGateSession) (time.Duration, error) {
queryCtx, queryCancel := context.WithTimeout(context.Background(), 1*time.Second)
defer queryCancel()
start := time.Now()
_, err := session.Execute(queryCtx, "SELECT * FROM customer", nil)
end := time.Now()
return end.Sub(start), err
}
1 change: 1 addition & 0 deletions examples/local/scripts/vtgate-up.sh
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ mysql_server_socket_path="/tmp/mysql.sock"
# shellcheck disable=SC2086
vtgate \
$TOPOLOGY_FLAGS \
--alsologtostderr \
--log_dir $VTDATAROOT/tmp \
--log_queries_to_file $VTDATAROOT/tmp/vtgate_querylog.txt \
--port $web_port \
Expand Down
12 changes: 12 additions & 0 deletions examples/local/scripts/vttablet-up.sh
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ echo "Starting vttablet for $alias..."
# shellcheck disable=SC2086
vttablet \
$TOPOLOGY_FLAGS \
--alsologtostderr \
--log_dir $VTDATAROOT/tmp \
--log_queries_to_file $VTDATAROOT/tmp/$tablet_logfile \
--tablet-path $alias \
Expand All @@ -55,6 +56,17 @@ vttablet \
--service_map 'grpc-queryservice,grpc-tabletmanager,grpc-updatestream' \
--pid_file $VTDATAROOT/$tablet_dir/vttablet.pid \
--vtctld_addr http://$hostname:$vtctld_web_port/ \
--enable_consolidator=false \
--enable_consolidator_replicas=false \
--queryserver-config-pool-size 100 \
--queryserver-config-query-pool-timeout 0.1 \
--queryserver-config-query-pool-waiter-cap 100 \
--queryserver-config-transaction-cap 100 \
--queryserver-config-txpool-timeout 0.1 \
--queryserver-config-txpool-waiter-cap 100 \
--queryserver-config-stream-pool-size 100 \
--queryserver-config-stream-pool-timeout 0.1 \
--queryserver-config-stream-pool-waiter-cap 100 \
--disable_active_reparents \
> $VTDATAROOT/$tablet_dir/vttablet.out 2>&1 &

Expand Down
30 changes: 30 additions & 0 deletions go/pools/resource_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ type (

reopenMutex sync.Mutex
refresh *poolRefresh
verbose sync2.AtomicBool
}
)

Expand Down Expand Up @@ -180,6 +181,10 @@ func NewResourcePool(factory Factory, capacity, maxCap int, idleTimeout time.Dur
return rp
}

func (rp *ResourcePool) Verbose(v bool) {
rp.verbose.Set(v)
}

func (rp *ResourcePool) Name() string {
return "ResourcePool"
}
Expand Down Expand Up @@ -263,6 +268,7 @@ func (rp *ResourcePool) Get(ctx context.Context, setting *Setting) (resource Res
}

func (rp *ResourcePool) get(ctx context.Context) (resource Resource, err error) {
rp.verboseInfo("AYO getting a resource without settings")
rp.getCount.Add(1)
// Fetch
var wrapper resourceWrapper
Expand All @@ -274,19 +280,24 @@ func (rp *ResourcePool) get(ctx context.Context) (resource Resource, err error)
// check normal resources first
case wrapper, ok = <-rp.resources:
default:
rp.verboseInfo("AYO nothing in rp.resources")
select {
// then checking setting resources
case wrapper, ok = <-rp.settingResources:
default:
rp.verboseInfo("AYO nothing in rp.settingResources")
rp.verboseInfo("AYO waiting for a resource")
// now waiting
startTime := time.Now()
select {
case wrapper, ok = <-rp.resources:
case wrapper, ok = <-rp.settingResources:
case <-ctx.Done():
rp.verboseError("AYO timed out waiting for a resource")
return nil, ErrTimeout
}
rp.recordWait(startTime)
rp.verboseInfo("AYO got a resource after waiting")
}
}
if !ok {
Expand All @@ -295,6 +306,7 @@ func (rp *ResourcePool) get(ctx context.Context) (resource Resource, err error)

// if the resource has setting applied, we will close it and return a new one
if wrapper.resource != nil && wrapper.resource.IsSettingApplied() {
rp.verboseInfo("AYO got a resource with settings applied. Gonna reset it.")
rp.resetSettingCount.Add(1)
err = wrapper.resource.ResetSetting(ctx)
if err != nil {
Expand All @@ -307,20 +319,38 @@ func (rp *ResourcePool) get(ctx context.Context) (resource Resource, err error)

// Unwrap
if wrapper.resource == nil {
rp.verboseInfo("AYO gonna create a new resource.")
wrapper.resource, err = rp.factory(ctx)
if err != nil {
rp.resources <- resourceWrapper{}
rp.verboseError("AYO error creating resource: %s", err)
return nil, err
}
rp.verboseInfo("AYO created a resource")
rp.active.Add(1)
}

rp.verboseInfo("AYO got a resource now (from pool or just created)")

if rp.available.Add(-1) <= 0 {
rp.exhausted.Add(1)
}
rp.inUse.Add(1)
return wrapper.resource, err
}

func (rp *ResourcePool) verboseInfo(format string, args ...interface{}) {
if rp.verbose.Get() {
log.Infof(format, args...)
}
}

func (rp *ResourcePool) verboseError(format string, args ...interface{}) {
if rp.verbose.Get() {
log.Infof(format, args...)
}
}

func (rp *ResourcePool) getWithSettings(ctx context.Context, setting *Setting) (Resource, error) {
rp.getSettingCount.Add(1)
var wrapper resourceWrapper
Expand Down
6 changes: 5 additions & 1 deletion go/vt/vttablet/tabletserver/connpool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,11 @@ func (cp *Pool) Open(appParams, dbaParams, appDebugParams dbconfigs.Connector) {
refreshCheck = netutil.DNSTracker(appParams.Host())
}

cp.connections = pools.NewResourcePool(f, cp.capacity, cp.capacity, cp.idleTimeout, cp.getLogWaitCallback(), refreshCheck, mysqlctl.PoolDynamicHostnameResolution)
pool := pools.NewResourcePool(f, cp.capacity, cp.capacity, cp.idleTimeout, cp.getLogWaitCallback(), refreshCheck, mysqlctl.PoolDynamicHostnameResolution)
if cp.name == "ConnPool" {
pool.Verbose(true)
}
cp.connections = pool
cp.appDebugParams = appDebugParams

cp.dbaPool.Open(dbaParams)
Expand Down

0 comments on commit eecec66

Please sign in to comment.