diff --git a/client.go b/client.go index 582772d..2476be9 100644 --- a/client.go +++ b/client.go @@ -10,6 +10,7 @@ import ( "strconv" "time" + "github.com/cloudfoundry-incubator/cf_http" "github.com/tedsuo/rata" "github.com/vito/go-sse/sse" ) @@ -54,7 +55,7 @@ type Client interface { func NewClient(url string) Client { return &client{ - httpClient: &http.Client{}, + httpClient: cf_http.NewClient(), reqGen: rata.NewRequestGenerator(url, Routes), } } diff --git a/cmd/receptor/main.go b/cmd/receptor/main.go index bd3499c..4135844 100644 --- a/cmd/receptor/main.go +++ b/cmd/receptor/main.go @@ -9,8 +9,9 @@ import ( "strings" "time" - "github.com/cloudfoundry-incubator/cf-debug-server" + cf_debug_server "github.com/cloudfoundry-incubator/cf-debug-server" cf_lager "github.com/cloudfoundry-incubator/cf-lager" + "github.com/cloudfoundry-incubator/cf_http" "github.com/cloudfoundry-incubator/natbeat" "github.com/cloudfoundry-incubator/receptor/event" "github.com/cloudfoundry-incubator/receptor/handlers" @@ -111,6 +112,12 @@ var bbsWatchRetryWaitDuration = flag.Duration( "Duration to wait before retrying watching the BBS when watching fails", ) +var communicationTimeout = flag.Duration( + "communicationTimeout", + 10*time.Second, + "Timeout applied to all HTTP requests.", +) + const ( dropsondeDestination = "localhost:3457" dropsondeOrigin = "receptor" @@ -121,6 +128,8 @@ func main() { cf_lager.AddFlags(flag.CommandLine) flag.Parse() + cf_http.Initialize(*communicationTimeout) + logger := cf_lager.New("receptor") logger.Info("starting") diff --git a/task_handler/task_worker.go b/task_handler/task_worker.go index 9237429..8fa1b76 100644 --- a/task_handler/task_worker.go +++ b/task_handler/task_worker.go @@ -6,7 +6,9 @@ import ( "fmt" "net/http" "os" + "regexp" + "github.com/cloudfoundry-incubator/cf_http" "github.com/cloudfoundry-incubator/receptor/serialization" Bbs "github.com/cloudfoundry-incubator/runtime-schema/bbs" "github.com/cloudfoundry-incubator/runtime-schema/models" @@ -38,7 +40,7 @@ func newTaskWorker(taskQueue <-chan models.Task, bbs Bbs.ReceptorBBS, logger lag taskQueue: taskQueue, bbs: bbs, logger: logger, - httpClient: http.Client{}, + httpClient: cf_http.NewClient(), } } @@ -46,7 +48,7 @@ type taskWorker struct { taskQueue <-chan models.Task bbs Bbs.ReceptorBBS logger lager.Logger - httpClient http.Client + httpClient *http.Client } func (t *taskWorker) Run(signals <-chan os.Signal, ready chan<- struct{}) error { @@ -95,6 +97,10 @@ func (t *taskWorker) handleCompletedTask(task models.Task) { response, err := t.httpClient.Do(request) if err != nil { + matched, _ := regexp.MatchString("use of closed network connection", err.Error()) + if matched { + continue + } logger.Error("doing-request-failed", err) return } diff --git a/task_handler/task_worker_test.go b/task_handler/task_worker_test.go index 6fbf1bd..133fcfb 100644 --- a/task_handler/task_worker_test.go +++ b/task_handler/task_worker_test.go @@ -5,7 +5,9 @@ import ( "net/http" "net/url" "os" + "time" + "github.com/cloudfoundry-incubator/cf_http" "github.com/cloudfoundry-incubator/receptor/task_handler" "github.com/cloudfoundry-incubator/runtime-schema/bbs/fake_bbs" "github.com/cloudfoundry-incubator/runtime-schema/models" @@ -27,9 +29,12 @@ var _ = Describe("TaskWorker", func() { fakeServer *ghttp.Server logger lager.Logger + timeout time.Duration ) BeforeEach(func() { + timeout = 1 * time.Second + cf_http.Initialize(timeout) fakeServer = ghttp.NewServer() logger = lager.NewLogger("task-watcher-test") @@ -225,6 +230,51 @@ var _ = Describe("TaskWorker", func() { }) }) }) + + Context("when the request fails with a timeout", func() { + var sleepCh chan time.Duration + + BeforeEach(func() { + sleepCh = make(chan time.Duration) + fakeServer.RouteToHandler("POST", "/the-callback/url", func(w http.ResponseWriter, req *http.Request) { + time.Sleep(<-sleepCh) + w.WriteHeader(200) + }) + }) + + It("retries the request 2 more times", func() { + simulateTaskCompleting() + sleepCh <- timeout + 100*time.Millisecond + Eventually(fakeServer.ReceivedRequests).Should(HaveLen(1)) + + sleepCh <- timeout + 100*time.Millisecond + Consistently(fakeBBS.ResolveTaskCallCount, 0.25).Should(Equal(0)) + Eventually(fakeServer.ReceivedRequests).Should(HaveLen(2)) + + sleepCh <- timeout + 100*time.Millisecond + Consistently(fakeBBS.ResolveTaskCallCount, 0.25).Should(Equal(0)) + Eventually(fakeServer.ReceivedRequests).Should(HaveLen(3)) + + Eventually(fakeBBS.ResolveTaskCallCount, 0.25).Should(Equal(0)) + }) + + Context("when the request fails with timeout once and then succeeds", func() { + It("does resolves the task", func() { + simulateTaskCompleting() + sleepCh <- (timeout + 100*time.Millisecond) + + Eventually(fakeServer.ReceivedRequests).Should(HaveLen(1)) + Consistently(fakeBBS.ResolveTaskCallCount, 0.25).Should(Equal(0)) + + sleepCh <- 0 + Eventually(fakeServer.ReceivedRequests).Should(HaveLen(2)) + Eventually(fakeBBS.ResolveTaskCallCount, 0.25).Should(Equal(1)) + + _, resolvedTaskGuid := fakeBBS.ResolveTaskArgsForCall(0) + Ω(resolvedTaskGuid).Should(Equal("the-task-guid")) + }) + }) + }) }) })