Skip to content

Commit

Permalink
backend: add endpoint to create new schema
Browse files Browse the repository at this point in the history
  • Loading branch information
weeco committed Aug 15, 2023
1 parent 8ea65b0 commit bebf717
Show file tree
Hide file tree
Showing 7 changed files with 185 additions and 0 deletions.
43 changes: 43 additions & 0 deletions backend/pkg/api/handle_schema_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,3 +318,46 @@ func (api *API) handleDeleteSubjectVersion() http.HandlerFunc {
rest.SendResponse(w, r, api.Logger, http.StatusOK, res)
}
}

func (api *API) handleCreateSchema() http.HandlerFunc {
if !api.Cfg.Kafka.Schema.Enabled {
return api.handleSchemaRegistryNotConfigured()
}

return func(w http.ResponseWriter, r *http.Request) {
// 1. Parse request parameters
subjectName := rest.GetURLParam(r, "subject")

var payload schema.Schema
restErr := rest.Decode(w, r, &payload)
if restErr != nil {
rest.SendRESTError(w, r, api.Logger, restErr)
return
}
if payload.Schema == "" {
rest.SendRESTError(w, r, api.Logger, &rest.Error{
Err: fmt.Errorf("payload validation failed for creating schema"),
Status: http.StatusBadRequest,
Message: "You must set the schema field when creating a new schema",
InternalLogs: []zapcore.Field{zap.String("subject_name", subjectName)},
IsSilent: false,
})
return
}

// 2. Send create request
res, err := api.ConsoleSvc.CreateSchemaRegistrySchema(r.Context(), subjectName, payload)
if err != nil {
rest.SendRESTError(w, r, api.Logger, &rest.Error{
Err: err,
Status: http.StatusServiceUnavailable,
Message: fmt.Sprintf("Failed to create schema: %v", err.Error()),
InternalLogs: []zapcore.Field{zap.String("subject_name", subjectName)},
IsSilent: false,
})
return
}

rest.SendResponse(w, r, api.Logger, http.StatusOK, res)
}
}
1 change: 1 addition & 0 deletions backend/pkg/api/routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ func (api *API) routes() *chi.Mux {
r.Get("/schema-registry/subjects", api.handleGetSchemaSubjects())
r.Get("/schema-registry/schemas/types", api.handleGetSchemaRegistrySchemaTypes())
r.Delete("/schema-registry/subjects/{subject}", api.handleDeleteSubject())
r.Post("/schema-registry/subjects/{subject}/versions", api.handleCreateSchema())
r.Delete("/schema-registry/subjects/{subject}/versions/{version}", api.handleDeleteSubjectVersion())
r.Get("/schema-registry/subjects/{subject}/versions/{version}", api.handleGetSchemaSubjectDetails())

Expand Down
15 changes: 15 additions & 0 deletions backend/pkg/console/schema_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,3 +346,18 @@ func (s *Service) GetSchemaRegistrySchemaTypes(ctx context.Context) (*SchemaRegi
}
return &SchemaRegistrySchemaTypes{SchemaTypes: res}, nil
}

// CreateSchemaResponse is the response to creating a new schema.
type CreateSchemaResponse struct {
ID int `json:"id"`
}

// CreateSchemaRegistrySchema registers a new schema for the given subject in the schema registry.
func (s *Service) CreateSchemaRegistrySchema(ctx context.Context, subjectName string, schema schema.Schema) (*CreateSchemaResponse, error) {
res, err := s.kafkaSvc.SchemaService.CreateSchema(ctx, subjectName, schema)
if err != nil {
return nil, err
}

return &CreateSchemaResponse{ID: res.ID}, nil
}
2 changes: 2 additions & 0 deletions backend/pkg/console/servicer.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/twmb/franz-go/pkg/kmsg"

"github.com/redpanda-data/console/backend/pkg/kafka"
"github.com/redpanda-data/console/backend/pkg/schema"
)

// Servicer is an interface for the Console package that offers all methods to serve the responses for the API layer.
Expand Down Expand Up @@ -57,4 +58,5 @@ type Servicer interface {
DeleteSchemaRegistrySubject(ctx context.Context, subjectName string, deletePermanently bool) (*SchemaRegistryDeleteSubjectResponse, error)
DeleteSchemaRegistrySubjectVersion(ctx context.Context, subject, version string, deletePermanently bool) (*SchemaRegistryDeleteSubjectVersionResponse, error)
GetSchemaRegistrySchemaTypes(ctx context.Context) (*SchemaRegistrySchemaTypes, error)
CreateSchemaRegistrySchema(ctx context.Context, subjectName string, schema schema.Schema) (*CreateSchemaResponse, error)
}
56 changes: 56 additions & 0 deletions backend/pkg/schema/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -511,6 +511,62 @@ func (c *Client) GetSchemas(ctx context.Context, showSoftDeleted bool) ([]Schema
return schemas, nil
}

// Schema is the object form of a schema for the HTTP API.
type Schema struct {
// Schema is the actual unescaped text of a schema.
Schema string `json:"schema"`

// Type is the type of a schema. The default type is avro.
Type SchemaType `json:"schemaType,omitempty"`

// References declares other schemas this schema references. See the
// docs on SchemaReference for more details.
References []SchemaReference `json:"references,omitempty"`
}

// SchemaReference is a way for a one schema to reference another. The details
// for how referencing is done are type specific; for example, JSON objects
// that use the key "$ref" can refer to another schema via URL. For more details
// on references, see the following link:
//
// https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html#schema-references
// https://docs.confluent.io/platform/current/schema-registry/develop/api.html
type SchemaReference struct {
Name string `json:"name"`
Subject string `json:"subject"`
Version int `json:"version"`
}

// CreateSchemaResponse is the response to creating a schema.
type CreateSchemaResponse struct {
ID int `json:"id"`
}

// CreateSchema registers a new schema under the specified subject.
func (c *Client) CreateSchema(ctx context.Context, subjectName string, schema Schema) (*CreateSchemaResponse, error) {
var createSchemaRes CreateSchemaResponse
res, err := c.client.R().
SetContext(ctx).
SetResult(&createSchemaRes).
SetPathParam("subject", subjectName).
SetQueryParam("normalize", "true").
SetBody(&schema).
Post("/subjects/{subject}/versions")
if err != nil {
return nil, fmt.Errorf("create schema failed: %w", err)
}

if res.IsError() {
restErr, ok := res.Error().(*RestError)
if !ok {
return nil, fmt.Errorf("create schema failed: Status code %d", res.StatusCode())
}
return nil, restErr
}

return &createSchemaRes, nil
}

// GetSchemasIndividually returns all schemas by describing all schemas one by one. This may be used against
// schema registry that don't support the /schemas endpoint that returns a list of all registered schemas.
func (c *Client) GetSchemasIndividually(ctx context.Context, showSoftDeleted bool) ([]SchemaVersionedResponse, error) {
Expand Down
63 changes: 63 additions & 0 deletions backend/pkg/schema/enums.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// Copyright 2023 Redpanda Data, Inc.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.md
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0

package schema

import (
"fmt"
"strings"
)

// SchemaType as an enum representing schema types. The default schema type
// is avro.
type SchemaType int

const (
TypeAvro SchemaType = iota
TypeProtobuf
TypeJSON
)

// String presentation of schema type.
func (t SchemaType) String() string {
switch t {
case TypeAvro:
return "AVRO"
case TypeProtobuf:
return "PROTOBUF"
case TypeJSON:
return "JSON"
default:
return ""
}
}

// MarshalText marshals the SchemaType.
func (t SchemaType) MarshalText() ([]byte, error) {
s := t.String()
if s == "" {
return nil, fmt.Errorf("unknown schema type %d", t)
}
return []byte(s), nil
}

// UnmarshalText unmarshals the schema type.
func (t *SchemaType) UnmarshalText(text []byte) error {
switch s := strings.ToUpper(string(text)); s {
default:
return fmt.Errorf("unknown schema type %q", s)
case "", "AVRO":
*t = TypeAvro
case "PROTOBUF":
*t = TypeProtobuf
case "JSON":
*t = TypeJSON
}
return nil
}
5 changes: 5 additions & 0 deletions backend/pkg/schema/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,3 +287,8 @@ func (s *Service) GetSchemaBySubjectAndVersion(ctx context.Context, subject stri

return cachedSchema, err
}

// CreateSchema registers a new schema for the given subject.
func (s *Service) CreateSchema(ctx context.Context, subject string, schema Schema) (*CreateSchemaResponse, error) {
return s.registryClient.CreateSchema(ctx, subject, schema)
}

0 comments on commit bebf717

Please sign in to comment.