Skip to content

Commit

Permalink
backend: set encoding in output in streamProgressReporter OnMessage()
Browse files Browse the repository at this point in the history
  • Loading branch information
bojand committed Aug 22, 2023
1 parent 440c5a6 commit fe55f00
Showing 1 changed file with 23 additions and 3 deletions.
26 changes: 23 additions & 3 deletions backend/pkg/api/stream_progress_reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/redpanda-data/console/backend/pkg/console"
"github.com/redpanda-data/console/backend/pkg/kafka"
v1alpha "github.com/redpanda-data/console/backend/pkg/protogen/redpanda/api/console/v1alpha"
"github.com/redpanda-data/console/backend/pkg/serde"
)

// streamProgressReporter is in charge of sending status updates and messages regularly to the frontend.
Expand Down Expand Up @@ -89,10 +90,29 @@ func (p *streamProgressReporter) OnMessageConsumed(size int64) {
func (p *streamProgressReporter) OnMessage(message *kafka.TopicMessage) {
encoding := v1alpha.PayloadEncoding_PAYLOAD_ENCODING_BINARY

// TODO set encoding on output
// switch message.Value.RecognizedEncoding {
switch message.Value.Encoding {
case serde.PayloadEncodingNone:
encoding = v1alpha.PayloadEncoding_PAYLOAD_ENCODING_NONE
case serde.PayloadEncodingAvro:
encoding = v1alpha.PayloadEncoding_PAYLOAD_ENCODING_AVRO
case serde.PayloadEncodingProtobuf:
encoding = v1alpha.PayloadEncoding_PAYLOAD_ENCODING_PROTOBUF
case serde.PayloadEncodingJSON:
encoding = v1alpha.PayloadEncoding_PAYLOAD_ENCODING_JSON
case serde.PayloadEncodingXML:
encoding = v1alpha.PayloadEncoding_PAYLOAD_ENCODING_XML
case serde.PayloadEncodingText:
encoding = v1alpha.PayloadEncoding_PAYLOAD_ENCODING_TEXT
case serde.PayloadEncodingUtf8WithControlChars:
encoding = v1alpha.PayloadEncoding_PAYLOAD_ENCODING_UTF8
case serde.PayloadEncodingMsgPack:
encoding = v1alpha.PayloadEncoding_PAYLOAD_ENCODING_MESSAGE_PACK
case serde.PayloadEncodingSmile:
encoding = v1alpha.PayloadEncoding_PAYLOAD_ENCODING_SMILE
case serde.PayloadEncodingConsumerOffsets:
encoding = v1alpha.PayloadEncoding_PAYLOAD_ENCODING_CONSUMER_OFFSETS
}

// }
data := &v1alpha.ListMessagesResponse_DataMessage{
Value: &v1alpha.KafkaRecordPayload{
OriginalPayload: message.Value.OriginalPayload,
Expand Down

0 comments on commit fe55f00

Please sign in to comment.