Skip to content

Commit

Permalink
test: added additional tests
Browse files Browse the repository at this point in the history
Signed-off-by: Harshit Gangal <[email protected]>
  • Loading branch information
harshit-gangal committed Sep 17, 2024
1 parent ae06dd1 commit 6f131d8
Show file tree
Hide file tree
Showing 5 changed files with 94 additions and 20 deletions.
4 changes: 2 additions & 2 deletions go/vt/vttablet/tabletmanager/rpc_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,10 @@ type RPCTM interface {

GetUnresolvedTransactions(ctx context.Context, abandonAgeSeconds int64) ([]*querypb.TransactionMetadata, error)

ConcludeTransaction(ctx context.Context, req *tabletmanagerdatapb.ConcludeTransactionRequest) error

ReadTransaction(ctx context.Context, req *tabletmanagerdatapb.ReadTransactionRequest) (*querypb.TransactionMetadata, error)

ConcludeTransaction(ctx context.Context, req *tabletmanagerdatapb.ConcludeTransactionRequest) error

// Replication related methods
PrimaryStatus(ctx context.Context) (*replicationdatapb.PrimaryStatus, error)

Expand Down
22 changes: 11 additions & 11 deletions go/vt/vttablet/tabletmanager/rpc_transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,27 +34,27 @@ func (tm *TabletManager) GetUnresolvedTransactions(ctx context.Context, abandonA
return tm.QueryServiceControl.UnresolvedTransactions(ctx, target, abandonAgeSeconds)
}

// ConcludeTransaction concludes the given distributed transaction.
func (tm *TabletManager) ConcludeTransaction(ctx context.Context, req *tabletmanagerdatapb.ConcludeTransactionRequest) error {
// ReadTransaction returns the transaction metadata for the given distributed transaction ID.
func (tm *TabletManager) ReadTransaction(ctx context.Context, req *tabletmanagerdatapb.ReadTransactionRequest) (*querypb.TransactionMetadata, error) {
if err := tm.waitForGrantsToHaveApplied(ctx); err != nil {
return err
return nil, err
}

tablet := tm.Tablet()
target := &querypb.Target{Keyspace: tablet.Keyspace, Shard: tablet.Shard, TabletType: tablet.Type}
if req.Mm {
return tm.QueryServiceControl.ConcludeTransaction(ctx, target, req.Dtid)
}
return tm.QueryServiceControl.RollbackPrepared(ctx, target, req.Dtid, 0)
return tm.QueryServiceControl.ReadTransaction(ctx, target, req.Dtid)
}

// ReadTransaction returns the transaction metadata for the given distributed transaction ID.
func (tm *TabletManager) ReadTransaction(ctx context.Context, req *tabletmanagerdatapb.ReadTransactionRequest) (*querypb.TransactionMetadata, error) {
// ConcludeTransaction concludes the given distributed transaction.
func (tm *TabletManager) ConcludeTransaction(ctx context.Context, req *tabletmanagerdatapb.ConcludeTransactionRequest) error {
if err := tm.waitForGrantsToHaveApplied(ctx); err != nil {
return nil, err
return err
}

tablet := tm.Tablet()
target := &querypb.Target{Keyspace: tablet.Keyspace, Shard: tablet.Shard, TabletType: tablet.Type}
return tm.QueryServiceControl.ReadTransaction(ctx, target, req.Dtid)
if req.Mm {
return tm.QueryServiceControl.ConcludeTransaction(ctx, target, req.Dtid)
}
return tm.QueryServiceControl.RollbackPrepared(ctx, target, req.Dtid, 0)
}
58 changes: 58 additions & 0 deletions go/vt/vttablet/tabletserver/dt_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"errors"
"fmt"
"reflect"
"regexp"
"strconv"
"strings"
"testing"
"time"
Expand Down Expand Up @@ -419,6 +421,62 @@ func TestExecutorSetRollback(t *testing.T) {
require.Contains(t, err.Error(), "could not transition to ROLLBACK: aa")
}

func TestExecutorUnresolvedTransactions(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
txe, tsv, db := newTestTxExecutor(t, ctx)
defer db.Close()
defer tsv.StopService()

pattern := `(?i)select\s+t\.dtid,\s+t\.state,\s+p\.keyspace,\s+p\.shard\s+from\s+_vt\.dt_state\s+t\s+join\s+_vt\.dt_participant\s+p\s+on\s+t\.dtid\s+=\s+p\.dtid\s+where\s+time_created\s+<\s+(\d+)\s+order\s+by\s+t\.state\s+desc,\s+t\.dtid`
re := regexp.MustCompile(pattern)

var executedQuery string
db.AddQueryPatternWithCallback(pattern, &sqltypes.Result{}, func(query string) {
executedQuery = query
})

tcases := []struct {
abandonAge time.Duration
expected time.Time
}{
{abandonAge: 0, expected: time.Now().Add(-txe.te.abandonAge)},
{abandonAge: 100 * time.Second, expected: time.Now().Add(-100 * time.Second)},
}

for _, tcase := range tcases {
t.Run(fmt.Sprintf("abandonAge=%v", tcase.abandonAge), func(t *testing.T) {
_, err := txe.UnresolvedTransactions(tcase.abandonAge)
require.NoError(t, err)
require.NotEmpty(t, executedQuery)

// extract the time value
matches := re.FindStringSubmatch(executedQuery)
require.Len(t, matches, 2)
timeCreated := convertNanoStringToTime(t, matches[1])

// diff should be in microseconds, so we allow 10ms difference
require.WithinDuration(t, timeCreated, tcase.expected, 10*time.Millisecond)
})
}

}

func convertNanoStringToTime(t *testing.T, unixNanoStr string) time.Time {
t.Helper()

// Convert the string to an integer (int64)
unixNano, err := strconv.ParseInt(unixNanoStr, 10, 64)
require.NoError(t, err)

// Convert nanoseconds to seconds and nanoseconds
seconds := unixNano / int64(time.Second)
nanos := unixNano % int64(time.Second)

// Create a time.Time object
return time.Unix(seconds, nanos)
}

func TestExecutorConcludeTransaction(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down
10 changes: 7 additions & 3 deletions go/vt/vttablet/tmclient/rpc_client_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,15 +146,19 @@ type TabletManagerClient interface {
// query faster. Close() should close the pool in that case.
ExecuteFetchAsApp(ctx context.Context, tablet *topodatapb.Tablet, usePool bool, req *tabletmanagerdatapb.ExecuteFetchAsAppRequest) (*querypb.QueryResult, error)

//
// Distributed Transaction related methods
//

// GetUnresolvedTransactions returns the list of unresolved transactions for the tablet.
GetUnresolvedTransactions(ctx context.Context, tablet *topodatapb.Tablet) ([]*querypb.TransactionMetadata, error)

// ConcludeTransaction conclude the transaction on the tablet.
ConcludeTransaction(ctx context.Context, tablet *topodatapb.Tablet, dtid string, mm bool) error

// ReadTransaction returns the metadata for the specified distributed transaction ID.
ReadTransaction(ctx context.Context, tablet *topodatapb.Tablet, dtid string) (*querypb.TransactionMetadata, error)

// ConcludeTransaction conclude the transaction on the tablet.
ConcludeTransaction(ctx context.Context, tablet *topodatapb.Tablet, dtid string, mm bool) error

//
// Replication related methods
//
Expand Down
20 changes: 16 additions & 4 deletions go/vt/vttablet/tmrpctest/test_tm_rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,16 @@ func tmRPCTestGetUnresolvedTransactionsPanic(ctx context.Context, t *testing.T,
expectHandleRPCPanic(t, "GetUnresolvedTransactions", false /*verbose*/, err)
}

func tmRPCTestReadTransaction(ctx context.Context, t *testing.T, client tmclient.TabletManagerClient, tablet *topodatapb.Tablet) {
_, err := client.ReadTransaction(ctx, tablet, "aa")
require.NoError(t, err)
}

func tmRPCTestReadTransactionPanic(ctx context.Context, t *testing.T, client tmclient.TabletManagerClient, tablet *topodatapb.Tablet) {
_, err := client.ReadTransaction(ctx, tablet, "aa")
expectHandleRPCPanic(t, "ReadTransaction", false /*verbose*/, err)
}

//
// Various read-write methods
//
Expand Down Expand Up @@ -744,18 +754,18 @@ func (fra *fakeRPCTM) GetUnresolvedTransactions(ctx context.Context, abandonAgeS
return nil, nil
}

func (fra *fakeRPCTM) ConcludeTransaction(ctx context.Context, req *tabletmanagerdatapb.ConcludeTransactionRequest) error {
func (fra *fakeRPCTM) ReadTransaction(ctx context.Context, req *tabletmanagerdatapb.ReadTransactionRequest) (*querypb.TransactionMetadata, error) {
if fra.panics {
panic(fmt.Errorf("test-triggered panic"))
}
return nil
return nil, nil
}

func (fra *fakeRPCTM) ReadTransaction(ctx context.Context, req *tabletmanagerdatapb.ReadTransactionRequest) (*querypb.TransactionMetadata, error) {
func (fra *fakeRPCTM) ConcludeTransaction(ctx context.Context, req *tabletmanagerdatapb.ConcludeTransactionRequest) error {
if fra.panics {
panic(fmt.Errorf("test-triggered panic"))
}
return nil, nil
return nil
}

func tmRPCTestExecuteFetch(ctx context.Context, t *testing.T, client tmclient.TabletManagerClient, tablet *topodatapb.Tablet) {
Expand Down Expand Up @@ -1457,6 +1467,7 @@ func Run(t *testing.T, client tmclient.TabletManagerClient, tablet *topodatapb.T
tmRPCTestGetPermissions(ctx, t, client, tablet)
tmRPCTestGetGlobalStatusVars(ctx, t, client, tablet)
tmRPCTestGetUnresolvedTransactions(ctx, t, client, tablet)
tmRPCTestReadTransaction(ctx, t, client, tablet)

// Various read-write methods
tmRPCTestSetReadOnly(ctx, t, client, tablet)
Expand Down Expand Up @@ -1519,6 +1530,7 @@ func Run(t *testing.T, client tmclient.TabletManagerClient, tablet *topodatapb.T
tmRPCTestGetPermissionsPanic(ctx, t, client, tablet)
tmRPCTestGetGlobalStatusVarsPanic(ctx, t, client, tablet)
tmRPCTestGetUnresolvedTransactionsPanic(ctx, t, client, tablet)
tmRPCTestReadTransactionPanic(ctx, t, client, tablet)

// Various read-write methods
tmRPCTestSetReadOnlyPanic(ctx, t, client, tablet)
Expand Down

0 comments on commit 6f131d8

Please sign in to comment.