Skip to content

Commit

Permalink
Added avro schema support (#116)
Browse files Browse the repository at this point in the history
* Added avro schema support

* Added MessageSerializer
  • Loading branch information
tbazen committed Aug 7, 2023
1 parent 277cbe5 commit 6b6c75e
Show file tree
Hide file tree
Showing 10 changed files with 285 additions and 12 deletions.
79 changes: 79 additions & 0 deletions src/Memphis.Client.UnitTests/AvroValidatorTest.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
using System.Collections.Concurrent;
using System.Text;
using GraphQL.Types;
using Memphis.Client.Exception;
using Memphis.Client.UnitTests.Validators.TestData;
using Memphis.Client.Validators;
using Xunit;

namespace Memphis.Client.UnitTests.Validators;

/// <summary>
/// Unit tests for <see cref="AvroValidator"/>, exercised through the <see cref="ISchemaValidator"/> interface.
/// </summary>
public class AvroValidatorTest
{
    // Initialized inline, so every instance of this test class works against its own AvroValidator.
    private readonly ISchemaValidator _validator = new AvroValidator();

    #region AvroValidatorTest.ParseAndStore

    /// <summary>Storing a schema supplied by <see cref="AvroValidatorTestData.ValidSchema"/> must report success.</summary>
    [Theory]
    [MemberData(nameof(AvroValidatorTestData.ValidSchema), MemberType = typeof(AvroValidatorTestData))]
    public void ShouldReturnTrue_WhenParseAndStore_WhereValidSchemaPassed(string validSchema)
    {
        Assert.True(_validator.ParseAndStore("vaid-schema-001", validSchema));
    }

    /// <summary>Storing a schema supplied by <see cref="AvroValidatorTestData.InvalidSchema"/> must report failure.</summary>
    [Theory]
    [MemberData(nameof(AvroValidatorTestData.InvalidSchema), MemberType = typeof(AvroValidatorTestData))]
    public void ShouldReturnFalse_WhenParseAndStore_WhereInvalidSchemaPassed(string invalidSchema)
    {
        Assert.False(_validator.ParseAndStore("invalid-schema-001", invalidSchema));
    }

    #endregion

    #region AvroValidatorTest.ValidateAsync

    /// <summary>A message paired with its own schema must validate without any exception.</summary>
    [Theory]
    [MemberData(nameof(AvroValidatorTestData.ValidSchemaDetail), MemberType = typeof(AvroValidatorTestData))]
    public async Task ShouldDoSuccess_WhenValidateAsync_WhereValidDataPassed(string schemaKey, string schema, byte[] msg)
    {
        // Arrange: the schema has to be cached before a message can be validated against it.
        _validator.ParseAndStore(schemaKey, schema);

        // Act
        async Task ValidateMessage() => await _validator.ValidateAsync(msg, schemaKey);
        var failure = await Record.ExceptionAsync(ValidateMessage);

        // Assert
        Assert.Null(failure);
    }

    /// <summary>A message paired with a schema it does not match must be rejected.</summary>
    [Theory]
    [MemberData(nameof(AvroValidatorTestData.InvalidSchemaDetail), MemberType = typeof(AvroValidatorTestData))]
    public async Task ShouldDoThrow_WhenValidateAsync_WhereInvalidDataPassed(string schemaKey, string schema, byte[] msg)
    {
        // Arrange
        _validator.ParseAndStore(schemaKey, schema);

        // Act + Assert
        Task ValidateMessage() => _validator.ValidateAsync(msg, schemaKey);
        await Assert.ThrowsAsync<MemphisSchemaValidationException>(ValidateMessage);
    }

    /// <summary>Validating against a key that was never stored must be rejected.</summary>
    [Theory]
    [MemberData(nameof(AvroValidatorTestData.Message), MemberType = typeof(AvroValidatorTestData))]
    public async Task ShouldDoThrow_WhenValidateAsync_WhereSchemaNotFoundInCache(byte[] msg)
    {
        // A fresh GUID is used as the key; nothing in this test stores a schema under it.
        var unknownSchemaKey = Guid.NewGuid().ToString();

        Task ValidateMessage() => _validator.ValidateAsync(msg, unknownSchemaKey);
        await Assert.ThrowsAsync<MemphisSchemaValidationException>(ValidateMessage);
    }

    #endregion
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
using System.ComponentModel;
using System.Runtime.Serialization;
using System.Text;
using SolTechnology.Avro;
using SolTechnology.Avro.Infrastructure.Attributes;

namespace Memphis.Client.UnitTests;


/// <summary>
/// Test model with a string member serialized as "name" and a long member serialized as "age".
/// <see cref="AvroValidatorTestData"/> generates its valid schema and its valid message payload from this type.
/// </summary>
public class ValidUserModel
{
/// <summary>User name; exposed to serializers under the data member name "name".</summary>
[DataMember(Name = "name")]
public string Name { get; set; } = null!;

/// <summary>User age; exposed to serializers under the data member name "age".</summary>
[DataMember(Name = "age")]
public long Age { get; set; }
}

/// <summary>
/// Test model whose only member is serialized as "email".
/// <see cref="AvroValidatorTestData"/> serializes an instance of it and pairs the bytes with the
/// <see cref="ValidUserModel"/> schema to build the invalid-message test case.
/// </summary>
public class InvalidUserModel
{

/// <summary>E-mail address; exposed to serializers under the data member name "email".</summary>
[DataMember(Name = "email")]
public string Email { get; set; } = null!;
}


/// <summary>
/// Supplies the <c>MemberData</c> rows consumed by the Avro validator tests.
/// Every public property builds a new list each time it is read.
/// </summary>
public class AvroValidatorTestData
{
    // Avro schema generated from ValidUserModel.
    private static readonly string ValidUserSchema = AvroConvert.GenerateSchema(typeof(ValidUserModel));

    // Plain text that is not an Avro schema.
    private const string MalformedSchema = "This is invalid avro schema";

    // Serialized ValidUserModel instance.
    private static readonly byte[] ValidUserPayload = AvroConvert.Serialize(new ValidUserModel
    {
        Name = "John Doe",
        Age = 30
    });

    // Serialized InvalidUserModel instance; paired below with the ValidUserModel schema.
    private static readonly byte[] MismatchedPayload = AvroConvert.Serialize(new InvalidUserModel
    {
        Email = "[email protected]"
    });

    /// <summary>Rows of: schema text expected to be accepted.</summary>
    public static List<object[]> ValidSchema => new List<object[]>
    {
        new object[] { ValidUserSchema },
    };

    /// <summary>Rows of: schema text expected to be rejected.</summary>
    public static List<object[]> InvalidSchema => new List<object[]>
    {
        new object[] { MalformedSchema },
    };

    /// <summary>Rows of: a serialized message on its own.</summary>
    public static List<object[]> Message => new List<object[]>
    {
        new object[] { ValidUserPayload },
    };

    /// <summary>Rows of: schema key, schema text, and a message serialized from the schema's own model.</summary>
    public static List<object[]> ValidSchemaDetail => new List<object[]>
    {
        new object[] { "valid-simple-msg", ValidUserSchema, ValidUserPayload },
    };

    /// <summary>Rows of: schema key, schema text, and a message serialized from a different model.</summary>
    public static List<object[]> InvalidSchemaDetail => new List<object[]>
    {
        new object[] { "invalid-simple-msg", ValidUserSchema, MismatchedPayload },
    };
}


2 changes: 0 additions & 2 deletions src/Memphis.Client/Constants/MemphisConstants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,6 @@ public static class MemphisSchemaTypes
public const string JSON = "json";
public const string GRAPH_QL = "graphql";
public const string PROTO_BUF = "protobuf";

// currently unsupported
internal const string AVRO = "avro";
}

Expand Down
40 changes: 40 additions & 0 deletions src/Memphis.Client/Helper/MessageSerializer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
using System;
using System.IO;
using System.Text;
using Memphis.Client.Constants;
using Memphis.Client.Exception;
using Newtonsoft.Json;
using SolTechnology.Avro;

namespace Memphis.Client;

/// <summary>
/// Turns objects into the byte payload that matches a station's schema type.
/// </summary>
public class MessageSerializer
{
    /// <summary>
    /// Serializes <paramref name="obj"/> according to <paramref name="schemaType"/>.
    /// JSON and GraphQL payloads are the UTF-8 bytes of the Newtonsoft.Json representation,
    /// protobuf payloads are written with <c>ProtoBuf.Serializer</c>, and Avro payloads with <c>AvroConvert</c>.
    /// </summary>
    /// <typeparam name="T">Reference type of the object being serialized.</typeparam>
    /// <param name="obj">The object to serialize.</param>
    /// <param name="schemaType">One of the <see cref="MemphisSchemaTypes"/> values: json, graphql, protobuf or avro.</param>
    /// <returns>The serialized bytes.</returns>
    /// <exception cref="MemphisException">Thrown when <paramref name="schemaType"/> is not one of the supported values.</exception>
    public static byte[] Serialize<T>(T obj, string schemaType) where T : class
    {
        switch (schemaType)
        {
            case MemphisSchemaTypes.JSON:
            case MemphisSchemaTypes.GRAPH_QL:
                return Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(obj));

            case MemphisSchemaTypes.PROTO_BUF:
                return ToProtoBufBytes(obj);

            case MemphisSchemaTypes.AVRO:
                return AvroConvert.Serialize(obj);

            default:
                throw new MemphisException("Unsupported schema type, the supported schema types are: json, graphql, protobuf, avro");
        }
    }

    // Writes the payload into an in-memory stream via protobuf and returns a copy of the written bytes.
    private static byte[] ToProtoBufBytes<TData>(TData payload) where TData : class
    {
        using (var buffer = new MemoryStream())
        {
            ProtoBuf.Serializer.Serialize(buffer, payload);
            return buffer.ToArray();
        }
    }
}
5 changes: 4 additions & 1 deletion src/Memphis.Client/IMemphisClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
using Memphis.Client.Station;

namespace Memphis.Client;
#pragma warning disable CS8625

public interface IMemphisClient : IDisposable
{
Expand Down Expand Up @@ -126,4 +127,6 @@ Task ProduceAsync<T>(
Task<MemphisConsumer> CreateConsumer(FetchMessageOptions fetchMessageOptions);


}
}

#pragma warning restore CS8625
1 change: 1 addition & 0 deletions src/Memphis.Client/Memphis.Client.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="AvroConvert" Version="3.3.6" />
<PackageReference Include="GraphQL" Version="7.2.1" />
<PackageReference Include="GraphQL.SystemTextJson" Version="7.2.1" />
<PackageReference Include="NATS.Client" Version="1.0.2" />
Expand Down
24 changes: 20 additions & 4 deletions src/Memphis.Client/MemphisClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -591,9 +591,8 @@ static void EnsureSchemaTypeIsValid(string type)
case MemphisSchemaTypes.JSON:
case MemphisSchemaTypes.PROTO_BUF:
case MemphisSchemaTypes.GRAPH_QL:
return;
case MemphisSchemaTypes.AVRO:
throw new MemphisException("Avro schema type is not supported at this time");
return;
default:
throw new MemphisException("Unsupported schema type");
}
Expand Down Expand Up @@ -622,6 +621,18 @@ static void HandleSchemaCreationErrorResponse(byte[] responseBytes)
}
}

/// <summary>
/// Looks up the schema type recorded for a station in <c>_schemaUpdateDictionary</c>.
/// </summary>
/// <param name="internalStationName">Key used for the dictionary lookup.</param>
/// <returns>The entry's <c>SchemaType</c>, or <see cref="string.Empty"/> when the station has no entry.</returns>
internal string GetStationSchemaType(string internalStationName)
    => _schemaUpdateDictionary.TryGetValue(internalStationName, out ProducerSchemaUpdateInit stationSchema)
        ? stationSchema.SchemaType
        : string.Empty;


internal async Task ValidateMessageAsync(byte[] message, string internalStationName, string producerName)
{
if (!_schemaUpdateDictionary.TryGetValue(internalStationName,
Expand Down Expand Up @@ -926,6 +937,11 @@ private void RegisterSchemaValidators()
{
throw new InvalidOperationException($"Unable to register schema validator: {nameof(ProtoBufValidator)}");
}

if (!_schemaValidators.TryAdd(ValidatorType.AVRO, new AvroValidator()))
{
throw new InvalidOperationException($"Unable to register schema validator: {nameof(AvroValidator)}");
}
}

private async Task ListenForSchemaUpdate(string internalStationName, ProducerSchemaUpdateInit schemaUpdateInit)
Expand Down Expand Up @@ -1023,8 +1039,8 @@ void SyncSdkClientUpdate()
try
{
_sdkClientUpdateSemaphore.WaitAsync();
bool sdkClientShouldUpdate = sdkClientUpdate.Update ?? false;
switch (sdkClientUpdate.Type)
bool sdkClientShouldUpdate = sdkClientUpdate!.Update ?? false;
switch (sdkClientUpdate!.Type)
{
case MemphisSdkClientUpdateTypes.SEND_NOTIFICATION:
_clusterConfigurations.AddOrUpdate(sdkClientUpdate.Type, sdkClientShouldUpdate, (key, _) => sdkClientShouldUpdate);
Expand Down
20 changes: 16 additions & 4 deletions src/Memphis.Client/Producer/MemphisProducer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -142,15 +142,27 @@ async Task ReInitializeProducerAndRetry(byte[] message, NameValueCollection head
public async Task ProduceAsync<T>(T message, NameValueCollection headers, int ackWaitMs = 15_000,
string? messageId = default)
{
string encodedMessage = IsPrimitiveType(message) ?
message.ToString() :
JsonConvert.SerializeObject(message);

await ProduceAsync(
Encoding.UTF8.GetBytes(encodedMessage),
SerializeMessage(message),
headers,
ackWaitMs,
messageId);

// Builds the byte payload for a message:
//  - values for which IsPrimitiveType is true are sent as the UTF-8 bytes of their ToString() text;
//  - otherwise the station's schema type selects the serializer, and a schema type that is
//    none of json/graphql/protobuf/avro (including the empty string) falls back to JSON.
byte[] SerializeMessage(T message)
{
    if (IsPrimitiveType(message))
    {
        return Encoding.UTF8.GetBytes(message.ToString());
    }

    var stationSchemaType = _memphisClient.GetStationSchemaType(_internalStationName);
    switch (stationSchemaType)
    {
        case MemphisSchemaTypes.JSON:
        case MemphisSchemaTypes.GRAPH_QL:
        case MemphisSchemaTypes.PROTO_BUF:
        case MemphisSchemaTypes.AVRO:
            return MessageSerializer.Serialize<object>(message!, stationSchemaType);

        default:
            return Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(message));
    }
}
}

/// <summary>
Expand Down
48 changes: 48 additions & 0 deletions src/Memphis.Client/Validators/AvroValidator.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
using System.Threading.Tasks;
using SolTechnology.Avro;

using Memphis.Client.Exception;
using System.Runtime.Serialization;
using Newtonsoft.Json;

namespace Memphis.Client.Validators;

/// <summary>
/// Model with a string member serialized as "name" and a long member serialized as "age".
/// </summary>
// NOTE(review): nothing in the visible part of this file references this type; it looks like a
// leftover from experimenting with the validator — confirm it is unused before removing it.
internal class AnotherUserModel
{
/// <summary>Name; exposed to serializers under the data member name "name".</summary>
[DataMember(Name = "name")]
public string Name { get; set; } = null!;

/// <summary>Age; exposed to serializers under the data member name "age".</summary>
[DataMember(Name = "age")]
public long Age { get; set; }
}

/// <summary>
/// Schema validator for Avro. The cached representation of a schema is its raw schema text.
/// </summary>
internal class AvroValidator : SchemaValidator<string>, ISchemaValidator
{
    /// <summary>
    /// Checks a message against a cached schema by converting it to JSON with that schema.
    /// </summary>
    /// <param name="messageToValidate">The serialized message bytes.</param>
    /// <param name="schemaKey">Key under which the schema was stored in the local cache.</param>
    /// <returns>A completed task when the conversion succeeds.</returns>
    /// <exception cref="MemphisSchemaValidationException">
    /// Thrown when no schema is cached under <paramref name="schemaKey"/>, or when the conversion throws.
    /// </exception>
    public Task ValidateAsync(byte[] messageToValidate, string schemaKey)
    {
        if (!_schemaCache.TryGetValue(schemaKey, out var avroSchema))
        {
            throw new MemphisSchemaValidationException($"Schema: {schemaKey} not found in local cache");
        }

        try
        {
            // The JSON output is not needed; only whether the conversion completes matters.
            AvroConvert.Avro2Json(messageToValidate, avroSchema);
        }
        catch (System.Exception exception)
        {
            throw new MemphisSchemaValidationException($"Schema validation has failed: \n {exception.Message}", exception);
        }

        return Task.CompletedTask;
    }

    /// <summary>
    /// Checks that the schema text can be processed and returns it unchanged.
    /// </summary>
    /// <param name="schemaData">The Avro schema text.</param>
    /// <param name="schemaName">Not used by this implementation.</param>
    /// <returns><paramref name="schemaData"/> itself.</returns>
    /// <exception cref="MemphisException">Thrown when model generation from the schema throws.</exception>
    protected override string Parse(string schemaData, string schemaName)
    {
        try
        {
            // The generated model is discarded; the call is made so that any exception it raises
            // for this schema surfaces as a parse failure.
            AvroConvert.GenerateModel(schemaData);
        }
        catch (System.Exception exception)
        {
            throw new MemphisException($"Schema parsing has failed: \n {exception.Message}", exception);
        }

        return schemaData;
    }
}
3 changes: 2 additions & 1 deletion src/Memphis.Client/Validators/ValidatorType.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ internal enum ValidatorType
{
GRAPHQL = 1,
JSON = 2,
PROTOBUF = 3
PROTOBUF = 3,
AVRO = 4
}
}

0 comments on commit 6b6c75e

Please sign in to comment.