Skip to content

Commit

Permalink
Implement Send And Get Event APIs
Browse files Browse the repository at this point in the history
  • Loading branch information
muralov committed Sep 7, 2023
1 parent 01f5bd7 commit 7d6d4a0
Show file tree
Hide file tree
Showing 4 changed files with 184 additions and 0 deletions.
20 changes: 20 additions & 0 deletions e2e/sink/cmd/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package main

import (
"github.com/kyma-project/eventing-manager/sink/internal/handler"
"go.uber.org/zap"
)

func main() {
logger, err := zap.NewProduction()
if err != nil {
panic(err)
}
defer logger.Sync()

sHandler := handler.NewSinkHandler(logger)
err = sHandler.Start()
if err != nil {
logger.Error("failed to start SinkHandler", zap.Error(err))
}
}
17 changes: 17 additions & 0 deletions e2e/sink/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
module github.com/kyma-project/eventing-manager/sink

go 1.20

require (
github.com/cloudevents/sdk-go/v2 v2.14.0
github.com/gorilla/mux v1.8.0
go.uber.org/zap v1.10.0
)

require (
github.com/json-iterator/go v1.1.10 // indirect
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 // indirect
github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742 // indirect
go.uber.org/atomic v1.4.0 // indirect
go.uber.org/multierr v1.1.0 // indirect
)
32 changes: 32 additions & 0 deletions e2e/sink/go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
github.com/cloudevents/sdk-go/v2 v2.14.0 h1:Nrob4FwVgi5L4tV9lhjzZcjYqFVyJzsA56CwPaPfv6s=
github.com/cloudevents/sdk-go/v2 v2.14.0/go.mod h1:xDmKfzNjM8gBvjaF8ijFjM1VYOVUEeUfapHMUX1T5To=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/google/go-cmp v0.5.0 h1:/QaMHBdZ26BB3SSst0Iwl10Epc+xhTquomWX0oZEB6w=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY=
github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI=
github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
github.com/json-iterator/go v1.1.10 h1:Kz6Cvnvv2wGdaG/V8yMvfkmNiXq9Ya2KUv4rouJJr68=
github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 h1:ZqeYNhU3OHLH3mGKHDcjJRFFRrJa6eAM5H+CtDdOsPc=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742 h1:Esafd1046DLDQ0W1YjYsBW+p8U2u7vzgW2SQVmlNazg=
github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk=
github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
go.uber.org/atomic v1.4.0 h1:cxzIVoETapQEqDhQu3QfnvXAV4AlzcvUCxkVUFw3+EU=
go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/multierr v1.1.0 h1:HoEmRHQPVSqub6w2z2d2EOVs2fjyFRGyofhKuyDq0QI=
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
go.uber.org/zap v1.10.0 h1:ORx85nbTijNz8ljznvCMR1ZBIPKFn3jQrag10X2AsuM=
go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=
golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac h1:7zkz7BUtwNFFqcowJ+RIgu2MaV/MapERkDIy+mwPyjs=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
115 changes: 115 additions & 0 deletions e2e/sink/internal/handler/handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
package handler

import (
"context"
"net/http"

"github.com/cloudevents/sdk-go/v2/binding"
"github.com/gorilla/mux"
"go.uber.org/zap"

cev2event "github.com/cloudevents/sdk-go/v2/event"
cev2http "github.com/cloudevents/sdk-go/v2/protocol/http"
)

type Handler interface {
Start() error
}

type SinkHandler struct {
logger *zap.Logger
events map[string]*cev2event.Event
}

func NewSinkHandler(logger *zap.Logger) *SinkHandler {
return &SinkHandler{
logger: logger,
events: make(map[string]*cev2event.Event),
}
}

func (h *SinkHandler) Start() error {
router := mux.NewRouter()
router.HandleFunc("/event", h.StoreEvent).Methods(http.MethodPost)
router.HandleFunc("/event/{eventID}", h.GetEvent).Methods(http.MethodGet)

return http.ListenAndServe(":8080", router)
}

func (h *SinkHandler) StoreEvent(w http.ResponseWriter, r *http.Request) {
event, err := extractCloudEventFromRequest(r)
if err != nil {
h.namedLogger().With().Error("failed to extract CloudEvent from request", zap.Error(err))
e := writeResponse(w, http.StatusBadRequest, []byte(err.Error()))
if e != nil {
h.namedLogger().Error("failed to write response", zap.Error(e))
}
return
}

h.events[event.ID()] = event
err = writeResponse(w, http.StatusNoContent, []byte(""))
if err != nil {
h.namedLogger().Error("failed to write response", zap.Error(err))
}
}

func (h *SinkHandler) GetEvent(w http.ResponseWriter, r *http.Request) {
eventID := mux.Vars(r)["eventID"]
event, ok := h.events[eventID]
if !ok {
h.namedLogger().With().Error("event not found", zap.String("eventID", eventID))
e := writeResponse(w, http.StatusNotFound, []byte("event not found"))
if e != nil {
h.namedLogger().Error("failed to write response", zap.Error(e))
}
return
}

respBody, err := event.MarshalJSON()
if err != nil {
h.namedLogger().With().Error("failed to marshal event", zap.Error(err))
e := writeResponse(w, http.StatusInternalServerError, []byte(err.Error()))
if e != nil {
h.namedLogger().Error("failed to write response", zap.Error(e))
}
return
}

err = writeResponse(w, http.StatusOK, respBody)
if err != nil {
h.namedLogger().Error("failed to write response", zap.Error(err))
}
}

func (h *SinkHandler) namedLogger() *zap.Logger {
return h.logger.Named("sink-handler")
}

// extractCloudEventFromRequest converts an incoming CloudEvent request to an Event.
func extractCloudEventFromRequest(r *http.Request) (*cev2event.Event, error) {
message := cev2http.NewMessageFromHttpRequest(r)
defer func() { _ = message.Finish(nil) }()

event, err := binding.ToEvent(context.Background(), message)
if err != nil {
return nil, err
}

err = event.Validate()
if err != nil {
return nil, err
}
return event, nil
}

// writeResponse writes the HTTP response given the status code and response body.
func writeResponse(writer http.ResponseWriter, statusCode int, respBody []byte) error {
writer.WriteHeader(statusCode)

if respBody == nil {
return nil
}
_, err := writer.Write(respBody)
return err
}

0 comments on commit 7d6d4a0

Please sign in to comment.