Skip to content

Commit

Permalink
backend: start serde service tests and minor fixes and improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
bojand committed Aug 15, 2023
1 parent 124b4d4 commit 82ed01f
Show file tree
Hide file tree
Showing 8 changed files with 461 additions and 6 deletions.
4 changes: 4 additions & 0 deletions backend/pkg/msgpack/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ func NewService(cfg config.Msgpack) (*Service, error) {

// IsTopicAllowed validates if a topicName is permitted as per the config regexes.
func (s *Service) IsTopicAllowed(topicName string) bool {
if !s.cfg.Enabled {
return false
}

isAllowed := false
for _, regex := range s.AllowedTopicsExpr {
if regex.MatchString(topicName) {
Expand Down
2 changes: 1 addition & 1 deletion backend/pkg/proto/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ func (s *Service) GetMessageDescriptor(topicName string, property RecordProperty
// 1. Otherwise check if the user has configured a mapping to a local proto type for this topic and record type
mapping, exists := s.mappingsByTopic[topicName]
if !exists {
return nil, fmt.Errorf("no prototype found for the given topic. Check your configured protobuf mappings")
return nil, fmt.Errorf("no prototype found for the given topic '%s'. Check your configured protobuf mappings", topicName)
}

protoTypeURL := ""
Expand Down
4 changes: 4 additions & 0 deletions backend/pkg/schema/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,10 @@ func (s *Service) compileProtoSchemas(schema SchemaVersionedResponse, schemaRepo
return descriptors[0], nil
}

func (s *Service) IsEnabled() bool {
return s.cfg.Enabled
}

// GetAvroSchemaByID loads the schema by the given schemaID and tries to parse the schema
// contents to an avro.Schema, so that it can be used for decoding Avro encoded messages.
func (s *Service) GetAvroSchemaByID(schemaID uint32) (avro.Schema, error) {
Expand Down
4 changes: 4 additions & 0 deletions backend/pkg/serde/avro.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ func (d AvroSerde) DeserializePayload(record *kgo.Record, payloadType payloadTyp
return RecordPayload{}, fmt.Errorf("no schema registry configured")
}

if !d.SchemaSvc.IsEnabled() {
return RecordPayload{}, fmt.Errorf("schema registry configuration disabled")
}

payload := payloadFromRecord(record, payloadType)

if len(payload) <= 5 {
Expand Down
2 changes: 1 addition & 1 deletion backend/pkg/serde/msgpack.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ type MsgPackSerde struct {
}

func (MsgPackSerde) Name() PayloadEncoding {
return payloadEncodingAvro
return payloadEncodingMsgPack
}

func (d MsgPackSerde) DeserializePayload(record *kgo.Record, payloadType payloadType) (RecordPayload, error) {
Expand Down
2 changes: 1 addition & 1 deletion backend/pkg/serde/protobuf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func TestProtobufSerde_DeserializePayload(t *testing.T) {
payloadType: payloadTypeKey,
validationFunc: func(t *testing.T, payload RecordPayload, err error) {
require.Error(t, err)
assert.Equal(t, "failed to get message descriptor for payload: no prototype found for the given topic. Check your configured protobuf mappings", err.Error())
assert.Equal(t, "failed to get message descriptor for payload: no prototype found for the given topic 'protobuf_serde_test_orders_123'. Check your configured protobuf mappings", err.Error())
},
},
}
Expand Down
6 changes: 3 additions & 3 deletions backend/pkg/serde/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func NewService(srService *schema.Service, protSvc *proto.Service, msgPackServic

// DeserializeRecord tries to deserialize a Kafka record into a struct that
// can be processed by the Frontend.
func (s *Service) DeserializeRecord(record *kgo.Record, opts deserializationOptions) *Record {
func (s *Service) DeserializeRecord(record *kgo.Record, opts DeserializationOptions) *Record {
// 1. Test if it's a known binary Format
if record.Topic == "__consumer_offsets" {
rec, err := s.deserializeConsumerOffset(record)
Expand Down Expand Up @@ -112,9 +112,9 @@ func (s *Service) deserializePayload(record *kgo.Record, payloadType payloadType
}
}

// deserializationOptions that can be provided by the requester to influence
// DeserializationOptions that can be provided by the requester to influence
// the deserialization.
type deserializationOptions struct {
type DeserializationOptions struct {
// KeyEncoding may be specified by the frontend to indicate that this
// encoding type shall be used to deserialize the key. This is helpful,
// if the requester knows that a primitive type like int16 is used, which couldn't
Expand Down
Loading

0 comments on commit 82ed01f

Please sign in to comment.