Skip to content

Commit

Permalink
avro validation (#189)
Browse files Browse the repository at this point in the history
* avro validation

* fixing linting issue

---------

Co-authored-by: RJ Nowling <[email protected]>
  • Loading branch information
Big-Vi and rnowling-memphis authored Jul 31, 2023
1 parent b0bcc97 commit 3bdbac4
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 7 deletions.
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ In cases where extra performance is needed the recommended way is to create a pr
and produce messages by using the produce function of it
```python
await memphis.produce(station_name='test_station_py', producer_name='prod_py',
message='bytearray/protobuf class/dict/string/graphql.language.ast.DocumentNode', # bytearray / protobuf class (schema validated station - protobuf) or bytearray/dict (schema validated station - json schema) or string/bytearray/graphql.language.ast.DocumentNode (schema validated station - graphql schema)
message='bytearray/protobuf class/dict/string/graphql.language.ast.DocumentNode', # bytearray / protobuf class (schema validated station - protobuf) or bytearray/dict (schema validated station - json schema) or string/bytearray/graphql.language.ast.DocumentNode (schema validated station - graphql schema) or bytearray/dict (schema validated station - avro schema)
generate_random_suffix=False, #defaults to false
ack_wait_sec=15, # defaults to 15
headers=headers, # default to {}
Expand All @@ -242,7 +242,7 @@ await memphis.produce(station_name='test_station_py', producer_name='prod_py',
With creating a producer
```python
await producer.produce(
message='bytearray/protobuf class/dict/string/graphql.language.ast.DocumentNode', # bytearray / protobuf class (schema validated station - protobuf) or bytearray/dict (schema validated station - json schema) or string/bytearray/graphql.language.ast.DocumentNode (schema validated station - graphql schema)
message='bytearray/protobuf class/dict/string/graphql.language.ast.DocumentNode', # bytearray / protobuf class (schema validated station - protobuf) or bytearray/dict (schema validated station - json schema) or string/bytearray/graphql.language.ast.DocumentNode (schema validated station - graphql schema) or or bytearray/dict (schema validated station - avro schema)
ack_wait_sec=15) # defaults to 15
```

Expand All @@ -252,7 +252,7 @@ await producer.produce(
headers= Headers()
headers.add("key", "value")
await producer.produce(
message='bytearray/protobuf class/dict/string/graphql.language.ast.DocumentNode', # bytearray / protobuf class (schema validated station - protobuf) or bytearray/dict (schema validated station - json schema) or string/bytearray/graphql.language.ast.DocumentNode (schema validated station - graphql schema)
message='bytearray/protobuf class/dict/string/graphql.language.ast.DocumentNode', # bytearray / protobuf class (schema validated station - protobuf) or bytearray/dict (schema validated station - json schema) or string/bytearray/graphql.language.ast.DocumentNode (schema validated station - graphql schema) or or bytearray/dict (schema validated station - avro schema)
headers=headers) # default to {}
```

Expand Down
15 changes: 12 additions & 3 deletions memphis/memphis.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ def __init__(self):
self.proto_msgs = {}
self.graphql_schemas = {}
self.json_schemas = {}
self.avro_schemas = {}
self.cluster_configurations = {}
self.station_schemaverse_to_dls = {}
self.update_configurations_sub = {}
Expand Down Expand Up @@ -449,6 +450,14 @@ async def producer(
"active_version"
]["schema_content"]
)
elif (
self.schema_updates_data[internal_station_name]["type"] == "avro"
):
schema = self.schema_updates_data[internal_station_name][
"active_version"
]["schema_content"]
self.avro_schemas[internal_station_name] = json.loads(
schema)
producer = Producer(self, producer_name, station_name, real_name)
map_key = internal_station_name + "_" + real_name
self.producers_map[map_key] = producer
Expand Down Expand Up @@ -633,7 +642,7 @@ async def produce(
Args:
station_name (str): station name to produce messages into.
producer_name (str): name for the producer.
message (bytearray/dict): message to send into the station - bytearray/protobuf class (schema validated station - protobuf) or bytearray/dict (schema validated station - json schema) or string/bytearray/graphql.language.ast.DocumentNode (schema validated station - graphql schema)
message (bytearray/dict): message to send into the station - bytearray/protobuf class (schema validated station - protobuf) or bytearray/dict (schema validated station - json schema) or string/bytearray/graphql.language.ast.DocumentNode (schema validated station - graphql schema or bytearray/dict (schema validated station - avro schema))
generate_random_suffix (bool): false by default, if true concatenate a random suffix to producer's name
ack_wait_sec (int, optional): max time in seconds to wait for an ack from memphis. Defaults to 15.
headers (dict, optional): Message headers, defaults to {}.
Expand Down Expand Up @@ -729,11 +738,11 @@ async def create_schema(self, schema_name, schema_type, schema_path):
"""Creates a new schema.
Args:.
schema_name (str): the name of the schema.
schema_type (str): the type of the schema json / graphql / protobuf.
schema_type (str): the type of the schema json / graphql / protobuf / avro.
schema_path (str): the path for the schema file
"""

if schema_type not in {'json', 'graphql', 'protobuf'}:
if schema_type not in {'json', 'graphql', 'protobuf', 'avro'}:
raise MemphisError("schema type not supported" + type)

try:
Expand Down
27 changes: 27 additions & 0 deletions memphis/producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from graphql import validate as validate_graphql
from jsonschema import validate
import google.protobuf.json_format as protobuf_json_format
import fastavro
from memphis.exceptions import MemphisError, MemphisSchemaError
from memphis.headers import Headers
from memphis.utils import get_internal_name
Expand Down Expand Up @@ -44,6 +45,9 @@ async def validate_msg(self, message):
if schema_type == "graphql":
message = self.validate_graphql(message)
return message
if schema_type == "avro":
message = self.validate_avro_schema(message)
return message
if hasattr(message, "SerializeToString"):
msg_to_send = message.SerializeToString()
return msg_to_send
Expand Down Expand Up @@ -153,6 +157,28 @@ def validate_graphql(self, message):
e = "Invalid message format, expected GraphQL"
raise MemphisSchemaError("Schema validation has failed: " + str(e))


def validate_avro_schema(self, message):
try:
if isinstance(message, bytearray):
try:
message_obj = json.loads(message)
except Exception as e:
raise Exception("Expecting Avro format: " + str(e))
elif isinstance(message, dict):
message_obj = message
message = bytearray(json.dumps(message_obj).encode("utf-8"))
else:
raise Exception("Unsupported message type")

fastavro.validate(
message_obj,
self.connection.avro_schemas[self.internal_station_name],
)
return message
except fastavro.validation.ValidationError as e:
raise MemphisSchemaError("Schema validation has failed: " + str(e))

# pylint: disable=R0913
async def produce(
self,
Expand All @@ -172,6 +198,7 @@ async def produce(
- bytearray/dict (schema validated station - json schema)
- string/bytearray/graphql.language.ast.DocumentNode
(schema validated station - graphql schema)
- bytearray/dict (schema validated station - avro schema)
ack_wait_sec (int, optional): max time in seconds to wait for an ack from the broker.
Defaults to 15 sec.
headers (dict, optional): message headers, defaults to {}.
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
url="https://github.com/memphisdev/memphis.py",
download_url="https://github.com/memphisdev/memphis.py/archive/refs/tags/1.0.6.tar.gz",
keywords=["message broker", "devtool", "streaming", "data"],
install_requires=["asyncio", "nats-py", "protobuf", "jsonschema", "graphql-core"],
install_requires=["asyncio", "nats-py", "protobuf", "jsonschema", "graphql-core", "fastavro"],
classifiers=[
"Development Status :: 4 - Beta",
"Intended Audience :: Developers",
Expand Down

0 comments on commit 3bdbac4

Please sign in to comment.