Skip to content

Commit

Permalink
.
Browse files Browse the repository at this point in the history
  • Loading branch information
m110 committed Apr 20, 2024
1 parent 8b82b2e commit dbcfd03
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 23 deletions.
17 changes: 11 additions & 6 deletions pkg/http/responder.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,14 +122,19 @@ func (d DefaultSSEResponder) channelEventStream(w http.ResponseWriter, r *http.R
}
}

event, err := d.marshaler.Marshal(ctx, v)
if err != nil {
_, _ = w.Write([]byte(fmt.Sprintf("event: error\ndata: {\"error\":\"%v\"}\n\n", err)))
if f, ok := w.(http.Flusher); ok {
f.Flush()
event, ok := v.(ServerSentEvent)
if !ok {
var err error
event, err = d.marshaler.Marshal(ctx, v)
if err != nil {
_, _ = w.Write([]byte(fmt.Sprintf("event: error\ndata: {\"error\":\"%v\"}\n\n", err)))
if f, ok := w.(http.Flusher); ok {
f.Flush()
}
continue
}
continue
}

_, _ = w.Write([]byte(fmt.Sprintf("event: %s\ndata: %s\n\n", event.Event, event.Data)))
if f, ok := w.(http.Flusher); ok {
f.Flush()
Expand Down
53 changes: 37 additions & 16 deletions pkg/http/sse.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,26 +215,24 @@ func (h sseHandler) handleEventStream(w http.ResponseWriter, r *http.Request) {
responder.Respond(w, r, responsesChan)
}

type ResponseFunc[T any] func(r *http.Request) (response T, err error)
type OneModelResponseFunc[T any] func(r *http.Request) (response T, err error)
type EventsResponseFunc[T any] func(r *http.Request, msg *message.Message) (response T, err error)
type ValidFunc func(r *http.Request, msg *message.Message) (ok bool)

type DefaultStreamAdapter[T any] struct {
responseFunc ResponseFunc[T]
validFunc ValidFunc
type OneModelStreamAdapter[T any] struct {
responseFunc OneModelResponseFunc[T]
}

func NewDefaultStreamAdapter[T any](
responseFunc ResponseFunc[T],
validFunc ValidFunc,
) DefaultStreamAdapter[T] {
return DefaultStreamAdapter[T]{
func NewOneModelStreamAdapter[T any](
responseFunc OneModelResponseFunc[T],
) OneModelStreamAdapter[T] {
return OneModelStreamAdapter[T]{
responseFunc: responseFunc,
validFunc: validFunc,
}
}

func (d DefaultStreamAdapter[T]) InitialStreamResponse(w http.ResponseWriter, r *http.Request) (response T, ok bool) {
resp, err := d.responseFunc(r)
func (a OneModelStreamAdapter[T]) InitialStreamResponse(w http.ResponseWriter, r *http.Request) (response T, ok bool) {
resp, err := a.responseFunc(r)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
var empty T
Expand All @@ -244,14 +242,37 @@ func (d DefaultStreamAdapter[T]) InitialStreamResponse(w http.ResponseWriter, r
return resp, true
}

func (d DefaultStreamAdapter[T]) NextStreamResponse(r *http.Request, msg *message.Message) (response T, ok bool) {
var empty T
if !d.validFunc(r, msg) {
func (a OneModelStreamAdapter[T]) NextStreamResponse(r *http.Request, msg *message.Message) (response T, ok bool) {
resp, err := a.responseFunc(r)
if err != nil {
var empty T
return empty, false
}

resp, err := d.responseFunc(r)
return resp, true
}

type EventsStreamAdapter[T any] struct {
responseFunc EventsResponseFunc[T]
}

func NewEventsStreamAdapter[T any](
responseFunc EventsResponseFunc[T],
) EventsStreamAdapter[T] {
return EventsStreamAdapter[T]{
responseFunc: responseFunc,
}
}

func (a EventsStreamAdapter[T]) InitialStreamResponse(w http.ResponseWriter, r *http.Request) (response T, ok bool) {
var empty T
return empty, true
}

func (a EventsStreamAdapter[T]) NextStreamResponse(r *http.Request, msg *message.Message) (response T, ok bool) {
resp, err := a.responseFunc(r, msg)
if err != nil {
var empty T
return empty, false
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/http/sse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func TestSSE(t *testing.T) {
postsRepository: postsRepositoryMock{},
})

postStreamAdapterFunc := http.NewDefaultStreamAdapter(
postStreamAdapterFunc := http.NewOneModelStreamAdapter(
func(r *netHTTP.Request) (response Post, err error) {
postID := chi.URLParam(r, "id")

Expand Down

0 comments on commit dbcfd03

Please sign in to comment.