Skip to content

Commit

Permalink
backend: add validate endpoint for schemas
Browse files Browse the repository at this point in the history
  • Loading branch information
weeco committed Aug 23, 2023
1 parent 0a2913d commit fe31c24
Show file tree
Hide file tree
Showing 8 changed files with 267 additions and 34 deletions.
2 changes: 2 additions & 0 deletions backend/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,8 @@ require (
github.com/prometheus/common v0.44.0 // indirect
github.com/prometheus/procfs v0.11.0 // indirect
github.com/rogpeppe/go-internal v1.10.0 // indirect
github.com/santhosh-tekuri/jsonschema v1.2.4 // indirect
github.com/santhosh-tekuri/jsonschema/v5 v5.3.1 // indirect
github.com/sergi/go-diff v1.3.1 // indirect
github.com/sethgrid/pester v1.2.0 // indirect
github.com/sirupsen/logrus v1.9.3 // indirect
Expand Down
4 changes: 4 additions & 0 deletions backend/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -514,6 +514,10 @@ github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQD
github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts=
github.com/ryanuber/columnize v2.1.0+incompatible/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts=
github.com/ryanuber/go-glob v1.0.0/go.mod h1:807d1WSdnB0XRJzKNil9Om6lcp/3a0v4qIHxIXzX/Yc=
github.com/santhosh-tekuri/jsonschema v1.2.4 h1:hNhW8e7t+H1vgY+1QeEQpveR6D4+OwKPXCfD2aieJis=
github.com/santhosh-tekuri/jsonschema v1.2.4/go.mod h1:TEAUOeZSmIxTTuHatJzrvARHiuO9LYd+cIxzgEHCQI4=
github.com/santhosh-tekuri/jsonschema/v5 v5.3.1 h1:lZUw3E0/J3roVtGQ+SCrUrg3ON6NgVqpn3+iol9aGu4=
github.com/santhosh-tekuri/jsonschema/v5 v5.3.1/go.mod h1:uToXkOrWAZ6/Oc07xWQrPOhJotwFIyu2bBVN41fcDUY=
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc=
github.com/sergi/go-diff v1.3.1 h1:xkr+Oxo4BOQKmkn/B9eMK0g5Kg/983T9DqqPHwYqD+8=
github.com/sergi/go-diff v1.3.1/go.mod h1:aMJSSKb2lpPvRNec0+w3fl7LP9IOFzdc9Pa4NFbPK1I=
Expand Down
50 changes: 50 additions & 0 deletions backend/pkg/api/handle_schema_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -428,3 +428,53 @@ func (api *API) handleCreateSchema() http.HandlerFunc {
rest.SendResponse(w, r, api.Logger, http.StatusOK, res)
}
}

func (api *API) handleValidateSchema() http.HandlerFunc {
if !api.Cfg.Kafka.Schema.Enabled {
return api.handleSchemaRegistryNotConfigured()
}

return func(w http.ResponseWriter, r *http.Request) {
// 1. Parse request parameters
subjectName := rest.GetURLParam(r, "subject")

version := rest.GetURLParam(r, "version")
switch version {
case console.SchemaVersionsLatest:
default:
// Must be number or it's invalid input
_, err := strconv.Atoi(version)
if err != nil {
descriptiveErr := fmt.Errorf("version %q is not valid. Must be %q or a positive integer", version, console.SchemaVersionsLatest)
rest.SendRESTError(w, r, api.Logger, &rest.Error{
Err: descriptiveErr,
Status: http.StatusBadRequest,
Message: descriptiveErr.Error(),
IsSilent: false,
})
return
}
}

var payload schema.Schema
restErr := rest.Decode(w, r, &payload)
if restErr != nil {
rest.SendRESTError(w, r, api.Logger, restErr)
return
}
if payload.Schema == "" {
rest.SendRESTError(w, r, api.Logger, &rest.Error{
Err: fmt.Errorf("payload validation failed for validating schema"),
Status: http.StatusBadRequest,
Message: "You must set the schema field when validating the schema",
InternalLogs: []zapcore.Field{zap.String("subject_name", subjectName)},
IsSilent: false,
})
return
}

// 2. Send validate request
res := api.ConsoleSvc.ValidateSchemaRegistrySchema(r.Context(), subjectName, version, payload)
rest.SendResponse(w, r, api.Logger, http.StatusOK, res)
}
}
1 change: 1 addition & 0 deletions backend/pkg/api/routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ func (api *API) routes() *chi.Mux {
r.Get("/schema-registry/schemas/types", api.handleGetSchemaRegistrySchemaTypes())
r.Delete("/schema-registry/subjects/{subject}", api.handleDeleteSubject())
r.Post("/schema-registry/subjects/{subject}/versions", api.handleCreateSchema())
r.Post("/schema-registry/subjects/{subject}/versions/{version}/validate", api.handleValidateSchema())
r.Delete("/schema-registry/subjects/{subject}/versions/{version}", api.handleDeleteSubjectVersion())
r.Get("/schema-registry/subjects/{subject}/versions/{version}", api.handleGetSchemaSubjectDetails())
r.Get("/schema-registry/subjects/{subject}/versions/{version}/referencedby", api.handleGetSchemaReferences())
Expand Down
86 changes: 78 additions & 8 deletions backend/pkg/console/schema_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ func (s *Service) GetSchemaRegistryConfig(ctx context.Context) (*SchemaRegistryC
return &SchemaRegistryConfig{Compatibility: config.Compatibility}, nil
}

// PutSchemaRegistryConfig sets the global compatibility level.
func (s *Service) PutSchemaRegistryConfig(ctx context.Context, compatLevel schema.CompatibilityLevel) (*SchemaRegistryConfig, error) {
config, err := s.kafkaSvc.SchemaService.PutConfig(ctx, compatLevel)
if err != nil {
Expand Down Expand Up @@ -119,7 +120,7 @@ func (s *Service) GetSchemaRegistrySubjects(ctx context.Context) ([]SchemaRegist
// or the full schema information that's part of the subject.
type SchemaRegistrySubjectDetails struct {
Name string `json:"name"`
Type string `json:"type"`
Type schema.SchemaType `json:"type"`
Compatibility schema.CompatibilityLevel `json:"compatibility"`
RegisteredVersions []SchemaRegistrySubjectDetailsVersion `json:"versions"`
LatestActiveVersion int `json:"latestActiveVersion"`
Expand Down Expand Up @@ -208,7 +209,7 @@ func (s *Service) GetSchemaRegistrySubjectDetails(ctx context.Context, subjectNa
schemas = append(schemas, schemaRes)
}

schemaType := "unknown"
var schemaType schema.SchemaType
if len(schemas) > 0 {
schemaType = schemas[len(schemas)-1].Type
}
Expand Down Expand Up @@ -295,12 +296,12 @@ func (s *Service) getSchemaRegistrySchemaVersions(ctx context.Context, subjectNa

// SchemaRegistryVersionedSchema describes a retrieved schema.
type SchemaRegistryVersionedSchema struct {
ID int `json:"id"`
Version int `json:"version"`
IsSoftDeleted bool `json:"isSoftDeleted"`
Type string `json:"type"`
Schema string `json:"schema"`
References []Reference `json:"references"`
ID int `json:"id"`
Version int `json:"version"`
IsSoftDeleted bool `json:"isSoftDeleted"`
Type schema.SchemaType `json:"type"`
Schema string `json:"schema"`
References []Reference `json:"references"`
}

// Reference describes a reference to a different schema stored in the schema registry.
Expand Down Expand Up @@ -407,3 +408,72 @@ func (s *Service) CreateSchemaRegistrySchema(ctx context.Context, subjectName st

return &CreateSchemaResponse{ID: res.ID}, nil
}

// SchemaRegistrySchemaValidation is the response to a schema validation.
type SchemaRegistrySchemaValidation struct {
Compatibility SchemaRegistrySchemaValidationCompatibility `json:"compatibility"`
ParsingError string `json:"parsingError,omitempty"`
IsValid bool `json:"isValid"`
}

// SchemaRegistrySchemaValidationCompatibility is the response to the compatibility check
// performed by the schema registry.
type SchemaRegistrySchemaValidationCompatibility struct {
IsCompatible bool `json:"isCompatible"`
Error string `json:"error,omitempty"`
}

// ValidateSchemaRegistrySchema validates a given schema by checking:
// 1. Compatibility to previous versions if they exist.
// 2. Validating the schema for correctness.
func (s *Service) ValidateSchemaRegistrySchema(
ctx context.Context,
subjectName string,
version string,
sch schema.Schema,
) *SchemaRegistrySchemaValidation {
// Compatibility check from schema registry
var compatErr string
var isCompatible bool
compatRes, err := s.kafkaSvc.SchemaService.CheckCompatibility(ctx, subjectName, version, sch)
if err != nil {
compatErr = err.Error()

// If subject doesn't exist, we will reset the error, because new subject schemas
// don't have any existing schema and therefore can't be incompatible.
var schemaErr *schema.RestError
if errors.As(err, &schemaErr) {
if schemaErr.ErrorCode == schema.CodeSubjectNotFound {
compatErr = ""
isCompatible = true
}
}
} else {
isCompatible = compatRes.IsCompatible
}

var parsingErr string
switch sch.Type {
case schema.TypeAvro:
if err := s.kafkaSvc.SchemaService.ValidateAvroSchema(ctx, sch); err != nil {
parsingErr = err.Error()
}
case schema.TypeJSON:
if err := s.kafkaSvc.SchemaService.ValidateJSONSchema(ctx, subjectName, sch, nil); err != nil {
parsingErr = err.Error()
}
case schema.TypeProtobuf:
if err := s.kafkaSvc.SchemaService.ValidateProtobufSchema(ctx, subjectName, sch); err != nil {
parsingErr = err.Error()
}
}

return &SchemaRegistrySchemaValidation{
Compatibility: SchemaRegistrySchemaValidationCompatibility{
IsCompatible: isCompatible,
Error: compatErr,
},
ParsingError: parsingErr,
IsValid: parsingErr == "" && isCompatible,
}
}
1 change: 1 addition & 0 deletions backend/pkg/console/servicer.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,4 +61,5 @@ type Servicer interface {
DeleteSchemaRegistrySubjectVersion(ctx context.Context, subject, version string, deletePermanently bool) (*SchemaRegistryDeleteSubjectVersionResponse, error)
GetSchemaRegistrySchemaTypes(ctx context.Context) (*SchemaRegistrySchemaTypes, error)
CreateSchemaRegistrySchema(ctx context.Context, subjectName string, schema schema.Schema) (*CreateSchemaResponse, error)
ValidateSchemaRegistrySchema(ctx context.Context, subjectName string, version string, schema schema.Schema) *SchemaRegistrySchemaValidation
}
59 changes: 41 additions & 18 deletions backend/pkg/schema/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,15 +123,8 @@ func newClient(cfg config.Schema) (*Client, error) {
//
//nolint:revive // This is stuttering when calling this with the pkg name, but without that the
type SchemaResponse struct {
Schema string `json:"schema"`
References []Reference `json:"references,omitempty"`
}

// Reference describes a reference to a different schema stored in the schema registry.
type Reference struct {
Name string `json:"name"`
Subject string `json:"subject"`
Version int `json:"version"`
Schema string `json:"schema"`
References []SchemaReference `json:"references,omitempty"`
}

// GetSchemaByID returns the schema string identified by the input ID.
Expand Down Expand Up @@ -168,12 +161,12 @@ func (c *Client) GetSchemaByID(ctx context.Context, id uint32) (*SchemaResponse,
//
//nolint:revive // This is stuttering when calling this with the pkg name, but without that the
type SchemaVersionedResponse struct {
Subject string `json:"subject"`
SchemaID int `json:"id"`
Version int `json:"version"`
Schema string `json:"schema"`
Type string `json:"schemaType"`
References []Reference `json:"references"`
Subject string `json:"subject"`
SchemaID int `json:"id"`
Version int `json:"version"`
Schema string `json:"schema"`
Type SchemaType `json:"schemaType"`
References []SchemaReference `json:"references"`
}

// GetSchemaBySubject returns the schema for the specified version of this subject. The unescaped schema only is returned.
Expand Down Expand Up @@ -211,9 +204,6 @@ func (c *Client) GetSchemaBySubject(ctx context.Context, subject, version string
if !ok {
return nil, fmt.Errorf("failed to parse schema by subject response")
}
if parsed.Type == "" {
parsed.Type = TypeAvro.String()
}

return parsed, nil
}
Expand Down Expand Up @@ -645,6 +635,7 @@ func (c *Client) GetSchemasIndividually(ctx context.Context, showSoftDeleted boo
return schemas, nil
}

// GetSchemaReferencesResponse is the response to fetching schema references.
type GetSchemaReferencesResponse struct {
SchemaIDs []int `json:"schemaIds"`
}
Expand Down Expand Up @@ -674,6 +665,38 @@ func (c *Client) GetSchemaReferences(ctx context.Context, subject, version strin
return &GetSchemaReferencesResponse{SchemaIDs: schemaIDs}, nil
}

// CheckCompatibilityResponse is the response to a compatibility check for a schema.
type CheckCompatibilityResponse struct {
IsCompatible bool `json:"is_compatible"`
}

// CheckCompatibility checks if a schema is compatible with the given version
// that exists. You can use 'latest' to check compatibility with the latest version.
func (c *Client) CheckCompatibility(ctx context.Context, subject string, version string, schema Schema) (*CheckCompatibilityResponse, error) {
var checkCompatRes CheckCompatibilityResponse
res, err := c.client.R().
SetContext(ctx).
SetResult(&checkCompatRes).
SetPathParam("subject", subject).
SetPathParam("version", version).
SetQueryParam("verbose", "true").
SetBody(&schema).
Post("/compatibility/subjects/{subject}/versions/{version}")
if err != nil {
return nil, fmt.Errorf("check compatibility failed: %w", err)
}

if res.IsError() {
restErr, ok := res.Error().(*RestError)
if !ok {
return nil, fmt.Errorf("check compatibility failed: Status code %d", res.StatusCode())
}
return nil, restErr
}

return &checkCompatRes, nil
}

// CheckConnectivity checks whether the schema registry can be access by GETing the /subjects
func (c *Client) CheckConnectivity(ctx context.Context) error {
url := "subjects"
Expand Down
Loading

0 comments on commit fe31c24

Please sign in to comment.