From 65362199f13bdad8a0831541f53d92e1e95a8a37 Mon Sep 17 00:00:00 2001 From: Ivan Braiko <89481851+ISBronny@users.noreply.github.com> Date: Fri, 1 Sep 2023 12:14:17 +0300 Subject: [PATCH] Methods for working with compatibility (#2097) * Methods for working with compatibility * Review fix: Remove GetGlobalCompatibilityAsync * Update CHANGELOG and test --------- Co-authored-by: Anchit Jain --- CHANGELOG.md | 1 + .../CachedSchemaRegistryClient.cs | 12 ++++++ .../ISchemaRegistryClient.cs | 29 +++++++++++++ .../Rest/DataContracts/Compatibility.cs | 20 ++++++++- .../Rest/IRestService.cs | 4 +- .../Rest/RestService.cs | 26 +++++------- .../Tests/UpdateCompatibility.cs | 41 +++++++++++++++++++ 7 files changed, 114 insertions(+), 19 deletions(-) create mode 100644 test/Confluent.SchemaRegistry.IntegrationTests/Tests/UpdateCompatibility.cs diff --git a/CHANGELOG.md b/CHANGELOG.md index 6f1971a0c..9b78527c1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,7 @@ ## Enhancements - Added support for external JSON schemas in `JsonSerializer` and `JsonDeserializer` (#2042). +- Added compatibility methods to CachedSchemaRegistryClient ([ISBronny](https://github.com/ISBronny), #2097). # 2.2.0 diff --git a/src/Confluent.SchemaRegistry/CachedSchemaRegistryClient.cs b/src/Confluent.SchemaRegistry/CachedSchemaRegistryClient.cs index fbf6d88b0..f757e113e 100644 --- a/src/Confluent.SchemaRegistry/CachedSchemaRegistryClient.cs +++ b/src/Confluent.SchemaRegistry/CachedSchemaRegistryClient.cs @@ -47,6 +47,8 @@ namespace Confluent.SchemaRegistry /// - /// - /// - + /// - + /// - /// public class CachedSchemaRegistryClient : ISchemaRegistryClient, IDisposable { @@ -524,6 +526,16 @@ public string ConstructKeySubjectName(string topic, string recordType = null) public string ConstructValueSubjectName(string topic, string recordType = null) => valueSubjectNameStrategy(new SerializationContext(MessageComponentType.Value, topic), recordType); + /// + public async Task GetCompatibilityAsync(string subject = null) + => await restService.GetCompatibilityAsync(subject) + .ConfigureAwait(continueOnCapturedContext: false); + + /// + public async Task UpdateCompatibilityAsync(Compatibility compatibility, string subject = null) + => await restService.UpdateCompatibilityAsync(subject, compatibility) + .ConfigureAwait(continueOnCapturedContext: false); + /// /// Releases unmanaged resources owned by this CachedSchemaRegistryClient instance. diff --git a/src/Confluent.SchemaRegistry/ISchemaRegistryClient.cs b/src/Confluent.SchemaRegistry/ISchemaRegistryClient.cs index 6788ab6e5..952ea61b4 100644 --- a/src/Confluent.SchemaRegistry/ISchemaRegistryClient.cs +++ b/src/Confluent.SchemaRegistry/ISchemaRegistryClient.cs @@ -291,5 +291,34 @@ public interface ISchemaRegistryClient : IDisposable /// [Obsolete("SubjectNameStrategy should now be specified via serializer configuration. This method will be removed in a future release.")] string ConstructValueSubjectName(string topic, string recordType = null); + + + /// + /// If the subject is specified returns compatibility type for the specified subject. + /// Otherwise returns global compatibility type. + /// + /// + /// The subject to get the compatibility for. + /// + /// + /// Compatibility type. + /// + Task GetCompatibilityAsync(string subject = null); + + + /// + /// If the subject is specified sets compatibility type for the specified subject. + /// Otherwise sets global compatibility type. + /// + /// + /// The subject to set the compatibility for. + /// + /// + /// Compatibility type. + /// + /// + /// New compatibility type. + /// + Task UpdateCompatibilityAsync(Compatibility compatibility, string subject = null); } } diff --git a/src/Confluent.SchemaRegistry/Rest/DataContracts/Compatibility.cs b/src/Confluent.SchemaRegistry/Rest/DataContracts/Compatibility.cs index 4a1296bbd..89d062e76 100644 --- a/src/Confluent.SchemaRegistry/Rest/DataContracts/Compatibility.cs +++ b/src/Confluent.SchemaRegistry/Rest/DataContracts/Compatibility.cs @@ -50,6 +50,24 @@ public enum Compatibility /// Full schema compatibility. /// [EnumMember(Value = "FULL")] - Full + Full, + + /// + /// Forward transitive schema compatibility. + /// + [EnumMember(Value = "FORWARD_TRANSITIVE")] + ForwardTransitive, + + /// + /// Backward transitive schema compatibility. + /// + [EnumMember(Value = "BACKWARD_TRANSITIVE")] + BackwardTransitive, + + /// + /// Full transitive schema compatibility. + /// + [EnumMember(Value = "FULL_TRANSITIVE")] + FullTransitive } } diff --git a/src/Confluent.SchemaRegistry/Rest/IRestService.cs b/src/Confluent.SchemaRegistry/Rest/IRestService.cs index a1a3d3fe5..7df234b2c 100644 --- a/src/Confluent.SchemaRegistry/Rest/IRestService.cs +++ b/src/Confluent.SchemaRegistry/Rest/IRestService.cs @@ -29,15 +29,13 @@ namespace Confluent.SchemaRegistry internal interface IRestService : IDisposable { Task GetCompatibilityAsync(string subject); - Task GetGlobalCompatibilityAsync(); Task GetLatestSchemaAsync(string subject); Task GetSchemaAsync(int id, string format = null); Task GetSchemaAsync(string subject, int version); Task> GetSubjectsAsync(); Task> GetSubjectVersionsAsync(string subject); Task RegisterSchemaAsync(string subject, Schema schema, bool normalize); - Task SetCompatibilityAsync(string subject, Compatibility compatibility); - Task SetGlobalCompatibilityAsync(Compatibility compatibility); + Task UpdateCompatibilityAsync(string subject, Compatibility compatibility); Task TestCompatibilityAsync(string subject, int versionId, Schema schema); Task TestLatestCompatibilityAsync(string subject, Schema schema); Task LookupSchemaAsync(string subject, Schema schema, bool ignoreDeletedSchemas, bool normalize); diff --git a/src/Confluent.SchemaRegistry/Rest/RestService.cs b/src/Confluent.SchemaRegistry/Rest/RestService.cs index ccee8662d..9f7ec21ae 100644 --- a/src/Confluent.SchemaRegistry/Rest/RestService.cs +++ b/src/Confluent.SchemaRegistry/Rest/RestService.cs @@ -355,25 +355,21 @@ public async Task TestLatestCompatibilityAsync(string subject, Schema sche : (await RequestAsync($"compatibility/subjects/{WebUtility.UrlEncode(subject)}/versions/latest", HttpMethod.Post, schema) .ConfigureAwait(continueOnCapturedContext: false)).IsCompatible; - #endregion Compatibility + #endregion Compatibility #region Config - public async Task GetGlobalCompatibilityAsync() - => (await RequestAsync("config", HttpMethod.Get) - .ConfigureAwait(continueOnCapturedContext: false)).CompatibilityLevel; - public async Task GetCompatibilityAsync(string subject) - => (await RequestAsync($"config/{WebUtility.UrlEncode(subject)}", HttpMethod.Get) - .ConfigureAwait(continueOnCapturedContext: false)).CompatibilityLevel; - - public async Task SetGlobalCompatibilityAsync(Compatibility compatibility) - => await RequestAsync("config", HttpMethod.Put, new Config(compatibility)) - .ConfigureAwait(continueOnCapturedContext: false); - - public async Task SetCompatibilityAsync(string subject, Compatibility compatibility) - => await RequestAsync($"config/{WebUtility.UrlEncode(subject)}", HttpMethod.Put, new Config(compatibility)) - .ConfigureAwait(continueOnCapturedContext: false); + => (await RequestAsync( + string.IsNullOrEmpty(subject) ? "config" : $"config/{WebUtility.UrlEncode(subject)}", + HttpMethod.Get) + .ConfigureAwait(continueOnCapturedContext: false)).CompatibilityLevel; + + public async Task UpdateCompatibilityAsync(string subject, Compatibility compatibility) + => (await RequestAsync( + string.IsNullOrEmpty(subject) ? "config" : $"config/{WebUtility.UrlEncode(subject)}", + HttpMethod.Put, new Config(compatibility)) + .ConfigureAwait(continueOnCapturedContext: false)).CompatibilityLevel; #endregion Config diff --git a/test/Confluent.SchemaRegistry.IntegrationTests/Tests/UpdateCompatibility.cs b/test/Confluent.SchemaRegistry.IntegrationTests/Tests/UpdateCompatibility.cs new file mode 100644 index 000000000..de07b9d92 --- /dev/null +++ b/test/Confluent.SchemaRegistry.IntegrationTests/Tests/UpdateCompatibility.cs @@ -0,0 +1,41 @@ +using System; +using System.Threading.Tasks; +using Xunit; + +namespace Confluent.SchemaRegistry.IntegrationTests; + +public static partial class Tests +{ + [Theory, MemberData(nameof(SchemaRegistryParameters))] + public static async Task UpdateCompatibility(Config config) + { + var sr = new CachedSchemaRegistryClient(new SchemaRegistryConfig { Url = config.Server }); + + // Case 1: Subject is not specified + + var globalCompatibility = await sr.UpdateCompatibilityAsync(Compatibility.BackwardTransitive); + Assert.Equal(Compatibility.BackwardTransitive, globalCompatibility); + + Assert.Equal(Compatibility.BackwardTransitive, await sr.GetCompatibilityAsync()); + + // Case 2: Subject is specified + + var testSchema = + "{\"type\":\"record\",\"name\":\"User\",\"namespace\":\"Confluent.Kafka.Examples.AvroSpecific" + + "\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"favorite_number\",\"type\":[\"i" + + "nt\",\"null\"]},{\"name\":\"favorite_color\",\"type\":[\"string\",\"null\"]}]}"; + + + var topicName = Guid.NewGuid().ToString(); + var subject = + SubjectNameStrategy.Topic.ConstructKeySubjectName(topicName, "Confluent.Kafka.Examples.AvroSpecific.User"); + + await sr.RegisterSchemaAsync(subject, testSchema); + + var compatibility = await sr.UpdateCompatibilityAsync(Compatibility.FullTransitive, subject); + Assert.Equal(Compatibility.FullTransitive, compatibility); + + Assert.Equal(Compatibility.FullTransitive, await sr.GetCompatibilityAsync(subject)); + Assert.Equal(Compatibility.BackwardTransitive, await sr.GetCompatibilityAsync()); + } +} \ No newline at end of file