From c6d6f141e815181a2235f1fd8c79a517a09509e9 Mon Sep 17 00:00:00 2001 From: Taylor Jacovich Date: Thu, 18 May 2023 14:55:25 -0400 Subject: [PATCH] Added avro_serializer (#4) * Added avro_serializer. * Removed unsued sys call from avro_serializer. * Bringing in line with isort formatting. --- SciXPipelineUtils/avro_serializer.py | 48 ++++++++++++++++++++++ pyproject.toml | 2 +- tests/test_avro_serializer.py | 59 ++++++++++++++++++++++++++++ 3 files changed, 108 insertions(+), 1 deletion(-) create mode 100644 SciXPipelineUtils/avro_serializer.py create mode 100644 tests/test_avro_serializer.py diff --git a/SciXPipelineUtils/avro_serializer.py b/SciXPipelineUtils/avro_serializer.py new file mode 100644 index 0000000..2d5cad8 --- /dev/null +++ b/SciXPipelineUtils/avro_serializer.py @@ -0,0 +1,48 @@ +import io + +import avro.io +import avro.schema +from avro.schema import parse + + +class AvroSerialHelper: + def __init__(self, schema, logger=None): + """ + :param schema: The AVRO schema (str) + :param logger: Application logger + """ + self.schema = parse(schema) + self.logger = logger + + def avro_serializer(self, msg): + """ + :param msg: the json representation of the AVRO message + :return: serialized message (bitstream) + """ + writer = avro.io.DatumWriter(self.schema) + bytes_writer = io.BytesIO() + encoder = avro.io.BinaryEncoder(bytes_writer) + try: + writer.write(msg, encoder) + return bytes_writer.getvalue() + + except avro.errors.AvroTypeException as e: + print("Failed to serialize request with error: {} \nStopping.".format(e)) + raise e + + def avro_deserializer(self, raw_bytes): + """ + :param raw_bytes: The raw bitstream of an incoming AVRO message + :returns: The json representation of the AVRO message + """ + if self.logger: + self.logger.debug(raw_bytes) + bytes_reader = io.BytesIO(raw_bytes) + decoder = avro.io.BinaryDecoder(bytes_reader) + reader = avro.io.DatumReader(self.schema) + try: + return reader.read(decoder) + except Exception as e: + if self.logger: + self.logger.exception(e) + raise e diff --git a/pyproject.toml b/pyproject.toml index 00936b4..b26edcb 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -20,6 +20,7 @@ packages = [ dependencies = [ 'boto3==1.26.59', + 'avro==1.11.1', ] [project.urls] @@ -41,7 +42,6 @@ dev = [ 'moto==4.1.3', 'confluent-kafka==1.9.2', 'fastavro==1.7.2', - 'avro==1.11.1', ] [tool.pytest.ini_options] diff --git a/tests/test_avro_serializer.py b/tests/test_avro_serializer.py new file mode 100644 index 0000000..19558e5 --- /dev/null +++ b/tests/test_avro_serializer.py @@ -0,0 +1,59 @@ +import json +from unittest import TestCase + +import avro +import avro_serializer +import pytest + + +class mock_gRPC_avro_msg: + def value(self): + return { + "hash": "g425897fh3qp35890u54256342ewferht242546", + "id": None, + "task": "SYMBOL1", + "status": None, + "task_args": { + "ingest": None, + "ingest_type": "metadata", + "daterange": "2023-03-07", + "persistence": None, + }, + } + + def bitstream(self): + return b"\x00Ng425897fh3qp35890u54256342ewferht242546\x02\x00\x00\x00\x02\x10metadata\x02\x142023-03-07\x02" + + +class TestAvroSerializer(TestCase): + def test_avro_serialization(self): + with open("tests/stubdata/AVRO_schemas/TEMPLATEInputSchema.avsc") as f: + schema_json = json.load(f) + msg = mock_gRPC_avro_msg().value() + serializer = avro_serializer.AvroSerialHelper(json.dumps(schema_json)) + bitstream = serializer.avro_serializer(msg) + self.assertEqual(bitstream, mock_gRPC_avro_msg().bitstream()) + + def test_avro_serialization_failure(self): + with open("tests/stubdata/AVRO_schemas/TEMPLATEInputSchema.avsc") as f: + schema_json = json.load(f) + msg = {} + serializer = avro_serializer.AvroSerialHelper(json.dumps(schema_json)) + with pytest.raises(avro.errors.AvroTypeException): + serializer.avro_serializer(msg) + + def test_avro_deserialization(self): + with open("tests/stubdata/AVRO_schemas/TEMPLATEInputSchema.avsc") as f: + schema_json = json.load(f) + serializer = avro_serializer.AvroSerialHelper(json.dumps(schema_json)) + bitstream = mock_gRPC_avro_msg().bitstream() + msg = serializer.avro_deserializer(bitstream) + self.assertEqual(msg, mock_gRPC_avro_msg().value()) + + def test_avro_deserialization_failure(self): + with open("tests/stubdata/AVRO_schemas/TEMPLATEInputSchema.avsc") as f: + schema_json = json.load(f) + serializer = avro_serializer.AvroSerialHelper(json.dumps(schema_json)) + bitstream = b"" + with pytest.raises(Exception): + serializer.avro_deserializer(bitstream)