Skip to content

Commit

Permalink
Methods for working with compatibility (#2097)
Browse files Browse the repository at this point in the history
* Methods for working with compatibility

* Review fix: Remove GetGlobalCompatibilityAsync

* Update CHANGELOG and test

---------

Co-authored-by: Anchit Jain <[email protected]>
  • Loading branch information
ISBronny and anchitj authored Sep 1, 2023
1 parent 5d38703 commit 6536219
Show file tree
Hide file tree
Showing 7 changed files with 114 additions and 19 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
## Enhancements

- Added support for external JSON schemas in `JsonSerializer` and `JsonDeserializer` (#2042).
- Added compatibility methods to CachedSchemaRegistryClient ([ISBronny](https://github.com/ISBronny), #2097).


# 2.2.0
Expand Down
12 changes: 12 additions & 0 deletions src/Confluent.SchemaRegistry/CachedSchemaRegistryClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ namespace Confluent.SchemaRegistry
/// - <see cref="CachedSchemaRegistryClient.GetSubjectVersionsAsync(string)" />
/// - <see cref="CachedSchemaRegistryClient.IsCompatibleAsync(string, Schema)" />
/// - <see cref="CachedSchemaRegistryClient.IsCompatibleAsync(string, string)" />
/// - <see cref="CachedSchemaRegistryClient.GetCompatibilityAsync(string)" />
/// - <see cref="CachedSchemaRegistryClient.UpdateCompatibilityAsync(Compatibility, string)" />
/// </summary>
public class CachedSchemaRegistryClient : ISchemaRegistryClient, IDisposable
{
Expand Down Expand Up @@ -524,6 +526,16 @@ public string ConstructKeySubjectName(string topic, string recordType = null)
public string ConstructValueSubjectName(string topic, string recordType = null)
=> valueSubjectNameStrategy(new SerializationContext(MessageComponentType.Value, topic), recordType);

/// <inheritdoc />
public async Task<Compatibility> GetCompatibilityAsync(string subject = null)
=> await restService.GetCompatibilityAsync(subject)
.ConfigureAwait(continueOnCapturedContext: false);

/// <inheritdoc />
public async Task<Compatibility> UpdateCompatibilityAsync(Compatibility compatibility, string subject = null)
=> await restService.UpdateCompatibilityAsync(subject, compatibility)
.ConfigureAwait(continueOnCapturedContext: false);


/// <summary>
/// Releases unmanaged resources owned by this CachedSchemaRegistryClient instance.
Expand Down
29 changes: 29 additions & 0 deletions src/Confluent.SchemaRegistry/ISchemaRegistryClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -291,5 +291,34 @@ public interface ISchemaRegistryClient : IDisposable
/// </returns>
[Obsolete("SubjectNameStrategy should now be specified via serializer configuration. This method will be removed in a future release.")]
string ConstructValueSubjectName(string topic, string recordType = null);


/// <summary>
/// If the subject is specified returns compatibility type for the specified subject.
/// Otherwise returns global compatibility type.
/// </summary>
/// <param name="subject">
/// The subject to get the compatibility for.
/// </param>
/// <returns>
/// Compatibility type.
/// </returns>
Task<Compatibility> GetCompatibilityAsync(string subject = null);


/// <summary>
/// If the subject is specified sets compatibility type for the specified subject.
/// Otherwise sets global compatibility type.
/// </summary>
/// <param name="subject">
/// The subject to set the compatibility for.
/// </param>
/// <param name="compatibility">
/// Compatibility type.
/// </param>
/// <returns>
/// New compatibility type.
/// </returns>
Task<Compatibility> UpdateCompatibilityAsync(Compatibility compatibility, string subject = null);
}
}
20 changes: 19 additions & 1 deletion src/Confluent.SchemaRegistry/Rest/DataContracts/Compatibility.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,24 @@ public enum Compatibility
/// Full schema compatibility.
/// </summary>
[EnumMember(Value = "FULL")]
Full
Full,

/// <summary>
/// Forward transitive schema compatibility.
/// </summary>
[EnumMember(Value = "FORWARD_TRANSITIVE")]
ForwardTransitive,

/// <summary>
/// Backward transitive schema compatibility.
/// </summary>
[EnumMember(Value = "BACKWARD_TRANSITIVE")]
BackwardTransitive,

/// <summary>
/// Full transitive schema compatibility.
/// </summary>
[EnumMember(Value = "FULL_TRANSITIVE")]
FullTransitive
}
}
4 changes: 1 addition & 3 deletions src/Confluent.SchemaRegistry/Rest/IRestService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,13 @@ namespace Confluent.SchemaRegistry
internal interface IRestService : IDisposable
{
Task<Compatibility> GetCompatibilityAsync(string subject);
Task<Compatibility> GetGlobalCompatibilityAsync();
Task<RegisteredSchema> GetLatestSchemaAsync(string subject);
Task<Schema> GetSchemaAsync(int id, string format = null);
Task<RegisteredSchema> GetSchemaAsync(string subject, int version);
Task<List<string>> GetSubjectsAsync();
Task<List<int>> GetSubjectVersionsAsync(string subject);
Task<int> RegisterSchemaAsync(string subject, Schema schema, bool normalize);
Task<Config> SetCompatibilityAsync(string subject, Compatibility compatibility);
Task<Config> SetGlobalCompatibilityAsync(Compatibility compatibility);
Task<Compatibility> UpdateCompatibilityAsync(string subject, Compatibility compatibility);
Task<bool> TestCompatibilityAsync(string subject, int versionId, Schema schema);
Task<bool> TestLatestCompatibilityAsync(string subject, Schema schema);
Task<RegisteredSchema> LookupSchemaAsync(string subject, Schema schema, bool ignoreDeletedSchemas, bool normalize);
Expand Down
26 changes: 11 additions & 15 deletions src/Confluent.SchemaRegistry/Rest/RestService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -355,25 +355,21 @@ public async Task<bool> TestLatestCompatibilityAsync(string subject, Schema sche
: (await RequestAsync<CompatibilityCheck>($"compatibility/subjects/{WebUtility.UrlEncode(subject)}/versions/latest", HttpMethod.Post, schema)
.ConfigureAwait(continueOnCapturedContext: false)).IsCompatible;

#endregion Compatibility
#endregion Compatibility

#region Config

public async Task<Compatibility> GetGlobalCompatibilityAsync()
=> (await RequestAsync<Config>("config", HttpMethod.Get)
.ConfigureAwait(continueOnCapturedContext: false)).CompatibilityLevel;

public async Task<Compatibility> GetCompatibilityAsync(string subject)
=> (await RequestAsync<Config>($"config/{WebUtility.UrlEncode(subject)}", HttpMethod.Get)
.ConfigureAwait(continueOnCapturedContext: false)).CompatibilityLevel;

public async Task<Config> SetGlobalCompatibilityAsync(Compatibility compatibility)
=> await RequestAsync<Config>("config", HttpMethod.Put, new Config(compatibility))
.ConfigureAwait(continueOnCapturedContext: false);

public async Task<Config> SetCompatibilityAsync(string subject, Compatibility compatibility)
=> await RequestAsync<Config>($"config/{WebUtility.UrlEncode(subject)}", HttpMethod.Put, new Config(compatibility))
.ConfigureAwait(continueOnCapturedContext: false);
=> (await RequestAsync<Config>(
string.IsNullOrEmpty(subject) ? "config" : $"config/{WebUtility.UrlEncode(subject)}",
HttpMethod.Get)
.ConfigureAwait(continueOnCapturedContext: false)).CompatibilityLevel;

public async Task<Compatibility> UpdateCompatibilityAsync(string subject, Compatibility compatibility)
=> (await RequestAsync<Config>(
string.IsNullOrEmpty(subject) ? "config" : $"config/{WebUtility.UrlEncode(subject)}",
HttpMethod.Put, new Config(compatibility))
.ConfigureAwait(continueOnCapturedContext: false)).CompatibilityLevel;

#endregion Config

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
using System;
using System.Threading.Tasks;
using Xunit;

namespace Confluent.SchemaRegistry.IntegrationTests;

public static partial class Tests
{
[Theory, MemberData(nameof(SchemaRegistryParameters))]
public static async Task UpdateCompatibility(Config config)
{
var sr = new CachedSchemaRegistryClient(new SchemaRegistryConfig { Url = config.Server });

// Case 1: Subject is not specified

var globalCompatibility = await sr.UpdateCompatibilityAsync(Compatibility.BackwardTransitive);
Assert.Equal(Compatibility.BackwardTransitive, globalCompatibility);

Assert.Equal(Compatibility.BackwardTransitive, await sr.GetCompatibilityAsync());

// Case 2: Subject is specified

var testSchema =
"{\"type\":\"record\",\"name\":\"User\",\"namespace\":\"Confluent.Kafka.Examples.AvroSpecific" +
"\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"favorite_number\",\"type\":[\"i" +
"nt\",\"null\"]},{\"name\":\"favorite_color\",\"type\":[\"string\",\"null\"]}]}";


var topicName = Guid.NewGuid().ToString();
var subject =
SubjectNameStrategy.Topic.ConstructKeySubjectName(topicName, "Confluent.Kafka.Examples.AvroSpecific.User");

await sr.RegisterSchemaAsync(subject, testSchema);

var compatibility = await sr.UpdateCompatibilityAsync(Compatibility.FullTransitive, subject);
Assert.Equal(Compatibility.FullTransitive, compatibility);

Assert.Equal(Compatibility.FullTransitive, await sr.GetCompatibilityAsync(subject));
Assert.Equal(Compatibility.BackwardTransitive, await sr.GetCompatibilityAsync());
}
}

0 comments on commit 6536219

Please sign in to comment.