Skip to content

Commit

Permalink
core/capabilities/remote/target: defer WaitGroup.Done; Close services…
Browse files Browse the repository at this point in the history
… on t.Cleanup; elinimate redundant context cancellation (#13492)
  • Loading branch information
jmank88 authored Jun 11, 2024
1 parent 15f02f6 commit 813c060
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 68 deletions.
21 changes: 7 additions & 14 deletions core/capabilities/remote/target/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (

commoncap "github.com/smartcontractkit/chainlink-common/pkg/capabilities"
"github.com/smartcontractkit/chainlink-common/pkg/capabilities/pb"
"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink-common/pkg/services/servicetest"
"github.com/smartcontractkit/chainlink-common/pkg/values"
"github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/target"
remotetypes "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types"
Expand All @@ -22,8 +22,7 @@ import (
)

func Test_Client_DonTopologies(t *testing.T) {
ctx, cancel := context.WithCancel(testutils.Context(t))
defer cancel()
ctx := testutils.Context(t)

transmissionSchedule, err := values.NewMap(map[string]any{
"schedule": transmission.Schedule_OneAtATime,
Expand Down Expand Up @@ -60,8 +59,7 @@ func Test_Client_DonTopologies(t *testing.T) {
}

func Test_Client_TransmissionSchedules(t *testing.T) {
ctx, cancel := context.WithCancel(testutils.Context(t))
defer cancel()
ctx := testutils.Context(t)

responseTest := func(t *testing.T, responseCh <-chan commoncap.CapabilityResponse, responseError error) {
require.NoError(t, responseError)
Expand Down Expand Up @@ -99,8 +97,7 @@ func Test_Client_TransmissionSchedules(t *testing.T) {
}

func Test_Client_TimesOutIfInsufficientCapabilityPeerResponses(t *testing.T) {
ctx, cancel := context.WithCancel(testutils.Context(t))
defer cancel()
ctx := testutils.Context(t)

responseTest := func(t *testing.T, responseCh <-chan commoncap.CapabilityResponse, responseError error) {
require.NoError(t, responseError)
Expand Down Expand Up @@ -166,14 +163,13 @@ func testClient(ctx context.Context, t *testing.T, numWorkflowPeers int, workflo
}

callers := make([]commoncap.TargetCapability, numWorkflowPeers)
srvcs := make([]services.Service, numWorkflowPeers)

for i := 0; i < numWorkflowPeers; i++ {
workflowPeerDispatcher := broker.NewDispatcherForNode(workflowPeers[i])
caller := target.NewClient(capInfo, workflowDonInfo, workflowPeerDispatcher, workflowNodeResponseTimeout, lggr)
require.NoError(t, caller.Start(ctx))
servicetest.Run(t, caller)
broker.RegisterReceiverNode(workflowPeers[i], caller)
callers[i] = caller
srvcs[i] = caller
}

executeInputs, err := values.NewMap(
Expand All @@ -190,6 +186,7 @@ func testClient(ctx context.Context, t *testing.T, numWorkflowPeers int, workflo
// Fire off all the requests
for _, caller := range callers {
go func(caller commoncap.TargetCapability) {
defer wg.Done()
responseCh, err := caller.Execute(ctx,
commoncap.CapabilityRequest{
Metadata: commoncap.RequestMetadata{
Expand All @@ -201,14 +198,10 @@ func testClient(ctx context.Context, t *testing.T, numWorkflowPeers int, workflo
})

responseTest(t, responseCh, err)
wg.Done()
}(caller)
}

wg.Wait()
for i := 0; i < numWorkflowPeers; i++ {
require.NoError(t, srvcs[i].Close())
}
}

// Simple client that only responds once it has received a message from each workflow peer
Expand Down
32 changes: 10 additions & 22 deletions core/capabilities/remote/target/endtoend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
"github.com/stretchr/testify/require"

commoncap "github.com/smartcontractkit/chainlink-common/pkg/capabilities"
"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink-common/pkg/services/servicetest"
"github.com/smartcontractkit/chainlink-common/pkg/values"
"github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/target"
remotetypes "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types"
Expand All @@ -25,8 +25,7 @@ import (
)

func Test_RemoteTargetCapability_InsufficientCapabilityResponses(t *testing.T) {
ctx, cancel := context.WithCancel(testutils.Context(t))
defer cancel()
ctx := testutils.Context(t)

responseTest := func(t *testing.T, responseCh <-chan commoncap.CapabilityResponse, responseError error) {
require.NoError(t, responseError)
Expand All @@ -46,8 +45,7 @@ func Test_RemoteTargetCapability_InsufficientCapabilityResponses(t *testing.T) {
}

func Test_RemoteTargetCapability_InsufficientWorkflowRequests(t *testing.T) {
ctx, cancel := context.WithCancel(testutils.Context(t))
defer cancel()
ctx := testutils.Context(t)

responseTest := func(t *testing.T, responseCh <-chan commoncap.CapabilityResponse, responseError error) {
require.NoError(t, responseError)
Expand All @@ -69,8 +67,7 @@ func Test_RemoteTargetCapability_InsufficientWorkflowRequests(t *testing.T) {
}

func Test_RemoteTargetCapability_TransmissionSchedules(t *testing.T) {
ctx, cancel := context.WithCancel(testutils.Context(t))
defer cancel()
ctx := testutils.Context(t)

responseTest := func(t *testing.T, responseCh <-chan commoncap.CapabilityResponse, responseError error) {
require.NoError(t, responseError)
Expand Down Expand Up @@ -102,8 +99,7 @@ func Test_RemoteTargetCapability_TransmissionSchedules(t *testing.T) {
}

func Test_RemoteTargetCapability_DonTopologies(t *testing.T) {
ctx, cancel := context.WithCancel(testutils.Context(t))
defer cancel()
ctx := testutils.Context(t)

responseTest := func(t *testing.T, responseCh <-chan commoncap.CapabilityResponse, responseError error) {
require.NoError(t, responseError)
Expand Down Expand Up @@ -138,8 +134,7 @@ func Test_RemoteTargetCapability_DonTopologies(t *testing.T) {
}

func Test_RemoteTargetCapability_CapabilityError(t *testing.T) {
ctx, cancel := context.WithCancel(testutils.Context(t))
defer cancel()
ctx := testutils.Context(t)

responseTest := func(t *testing.T, responseCh <-chan commoncap.CapabilityResponse, responseError error) {
require.NoError(t, responseError)
Expand All @@ -159,8 +154,7 @@ func Test_RemoteTargetCapability_CapabilityError(t *testing.T) {
}

func Test_RemoteTargetCapability_RandomCapabilityError(t *testing.T) {
ctx, cancel := context.WithCancel(testutils.Context(t))
defer cancel()
ctx := testutils.Context(t)

responseTest := func(t *testing.T, responseCh <-chan commoncap.CapabilityResponse, responseError error) {
require.NoError(t, responseError)
Expand Down Expand Up @@ -226,27 +220,24 @@ func testRemoteTarget(ctx context.Context, t *testing.T, underlying commoncap.Ta
workflowDonInfo.ID: workflowDonInfo,
}

srvcs := []services.Service{}
capabilityNodes := make([]remotetypes.Receiver, numCapabilityPeers)
for i := 0; i < numCapabilityPeers; i++ {
capabilityPeer := capabilityPeers[i]
capabilityDispatcher := broker.NewDispatcherForNode(capabilityPeer)
capabilityNode := target.NewServer(capabilityPeer, underlying, capInfo, capDonInfo, workflowDONs, capabilityDispatcher,
capabilityNodeResponseTimeout, lggr)
require.NoError(t, capabilityNode.Start(ctx))
servicetest.Run(t, capabilityNode)
broker.RegisterReceiverNode(capabilityPeer, capabilityNode)
capabilityNodes[i] = capabilityNode
srvcs = append(srvcs, capabilityNode)
}

workflowNodes := make([]commoncap.TargetCapability, numWorkflowPeers)
for i := 0; i < numWorkflowPeers; i++ {
workflowPeerDispatcher := broker.NewDispatcherForNode(workflowPeers[i])
workflowNode := target.NewClient(capInfo, workflowDonInfo, workflowPeerDispatcher, workflowNodeTimeout, lggr)
require.NoError(t, workflowNode.Start(ctx))
servicetest.Run(t, workflowNode)
broker.RegisterReceiverNode(workflowPeers[i], workflowNode)
workflowNodes[i] = workflowNode
srvcs = append(srvcs, workflowNode)
}

executeInputs, err := values.NewMap(
Expand All @@ -262,6 +253,7 @@ func testRemoteTarget(ctx context.Context, t *testing.T, underlying commoncap.Ta

for _, caller := range workflowNodes {
go func(caller commoncap.TargetCapability) {
defer wg.Done()
responseCh, err := caller.Execute(ctx,
commoncap.CapabilityRequest{
Metadata: commoncap.RequestMetadata{
Expand All @@ -273,14 +265,10 @@ func testRemoteTarget(ctx context.Context, t *testing.T, underlying commoncap.Ta
})

responseTest(t, responseCh, err)
wg.Done()
}(caller)
}

wg.Wait()
for _, srv := range srvcs {
require.NoError(t, srv.Close())
}
}

type testMessageBroker struct {
Expand Down
9 changes: 3 additions & 6 deletions core/capabilities/remote/target/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@ import (
)

func Test_Server_RespondsAfterSufficientRequests(t *testing.T) {
ctx, cancel := context.WithCancel(testutils.Context(t))
defer cancel()
ctx := testutils.Context(t)

numCapabilityPeers := 4

Expand All @@ -47,8 +46,7 @@ func Test_Server_RespondsAfterSufficientRequests(t *testing.T) {
}

func Test_Server_InsufficientCallers(t *testing.T) {
ctx, cancel := context.WithCancel(testutils.Context(t))
defer cancel()
ctx := testutils.Context(t)

numCapabilityPeers := 4

Expand All @@ -75,8 +73,7 @@ func Test_Server_InsufficientCallers(t *testing.T) {
}

func Test_Server_CapabilityError(t *testing.T) {
ctx, cancel := context.WithCancel(testutils.Context(t))
defer cancel()
ctx := testutils.Context(t)

numCapabilityPeers := 4

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,8 +145,7 @@ func TestIntegration_LogEventProvider(t *testing.T) {
}

func TestIntegration_LogEventProvider_UpdateConfig(t *testing.T) {
ctx, cancel := context.WithCancel(testutils.Context(t))
defer cancel()
ctx := testutils.Context(t)

backend, stopMining, accounts := setupBackend(t)
defer stopMining()
Expand Down
36 changes: 12 additions & 24 deletions core/services/ocr2/plugins/threshold/decryption_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,7 @@ func Test_decryptionQueue_Decrypt_ReturnResultAfterCallingDecrypt(t *testing.T)
dq.SetResult([]byte("1"), []byte("decrypted"), nil)
}()

ctx, cancel := context.WithCancel(testutils.Context(t))
defer cancel()
ctx := testutils.Context(t)

pt, err := dq.Decrypt(ctx, []byte("1"), []byte("encrypted"))
require.NoError(t, err)
Expand All @@ -50,8 +49,7 @@ func Test_decryptionQueue_Decrypt_CiphertextIdTooLarge(t *testing.T) {
lggr := logger.TestLogger(t)
dq := NewDecryptionQueue(1, 1000, 16, testutils.WaitTimeout(t), lggr)

ctx, cancel := context.WithCancel(testutils.Context(t))
defer cancel()
ctx := testutils.Context(t)

_, err := dq.Decrypt(ctx, []byte("largeCiphertextId"), []byte("ciphertext"))
assert.Equal(t, err.Error(), "ciphertextId too large")
Expand All @@ -61,8 +59,7 @@ func Test_decryptionQueue_Decrypt_EmptyCiphertextId(t *testing.T) {
lggr := logger.TestLogger(t)
dq := NewDecryptionQueue(1, 1000, 64, testutils.WaitTimeout(t), lggr)

ctx, cancel := context.WithCancel(testutils.Context(t))
defer cancel()
ctx := testutils.Context(t)

_, err := dq.Decrypt(ctx, []byte(""), []byte("ciphertext"))
assert.Equal(t, err.Error(), "ciphertextId is empty")
Expand All @@ -72,8 +69,7 @@ func Test_decryptionQueue_Decrypt_CiphertextTooLarge(t *testing.T) {
lggr := logger.TestLogger(t)
dq := NewDecryptionQueue(1, 10, 64, testutils.WaitTimeout(t), lggr)

ctx, cancel := context.WithCancel(testutils.Context(t))
defer cancel()
ctx := testutils.Context(t)

_, err := dq.Decrypt(ctx, []byte("1"), []byte("largeciphertext"))
assert.Equal(t, err.Error(), "ciphertext too large")
Expand All @@ -83,8 +79,7 @@ func Test_decryptionQueue_Decrypt_EmptyCiphertext(t *testing.T) {
lggr := logger.TestLogger(t)
dq := NewDecryptionQueue(1, 1000, 64, testutils.WaitTimeout(t), lggr)

ctx, cancel := context.WithCancel(testutils.Context(t))
defer cancel()
ctx := testutils.Context(t)

_, err := dq.Decrypt(ctx, []byte("1"), []byte(""))
assert.Equal(t, err.Error(), "ciphertext is empty")
Expand All @@ -94,8 +89,7 @@ func Test_decryptionQueue_Decrypt_DuplicateCiphertextId(t *testing.T) {
lggr := logger.TestLogger(t)
dq := NewDecryptionQueue(1, 1000, 64, testutils.WaitTimeout(t), lggr)

ctx, cancel := context.WithCancel(testutils.Context(t))
defer cancel()
ctx := testutils.Context(t)

go func() {
_, err := dq.Decrypt(ctx, []byte("1"), []byte("encrypted"))
Expand Down Expand Up @@ -179,8 +173,7 @@ func Test_decryptionQueue_GetCiphertext(t *testing.T) {
lggr := logger.TestLogger(t)
dq := NewDecryptionQueue(3, 1000, 64, testutils.WaitTimeout(t), lggr)

ctx, cancel := context.WithCancel(testutils.Context(t))
defer cancel()
ctx := testutils.Context(t)

go func() {
_, err := dq.Decrypt(ctx, []byte("7"), []byte("encrypted"))
Expand Down Expand Up @@ -210,8 +203,7 @@ func Test_decryptionQueue_Decrypt_DecryptCalledAfterReadyResult(t *testing.T) {

dq.SetResult([]byte("9"), []byte("decrypted"), nil)

ctx, cancel := context.WithCancel(testutils.Context(t))
defer cancel()
ctx := testutils.Context(t)

pt, err := dq.Decrypt(ctx, []byte("9"), []byte("encrypted"))
require.NoError(t, err)
Expand Down Expand Up @@ -264,8 +256,7 @@ func Test_decryptionQueue_Decrypt_UserErrorDuringDecryption(t *testing.T) {
dq.SetResult(ciphertextId, nil, decryptionPlugin.ErrAggregation)
}()

ctx, cancel := context.WithCancel(testutils.Context(t))
defer cancel()
ctx := testutils.Context(t)

_, err := dq.Decrypt(ctx, ciphertextId, []byte("encrypted"))
assert.Equal(t, err.Error(), "pending decryption request for ciphertextId 0x120f was closed without a response")
Expand All @@ -281,8 +272,7 @@ func Test_decryptionQueue_Decrypt_HandleClosedChannelWithoutPlaintextResponse(t
close(dq.pendingRequests[string(ciphertextId)].chPlaintext)
}()

ctx, cancel := context.WithCancel(testutils.Context(t))
defer cancel()
ctx := testutils.Context(t)

_, err := dq.Decrypt(ctx, ciphertextId, []byte("encrypted"))
assert.Equal(t, err.Error(), "pending decryption request for ciphertextId 0x00ff was closed without a response")
Expand Down Expand Up @@ -380,8 +370,7 @@ func Test_decryptionQueue_GetRequests_PendingRequestQueueShorterThanRequestCount
lggr := logger.TestLogger(t)
dq := NewDecryptionQueue(4, 1000, 64, testutils.WaitTimeout(t), lggr)

ctx, cancel := context.WithCancel(testutils.Context(t))
defer cancel()
ctx := testutils.Context(t)

go func() {
_, err := dq.Decrypt(ctx, []byte("11"), []byte("encrypted"))
Expand Down Expand Up @@ -425,8 +414,7 @@ func Test_decryptionQueue_Start(t *testing.T) {
lggr := logger.TestLogger(t)
dq := NewDecryptionQueue(4, 1000, 64, testutils.WaitTimeout(t), lggr)

ctx, cancel := context.WithCancel(testutils.Context(t))
defer cancel()
ctx := testutils.Context(t)

err := dq.Start(ctx)

Expand Down

0 comments on commit 813c060

Please sign in to comment.