diff --git a/e2e/sink/cmd/main.go b/e2e/sink/cmd/main.go new file mode 100644 index 000000000..5fd6d0624 --- /dev/null +++ b/e2e/sink/cmd/main.go @@ -0,0 +1,20 @@ +package main + +import ( + "github.com/kyma-project/eventing-manager/sink/internal/handler" + "go.uber.org/zap" +) + +func main() { + logger, err := zap.NewProduction() + if err != nil { + panic(err) + } + defer logger.Sync() + + sHandler := handler.NewSinkHandler(logger) + err = sHandler.Start() + if err != nil { + logger.Error("failed to start SinkHandler", zap.Error(err)) + } +} diff --git a/e2e/sink/go.mod b/e2e/sink/go.mod new file mode 100644 index 000000000..843b7cc61 --- /dev/null +++ b/e2e/sink/go.mod @@ -0,0 +1,17 @@ +module github.com/kyma-project/eventing-manager/sink + +go 1.20 + +require ( + github.com/cloudevents/sdk-go/v2 v2.14.0 + github.com/gorilla/mux v1.8.0 + go.uber.org/zap v1.10.0 +) + +require ( + github.com/json-iterator/go v1.1.10 // indirect + github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 // indirect + github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742 // indirect + go.uber.org/atomic v1.4.0 // indirect + go.uber.org/multierr v1.1.0 // indirect +) diff --git a/e2e/sink/go.sum b/e2e/sink/go.sum new file mode 100644 index 000000000..70a2a0997 --- /dev/null +++ b/e2e/sink/go.sum @@ -0,0 +1,32 @@ +github.com/cloudevents/sdk-go/v2 v2.14.0 h1:Nrob4FwVgi5L4tV9lhjzZcjYqFVyJzsA56CwPaPfv6s= +github.com/cloudevents/sdk-go/v2 v2.14.0/go.mod h1:xDmKfzNjM8gBvjaF8ijFjM1VYOVUEeUfapHMUX1T5To= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/google/go-cmp v0.5.0 h1:/QaMHBdZ26BB3SSst0Iwl10Epc+xhTquomWX0oZEB6w= +github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= +github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY= +github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI= +github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So= +github.com/json-iterator/go v1.1.10 h1:Kz6Cvnvv2wGdaG/V8yMvfkmNiXq9Ya2KUv4rouJJr68= +github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= +github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 h1:ZqeYNhU3OHLH3mGKHDcjJRFFRrJa6eAM5H+CtDdOsPc= +github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742 h1:Esafd1046DLDQ0W1YjYsBW+p8U2u7vzgW2SQVmlNazg= +github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk= +github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= +go.uber.org/atomic v1.4.0 h1:cxzIVoETapQEqDhQu3QfnvXAV4AlzcvUCxkVUFw3+EU= +go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= +go.uber.org/multierr v1.1.0 h1:HoEmRHQPVSqub6w2z2d2EOVs2fjyFRGyofhKuyDq0QI= +go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= +go.uber.org/zap v1.10.0 h1:ORx85nbTijNz8ljznvCMR1ZBIPKFn3jQrag10X2AsuM= +go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= +golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac h1:7zkz7BUtwNFFqcowJ+RIgu2MaV/MapERkDIy+mwPyjs= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= diff --git a/e2e/sink/internal/handler/handler.go b/e2e/sink/internal/handler/handler.go new file mode 100644 index 000000000..b7b97ec27 --- /dev/null +++ b/e2e/sink/internal/handler/handler.go @@ -0,0 +1,115 @@ +package handler + +import ( + "context" + "net/http" + + "github.com/cloudevents/sdk-go/v2/binding" + "github.com/gorilla/mux" + "go.uber.org/zap" + + cev2event "github.com/cloudevents/sdk-go/v2/event" + cev2http "github.com/cloudevents/sdk-go/v2/protocol/http" +) + +type Handler interface { + Start() error +} + +type SinkHandler struct { + logger *zap.Logger + events map[string]*cev2event.Event +} + +func NewSinkHandler(logger *zap.Logger) *SinkHandler { + return &SinkHandler{ + logger: logger, + events: make(map[string]*cev2event.Event), + } +} + +func (h *SinkHandler) Start() error { + router := mux.NewRouter() + router.HandleFunc("/event", h.StoreEvent).Methods(http.MethodPost) + router.HandleFunc("/event/{eventID}", h.GetEvent).Methods(http.MethodGet) + + return http.ListenAndServe(":8080", router) +} + +func (h *SinkHandler) StoreEvent(w http.ResponseWriter, r *http.Request) { + event, err := extractCloudEventFromRequest(r) + if err != nil { + h.namedLogger().With().Error("failed to extract CloudEvent from request", zap.Error(err)) + e := writeResponse(w, http.StatusBadRequest, []byte(err.Error())) + if e != nil { + h.namedLogger().Error("failed to write response", zap.Error(e)) + } + return + } + + h.events[event.ID()] = event + err = writeResponse(w, http.StatusNoContent, []byte("")) + if err != nil { + h.namedLogger().Error("failed to write response", zap.Error(err)) + } +} + +func (h *SinkHandler) GetEvent(w http.ResponseWriter, r *http.Request) { + eventID := mux.Vars(r)["eventID"] + event, ok := h.events[eventID] + if !ok { + h.namedLogger().With().Error("event not found", zap.String("eventID", eventID)) + e := writeResponse(w, http.StatusNotFound, []byte("event not found")) + if e != nil { + h.namedLogger().Error("failed to write response", zap.Error(e)) + } + return + } + + respBody, err := event.MarshalJSON() + if err != nil { + h.namedLogger().With().Error("failed to marshal event", zap.Error(err)) + e := writeResponse(w, http.StatusInternalServerError, []byte(err.Error())) + if e != nil { + h.namedLogger().Error("failed to write response", zap.Error(e)) + } + return + } + + err = writeResponse(w, http.StatusOK, respBody) + if err != nil { + h.namedLogger().Error("failed to write response", zap.Error(err)) + } +} + +func (h *SinkHandler) namedLogger() *zap.Logger { + return h.logger.Named("sink-handler") +} + +// extractCloudEventFromRequest converts an incoming CloudEvent request to an Event. +func extractCloudEventFromRequest(r *http.Request) (*cev2event.Event, error) { + message := cev2http.NewMessageFromHttpRequest(r) + defer func() { _ = message.Finish(nil) }() + + event, err := binding.ToEvent(context.Background(), message) + if err != nil { + return nil, err + } + + err = event.Validate() + if err != nil { + return nil, err + } + return event, nil +} + +// writeResponse writes the HTTP response given the status code and response body. +func writeResponse(writer http.ResponseWriter, statusCode int, respBody []byte) error { + writer.WriteHeader(statusCode) + + if respBody == nil { + return nil + } + _, err := writer.Write(respBody) + return err +}