From cd5abfc67f3d6f8555190f60e45119135cd56936 Mon Sep 17 00:00:00 2001 From: Bazen Date: Sun, 28 Jan 2024 18:46:49 +0200 Subject: [PATCH] Bug fixes & Refactoring (#194) --- .../FullFlowTests.cs | 9 +- .../AvroValidatorTest.cs | 42 +- .../MemphisClientTest.cs | 3 +- .../Validators/GraphqlValidatorTest.cs | 19 +- .../Validators/JsonValidatorTest.cs | 48 +- .../Validators/ProtoBufValidatorTest.cs | 80 ++-- .../Validators/ValidatorTestHelper.cs | 27 ++ .../Constants/MemphisConstants.cs | 22 +- .../Consumer/IMemphisConsumer.cs | 9 +- .../Consumer/MemphisConsumer.cs | 7 +- src/Memphis.Client/MemphisClient.Consumer.cs | 123 ++++++ src/Memphis.Client/MemphisClient.Station.cs | 181 ++++++++ src/Memphis.Client/MemphisClient.cs | 418 +----------------- .../Models/Response/SchemaUpdateInit.cs | 7 - .../Validators/AvroValidator.cs | 21 +- .../Validators/GraphqlValidator.cs | 34 +- .../Validators/ISchemaValidator.cs | 2 +- .../Validators/JsonValidator.cs | 31 +- .../Validators/ProtoBufValidator.cs | 49 +- .../Validators/SchemaValidator.cs | 38 +- .../ProtoBufValidatorTests.cs | 60 ++- .../ProtoBufEvalMissingDependencyException.cs | 8 + src/ProtoBufEval/RuntimeEnvironment.cs | 8 +- 23 files changed, 685 insertions(+), 561 deletions(-) create mode 100644 src/Memphis.Client.UnitTests/Validators/ValidatorTestHelper.cs create mode 100644 src/Memphis.Client/MemphisClient.Consumer.cs create mode 100644 src/ProtoBufEval/ProtoBufEvalMissingDependencyException.cs diff --git a/src/Memphis.Client.IntegrationTests/FullFlowTests.cs b/src/Memphis.Client.IntegrationTests/FullFlowTests.cs index 53364cf..0d56d1b 100644 --- a/src/Memphis.Client.IntegrationTests/FullFlowTests.cs +++ b/src/Memphis.Client.IntegrationTests/FullFlowTests.cs @@ -1,4 +1,5 @@ using System.Collections.Specialized; +using System.Diagnostics; using System.Text; using Memphis.Client.Consumer; using Memphis.Client.Producer; @@ -201,9 +202,11 @@ public static async Task CountMessagesConsumedByGroup(MemphisConsumer consu args.MessageList.ForEach(msg => msg.Ack()); }; - _ = consumer1.ConsumeAsync(new ConsumeOptions { PartitionKey = partitionKey }); - _ = consumer2.ConsumeAsync(new ConsumeOptions { PartitionKey = partitionKey }); - await Task.Delay(TimeSpan.FromSeconds(10)); + await Task.WhenAny( + consumer1.ConsumeAsync(new ConsumeOptions { PartitionKey = partitionKey }), + consumer2.ConsumeAsync(new ConsumeOptions { PartitionKey = partitionKey }), + Task.Delay(TimeSpan.FromSeconds(30))); + return count; } } diff --git a/src/Memphis.Client.UnitTests/AvroValidatorTest.cs b/src/Memphis.Client.UnitTests/AvroValidatorTest.cs index 67d55c8..501bb82 100644 --- a/src/Memphis.Client.UnitTests/AvroValidatorTest.cs +++ b/src/Memphis.Client.UnitTests/AvroValidatorTest.cs @@ -1,17 +1,11 @@ -using System.Collections.Concurrent; -using System.Text; -using GraphQL.Types; +using Memphis.Client.Constants; using Memphis.Client.Exception; -using Memphis.Client.UnitTests.Validators.TestData; using Memphis.Client.Validators; -using Xunit; namespace Memphis.Client.UnitTests.Validators; public class AvroValidatorTest { - - private readonly ISchemaValidator _validator; public AvroValidatorTest() @@ -24,7 +18,13 @@ public AvroValidatorTest() [MemberData(nameof(AvroValidatorTestData.ValidSchema), MemberType = typeof(AvroValidatorTestData))] public void ShouldReturnTrue_WhenParseAndStore_WhereValidSchemaPassed(string validSchema) { - var actual = _validator.ParseAndStore("vaid-schema-001", validSchema); + var schemaUpdate = ValidatorTestHelper.GetSchemaUpdateInit( + "valid-schema-001", + validSchema, + MemphisSchemaTypes.AVRO + ); + + var actual = _validator.AddOrUpdateSchema(schemaUpdate); Assert.True(actual); } @@ -34,7 +34,13 @@ public void ShouldReturnTrue_WhenParseAndStore_WhereValidSchemaPassed(string val [MemberData(nameof(AvroValidatorTestData.InvalidSchema), MemberType = typeof(AvroValidatorTestData))] public void ShouldReturnFalse_WhenParseAndStore_WhereInvalidSchemaPassed(string invalidSchema) { - var actual = _validator.ParseAndStore("invalid-schema-001", invalidSchema); + var schemaUpdate = ValidatorTestHelper.GetSchemaUpdateInit( + "invalid-schema-001", + invalidSchema, + MemphisSchemaTypes.AVRO + ); + + var actual = _validator.AddOrUpdateSchema(schemaUpdate); Assert.False(actual); } @@ -48,10 +54,16 @@ public void ShouldReturnFalse_WhenParseAndStore_WhereInvalidSchemaPassed(string [MemberData(nameof(AvroValidatorTestData.ValidSchemaDetail), MemberType = typeof(AvroValidatorTestData))] public async Task ShouldDoSuccess_WhenValidateAsync_WhereValidDataPassed(string schemaKey, string schema, byte[] msg) { - _validator.ParseAndStore(schemaKey, schema); + var schemaUpdate = ValidatorTestHelper.GetSchemaUpdateInit( + schemaKey, + schema, + MemphisSchemaTypes.AVRO + ); + + _validator.AddOrUpdateSchema(schemaUpdate); var exception = await Record.ExceptionAsync(async () => await _validator.ValidateAsync(msg, schemaKey)); - + Assert.Null(exception); } @@ -59,7 +71,13 @@ public async Task ShouldDoSuccess_WhenValidateAsync_WhereValidDataPassed(string [MemberData(nameof(AvroValidatorTestData.InvalidSchemaDetail), MemberType = typeof(AvroValidatorTestData))] public async Task ShouldDoThrow_WhenValidateAsync_WhereInvalidDataPassed(string schemaKey, string schema, byte[] msg) { - _validator.ParseAndStore(schemaKey, schema); + var schemaUpdate = ValidatorTestHelper.GetSchemaUpdateInit( + schemaKey, + schema, + MemphisSchemaTypes.AVRO + ); + + _validator.AddOrUpdateSchema(schemaUpdate); await Assert.ThrowsAsync( () => _validator.ValidateAsync(msg, schemaKey)); diff --git a/src/Memphis.Client.UnitTests/MemphisClientTest.cs b/src/Memphis.Client.UnitTests/MemphisClientTest.cs index 9882585..9b4c452 100644 --- a/src/Memphis.Client.UnitTests/MemphisClientTest.cs +++ b/src/Memphis.Client.UnitTests/MemphisClientTest.cs @@ -1,6 +1,7 @@ using System.Collections.Concurrent; using System.Text; using System.Threading.Tasks; +using Memphis.Client.Constants; using Memphis.Client.Exception; using Memphis.Client.Models.Response; using Memphis.Client.Validators; @@ -135,7 +136,7 @@ type Query { schemaUpdateDictionaryMock.TryAdd(internalStationNameForGraphql, new SchemaUpdateInit() { SchemaName = "test-schema-01", - SchemaType = SchemaUpdateInit.SchemaTypes.GRAPHQL, + SchemaType = MemphisSchemaTypes.GRAPH_QL, ActiveVersion = new ProducerSchemaUpdateVersion() { Content = graphqlSchemaStr, diff --git a/src/Memphis.Client.UnitTests/Validators/GraphqlValidatorTest.cs b/src/Memphis.Client.UnitTests/Validators/GraphqlValidatorTest.cs index 7ebdef3..7392b17 100644 --- a/src/Memphis.Client.UnitTests/Validators/GraphqlValidatorTest.cs +++ b/src/Memphis.Client.UnitTests/Validators/GraphqlValidatorTest.cs @@ -1,11 +1,9 @@ using System.Collections.Concurrent; -using System.Reflection; using System.Text; -using System.Threading.Tasks; using GraphQL.Types; +using Memphis.Client.Constants; using Memphis.Client.Exception; using Memphis.Client.Validators; -using Xunit; using Xunit.Abstractions; namespace Memphis.Client.UnitTests.Validators @@ -34,9 +32,14 @@ type Query { sayHello : String } "; + var schemaUpdate = ValidatorTestHelper.GetSchemaUpdateInit( + "test-schema-001", + schemaStr, + MemphisSchemaTypes.GRAPH_QL + ); // when: - var actual = _sut.ParseAndStore("test-schema-001", schemaStr); + var actual = _sut.AddOrUpdateSchema(schemaUpdate); // then: @@ -52,9 +55,15 @@ public void ShouldReturnFalse_WhenParseAndStore_WhereInValidSchemaPassed() sayHelwlo : String } "; + + var schemaUpdate = ValidatorTestHelper.GetSchemaUpdateInit( + "test-schema-001", + schemaStr, + MemphisSchemaTypes.GRAPH_QL + ); // when: - var actual = _sut.ParseAndStore("test-schema-001", schemaStr); + var actual = _sut.AddOrUpdateSchema(schemaUpdate); // then: diff --git a/src/Memphis.Client.UnitTests/Validators/JsonValidatorTest.cs b/src/Memphis.Client.UnitTests/Validators/JsonValidatorTest.cs index ef3a3b6..3a71370 100644 --- a/src/Memphis.Client.UnitTests/Validators/JsonValidatorTest.cs +++ b/src/Memphis.Client.UnitTests/Validators/JsonValidatorTest.cs @@ -1,11 +1,7 @@ -using System.Collections.Concurrent; -using System.Text; -using GraphQL.Types; +using Memphis.Client.Constants; using Memphis.Client.Exception; using Memphis.Client.UnitTests.Validators.TestData; using Memphis.Client.Validators; -using NJsonSchema; -using Xunit; namespace Memphis.Client.UnitTests.Validators { @@ -20,10 +16,16 @@ public JsonValidatorTest() #region JsonValidatorTest.ParseAndStore [Theory] - [MemberData(nameof(JsonValidatorTestData.ValidSchema),MemberType = typeof(JsonValidatorTestData))] + [MemberData(nameof(JsonValidatorTestData.ValidSchema), MemberType = typeof(JsonValidatorTestData))] public void ShouldReturnTrue_WhenParseAndStore_WhereValidSchemaPassed(string validSchema) { - var actual = _sut.ParseAndStore("test-schema-001", validSchema); + var schemaUpdate = ValidatorTestHelper.GetSchemaUpdateInit( + "test-schema-001", + validSchema, + MemphisSchemaTypes.JSON + ); + + var actual = _sut.AddOrUpdateSchema(schemaUpdate); Assert.True(actual); } @@ -33,7 +35,13 @@ public void ShouldReturnTrue_WhenParseAndStore_WhereValidSchemaPassed(string val [MemberData(nameof(JsonValidatorTestData.InvalidSchema), MemberType = typeof(JsonValidatorTestData))] public void ShouldReturnFalse_WhenParseAndStore_WhereInvalidSchemaPassed(string invalidSchema) { - var actual = _sut.ParseAndStore("test-schema-001", invalidSchema); + var schemaUpdate = ValidatorTestHelper.GetSchemaUpdateInit( + "test-schema-001", + invalidSchema, + MemphisSchemaTypes.JSON + ); + + var actual = _sut.AddOrUpdateSchema(schemaUpdate); Assert.False(actual); } @@ -45,21 +53,33 @@ public void ShouldReturnFalse_WhenParseAndStore_WhereInvalidSchemaPassed(string [Theory] [MemberData(nameof(JsonValidatorTestData.ValidSchemaDetail), MemberType = typeof(JsonValidatorTestData))] - public async Task ShouldDoSuccess_WhenValidateAsync_WhereValidDataPassed(string schemaKey,string schema, byte[] msg) + public async Task ShouldDoSuccess_WhenValidateAsync_WhereValidDataPassed(string schemaKey, string schema, byte[] msg) { - _sut.ParseAndStore(schemaKey, schema); + var schemaUpdate = ValidatorTestHelper.GetSchemaUpdateInit( + schemaKey, + schema, + MemphisSchemaTypes.JSON + ); + + _sut.AddOrUpdateSchema(schemaUpdate); await _sut.ValidateAsync(msg, schemaKey); } [Theory] [MemberData(nameof(JsonValidatorTestData.InvalidSchemaDetail), MemberType = typeof(JsonValidatorTestData))] - public async Task ShouldDoThrow_WhenValidateAsync_WhereInvalidDataPassed(string schemaKey,string schema, byte[] msg) + public async Task ShouldDoThrow_WhenValidateAsync_WhereInvalidDataPassed(string schemaKey, string schema, byte[] msg) { - _sut.ParseAndStore(schemaKey, schema); + var schemaUpdate = ValidatorTestHelper.GetSchemaUpdateInit( + schemaKey, + schema, + MemphisSchemaTypes.JSON + ); - await Assert.ThrowsAsync( - () => _sut.ValidateAsync(msg, schemaKey)); + _sut.AddOrUpdateSchema(schemaUpdate); + + await Assert.ThrowsAsync( + () => _sut.ValidateAsync(msg, schemaKey)); } [Theory] diff --git a/src/Memphis.Client.UnitTests/Validators/ProtoBufValidatorTest.cs b/src/Memphis.Client.UnitTests/Validators/ProtoBufValidatorTest.cs index 6baff79..c3576d9 100644 --- a/src/Memphis.Client.UnitTests/Validators/ProtoBufValidatorTest.cs +++ b/src/Memphis.Client.UnitTests/Validators/ProtoBufValidatorTest.cs @@ -1,42 +1,34 @@ -using System.Text; +using Memphis.Client.Constants; using Memphis.Client.Exception; +using Memphis.Client.Models.Response; using Memphis.Client.Validators; using ProtoBuf; namespace Memphis.Client.UnitTests; - - - [ProtoContract] -public class Test +file class ValidModel { [ProtoMember(1, Name = @"field1")] - [global::System.ComponentModel.DefaultValue("")] public string Field1 { get; set; } = ""; [ProtoMember(2, Name = @"field2")] - [System.ComponentModel.DefaultValue("")] public string Field2 { get; set; } = ""; [ProtoMember(3, Name = @"field3")] public int Field3 { get; set; } - } [ProtoContract] -public class InvalidTestModel +public class InvalidModel { - [ProtoMember(1, Name = @"field1")] - [global::System.ComponentModel.DefaultValue("")] public string Field1 { get; set; } = ""; - [ProtoMember(3, Name = @"field3")] - public int Field3 { get; set; } - + [ProtoMember(2, Name = @"field2")] + public string Field2 { get; set; } = ""; } public class ProtoBufValidatorTests @@ -46,23 +38,13 @@ public class ProtoBufValidatorTests public ProtoBufValidatorTests() { _validator = new ProtoBufValidator(); - _validator.ParseAndStore( - "samplepbf", - """ - { - "version_number": 1, - "descriptor": "CmsKEXNhbXBsZXBiZl8xLnByb3RvIk4KBFRlc3QSFgoGZmllbGQxGAEgASgJUgZmaWVsZDESFgoGZmllbGQyGAIgASgJUgZmaWVsZDISFgoGZmllbGQzGAMgASgFUgZmaWVsZDNiBnByb3RvMw==", - "schema_content": "syntax = \"proto3\";\nmessage Test {\n string field1 = 1;\n string field2 = 2;\n int32 field3 = 3;\n}", - "message_struct_name": "Test" - } - """ - ); + _validator.AddOrUpdateSchema(TestSchema()); } [Fact] public async Task GivenValidPayload_WhenValidate_ThenNoError() { - var validData = new Test + var validData = new ValidModel { Field1 = "AwesomeFirst", Field2 = "SecondField", @@ -70,28 +52,56 @@ public async Task GivenValidPayload_WhenValidate_ThenNoError() }; var message = ConvertToProtoBuf(validData); - - var exception = await Record.ExceptionAsync(async () => await _validator.ValidateAsync(message, "samplepbf")); - Assert.Null(exception); + var exception = await Record.ExceptionAsync(async () => await _validator.ValidateAsync(message, "testschema")); + Assert.Null(exception); } [Fact] public async Task GivenInvalidPayload_WhenValidate_ThenHasError() { - var message = Convert.FromBase64String("CgJmb3JtYWwxCgRmZWlsZDIKCAk="); - - await Assert.ThrowsAsync( - () => _validator.ValidateAsync(message, "samplepbf")); - } + var invalidData = new InvalidModel + { + Field1 = "AwesomeFirst", + Field2 = "SecondField" + }; + var message = ConvertToProtoBuf(invalidData); + var exception = await Record.ExceptionAsync(async () => await _validator.ValidateAsync(message, "testschema")); + Assert.NotNull(exception); + Assert.IsType(exception); + } private static byte[] ConvertToProtoBuf(TData obj) where TData : class { using var stream = new MemoryStream(); - ProtoBuf.Serializer.Serialize(stream, obj); + Serializer.Serialize(stream, obj); return stream.ToArray(); } + + private static SchemaUpdateInit TestSchema() + { + var schemaUpdate = new SchemaUpdateInit + { + SchemaName = "testschema", + ActiveVersion = new ProducerSchemaUpdateVersion + { + VersionNumber = "1", + Descriptor = "CmQKEnRlc3RzY2hlbWFfMS5wcm90byJOCgRUZXN0EhYKBmZpZWxkMRgBIAIoCVIGZmllbGQxEhYKBmZpZWxkMhgCIAIoCVIGZmllbGQyEhYKBmZpZWxkMxgDIAIoBVIGZmllbGQz", + Content = """ + syntax = "proto2"; + message Test { + required string field1 = 1; + required string field2 = 2; + required int32 field3 = 3; + } + """, + MessageStructName = "Test" + }, + SchemaType = MemphisSchemaTypes.PROTO_BUF + }; + return schemaUpdate; + } } \ No newline at end of file diff --git a/src/Memphis.Client.UnitTests/Validators/ValidatorTestHelper.cs b/src/Memphis.Client.UnitTests/Validators/ValidatorTestHelper.cs new file mode 100644 index 0000000..6d62dae --- /dev/null +++ b/src/Memphis.Client.UnitTests/Validators/ValidatorTestHelper.cs @@ -0,0 +1,27 @@ +using Memphis.Client.Constants; +using Memphis.Client.Models.Response; + +namespace Memphis.Client.UnitTests.Validators; + +internal class ValidatorTestHelper +{ + public static SchemaUpdateInit GetSchemaUpdateInit( + string schemaName, + string schema, + string schemaType + ) + { + return new SchemaUpdateInit + { + SchemaName = schemaName, + ActiveVersion = new ProducerSchemaUpdateVersion + { + VersionNumber = "1", + Descriptor = string.Empty, + Content = schema, + MessageStructName = string.Empty + }, + SchemaType = schemaType + }; + } +} \ No newline at end of file diff --git a/src/Memphis.Client/Constants/MemphisConstants.cs b/src/Memphis.Client/Constants/MemphisConstants.cs index f16076a..67fb320 100644 --- a/src/Memphis.Client/Constants/MemphisConstants.cs +++ b/src/Memphis.Client/Constants/MemphisConstants.cs @@ -1,3 +1,5 @@ +using Memphis.Client.Validators; + namespace Memphis.Client.Constants; internal class MemphisStations { @@ -31,22 +33,29 @@ internal class MemphisSubjects public const string SDK_CLIENTS_UPDATE = "$memphis_sdk_clients_updates"; public const string MEMPHIS_SCHEMA_VERSE_DLS = "$memphis_schemaverse_dls"; public const string SCHEMA_CREATION = "$memphis_schema_creations"; - - // not available yes - public const string SCHEMA_DESTRUCTION = ""; - public const string FUNCTIONS_UPDATE = "$memphis_functions_updates_"; - public const string NACKED_DLS = "$memphis_nacked_dls"; } -public static class MemphisSchemaTypes +internal static class MemphisSchemaTypes { public const string NONE = ""; public const string JSON = "json"; public const string GRAPH_QL = "graphql"; public const string PROTO_BUF = "protobuf"; internal const string AVRO = "avro"; + + internal static ValidatorType ToValidator(this string schemaType) + { + return schemaType switch + { + JSON => ValidatorType.JSON, + GRAPH_QL => ValidatorType.GRAPHQL, + PROTO_BUF => ValidatorType.PROTOBUF, + AVRO => ValidatorType.AVRO, + _ => throw new MemphisException($"Schema type: {schemaType} is not supported") + }; + } } internal static class MemphisSdkClientUpdateTypes @@ -68,7 +77,6 @@ internal static class MemphisRequestVersions { public const int LastProducerCreationRequestVersion = 4; public const int LastProducerDestroyRequestVersion = 1; - public const int LastConsumerCreationRequestVersion = 4; public const int LastConsumerDestroyRequestVersion = 1; } \ No newline at end of file diff --git a/src/Memphis.Client/Consumer/IMemphisConsumer.cs b/src/Memphis.Client/Consumer/IMemphisConsumer.cs index d5aa174..f4d9c02 100644 --- a/src/Memphis.Client/Consumer/IMemphisConsumer.cs +++ b/src/Memphis.Client/Consumer/IMemphisConsumer.cs @@ -1,6 +1,4 @@ -using Memphis.Client.Core; - -namespace Memphis.Client.Consumer; +namespace Memphis.Client.Consumer; public interface IMemphisConsumer : IDisposable { @@ -42,7 +40,4 @@ public interface IMemphisConsumer : IDisposable /// /// Task DestroyAsync(int timeoutRetry = 5); -} - - - +} \ No newline at end of file diff --git a/src/Memphis.Client/Consumer/MemphisConsumer.cs b/src/Memphis.Client/Consumer/MemphisConsumer.cs index 4c70609..a2d4d3e 100644 --- a/src/Memphis.Client/Consumer/MemphisConsumer.cs +++ b/src/Memphis.Client/Consumer/MemphisConsumer.cs @@ -6,10 +6,8 @@ public sealed class MemphisConsumer : IMemphisConsumer { public event EventHandler MessageReceived; public event EventHandler DlsMessageReceived; - internal string InternalStationName { get; private set; } internal string Key => $"{InternalStationName}_{_consumerOptions.RealName}"; - private ISyncSubscription _dlsSubscription; private readonly MemphisClient _memphisClient; private readonly MemphisConsumerOptions _consumerOptions; @@ -18,13 +16,11 @@ public sealed class MemphisConsumer : IMemphisConsumer private readonly CancellationTokenSource _cancellationTokenSource; private bool _subscriptionActive; private readonly int _pingConsumerIntervalMs; - /// /// Messages in DLS station will have a partition number of -1. This does not indicate the actual partition number of the message. /// Instead, it indicates that the message is in the DLS station. /// private const int DlsMessagePartitionNumber = -1; - private int[] _partitions; internal StationPartitionResolver PartitionResolver { get; set; } internal int[] Partitions @@ -412,6 +408,9 @@ CancellationToken cancellationToken _consumerOptions, partitionNumber )); + + if (memphisMessages is null || !memphisMessages.Any()) + return; MessageReceived?.Invoke(this, new MemphisMessageHandlerEventArgs(memphisMessages, consumerContext, null)); } diff --git a/src/Memphis.Client/MemphisClient.Consumer.cs b/src/Memphis.Client/MemphisClient.Consumer.cs new file mode 100644 index 0000000..811e53c --- /dev/null +++ b/src/Memphis.Client/MemphisClient.Consumer.cs @@ -0,0 +1,123 @@ +using Memphis.Client.Consumer; + +namespace Memphis.Client; + +public partial class MemphisClient +{ + /// + /// Create Consumer for station + /// + /// options used to customize the behaviour of consumer + /// An object connected to the station from consuming data + public async Task CreateConsumer(MemphisConsumerOptions consumerOptions, int timeoutRetry = 5, CancellationToken cancellationToken = default) + { + EnsureBatchSizeIsValid(consumerOptions.BatchSize); + + if (_brokerConnection.IsClosed()) + { + throw MemphisExceptions.DeadConnectionException; + } + + consumerOptions.RealName = consumerOptions.ConsumerName.ToLower(); + + if (consumerOptions.GenerateUniqueSuffix) + { + consumerOptions.ConsumerName = $"{consumerOptions.ConsumerName}_{MemphisUtil.GetUniqueKey(8)}"; + } + + if (string.IsNullOrEmpty(consumerOptions.ConsumerGroup)) + { + consumerOptions.ConsumerGroup = consumerOptions.ConsumerName; + } + + try + { + var createConsumerModel = new CreateConsumerRequest + { + ConsumerName = consumerOptions.ConsumerName, + StationName = consumerOptions.StationName, + ConnectionId = _connectionId, + ConsumerType = "application", + ConsumerGroup = consumerOptions.ConsumerGroup, + MaxAckTimeMs = consumerOptions.MaxAckTimeMs, + MaxMsgCountForDelivery = consumerOptions.MaxMsgDeliveries, + UserName = _userName, + StartConsumeFromSequence = consumerOptions.StartConsumeFromSequence, + LastMessages = consumerOptions.LastMessages, + RequestVersion = MemphisRequestVersions.LastConsumerCreationRequestVersion, + ApplicationId = ApplicationId, + SdkLang = ".NET" + }; + + var createConsumerModelJson = JsonSerDes.PrepareJsonString(createConsumerModel); + + byte[] createConsumerReqBytes = Encoding.UTF8.GetBytes(createConsumerModelJson); + + Msg createConsumerResp = await RequestAsync(MemphisStations.MEMPHIS_CONSUMER_CREATIONS, createConsumerReqBytes, timeoutRetry, cancellationToken); + var responseStr = Encoding.UTF8.GetString(createConsumerResp.Data); + var createConsumerResponse = JsonConvert.DeserializeObject(responseStr); + + if (createConsumerResponse is null) + { + if (!string.IsNullOrEmpty(responseStr)) + { + throw new MemphisException(responseStr); + } + + var oldconsumer = new MemphisConsumer(this, consumerOptions); + _consumerCache.AddOrUpdate(oldconsumer.Key, oldconsumer, (_, _) => oldconsumer); + + return oldconsumer; + } + if (!string.IsNullOrEmpty(createConsumerResponse.Error)) + { + throw new MemphisException(responseStr); + } + + var consumer = new MemphisConsumer(this, consumerOptions, createConsumerResponse.PartitionsUpdate.PartitionsList); + _consumerCache.AddOrUpdate(consumer.Key, consumer, (_, _) => consumer); + + await ListenForSchemaUpdate(consumerOptions.StationName); + + return consumer; + + } + catch (MemphisException) + { + throw; + } + catch (System.Exception e) + { + throw MemphisExceptions.FailedToCreateConsumerException(e); + } + } + + /// + /// Create a new consumer + /// + /// Fetch message options + /// MemphisConsumer + public async Task CreateConsumer(FetchMessageOptions fetchMessageOptions, int timeoutRetry = 5, CancellationToken cancellationToken = default) + { + return await CreateConsumer(new MemphisConsumerOptions + { + StationName = fetchMessageOptions.StationName, + ConsumerName = fetchMessageOptions.ConsumerName, + ConsumerGroup = fetchMessageOptions.ConsumerGroup, + BatchSize = fetchMessageOptions.BatchSize, + BatchMaxTimeToWaitMs = fetchMessageOptions.BatchMaxTimeToWaitMs, + MaxAckTimeMs = fetchMessageOptions.MaxAckTimeMs, + MaxMsgDeliveries = fetchMessageOptions.MaxMsgDeliveries, + GenerateUniqueSuffix = fetchMessageOptions.GenerateUniqueSuffix, + StartConsumeFromSequence = fetchMessageOptions.StartConsumeFromSequence, + LastMessages = fetchMessageOptions.LastMessages, + }, timeoutRetry, cancellationToken); + } + + internal IConsumerContext GetConsumerContext(string streamName, string durableName) + { + var streamContext = _brokerConnection.GetStreamContext(streamName); + var consumerContext = streamContext.GetConsumerContext(durableName); + return consumerContext; + } +} \ No newline at end of file diff --git a/src/Memphis.Client/MemphisClient.Station.cs b/src/Memphis.Client/MemphisClient.Station.cs index 0057c79..3ebe2c7 100644 --- a/src/Memphis.Client/MemphisClient.Station.cs +++ b/src/Memphis.Client/MemphisClient.Station.cs @@ -10,6 +10,187 @@ public partial class MemphisClient internal ConcurrentDictionary FunctionDetails { get => _functionDetails; } + /// + /// Create Station + /// + /// options used to customize the parameters of station + /// cancellation token + /// An object representing the created station + public async Task CreateStation(StationOptions stationOptions, int timeoutRetry = 5, CancellationToken cancellationToken = default) + { + if (_brokerConnection.IsClosed()) + { + throw MemphisExceptions.DeadConnectionException; + } + + try + { + var createStationModel = new CreateStationRequest() + { + StationName = stationOptions.Name, + RetentionType = stationOptions.RetentionType, + RetentionValue = stationOptions.RetentionValue, + StorageType = stationOptions.StorageType, + Replicas = stationOptions.Replicas, + IdempotencyWindowsInMs = stationOptions.IdempotenceWindowMs, + SchemaName = stationOptions.SchemaName, + DlsConfiguration = new DlsConfiguration() + { + Poison = stationOptions.SendPoisonMessageToDls, + SchemaVerse = stationOptions.SendSchemaFailedMessageToDls, + }, + UserName = _userName, + TieredStorageEnabled = stationOptions.TieredStorageEnabled, + PartitionsNumber = stationOptions.PartitionsNumber, + DlsStation = stationOptions.DlsStation + }; + + var createStationModelJson = JsonSerDes.PrepareJsonString(createStationModel); + + byte[] createStationReqBytes = Encoding.UTF8.GetBytes(createStationModelJson); + + Msg createStationResp = await RequestAsync(MemphisStations.MEMPHIS_STATION_CREATIONS, createStationReqBytes, timeoutRetry, cancellationToken); + string errResp = Encoding.UTF8.GetString(createStationResp.Data); + + if (!string.IsNullOrEmpty(errResp)) + { + if (errResp.Contains("already exist")) + { + return new MemphisStation(this, stationOptions.Name); + } + + throw new MemphisException(errResp); + } + + return new MemphisStation(this, stationOptions.Name); + } + catch (MemphisException) + { + throw; + } + catch (System.Exception e) + { + throw MemphisExceptions.FailedToCreateStationException(e); + } + } + + /// + /// Attach schema to an existing station + /// + /// station name + /// schema name + /// + [Obsolete("This method is depricated. call EnforceSchema instead.")] + public async Task AttachSchema(string stationName, string schemaName) + { + await EnforceSchema(stationName, schemaName); + } + + /// + /// Applies schema to an existing station + /// + /// station name + /// schema name + /// + public async Task EnforceSchema(string stationName, string schemaName, int timeoutRetry = 5, CancellationToken cancellationToken = default) + { + if (string.IsNullOrEmpty(stationName)) + { + throw new ArgumentException($"{nameof(stationName)} cannot be null or empty"); + } + + if (string.IsNullOrEmpty(schemaName)) + { + throw new ArgumentException($"{nameof(schemaName)} cannot be null or empty"); + } + + try + { + var attachSchemaRequestModel = new AttachSchemaRequest() + { + SchemaName = schemaName, + StationName = stationName, + UserName = _userName + }; + + var attachSchemaModelJson = JsonSerDes.PrepareJsonString(attachSchemaRequestModel); + + byte[] attachSchemaReqBytes = Encoding.UTF8.GetBytes(attachSchemaModelJson); + + Msg attachSchemaResp = await RequestAsync(MemphisStations.MEMPHIS_SCHEMA_ATTACHMENTS, attachSchemaReqBytes, timeoutRetry, cancellationToken); + string errResp = Encoding.UTF8.GetString(attachSchemaResp.Data); + + if (!string.IsNullOrEmpty(errResp)) + { + throw new MemphisException(errResp); + } + } + catch (MemphisException) + { + throw; + } + catch (System.Exception e) + { + throw MemphisExceptions.FailedToAttachSchemaException(e); + } + } + + + /// + /// DetachSchema Schema from station + /// + /// station name + /// No object or value is returned by this method when it completes. + public async Task DetachSchema(string stationName, int timeoutRetry = 5, CancellationToken cancellationToken = default) + { + if (string.IsNullOrEmpty(stationName)) + { + throw new ArgumentException($"{nameof(stationName)} cannot be null or empty"); + } + + try + { + var detachSchemaRequestModel = new DetachSchemaRequest() + { + StationName = stationName, + UserName = _userName + }; + + var detachSchemaModelJson = JsonSerDes.PrepareJsonString(detachSchemaRequestModel); + + byte[] detachSchemaReqBytes = Encoding.UTF8.GetBytes(detachSchemaModelJson); + + Msg detachSchemaResp = await RequestAsync(MemphisStations.MEMPHIS_SCHEMA_DETACHMENTS, detachSchemaReqBytes, timeoutRetry, cancellationToken); + string errResp = Encoding.UTF8.GetString(detachSchemaResp.Data); + + if (!string.IsNullOrEmpty(errResp)) + { + throw new MemphisException(errResp); + } + } + catch (MemphisException) + { + throw; + } + catch (System.Exception e) + { + throw MemphisExceptions.FailedToAttachSchemaException(e); + } + } + + + /// + /// Create Station + /// + /// station name + /// cancellation token + /// An object representing the created station + public async Task CreateStation(string stationName, int timeoutRetry = 5, CancellationToken cancellationToken = default) + { + return await CreateStation(new StationOptions { Name = stationName }, timeoutRetry, cancellationToken); + } + + private Task ListenForFunctionUpdate(string stationName, int stationVersion, CancellationToken cancellationToken) { if (string.IsNullOrWhiteSpace(stationName) || diff --git a/src/Memphis.Client/MemphisClient.cs b/src/Memphis.Client/MemphisClient.cs index 3a198db..a90b4ff 100644 --- a/src/Memphis.Client/MemphisClient.cs +++ b/src/Memphis.Client/MemphisClient.cs @@ -152,274 +152,6 @@ PublishOptions options return await _jetStreamContext.PublishAsync(message, options); } - /// - /// Create Consumer for station - /// - /// options used to customize the behaviour of consumer - /// An object connected to the station from consuming data - public async Task CreateConsumer(MemphisConsumerOptions consumerOptions, int timeoutRetry = 5, CancellationToken cancellationToken = default) - { - EnsureBatchSizeIsValid(consumerOptions.BatchSize); - - if (_brokerConnection.IsClosed()) - { - throw MemphisExceptions.DeadConnectionException; - } - - consumerOptions.RealName = consumerOptions.ConsumerName.ToLower(); - - if (consumerOptions.GenerateUniqueSuffix) - { - consumerOptions.ConsumerName = $"{consumerOptions.ConsumerName}_{MemphisUtil.GetUniqueKey(8)}"; - } - - if (string.IsNullOrEmpty(consumerOptions.ConsumerGroup)) - { - consumerOptions.ConsumerGroup = consumerOptions.ConsumerName; - } - - try - { - var createConsumerModel = new CreateConsumerRequest - { - ConsumerName = consumerOptions.ConsumerName, - StationName = consumerOptions.StationName, - ConnectionId = _connectionId, - ConsumerType = "application", - ConsumerGroup = consumerOptions.ConsumerGroup, - MaxAckTimeMs = consumerOptions.MaxAckTimeMs, - MaxMsgCountForDelivery = consumerOptions.MaxMsgDeliveries, - UserName = _userName, - StartConsumeFromSequence = consumerOptions.StartConsumeFromSequence, - LastMessages = consumerOptions.LastMessages, - RequestVersion = MemphisRequestVersions.LastConsumerCreationRequestVersion, - ApplicationId = ApplicationId, - SdkLang = ".NET" - }; - - var createConsumerModelJson = JsonSerDes.PrepareJsonString(createConsumerModel); - - byte[] createConsumerReqBytes = Encoding.UTF8.GetBytes(createConsumerModelJson); - - Msg createConsumerResp = await RequestAsync(MemphisStations.MEMPHIS_CONSUMER_CREATIONS, createConsumerReqBytes, timeoutRetry, cancellationToken); - var responseStr = Encoding.UTF8.GetString(createConsumerResp.Data); - var createConsumerResponse = JsonConvert.DeserializeObject(responseStr); - - if (createConsumerResponse is null) - { - if (!string.IsNullOrEmpty(responseStr)) - { - throw new MemphisException(responseStr); - } - - var oldconsumer = new MemphisConsumer(this, consumerOptions); - _consumerCache.AddOrUpdate(oldconsumer.Key, oldconsumer, (_, _) => oldconsumer); - - return oldconsumer; - } - if (!string.IsNullOrEmpty(createConsumerResponse.Error)) - { - throw new MemphisException(responseStr); - } - - var consumer = new MemphisConsumer(this, consumerOptions, createConsumerResponse.PartitionsUpdate.PartitionsList); - _consumerCache.AddOrUpdate(consumer.Key, consumer, (_, _) => consumer); - - await ListenForSchemaUpdate(consumerOptions.StationName); - - return consumer; - - } - catch (MemphisException) - { - throw; - } - catch (System.Exception e) - { - throw MemphisExceptions.FailedToCreateConsumerException(e); - } - } - - - /// - /// Create Station - /// - /// options used to customize the parameters of station - /// cancellation token - /// An object representing the created station - public async Task CreateStation(StationOptions stationOptions, int timeoutRetry = 5, CancellationToken cancellationToken = default) - { - if (_brokerConnection.IsClosed()) - { - throw MemphisExceptions.DeadConnectionException; - } - - try - { - var createStationModel = new CreateStationRequest() - { - StationName = stationOptions.Name, - RetentionType = stationOptions.RetentionType, - RetentionValue = stationOptions.RetentionValue, - StorageType = stationOptions.StorageType, - Replicas = stationOptions.Replicas, - IdempotencyWindowsInMs = stationOptions.IdempotenceWindowMs, - SchemaName = stationOptions.SchemaName, - DlsConfiguration = new DlsConfiguration() - { - Poison = stationOptions.SendPoisonMessageToDls, - SchemaVerse = stationOptions.SendSchemaFailedMessageToDls, - }, - UserName = _userName, - TieredStorageEnabled = stationOptions.TieredStorageEnabled, - PartitionsNumber = stationOptions.PartitionsNumber, - DlsStation = stationOptions.DlsStation - }; - - var createStationModelJson = JsonSerDes.PrepareJsonString(createStationModel); - - byte[] createStationReqBytes = Encoding.UTF8.GetBytes(createStationModelJson); - - Msg createStationResp = await RequestAsync(MemphisStations.MEMPHIS_STATION_CREATIONS, createStationReqBytes, timeoutRetry, cancellationToken); - string errResp = Encoding.UTF8.GetString(createStationResp.Data); - - if (!string.IsNullOrEmpty(errResp)) - { - if (errResp.Contains("already exist")) - { - return new MemphisStation(this, stationOptions.Name); - } - - throw new MemphisException(errResp); - } - - return new MemphisStation(this, stationOptions.Name); - } - catch (MemphisException) - { - throw; - } - catch (System.Exception e) - { - throw MemphisExceptions.FailedToCreateStationException(e); - } - } - - /// - /// Create Station - /// - /// station name - /// cancellation token - /// An object representing the created station - public async Task CreateStation(string stationName, int timeoutRetry = 5, CancellationToken cancellationToken = default) - { - return await CreateStation(new StationOptions { Name = stationName }, timeoutRetry, cancellationToken); - } - - /// - /// Applies schema to an existing station - /// - /// station name - /// schema name - /// - public async Task EnforceSchema(string stationName, string schemaName, int timeoutRetry = 5, CancellationToken cancellationToken = default) - { - if (string.IsNullOrEmpty(stationName)) - { - throw new ArgumentException($"{nameof(stationName)} cannot be null or empty"); - } - - if (string.IsNullOrEmpty(schemaName)) - { - throw new ArgumentException($"{nameof(schemaName)} cannot be null or empty"); - } - - try - { - var attachSchemaRequestModel = new AttachSchemaRequest() - { - SchemaName = schemaName, - StationName = stationName, - UserName = _userName - }; - - var attachSchemaModelJson = JsonSerDes.PrepareJsonString(attachSchemaRequestModel); - - byte[] attachSchemaReqBytes = Encoding.UTF8.GetBytes(attachSchemaModelJson); - - Msg attachSchemaResp = await RequestAsync(MemphisStations.MEMPHIS_SCHEMA_ATTACHMENTS, attachSchemaReqBytes, timeoutRetry, cancellationToken); - string errResp = Encoding.UTF8.GetString(attachSchemaResp.Data); - - if (!string.IsNullOrEmpty(errResp)) - { - throw new MemphisException(errResp); - } - } - catch (MemphisException) - { - throw; - } - catch (System.Exception e) - { - throw MemphisExceptions.FailedToAttachSchemaException(e); - } - - } - - /// - /// Attach schema to an existing station - /// - /// station name - /// schema name - /// - [Obsolete("This method is depricated. call EnforceSchema instead.")] - public async Task AttachSchema(string stationName, string schemaName) - { - await EnforceSchema(stationName, schemaName); - } - - /// - /// DetachSchema Schema from station - /// - /// station name - /// No object or value is returned by this method when it completes. - public async Task DetachSchema(string stationName, int timeoutRetry = 5, CancellationToken cancellationToken = default) - { - if (string.IsNullOrEmpty(stationName)) - { - throw new ArgumentException($"{nameof(stationName)} cannot be null or empty"); - } - - try - { - var detachSchemaRequestModel = new DetachSchemaRequest() - { - StationName = stationName, - UserName = _userName - }; - - var detachSchemaModelJson = JsonSerDes.PrepareJsonString(detachSchemaRequestModel); - - byte[] detachSchemaReqBytes = Encoding.UTF8.GetBytes(detachSchemaModelJson); - - Msg detachSchemaResp = await RequestAsync(MemphisStations.MEMPHIS_SCHEMA_DETACHMENTS, detachSchemaReqBytes, timeoutRetry, cancellationToken); - string errResp = Encoding.UTF8.GetString(detachSchemaResp.Data); - - if (!string.IsNullOrEmpty(errResp)) - { - throw new MemphisException(errResp); - } - } - catch (MemphisException) - { - throw; - } - catch (System.Exception e) - { - throw MemphisExceptions.FailedToAttachSchemaException(e); - } - } - /// /// Creates schema from the specified file path. In case schema is already exist a new version will be created /// @@ -529,46 +261,15 @@ internal string GetStationSchemaType(string internalStationName) internal async Task ValidateMessageAsync(byte[] message, string internalStationName, string producerName) { - if (!_schemaUpdateDictionary.TryGetValue(internalStationName, - out SchemaUpdateInit schemaUpdateInit)) - { + if (!_schemaUpdateDictionary.TryGetValue(internalStationName, out SchemaUpdateInit schemaUpdateInit) || + schemaUpdateInit is null) return; - } try { - switch (schemaUpdateInit.SchemaType) - { - case SchemaUpdateInit.SchemaTypes.JSON: - { - if (_schemaValidators.TryGetValue(ValidatorType.JSON, out ISchemaValidator schemaValidator)) - { - await schemaValidator.ValidateAsync(message, schemaUpdateInit.SchemaName); - } - - break; - } - case SchemaUpdateInit.SchemaTypes.GRAPHQL: - { - if (_schemaValidators.TryGetValue(ValidatorType.GRAPHQL, out ISchemaValidator schemaValidator)) - { - await schemaValidator.ValidateAsync(message, schemaUpdateInit.SchemaName); - } - - break; - } - case SchemaUpdateInit.SchemaTypes.PROTOBUF: - { - if (_schemaValidators.TryGetValue(ValidatorType.PROTOBUF, out ISchemaValidator schemaValidator)) - { - await schemaValidator.ValidateAsync(message, schemaUpdateInit.SchemaName); - } - - break; - } - default: - throw new NotImplementedException($"Schema of type: {schemaUpdateInit.SchemaType} not implemented"); - } + var validatorType = MemphisSchemaTypes.ToValidator(schemaUpdateInit.SchemaType); + if (_schemaValidators.TryGetValue(validatorType, out ISchemaValidator validator)) + await validator.ValidateAsync(message, schemaUpdateInit.SchemaName); } catch (MemphisSchemaValidationException e) { @@ -596,15 +297,15 @@ internal async Task NotifyRemoveConsumer(string stationName) private async Task RemoveFromSchemaUpdateListener(string stationName) { var internalStationName = MemphisUtil.GetInternalName(stationName); - if (_stationSchemaUpdateListeners.TryGetValue(internalStationName, out int updateListenrsCount)) + if (_stationSchemaUpdateListeners.TryGetValue(internalStationName, out int updateListenersCount)) { - if (updateListenrsCount == 0) + if (updateListenersCount == 0) { return; } - var updateListenerCountAfterRemove = updateListenrsCount - 1; - _stationSchemaUpdateListeners.TryUpdate(internalStationName, updateListenerCountAfterRemove, updateListenrsCount); + var updateListenerCountAfterRemove = updateListenersCount - 1; + _stationSchemaUpdateListeners.TryUpdate(internalStationName, updateListenerCountAfterRemove, updateListenersCount); if (updateListenerCountAfterRemove <= 0) { @@ -627,35 +328,6 @@ private async Task RemoveFromSchemaUpdateListener(string stationName) } } - /// - /// Create a new consumer - /// - /// Fetch message options - /// MemphisConsumer - public async Task CreateConsumer(FetchMessageOptions fetchMessageOptions, int timeoutRetry = 5, CancellationToken cancellationToken = default) - { - return await CreateConsumer(new MemphisConsumerOptions - { - StationName = fetchMessageOptions.StationName, - ConsumerName = fetchMessageOptions.ConsumerName, - ConsumerGroup = fetchMessageOptions.ConsumerGroup, - BatchSize = fetchMessageOptions.BatchSize, - BatchMaxTimeToWaitMs = fetchMessageOptions.BatchMaxTimeToWaitMs, - MaxAckTimeMs = fetchMessageOptions.MaxAckTimeMs, - MaxMsgDeliveries = fetchMessageOptions.MaxMsgDeliveries, - GenerateUniqueSuffix = fetchMessageOptions.GenerateUniqueSuffix, - StartConsumeFromSequence = fetchMessageOptions.StartConsumeFromSequence, - LastMessages = fetchMessageOptions.LastMessages, - }, timeoutRetry, cancellationToken); - } - - internal IConsumerContext GetConsumerContext(string streamName, string durableName) - { - var streamContext = _brokerConnection.GetStreamContext(streamName); - var consumerContext = streamContext.GetConsumerContext(durableName); - return consumerContext; - } - internal async Task RemoveStation(MemphisStation station, int timeoutRetry = 5, CancellationToken cancellationToken = default) { @@ -785,65 +457,17 @@ private async Task ProcessAndStoreSchemaUpdate(string internalStationName, Schem schemaUpdate, (key, _) => schemaUpdate); - switch (schemaUpdate.SchemaType) + var validatorType = MemphisSchemaTypes.ToValidator(schemaUpdate.SchemaType); + if (_schemaValidators.TryGetValue(validatorType, out ISchemaValidator schemaValidatorCache)) { - case SchemaUpdateInit.SchemaTypes.JSON: - { - if (_schemaValidators.TryGetValue(ValidatorType.JSON, out ISchemaValidator schemaValidator)) - { - bool isDone = schemaValidator.ParseAndStore( - schemaUpdate.SchemaName, - schemaUpdate.ActiveVersion?.Content); - - if (!isDone) - { - //TODO raise notification regarding unable to parse schema pushed by Memphis - throw new InvalidOperationException($"Unable to parse and store " + - $"schema: {schemaUpdate?.SchemaName}, type: {schemaUpdate?.SchemaType}" + - $"in local cache"); - } - } - - break; - } - case SchemaUpdateInit.SchemaTypes.GRAPHQL: - { - if (_schemaValidators.TryGetValue(ValidatorType.GRAPHQL, out ISchemaValidator schemaValidator)) - { - bool isDone = schemaValidator.ParseAndStore( - schemaUpdate.SchemaName, - schemaUpdate.ActiveVersion?.Content); - - if (!isDone) - { - //TODO raise notification regarding unable to parse schema pushed by Memphis - throw new InvalidOperationException($"Unable to parse and store " + - $"schema: {schemaUpdate?.SchemaName}, type: {schemaUpdate?.SchemaType}" + - $"in local cache"); - } - } - - break; - } - case SchemaUpdateInit.SchemaTypes.PROTOBUF: - { - if (_schemaValidators.TryGetValue(ValidatorType.PROTOBUF, out ISchemaValidator schemaValidator)) - { - bool isDone = schemaValidator.ParseAndStore( - schemaUpdate.SchemaName, - JsonConvert.SerializeObject(schemaUpdate.ActiveVersion)); - - if (!isDone) - { - //TODO raise notification regarding unable to parse schema pushed by Memphis - throw new InvalidOperationException($"Unable to parse and store " + - $"schema: {schemaUpdate?.SchemaName}, type: {schemaUpdate?.SchemaType}" + - $"in local cache"); - } - } - - break; - } + var schemaStored = schemaValidatorCache.AddOrUpdateSchema(schemaUpdate); + if (!schemaStored) + { + //TODO raise notification regarding unable to parse schema pushed by Memphis + throw new InvalidOperationException($"Unable to parse and store " + + $"schema: {schemaUpdate?.SchemaName}, type: {schemaUpdate?.SchemaType}" + + $"in local cache"); + } } } @@ -1025,6 +649,10 @@ void SyncSdkClientUpdate() } } } + catch when (!IsConnected()) + { + // Connection is closed + } catch (System.Exception exception) { throw new MemphisException(exception.Message, exception); diff --git a/src/Memphis.Client/Models/Response/SchemaUpdateInit.cs b/src/Memphis.Client/Models/Response/SchemaUpdateInit.cs index 6756edd..4606df9 100644 --- a/src/Memphis.Client/Models/Response/SchemaUpdateInit.cs +++ b/src/Memphis.Client/Models/Response/SchemaUpdateInit.cs @@ -13,11 +13,4 @@ internal sealed class SchemaUpdateInit [DataMember(Name = "type")] public string SchemaType { get; set; } - - internal static class SchemaTypes - { - public const string PROTOBUF = "protobuf"; - public const string JSON = "json"; - public const string GRAPHQL = "graphql"; - } } \ No newline at end of file diff --git a/src/Memphis.Client/Validators/AvroValidator.cs b/src/Memphis.Client/Validators/AvroValidator.cs index ef5d931..36de17e 100644 --- a/src/Memphis.Client/Validators/AvroValidator.cs +++ b/src/Memphis.Client/Validators/AvroValidator.cs @@ -28,7 +28,26 @@ public Task ValidateAsync(byte[] messageToValidate, string schemaKey) } } - protected override string Parse(string schemaData, string schemaName) + public bool AddOrUpdateSchema(SchemaUpdateInit schemaUpdate) + { + if (!IsSchemaUpdateValid(schemaUpdate)) + return false; + + try + { + var schemeName = schemaUpdate.SchemaName; + var schemaData = schemaUpdate.ActiveVersion.Content; + var newSchema = Parse(schemaData, schemeName); + _schemaCache.AddOrUpdate(schemeName, newSchema, (key, oldVal) => newSchema); + return true; + } + catch + { + return false; + } + } + + private string Parse(string schemaData, string schemaName) { try { diff --git a/src/Memphis.Client/Validators/GraphqlValidator.cs b/src/Memphis.Client/Validators/GraphqlValidator.cs index 791d8b5..065413f 100644 --- a/src/Memphis.Client/Validators/GraphqlValidator.cs +++ b/src/Memphis.Client/Validators/GraphqlValidator.cs @@ -1,22 +1,17 @@ -using System.Linq; -using GraphQL; +using GraphQL; using GraphQL.Types; namespace Memphis.Client.Validators; -internal class GraphqlValidator : ISchemaValidator +internal class GraphqlValidator : SchemaValidator, ISchemaValidator { private readonly IDocumentExecuter _documentExecutor; - private readonly ConcurrentDictionary _schemaCache; - public GraphqlValidator() { - this._documentExecutor = new DocumentExecuter(); - this._schemaCache = new ConcurrentDictionary(); + _documentExecutor = new DocumentExecuter(); } - public async Task ValidateAsync(byte[] messageToValidate, string schemaName) { if (_schemaCache.TryGetValue(schemaName, out ISchema schemaObj)) @@ -42,33 +37,22 @@ public async Task ValidateAsync(byte[] messageToValidate, string schemaName) throw new MemphisSchemaValidationException($"Schema: {schemaName} not found in local cache"); } - public bool ParseAndStore(string schemeName, string schemaData) + public bool AddOrUpdateSchema(SchemaUpdateInit schemaUpdate) { - if (string.IsNullOrEmpty(schemeName)) - { - throw new ArgumentException($"Invalid value provided for {schemeName}"); - } - - if (string.IsNullOrEmpty(schemaData)) - { - throw new ArgumentException($"Invalid value provided for {schemaData}"); - } + if (!IsSchemaUpdateValid(schemaUpdate)) + return false; try { + var schemeName = schemaUpdate.SchemaName; + var schemaData = schemaUpdate.ActiveVersion.Content; var newSchema = Schema.For(schemaData); _schemaCache.AddOrUpdate(schemeName, newSchema, (key, oldVal) => newSchema); - return true; } - catch (System.Exception) + catch { return false; } } - - public void RemoveSchema(string schemaName) - { - _schemaCache.TryRemove(schemaName, out ISchema schemaObj); - } } \ No newline at end of file diff --git a/src/Memphis.Client/Validators/ISchemaValidator.cs b/src/Memphis.Client/Validators/ISchemaValidator.cs index 806c47d..46a8cd3 100644 --- a/src/Memphis.Client/Validators/ISchemaValidator.cs +++ b/src/Memphis.Client/Validators/ISchemaValidator.cs @@ -4,7 +4,7 @@ internal interface ISchemaValidator { Task ValidateAsync(byte[] messageToValidate, string schemaAsStr); - bool ParseAndStore(string schemeName, string schemaData); + bool AddOrUpdateSchema(SchemaUpdateInit schemaUpdate); void RemoveSchema(string schemaName); } \ No newline at end of file diff --git a/src/Memphis.Client/Validators/JsonValidator.cs b/src/Memphis.Client/Validators/JsonValidator.cs index 4df571f..c6ce9d9 100644 --- a/src/Memphis.Client/Validators/JsonValidator.cs +++ b/src/Memphis.Client/Validators/JsonValidator.cs @@ -1,14 +1,9 @@ -using System.Linq; using NJsonSchema; namespace Memphis.Client.Validators; internal class JsonValidator : SchemaValidator, ISchemaValidator { - protected override JsonSchema Parse(string schemaData, string _) - { - return JsonSchema.FromJsonAsync(schemaData).GetAwaiter().GetResult(); - } public Task ValidateAsync(byte[] messageToValidate, string schemaAsStr) { @@ -25,7 +20,7 @@ public Task ValidateAsync(byte[] messageToValidate, string schemaAsStr) { sb.AppendLine(error.ToString()); } - + throw new MemphisSchemaValidationException($"Schema validation has failed: \n {sb.ToString()}"); } catch (System.Exception ex) @@ -33,4 +28,28 @@ public Task ValidateAsync(byte[] messageToValidate, string schemaAsStr) throw new MemphisSchemaValidationException($"Schema validation has failed: \n {ex.Message}", ex); } } + + public bool AddOrUpdateSchema(SchemaUpdateInit schemaUpdate) + { + if (!IsSchemaUpdateValid(schemaUpdate)) + return false; + + try + { + var schemeName = schemaUpdate.SchemaName; + var schemaData = schemaUpdate.ActiveVersion.Content; + var newSchema = Parse(schemaData, schemeName); + _schemaCache.AddOrUpdate(schemeName, newSchema, (key, oldVal) => newSchema); + return true; + } + catch + { + return false; + } + } + + private JsonSchema Parse(string schemaData, string _) + { + return JsonSchema.FromJsonAsync(schemaData).GetAwaiter().GetResult(); + } } \ No newline at end of file diff --git a/src/Memphis.Client/Validators/ProtoBufValidator.cs b/src/Memphis.Client/Validators/ProtoBufValidator.cs index 156034c..f7fbd74 100644 --- a/src/Memphis.Client/Validators/ProtoBufValidator.cs +++ b/src/Memphis.Client/Validators/ProtoBufValidator.cs @@ -1,21 +1,23 @@ -namespace Memphis.Client.Validators; + +namespace Memphis.Client.Validators; #nullable disable internal class ProtoBufSchema { - public ProtoBufSchema(string name, string activeVersion) + public ProtoBufSchema(string name, string activeVersionBase64) { SchemaName = name; - ActiveSchemaVersion = activeVersion; + ActiveSchemaVersionBase64 = activeVersionBase64; } - public string ActiveSchemaVersion { get; set; } + public string ActiveSchemaVersionBase64 { get; set; } public string SchemaName { get; set; } } #nullable enable internal class ProtoBufValidator : SchemaValidator, ISchemaValidator { + public async Task ValidateAsync(byte[] messageToValidate, string schemaAsStr) { if (!_schemaCache.TryGetValue(schemaAsStr, out var protoBufSchema)) @@ -24,10 +26,10 @@ public async Task ValidateAsync(byte[] messageToValidate, string schemaAsStr) try { var result = await ProtoBufEval.ProtoBufValidator.Validate( - base64Data: Convert.ToBase64String(messageToValidate), - activeSchemaVersionBase64: Convert.ToBase64String(Encoding.UTF8.GetBytes(protoBufSchema.ActiveSchemaVersion)), + base64Data: Convert.ToBase64String(messageToValidate), + activeSchemaVersionBase64: protoBufSchema.ActiveSchemaVersionBase64, schemaName: protoBufSchema.SchemaName); - if(result.HasError) + if (result.HasError) throw new MemphisSchemaValidationException($"Schema validation has failed: \n {result.Error}"); } @@ -37,8 +39,37 @@ public async Task ValidateAsync(byte[] messageToValidate, string schemaAsStr) } } - protected override ProtoBufSchema Parse(string schemaData, string schemaName) + public bool AddOrUpdateSchema(SchemaUpdateInit schemaUpdate) + { + if (!IsSchemaUpdateValid(schemaUpdate)) + return false; + + try + { + var schemeName = schemaUpdate.SchemaName; + var newSchema = Parse(schemaUpdate.ActiveVersion, schemeName); + _schemaCache.AddOrUpdate(schemeName, newSchema, (key, oldVal) => newSchema); + return true; + } + catch + { + return false; + } + + } + + private ProtoBufSchema Parse(ProducerSchemaUpdateVersion activeVersion, string schemaName) { - return new(name: schemaName, activeVersion: schemaData); + var avj = JsonConvert.SerializeObject(new + { + version_number = Convert.ToInt32(activeVersion.VersionNumber), + descriptor = activeVersion.Descriptor, + schema_content = activeVersion.Content, + message_struct_name = activeVersion.MessageStructName + }); + + var av64 = Convert.ToBase64String(Encoding.UTF8.GetBytes(avj)); + + return new(name: schemaName, activeVersionBase64: av64); } } \ No newline at end of file diff --git a/src/Memphis.Client/Validators/SchemaValidator.cs b/src/Memphis.Client/Validators/SchemaValidator.cs index c58c2b5..df0c63a 100644 --- a/src/Memphis.Client/Validators/SchemaValidator.cs +++ b/src/Memphis.Client/Validators/SchemaValidator.cs @@ -6,38 +6,24 @@ internal abstract class SchemaValidator public SchemaValidator() { - this._schemaCache = new ConcurrentDictionary(); + _schemaCache = new ConcurrentDictionary(); } - public bool ParseAndStore(string schemeName, string schemaData) + public void RemoveSchema(string schemaName) { - if (string.IsNullOrEmpty(schemeName)) - { - throw new ArgumentException($"Invalid value provided for {schemeName}"); - } - - if (string.IsNullOrEmpty(schemaData)) - { - throw new ArgumentException($"Invalid value provided for {schemaData}"); - } + _schemaCache.TryRemove(schemaName, out TSchema _); + } - try - { - var newSchema = Parse(schemaData, schemeName); - _schemaCache.AddOrUpdate(schemeName, newSchema, (key, oldVal) => newSchema); + protected bool IsSchemaUpdateValid(SchemaUpdateInit schemaUpdate) + { + if (schemaUpdate is null || + schemaUpdate is { ActiveVersion: null }) + return false; - return true; - } - catch (System.Exception) - { + if (string.IsNullOrEmpty(schemaUpdate.ActiveVersion.Content) || + string.IsNullOrEmpty(schemaUpdate.SchemaName)) return false; - } - } - public void RemoveSchema(string schemaName) - { - _schemaCache.TryRemove(schemaName, out TSchema _); + return true; } - - protected abstract TSchema Parse(string schemaData, string schemaName); } \ No newline at end of file diff --git a/src/ProtoBufEval.Tests/ProtoBufValidatorTests.cs b/src/ProtoBufEval.Tests/ProtoBufValidatorTests.cs index 2488d44..c8ceb96 100644 --- a/src/ProtoBufEval.Tests/ProtoBufValidatorTests.cs +++ b/src/ProtoBufEval.Tests/ProtoBufValidatorTests.cs @@ -1,3 +1,4 @@ +using Newtonsoft.Json; using ProtoBuf; using System.Text; @@ -24,18 +25,31 @@ public class Test [ProtoContract] -public class InvalidTestModel +file class ValidModel { [ProtoMember(1, Name = @"field1")] - [global::System.ComponentModel.DefaultValue("")] public string Field1 { get; set; } = ""; + [ProtoMember(2, Name = @"field2")] + public string Field2 { get; set; } = ""; + [ProtoMember(3, Name = @"field3")] public int Field3 { get; set; } +} + + +[ProtoContract] +public class InvalidModel +{ + [ProtoMember(1, Name = @"field1")] + public string Field1 { get; set; } = ""; + [ProtoMember(2, Name = @"field2")] + public string Field2 { get; set; } = ""; } + public class ProtoBufValidatorTests { [Theory] @@ -92,6 +106,36 @@ public async Task GivenInvalidPayload_WhenValidate_ThenHasError(string activeSch } + [Fact] + public async Task GivenInvalidData_WhenValidate_ThenHasError() + { + var base64InvalidData = ConvertProtoBufToBase64(new InvalidModel + { + Field1 = "AwesomeFirst", + Field2 = "SecondField" + }); + var activeSchemaVersionBase64 = Convert.ToBase64String(Encoding.UTF8.GetBytes(ActiveVersion())); + + var result = await ProtoBufValidator.Validate(base64InvalidData, activeSchemaVersionBase64, "testschema"); + + Assert.True(result.HasError); + } + + [Fact] + public async Task GivenValidData_WhenValidate_ThenHasError() + { + var base64InvalidData = ConvertProtoBufToBase64(new ValidModel + { + Field1 = "AwesomeFirst", + Field2 = "SecondField", + Field3 = 333, + }); + var activeSchemaVersionBase64 = Convert.ToBase64String(Encoding.UTF8.GetBytes(ActiveVersion())); + + var result = await ProtoBufValidator.Validate(base64InvalidData, activeSchemaVersionBase64, "testschema"); + + Assert.False(result.HasError); + } private static string ConvertProtoBufToBase64(TData obj) where TData : class { @@ -99,4 +143,16 @@ private static string ConvertProtoBufToBase64(TData obj) where TData : cl ProtoBuf.Serializer.Serialize(stream, obj); return Convert.ToBase64String(stream.ToArray()); } + + private static string ActiveVersion() + { + return JsonConvert.SerializeObject(new + { + version_number = 1, + descriptor = "CmQKEnRlc3RzY2hlbWFfMS5wcm90byJOCgRUZXN0EhYKBmZpZWxkMRgBIAIoCVIGZmllbGQxEhYKBmZpZWxkMhgCIAIoCVIGZmllbGQyEhYKBmZpZWxkMxgDIAIoBVIGZmllbGQz", + schema_content = "syntax = \"proto2\";\nmessage Test {\n required string field1 = 1;\n required string field2 = 2;\n required int32 field3 = 3;\n}", + message_struct_name = "Test" + }); + } + } \ No newline at end of file diff --git a/src/ProtoBufEval/ProtoBufEvalMissingDependencyException.cs b/src/ProtoBufEval/ProtoBufEvalMissingDependencyException.cs new file mode 100644 index 0000000..fa84cc5 --- /dev/null +++ b/src/ProtoBufEval/ProtoBufEvalMissingDependencyException.cs @@ -0,0 +1,8 @@ +namespace ProtoBufEval; + +internal class ProtoBufEvalMissingDependencyException : Exception +{ + public ProtoBufEvalMissingDependencyException(string message) : base(message) + { + } +} \ No newline at end of file diff --git a/src/ProtoBufEval/RuntimeEnvironment.cs b/src/ProtoBufEval/RuntimeEnvironment.cs index 4f918b9..a936fd6 100644 --- a/src/ProtoBufEval/RuntimeEnvironment.cs +++ b/src/ProtoBufEval/RuntimeEnvironment.cs @@ -1,5 +1,4 @@ using System.IO.Compression; -using System.Reflection; using System.Runtime.InteropServices; namespace ProtoBufEval; @@ -42,6 +41,7 @@ public static string NativeBinary var compressedBinary = RuntimeInformation.IsOSPlatform(OSPlatform.OSX) ? Path.Combine(NativeBinariesDir, $"{OS}.zip") : Path.Combine(NativeBinariesDir, $"{OS}-{Arch}.zip"); + EnsureDependencyExists(compressedBinary); var binaryFilePath = Path.Combine(binaryDir, BinaryName); if (File.Exists(binaryFilePath)) File.Delete(binaryFilePath); @@ -95,4 +95,10 @@ private static string Arch }; } } + + private static void EnsureDependencyExists(string path) + { + if (!File.Exists(path)) + throw new ProtoBufEvalMissingDependencyException($"Missing dependency for ProtoBufEval: {path}"); + } } \ No newline at end of file