Skip to content

Commit

Permalink
backend: add endpoints for managing subject compat
Browse files Browse the repository at this point in the history
  • Loading branch information
weeco committed Sep 1, 2023
1 parent e70fba5 commit f2c9f74
Show file tree
Hide file tree
Showing 6 changed files with 180 additions and 1 deletion.
83 changes: 82 additions & 1 deletion backend/pkg/api/handle_schema_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,88 @@ func (api *API) handlePutSchemaRegistryConfig() http.HandlerFunc {
rest.SendRESTError(w, r, api.Logger, &rest.Error{
Err: err,
Status: http.StatusBadGateway,
Message: fmt.Sprintf("Failed to retrieve schema registry types from the schema registry: %v", err.Error()),
Message: fmt.Sprintf("Failed to set global compatibility level: %v", err.Error()),
IsSilent: false,
})
return
}
rest.SendResponse(w, r, api.Logger, http.StatusOK, res)
}
}

func (api *API) handlePutSchemaRegistrySubjectConfig() http.HandlerFunc {
if !api.Cfg.Kafka.Schema.Enabled {
return api.handleSchemaRegistryNotConfigured()
}

type request struct {
Compatibility schema.CompatibilityLevel `json:"compatibility"`
}

return func(w http.ResponseWriter, r *http.Request) {
// 1. Parse request parameters
subjectName := rest.GetURLParam(r, "subject")

req := request{}
restErr := rest.Decode(w, r, &req)
if restErr != nil {
rest.SendRESTError(w, r, api.Logger, restErr)
return
}

// 2. Set subject compatibility level
res, err := api.ConsoleSvc.PutSchemaRegistrySubjectConfig(r.Context(), subjectName, req.Compatibility)
if err != nil {
var schemaError *schema.RestError
if errors.As(err, &schemaError) && schemaError.ErrorCode == schema.CodeSubjectNotFound {
rest.SendRESTError(w, r, api.Logger, &rest.Error{
Err: err,
Status: http.StatusNotFound,
Message: "Requested subject does not exist",
IsSilent: false,
})
return
}

rest.SendRESTError(w, r, api.Logger, &rest.Error{
Err: err,
Status: http.StatusBadGateway,
Message: fmt.Sprintf("Failed to set subject's compatibility level: %v", err.Error()),
IsSilent: false,
})
return
}
rest.SendResponse(w, r, api.Logger, http.StatusOK, res)
}
}

func (api *API) handleDeleteSchemaRegistrySubjectConfig() http.HandlerFunc {
if !api.Cfg.Kafka.Schema.Enabled {
return api.handleSchemaRegistryNotConfigured()
}

return func(w http.ResponseWriter, r *http.Request) {
// 1. Parse request parameters
subjectName := rest.GetURLParam(r, "subject")

// 2. Set subject compatibility level
res, err := api.ConsoleSvc.DeleteSchemaRegistrySubjectConfig(r.Context(), subjectName)
if err != nil {
var schemaError *schema.RestError
if errors.As(err, &schemaError) && schemaError.ErrorCode == schema.CodeSubjectNotFound {
rest.SendRESTError(w, r, api.Logger, &rest.Error{
Err: err,
Status: http.StatusNotFound,
Message: "Requested subject does not exist",
IsSilent: false,
})
return
}

rest.SendRESTError(w, r, api.Logger, &rest.Error{
Err: err,
Status: http.StatusBadGateway,
Message: fmt.Sprintf("Failed to delete subject's compatibility level: %v", err.Error()),
IsSilent: false,
})
return
Expand Down
2 changes: 2 additions & 0 deletions backend/pkg/api/routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,8 @@ func (api *API) routes() *chi.Mux {
r.Get("/schema-registry/mode", api.handleGetSchemaRegistryMode())
r.Get("/schema-registry/config", api.handleGetSchemaRegistryConfig())
r.Put("/schema-registry/config", api.handlePutSchemaRegistryConfig())
r.Put("/schema-registry/config/{subject}", api.handlePutSchemaRegistrySubjectConfig())
r.Delete("/schema-registry/config/{subject}", api.handleDeleteSchemaRegistrySubjectConfig())
r.Get("/schema-registry/subjects", api.handleGetSchemaSubjects())
r.Get("/schema-registry/schemas/types", api.handleGetSchemaRegistrySchemaTypes())
r.Get("/schema-registry/schemas/ids/{id}/versions", api.handleGetSchemaUsagesByID())
Expand Down
18 changes: 18 additions & 0 deletions backend/pkg/console/schema_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,24 @@ func (s *Service) PutSchemaRegistryConfig(ctx context.Context, compatLevel schem
return &SchemaRegistryConfig{Compatibility: config.Compatibility}, nil
}

// PutSchemaRegistrySubjectConfig sets the subject's compatibility level.
func (s *Service) PutSchemaRegistrySubjectConfig(ctx context.Context, subject string, compatLevel schema.CompatibilityLevel) (*SchemaRegistryConfig, error) {
config, err := s.kafkaSvc.SchemaService.PutSubjectConfig(ctx, subject, compatLevel)
if err != nil {
return nil, err
}
return &SchemaRegistryConfig{Compatibility: config.Compatibility}, nil
}

// DeleteSchemaRegistrySubjectConfig deletes the subject's compatibility level.
func (s *Service) DeleteSchemaRegistrySubjectConfig(ctx context.Context, subject string) (*SchemaRegistryConfig, error) {
config, err := s.kafkaSvc.SchemaService.DeleteSubjectConfig(ctx, subject)
if err != nil {
return nil, err
}
return &SchemaRegistryConfig{Compatibility: config.Compatibility}, nil
}

// GetSchemaRegistrySubjects returns a list of all register subjects. The list includes
// soft-deleted subjects.
func (s *Service) GetSchemaRegistrySubjects(ctx context.Context) ([]SchemaRegistrySubject, error) {
Expand Down
2 changes: 2 additions & 0 deletions backend/pkg/console/servicer.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ type Servicer interface {
GetSchemaRegistryMode(ctx context.Context) (*SchemaRegistryMode, error)
GetSchemaRegistryConfig(ctx context.Context) (*SchemaRegistryConfig, error)
PutSchemaRegistryConfig(ctx context.Context, compatLevel schema.CompatibilityLevel) (*SchemaRegistryConfig, error)
PutSchemaRegistrySubjectConfig(ctx context.Context, subject string, compatLevel schema.CompatibilityLevel) (*SchemaRegistryConfig, error)
DeleteSchemaRegistrySubjectConfig(ctx context.Context, subject string) (*SchemaRegistryConfig, error)
GetSchemaRegistrySubjects(ctx context.Context) ([]SchemaRegistrySubject, error)
GetSchemaRegistrySubjectDetails(ctx context.Context, subjectName string, version string) (*SchemaRegistrySubjectDetails, error)
GetSchemaRegistrySchemaReferencedBy(ctx context.Context, subjectName, version string) ([]SchemaReference, error)
Expand Down
66 changes: 66 additions & 0 deletions backend/pkg/schema/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -420,6 +420,72 @@ func (c *Client) GetSubjectConfig(ctx context.Context, subject string) (*ConfigR
return parsed, nil
}

// PutSubjectConfig sets compatibility level for a given subject.
// If the subject you ask about does not have a subject-specific compatibility level set, this command returns an
// error code.
func (c *Client) PutSubjectConfig(ctx context.Context, subject string, compatLevel CompatibilityLevel) (*PutConfigResponse, error) {
type requestPayload struct {
Compatibility CompatibilityLevel `json:"compatibility"`
}
payload := requestPayload{Compatibility: compatLevel}

res, err := c.client.R().
SetContext(ctx).
SetResult(&PutConfigResponse{}).
SetBody(&payload).
SetPathParam("subject", subject).
Put("/config/{subject}")
if err != nil {
return nil, fmt.Errorf("put config for subject failed: %w", err)
}

if res.IsError() {
restErr, ok := res.Error().(*RestError)
if !ok {
return nil, fmt.Errorf("put config for subject failed: Status code %d", res.StatusCode())
}

return nil, restErr
}

parsed, ok := res.Result().(*PutConfigResponse)
if !ok {
return nil, fmt.Errorf("failed to parse config for subject response")
}

return parsed, nil
}

// DeleteSubjectConfig deletes compatibility level for a given subject.
// If the subject you ask about does not have a subject-specific compatibility level set, this command returns an
// error code.
func (c *Client) DeleteSubjectConfig(ctx context.Context, subject string) (*ConfigResponse, error) {
res, err := c.client.R().
SetContext(ctx).
SetResult(&ConfigResponse{}).
SetPathParam("subject", subject).
Delete("/config/{subject}")
if err != nil {
return nil, fmt.Errorf("delete config for subject failed: %w", err)
}

if res.IsError() {
restErr, ok := res.Error().(*RestError)
if !ok {
return nil, fmt.Errorf("delete config for subject failed: Status code %d", res.StatusCode())
}

return nil, restErr
}

parsed, ok := res.Result().(*ConfigResponse)
if !ok {
return nil, fmt.Errorf("failed to parse config for subject response")
}

return parsed, nil
}

// DeleteSubjectResponse describes the response to deleting a whole subject.
type DeleteSubjectResponse struct {
Versions []int
Expand Down
10 changes: 10 additions & 0 deletions backend/pkg/schema/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,16 @@ func (s *Service) GetSubjectConfig(ctx context.Context, subject string) (*Config
return s.registryClient.GetSubjectConfig(ctx, subject)
}

// PutSubjectConfig puts compatibility level for a given subject.
func (s *Service) PutSubjectConfig(ctx context.Context, subject string, compatLevel CompatibilityLevel) (*PutConfigResponse, error) {
return s.registryClient.PutSubjectConfig(ctx, subject, compatLevel)
}

// DeleteSubjectConfig puts compatibility level for a given subject.
func (s *Service) DeleteSubjectConfig(ctx context.Context, subject string) (*ConfigResponse, error) {
return s.registryClient.DeleteSubjectConfig(ctx, subject)
}

// DeleteSubject deletes a schema registry subject.
func (s *Service) DeleteSubject(ctx context.Context, subject string, deletePermanently bool) (*DeleteSubjectResponse, error) {
return s.registryClient.DeleteSubject(ctx, subject, deletePermanently)
Expand Down

0 comments on commit f2c9f74

Please sign in to comment.