Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

avro validation #189

Merged
merged 4 commits into from
Jul 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ In cases where extra performance is needed the recommended way is to create a pr
and produce messages by using the produce function of it
```python
await memphis.produce(station_name='test_station_py', producer_name='prod_py',
message='bytearray/protobuf class/dict/string/graphql.language.ast.DocumentNode', # bytearray / protobuf class (schema validated station - protobuf) or bytearray/dict (schema validated station - json schema) or string/bytearray/graphql.language.ast.DocumentNode (schema validated station - graphql schema)
message='bytearray/protobuf class/dict/string/graphql.language.ast.DocumentNode', # bytearray / protobuf class (schema validated station - protobuf) or bytearray/dict (schema validated station - json schema) or string/bytearray/graphql.language.ast.DocumentNode (schema validated station - graphql schema) or bytearray/dict (schema validated station - avro schema)
generate_random_suffix=False, #defaults to false
ack_wait_sec=15, # defaults to 15
headers=headers, # default to {}
Expand All @@ -242,7 +242,7 @@ await memphis.produce(station_name='test_station_py', producer_name='prod_py',
With creating a producer
```python
await producer.produce(
message='bytearray/protobuf class/dict/string/graphql.language.ast.DocumentNode', # bytearray / protobuf class (schema validated station - protobuf) or bytearray/dict (schema validated station - json schema) or string/bytearray/graphql.language.ast.DocumentNode (schema validated station - graphql schema)
message='bytearray/protobuf class/dict/string/graphql.language.ast.DocumentNode', # bytearray / protobuf class (schema validated station - protobuf) or bytearray/dict (schema validated station - json schema) or string/bytearray/graphql.language.ast.DocumentNode (schema validated station - graphql schema) or or bytearray/dict (schema validated station - avro schema)
ack_wait_sec=15) # defaults to 15
```

Expand All @@ -252,7 +252,7 @@ await producer.produce(
headers= Headers()
headers.add("key", "value")
await producer.produce(
message='bytearray/protobuf class/dict/string/graphql.language.ast.DocumentNode', # bytearray / protobuf class (schema validated station - protobuf) or bytearray/dict (schema validated station - json schema) or string/bytearray/graphql.language.ast.DocumentNode (schema validated station - graphql schema)
message='bytearray/protobuf class/dict/string/graphql.language.ast.DocumentNode', # bytearray / protobuf class (schema validated station - protobuf) or bytearray/dict (schema validated station - json schema) or string/bytearray/graphql.language.ast.DocumentNode (schema validated station - graphql schema) or or bytearray/dict (schema validated station - avro schema)
headers=headers) # default to {}
```

Expand Down
15 changes: 12 additions & 3 deletions memphis/memphis.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ def __init__(self):
self.proto_msgs = {}
self.graphql_schemas = {}
self.json_schemas = {}
self.avro_schemas = {}
self.cluster_configurations = {}
self.station_schemaverse_to_dls = {}
self.update_configurations_sub = {}
Expand Down Expand Up @@ -449,6 +450,14 @@ async def producer(
"active_version"
]["schema_content"]
)
elif (
self.schema_updates_data[internal_station_name]["type"] == "avro"
):
schema = self.schema_updates_data[internal_station_name][
"active_version"
]["schema_content"]
self.avro_schemas[internal_station_name] = json.loads(
schema)
producer = Producer(self, producer_name, station_name, real_name)
map_key = internal_station_name + "_" + real_name
self.producers_map[map_key] = producer
Expand Down Expand Up @@ -633,7 +642,7 @@ async def produce(
Args:
station_name (str): station name to produce messages into.
producer_name (str): name for the producer.
message (bytearray/dict): message to send into the station - bytearray/protobuf class (schema validated station - protobuf) or bytearray/dict (schema validated station - json schema) or string/bytearray/graphql.language.ast.DocumentNode (schema validated station - graphql schema)
message (bytearray/dict): message to send into the station - bytearray/protobuf class (schema validated station - protobuf) or bytearray/dict (schema validated station - json schema) or string/bytearray/graphql.language.ast.DocumentNode (schema validated station - graphql schema or bytearray/dict (schema validated station - avro schema))
generate_random_suffix (bool): false by default, if true concatenate a random suffix to producer's name
ack_wait_sec (int, optional): max time in seconds to wait for an ack from memphis. Defaults to 15.
headers (dict, optional): Message headers, defaults to {}.
Expand Down Expand Up @@ -729,11 +738,11 @@ async def create_schema(self, schema_name, schema_type, schema_path):
"""Creates a new schema.
Args:.
schema_name (str): the name of the schema.
schema_type (str): the type of the schema json / graphql / protobuf.
schema_type (str): the type of the schema json / graphql / protobuf / avro.
schema_path (str): the path for the schema file
"""

if schema_type not in {'json', 'graphql', 'protobuf'}:
if schema_type not in {'json', 'graphql', 'protobuf', 'avro'}:
raise MemphisError("schema type not supported" + type)

try:
Expand Down
27 changes: 27 additions & 0 deletions memphis/producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from graphql import validate as validate_graphql
from jsonschema import validate
import google.protobuf.json_format as protobuf_json_format
import fastavro
from memphis.exceptions import MemphisError, MemphisSchemaError
from memphis.headers import Headers
from memphis.utils import get_internal_name
Expand Down Expand Up @@ -44,6 +45,9 @@ async def validate_msg(self, message):
if schema_type == "graphql":
message = self.validate_graphql(message)
return message
if schema_type == "avro":
message = self.validate_avro_schema(message)
return message
if hasattr(message, "SerializeToString"):
msg_to_send = message.SerializeToString()
return msg_to_send
Expand Down Expand Up @@ -153,6 +157,28 @@ def validate_graphql(self, message):
e = "Invalid message format, expected GraphQL"
raise MemphisSchemaError("Schema validation has failed: " + str(e))


def validate_avro_schema(self, message):
try:
if isinstance(message, bytearray):
try:
message_obj = json.loads(message)
except Exception as e:
raise Exception("Expecting Avro format: " + str(e))
elif isinstance(message, dict):
message_obj = message
message = bytearray(json.dumps(message_obj).encode("utf-8"))
else:
raise Exception("Unsupported message type")

fastavro.validate(
message_obj,
self.connection.avro_schemas[self.internal_station_name],
)
return message
except fastavro.validation.ValidationError as e:
raise MemphisSchemaError("Schema validation has failed: " + str(e))

# pylint: disable=R0913
async def produce(
self,
Expand All @@ -172,6 +198,7 @@ async def produce(
- bytearray/dict (schema validated station - json schema)
- string/bytearray/graphql.language.ast.DocumentNode
(schema validated station - graphql schema)
- bytearray/dict (schema validated station - avro schema)
ack_wait_sec (int, optional): max time in seconds to wait for an ack from the broker.
Defaults to 15 sec.
headers (dict, optional): message headers, defaults to {}.
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
url="https://github.com/memphisdev/memphis.py",
download_url="https://github.com/memphisdev/memphis.py/archive/refs/tags/1.0.6.tar.gz",
keywords=["message broker", "devtool", "streaming", "data"],
install_requires=["asyncio", "nats-py", "protobuf", "jsonschema", "graphql-core"],
install_requires=["asyncio", "nats-py", "protobuf", "jsonschema", "graphql-core", "fastavro"],
classifiers=[
"Development Status :: 4 - Beta",
"Intended Audience :: Developers",
Expand Down