From 3bdbac4fe1233b065e0734386511ef42a7a23a75 Mon Sep 17 00:00:00 2001 From: Vignesh Murugan Date: Tue, 1 Aug 2023 04:18:13 +0530 Subject: [PATCH] avro validation (#189) * avro validation * fixing linting issue --------- Co-authored-by: RJ Nowling <130711295+rnowling-memphis@users.noreply.github.com> --- README.md | 6 +++--- memphis/memphis.py | 15 ++++++++++++--- memphis/producer.py | 27 +++++++++++++++++++++++++++ setup.py | 2 +- 4 files changed, 43 insertions(+), 7 deletions(-) diff --git a/README.md b/README.md index 1a9da6f..aaf42a1 100644 --- a/README.md +++ b/README.md @@ -229,7 +229,7 @@ In cases where extra performance is needed the recommended way is to create a pr and produce messages by using the produce function of it ```python await memphis.produce(station_name='test_station_py', producer_name='prod_py', - message='bytearray/protobuf class/dict/string/graphql.language.ast.DocumentNode', # bytearray / protobuf class (schema validated station - protobuf) or bytearray/dict (schema validated station - json schema) or string/bytearray/graphql.language.ast.DocumentNode (schema validated station - graphql schema) + message='bytearray/protobuf class/dict/string/graphql.language.ast.DocumentNode', # bytearray / protobuf class (schema validated station - protobuf) or bytearray/dict (schema validated station - json schema) or string/bytearray/graphql.language.ast.DocumentNode (schema validated station - graphql schema) or bytearray/dict (schema validated station - avro schema) generate_random_suffix=False, #defaults to false ack_wait_sec=15, # defaults to 15 headers=headers, # default to {} @@ -242,7 +242,7 @@ await memphis.produce(station_name='test_station_py', producer_name='prod_py', With creating a producer ```python await producer.produce( - message='bytearray/protobuf class/dict/string/graphql.language.ast.DocumentNode', # bytearray / protobuf class (schema validated station - protobuf) or bytearray/dict (schema validated station - json schema) or string/bytearray/graphql.language.ast.DocumentNode (schema validated station - graphql schema) + message='bytearray/protobuf class/dict/string/graphql.language.ast.DocumentNode', # bytearray / protobuf class (schema validated station - protobuf) or bytearray/dict (schema validated station - json schema) or string/bytearray/graphql.language.ast.DocumentNode (schema validated station - graphql schema) or or bytearray/dict (schema validated station - avro schema) ack_wait_sec=15) # defaults to 15 ``` @@ -252,7 +252,7 @@ await producer.produce( headers= Headers() headers.add("key", "value") await producer.produce( - message='bytearray/protobuf class/dict/string/graphql.language.ast.DocumentNode', # bytearray / protobuf class (schema validated station - protobuf) or bytearray/dict (schema validated station - json schema) or string/bytearray/graphql.language.ast.DocumentNode (schema validated station - graphql schema) + message='bytearray/protobuf class/dict/string/graphql.language.ast.DocumentNode', # bytearray / protobuf class (schema validated station - protobuf) or bytearray/dict (schema validated station - json schema) or string/bytearray/graphql.language.ast.DocumentNode (schema validated station - graphql schema) or or bytearray/dict (schema validated station - avro schema) headers=headers) # default to {} ``` diff --git a/memphis/memphis.py b/memphis/memphis.py index 5dbe6a4..0807f22 100644 --- a/memphis/memphis.py +++ b/memphis/memphis.py @@ -48,6 +48,7 @@ def __init__(self): self.proto_msgs = {} self.graphql_schemas = {} self.json_schemas = {} + self.avro_schemas = {} self.cluster_configurations = {} self.station_schemaverse_to_dls = {} self.update_configurations_sub = {} @@ -449,6 +450,14 @@ async def producer( "active_version" ]["schema_content"] ) + elif ( + self.schema_updates_data[internal_station_name]["type"] == "avro" + ): + schema = self.schema_updates_data[internal_station_name][ + "active_version" + ]["schema_content"] + self.avro_schemas[internal_station_name] = json.loads( + schema) producer = Producer(self, producer_name, station_name, real_name) map_key = internal_station_name + "_" + real_name self.producers_map[map_key] = producer @@ -633,7 +642,7 @@ async def produce( Args: station_name (str): station name to produce messages into. producer_name (str): name for the producer. - message (bytearray/dict): message to send into the station - bytearray/protobuf class (schema validated station - protobuf) or bytearray/dict (schema validated station - json schema) or string/bytearray/graphql.language.ast.DocumentNode (schema validated station - graphql schema) + message (bytearray/dict): message to send into the station - bytearray/protobuf class (schema validated station - protobuf) or bytearray/dict (schema validated station - json schema) or string/bytearray/graphql.language.ast.DocumentNode (schema validated station - graphql schema or bytearray/dict (schema validated station - avro schema)) generate_random_suffix (bool): false by default, if true concatenate a random suffix to producer's name ack_wait_sec (int, optional): max time in seconds to wait for an ack from memphis. Defaults to 15. headers (dict, optional): Message headers, defaults to {}. @@ -729,11 +738,11 @@ async def create_schema(self, schema_name, schema_type, schema_path): """Creates a new schema. Args:. schema_name (str): the name of the schema. - schema_type (str): the type of the schema json / graphql / protobuf. + schema_type (str): the type of the schema json / graphql / protobuf / avro. schema_path (str): the path for the schema file """ - if schema_type not in {'json', 'graphql', 'protobuf'}: + if schema_type not in {'json', 'graphql', 'protobuf', 'avro'}: raise MemphisError("schema type not supported" + type) try: diff --git a/memphis/producer.py b/memphis/producer.py index 1609e5f..ab4bd34 100644 --- a/memphis/producer.py +++ b/memphis/producer.py @@ -11,6 +11,7 @@ from graphql import validate as validate_graphql from jsonschema import validate import google.protobuf.json_format as protobuf_json_format +import fastavro from memphis.exceptions import MemphisError, MemphisSchemaError from memphis.headers import Headers from memphis.utils import get_internal_name @@ -44,6 +45,9 @@ async def validate_msg(self, message): if schema_type == "graphql": message = self.validate_graphql(message) return message + if schema_type == "avro": + message = self.validate_avro_schema(message) + return message if hasattr(message, "SerializeToString"): msg_to_send = message.SerializeToString() return msg_to_send @@ -153,6 +157,28 @@ def validate_graphql(self, message): e = "Invalid message format, expected GraphQL" raise MemphisSchemaError("Schema validation has failed: " + str(e)) + + def validate_avro_schema(self, message): + try: + if isinstance(message, bytearray): + try: + message_obj = json.loads(message) + except Exception as e: + raise Exception("Expecting Avro format: " + str(e)) + elif isinstance(message, dict): + message_obj = message + message = bytearray(json.dumps(message_obj).encode("utf-8")) + else: + raise Exception("Unsupported message type") + + fastavro.validate( + message_obj, + self.connection.avro_schemas[self.internal_station_name], + ) + return message + except fastavro.validation.ValidationError as e: + raise MemphisSchemaError("Schema validation has failed: " + str(e)) + # pylint: disable=R0913 async def produce( self, @@ -172,6 +198,7 @@ async def produce( - bytearray/dict (schema validated station - json schema) - string/bytearray/graphql.language.ast.DocumentNode (schema validated station - graphql schema) + - bytearray/dict (schema validated station - avro schema) ack_wait_sec (int, optional): max time in seconds to wait for an ack from the broker. Defaults to 15 sec. headers (dict, optional): message headers, defaults to {}. diff --git a/setup.py b/setup.py index 7261c02..2231170 100644 --- a/setup.py +++ b/setup.py @@ -19,7 +19,7 @@ url="https://github.com/memphisdev/memphis.py", download_url="https://github.com/memphisdev/memphis.py/archive/refs/tags/1.0.6.tar.gz", keywords=["message broker", "devtool", "streaming", "data"], - install_requires=["asyncio", "nats-py", "protobuf", "jsonschema", "graphql-core"], + install_requires=["asyncio", "nats-py", "protobuf", "jsonschema", "graphql-core", "fastavro"], classifiers=[ "Development Status :: 4 - Beta", "Intended Audience :: Developers",