diff --git a/core/capabilities/remote/target/client_test.go b/core/capabilities/remote/target/client_test.go index 8665ffe7544..5f9261eed8f 100644 --- a/core/capabilities/remote/target/client_test.go +++ b/core/capabilities/remote/target/client_test.go @@ -11,7 +11,7 @@ import ( commoncap "github.com/smartcontractkit/chainlink-common/pkg/capabilities" "github.com/smartcontractkit/chainlink-common/pkg/capabilities/pb" - "github.com/smartcontractkit/chainlink-common/pkg/services" + "github.com/smartcontractkit/chainlink-common/pkg/services/servicetest" "github.com/smartcontractkit/chainlink-common/pkg/values" "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/target" remotetypes "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types" @@ -22,8 +22,7 @@ import ( ) func Test_Client_DonTopologies(t *testing.T) { - ctx, cancel := context.WithCancel(testutils.Context(t)) - defer cancel() + ctx := testutils.Context(t) transmissionSchedule, err := values.NewMap(map[string]any{ "schedule": transmission.Schedule_OneAtATime, @@ -60,8 +59,7 @@ func Test_Client_DonTopologies(t *testing.T) { } func Test_Client_TransmissionSchedules(t *testing.T) { - ctx, cancel := context.WithCancel(testutils.Context(t)) - defer cancel() + ctx := testutils.Context(t) responseTest := func(t *testing.T, responseCh <-chan commoncap.CapabilityResponse, responseError error) { require.NoError(t, responseError) @@ -99,8 +97,7 @@ func Test_Client_TransmissionSchedules(t *testing.T) { } func Test_Client_TimesOutIfInsufficientCapabilityPeerResponses(t *testing.T) { - ctx, cancel := context.WithCancel(testutils.Context(t)) - defer cancel() + ctx := testutils.Context(t) responseTest := func(t *testing.T, responseCh <-chan commoncap.CapabilityResponse, responseError error) { require.NoError(t, responseError) @@ -166,14 +163,13 @@ func testClient(ctx context.Context, t *testing.T, numWorkflowPeers int, workflo } callers := make([]commoncap.TargetCapability, numWorkflowPeers) - srvcs := make([]services.Service, numWorkflowPeers) + for i := 0; i < numWorkflowPeers; i++ { workflowPeerDispatcher := broker.NewDispatcherForNode(workflowPeers[i]) caller := target.NewClient(capInfo, workflowDonInfo, workflowPeerDispatcher, workflowNodeResponseTimeout, lggr) - require.NoError(t, caller.Start(ctx)) + servicetest.Run(t, caller) broker.RegisterReceiverNode(workflowPeers[i], caller) callers[i] = caller - srvcs[i] = caller } executeInputs, err := values.NewMap( @@ -190,6 +186,7 @@ func testClient(ctx context.Context, t *testing.T, numWorkflowPeers int, workflo // Fire off all the requests for _, caller := range callers { go func(caller commoncap.TargetCapability) { + defer wg.Done() responseCh, err := caller.Execute(ctx, commoncap.CapabilityRequest{ Metadata: commoncap.RequestMetadata{ @@ -201,14 +198,10 @@ func testClient(ctx context.Context, t *testing.T, numWorkflowPeers int, workflo }) responseTest(t, responseCh, err) - wg.Done() }(caller) } wg.Wait() - for i := 0; i < numWorkflowPeers; i++ { - require.NoError(t, srvcs[i].Close()) - } } // Simple client that only responds once it has received a message from each workflow peer diff --git a/core/capabilities/remote/target/endtoend_test.go b/core/capabilities/remote/target/endtoend_test.go index a5379250e5e..c9e9fea28f0 100644 --- a/core/capabilities/remote/target/endtoend_test.go +++ b/core/capabilities/remote/target/endtoend_test.go @@ -14,7 +14,7 @@ import ( "github.com/stretchr/testify/require" commoncap "github.com/smartcontractkit/chainlink-common/pkg/capabilities" - "github.com/smartcontractkit/chainlink-common/pkg/services" + "github.com/smartcontractkit/chainlink-common/pkg/services/servicetest" "github.com/smartcontractkit/chainlink-common/pkg/values" "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/target" remotetypes "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types" @@ -25,8 +25,7 @@ import ( ) func Test_RemoteTargetCapability_InsufficientCapabilityResponses(t *testing.T) { - ctx, cancel := context.WithCancel(testutils.Context(t)) - defer cancel() + ctx := testutils.Context(t) responseTest := func(t *testing.T, responseCh <-chan commoncap.CapabilityResponse, responseError error) { require.NoError(t, responseError) @@ -46,8 +45,7 @@ func Test_RemoteTargetCapability_InsufficientCapabilityResponses(t *testing.T) { } func Test_RemoteTargetCapability_InsufficientWorkflowRequests(t *testing.T) { - ctx, cancel := context.WithCancel(testutils.Context(t)) - defer cancel() + ctx := testutils.Context(t) responseTest := func(t *testing.T, responseCh <-chan commoncap.CapabilityResponse, responseError error) { require.NoError(t, responseError) @@ -69,8 +67,7 @@ func Test_RemoteTargetCapability_InsufficientWorkflowRequests(t *testing.T) { } func Test_RemoteTargetCapability_TransmissionSchedules(t *testing.T) { - ctx, cancel := context.WithCancel(testutils.Context(t)) - defer cancel() + ctx := testutils.Context(t) responseTest := func(t *testing.T, responseCh <-chan commoncap.CapabilityResponse, responseError error) { require.NoError(t, responseError) @@ -102,8 +99,7 @@ func Test_RemoteTargetCapability_TransmissionSchedules(t *testing.T) { } func Test_RemoteTargetCapability_DonTopologies(t *testing.T) { - ctx, cancel := context.WithCancel(testutils.Context(t)) - defer cancel() + ctx := testutils.Context(t) responseTest := func(t *testing.T, responseCh <-chan commoncap.CapabilityResponse, responseError error) { require.NoError(t, responseError) @@ -138,8 +134,7 @@ func Test_RemoteTargetCapability_DonTopologies(t *testing.T) { } func Test_RemoteTargetCapability_CapabilityError(t *testing.T) { - ctx, cancel := context.WithCancel(testutils.Context(t)) - defer cancel() + ctx := testutils.Context(t) responseTest := func(t *testing.T, responseCh <-chan commoncap.CapabilityResponse, responseError error) { require.NoError(t, responseError) @@ -159,8 +154,7 @@ func Test_RemoteTargetCapability_CapabilityError(t *testing.T) { } func Test_RemoteTargetCapability_RandomCapabilityError(t *testing.T) { - ctx, cancel := context.WithCancel(testutils.Context(t)) - defer cancel() + ctx := testutils.Context(t) responseTest := func(t *testing.T, responseCh <-chan commoncap.CapabilityResponse, responseError error) { require.NoError(t, responseError) @@ -226,27 +220,24 @@ func testRemoteTarget(ctx context.Context, t *testing.T, underlying commoncap.Ta workflowDonInfo.ID: workflowDonInfo, } - srvcs := []services.Service{} capabilityNodes := make([]remotetypes.Receiver, numCapabilityPeers) for i := 0; i < numCapabilityPeers; i++ { capabilityPeer := capabilityPeers[i] capabilityDispatcher := broker.NewDispatcherForNode(capabilityPeer) capabilityNode := target.NewServer(capabilityPeer, underlying, capInfo, capDonInfo, workflowDONs, capabilityDispatcher, capabilityNodeResponseTimeout, lggr) - require.NoError(t, capabilityNode.Start(ctx)) + servicetest.Run(t, capabilityNode) broker.RegisterReceiverNode(capabilityPeer, capabilityNode) capabilityNodes[i] = capabilityNode - srvcs = append(srvcs, capabilityNode) } workflowNodes := make([]commoncap.TargetCapability, numWorkflowPeers) for i := 0; i < numWorkflowPeers; i++ { workflowPeerDispatcher := broker.NewDispatcherForNode(workflowPeers[i]) workflowNode := target.NewClient(capInfo, workflowDonInfo, workflowPeerDispatcher, workflowNodeTimeout, lggr) - require.NoError(t, workflowNode.Start(ctx)) + servicetest.Run(t, workflowNode) broker.RegisterReceiverNode(workflowPeers[i], workflowNode) workflowNodes[i] = workflowNode - srvcs = append(srvcs, workflowNode) } executeInputs, err := values.NewMap( @@ -262,6 +253,7 @@ func testRemoteTarget(ctx context.Context, t *testing.T, underlying commoncap.Ta for _, caller := range workflowNodes { go func(caller commoncap.TargetCapability) { + defer wg.Done() responseCh, err := caller.Execute(ctx, commoncap.CapabilityRequest{ Metadata: commoncap.RequestMetadata{ @@ -273,14 +265,10 @@ func testRemoteTarget(ctx context.Context, t *testing.T, underlying commoncap.Ta }) responseTest(t, responseCh, err) - wg.Done() }(caller) } wg.Wait() - for _, srv := range srvcs { - require.NoError(t, srv.Close()) - } } type testMessageBroker struct { diff --git a/core/capabilities/remote/target/server_test.go b/core/capabilities/remote/target/server_test.go index e6d85ebff25..80c0d5bc6e0 100644 --- a/core/capabilities/remote/target/server_test.go +++ b/core/capabilities/remote/target/server_test.go @@ -19,8 +19,7 @@ import ( ) func Test_Server_RespondsAfterSufficientRequests(t *testing.T) { - ctx, cancel := context.WithCancel(testutils.Context(t)) - defer cancel() + ctx := testutils.Context(t) numCapabilityPeers := 4 @@ -47,8 +46,7 @@ func Test_Server_RespondsAfterSufficientRequests(t *testing.T) { } func Test_Server_InsufficientCallers(t *testing.T) { - ctx, cancel := context.WithCancel(testutils.Context(t)) - defer cancel() + ctx := testutils.Context(t) numCapabilityPeers := 4 @@ -75,8 +73,7 @@ func Test_Server_InsufficientCallers(t *testing.T) { } func Test_Server_CapabilityError(t *testing.T) { - ctx, cancel := context.WithCancel(testutils.Context(t)) - defer cancel() + ctx := testutils.Context(t) numCapabilityPeers := 4 diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/integration_test.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/integration_test.go index 46314dde418..ace17ca2dbc 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/integration_test.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/integration_test.go @@ -145,8 +145,7 @@ func TestIntegration_LogEventProvider(t *testing.T) { } func TestIntegration_LogEventProvider_UpdateConfig(t *testing.T) { - ctx, cancel := context.WithCancel(testutils.Context(t)) - defer cancel() + ctx := testutils.Context(t) backend, stopMining, accounts := setupBackend(t) defer stopMining() diff --git a/core/services/ocr2/plugins/threshold/decryption_queue_test.go b/core/services/ocr2/plugins/threshold/decryption_queue_test.go index 2a9f8d4c85b..a017b883b3d 100644 --- a/core/services/ocr2/plugins/threshold/decryption_queue_test.go +++ b/core/services/ocr2/plugins/threshold/decryption_queue_test.go @@ -36,8 +36,7 @@ func Test_decryptionQueue_Decrypt_ReturnResultAfterCallingDecrypt(t *testing.T) dq.SetResult([]byte("1"), []byte("decrypted"), nil) }() - ctx, cancel := context.WithCancel(testutils.Context(t)) - defer cancel() + ctx := testutils.Context(t) pt, err := dq.Decrypt(ctx, []byte("1"), []byte("encrypted")) require.NoError(t, err) @@ -50,8 +49,7 @@ func Test_decryptionQueue_Decrypt_CiphertextIdTooLarge(t *testing.T) { lggr := logger.TestLogger(t) dq := NewDecryptionQueue(1, 1000, 16, testutils.WaitTimeout(t), lggr) - ctx, cancel := context.WithCancel(testutils.Context(t)) - defer cancel() + ctx := testutils.Context(t) _, err := dq.Decrypt(ctx, []byte("largeCiphertextId"), []byte("ciphertext")) assert.Equal(t, err.Error(), "ciphertextId too large") @@ -61,8 +59,7 @@ func Test_decryptionQueue_Decrypt_EmptyCiphertextId(t *testing.T) { lggr := logger.TestLogger(t) dq := NewDecryptionQueue(1, 1000, 64, testutils.WaitTimeout(t), lggr) - ctx, cancel := context.WithCancel(testutils.Context(t)) - defer cancel() + ctx := testutils.Context(t) _, err := dq.Decrypt(ctx, []byte(""), []byte("ciphertext")) assert.Equal(t, err.Error(), "ciphertextId is empty") @@ -72,8 +69,7 @@ func Test_decryptionQueue_Decrypt_CiphertextTooLarge(t *testing.T) { lggr := logger.TestLogger(t) dq := NewDecryptionQueue(1, 10, 64, testutils.WaitTimeout(t), lggr) - ctx, cancel := context.WithCancel(testutils.Context(t)) - defer cancel() + ctx := testutils.Context(t) _, err := dq.Decrypt(ctx, []byte("1"), []byte("largeciphertext")) assert.Equal(t, err.Error(), "ciphertext too large") @@ -83,8 +79,7 @@ func Test_decryptionQueue_Decrypt_EmptyCiphertext(t *testing.T) { lggr := logger.TestLogger(t) dq := NewDecryptionQueue(1, 1000, 64, testutils.WaitTimeout(t), lggr) - ctx, cancel := context.WithCancel(testutils.Context(t)) - defer cancel() + ctx := testutils.Context(t) _, err := dq.Decrypt(ctx, []byte("1"), []byte("")) assert.Equal(t, err.Error(), "ciphertext is empty") @@ -94,8 +89,7 @@ func Test_decryptionQueue_Decrypt_DuplicateCiphertextId(t *testing.T) { lggr := logger.TestLogger(t) dq := NewDecryptionQueue(1, 1000, 64, testutils.WaitTimeout(t), lggr) - ctx, cancel := context.WithCancel(testutils.Context(t)) - defer cancel() + ctx := testutils.Context(t) go func() { _, err := dq.Decrypt(ctx, []byte("1"), []byte("encrypted")) @@ -179,8 +173,7 @@ func Test_decryptionQueue_GetCiphertext(t *testing.T) { lggr := logger.TestLogger(t) dq := NewDecryptionQueue(3, 1000, 64, testutils.WaitTimeout(t), lggr) - ctx, cancel := context.WithCancel(testutils.Context(t)) - defer cancel() + ctx := testutils.Context(t) go func() { _, err := dq.Decrypt(ctx, []byte("7"), []byte("encrypted")) @@ -210,8 +203,7 @@ func Test_decryptionQueue_Decrypt_DecryptCalledAfterReadyResult(t *testing.T) { dq.SetResult([]byte("9"), []byte("decrypted"), nil) - ctx, cancel := context.WithCancel(testutils.Context(t)) - defer cancel() + ctx := testutils.Context(t) pt, err := dq.Decrypt(ctx, []byte("9"), []byte("encrypted")) require.NoError(t, err) @@ -264,8 +256,7 @@ func Test_decryptionQueue_Decrypt_UserErrorDuringDecryption(t *testing.T) { dq.SetResult(ciphertextId, nil, decryptionPlugin.ErrAggregation) }() - ctx, cancel := context.WithCancel(testutils.Context(t)) - defer cancel() + ctx := testutils.Context(t) _, err := dq.Decrypt(ctx, ciphertextId, []byte("encrypted")) assert.Equal(t, err.Error(), "pending decryption request for ciphertextId 0x120f was closed without a response") @@ -281,8 +272,7 @@ func Test_decryptionQueue_Decrypt_HandleClosedChannelWithoutPlaintextResponse(t close(dq.pendingRequests[string(ciphertextId)].chPlaintext) }() - ctx, cancel := context.WithCancel(testutils.Context(t)) - defer cancel() + ctx := testutils.Context(t) _, err := dq.Decrypt(ctx, ciphertextId, []byte("encrypted")) assert.Equal(t, err.Error(), "pending decryption request for ciphertextId 0x00ff was closed without a response") @@ -380,8 +370,7 @@ func Test_decryptionQueue_GetRequests_PendingRequestQueueShorterThanRequestCount lggr := logger.TestLogger(t) dq := NewDecryptionQueue(4, 1000, 64, testutils.WaitTimeout(t), lggr) - ctx, cancel := context.WithCancel(testutils.Context(t)) - defer cancel() + ctx := testutils.Context(t) go func() { _, err := dq.Decrypt(ctx, []byte("11"), []byte("encrypted")) @@ -425,8 +414,7 @@ func Test_decryptionQueue_Start(t *testing.T) { lggr := logger.TestLogger(t) dq := NewDecryptionQueue(4, 1000, 64, testutils.WaitTimeout(t), lggr) - ctx, cancel := context.WithCancel(testutils.Context(t)) - defer cancel() + ctx := testutils.Context(t) err := dq.Start(ctx)