Skip to content

Commit

Permalink
avro schema validation (#105)
Browse files Browse the repository at this point in the history
* avro schema validation

* adding map[string]interface{} data capability

* readme update
  • Loading branch information
Big-Vi authored Aug 1, 2023
1 parent 4031cce commit dda267f
Show file tree
Hide file tree
Showing 7 changed files with 112 additions and 9 deletions.
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ c.Produce("station_name_c_produce", "producer_name_a", []byte("Hey There!"), []m

Creating a producer first (receiver function of the producer struct).
```go
p.Produce("<message in []byte or map[string]interface{}/[]byte or protoreflect.ProtoMessage or map[string]interface{}(schema validated station - protobuf)/struct with json tags or map[string]interface{} or interface{}(schema validated station - json schema) or []byte/string (schema validated station - graphql schema)>", memphis.AckWaitSec(15)) // defaults to 15 seconds
p.Produce("<message in []byte or map[string]interface{}/[]byte or protoreflect.ProtoMessage or map[string]interface{}(schema validated station - protobuf)/struct with json tags or map[string]interface{} or interface{}(schema validated station - json schema) or []byte/string (schema validated station - graphql schema) or []byte or map[string]interface{} or struct with avro tags(schema validated station - avro schema)>", memphis.AckWaitSec(15)) // defaults to 15 seconds
```

### Add headers
Expand All @@ -235,7 +235,7 @@ hdrs := memphis.Headers{}
hdrs.New()
err := hdrs.Add("key", "value")
p.Produce(
"<message in []byte or map[string]interface{}/[]byte or protoreflect.ProtoMessage or map[string]interface{}(schema validated station - protobuf)/struct with json tags or map[string]interface{} or interface{}(schema validated station - json schema) or []byte/string (schema validated station - graphql schema)>",
"<message in []byte or map[string]interface{}/[]byte or protoreflect.ProtoMessage or map[string]interface{}(schema validated station - protobuf)/struct with json tags or map[string]interface{} or interface{}(schema validated station - json schema) or []byte/string (schema validated station - graphql schema) or []byte or map[string]interface{} or struct with avro tags(schema validated station - avro schema)>",
memphis.AckWaitSec(15),
memphis.MsgHeaders(hdrs) // defaults to empty
)
Expand All @@ -246,7 +246,7 @@ For better performance. The client won't block requests while waiting for an ack

```go
p.Produce(
"<message in []byte or map[string]interface{}/[]byte or protoreflect.ProtoMessage or map[string]interface{}(schema validated station - protobuf)/struct with json tags or map[string]interface{} or interface{}(schema validated station - json schema) or []byte/string (schema validated station - graphql schema)>",
"<message in []byte or map[string]interface{}/[]byte or protoreflect.ProtoMessage or map[string]interface{}(schema validated station - protobuf)/struct with json tags or map[string]interface{} or interface{}(schema validated station - json schema) or []byte/string (schema validated station - graphql schema) or []byte or map[string]interface{} or struct with avro tags(schema validated station - avro schema)>",
memphis.AckWaitSec(15),
memphis.AsyncProduce()
)
Expand All @@ -257,7 +257,7 @@ Stations are idempotent by default for 2 minutes (can be configured), Idempotenc

```go
p.Produce(
"<message in []byte or map[string]interface{}/[]byte or protoreflect.ProtoMessage or map[string]interface{}(schema validated station - protobuf)/struct with json tags or map[string]interface{} or interface{}(schema validated station - json schema) or []byte/string (schema validated station - graphql schema)>",
"<message in []byte or map[string]interface{}/[]byte or protoreflect.ProtoMessage or map[string]interface{}(schema validated station - protobuf)/struct with json tags or map[string]interface{} or interface{}(schema validated station - json schema) or []byte/string (schema validated station - graphql schema) or []byte or map[string]interface{} or struct with avro tags(schema validated station - avro schema)>",
memphis.AckWaitSec(15),
memphis.MsgId("343")
)
Expand Down
5 changes: 5 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,12 @@ require (

require (
github.com/golang/protobuf v1.5.2 // indirect
github.com/hamba/avro/v2 v2.13.0 // indirect
github.com/jhump/protoreflect v1.13.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/nats-io/nats-server/v2 v2.9.5 // indirect
github.com/nats-io/nkeys v0.4.4 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
Expand Down
13 changes: 13 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.9-0.20210217033140-668b12f5399d/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk=
Expand Down Expand Up @@ -37,11 +38,14 @@ github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.7 h1:81/ik6ipDQS2aGcBfIN5dHDB36BwrStyeAQquSYCV4o=
github.com/google/go-cmp v0.5.7/go.mod h1:n+brtR0CgQNWTVd5ZUFpTBC8YFBDLK/h/bpaJ8/DtOE=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/graph-gophers/graphql-go v1.4.0 h1:JE9wveRTSXwJyjdRd6bOQ7Ob5bewTUQ58Jv4OiVdpdE=
github.com/graph-gophers/graphql-go v1.4.0/go.mod h1:YtmJZDLbF1YYNrlNAuiO5zAStUWc3XZT07iGsVqe1Os=
github.com/graph-gophers/graphql-go v1.5.0 h1:fDqblo50TEpD0LY7RXk/LFVYEVqo3+tXMNMPSVXA1yc=
github.com/graph-gophers/graphql-go v1.5.0/go.mod h1:YtmJZDLbF1YYNrlNAuiO5zAStUWc3XZT07iGsVqe1Os=
github.com/hamba/avro/v2 v2.13.0 h1:QY2uX2yvJTW0OoMKelGShvq4v1hqab6CxJrPwh0fnj0=
github.com/hamba/avro/v2 v2.13.0/go.mod h1:Q9YK+qxAhtVrNqOhwlZTATLgLA8qxG2vtvkhK8fJ7Jo=
github.com/jhump/gopoet v0.0.0-20190322174617-17282ff210b3/go.mod h1:me9yfT6IJSlOL3FCfrg+L6yzUEZ+5jW6WHt4Sk+UPUI=
github.com/jhump/gopoet v0.1.0/go.mod h1:me9yfT6IJSlOL3FCfrg+L6yzUEZ+5jW6WHt4Sk+UPUI=
github.com/jhump/goprotoc v0.5.0/go.mod h1:VrbvcYrQOrTi3i0Vf+m+oqQWk9l72mjkJCYo7UvLHRQ=
Expand All @@ -50,11 +54,20 @@ github.com/jhump/protoreflect v1.13.0 h1:zrrZqa7JAc2YGgPSzZZkmUXJ5G6NRPdxOg/9t7I
github.com/jhump/protoreflect v1.13.0/go.mod h1:JytZfP5d0r8pVNLZvai7U/MCuTWITgrI4tTg7puQFKI=
github.com/jhump/protoreflect v1.15.1 h1:HUMERORf3I3ZdX05WaQ6MIpd/NJ434hTp5YiKgfCL6c=
github.com/jhump/protoreflect v1.15.1/go.mod h1:jD/2GMKKE6OqX8qTjhADU1e6DShO+gavG9e0Q693nKo=
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
github.com/klauspost/compress v1.15.11 h1:Lcadnb3RKGin4FYM/orgq0qde+nc15E5Cbqg4B9Sx9c=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g=
github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY=
github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M=
github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
github.com/nats-io/jwt/v2 v2.3.0 h1:z2mA1a7tIf5ShggOFlR1oBPgd6hGqcDYsISxZByUzdI=
github.com/nats-io/nats-server/v2 v2.9.5 h1:TlduKZ9YGoM0n34Lhm6AN0zRFOt/G3jTy9mPxXnE6dU=
github.com/nats-io/nats-server/v2 v2.9.5/go.mod h1:AB6hAnGZDlYfqb7CTAm66ZKMZy9DpfierY1/PbpvI2g=
Expand Down
6 changes: 1 addition & 5 deletions schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,13 +144,9 @@ func validateName(name, objectType string) error {
func validateSchemaType(schemaType string) error {
invalidTypeErrStr := "unsupported schema type"
invalidTypeErr := errors.New(invalidTypeErrStr)
invalidSupportTypeErrStr := "avro is not supported at this time"
invalidSupportTypeErr := errors.New(invalidSupportTypeErrStr)

if schemaType == "protobuf" || schemaType == "json" || schemaType == "graphql" {
if schemaType == "protobuf" || schemaType == "json" || schemaType == "graphql" || schemaType == "avro" {
return nil
} else if schemaType == "avro" {
return invalidSupportTypeErr
} else {
return invalidTypeErr
}
Expand Down
6 changes: 6 additions & 0 deletions schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,10 @@ func TestCreateSchema(t *testing.T) {
fmt.Println("json Created!!")
}

err = c.CreateSchema("sdk_test_schema_avro", "avro", "./test_schemas/test.avsc")
if err != nil {
fmt.Println(err.Error())
} else {
fmt.Println("avro Created!!")
}
}
72 changes: 72 additions & 0 deletions station.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"sync"
"time"

"github.com/hamba/avro/v2"
"github.com/nats-io/nats.go"

graphqlParse "github.com/graph-gophers/graphql-go"
Expand Down Expand Up @@ -315,6 +316,7 @@ type schemaDetails struct {
msgDescriptor protoreflect.MessageDescriptor
jsonSchema *jsonschema.Schema
graphQlSchema *graphqlParse.Schema
avroSchema avro.Schema
}

func (c *Conn) listenToSchemaUpdates(stationName string) error {
Expand Down Expand Up @@ -429,6 +431,10 @@ func (sd *schemaDetails) handleSchemaUpdateInit(sui SchemaUpdateInit) {
if err := sd.compileGraphQl(); err != nil {
log.Println(err.Error())
}
} else if sd.schemaType == "avro" {
if err := sd.compileAvroSchema(); err != nil {
log.Println(err.Error())
}
}
}

Expand Down Expand Up @@ -484,6 +490,15 @@ func (sd *schemaDetails) compileGraphQl() error {
return nil
}

func (sd *schemaDetails) compileAvroSchema() error {
sch, err := avro.Parse(sd.activeVersion.Content)
if err != nil {
return memphisError(err)
}
sd.avroSchema = sch
return nil
}

func (sd *schemaDetails) validateMsg(msg any) ([]byte, error) {
switch sd.schemaType {
case "protobuf":
Expand All @@ -492,6 +507,8 @@ func (sd *schemaDetails) validateMsg(msg any) ([]byte, error) {
return sd.validJsonSchemaMsg(msg)
case "graphql":
return sd.validateGraphQlMsg(msg)
case "avro":
return sd.validAvroSchemaMsg(msg)
default:
return nil, memphisError(errors.New("invalid schema type"))
}
Expand Down Expand Up @@ -618,3 +635,58 @@ func (sd *schemaDetails) validateGraphQlMsg(msg any) ([]byte, error) {
}
return msgBytes, nil
}

func (sd *schemaDetails) validAvroSchemaMsg(msg any) ([]byte, error) {
var (
msgBytes []byte
err error
message interface{}
)

if err != nil {
log.Fatal(err)
}

switch msg.(type) {
case []byte:
msgBytes = msg.([]byte)
if err := json.Unmarshal(msgBytes, &message); err != nil {
err = errors.New("Bad Avro format - " + err.Error())
return nil, memphisError(err)
}
case map[string]interface{}:
msgBytes, err = json.Marshal(msg)
if err != nil {
return nil, memphisError(err)
}
if err := json.Unmarshal(msgBytes, &message); err != nil {
err = errors.New("Bad Avro format - " + err.Error())
return nil, memphisError(err)
}

default:
msgType := reflect.TypeOf(msg).Kind()
if msgType == reflect.Struct {
msgBytes, err = avro.Marshal(sd.avroSchema, msg)
if err != nil {
return nil, memphisError(err)
}
if err := avro.Unmarshal(sd.avroSchema, msgBytes, &message); err != nil {
return nil, memphisError(err)
}
// Serialize it back after validation and unmarshalling
msgBytes, err = json.Marshal(message)
if err != nil {
return nil, memphisError(err)
}
} else {
return nil, memphisError(errors.New("unsupported message type"))
}
}

if _, err = avro.Marshal(sd.avroSchema, message); err != nil {
return msgBytes, memphisError(err)
}

return msgBytes, nil
}
11 changes: 11 additions & 0 deletions test_schemas/test.avsc
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
{
"type": "record",
"namespace": "com.example",
"name": "test_schema",
"fields": [
{ "name": "username", "type": "string", "default": "-2" },
{ "name": "age", "type": "int" },
{ "name": "phone", "type": "long" },
{ "name": "country", "type": "string", "default": "NONE" }
]
}

0 comments on commit dda267f

Please sign in to comment.