Skip to content

Commit

Permalink
Use cf_http.NewClient instead of http.Client
Browse files Browse the repository at this point in the history
[#84811946]

Signed-off-by: Atul Kshirsagar <[email protected]>
  • Loading branch information
krishicks authored and vito committed Jan 20, 2015
1 parent 28ee9ff commit a40bd6b
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 4 deletions.
3 changes: 2 additions & 1 deletion client.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"strconv"
"time"

"github.com/cloudfoundry-incubator/cf_http"
"github.com/tedsuo/rata"
"github.com/vito/go-sse/sse"
)
Expand Down Expand Up @@ -54,7 +55,7 @@ type Client interface {

func NewClient(url string) Client {
return &client{
httpClient: &http.Client{},
httpClient: cf_http.NewClient(),
reqGen: rata.NewRequestGenerator(url, Routes),
}
}
Expand Down
11 changes: 10 additions & 1 deletion cmd/receptor/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@ import (
"strings"
"time"

"github.com/cloudfoundry-incubator/cf-debug-server"
cf_debug_server "github.com/cloudfoundry-incubator/cf-debug-server"
cf_lager "github.com/cloudfoundry-incubator/cf-lager"
"github.com/cloudfoundry-incubator/cf_http"
"github.com/cloudfoundry-incubator/natbeat"
"github.com/cloudfoundry-incubator/receptor/event"
"github.com/cloudfoundry-incubator/receptor/handlers"
Expand Down Expand Up @@ -111,6 +112,12 @@ var bbsWatchRetryWaitDuration = flag.Duration(
"Duration to wait before retrying watching the BBS when watching fails",
)

var communicationTimeout = flag.Duration(
"communicationTimeout",
10*time.Second,
"Timeout applied to all HTTP requests.",
)

const (
dropsondeDestination = "localhost:3457"
dropsondeOrigin = "receptor"
Expand All @@ -121,6 +128,8 @@ func main() {
cf_lager.AddFlags(flag.CommandLine)
flag.Parse()

cf_http.Initialize(*communicationTimeout)

logger := cf_lager.New("receptor")
logger.Info("starting")

Expand Down
10 changes: 8 additions & 2 deletions task_handler/task_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ import (
"fmt"
"net/http"
"os"
"regexp"

"github.com/cloudfoundry-incubator/cf_http"
"github.com/cloudfoundry-incubator/receptor/serialization"
Bbs "github.com/cloudfoundry-incubator/runtime-schema/bbs"
"github.com/cloudfoundry-incubator/runtime-schema/models"
Expand Down Expand Up @@ -38,15 +40,15 @@ func newTaskWorker(taskQueue <-chan models.Task, bbs Bbs.ReceptorBBS, logger lag
taskQueue: taskQueue,
bbs: bbs,
logger: logger,
httpClient: http.Client{},
httpClient: cf_http.NewClient(),
}
}

type taskWorker struct {
taskQueue <-chan models.Task
bbs Bbs.ReceptorBBS
logger lager.Logger
httpClient http.Client
httpClient *http.Client
}

func (t *taskWorker) Run(signals <-chan os.Signal, ready chan<- struct{}) error {
Expand Down Expand Up @@ -95,6 +97,10 @@ func (t *taskWorker) handleCompletedTask(task models.Task) {

response, err := t.httpClient.Do(request)
if err != nil {
matched, _ := regexp.MatchString("use of closed network connection", err.Error())
if matched {
continue
}
logger.Error("doing-request-failed", err)
return
}
Expand Down
50 changes: 50 additions & 0 deletions task_handler/task_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ import (
"net/http"
"net/url"
"os"
"time"

"github.com/cloudfoundry-incubator/cf_http"
"github.com/cloudfoundry-incubator/receptor/task_handler"
"github.com/cloudfoundry-incubator/runtime-schema/bbs/fake_bbs"
"github.com/cloudfoundry-incubator/runtime-schema/models"
Expand All @@ -27,9 +29,12 @@ var _ = Describe("TaskWorker", func() {

fakeServer *ghttp.Server
logger lager.Logger
timeout time.Duration
)

BeforeEach(func() {
timeout = 1 * time.Second
cf_http.Initialize(timeout)
fakeServer = ghttp.NewServer()

logger = lager.NewLogger("task-watcher-test")
Expand Down Expand Up @@ -225,6 +230,51 @@ var _ = Describe("TaskWorker", func() {
})
})
})

Context("when the request fails with a timeout", func() {
var sleepCh chan time.Duration

BeforeEach(func() {
sleepCh = make(chan time.Duration)
fakeServer.RouteToHandler("POST", "/the-callback/url", func(w http.ResponseWriter, req *http.Request) {
time.Sleep(<-sleepCh)
w.WriteHeader(200)
})
})

It("retries the request 2 more times", func() {
simulateTaskCompleting()
sleepCh <- timeout + 100*time.Millisecond
Eventually(fakeServer.ReceivedRequests).Should(HaveLen(1))

sleepCh <- timeout + 100*time.Millisecond
Consistently(fakeBBS.ResolveTaskCallCount, 0.25).Should(Equal(0))
Eventually(fakeServer.ReceivedRequests).Should(HaveLen(2))

sleepCh <- timeout + 100*time.Millisecond
Consistently(fakeBBS.ResolveTaskCallCount, 0.25).Should(Equal(0))
Eventually(fakeServer.ReceivedRequests).Should(HaveLen(3))

Eventually(fakeBBS.ResolveTaskCallCount, 0.25).Should(Equal(0))
})

Context("when the request fails with timeout once and then succeeds", func() {
It("does resolves the task", func() {
simulateTaskCompleting()
sleepCh <- (timeout + 100*time.Millisecond)

Eventually(fakeServer.ReceivedRequests).Should(HaveLen(1))
Consistently(fakeBBS.ResolveTaskCallCount, 0.25).Should(Equal(0))

sleepCh <- 0
Eventually(fakeServer.ReceivedRequests).Should(HaveLen(2))
Eventually(fakeBBS.ResolveTaskCallCount, 0.25).Should(Equal(1))

_, resolvedTaskGuid := fakeBBS.ResolveTaskArgsForCall(0)
Ω(resolvedTaskGuid).Should(Equal("the-task-guid"))
})
})
})
})
})

Expand Down

0 comments on commit a40bd6b

Please sign in to comment.