diff --git a/go/vt/vttablet/tabletmanager/rpc_agent.go b/go/vt/vttablet/tabletmanager/rpc_agent.go index 450b706411f..66ab8a801db 100644 --- a/go/vt/vttablet/tabletmanager/rpc_agent.go +++ b/go/vt/vttablet/tabletmanager/rpc_agent.go @@ -85,10 +85,10 @@ type RPCTM interface { GetUnresolvedTransactions(ctx context.Context, abandonAgeSeconds int64) ([]*querypb.TransactionMetadata, error) - ConcludeTransaction(ctx context.Context, req *tabletmanagerdatapb.ConcludeTransactionRequest) error - ReadTransaction(ctx context.Context, req *tabletmanagerdatapb.ReadTransactionRequest) (*querypb.TransactionMetadata, error) + ConcludeTransaction(ctx context.Context, req *tabletmanagerdatapb.ConcludeTransactionRequest) error + // Replication related methods PrimaryStatus(ctx context.Context) (*replicationdatapb.PrimaryStatus, error) diff --git a/go/vt/vttablet/tabletmanager/rpc_transaction.go b/go/vt/vttablet/tabletmanager/rpc_transaction.go index 49fbb5a967e..f6ec092e357 100644 --- a/go/vt/vttablet/tabletmanager/rpc_transaction.go +++ b/go/vt/vttablet/tabletmanager/rpc_transaction.go @@ -34,27 +34,27 @@ func (tm *TabletManager) GetUnresolvedTransactions(ctx context.Context, abandonA return tm.QueryServiceControl.UnresolvedTransactions(ctx, target, abandonAgeSeconds) } -// ConcludeTransaction concludes the given distributed transaction. -func (tm *TabletManager) ConcludeTransaction(ctx context.Context, req *tabletmanagerdatapb.ConcludeTransactionRequest) error { +// ReadTransaction returns the transaction metadata for the given distributed transaction ID. +func (tm *TabletManager) ReadTransaction(ctx context.Context, req *tabletmanagerdatapb.ReadTransactionRequest) (*querypb.TransactionMetadata, error) { if err := tm.waitForGrantsToHaveApplied(ctx); err != nil { - return err + return nil, err } tablet := tm.Tablet() target := &querypb.Target{Keyspace: tablet.Keyspace, Shard: tablet.Shard, TabletType: tablet.Type} - if req.Mm { - return tm.QueryServiceControl.ConcludeTransaction(ctx, target, req.Dtid) - } - return tm.QueryServiceControl.RollbackPrepared(ctx, target, req.Dtid, 0) + return tm.QueryServiceControl.ReadTransaction(ctx, target, req.Dtid) } -// ReadTransaction returns the transaction metadata for the given distributed transaction ID. -func (tm *TabletManager) ReadTransaction(ctx context.Context, req *tabletmanagerdatapb.ReadTransactionRequest) (*querypb.TransactionMetadata, error) { +// ConcludeTransaction concludes the given distributed transaction. +func (tm *TabletManager) ConcludeTransaction(ctx context.Context, req *tabletmanagerdatapb.ConcludeTransactionRequest) error { if err := tm.waitForGrantsToHaveApplied(ctx); err != nil { - return nil, err + return err } tablet := tm.Tablet() target := &querypb.Target{Keyspace: tablet.Keyspace, Shard: tablet.Shard, TabletType: tablet.Type} - return tm.QueryServiceControl.ReadTransaction(ctx, target, req.Dtid) + if req.Mm { + return tm.QueryServiceControl.ConcludeTransaction(ctx, target, req.Dtid) + } + return tm.QueryServiceControl.RollbackPrepared(ctx, target, req.Dtid, 0) } diff --git a/go/vt/vttablet/tabletserver/dt_executor_test.go b/go/vt/vttablet/tabletserver/dt_executor_test.go index 82da2e91cfe..aa19e22a9df 100644 --- a/go/vt/vttablet/tabletserver/dt_executor_test.go +++ b/go/vt/vttablet/tabletserver/dt_executor_test.go @@ -21,6 +21,8 @@ import ( "errors" "fmt" "reflect" + "regexp" + "strconv" "strings" "testing" "time" @@ -419,6 +421,62 @@ func TestExecutorSetRollback(t *testing.T) { require.Contains(t, err.Error(), "could not transition to ROLLBACK: aa") } +func TestExecutorUnresolvedTransactions(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + txe, tsv, db := newTestTxExecutor(t, ctx) + defer db.Close() + defer tsv.StopService() + + pattern := `(?i)select\s+t\.dtid,\s+t\.state,\s+p\.keyspace,\s+p\.shard\s+from\s+_vt\.dt_state\s+t\s+join\s+_vt\.dt_participant\s+p\s+on\s+t\.dtid\s+=\s+p\.dtid\s+where\s+time_created\s+<\s+(\d+)\s+order\s+by\s+t\.state\s+desc,\s+t\.dtid` + re := regexp.MustCompile(pattern) + + var executedQuery string + db.AddQueryPatternWithCallback(pattern, &sqltypes.Result{}, func(query string) { + executedQuery = query + }) + + tcases := []struct { + abandonAge time.Duration + expected time.Time + }{ + {abandonAge: 0, expected: time.Now().Add(-txe.te.abandonAge)}, + {abandonAge: 100 * time.Second, expected: time.Now().Add(-100 * time.Second)}, + } + + for _, tcase := range tcases { + t.Run(fmt.Sprintf("abandonAge=%v", tcase.abandonAge), func(t *testing.T) { + _, err := txe.UnresolvedTransactions(tcase.abandonAge) + require.NoError(t, err) + require.NotEmpty(t, executedQuery) + + // extract the time value + matches := re.FindStringSubmatch(executedQuery) + require.Len(t, matches, 2) + timeCreated := convertNanoStringToTime(t, matches[1]) + + // diff should be in microseconds, so we allow 10ms difference + require.WithinDuration(t, timeCreated, tcase.expected, 10*time.Millisecond) + }) + } + +} + +func convertNanoStringToTime(t *testing.T, unixNanoStr string) time.Time { + t.Helper() + + // Convert the string to an integer (int64) + unixNano, err := strconv.ParseInt(unixNanoStr, 10, 64) + require.NoError(t, err) + + // Convert nanoseconds to seconds and nanoseconds + seconds := unixNano / int64(time.Second) + nanos := unixNano % int64(time.Second) + + // Create a time.Time object + return time.Unix(seconds, nanos) +} + func TestExecutorConcludeTransaction(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() diff --git a/go/vt/vttablet/tmclient/rpc_client_api.go b/go/vt/vttablet/tmclient/rpc_client_api.go index d3b7d310385..d22b0b8b34e 100644 --- a/go/vt/vttablet/tmclient/rpc_client_api.go +++ b/go/vt/vttablet/tmclient/rpc_client_api.go @@ -146,15 +146,19 @@ type TabletManagerClient interface { // query faster. Close() should close the pool in that case. ExecuteFetchAsApp(ctx context.Context, tablet *topodatapb.Tablet, usePool bool, req *tabletmanagerdatapb.ExecuteFetchAsAppRequest) (*querypb.QueryResult, error) + // + // Distributed Transaction related methods + // + // GetUnresolvedTransactions returns the list of unresolved transactions for the tablet. GetUnresolvedTransactions(ctx context.Context, tablet *topodatapb.Tablet) ([]*querypb.TransactionMetadata, error) - // ConcludeTransaction conclude the transaction on the tablet. - ConcludeTransaction(ctx context.Context, tablet *topodatapb.Tablet, dtid string, mm bool) error - // ReadTransaction returns the metadata for the specified distributed transaction ID. ReadTransaction(ctx context.Context, tablet *topodatapb.Tablet, dtid string) (*querypb.TransactionMetadata, error) + // ConcludeTransaction conclude the transaction on the tablet. + ConcludeTransaction(ctx context.Context, tablet *topodatapb.Tablet, dtid string, mm bool) error + // // Replication related methods // diff --git a/go/vt/vttablet/tmrpctest/test_tm_rpc.go b/go/vt/vttablet/tmrpctest/test_tm_rpc.go index 5d11eaf9afc..f50ae23bf57 100644 --- a/go/vt/vttablet/tmrpctest/test_tm_rpc.go +++ b/go/vt/vttablet/tmrpctest/test_tm_rpc.go @@ -415,6 +415,16 @@ func tmRPCTestGetUnresolvedTransactionsPanic(ctx context.Context, t *testing.T, expectHandleRPCPanic(t, "GetUnresolvedTransactions", false /*verbose*/, err) } +func tmRPCTestReadTransaction(ctx context.Context, t *testing.T, client tmclient.TabletManagerClient, tablet *topodatapb.Tablet) { + _, err := client.ReadTransaction(ctx, tablet, "aa") + require.NoError(t, err) +} + +func tmRPCTestReadTransactionPanic(ctx context.Context, t *testing.T, client tmclient.TabletManagerClient, tablet *topodatapb.Tablet) { + _, err := client.ReadTransaction(ctx, tablet, "aa") + expectHandleRPCPanic(t, "ReadTransaction", false /*verbose*/, err) +} + // // Various read-write methods // @@ -744,18 +754,18 @@ func (fra *fakeRPCTM) GetUnresolvedTransactions(ctx context.Context, abandonAgeS return nil, nil } -func (fra *fakeRPCTM) ConcludeTransaction(ctx context.Context, req *tabletmanagerdatapb.ConcludeTransactionRequest) error { +func (fra *fakeRPCTM) ReadTransaction(ctx context.Context, req *tabletmanagerdatapb.ReadTransactionRequest) (*querypb.TransactionMetadata, error) { if fra.panics { panic(fmt.Errorf("test-triggered panic")) } - return nil + return nil, nil } -func (fra *fakeRPCTM) ReadTransaction(ctx context.Context, req *tabletmanagerdatapb.ReadTransactionRequest) (*querypb.TransactionMetadata, error) { +func (fra *fakeRPCTM) ConcludeTransaction(ctx context.Context, req *tabletmanagerdatapb.ConcludeTransactionRequest) error { if fra.panics { panic(fmt.Errorf("test-triggered panic")) } - return nil, nil + return nil } func tmRPCTestExecuteFetch(ctx context.Context, t *testing.T, client tmclient.TabletManagerClient, tablet *topodatapb.Tablet) { @@ -1457,6 +1467,7 @@ func Run(t *testing.T, client tmclient.TabletManagerClient, tablet *topodatapb.T tmRPCTestGetPermissions(ctx, t, client, tablet) tmRPCTestGetGlobalStatusVars(ctx, t, client, tablet) tmRPCTestGetUnresolvedTransactions(ctx, t, client, tablet) + tmRPCTestReadTransaction(ctx, t, client, tablet) // Various read-write methods tmRPCTestSetReadOnly(ctx, t, client, tablet) @@ -1519,6 +1530,7 @@ func Run(t *testing.T, client tmclient.TabletManagerClient, tablet *topodatapb.T tmRPCTestGetPermissionsPanic(ctx, t, client, tablet) tmRPCTestGetGlobalStatusVarsPanic(ctx, t, client, tablet) tmRPCTestGetUnresolvedTransactionsPanic(ctx, t, client, tablet) + tmRPCTestReadTransactionPanic(ctx, t, client, tablet) // Various read-write methods tmRPCTestSetReadOnlyPanic(ctx, t, client, tablet)