Skip to content

Commit

Permalink
Added avro_serializer (#4)
Browse files Browse the repository at this point in the history
* Added avro_serializer.

* Removed unsued sys call from avro_serializer.

* Bringing in line with isort formatting.
  • Loading branch information
tjacovich authored May 18, 2023
1 parent 8e84ed5 commit c6d6f14
Show file tree
Hide file tree
Showing 3 changed files with 108 additions and 1 deletion.
48 changes: 48 additions & 0 deletions SciXPipelineUtils/avro_serializer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import io

import avro.io
import avro.schema
from avro.schema import parse


class AvroSerialHelper:
def __init__(self, schema, logger=None):
"""
:param schema: The AVRO schema (str)
:param logger: Application logger
"""
self.schema = parse(schema)
self.logger = logger

def avro_serializer(self, msg):
"""
:param msg: the json representation of the AVRO message
:return: serialized message (bitstream)
"""
writer = avro.io.DatumWriter(self.schema)
bytes_writer = io.BytesIO()
encoder = avro.io.BinaryEncoder(bytes_writer)
try:
writer.write(msg, encoder)
return bytes_writer.getvalue()

except avro.errors.AvroTypeException as e:
print("Failed to serialize request with error: {} \nStopping.".format(e))
raise e

def avro_deserializer(self, raw_bytes):
"""
:param raw_bytes: The raw bitstream of an incoming AVRO message
:returns: The json representation of the AVRO message
"""
if self.logger:
self.logger.debug(raw_bytes)
bytes_reader = io.BytesIO(raw_bytes)
decoder = avro.io.BinaryDecoder(bytes_reader)
reader = avro.io.DatumReader(self.schema)
try:
return reader.read(decoder)
except Exception as e:
if self.logger:
self.logger.exception(e)
raise e
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ packages = [

dependencies = [
'boto3==1.26.59',
'avro==1.11.1',
]

[project.urls]
Expand All @@ -41,7 +42,6 @@ dev = [
'moto==4.1.3',
'confluent-kafka==1.9.2',
'fastavro==1.7.2',
'avro==1.11.1',
]

[tool.pytest.ini_options]
Expand Down
59 changes: 59 additions & 0 deletions tests/test_avro_serializer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
import json
from unittest import TestCase

import avro
import avro_serializer
import pytest


class mock_gRPC_avro_msg:
def value(self):
return {
"hash": "g425897fh3qp35890u54256342ewferht242546",
"id": None,
"task": "SYMBOL1",
"status": None,
"task_args": {
"ingest": None,
"ingest_type": "metadata",
"daterange": "2023-03-07",
"persistence": None,
},
}

def bitstream(self):
return b"\x00Ng425897fh3qp35890u54256342ewferht242546\x02\x00\x00\x00\x02\x10metadata\x02\x142023-03-07\x02"


class TestAvroSerializer(TestCase):
def test_avro_serialization(self):
with open("tests/stubdata/AVRO_schemas/TEMPLATEInputSchema.avsc") as f:
schema_json = json.load(f)
msg = mock_gRPC_avro_msg().value()
serializer = avro_serializer.AvroSerialHelper(json.dumps(schema_json))
bitstream = serializer.avro_serializer(msg)
self.assertEqual(bitstream, mock_gRPC_avro_msg().bitstream())

def test_avro_serialization_failure(self):
with open("tests/stubdata/AVRO_schemas/TEMPLATEInputSchema.avsc") as f:
schema_json = json.load(f)
msg = {}
serializer = avro_serializer.AvroSerialHelper(json.dumps(schema_json))
with pytest.raises(avro.errors.AvroTypeException):
serializer.avro_serializer(msg)

def test_avro_deserialization(self):
with open("tests/stubdata/AVRO_schemas/TEMPLATEInputSchema.avsc") as f:
schema_json = json.load(f)
serializer = avro_serializer.AvroSerialHelper(json.dumps(schema_json))
bitstream = mock_gRPC_avro_msg().bitstream()
msg = serializer.avro_deserializer(bitstream)
self.assertEqual(msg, mock_gRPC_avro_msg().value())

def test_avro_deserialization_failure(self):
with open("tests/stubdata/AVRO_schemas/TEMPLATEInputSchema.avsc") as f:
schema_json = json.load(f)
serializer = avro_serializer.AvroSerialHelper(json.dumps(schema_json))
bitstream = b""
with pytest.raises(Exception):
serializer.avro_deserializer(bitstream)

0 comments on commit c6d6f14

Please sign in to comment.