Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

avro schema ui and validation #1107

Merged
merged 5 commits into from
Aug 1, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,11 +244,12 @@ func createTables(MetadataDbClient MetadataStorage) error {
ALTER TABLE schemas DROP CONSTRAINT IF EXISTS name;
ALTER TABLE schemas DROP CONSTRAINT IF EXISTS schemas_name_tenant_name_key;
ALTER TABLE schemas ADD CONSTRAINT schemas_name_tenant_name_key UNIQUE(name, tenant_name);
ALTER TYPE enum_type ADD VALUE 'avro';
END IF;
END $$;`

schemasTable := `
CREATE TYPE enum_type AS ENUM ('json', 'graphql', 'protobuf');
CREATE TYPE enum_type AS ENUM ('json', 'graphql', 'protobuf', 'avro');
CREATE TABLE IF NOT EXISTS schemas(
id SERIAL NOT NULL,
name VARCHAR NOT NULL,
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ require (
github.com/docker/docker v20.10.24+incompatible
github.com/golang-jwt/jwt/v4 v4.5.0
github.com/graph-gophers/graphql-go v1.5.0
github.com/hamba/avro/v2 v2.12.0
idanasulin2706 marked this conversation as resolved.
Show resolved Hide resolved
github.com/jackc/pgx/v5 v5.3.1
github.com/santhosh-tekuri/jsonschema/v5 v5.1.0
github.com/slack-go/slack v0.11.4
Expand Down Expand Up @@ -94,6 +95,7 @@ require (
github.com/mailru/easyjson v0.7.6 // indirect
github.com/mattn/go-isatty v0.0.19 // indirect
github.com/memphisdev/memphis.go v1.0.4
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/moby/spdystream v0.2.0 // indirect
github.com/moby/term v0.0.0-20221205130635-1aeaba878587 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,8 @@ github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0U
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/graph-gophers/graphql-go v1.5.0 h1:fDqblo50TEpD0LY7RXk/LFVYEVqo3+tXMNMPSVXA1yc=
github.com/graph-gophers/graphql-go v1.5.0/go.mod h1:YtmJZDLbF1YYNrlNAuiO5zAStUWc3XZT07iGsVqe1Os=
github.com/hamba/avro/v2 v2.12.0 h1:QZvbrfOfHQ7kZnlxRdwRU0opSf9ZrqlzpKzJuIUjIjU=
github.com/hamba/avro/v2 v2.12.0/go.mod h1:Q9YK+qxAhtVrNqOhwlZTATLgLA8qxG2vtvkhK8fJ7Jo=
github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
Expand Down Expand Up @@ -280,6 +282,8 @@ github.com/memphisdev/memphis.go v1.0.4 h1:cftOPl+XRf3zdnY8egLny3NVhtFwwaqEsN67z
github.com/memphisdev/memphis.go v1.0.4/go.mod h1:VsFe2Wrght9LnzP2JWRA+tDeJS/12+3xxREWYieV6FQ=
github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g=
github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY=
github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY=
github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
github.com/moby/spdystream v0.2.0 h1:cjW1zVyyoiM0T7b6UoySUFqzXMoqRckQtXwGPiBhOM8=
github.com/moby/spdystream v0.2.0/go.mod h1:f7i0iNDQJ059oMTcWxx8MA/zKFIuD/lY+0GqbN2Wy8c=
github.com/moby/term v0.0.0-20221205130635-1aeaba878587 h1:HfkjXDfhgVaN5rmueG8cL8KKeFNecRCXFhaJ2qZ5SKA=
Expand Down
20 changes: 14 additions & 6 deletions server/memphis_handlers_schemas.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/graph-gophers/graphql-go"
"github.com/jhump/protoreflect/desc/protoparse"
"github.com/santhosh-tekuri/jsonschema/v5"
"github.com/hamba/avro/v2"
)

type SchemasHandler struct{ S *Server }
Expand Down Expand Up @@ -75,6 +76,14 @@ func validateGraphqlSchemaContent(schemaContent string) error {
return nil
}

func validateAvroSchemaContent(schemaContent string) error {
_, err := avro.Parse(schemaContent)
if err != nil {
return fmt.Errorf("your Avro file is invalid: %v", err.Error())
}
return nil
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

change to
return fmt.Errorf("your Avro file is invalid: %v", err.Error())

}

func generateProtobufDescriptor(schemaName string, schemaVersionNum int, schemaContent string) ([]byte, error) {
filename := fmt.Sprintf("%v_%v.proto", schemaName, schemaVersionNum)
descFilename := fmt.Sprintf("%v_%v_desc", schemaName, schemaVersionNum)
Expand Down Expand Up @@ -113,13 +122,9 @@ func validateSchemaName(schemaName string) error {
func validateSchemaType(schemaType string) error {
invalidTypeErrStr := "unsupported schema type"
invalidTypeErr := errors.New(invalidTypeErrStr)
invalidSupportTypeErrStr := "avro is not supported at this time"
invalidSupportTypeErr := errors.New(invalidSupportTypeErrStr)

if schemaType == "protobuf" || schemaType == "json" || schemaType == "graphql" {
if schemaType == "protobuf" || schemaType == "json" || schemaType == "graphql" || schemaType == "avro" {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not related to here but we have the "GetFilterDetails" endpoint in which you should also return the Avro type

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh. Yep. missed that one.

return nil
} else if schemaType == "avro" {
return invalidSupportTypeErr
} else {
return invalidTypeErr
}
Expand Down Expand Up @@ -147,7 +152,10 @@ func validateSchemaContent(schemaContent, schemaType string) error {
return err
}
case "avro":
break
err := validateAvroSchemaContent(schemaContent)
if err != nil {
return err
}
}
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion server/memphis_handlers_user_mgmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -738,7 +738,7 @@ func (umh UserMgmtHandler) GetFilterDetails(c *gin.Context) {
return
}

schemaType := []string{"protobuf", "json", "graphql"}
schemaType := []string{"protobuf", "json", "graphql", "avro"}
usage := []string{"used", "not used"}
c.IndentedJSON(200, gin.H{"tags": tags, "users": users, "type": schemaType, "usage": usage})
return
Expand Down
1 change: 1 addition & 0 deletions ui_src/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
"ajv": "^8.11.2",
"ajv-draft-04": "^1.0.0",
"antd": "4.23.1",
"avro-js": "^1.11.2",
"axios": "^0.25.0",
"buffer": "^6.0.3",
"chart.js": "^2.9.4",
Expand Down
30 changes: 22 additions & 8 deletions ui_src/src/domain/schema/components/createSchema/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import { Context } from '../../../../hooks/store';
import Input from '../../../../components/Input';
import Modal from '../../../../components/modal';
import AttachStationModal from '../attachStationModal';
const avro = require('avro-js')

loader.init();
loader.config({ monaco });
Expand Down Expand Up @@ -73,15 +74,14 @@ const schemaTypes = [
},
{
id: 4,
value: 'avro',
label: 'Avro (Coming soon)',
value: 'Avro',
label: 'Avro',
description: (
<span>
The popular. Apache Avro™ is the leading serialization format for record data, and first choice for streaming data pipelines. It offers excellent schema
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a note saying that at the moment it is only available here to create schemas but they are not enforced yet on the SDK.
Also, make sure that in case of an Avro schema attached to a station not causing issues for connected clients or for clients who trying to produce/consume data

Copy link
Contributor Author

@Big-Vi Big-Vi Jul 26, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a note saying that at the moment it is only available here to create schemas but they are not enforced yet on the SDK. Also, make sure that in case of an Avro schema attached to a station not causing issues for connected clients or for clients who trying to produce/consume data

I can't find any better way to prevent connected clients from error out if the Avro schema is attached to the station. Can we update all the SDKs and merge all PRs at the same time? I'm all ears if you have any other suggestion.
I already worked on Go, JS, and Python SDKs. I can create a pull request today or tomorrow.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So sure please open the PRs on the SDKs and we will merge this one only after it won't crash clients

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After merging the PRs of the SDKs please also update the readme file on this repo, you will find there a sdk features table

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So sure please open the PRs on the SDKs and we will merge this one only after it won't crash clients

Sweet as. Just to let you know I'm only familiar with the above mentioned languages. If somebody can work on the remaining SDKs(Java, .Net, and Rust), that would be great.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After merging the PRs of the SDKs please also update the readme file on this repo, you will find there a sdk features table

Yep. Sure.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So sure please open the PRs on the SDKs and we will merge this one only after it won't crash clients

Sweet as. Just to let you know I'm only familiar with the above mentioned languages. If somebody can work on the remaining SDKs(Java, .Net, and Rust), that would be great.

Sure would you mind to open issues on those repositories?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep. I'm on it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So sure please open the PRs on the SDKs and we will merge this one only after it won't crash clients

Sweet as. Just to let you know I'm only familiar with the above mentioned languages. If somebody can work on the remaining SDKs(Java, .Net, and Rust), that would be great.

Sure would you mind to open issues on those repositories?

Issues opened:
superstreamlabs/memphis.java#42
superstreamlabs/memphis.net#105

For Rust, there's already an open issue. Not sure i need to open another one.
turulix/memphis-rust-community#15

evolution.
</span>
),
disabled: true
)
}
];

Expand All @@ -96,15 +96,15 @@ message Test {
}`
},
Avro: {
language: 'avro',
language: 'json', // Avro stores the data definition in JSON format.
value: `{
"type": "record",
"namespace": "com.example",
"name": "test-schema",
"name": "test_schema",
"fields": [
{ "name": "username", "type": "string", "default": "-2" },
{ "name": "age", "type": "int", "default": "none" },
{ "name": "phone", "type": "int", "default": "NONE" },
{ "name": "age", "type": "int" },
{ "name": "phone", "type": "long" },
{ "name": "country", "type": "string", "default": "NONE" }
]
}`
Expand Down Expand Up @@ -355,6 +355,17 @@ function CreateSchema({ createNew }) {
}
};

const validateAvroSchema = (value) => {
try {
avro.parse(value);
setValidateSuccess('');
setValidateError('');
} catch (error) {
setValidateSuccess('');
setValidateError('Your schema is invalid');
}
};

const checkContent = (value) => {
const { type } = formFields;
if (value === ' ' || value === '') {
Expand All @@ -368,6 +379,8 @@ function CreateSchema({ createNew }) {
validateJsonSchema(value);
} else if (type === 'GraphQL') {
validateGraphQlSchema(value);
} else if (type === 'Avro') {
validateAvroSchema(value);
}
}
};
Expand Down Expand Up @@ -536,6 +549,7 @@ function CreateSchema({ createNew }) {
{formFields?.type === 'Protobuf' && schemaContentEditor}
{formFields?.type === 'Json' && schemaContentEditor}
{formFields?.type === 'GraphQL' && schemaContentEditor}
{formFields?.type === 'Avro' && schemaContentEditor}
</Form.Item>
<div className={validateError || validateSuccess ? (validateSuccess ? 'validate-note success' : 'validate-note error') : 'validate-note'}>
{validateError && <ErrorOutlineRounded />}
Expand Down
20 changes: 17 additions & 3 deletions ui_src/src/domain/schema/components/schemaDetails/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ import OverflowTip from '../../../../components/tooltip/overflowtip';
import { validate, parse, buildASTSchema } from 'graphql';
import SegmentButton from '../../../../components/segmentButton';
import AttachStationModal from '../attachStationModal';
const avro = require('avro-js')

loader.init();
loader.config({ monaco });
Expand Down Expand Up @@ -258,6 +259,17 @@ function SchemaDetails({ schemaName, closeDrawer }) {
}
};

const validateAvroSchema = (value) => {
try {
avro.parse(value);
setValidateSuccess('');
setValidateError('');
} catch (error) {
setValidateSuccess('');
setValidateError('Your schema is invalid');
}
};

const checkContent = (value) => {
const { type } = schemaDetails;
if (value === ' ' || value === '') {
Expand All @@ -271,6 +283,8 @@ function SchemaDetails({ schemaName, closeDrawer }) {
validateJsonSchema(value);
} else if (type === 'graphql') {
validateGraphQlSchema(value);
} else if (type === 'avro') {
validateAvroSchema(value);
}
}
};
Expand Down Expand Up @@ -441,7 +455,7 @@ function SchemaDetails({ schemaName, closeDrawer }) {
fontSize: '14px',
fontFamily: 'Inter'
}}
language={schemaDetails?.type === 'protobuf' ? 'proto' : schemaDetails?.type}
language={schemaDetails?.type === 'protobuf' ? 'proto' : schemaDetails?.type === 'avro' ? 'json' : schemaDetails?.type}
height="calc(100% - 104px)"
defaultValue={versionSelected?.schema_content}
value={newVersion}
Expand All @@ -465,15 +479,15 @@ function SchemaDetails({ schemaName, closeDrawer }) {
fontSize: '14px',
fontFamily: 'Inter'
}}
language={schemaDetails?.type === 'protobuf' ? 'proto' : schemaDetails?.type}
language={schemaDetails?.type === 'protobuf' ? 'proto' : schemaDetails?.type === 'avro' ? 'json' : schemaDetails?.type}
height="calc(100% - 100px)"
value={versionSelected?.schema_content}
/>
)}
{isDiff === 'Yes' && (
<DiffEditor
height="calc(100% - 100px)"
language={schemaDetails?.type === 'protobuf' ? 'proto' : schemaDetails?.type}
language={schemaDetails?.type === 'protobuf' ? 'proto' : schemaDetails?.type === 'avro' ? 'json' : schemaDetails?.type}
original={currentVersion?.schema_content}
modified={versionSelected?.schema_content}
options={{
Expand Down