Skip to content

Commit

Permalink
backend: add endpoint to delete subject version
Browse files Browse the repository at this point in the history
  • Loading branch information
weeco committed Aug 14, 2023
1 parent 1222c24 commit 11a9909
Show file tree
Hide file tree
Showing 7 changed files with 160 additions and 1 deletion.
100 changes: 100 additions & 0 deletions backend/pkg/api/handle_schema_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,24 @@ func (api *API) handleDeleteSubject() http.HandlerFunc {
// 1. Parse request parameters
subjectName := rest.GetURLParam(r, "subject")

version := rest.GetURLParam(r, "version")
switch version {
case console.SchemaVersionsAll, console.SchemaVersionsLatest:
default:
// Must be number or it's invalid input
_, err := strconv.Atoi(version)
if err != nil {
descriptiveErr := fmt.Errorf("version %q is not valid. Must be %q, %q or a positive integer", version, console.SchemaVersionsLatest, console.SchemaVersionsAll)
rest.SendRESTError(w, r, api.Logger, &rest.Error{
Err: descriptiveErr,
Status: http.StatusBadRequest,
Message: descriptiveErr.Error(),
IsSilent: false,
})
return
}
}

deletePermanentlyStr := rest.GetQueryParam(r, "permanent")
if deletePermanentlyStr == "" {
deletePermanentlyStr = "false"
Expand Down Expand Up @@ -198,3 +216,85 @@ func (api *API) handleDeleteSubject() http.HandlerFunc {
rest.SendResponse(w, r, api.Logger, http.StatusOK, res)
}
}

func (api *API) handleDeleteSubjectVersion() http.HandlerFunc {
if !api.Cfg.Kafka.Schema.Enabled {
return api.handleSchemaRegistryNotConfigured()
}

return func(w http.ResponseWriter, r *http.Request) {
// 1. Parse request parameters
subjectName := rest.GetURLParam(r, "subject")

version := rest.GetURLParam(r, "version")
switch version {
case console.SchemaVersionsLatest:
default:
// Must be number or it's invalid input
_, err := strconv.Atoi(version)
if err != nil {
descriptiveErr := fmt.Errorf("version %q is not valid. Must be %q or a positive integer", version, console.SchemaVersionsLatest)
rest.SendRESTError(w, r, api.Logger, &rest.Error{
Err: descriptiveErr,
Status: http.StatusBadRequest,
Message: descriptiveErr.Error(),
IsSilent: false,
})
return
}
}

deletePermanentlyStr := rest.GetQueryParam(r, "permanent")
if deletePermanentlyStr == "" {
deletePermanentlyStr = "false"
}
deletePermanently, err := strconv.ParseBool(deletePermanentlyStr)
if err != nil {
rest.SendRESTError(w, r, api.Logger, &rest.Error{
Err: fmt.Errorf("failed to parse deletePermanently query param %q: %w", deletePermanentlyStr, err),
Status: http.StatusBadRequest,
Message: fmt.Sprintf("Failed to parse 'permanent' query param with value %q: %v", deletePermanentlyStr, err.Error()),
IsSilent: false,
})
return
}

// 2. Send delete request
res, err := api.ConsoleSvc.DeleteSchemaRegistrySubjectVersion(r.Context(), subjectName, version, deletePermanently)
if err != nil {
var schemaError *schema.RestError
if errors.As(err, &schemaError) && schemaError.ErrorCode == schema.CodeSubjectNotFound {
rest.SendRESTError(w, r, api.Logger, &rest.Error{
Err: err,
Status: http.StatusNotFound,
Message: "Requested subject does not exist",
IsSilent: false,
})
return
}
if errors.As(err, &schemaError) && schemaError.ErrorCode == schema.CodeVersionNotFound {
rest.SendRESTError(w, r, api.Logger, &rest.Error{
Err: err,
Status: http.StatusNotFound,
Message: "Requested version does not exist on the given subject",
IsSilent: false,
})
return
}

rest.SendRESTError(w, r, api.Logger, &rest.Error{
Err: err,
Status: http.StatusServiceUnavailable,
Message: fmt.Sprintf("Failed to delete schema registry subject version: %v", err.Error()),
InternalLogs: []zapcore.Field{
zap.String("subject_name", subjectName),
zap.String("version", version),
},
IsSilent: false,
})
return
}

rest.SendResponse(w, r, api.Logger, http.StatusOK, res)
}
}
1 change: 1 addition & 0 deletions backend/pkg/api/routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ func (api *API) routes() *chi.Mux {
r.Get("/schema-registry/config", api.handleGetSchemaRegistryConfig())
r.Get("/schema-registry/subjects", api.handleGetSchemaSubjects())
r.Delete("/schema-registry/subjects/{subject}", api.handleDeleteSubject())
r.Delete("/schema-registry/subjects/{subject}/versions/{version}", api.handleDeleteSubjectVersion())
r.Get("/schema-registry/subjects/{subject}/versions/{version}", api.handleGetSchemaSubjectDetails())

// Kafka Connect
Expand Down
14 changes: 14 additions & 0 deletions backend/pkg/console/schema_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,3 +318,17 @@ func (s *Service) DeleteSchemaRegistrySubject(ctx context.Context, subjectName s
}
return &SchemaRegistryDeleteSubjectResponse{DeletedVersions: res.Versions}, nil
}

// SchemaRegistryDeleteSubjectVersionResponse is the response to deleting a subject version.
type SchemaRegistryDeleteSubjectVersionResponse struct {
DeletedVersion int `json:"deletedVersion"`
}

// DeleteSchemaRegistrySubjectVersion deletes a schema registry subject version.
func (s *Service) DeleteSchemaRegistrySubjectVersion(ctx context.Context, subjectName, version string, deletePermanently bool) (*SchemaRegistryDeleteSubjectVersionResponse, error) {
res, err := s.kafkaSvc.SchemaService.DeleteSubjectVersion(ctx, subjectName, version, deletePermanently)
if err != nil {
return nil, err
}
return &SchemaRegistryDeleteSubjectVersionResponse{DeletedVersion: res.Version}, nil
}
2 changes: 2 additions & 0 deletions backend/pkg/console/servicer.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,11 @@ type Servicer interface {
GetTopicsOverview(ctx context.Context) ([]*TopicSummary, error)
GetAllTopicNames(ctx context.Context, metadata *kmsg.MetadataResponse) ([]string, error)
GetTopicDetails(ctx context.Context, topicNames []string) ([]TopicDetails, *rest.Error)

GetSchemaRegistryMode(ctx context.Context) (*SchemaRegistryMode, error)
GetSchemaRegistryConfig(ctx context.Context) (*SchemaRegistryConfig, error)
GetSchemaRegistrySubjects(ctx context.Context) ([]SchemaRegistrySubject, error)
GetSchemaRegistrySubjectDetails(ctx context.Context, subjectName string, version string) (*SchemaRegistrySubjectDetails, error)
DeleteSchemaRegistrySubject(ctx context.Context, subjectName string, deletePermanently bool) (*SchemaRegistryDeleteSubjectResponse, error)
DeleteSchemaRegistrySubjectVersion(ctx context.Context, subject, version string, deletePermanently bool) (*SchemaRegistryDeleteSubjectVersionResponse, error)
}
34 changes: 33 additions & 1 deletion backend/pkg/schema/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -415,14 +415,46 @@ func (c *Client) DeleteSubject(ctx context.Context, subject string, deletePerman
if res.IsError() {
restErr, ok := res.Error().(*RestError)
if !ok {
return nil, fmt.Errorf("get config for subject failed: Status code %d", res.StatusCode())
return nil, fmt.Errorf("delete subject failed: Status code %d", res.StatusCode())
}
return nil, restErr
}

return &DeleteSubjectResponse{deletedVersions}, nil
}

// DeleteSubjectVersionResponse describes the response to deleting a subject version.
type DeleteSubjectVersionResponse struct {
Version int
}

// DeleteSubjectVersion deletes a specific version of the subject. Unless you delete permanently,
// this only deletes the version, leaving the schema ID intact and making it still possible to
// decode data using the schema ID.
func (c *Client) DeleteSubjectVersion(ctx context.Context, subject, version string, deletePermanently bool) (*DeleteSubjectVersionResponse, error) {
var deletedVersion int
res, err := c.client.R().
SetContext(ctx).
SetResult(&deletedVersion).
SetPathParam("subject", subject).
SetPathParam("version", version).
SetQueryParam("permanent", strconv.FormatBool(deletePermanently)).
Delete("/subjects/{subject}/versions/{version}")
if err != nil {
return nil, fmt.Errorf("delete subject version failed: %w", err)
}

if res.IsError() {
restErr, ok := res.Error().(*RestError)
if !ok {
return nil, fmt.Errorf("delete subject version failed: Status code %d", res.StatusCode())
}
return nil, restErr
}

return &DeleteSubjectVersionResponse{deletedVersion}, nil
}

// GetSchemaTypes returns supported types (AVRO, PROTOBUF, JSON)
func (c *Client) GetSchemaTypes(ctx context.Context) ([]string, error) {
var supportedTypes []string
Expand Down
5 changes: 5 additions & 0 deletions backend/pkg/schema/client_errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,11 @@ const (
// CodeSubjectNotFound is the returned error code when the requested subject
// does not exist.
CodeSubjectNotFound = 40401

// CodeVersionNotFound is the returned error code when the requested version
// does not exist.
CodeVersionNotFound = 40402

// CodeSchemaNotFound is the returned error code when the requested schema
// does not exist.
CodeSchemaNotFound = 40403
Expand Down
5 changes: 5 additions & 0 deletions backend/pkg/schema/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,11 @@ func (s *Service) DeleteSubject(ctx context.Context, subject string, deletePerma
return s.registryClient.DeleteSubject(ctx, subject, deletePermanently)
}

// DeleteSubjectVersion deletes a specific version for a schema registry subject.
func (s *Service) DeleteSubjectVersion(ctx context.Context, subject, version string, deletePermanently bool) (*DeleteSubjectVersionResponse, error) {
return s.registryClient.DeleteSubjectVersion(ctx, subject, version, deletePermanently)
}

// ParseAvroSchemaWithReferences parses an avro schema that potentially has references
// to other schemas. References will be resolved by requesting and parsing them
// recursively. If any of the referenced schemas can't be fetched or parsed an
Expand Down

0 comments on commit 11a9909

Please sign in to comment.