diff --git a/core/capabilities/launcher.go b/core/capabilities/launcher.go index 27c43fe0a53..98318853e2a 100644 --- a/core/capabilities/launcher.go +++ b/core/capabilities/launcher.go @@ -398,7 +398,8 @@ func (w *launcher) addToRegistryAndSetDispatcher(ctx context.Context, capability } var ( - defaultTargetRequestTimeout = time.Minute + // TODO: make this configurable + defaultTargetRequestTimeout = 8 * time.Minute ) func (w *launcher) exposeCapabilities(ctx context.Context, myPeerID p2ptypes.PeerID, don registrysyncer.DON, state *registrysyncer.LocalRegistry, remoteWorkflowDONs []registrysyncer.DON) error { diff --git a/core/capabilities/remote/executable/client.go b/core/capabilities/remote/executable/client.go index 9af32eb5f8e..776ddb692ad 100644 --- a/core/capabilities/remote/executable/client.go +++ b/core/capabilities/remote/executable/client.go @@ -41,6 +41,8 @@ var _ commoncap.ExecutableCapability = &client{} var _ types.Receiver = &client{} var _ services.Service = &client{} +const expiryCheckInterval = 30 * time.Second + func NewClient(remoteCapabilityInfo commoncap.CapabilityInfo, localDonInfo commoncap.DON, dispatcher types.Dispatcher, requestTimeout time.Duration, lggr logger.Logger) *client { return &client{ @@ -98,7 +100,11 @@ func (c *client) checkDispatcherReady() { } func (c *client) checkForExpiredRequests() { - ticker := time.NewTicker(c.requestTimeout) + tickerInterval := expiryCheckInterval + if c.requestTimeout < tickerInterval { + tickerInterval = c.requestTimeout + } + ticker := time.NewTicker(tickerInterval) defer ticker.Stop() for { select { @@ -116,7 +122,7 @@ func (c *client) expireRequests() { for messageID, req := range c.requestIDToCallerRequest { if req.Expired() { - req.Cancel(errors.New("request expired")) + req.Cancel(errors.New("request expired by executable client")) delete(c.requestIDToCallerRequest, messageID) } diff --git a/core/capabilities/remote/executable/endtoend_test.go b/core/capabilities/remote/executable/endtoend_test.go index 4e78fead87e..8be92e878ad 100644 --- a/core/capabilities/remote/executable/endtoend_test.go +++ b/core/capabilities/remote/executable/endtoend_test.go @@ -102,7 +102,7 @@ func Test_RemoteExecutableCapability_RandomCapabilityError(t *testing.T) { methods = append(methods, func(ctx context.Context, caller commoncap.ExecutableCapability) { executeCapability(ctx, t, caller, transmissionSchedule, func(t *testing.T, responseCh commoncap.CapabilityResponse, responseError error) { - assert.Equal(t, "error executing request: request expired", responseError.Error()) + assert.Equal(t, "error executing request: request expired by executable client", responseError.Error()) }) }) diff --git a/core/capabilities/remote/executable/server.go b/core/capabilities/remote/executable/server.go index b767a2d7030..d43c7ab5c41 100644 --- a/core/capabilities/remote/executable/server.go +++ b/core/capabilities/remote/executable/server.go @@ -87,7 +87,11 @@ func (r *server) Start(ctx context.Context) error { r.wg.Add(1) go func() { defer r.wg.Done() - ticker := time.NewTicker(r.requestTimeout) + tickerInterval := expiryCheckInterval + if r.requestTimeout < tickerInterval { + tickerInterval = r.requestTimeout + } + ticker := time.NewTicker(tickerInterval) defer ticker.Stop() r.lggr.Info("executable capability server started") for { @@ -118,7 +122,7 @@ func (r *server) expireRequests() { for requestID, executeReq := range r.requestIDToRequest { if executeReq.request.Expired() { - err := executeReq.request.Cancel(types.Error_TIMEOUT, "request expired") + err := executeReq.request.Cancel(types.Error_TIMEOUT, "request expired by executable server") if err != nil { r.lggr.Errorw("failed to cancel request", "request", executeReq, "err", err) }