Skip to content

Commit

Permalink
backend: add endpoint to retrieve schema types
Browse files Browse the repository at this point in the history
  • Loading branch information
weeco committed Aug 14, 2023
1 parent 11a9909 commit 261e523
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 1 deletion.
22 changes: 21 additions & 1 deletion backend/pkg/api/handle_schema_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,26 @@ func (api *API) handleGetSchemaRegistryMode() http.HandlerFunc {
}
}

func (api *API) handleGetSchemaRegistrySchemaTypes() http.HandlerFunc {
if !api.Cfg.Kafka.Schema.Enabled {
return api.handleSchemaRegistryNotConfigured()
}

return func(w http.ResponseWriter, r *http.Request) {
res, err := api.ConsoleSvc.GetSchemaRegistrySchemaTypes(r.Context())
if err != nil {
rest.SendRESTError(w, r, api.Logger, &rest.Error{
Err: err,
Status: http.StatusBadGateway,
Message: fmt.Sprintf("Failed to retrieve schema registry types from the schema registry: %v", err.Error()),
IsSilent: false,
})
return
}
rest.SendResponse(w, r, api.Logger, http.StatusOK, res)
}
}

func (api *API) handleGetSchemaRegistryConfig() http.HandlerFunc {
if !api.Cfg.Kafka.Schema.Enabled {
return api.handleSchemaRegistryNotConfigured()
Expand All @@ -64,7 +84,7 @@ func (api *API) handleGetSchemaRegistryConfig() http.HandlerFunc {
rest.SendRESTError(w, r, api.Logger, &rest.Error{
Err: err,
Status: http.StatusBadGateway,
Message: fmt.Sprintf("Failed to retrieve schema registry config from the schema registry: %v", err.Error()),
Message: fmt.Sprintf("Failed to retrieve schema registry types from the schema registry: %v", err.Error()),
IsSilent: false,
})
return
Expand Down
1 change: 1 addition & 0 deletions backend/pkg/api/routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ func (api *API) routes() *chi.Mux {
r.Get("/schema-registry/mode", api.handleGetSchemaRegistryMode())
r.Get("/schema-registry/config", api.handleGetSchemaRegistryConfig())
r.Get("/schema-registry/subjects", api.handleGetSchemaSubjects())
r.Get("/schema-registry/schemas/types", api.handleGetSchemaRegistrySchemaTypes())
r.Delete("/schema-registry/subjects/{subject}", api.handleDeleteSubject())
r.Delete("/schema-registry/subjects/{subject}/versions/{version}", api.handleDeleteSubjectVersion())
r.Get("/schema-registry/subjects/{subject}/versions/{version}", api.handleGetSchemaSubjectDetails())
Expand Down
14 changes: 14 additions & 0 deletions backend/pkg/console/schema_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,3 +332,17 @@ func (s *Service) DeleteSchemaRegistrySubjectVersion(ctx context.Context, subjec
}
return &SchemaRegistryDeleteSubjectVersionResponse{DeletedVersion: res.Version}, nil
}

// SchemaRegistrySchemaTypes describe the schema types that are supported by the schema registry.
type SchemaRegistrySchemaTypes struct {
SchemaTypes []string `json:"schemaTypes"`
}

// GetSchemaRegistrySchemaTypes returns the supported schema types.
func (s *Service) GetSchemaRegistrySchemaTypes(ctx context.Context) (*SchemaRegistrySchemaTypes, error) {
res, err := s.kafkaSvc.SchemaService.GetSchemaTypes(ctx)
if err != nil {
return nil, err
}
return &SchemaRegistrySchemaTypes{SchemaTypes: res}, nil
}
1 change: 1 addition & 0 deletions backend/pkg/console/servicer.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,4 +56,5 @@ type Servicer interface {
GetSchemaRegistrySubjectDetails(ctx context.Context, subjectName string, version string) (*SchemaRegistrySubjectDetails, error)
DeleteSchemaRegistrySubject(ctx context.Context, subjectName string, deletePermanently bool) (*SchemaRegistryDeleteSubjectResponse, error)
DeleteSchemaRegistrySubjectVersion(ctx context.Context, subject, version string, deletePermanently bool) (*SchemaRegistryDeleteSubjectVersionResponse, error)
GetSchemaRegistrySchemaTypes(ctx context.Context) (*SchemaRegistrySchemaTypes, error)
}

0 comments on commit 261e523

Please sign in to comment.