Skip to content

Commit

Permalink
refactor event stream handler close detection
Browse files Browse the repository at this point in the history
Signed-off-by: Amit Gupta <[email protected]>
  • Loading branch information
vito authored and Amit Gupta committed Jan 27, 2015
1 parent 269dfd7 commit c0a7638
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 105 deletions.
72 changes: 28 additions & 44 deletions handlers/event_stream_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"net/http"
"strconv"

"github.com/cloudfoundry-incubator/receptor"
"github.com/cloudfoundry-incubator/receptor/event"
"github.com/pivotal-golang/lager"
"github.com/vito/go-sse/sse"
Expand All @@ -19,12 +18,13 @@ type EventStreamHandler struct {
func NewEventStreamHandler(hub event.Hub, logger lager.Logger) *EventStreamHandler {
return &EventStreamHandler{
hub: hub,
logger: logger.Session("event-stream-handler"),
logger: logger,
}
}

func (h *EventStreamHandler) EventStream(w http.ResponseWriter, req *http.Request) {
logger := h.logger.Session("sink")
logger := h.logger.Session("event-stream-handler")

closeNotifier := w.(http.CloseNotifier).CloseNotify()

flusher := w.(http.Flusher)
Expand All @@ -35,59 +35,43 @@ func (h *EventStreamHandler) EventStream(w http.ResponseWriter, req *http.Reques
w.WriteHeader(http.StatusInternalServerError)
return
}
defer func() {
err := source.Close()
if err != nil {
logger.Debug("failed-to-close-event-source", lager.Data{"error-msg": err.Error()})
}

defer source.Close()

go func() {
<-closeNotifier
source.Close()
}()

w.WriteHeader(http.StatusOK)

flusher.Flush()

eventID := 0
errChan := make(chan error, 1)
eventChan := make(chan receptor.Event)

for {
go func() {
e, err := source.Next()
if err != nil {
errChan <- err
} else if e != nil {
eventChan <- e
}
}()

select {
case event := <-eventChan:
payload, err := json.Marshal(event)
if err != nil {
logger.Error("failed-to-marshal-event", err)
return
}

err = sse.Event{
ID: strconv.Itoa(eventID),
Name: string(event.EventType()),
Data: payload,
}.Write(w)
if err != nil {
break
}

flusher.Flush()

eventID++

case err := <-errChan:
event, err := source.Next()
if err != nil {
logger.Error("failed-to-get-next-event", err)
return
}

case <-closeNotifier:
logger.Info("client-closed-response-body")
payload, err := json.Marshal(event)
if err != nil {
logger.Error("failed-to-marshal-event", err)
return
}

err = sse.Event{
ID: strconv.Itoa(eventID),
Name: string(event.EventType()),
Data: payload,
}.Write(w)
if err != nil {
break
}

flusher.Flush()

eventID++
}
}
76 changes: 15 additions & 61 deletions handlers/event_stream_handler_test.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
package handlers_test

import (
"errors"
"io"
"net/http"
"net/http/httptest"

"github.com/cloudfoundry-incubator/receptor"
"github.com/cloudfoundry-incubator/receptor/event/eventfakes"
"github.com/cloudfoundry-incubator/receptor/fake_receptor"
"github.com/cloudfoundry-incubator/receptor/event"
"github.com/cloudfoundry-incubator/receptor/handlers"
"github.com/pivotal-golang/lager"
"github.com/vito/go-sse/sse"
Expand All @@ -35,53 +33,38 @@ func (unmarshalableEvent) EventType() receptor.EventType {

var _ = Describe("Event Stream Handlers", func() {
var (
logger lager.Logger
fakeHub *eventfakes.FakeHub
logger lager.Logger
hub event.Hub

handler *handlers.EventStreamHandler

server *httptest.Server
)

BeforeEach(func() {
fakeHub = new(eventfakes.FakeHub)
hub = event.NewHub()

logger = lager.NewLogger("test")
logger.RegisterSink(lager.NewWriterSink(GinkgoWriter, lager.DEBUG))

handler = handlers.NewEventStreamHandler(fakeHub, logger)
handler = handlers.NewEventStreamHandler(hub, logger)
})

AfterEach(func() {
hub.Close()

if server != nil {
server.Close()
}
})

Describe("EventStream", func() {
var (
fakeSource *fake_receptor.FakeEventSource

eventsToEmit chan<- receptor.Event

response *http.Response
eventStreamDone chan struct{}
)

BeforeEach(func() {
events := make(chan receptor.Event, 10)
eventsToEmit = events

fakeSource = new(fake_receptor.FakeEventSource)
fakeSource.NextStub = func() (receptor.Event, error) {
e, ok := <-events
if !ok {
return nil, errors.New("event stream ended")
}

return e, nil
}

eventStreamDone = make(chan struct{})
server = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
handler.EventStream(w, r)
Expand All @@ -95,15 +78,9 @@ var _ = Describe("Event Stream Handlers", func() {
Ω(err).ShouldNot(HaveOccurred())
})

AfterEach(func() {
// don't care if it's already closed
defer func() { recover() }()
close(eventsToEmit)
})

Context("when failing to subscribe to the event hub", func() {
BeforeEach(func() {
fakeHub.SubscribeReturns(nil, errors.New("failed-to-subscribe"))
hub.Close()
})

It("returns an internal server error", func() {
Expand All @@ -112,22 +89,18 @@ var _ = Describe("Event Stream Handlers", func() {
})

Context("when successfully subscribing to the event hub", func() {
BeforeEach(func() {
fakeHub.SubscribeReturns(fakeSource, nil)
})

It("emits events from the hub to the connection", func() {
reader := sse.NewReadCloser(response.Body)

eventsToEmit <- fakeEvent{"A"}
hub.Emit(fakeEvent{"A"})

Ω(reader.Next()).Should(Equal(sse.Event{
ID: "0",
Name: "fake",
Data: []byte(`{"token":"A"}`),
}))

eventsToEmit <- fakeEvent{"B"}
hub.Emit(fakeEvent{"B"})

Ω(reader.Next()).Should(Equal(sse.Event{
ID: "1",
Expand All @@ -137,53 +110,34 @@ var _ = Describe("Event Stream Handlers", func() {
})

Context("when the source provides an unmarshalable event", func() {
BeforeEach(func() {
unmarshalable := unmarshalableEvent{Fn: func() {}}
eventsToEmit <- unmarshalable
})

It("closes the event stream to the client", func() {
hub.Emit(unmarshalableEvent{Fn: func() {}})

reader := sse.NewReadCloser(response.Body)
_, err := reader.Next()
Ω(err).Should(Equal(io.EOF))
})

It("closes the event source", func() {
Eventually(fakeSource.CloseCallCount).Should(Equal(1))
})
})

Context("when the event source returns an error", func() {
BeforeEach(func() {
close(eventsToEmit)
hub.Close()
})

It("closes the client event stream", func() {
reader := sse.NewReadCloser(response.Body)
_, err := reader.Next()
Ω(err).Should(Equal(io.EOF))
})

It("close the event source", func() {
Eventually(fakeSource.CloseCallCount).Should(Equal(1))
})
})

Context("when the client closes the response body", func() {
It("closes its connection to the hub", func() {
reader := sse.NewReadCloser(response.Body)
eventsToEmit <- fakeEvent{"A"}
err := reader.Close()
Ω(err).ShouldNot(HaveOccurred())
Eventually(fakeSource.CloseCallCount).Should(Equal(1))
})

It("returns early", func() {
reader := sse.NewReadCloser(response.Body)
eventsToEmit <- fakeEvent{"A"}
hub.Emit(fakeEvent{"A"})
err := reader.Close()
Ω(err).ShouldNot(HaveOccurred())
Eventually(eventStreamDone).Should(BeClosed())
Eventually(eventStreamDone, 10).Should(BeClosed())
})
})
})
Expand Down

0 comments on commit c0a7638

Please sign in to comment.