-
Notifications
You must be signed in to change notification settings - Fork 352
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
5 changed files
with
284 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,67 @@ | ||
// Copyright 2023 Redpanda Data, Inc. | ||
// | ||
// Use of this software is governed by the Business Source License | ||
// included in the file licenses/BSL.md | ||
// | ||
// As of the Change Date specified in that file, in accordance with | ||
// the Business Source License, use of this software will be governed | ||
// by the Apache License, Version 2.0 | ||
|
||
package serde | ||
|
||
import ( | ||
"bytes" | ||
"fmt" | ||
|
||
"github.com/twmb/franz-go/pkg/kgo" | ||
"github.com/zencoder/go-smile/smile" | ||
) | ||
|
||
var _ Serde = (*SmileSerde)(nil) | ||
|
||
type SmileSerde struct{} | ||
|
||
func (SmileSerde) Name() PayloadEncoding { | ||
return payloadEncodingSmile | ||
} | ||
|
||
func (SmileSerde) DeserializePayload(record *kgo.Record, payloadType payloadType) (RecordPayload, error) { | ||
payload := payloadFromRecord(record, payloadType) | ||
trimmed := bytes.TrimLeft(payload, " \t\r\n") | ||
|
||
if len(trimmed) == 0 { | ||
return RecordPayload{}, fmt.Errorf("after trimming whitespaces there was no characters left") | ||
} | ||
|
||
startsWithSmile := len(payload) > 3 && payload[0] == ':' && payload[1] == ')' && payload[2] == '\n' | ||
if !startsWithSmile { | ||
return RecordPayload{}, fmt.Errorf("first bytes indicate this it not valid Smile format") | ||
} | ||
|
||
obj, err := smile.DecodeToObject(payload) | ||
if err != nil { | ||
return RecordPayload{}, fmt.Errorf("failed to decode Smile payload: %w", err) | ||
} | ||
|
||
return RecordPayload{ | ||
ParsedPayload: obj, | ||
Encoding: payloadEncodingSmile, | ||
}, nil | ||
|
||
// obj, err := smile.DecodeToObject(payload) | ||
// if err == nil { | ||
// jsonBytes, err := json.Marshal(obj) | ||
// if err == nil { | ||
// return &deserializedPayload{ | ||
// Payload: normalizedPayload{ | ||
// Payload: jsonBytes, | ||
// RecognizedEncoding: messageEncodingSmile, | ||
// }, | ||
// IsPayloadNull: payload == nil, | ||
// Object: obj, | ||
// RecognizedEncoding: messageEncodingSmile, | ||
// Size: len(payload), | ||
// } | ||
// } | ||
// } | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,78 @@ | ||
// Copyright 2023 Redpanda Data, Inc. | ||
// | ||
// Use of this software is governed by the Business Source License | ||
// included in the file licenses/BSL.md | ||
// | ||
// As of the Change Date specified in that file, in accordance with | ||
// the Business Source License, use of this software will be governed | ||
// by the Apache License, Version 2.0 | ||
|
||
package serde | ||
|
||
import ( | ||
"encoding/json" | ||
"fmt" | ||
"os" | ||
"testing" | ||
|
||
"github.com/stretchr/testify/assert" | ||
"github.com/stretchr/testify/require" | ||
"github.com/twmb/franz-go/pkg/kgo" | ||
) | ||
|
||
func TestSmileSerde_DeserializePayload(t *testing.T) { | ||
serde := SmileSerde{} | ||
|
||
smileData, err := os.ReadFile("testdata/test2.smile") | ||
assert.NoError(t, err) | ||
assert.NotEmpty(t, smileData) | ||
|
||
tests := []struct { | ||
name string | ||
record *kgo.Record | ||
payloadType payloadType | ||
validationFunc func(t *testing.T, payload RecordPayload, err error) | ||
}{ | ||
{ | ||
name: "Valid Smile Object in value", | ||
record: &kgo.Record{ | ||
Value: smileData, | ||
}, | ||
payloadType: payloadTypeValue, | ||
validationFunc: func(t *testing.T, payload RecordPayload, err error) { | ||
require.NoError(t, err) | ||
assert.Nil(t, payload.Troubleshooting) | ||
assert.Nil(t, payload.SchemaID) | ||
assert.Equal(t, payloadEncodingSmile, payload.Encoding) | ||
|
||
fmt.Printf("type:%+T\n", payload.ParsedPayload) | ||
fmt.Printf("value:%#v\n", payload.ParsedPayload) | ||
|
||
jsonBytes, err := json.Marshal(payload.ParsedPayload) | ||
require.NoError(t, err) | ||
assert.Equal(t, `{"\"abKey\"":3,"foo":false}`, string(jsonBytes)) | ||
|
||
obj, ok := (payload.ParsedPayload).(map[string]any) | ||
require.Truef(t, ok, "parsed payload is not of type map[string]any") | ||
assert.Equal(t, false, obj["foo"]) | ||
}, | ||
}, | ||
{ | ||
name: "Invalid Smile", | ||
record: &kgo.Record{ | ||
Value: []byte(`this is no valid Smile`), | ||
}, | ||
payloadType: payloadTypeValue, | ||
validationFunc: func(t *testing.T, payload RecordPayload, err error) { | ||
assert.Error(t, err) | ||
}, | ||
}, | ||
} | ||
|
||
for _, test := range tests { | ||
t.Run(test.name, func(t *testing.T) { | ||
payload, err := serde.DeserializePayload(test.record, test.payloadType) | ||
test.validationFunc(t, payload, err) | ||
}) | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,2 @@ | ||
:) | ||
��"abKey"Ƃfoo"� |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,59 @@ | ||
// Copyright 2023 Redpanda Data, Inc. | ||
// | ||
// Use of this software is governed by the Business Source License | ||
// included in the file licenses/BSL.md | ||
// | ||
// As of the Change Date specified in that file, in accordance with | ||
// the Business Source License, use of this software will be governed | ||
// by the Apache License, Version 2.0 | ||
|
||
package serde | ||
|
||
import ( | ||
"bytes" | ||
"encoding/json" | ||
"fmt" | ||
"strings" | ||
|
||
xj "github.com/basgys/goxml2json" | ||
"github.com/twmb/franz-go/pkg/kgo" | ||
) | ||
|
||
var _ Serde = (*XMLSerde)(nil) | ||
|
||
type XMLSerde struct{} | ||
|
||
func (XMLSerde) Name() PayloadEncoding { | ||
return payloadEncodingXML | ||
} | ||
|
||
func (XMLSerde) DeserializePayload(record *kgo.Record, payloadType payloadType) (RecordPayload, error) { | ||
payload := payloadFromRecord(record, payloadType) | ||
trimmed := bytes.TrimLeft(payload, " \t\r\n") | ||
|
||
if len(trimmed) == 0 { | ||
return RecordPayload{}, fmt.Errorf("after trimming whitespaces there was no character left") | ||
} | ||
|
||
startsWithXML := trimmed[0] == '<' | ||
if !startsWithXML { | ||
return RecordPayload{}, fmt.Errorf("first byte indicates this it not valid XML") | ||
} | ||
|
||
r := strings.NewReader(string(trimmed)) | ||
jsonPayload, err := xj.Convert(r) | ||
if err != nil { | ||
return RecordPayload{}, fmt.Errorf("error converting XML to JSON: %w", err) | ||
} | ||
|
||
var obj any | ||
err = json.Unmarshal(jsonPayload.Bytes(), &obj) | ||
if err != nil { | ||
return RecordPayload{}, fmt.Errorf("failed to parse JSON payload: %w", err) | ||
} | ||
|
||
return RecordPayload{ | ||
ParsedPayload: obj, | ||
Encoding: payloadEncodingXML, | ||
}, nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,78 @@ | ||
// Copyright 2023 Redpanda Data, Inc. | ||
// | ||
// Use of this software is governed by the Business Source License | ||
// included in the file licenses/BSL.md | ||
// | ||
// As of the Change Date specified in that file, in accordance with | ||
// the Business Source License, use of this software will be governed | ||
// by the Apache License, Version 2.0 | ||
|
||
package serde | ||
|
||
import ( | ||
"testing" | ||
|
||
"github.com/stretchr/testify/assert" | ||
"github.com/stretchr/testify/require" | ||
"github.com/twmb/franz-go/pkg/kgo" | ||
) | ||
|
||
func TestXMLSerde_DeserializePayload(t *testing.T) { | ||
serde := XMLSerde{} | ||
|
||
tests := []struct { | ||
name string | ||
record *kgo.Record | ||
payloadType payloadType | ||
validationFunc func(t *testing.T, payload RecordPayload, err error) | ||
}{ | ||
{ | ||
name: "Valid XML Object in value", | ||
record: &kgo.Record{ | ||
Value: []byte(`<?xml version="1.0" encoding="UTF-8"?><name>John</name><age>30</age>`), | ||
}, | ||
payloadType: payloadTypeValue, | ||
validationFunc: func(t *testing.T, payload RecordPayload, err error) { | ||
require.NoError(t, err) | ||
assert.Nil(t, payload.Troubleshooting) | ||
assert.Nil(t, payload.SchemaID) | ||
assert.Equal(t, payloadEncodingXML, payload.Encoding) | ||
|
||
obj, ok := (payload.ParsedPayload).(map[string]any) | ||
require.Truef(t, ok, "parsed payload is not of type map[string]any") | ||
assert.Equal(t, "John", obj["name"]) | ||
assert.EqualValues(t, "30", obj["age"]) | ||
}, | ||
}, | ||
{ | ||
name: "Valid XML Object in key", | ||
record: &kgo.Record{ | ||
Key: []byte(`<?xml version="1.0" encoding="UTF-8"?><name>John</name><age>30</age>`), | ||
}, | ||
payloadType: payloadTypeKey, | ||
validationFunc: func(t *testing.T, payload RecordPayload, err error) { | ||
require.NoError(t, err) | ||
assert.Nil(t, payload.Troubleshooting) | ||
assert.Nil(t, payload.SchemaID) | ||
assert.Equal(t, payloadEncodingXML, payload.Encoding) | ||
}, | ||
}, | ||
{ | ||
name: "Invalid XML", | ||
record: &kgo.Record{ | ||
Value: []byte(`this is no valid XML`), | ||
}, | ||
payloadType: payloadTypeValue, | ||
validationFunc: func(t *testing.T, payload RecordPayload, err error) { | ||
assert.Error(t, err) | ||
}, | ||
}, | ||
} | ||
|
||
for _, test := range tests { | ||
t.Run(test.name, func(t *testing.T) { | ||
payload, err := serde.DeserializePayload(test.record, test.payloadType) | ||
test.validationFunc(t, payload, err) | ||
}) | ||
} | ||
} |