From bebf717937064012aec94c189c11422e5e39b66f Mon Sep 17 00:00:00 2001 From: Martin Schneppenheim <23424570+weeco@users.noreply.github.com> Date: Tue, 15 Aug 2023 13:53:15 +0100 Subject: [PATCH] backend: add endpoint to create new schema --- backend/pkg/api/handle_schema_registry.go | 43 ++++++++++++++++ backend/pkg/api/routes.go | 1 + backend/pkg/console/schema_registry.go | 15 ++++++ backend/pkg/console/servicer.go | 2 + backend/pkg/schema/client.go | 56 ++++++++++++++++++++ backend/pkg/schema/enums.go | 63 +++++++++++++++++++++++ backend/pkg/schema/service.go | 5 ++ 7 files changed, 185 insertions(+) create mode 100644 backend/pkg/schema/enums.go diff --git a/backend/pkg/api/handle_schema_registry.go b/backend/pkg/api/handle_schema_registry.go index 1edfb1888..3e71c7fa8 100644 --- a/backend/pkg/api/handle_schema_registry.go +++ b/backend/pkg/api/handle_schema_registry.go @@ -318,3 +318,46 @@ func (api *API) handleDeleteSubjectVersion() http.HandlerFunc { rest.SendResponse(w, r, api.Logger, http.StatusOK, res) } } + +func (api *API) handleCreateSchema() http.HandlerFunc { + if !api.Cfg.Kafka.Schema.Enabled { + return api.handleSchemaRegistryNotConfigured() + } + + return func(w http.ResponseWriter, r *http.Request) { + // 1. Parse request parameters + subjectName := rest.GetURLParam(r, "subject") + + var payload schema.Schema + restErr := rest.Decode(w, r, &payload) + if restErr != nil { + rest.SendRESTError(w, r, api.Logger, restErr) + return + } + if payload.Schema == "" { + rest.SendRESTError(w, r, api.Logger, &rest.Error{ + Err: fmt.Errorf("payload validation failed for creating schema"), + Status: http.StatusBadRequest, + Message: "You must set the schema field when creating a new schema", + InternalLogs: []zapcore.Field{zap.String("subject_name", subjectName)}, + IsSilent: false, + }) + return + } + + // 2. Send create request + res, err := api.ConsoleSvc.CreateSchemaRegistrySchema(r.Context(), subjectName, payload) + if err != nil { + rest.SendRESTError(w, r, api.Logger, &rest.Error{ + Err: err, + Status: http.StatusServiceUnavailable, + Message: fmt.Sprintf("Failed to create schema: %v", err.Error()), + InternalLogs: []zapcore.Field{zap.String("subject_name", subjectName)}, + IsSilent: false, + }) + return + } + + rest.SendResponse(w, r, api.Logger, http.StatusOK, res) + } +} diff --git a/backend/pkg/api/routes.go b/backend/pkg/api/routes.go index 300633860..3db816732 100644 --- a/backend/pkg/api/routes.go +++ b/backend/pkg/api/routes.go @@ -138,6 +138,7 @@ func (api *API) routes() *chi.Mux { r.Get("/schema-registry/subjects", api.handleGetSchemaSubjects()) r.Get("/schema-registry/schemas/types", api.handleGetSchemaRegistrySchemaTypes()) r.Delete("/schema-registry/subjects/{subject}", api.handleDeleteSubject()) + r.Post("/schema-registry/subjects/{subject}/versions", api.handleCreateSchema()) r.Delete("/schema-registry/subjects/{subject}/versions/{version}", api.handleDeleteSubjectVersion()) r.Get("/schema-registry/subjects/{subject}/versions/{version}", api.handleGetSchemaSubjectDetails()) diff --git a/backend/pkg/console/schema_registry.go b/backend/pkg/console/schema_registry.go index 5e8bd3710..29968fc7a 100644 --- a/backend/pkg/console/schema_registry.go +++ b/backend/pkg/console/schema_registry.go @@ -346,3 +346,18 @@ func (s *Service) GetSchemaRegistrySchemaTypes(ctx context.Context) (*SchemaRegi } return &SchemaRegistrySchemaTypes{SchemaTypes: res}, nil } + +// CreateSchemaResponse is the response to creating a new schema. +type CreateSchemaResponse struct { + ID int `json:"id"` +} + +// CreateSchemaRegistrySchema registers a new schema for the given subject in the schema registry. +func (s *Service) CreateSchemaRegistrySchema(ctx context.Context, subjectName string, schema schema.Schema) (*CreateSchemaResponse, error) { + res, err := s.kafkaSvc.SchemaService.CreateSchema(ctx, subjectName, schema) + if err != nil { + return nil, err + } + + return &CreateSchemaResponse{ID: res.ID}, nil +} diff --git a/backend/pkg/console/servicer.go b/backend/pkg/console/servicer.go index 22b170088..8d419075e 100644 --- a/backend/pkg/console/servicer.go +++ b/backend/pkg/console/servicer.go @@ -8,6 +8,7 @@ import ( "github.com/twmb/franz-go/pkg/kmsg" "github.com/redpanda-data/console/backend/pkg/kafka" + "github.com/redpanda-data/console/backend/pkg/schema" ) // Servicer is an interface for the Console package that offers all methods to serve the responses for the API layer. @@ -57,4 +58,5 @@ type Servicer interface { DeleteSchemaRegistrySubject(ctx context.Context, subjectName string, deletePermanently bool) (*SchemaRegistryDeleteSubjectResponse, error) DeleteSchemaRegistrySubjectVersion(ctx context.Context, subject, version string, deletePermanently bool) (*SchemaRegistryDeleteSubjectVersionResponse, error) GetSchemaRegistrySchemaTypes(ctx context.Context) (*SchemaRegistrySchemaTypes, error) + CreateSchemaRegistrySchema(ctx context.Context, subjectName string, schema schema.Schema) (*CreateSchemaResponse, error) } diff --git a/backend/pkg/schema/client.go b/backend/pkg/schema/client.go index 0ea147dc8..eb2abcbdc 100644 --- a/backend/pkg/schema/client.go +++ b/backend/pkg/schema/client.go @@ -511,6 +511,62 @@ func (c *Client) GetSchemas(ctx context.Context, showSoftDeleted bool) ([]Schema return schemas, nil } +// Schema is the object form of a schema for the HTTP API. +type Schema struct { + // Schema is the actual unescaped text of a schema. + Schema string `json:"schema"` + + // Type is the type of a schema. The default type is avro. + Type SchemaType `json:"schemaType,omitempty"` + + // References declares other schemas this schema references. See the + // docs on SchemaReference for more details. + References []SchemaReference `json:"references,omitempty"` +} + +// SchemaReference is a way for a one schema to reference another. The details +// for how referencing is done are type specific; for example, JSON objects +// that use the key "$ref" can refer to another schema via URL. For more details +// on references, see the following link: +// +// https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html#schema-references +// https://docs.confluent.io/platform/current/schema-registry/develop/api.html +type SchemaReference struct { + Name string `json:"name"` + Subject string `json:"subject"` + Version int `json:"version"` +} + +// CreateSchemaResponse is the response to creating a schema. +type CreateSchemaResponse struct { + ID int `json:"id"` +} + +// CreateSchema registers a new schema under the specified subject. +func (c *Client) CreateSchema(ctx context.Context, subjectName string, schema Schema) (*CreateSchemaResponse, error) { + var createSchemaRes CreateSchemaResponse + res, err := c.client.R(). + SetContext(ctx). + SetResult(&createSchemaRes). + SetPathParam("subject", subjectName). + SetQueryParam("normalize", "true"). + SetBody(&schema). + Post("/subjects/{subject}/versions") + if err != nil { + return nil, fmt.Errorf("create schema failed: %w", err) + } + + if res.IsError() { + restErr, ok := res.Error().(*RestError) + if !ok { + return nil, fmt.Errorf("create schema failed: Status code %d", res.StatusCode()) + } + return nil, restErr + } + + return &createSchemaRes, nil +} + // GetSchemasIndividually returns all schemas by describing all schemas one by one. This may be used against // schema registry that don't support the /schemas endpoint that returns a list of all registered schemas. func (c *Client) GetSchemasIndividually(ctx context.Context, showSoftDeleted bool) ([]SchemaVersionedResponse, error) { diff --git a/backend/pkg/schema/enums.go b/backend/pkg/schema/enums.go new file mode 100644 index 000000000..656f356e2 --- /dev/null +++ b/backend/pkg/schema/enums.go @@ -0,0 +1,63 @@ +// Copyright 2023 Redpanda Data, Inc. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 + +package schema + +import ( + "fmt" + "strings" +) + +// SchemaType as an enum representing schema types. The default schema type +// is avro. +type SchemaType int + +const ( + TypeAvro SchemaType = iota + TypeProtobuf + TypeJSON +) + +// String presentation of schema type. +func (t SchemaType) String() string { + switch t { + case TypeAvro: + return "AVRO" + case TypeProtobuf: + return "PROTOBUF" + case TypeJSON: + return "JSON" + default: + return "" + } +} + +// MarshalText marshals the SchemaType. +func (t SchemaType) MarshalText() ([]byte, error) { + s := t.String() + if s == "" { + return nil, fmt.Errorf("unknown schema type %d", t) + } + return []byte(s), nil +} + +// UnmarshalText unmarshals the schema type. +func (t *SchemaType) UnmarshalText(text []byte) error { + switch s := strings.ToUpper(string(text)); s { + default: + return fmt.Errorf("unknown schema type %q", s) + case "", "AVRO": + *t = TypeAvro + case "PROTOBUF": + *t = TypeProtobuf + case "JSON": + *t = TypeJSON + } + return nil +} diff --git a/backend/pkg/schema/service.go b/backend/pkg/schema/service.go index 09e9e0324..d3e6cde69 100644 --- a/backend/pkg/schema/service.go +++ b/backend/pkg/schema/service.go @@ -287,3 +287,8 @@ func (s *Service) GetSchemaBySubjectAndVersion(ctx context.Context, subject stri return cachedSchema, err } + +// CreateSchema registers a new schema for the given subject. +func (s *Service) CreateSchema(ctx context.Context, subject string, schema Schema) (*CreateSchemaResponse, error) { + return s.registryClient.CreateSchema(ctx, subject, schema) +}