diff --git a/event_source.go b/event_source.go index 807672d..26b0abe 100644 --- a/event_source.go +++ b/event_source.go @@ -4,17 +4,20 @@ import ( "encoding/json" "errors" "fmt" + "io" "github.com/vito/go-sse/sse" ) var ErrUnrecognizedEventType = errors.New("unrecognized event type") +var ErrSourceClosed = errors.New("source closed") + type invalidPayloadError struct { jsonErr error } -func NewInvalidPayloadError(jsonErr error) invalidPayloadError { +func NewInvalidPayloadError(jsonErr error) error { return invalidPayloadError{jsonErr: jsonErr} } @@ -26,7 +29,7 @@ type rawEventSourceError struct { rawError error } -func NewRawEventSourceError(rawError error) rawEventSourceError { +func NewRawEventSourceError(rawError error) error { return rawEventSourceError{rawError: rawError} } @@ -38,7 +41,7 @@ type closeError struct { err error } -func NewCloseError(err error) closeError { +func NewCloseError(err error) error { return closeError{err: err} } @@ -62,7 +65,7 @@ type eventSource struct { rawEventSource RawEventSource } -func NewEventSource(raw RawEventSource) *eventSource { +func NewEventSource(raw RawEventSource) EventSource { return &eventSource{ rawEventSource: raw, } @@ -71,7 +74,16 @@ func NewEventSource(raw RawEventSource) *eventSource { func (e *eventSource) Next() (Event, error) { rawEvent, err := e.rawEventSource.Next() if err != nil { - return nil, NewRawEventSourceError(err) + switch err { + case io.EOF: + return nil, err + + case sse.ErrSourceClosed: + return nil, ErrSourceClosed + + default: + return nil, NewRawEventSourceError(err) + } } return parseRawEvent(rawEvent) diff --git a/event_source_test.go b/event_source_test.go index 2ad915a..3ff38e9 100644 --- a/event_source_test.go +++ b/event_source_test.go @@ -3,6 +3,7 @@ package receptor_test import ( "encoding/json" "errors" + "io" "github.com/cloudfoundry-incubator/receptor" "github.com/cloudfoundry-incubator/receptor/fake_receptor" @@ -277,6 +278,28 @@ var _ = Describe("EventSource", func() { Ω(err).Should(Equal(receptor.NewRawEventSourceError(rawError))) }) }) + + Context("when the raw event source returns io.EOF", func() { + BeforeEach(func() { + fakeRawEventSource.NextReturns(sse.Event{}, io.EOF) + }) + + It("returns io.EOF", func() { + _, err := eventSource.Next() + Ω(err).Should(Equal(io.EOF)) + }) + }) + + Context("when the raw event source returns sse.ErrSourceClosed", func() { + BeforeEach(func() { + fakeRawEventSource.NextReturns(sse.Event{}, sse.ErrSourceClosed) + }) + + It("returns receptor.ErrSourceClosed", func() { + _, err := eventSource.Next() + Ω(err).Should(Equal(receptor.ErrSourceClosed)) + }) + }) }) Describe("Close", func() {