From fe31c24885d3d76cf9484801f8fe5505b391c7a3 Mon Sep 17 00:00:00 2001 From: Martin Schneppenheim <23424570+weeco@users.noreply.github.com> Date: Tue, 22 Aug 2023 12:15:19 +0100 Subject: [PATCH] backend: add validate endpoint for schemas --- backend/go.mod | 2 + backend/go.sum | 4 + backend/pkg/api/handle_schema_registry.go | 50 ++++++++++++ backend/pkg/api/routes.go | 1 + backend/pkg/console/schema_registry.go | 86 ++++++++++++++++++-- backend/pkg/console/servicer.go | 1 + backend/pkg/schema/client.go | 59 +++++++++----- backend/pkg/schema/service.go | 98 +++++++++++++++++++++-- 8 files changed, 267 insertions(+), 34 deletions(-) diff --git a/backend/go.mod b/backend/go.mod index d23260c4b..e311daf18 100644 --- a/backend/go.mod +++ b/backend/go.mod @@ -115,6 +115,8 @@ require ( github.com/prometheus/common v0.44.0 // indirect github.com/prometheus/procfs v0.11.0 // indirect github.com/rogpeppe/go-internal v1.10.0 // indirect + github.com/santhosh-tekuri/jsonschema v1.2.4 // indirect + github.com/santhosh-tekuri/jsonschema/v5 v5.3.1 // indirect github.com/sergi/go-diff v1.3.1 // indirect github.com/sethgrid/pester v1.2.0 // indirect github.com/sirupsen/logrus v1.9.3 // indirect diff --git a/backend/go.sum b/backend/go.sum index 9a416b5d9..dbcaef29a 100644 --- a/backend/go.sum +++ b/backend/go.sum @@ -514,6 +514,10 @@ github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQD github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts= github.com/ryanuber/columnize v2.1.0+incompatible/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts= github.com/ryanuber/go-glob v1.0.0/go.mod h1:807d1WSdnB0XRJzKNil9Om6lcp/3a0v4qIHxIXzX/Yc= +github.com/santhosh-tekuri/jsonschema v1.2.4 h1:hNhW8e7t+H1vgY+1QeEQpveR6D4+OwKPXCfD2aieJis= +github.com/santhosh-tekuri/jsonschema v1.2.4/go.mod h1:TEAUOeZSmIxTTuHatJzrvARHiuO9LYd+cIxzgEHCQI4= +github.com/santhosh-tekuri/jsonschema/v5 v5.3.1 
h1:lZUw3E0/J3roVtGQ+SCrUrg3ON6NgVqpn3+iol9aGu4= +github.com/santhosh-tekuri/jsonschema/v5 v5.3.1/go.mod h1:uToXkOrWAZ6/Oc07xWQrPOhJotwFIyu2bBVN41fcDUY= github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc= github.com/sergi/go-diff v1.3.1 h1:xkr+Oxo4BOQKmkn/B9eMK0g5Kg/983T9DqqPHwYqD+8= github.com/sergi/go-diff v1.3.1/go.mod h1:aMJSSKb2lpPvRNec0+w3fl7LP9IOFzdc9Pa4NFbPK1I= diff --git a/backend/pkg/api/handle_schema_registry.go b/backend/pkg/api/handle_schema_registry.go index aefdf1348..3d71b5b26 100644 --- a/backend/pkg/api/handle_schema_registry.go +++ b/backend/pkg/api/handle_schema_registry.go @@ -428,3 +428,53 @@ func (api *API) handleCreateSchema() http.HandlerFunc { rest.SendResponse(w, r, api.Logger, http.StatusOK, res) } }
+
+func (api *API) handleValidateSchema() http.HandlerFunc {
+	if !api.Cfg.Kafka.Schema.Enabled {
+		return api.handleSchemaRegistryNotConfigured()
+	}
+
+	return func(w http.ResponseWriter, r *http.Request) {
+		// 1. Parse and validate the URL parameters and the request body
+		subjectName := rest.GetURLParam(r, "subject")
+
+		version := rest.GetURLParam(r, "version")
+		switch version {
+		case console.SchemaVersionsLatest:
+		default:
+			// Anything else must be a positive integer, as promised by the error message below
+			versionNum, err := strconv.Atoi(version)
+			if err != nil || versionNum <= 0 {
+				descriptiveErr := fmt.Errorf("version %q is not valid. Must be %q or a positive integer", version, console.SchemaVersionsLatest)
+				rest.SendRESTError(w, r, api.Logger, &rest.Error{
+					Err:      descriptiveErr,
+					Status:   http.StatusBadRequest,
+					Message:  descriptiveErr.Error(),
+					IsSilent: false,
+				})
+				return
+			}
+		}
+
+		var payload schema.Schema
+		restErr := rest.Decode(w, r, &payload)
+		if restErr != nil {
+			rest.SendRESTError(w, r, api.Logger, restErr)
+			return
+		}
+		if payload.Schema == "" {
+			rest.SendRESTError(w, r, api.Logger, &rest.Error{
+				Err:          fmt.Errorf("payload validation failed for validating schema"),
+				Status:       http.StatusBadRequest,
+				Message:      "You must set the schema field when validating the schema",
+				InternalLogs: []zapcore.Field{zap.String("subject_name", subjectName)},
+				IsSilent:     false,
+			})
+			return
+		}
+
+		// 2. Send validate request; the outcome is always reported with status 200
+		res := api.ConsoleSvc.ValidateSchemaRegistrySchema(r.Context(), subjectName, version, payload)
+		rest.SendResponse(w, r, api.Logger, http.StatusOK, res)
+	}
+}
diff --git a/backend/pkg/api/routes.go b/backend/pkg/api/routes.go index ed36cc996..742ebba3c 100644 --- a/backend/pkg/api/routes.go +++ b/backend/pkg/api/routes.go @@ -140,6 +140,7 @@ func (api *API) routes() *chi.Mux { r.Get("/schema-registry/schemas/types", api.handleGetSchemaRegistrySchemaTypes()) r.Delete("/schema-registry/subjects/{subject}", api.handleDeleteSubject()) r.Post("/schema-registry/subjects/{subject}/versions", api.handleCreateSchema()) + r.Post("/schema-registry/subjects/{subject}/versions/{version}/validate", api.handleValidateSchema()) r.Delete("/schema-registry/subjects/{subject}/versions/{version}", api.handleDeleteSubjectVersion()) r.Get("/schema-registry/subjects/{subject}/versions/{version}", api.handleGetSchemaSubjectDetails()) r.Get("/schema-registry/subjects/{subject}/versions/{version}/referencedby", api.handleGetSchemaReferences()) diff --git a/backend/pkg/console/schema_registry.go b/backend/pkg/console/schema_registry.go index 7cffd377e..fc6d895c7 100644 --- 
a/backend/pkg/console/schema_registry.go +++ b/backend/pkg/console/schema_registry.go @@ -58,6 +58,7 @@ func (s *Service) GetSchemaRegistryConfig(ctx context.Context) (*SchemaRegistryC return &SchemaRegistryConfig{Compatibility: config.Compatibility}, nil } +// PutSchemaRegistryConfig sets the global compatibility level. func (s *Service) PutSchemaRegistryConfig(ctx context.Context, compatLevel schema.CompatibilityLevel) (*SchemaRegistryConfig, error) { config, err := s.kafkaSvc.SchemaService.PutConfig(ctx, compatLevel) if err != nil { @@ -119,7 +120,7 @@ func (s *Service) GetSchemaRegistrySubjects(ctx context.Context) ([]SchemaRegist // or the full schema information that's part of the subject. type SchemaRegistrySubjectDetails struct { Name string `json:"name"` - Type string `json:"type"` + Type schema.SchemaType `json:"type"` Compatibility schema.CompatibilityLevel `json:"compatibility"` RegisteredVersions []SchemaRegistrySubjectDetailsVersion `json:"versions"` LatestActiveVersion int `json:"latestActiveVersion"` @@ -208,7 +209,7 @@ func (s *Service) GetSchemaRegistrySubjectDetails(ctx context.Context, subjectNa schemas = append(schemas, schemaRes) } - schemaType := "unknown" + var schemaType schema.SchemaType if len(schemas) > 0 { schemaType = schemas[len(schemas)-1].Type } @@ -295,12 +296,12 @@ func (s *Service) getSchemaRegistrySchemaVersions(ctx context.Context, subjectNa // SchemaRegistryVersionedSchema describes a retrieved schema. 
type SchemaRegistryVersionedSchema struct { - ID int `json:"id"` - Version int `json:"version"` - IsSoftDeleted bool `json:"isSoftDeleted"` - Type string `json:"type"` - Schema string `json:"schema"` - References []Reference `json:"references"` + ID int `json:"id"` + Version int `json:"version"` + IsSoftDeleted bool `json:"isSoftDeleted"` + Type schema.SchemaType `json:"type"` + Schema string `json:"schema"` + References []Reference `json:"references"` } // Reference describes a reference to a different schema stored in the schema registry. @@ -407,3 +408,72 @@ func (s *Service) CreateSchemaRegistrySchema(ctx context.Context, subjectName st return &CreateSchemaResponse{ID: res.ID}, nil } + +// SchemaRegistrySchemaValidation is the response to a schema validation. +type SchemaRegistrySchemaValidation struct { + Compatibility SchemaRegistrySchemaValidationCompatibility `json:"compatibility"` + ParsingError string `json:"parsingError,omitempty"` + IsValid bool `json:"isValid"` +} + +// SchemaRegistrySchemaValidationCompatibility is the response to the compatibility check +// performed by the schema registry. +type SchemaRegistrySchemaValidationCompatibility struct { + IsCompatible bool `json:"isCompatible"` + Error string `json:"error,omitempty"` +} + +// ValidateSchemaRegistrySchema validates a given schema by checking: +// 1. Compatibility to previous versions if they exist. +// 2. Validating the schema for correctness. 
+func (s *Service) ValidateSchemaRegistrySchema(
+	ctx context.Context,
+	subjectName string,
+	version string,
+	sch schema.Schema,
+) *SchemaRegistrySchemaValidation {
+	// Step 1: Let the schema registry check the compatibility against the
+	// requested version of the subject.
+	var (
+		compatible bool
+		compatMsg  string
+		restErr    *schema.RestError
+	)
+
+	compatRes, compatErr := s.kafkaSvc.SchemaService.CheckCompatibility(ctx, subjectName, version, sch)
+	switch {
+	case compatErr == nil:
+		compatible = compatRes.IsCompatible
+	case errors.As(compatErr, &restErr) && restErr.ErrorCode == schema.CodeSubjectNotFound:
+		// A subject that does not exist yet has no earlier schema versions, so
+		// the new schema can't be incompatible with anything.
+		compatible = true
+	default:
+		compatMsg = compatErr.Error()
+	}
+
+	// Step 2: Parse the schema with the validator that belongs to its type.
+	// For any other schema type no parsing is attempted.
+	var parseErr error
+	switch sch.Type {
+	case schema.TypeAvro:
+		parseErr = s.kafkaSvc.SchemaService.ValidateAvroSchema(ctx, sch)
+	case schema.TypeJSON:
+		parseErr = s.kafkaSvc.SchemaService.ValidateJSONSchema(ctx, subjectName, sch, nil)
+	case schema.TypeProtobuf:
+		parseErr = s.kafkaSvc.SchemaService.ValidateProtobufSchema(ctx, subjectName, sch)
+	}
+
+	var parseMsg string
+	if parseErr != nil {
+		parseMsg = parseErr.Error()
+	}
+
+	return &SchemaRegistrySchemaValidation{
+		Compatibility: SchemaRegistrySchemaValidationCompatibility{
+			IsCompatible: compatible,
+			Error:        compatMsg,
+		},
+		ParsingError: parseMsg,
+		IsValid:      parseMsg == "" && compatible,
+	}
+}
diff --git a/backend/pkg/console/servicer.go b/backend/pkg/console/servicer.go index c0cac70c6..5ac985980 100644 --- a/backend/pkg/console/servicer.go +++ b/backend/pkg/console/servicer.go @@ -61,4 +61,5 @@ type Servicer interface { DeleteSchemaRegistrySubjectVersion(ctx context.Context, subject, version string, deletePermanently bool) (*SchemaRegistryDeleteSubjectVersionResponse, error) GetSchemaRegistrySchemaTypes(ctx context.Context) (*SchemaRegistrySchemaTypes, 
error) CreateSchemaRegistrySchema(ctx context.Context, subjectName string, schema schema.Schema) (*CreateSchemaResponse, error) + ValidateSchemaRegistrySchema(ctx context.Context, subjectName string, version string, schema schema.Schema) *SchemaRegistrySchemaValidation } diff --git a/backend/pkg/schema/client.go b/backend/pkg/schema/client.go index 21cd72408..5dcacd152 100644 --- a/backend/pkg/schema/client.go +++ b/backend/pkg/schema/client.go @@ -123,15 +123,8 @@ func newClient(cfg config.Schema) (*Client, error) { // //nolint:revive // This is stuttering when calling this with the pkg name, but without that the type SchemaResponse struct { - Schema string `json:"schema"` - References []Reference `json:"references,omitempty"` -} - -// Reference describes a reference to a different schema stored in the schema registry. -type Reference struct { - Name string `json:"name"` - Subject string `json:"subject"` - Version int `json:"version"` + Schema string `json:"schema"` + References []SchemaReference `json:"references,omitempty"` } // GetSchemaByID returns the schema string identified by the input ID. @@ -168,12 +161,12 @@ func (c *Client) GetSchemaByID(ctx context.Context, id uint32) (*SchemaResponse, // //nolint:revive // This is stuttering when calling this with the pkg name, but without that the type SchemaVersionedResponse struct { - Subject string `json:"subject"` - SchemaID int `json:"id"` - Version int `json:"version"` - Schema string `json:"schema"` - Type string `json:"schemaType"` - References []Reference `json:"references"` + Subject string `json:"subject"` + SchemaID int `json:"id"` + Version int `json:"version"` + Schema string `json:"schema"` + Type SchemaType `json:"schemaType"` + References []SchemaReference `json:"references"` } // GetSchemaBySubject returns the schema for the specified version of this subject. The unescaped schema only is returned. 
@@ -211,9 +204,6 @@ func (c *Client) GetSchemaBySubject(ctx context.Context, subject, version string if !ok { return nil, fmt.Errorf("failed to parse schema by subject response") } - if parsed.Type == "" { - parsed.Type = TypeAvro.String() - } return parsed, nil } @@ -645,6 +635,7 @@ func (c *Client) GetSchemasIndividually(ctx context.Context, showSoftDeleted boo return schemas, nil } +// GetSchemaReferencesResponse is the response to fetching schema references. type GetSchemaReferencesResponse struct { SchemaIDs []int `json:"schemaIds"` } @@ -674,6 +665,38 @@ func (c *Client) GetSchemaReferences(ctx context.Context, subject, version strin return &GetSchemaReferencesResponse{SchemaIDs: schemaIDs}, nil } +// CheckCompatibilityResponse is the response to a compatibility check for a schema. +type CheckCompatibilityResponse struct { + IsCompatible bool `json:"is_compatible"` +} + +// CheckCompatibility checks if a schema is compatible with the given version +// that exists. You can use 'latest' to check compatibility with the latest version. +func (c *Client) CheckCompatibility(ctx context.Context, subject string, version string, schema Schema) (*CheckCompatibilityResponse, error) { + var checkCompatRes CheckCompatibilityResponse + res, err := c.client.R(). + SetContext(ctx). + SetResult(&checkCompatRes). + SetPathParam("subject", subject). + SetPathParam("version", version). + SetQueryParam("verbose", "true"). + SetBody(&schema). 
+ Post("/compatibility/subjects/{subject}/versions/{version}") + if err != nil { + return nil, fmt.Errorf("check compatibility failed: %w", err) + } + + if res.IsError() { + restErr, ok := res.Error().(*RestError) + if !ok { + return nil, fmt.Errorf("check compatibility failed: Status code %d", res.StatusCode()) + } + return nil, restErr + } + + return &checkCompatRes, nil +} + // CheckConnectivity checks whether the schema registry can be access by GETing the /subjects func (c *Client) CheckConnectivity(ctx context.Context) error { url := "subjects" diff --git a/backend/pkg/schema/service.go b/backend/pkg/schema/service.go index e80ac23c3..71c18b17d 100644 --- a/backend/pkg/schema/service.go +++ b/backend/pkg/schema/service.go @@ -13,11 +13,13 @@ import ( "context" "fmt" "strconv" + "strings" "time" "github.com/hamba/avro/v2" "github.com/jhump/protoreflect/desc" "github.com/jhump/protoreflect/desc/protoparse" + "github.com/santhosh-tekuri/jsonschema/v5" "github.com/twmb/go-cache/cache" "go.uber.org/zap" "golang.org/x/sync/singleflight" @@ -79,7 +81,7 @@ func (s *Service) GetProtoDescriptors(ctx context.Context) (map[int]*desc.FileDe // 1. Index all returned schemas by their respective subject name and version as stored in the schema registry schemasBySubjectAndVersion := make(map[string]map[int]SchemaVersionedResponse) for _, schema := range schemasRes { - if schema.Type != TypeProtobuf.String() { + if schema.Type != TypeProtobuf { continue } _, exists := schemasBySubjectAndVersion[schema.Subject] @@ -93,7 +95,7 @@ func (s *Service) GetProtoDescriptors(ctx context.Context) (map[int]*desc.FileDe // registered in their own proto registry. fdBySchemaID := make(map[int]*desc.FileDescriptor) for _, schema := range schemasRes { - if schema.Type != TypeProtobuf.String() { + if schema.Type != TypeProtobuf { continue } @@ -184,7 +186,7 @@ func (s *Service) GetAvroSchemaByID(ctx context.Context, schemaID uint32) (avro. 
return nil, fmt.Errorf("failed to get schema from registry: %w", err) } - codec, err := s.ParseAvroSchemaWithReferences(ctx, schemaRes) + codec, err := s.ParseAvroSchemaWithReferences(ctx, schemaRes, avro.DefaultSchemaCache) if err != nil { return nil, fmt.Errorf("failed to parse schema: %w", err) } @@ -251,11 +253,17 @@ func (s *Service) GetSchemaReferences(ctx context.Context, subject, version stri return s.registryClient.GetSchemaReferences(ctx, subject, version) } +// CheckCompatibility checks if a schema is compatible with the given version +// that exists. You can use 'latest' to check compatibility with the latest version. +func (s *Service) CheckCompatibility(ctx context.Context, subject string, version string, schema Schema) (*CheckCompatibilityResponse, error) { + return s.registryClient.CheckCompatibility(ctx, subject, version, schema) +} + // ParseAvroSchemaWithReferences parses an avro schema that potentially has references // to other schemas. References will be resolved by requesting and parsing them // recursively. If any of the referenced schemas can't be fetched or parsed an // error will be returned. 
-func (s *Service) ParseAvroSchemaWithReferences(ctx context.Context, schema *SchemaResponse) (avro.Schema, error) {
+func (s *Service) ParseAvroSchemaWithReferences(ctx context.Context, schema *SchemaResponse, schemaCache *avro.SchemaCache) (avro.Schema, error) {
 	if len(schema.References) == 0 {
-		return avro.Parse(schema.Schema)
+		return avro.ParseWithCache(schema.Schema, "", schemaCache) // same cache as the referencing schema, so its named types are found later
 	}
@@ -268,10 +276,14 @@ func (s *Service) ParseAvroSchemaWithReferences(ctx context.Context, schema *Sch
 			return nil, err
 		}
 
-		if _, err := s.ParseAvroSchemaWithReferences(ctx, &SchemaResponse{
-			Schema:     schemaRef.Schema,
-			References: schemaRef.References,
-		}); err != nil {
+		if _, err := s.ParseAvroSchemaWithReferences(
+			ctx,
+			&SchemaResponse{
+				Schema:     schemaRef.Schema,
+				References: schemaRef.References,
+			},
+			schemaCache,
+		); err != nil {
 			return nil, fmt.Errorf(
-				"failed to parse schema reference (subject: %q, version %q): %w",
+				"failed to parse schema reference (subject: %q, version %d): %w",
 				reference.Subject, reference.Version, err,
@@ -283,6 +295,76 @@ func (s *Service) ParseAvroSchemaWithReferences(ctx context.Context, schema *Sch
-	return avro.Parse(schema.Schema)
+	return avro.ParseWithCache(schema.Schema, "", schemaCache) // honor the caller's cache instead of the package default
 }
 
+// ValidateAvroSchema tries to parse the given avro schema, including all of its
+// references, and returns the parsing error (if any) so that the user can fix it.
+// Parsing uses a throwaway cache so the process-wide avro cache is left untouched.
+func (s *Service) ValidateAvroSchema(ctx context.Context, sch Schema) error {
+	tempCache := avro.SchemaCache{}
+	schemaRes := &SchemaResponse{Schema: sch.Schema, References: sch.References}
+	_, err := s.ParseAvroSchemaWithReferences(ctx, schemaRes, &tempCache)
+	return err
+}
+
+// ValidateJSONSchema validates a JSON schema for syntax issues.
+func (s *Service) ValidateJSONSchema(ctx context.Context, name string, sch Schema, schemaCompiler *jsonschema.Compiler) error {
+	if schemaCompiler == nil {
+		schemaCompiler = jsonschema.NewCompiler()
+	}
+
+	for _, ref := range sch.References {
+		schemaRefRes, err := s.GetSchemaBySubjectAndVersion(ctx, ref.Subject, strconv.Itoa(ref.Version))
+		if err != nil {
+			return fmt.Errorf("failed to retrieve reference %q: %w", ref.Subject, err)
+		}
+		schemaRef := Schema{
+			Schema:     schemaRefRes.Schema,
+			Type:       schemaRefRes.Type,
+			References: nil, // references of a referenced schema are not followed
+		}
+		if err := s.ValidateJSONSchema(ctx, ref.Name, schemaRef, schemaCompiler); err != nil {
+			return err
+		}
+	}
+
+	// Prevent a panic by the schema compiler by checking the name before AddResource
+	if strings.Contains(name, "#") {
+		return fmt.Errorf("hashtags are not allowed as part of the schema name")
+	}
+	err := schemaCompiler.AddResource(name, strings.NewReader(sch.Schema))
+	if err != nil {
+		return fmt.Errorf("failed to add resource for %q: %w", name, err)
+	}
+	// Compile via the shared compiler (not a fresh one) so that the references registered above can be resolved.
+	_, err = schemaCompiler.Compile(name)
+	if err != nil {
+		return fmt.Errorf("failed to validate schema %q: %w", name, err)
+	}
+	return nil
+}
+
+// ValidateProtobufSchema validates a given protobuf schema by trying to parse it as a descriptor
+// along with all its references.
+func (s *Service) ValidateProtobufSchema(ctx context.Context, name string, sch Schema) error {
+	// Virtual file set handed to the parser: the schema itself plus each direct reference.
+	protoFiles := map[string]string{name: sch.Schema}
+	for _, reference := range sch.References {
+		refVersion := strconv.Itoa(reference.Version)
+		refRes, err := s.GetSchemaBySubjectAndVersion(ctx, reference.Subject, refVersion)
+		if err != nil {
+			return fmt.Errorf("failed to retrieve reference %q: %w", reference.Subject, err)
+		}
+		protoFiles[reference.Name] = refRes.Schema
+	}
+	protoParser := protoparse.Parser{
+		Accessor:              protoparse.FileContentsFromMap(protoFiles),
+		InferImportPaths:      true,
+		ValidateUnlinkedFiles: true,
+		IncludeSourceCodeInfo: true,
+	}
+	_, parseErr := protoParser.ParseFiles(name)
+	return parseErr
+}
+
 // GetSchemaBySubjectAndVersion retrieves a schema from the schema registry
 // by a given tuple.
 func (s *Service) GetSchemaBySubjectAndVersion(ctx context.Context, subject string, version string) (*SchemaVersionedResponse, error) {