Skip to content

Commit

Permalink
Bug fixes & Refactoring (#194)
Browse files Browse the repository at this point in the history
  • Loading branch information
tbazen authored Jan 28, 2024
1 parent 18105cc commit cd5abfc
Show file tree
Hide file tree
Showing 23 changed files with 685 additions and 561 deletions.
9 changes: 6 additions & 3 deletions src/Memphis.Client.IntegrationTests/FullFlowTests.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System.Collections.Specialized;
using System.Diagnostics;
using System.Text;
using Memphis.Client.Consumer;
using Memphis.Client.Producer;
Expand Down Expand Up @@ -201,9 +202,11 @@ public static async Task<int> CountMessagesConsumedByGroup(MemphisConsumer consu
args.MessageList.ForEach(msg => msg.Ack());
};

_ = consumer1.ConsumeAsync(new ConsumeOptions { PartitionKey = partitionKey });
_ = consumer2.ConsumeAsync(new ConsumeOptions { PartitionKey = partitionKey });
await Task.Delay(TimeSpan.FromSeconds(10));
await Task.WhenAny(
consumer1.ConsumeAsync(new ConsumeOptions { PartitionKey = partitionKey }),
consumer2.ConsumeAsync(new ConsumeOptions { PartitionKey = partitionKey }),
Task.Delay(TimeSpan.FromSeconds(30)));

return count;
}
}
42 changes: 30 additions & 12 deletions src/Memphis.Client.UnitTests/AvroValidatorTest.cs
Original file line number Diff line number Diff line change
@@ -1,17 +1,11 @@
using System.Collections.Concurrent;
using System.Text;
using GraphQL.Types;
using Memphis.Client.Constants;
using Memphis.Client.Exception;
using Memphis.Client.UnitTests.Validators.TestData;
using Memphis.Client.Validators;
using Xunit;

namespace Memphis.Client.UnitTests.Validators;

public class AvroValidatorTest
{


private readonly ISchemaValidator _validator;

public AvroValidatorTest()
Expand All @@ -24,7 +18,13 @@ public AvroValidatorTest()
[MemberData(nameof(AvroValidatorTestData.ValidSchema), MemberType = typeof(AvroValidatorTestData))]
public void ShouldReturnTrue_WhenParseAndStore_WhereValidSchemaPassed(string validSchema)
{
var actual = _validator.ParseAndStore("vaid-schema-001", validSchema);
var schemaUpdate = ValidatorTestHelper.GetSchemaUpdateInit(
"valid-schema-001",
validSchema,
MemphisSchemaTypes.AVRO
);

var actual = _validator.AddOrUpdateSchema(schemaUpdate);

Assert.True(actual);
}
Expand All @@ -34,7 +34,13 @@ public void ShouldReturnTrue_WhenParseAndStore_WhereValidSchemaPassed(string val
[MemberData(nameof(AvroValidatorTestData.InvalidSchema), MemberType = typeof(AvroValidatorTestData))]
public void ShouldReturnFalse_WhenParseAndStore_WhereInvalidSchemaPassed(string invalidSchema)
{
var actual = _validator.ParseAndStore("invalid-schema-001", invalidSchema);
var schemaUpdate = ValidatorTestHelper.GetSchemaUpdateInit(
"invalid-schema-001",
invalidSchema,
MemphisSchemaTypes.AVRO
);

var actual = _validator.AddOrUpdateSchema(schemaUpdate);

Assert.False(actual);
}
Expand All @@ -48,18 +54,30 @@ public void ShouldReturnFalse_WhenParseAndStore_WhereInvalidSchemaPassed(string
[MemberData(nameof(AvroValidatorTestData.ValidSchemaDetail), MemberType = typeof(AvroValidatorTestData))]
public async Task ShouldDoSuccess_WhenValidateAsync_WhereValidDataPassed(string schemaKey, string schema, byte[] msg)
{
_validator.ParseAndStore(schemaKey, schema);
var schemaUpdate = ValidatorTestHelper.GetSchemaUpdateInit(
schemaKey,
schema,
MemphisSchemaTypes.AVRO
);

_validator.AddOrUpdateSchema(schemaUpdate);

var exception = await Record.ExceptionAsync(async () => await _validator.ValidateAsync(msg, schemaKey));

Assert.Null(exception);
}

[Theory]
[MemberData(nameof(AvroValidatorTestData.InvalidSchemaDetail), MemberType = typeof(AvroValidatorTestData))]
public async Task ShouldDoThrow_WhenValidateAsync_WhereInvalidDataPassed(string schemaKey, string schema, byte[] msg)
{
_validator.ParseAndStore(schemaKey, schema);
var schemaUpdate = ValidatorTestHelper.GetSchemaUpdateInit(
schemaKey,
schema,
MemphisSchemaTypes.AVRO
);

_validator.AddOrUpdateSchema(schemaUpdate);

await Assert.ThrowsAsync<MemphisSchemaValidationException>(
() => _validator.ValidateAsync(msg, schemaKey));
Expand Down
3 changes: 2 additions & 1 deletion src/Memphis.Client.UnitTests/MemphisClientTest.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System.Collections.Concurrent;
using System.Text;
using System.Threading.Tasks;
using Memphis.Client.Constants;
using Memphis.Client.Exception;
using Memphis.Client.Models.Response;
using Memphis.Client.Validators;
Expand Down Expand Up @@ -135,7 +136,7 @@ type Query {
schemaUpdateDictionaryMock.TryAdd(internalStationNameForGraphql, new SchemaUpdateInit()
{
SchemaName = "test-schema-01",
SchemaType = SchemaUpdateInit.SchemaTypes.GRAPHQL,
SchemaType = MemphisSchemaTypes.GRAPH_QL,
ActiveVersion = new ProducerSchemaUpdateVersion()
{
Content = graphqlSchemaStr,
Expand Down
19 changes: 14 additions & 5 deletions src/Memphis.Client.UnitTests/Validators/GraphqlValidatorTest.cs
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
using System.Collections.Concurrent;
using System.Reflection;
using System.Text;
using System.Threading.Tasks;
using GraphQL.Types;
using Memphis.Client.Constants;
using Memphis.Client.Exception;
using Memphis.Client.Validators;
using Xunit;
using Xunit.Abstractions;

namespace Memphis.Client.UnitTests.Validators
Expand Down Expand Up @@ -34,9 +32,14 @@ type Query {
sayHello : String
}
";
var schemaUpdate = ValidatorTestHelper.GetSchemaUpdateInit(
"test-schema-001",
schemaStr,
MemphisSchemaTypes.GRAPH_QL
);
// when:

var actual = _sut.ParseAndStore("test-schema-001", schemaStr);
var actual = _sut.AddOrUpdateSchema(schemaUpdate);

// then:

Expand All @@ -52,9 +55,15 @@ public void ShouldReturnFalse_WhenParseAndStore_WhereInValidSchemaPassed()
sayHelwlo : String
}
";

var schemaUpdate = ValidatorTestHelper.GetSchemaUpdateInit(
"test-schema-001",
schemaStr,
MemphisSchemaTypes.GRAPH_QL
);
// when:

var actual = _sut.ParseAndStore("test-schema-001", schemaStr);
var actual = _sut.AddOrUpdateSchema(schemaUpdate);

// then:

Expand Down
48 changes: 34 additions & 14 deletions src/Memphis.Client.UnitTests/Validators/JsonValidatorTest.cs
Original file line number Diff line number Diff line change
@@ -1,11 +1,7 @@
using System.Collections.Concurrent;
using System.Text;
using GraphQL.Types;
using Memphis.Client.Constants;
using Memphis.Client.Exception;
using Memphis.Client.UnitTests.Validators.TestData;
using Memphis.Client.Validators;
using NJsonSchema;
using Xunit;

namespace Memphis.Client.UnitTests.Validators
{
Expand All @@ -20,10 +16,16 @@ public JsonValidatorTest()

#region JsonValidatorTest.ParseAndStore
[Theory]
[MemberData(nameof(JsonValidatorTestData.ValidSchema),MemberType = typeof(JsonValidatorTestData))]
[MemberData(nameof(JsonValidatorTestData.ValidSchema), MemberType = typeof(JsonValidatorTestData))]
public void ShouldReturnTrue_WhenParseAndStore_WhereValidSchemaPassed(string validSchema)
{
var actual = _sut.ParseAndStore("test-schema-001", validSchema);
var schemaUpdate = ValidatorTestHelper.GetSchemaUpdateInit(
"test-schema-001",
validSchema,
MemphisSchemaTypes.JSON
);

var actual = _sut.AddOrUpdateSchema(schemaUpdate);

Assert.True(actual);
}
Expand All @@ -33,7 +35,13 @@ public void ShouldReturnTrue_WhenParseAndStore_WhereValidSchemaPassed(string val
[MemberData(nameof(JsonValidatorTestData.InvalidSchema), MemberType = typeof(JsonValidatorTestData))]
public void ShouldReturnFalse_WhenParseAndStore_WhereInvalidSchemaPassed(string invalidSchema)
{
var actual = _sut.ParseAndStore("test-schema-001", invalidSchema);
var schemaUpdate = ValidatorTestHelper.GetSchemaUpdateInit(
"test-schema-001",
invalidSchema,
MemphisSchemaTypes.JSON
);

var actual = _sut.AddOrUpdateSchema(schemaUpdate);

Assert.False(actual);
}
Expand All @@ -45,21 +53,33 @@ public void ShouldReturnFalse_WhenParseAndStore_WhereInvalidSchemaPassed(string

[Theory]
[MemberData(nameof(JsonValidatorTestData.ValidSchemaDetail), MemberType = typeof(JsonValidatorTestData))]
public async Task ShouldDoSuccess_WhenValidateAsync_WhereValidDataPassed(string schemaKey,string schema, byte[] msg)
public async Task ShouldDoSuccess_WhenValidateAsync_WhereValidDataPassed(string schemaKey, string schema, byte[] msg)
{
_sut.ParseAndStore(schemaKey, schema);
var schemaUpdate = ValidatorTestHelper.GetSchemaUpdateInit(
schemaKey,
schema,
MemphisSchemaTypes.JSON
);

_sut.AddOrUpdateSchema(schemaUpdate);

await _sut.ValidateAsync(msg, schemaKey);
}

[Theory]
[MemberData(nameof(JsonValidatorTestData.InvalidSchemaDetail), MemberType = typeof(JsonValidatorTestData))]
public async Task ShouldDoThrow_WhenValidateAsync_WhereInvalidDataPassed(string schemaKey,string schema, byte[] msg)
public async Task ShouldDoThrow_WhenValidateAsync_WhereInvalidDataPassed(string schemaKey, string schema, byte[] msg)
{
_sut.ParseAndStore(schemaKey, schema);
var schemaUpdate = ValidatorTestHelper.GetSchemaUpdateInit(
schemaKey,
schema,
MemphisSchemaTypes.JSON
);

await Assert.ThrowsAsync<MemphisSchemaValidationException>(
() => _sut.ValidateAsync(msg, schemaKey));
_sut.AddOrUpdateSchema(schemaUpdate);

await Assert.ThrowsAsync<MemphisSchemaValidationException>(
() => _sut.ValidateAsync(msg, schemaKey));
}

[Theory]
Expand Down
80 changes: 45 additions & 35 deletions src/Memphis.Client.UnitTests/Validators/ProtoBufValidatorTest.cs
Original file line number Diff line number Diff line change
@@ -1,42 +1,34 @@
using System.Text;
using Memphis.Client.Constants;
using Memphis.Client.Exception;
using Memphis.Client.Models.Response;
using Memphis.Client.Validators;
using ProtoBuf;

namespace Memphis.Client.UnitTests;




[ProtoContract]
public class Test
file class ValidModel
{

[ProtoMember(1, Name = @"field1")]
[global::System.ComponentModel.DefaultValue("")]
public string Field1 { get; set; } = "";

[ProtoMember(2, Name = @"field2")]
[System.ComponentModel.DefaultValue("")]
public string Field2 { get; set; } = "";

[ProtoMember(3, Name = @"field3")]
public int Field3 { get; set; }

}


[ProtoContract]
public class InvalidTestModel
public class InvalidModel
{

[ProtoMember(1, Name = @"field1")]
[global::System.ComponentModel.DefaultValue("")]
public string Field1 { get; set; } = "";

[ProtoMember(3, Name = @"field3")]
public int Field3 { get; set; }

[ProtoMember(2, Name = @"field2")]
public string Field2 { get; set; } = "";
}

public class ProtoBufValidatorTests
Expand All @@ -46,52 +38,70 @@ public class ProtoBufValidatorTests
public ProtoBufValidatorTests()
{
_validator = new ProtoBufValidator();
_validator.ParseAndStore(
"samplepbf",
"""
{
"version_number": 1,
"descriptor": "CmsKEXNhbXBsZXBiZl8xLnByb3RvIk4KBFRlc3QSFgoGZmllbGQxGAEgASgJUgZmaWVsZDESFgoGZmllbGQyGAIgASgJUgZmaWVsZDISFgoGZmllbGQzGAMgASgFUgZmaWVsZDNiBnByb3RvMw==",
"schema_content": "syntax = \"proto3\";\nmessage Test {\n string field1 = 1;\n string field2 = 2;\n int32 field3 = 3;\n}",
"message_struct_name": "Test"
}
"""
);
_validator.AddOrUpdateSchema(TestSchema());
}

[Fact]
public async Task GivenValidPayload_WhenValidate_ThenNoError()
{
var validData = new Test
var validData = new ValidModel
{
Field1 = "AwesomeFirst",
Field2 = "SecondField",
Field3 = 333,
};
var message = ConvertToProtoBuf(validData);


var exception = await Record.ExceptionAsync(async () => await _validator.ValidateAsync(message, "samplepbf"));

Assert.Null(exception);
var exception = await Record.ExceptionAsync(async () => await _validator.ValidateAsync(message, "testschema"));

Assert.Null(exception);
}

[Fact]
public async Task GivenInvalidPayload_WhenValidate_ThenHasError()
{
var message = Convert.FromBase64String("CgJmb3JtYWwxCgRmZWlsZDIKCAk=");

await Assert.ThrowsAsync<MemphisSchemaValidationException>(
() => _validator.ValidateAsync(message, "samplepbf"));
}
var invalidData = new InvalidModel
{
Field1 = "AwesomeFirst",
Field2 = "SecondField"
};
var message = ConvertToProtoBuf(invalidData);

var exception = await Record.ExceptionAsync(async () => await _validator.ValidateAsync(message, "testschema"));

Assert.NotNull(exception);
Assert.IsType<MemphisSchemaValidationException>(exception);
}

private static byte[] ConvertToProtoBuf<TData>(TData obj) where TData : class
{
using var stream = new MemoryStream();
ProtoBuf.Serializer.Serialize(stream, obj);
Serializer.Serialize(stream, obj);
return stream.ToArray();
}

private static SchemaUpdateInit TestSchema()
{
var schemaUpdate = new SchemaUpdateInit
{
SchemaName = "testschema",
ActiveVersion = new ProducerSchemaUpdateVersion
{
VersionNumber = "1",
Descriptor = "CmQKEnRlc3RzY2hlbWFfMS5wcm90byJOCgRUZXN0EhYKBmZpZWxkMRgBIAIoCVIGZmllbGQxEhYKBmZpZWxkMhgCIAIoCVIGZmllbGQyEhYKBmZpZWxkMxgDIAIoBVIGZmllbGQz",
Content = """
syntax = "proto2";
message Test {
required string field1 = 1;
required string field2 = 2;
required int32 field3 = 3;
}
""",
MessageStructName = "Test"
},
SchemaType = MemphisSchemaTypes.PROTO_BUF
};
return schemaUpdate;
}
}
Loading

0 comments on commit cd5abfc

Please sign in to comment.