diff --git a/src/Memphis.Client.UnitTests/AvroValidatorTest.cs b/src/Memphis.Client.UnitTests/AvroValidatorTest.cs new file mode 100644 index 0000000..67d55c8 --- /dev/null +++ b/src/Memphis.Client.UnitTests/AvroValidatorTest.cs @@ -0,0 +1,79 @@ +using System.Collections.Concurrent; +using System.Text; +using GraphQL.Types; +using Memphis.Client.Exception; +using Memphis.Client.UnitTests.Validators.TestData; +using Memphis.Client.Validators; +using Xunit; + +namespace Memphis.Client.UnitTests.Validators; + +public class AvroValidatorTest +{ + + + private readonly ISchemaValidator _validator; + + public AvroValidatorTest() + { + _validator = new AvroValidator(); + } + + #region AvroValidatorTest.ParseAndStore + [Theory] + [MemberData(nameof(AvroValidatorTestData.ValidSchema), MemberType = typeof(AvroValidatorTestData))] + public void ShouldReturnTrue_WhenParseAndStore_WhereValidSchemaPassed(string validSchema) + { + var actual = _validator.ParseAndStore("vaid-schema-001", validSchema); + + Assert.True(actual); + } + + + [Theory] + [MemberData(nameof(AvroValidatorTestData.InvalidSchema), MemberType = typeof(AvroValidatorTestData))] + public void ShouldReturnFalse_WhenParseAndStore_WhereInvalidSchemaPassed(string invalidSchema) + { + var actual = _validator.ParseAndStore("invalid-schema-001", invalidSchema); + + Assert.False(actual); + } + + #endregion + + + #region AvroValidatorTest.ValidateAsync + + [Theory] + [MemberData(nameof(AvroValidatorTestData.ValidSchemaDetail), MemberType = typeof(AvroValidatorTestData))] + public async Task ShouldDoSuccess_WhenValidateAsync_WhereValidDataPassed(string schemaKey, string schema, byte[] msg) + { + _validator.ParseAndStore(schemaKey, schema); + + var exception = await Record.ExceptionAsync(async () => await _validator.ValidateAsync(msg, schemaKey)); + + Assert.Null(exception); + } + + [Theory] + [MemberData(nameof(AvroValidatorTestData.InvalidSchemaDetail), MemberType = typeof(AvroValidatorTestData))] + public async Task ShouldDoThrow_WhenValidateAsync_WhereInvalidDataPassed(string schemaKey, string schema, byte[] msg) + { + _validator.ParseAndStore(schemaKey, schema); + + await Assert.ThrowsAsync( + () => _validator.ValidateAsync(msg, schemaKey)); + } + + [Theory] + [MemberData(nameof(AvroValidatorTestData.Message), MemberType = typeof(AvroValidatorTestData))] + public async Task ShouldDoThrow_WhenValidateAsync_WhereSchemaNotFoundInCache(byte[] msg) + { + var nonexistentSchema = Guid.NewGuid().ToString(); + + await Assert.ThrowsAsync( + () => _validator.ValidateAsync(msg, nonexistentSchema)); + } + + #endregion +} diff --git a/src/Memphis.Client.UnitTests/Validators/TestData/AvroValidatorTestData.cs b/src/Memphis.Client.UnitTests/Validators/TestData/AvroValidatorTestData.cs new file mode 100644 index 0000000..3a69982 --- /dev/null +++ b/src/Memphis.Client.UnitTests/Validators/TestData/AvroValidatorTestData.cs @@ -0,0 +1,75 @@ +using System.ComponentModel; +using System.Runtime.Serialization; +using System.Text; +using SolTechnology.Avro; +using SolTechnology.Avro.Infrastructure.Attributes; + +namespace Memphis.Client.UnitTests; + + +public class ValidUserModel +{ + [DataMember(Name = "name")] + public string Name { get; set; } = null!; + + [DataMember(Name = "age")] + public long Age { get; set; } +} + +public class InvalidUserModel +{ + + [DataMember(Name = "email")] + public string Email { get; set; } = null!; +} + + +public class AvroValidatorTestData +{ + public static List ValidSchema => new() + { + new object[] { validUser }, + }; + + public static List InvalidSchema => new() + { + new object[] { invalidSch1 }, + }; + + public static List Message => new() + { + new object[] { validUserData }, + }; + + public static List ValidSchemaDetail => new() + { + new object[] { "valid-simple-msg", validUser, validUserData }, + }; + + public static List InvalidSchemaDetail => new() + { + new object[] { "invalid-simple-msg", validUser, invalidMsg }, + }; + + + private static readonly string validUser = AvroConvert.GenerateSchema(typeof(ValidUserModel)); + + + private const string invalidSch1 = "This is invalid avro schema"; + + + private readonly static byte[] validUserData = AvroConvert.Serialize(new ValidUserModel + { + Name = "John Doe", + Age = 30 + }); + + + private readonly static byte[] invalidMsg = AvroConvert.Serialize(new InvalidUserModel + { + Email = "test@web.com" + }); + +} + + diff --git a/src/Memphis.Client/Constants/MemphisConstants.cs b/src/Memphis.Client/Constants/MemphisConstants.cs index e98d434..d8b51c5 100644 --- a/src/Memphis.Client/Constants/MemphisConstants.cs +++ b/src/Memphis.Client/Constants/MemphisConstants.cs @@ -47,8 +47,6 @@ public static class MemphisSchemaTypes public const string JSON = "json"; public const string GRAPH_QL = "graphql"; public const string PROTO_BUF = "protobuf"; - - // currently unsupported internal const string AVRO = "avro"; } diff --git a/src/Memphis.Client/Helper/MessageSerializer.cs b/src/Memphis.Client/Helper/MessageSerializer.cs new file mode 100644 index 0000000..a74b18c --- /dev/null +++ b/src/Memphis.Client/Helper/MessageSerializer.cs @@ -0,0 +1,40 @@ +using System; +using System.IO; +using System.Text; +using Memphis.Client.Constants; +using Memphis.Client.Exception; +using Newtonsoft.Json; +using SolTechnology.Avro; + +namespace Memphis.Client; + +public class MessageSerializer +{ + /// + /// Serialize the object to the specified schema type. The supported schema types are: json, graphql, protobuf, avro + /// + /// + /// + /// + /// + /// + public static byte[] Serialize(T obj, string schemaType) where T : class + { + + return schemaType switch + { + MemphisSchemaTypes.JSON or + MemphisSchemaTypes.GRAPH_QL => Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(obj)), + MemphisSchemaTypes.PROTO_BUF => SerializeProtoBuf(obj), + MemphisSchemaTypes.AVRO => AvroConvert.Serialize(obj), + _ => throw new MemphisException("Unsupported schema type, the supported schema types are: json, graphql, protobuf, avro"), + }; + + static byte[] SerializeProtoBuf(TData obj) where TData : class + { + using var stream = new MemoryStream(); + ProtoBuf.Serializer.Serialize(stream, obj); + return stream.ToArray(); + } + } +} \ No newline at end of file diff --git a/src/Memphis.Client/IMemphisClient.cs b/src/Memphis.Client/IMemphisClient.cs index 2b3dbc5..ae6ace4 100644 --- a/src/Memphis.Client/IMemphisClient.cs +++ b/src/Memphis.Client/IMemphisClient.cs @@ -9,6 +9,7 @@ using Memphis.Client.Station; namespace Memphis.Client; +#pragma warning disable CS8625 public interface IMemphisClient : IDisposable { @@ -126,4 +127,6 @@ Task ProduceAsync( Task CreateConsumer(FetchMessageOptions fetchMessageOptions); -} \ No newline at end of file +} + +#pragma warning restore CS8625 \ No newline at end of file diff --git a/src/Memphis.Client/Memphis.Client.csproj b/src/Memphis.Client/Memphis.Client.csproj index 34441b7..c0f32ed 100644 --- a/src/Memphis.Client/Memphis.Client.csproj +++ b/src/Memphis.Client/Memphis.Client.csproj @@ -17,6 +17,7 @@ + diff --git a/src/Memphis.Client/MemphisClient.cs b/src/Memphis.Client/MemphisClient.cs index a8ee948..ecc9e57 100644 --- a/src/Memphis.Client/MemphisClient.cs +++ b/src/Memphis.Client/MemphisClient.cs @@ -591,9 +591,8 @@ static void EnsureSchemaTypeIsValid(string type) case MemphisSchemaTypes.JSON: case MemphisSchemaTypes.PROTO_BUF: case MemphisSchemaTypes.GRAPH_QL: - return; case MemphisSchemaTypes.AVRO: - throw new MemphisException("Avro schema type is not supported at this time"); + return; default: throw new MemphisException("Unsupported schema type"); } @@ -622,6 +621,18 @@ static void HandleSchemaCreationErrorResponse(byte[] responseBytes) } } + internal string GetStationSchemaType(string internalStationName) + { + if (_schemaUpdateDictionary.TryGetValue(internalStationName, + out ProducerSchemaUpdateInit schemaUpdateInit)) + { + return schemaUpdateInit.SchemaType; + } + + return string.Empty; + } + + internal async Task ValidateMessageAsync(byte[] message, string internalStationName, string producerName) { if (!_schemaUpdateDictionary.TryGetValue(internalStationName, @@ -926,6 +937,11 @@ private void RegisterSchemaValidators() { throw new InvalidOperationException($"Unable to register schema validator: {nameof(ProtoBufValidator)}"); } + + if (!_schemaValidators.TryAdd(ValidatorType.AVRO, new AvroValidator())) + { + throw new InvalidOperationException($"Unable to register schema validator: {nameof(AvroValidator)}"); + } } private async Task ListenForSchemaUpdate(string internalStationName, ProducerSchemaUpdateInit schemaUpdateInit) @@ -1023,8 +1039,8 @@ void SyncSdkClientUpdate() try { _sdkClientUpdateSemaphore.WaitAsync(); - bool sdkClientShouldUpdate = sdkClientUpdate.Update ?? false; - switch (sdkClientUpdate.Type) + bool sdkClientShouldUpdate = sdkClientUpdate!.Update ?? false; + switch (sdkClientUpdate!.Type) { case MemphisSdkClientUpdateTypes.SEND_NOTIFICATION: _clusterConfigurations.AddOrUpdate(sdkClientUpdate.Type, sdkClientShouldUpdate, (key, _) => sdkClientShouldUpdate); diff --git a/src/Memphis.Client/Producer/MemphisProducer.cs b/src/Memphis.Client/Producer/MemphisProducer.cs index bf9444b..98e7f85 100644 --- a/src/Memphis.Client/Producer/MemphisProducer.cs +++ b/src/Memphis.Client/Producer/MemphisProducer.cs @@ -142,15 +142,27 @@ async Task ReInitializeProducerAndRetry(byte[] message, NameValueCollection head public async Task ProduceAsync(T message, NameValueCollection headers, int ackWaitMs = 15_000, string? messageId = default) { - string encodedMessage = IsPrimitiveType(message) ? - message.ToString() : - JsonConvert.SerializeObject(message); await ProduceAsync( - Encoding.UTF8.GetBytes(encodedMessage), + SerializeMessage(message), headers, ackWaitMs, messageId); + + byte[] SerializeMessage(T message) + { + if(IsPrimitiveType(message)) + return Encoding.UTF8.GetBytes(message.ToString()); + var schemaType = _memphisClient.GetStationSchemaType(_internalStationName); + return schemaType switch + { + MemphisSchemaTypes.JSON or + MemphisSchemaTypes.GRAPH_QL or + MemphisSchemaTypes.PROTO_BUF or + MemphisSchemaTypes.AVRO => MessageSerializer.Serialize(message!, schemaType), + _ => Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(message)), + }; + } } /// diff --git a/src/Memphis.Client/Validators/AvroValidator.cs b/src/Memphis.Client/Validators/AvroValidator.cs new file mode 100644 index 0000000..2fc23a1 --- /dev/null +++ b/src/Memphis.Client/Validators/AvroValidator.cs @@ -0,0 +1,48 @@ +using System.Threading.Tasks; +using SolTechnology.Avro; + +using Memphis.Client.Exception; +using System.Runtime.Serialization; +using Newtonsoft.Json; + +namespace Memphis.Client.Validators; + +internal class AnotherUserModel +{ + [DataMember(Name = "name")] + public string Name { get; set; } = null!; + + [DataMember(Name = "age")] + public long Age { get; set; } +} + +internal class AvroValidator : SchemaValidator, ISchemaValidator +{ + public Task ValidateAsync(byte[] messageToValidate, string schemaKey) + { + if (!_schemaCache.TryGetValue(schemaKey, out var schemaObj)) + throw new MemphisSchemaValidationException($"Schema: {schemaKey} not found in local cache"); + try + { + _ = AvroConvert.Avro2Json(messageToValidate, schemaObj); + return Task.CompletedTask; + } + catch (System.Exception exception) + { + throw new MemphisSchemaValidationException($"Schema validation has failed: \n {exception.Message}", exception); + } + } + + protected override string Parse(string schemaData, string schemaName) + { + try + { + _ = AvroConvert.GenerateModel(schemaData); + return schemaData; + } + catch (System.Exception exception) + { + throw new MemphisException($"Schema parsing has failed: \n {exception.Message}", exception); + } + } +} \ No newline at end of file diff --git a/src/Memphis.Client/Validators/ValidatorType.cs b/src/Memphis.Client/Validators/ValidatorType.cs index d24b83a..8d71ada 100644 --- a/src/Memphis.Client/Validators/ValidatorType.cs +++ b/src/Memphis.Client/Validators/ValidatorType.cs @@ -4,6 +4,7 @@ internal enum ValidatorType { GRAPHQL = 1, JSON = 2, - PROTOBUF = 3 + PROTOBUF = 3, + AVRO = 4 } } \ No newline at end of file