From cd4b5800dd1bd45563953c4c6fbf89664a259d35 Mon Sep 17 00:00:00 2001 From: Carles Arnal Date: Mon, 13 Nov 2023 11:34:49 +0100 Subject: [PATCH] Backport fixes (#3966) * Add null checks on role mapping (#3800) * Add null checks on role mapping * Add check for Role and Unit Test * Fixed a regression in the Settings page search box (#3798) * modified state change checks in ArtifactStateExt and AbstractSqlRegistryStorage (#3637) * Incorrect default value avro serialization (#3633) * Fix default value converter * Add test for default byte type value in converter test * Correct comment * added unit tests (#3593) * Bugfix: prase decimal's default --------- Co-authored-by: Amoncy <138134862+Amoncy@users.noreply.github.com> Co-authored-by: Eric Wittmann Co-authored-by: Jean Xiao <118199750+renjingxiao@users.noreply.github.com> Co-authored-by: Anshu Anna Moncy Co-authored-by: j2gg0s --- .../registry/rest/v2/AdminResourceImpl.java | 15 + .../registry/rest/v2/GroupsResourceImpl.java | 5 + .../registry/storage/ArtifactStateExt.java | 14 +- .../impl/sql/AbstractSqlRegistryStorage.java | 45 +- .../noprofile/rest/v2/GroupsResourceTest.java | 125 ++ .../registry/rbac/AdminResourceTest.java | 51 + .../tests/converters/RegistryConverterIT.java | 139 ++- ui/src/app/pages/settings/settings.tsx | 4 +- .../registry/utils/converter/ConnectEnum.java | 136 +++ .../utils/converter/ConnectUnion.java | 131 ++ .../utils/converter/avro/AvroData.java | 1055 ++++++++++++----- .../utils/converter/avro/AvroDataTest.java | 115 +- 12 files changed, 1433 insertions(+), 402 deletions(-) create mode 100644 utils/converter/src/main/java/io/apicurio/registry/utils/converter/ConnectEnum.java create mode 100644 utils/converter/src/main/java/io/apicurio/registry/utils/converter/ConnectUnion.java diff --git a/app/src/main/java/io/apicurio/registry/rest/v2/AdminResourceImpl.java b/app/src/main/java/io/apicurio/registry/rest/v2/AdminResourceImpl.java index 132fcab5d3..e817bc88a0 100644 --- a/app/src/main/java/io/apicurio/registry/rest/v2/AdminResourceImpl.java +++ b/app/src/main/java/io/apicurio/registry/rest/v2/AdminResourceImpl.java @@ -135,6 +135,12 @@ public class AdminResourceImpl implements AdminResource { @Info(category = "download", description = "Download link expiry", availableSince = "2.1.2.Final") Supplier downloadHrefTtl; + private static void requireParameter(String parameterName, Object parameterValue) { + if (parameterValue == null) { + throw new MissingRequiredParameterException(parameterName); + } + } + /** * @see io.apicurio.registry.rest.v2.AdminResource#listArtifactTypes() */ @@ -173,6 +179,13 @@ public List listGlobalRules() { @Audited(extractParameters = {"0", KEY_RULE}) @Authorized(style=AuthorizedStyle.None, level=AuthorizedLevel.Admin) public void createGlobalRule(Rule data) { + RuleType type = data.getType(); + requireParameter("type", type); + + if (data.getConfig() == null || data.getConfig().isEmpty()) { + throw new MissingRequiredParameterException("Config"); + } + RuleConfigurationDto configDto = new RuleConfigurationDto(); configDto.setConfiguration(data.getConfig()); storage.createGlobalRule(data.getType(), configDto); @@ -391,6 +404,8 @@ public RoleMapping getRoleMapping(String principalId) { @Authorized(style=AuthorizedStyle.None, level=AuthorizedLevel.Admin) @RoleBasedAccessApiOperation public void updateRoleMapping(String principalId, UpdateRole data) { + requireParameter("principalId", principalId); + requireParameter("role", data.getRole()); storage.updateRoleMapping(principalId, data.getRole().name()); } diff --git a/app/src/main/java/io/apicurio/registry/rest/v2/GroupsResourceImpl.java b/app/src/main/java/io/apicurio/registry/rest/v2/GroupsResourceImpl.java index 2241cee51f..e67f394471 100644 --- a/app/src/main/java/io/apicurio/registry/rest/v2/GroupsResourceImpl.java +++ b/app/src/main/java/io/apicurio/registry/rest/v2/GroupsResourceImpl.java @@ -318,6 +318,11 @@ public ArtifactOwner getArtifactOwner(String groupId, String artifactId) { public void updateArtifactOwner(String groupId, String artifactId, ArtifactOwner data) { requireParameter("groupId", groupId); requireParameter("artifactId", artifactId); + requireParameter("data", data); + + if (data.getOwner().isEmpty()) { + throw new MissingRequiredParameterException("Missing required owner"); + } ArtifactOwnerDto dto = new ArtifactOwnerDto(data.getOwner()); storage.updateArtifactOwner(defaultGroupIdToNull(groupId), artifactId, dto); diff --git a/app/src/main/java/io/apicurio/registry/storage/ArtifactStateExt.java b/app/src/main/java/io/apicurio/registry/storage/ArtifactStateExt.java index 7c08f28b05..253c6e4194 100644 --- a/app/src/main/java/io/apicurio/registry/storage/ArtifactStateExt.java +++ b/app/src/main/java/io/apicurio/registry/storage/ArtifactStateExt.java @@ -67,14 +67,16 @@ public void logIfDeprecated(String groupId, Object artifactId, Object version, A } public void applyState(Consumer consumer, ArtifactState previousState, ArtifactState newState) { - if (previousState != null) { - if (canTransition(previousState, newState)) { - consumer.accept(newState); + if ( previousState != newState) { + if (previousState != null) { + if (canTransition(previousState, newState)) { + consumer.accept(newState); + } else { + throw new InvalidArtifactStateException(previousState, newState); + } } else { - throw new InvalidArtifactStateException(previousState, newState); + consumer.accept(newState); } - } else { - consumer.accept(newState); } } } diff --git a/app/src/main/java/io/apicurio/registry/storage/impl/sql/AbstractSqlRegistryStorage.java b/app/src/main/java/io/apicurio/registry/storage/impl/sql/AbstractSqlRegistryStorage.java index b521179f3e..31f30d3aa6 100644 --- a/app/src/main/java/io/apicurio/registry/storage/impl/sql/AbstractSqlRegistryStorage.java +++ b/app/src/main/java/io/apicurio/registry/storage/impl/sql/AbstractSqlRegistryStorage.java @@ -532,24 +532,23 @@ public void updateArtifactState(String groupId, String artifactId, ArtifactState public void updateArtifactState(String groupId, String artifactId, String version, ArtifactState state) throws ArtifactNotFoundException, VersionNotFoundException, RegistryStorageException { log.debug("Updating the state of artifact {} {}, version {} to {}", groupId, artifactId, version, state.name()); - ArtifactVersionMetaDataDto dto = this.getArtifactVersionMetaData(groupId, artifactId, version); + var metadata = getArtifactVersionMetaData(groupId, artifactId, version); + updateArtifactVersionStateRaw(metadata.getGlobalId(), metadata.getState(), state); + } + + + /** + * IMPORTANT: Private methods can't be @Transactional. Callers MUST have started a transaction. + */ + private void updateArtifactVersionStateRaw(long globalId, ArtifactState oldState, ArtifactState newState) { handles.withHandleNoException(handle -> { - long globalId = dto.getGlobalId(); - ArtifactState oldState = dto.getState(); - ArtifactState newState = state; - if (oldState != newState) { - artifactStateEx.applyState(s -> { - String sql = sqlStatements.updateArtifactVersionState(); - int rowCount = handle.createUpdate(sql) - .bind(0, s.name()) - .bind(1, tenantContext.tenantId()) - .bind(2, globalId) - .execute(); - if (rowCount == 0) { - throw new VersionNotFoundException(groupId, artifactId, dto.getVersion()); - } - }, oldState, newState); - } + artifactStateEx.applyState(s -> { + handle.createUpdate(sqlStatements.updateArtifactVersionState()) + .bind(0, s.name()) + .bind(1, tenantContext.tenantId()) + .bind(2, globalId) + .execute(); + }, oldState, newState); return null; }); } @@ -2262,7 +2261,7 @@ protected CommentDto createArtifactVersionComment(String groupId, String artifac ArtifactVersionMetaDataDto avmdd = res.orElseThrow(() -> new VersionNotFoundException(groupId, artifactId, version)); String cid = String.valueOf(commentId.generate(handle)); - + sql = sqlStatements.insertComment(); handle.createUpdate(sql) .bind(0, tenantContext.tenantId()) @@ -2292,7 +2291,7 @@ protected CommentDto createArtifactVersionComment(String groupId, String artifac throw new RegistryStorageException(e); } } - + /** * @see io.apicurio.registry.storage.RegistryStorage#getArtifactVersionComments(java.lang.String, java.lang.String, java.lang.String) */ @@ -2319,7 +2318,7 @@ public List getArtifactVersionComments(String groupId, String artifa throw new RegistryStorageException(e); } } - + /** * @see io.apicurio.registry.storage.RegistryStorage#deleteArtifactVersionComment(java.lang.String, java.lang.String, java.lang.String, java.lang.String) */ @@ -2360,7 +2359,7 @@ public void deleteArtifactVersionComment(String groupId, String artifactId, Stri throw new RegistryStorageException(e); } } - + /** * @see io.apicurio.registry.storage.RegistryStorage#updateArtifactVersionComment(java.lang.String, java.lang.String, java.lang.String, java.lang.String, java.lang.String) */ @@ -3468,7 +3467,7 @@ public List getInboundArtifactReferences(String groupId, S .list(); }); } - + /** * @see RegistryStorage#isArtifactExists(String, String) */ @@ -3854,7 +3853,7 @@ protected void importGroup(Handle handle, GroupEntity entity) { log.warn("Failed to import group entity (likely it already exists).", e); } } - + protected void importComment(Handle handle, CommentEntity entity) { try { String sql = sqlStatements.insertComment(); diff --git a/app/src/test/java/io/apicurio/registry/noprofile/rest/v2/GroupsResourceTest.java b/app/src/test/java/io/apicurio/registry/noprofile/rest/v2/GroupsResourceTest.java index 97ea9705b1..ccaf65cf90 100644 --- a/app/src/test/java/io/apicurio/registry/noprofile/rest/v2/GroupsResourceTest.java +++ b/app/src/test/java/io/apicurio/registry/noprofile/rest/v2/GroupsResourceTest.java @@ -59,6 +59,8 @@ import io.apicurio.registry.AbstractResourceTestBase; import io.apicurio.registry.rest.client.exception.RuleViolationException; +import io.apicurio.registry.rest.v2.beans.ArtifactOwner; +import io.apicurio.registry.types.ArtifactState; import io.apicurio.registry.rest.v2.beans.ArtifactMetaData; import io.apicurio.registry.rest.v2.beans.ArtifactReference; import io.apicurio.registry.rest.v2.beans.Comment; @@ -66,6 +68,7 @@ import io.apicurio.registry.rest.v2.beans.IfExists; import io.apicurio.registry.rest.v2.beans.NewComment; import io.apicurio.registry.rest.v2.beans.Rule; +import io.apicurio.registry.rest.v2.beans.UpdateState; import io.apicurio.registry.rest.v2.beans.VersionMetaData; import io.apicurio.registry.rules.compatibility.jsonschema.diff.DiffType; import io.apicurio.registry.storage.impl.sql.SqlUtil; @@ -178,6 +181,41 @@ public void testDefaultGroup() throws Exception { .body("info.title", equalTo("Empty API")); } + @Test + public void testUpdateArtifactOwner() throws Exception { + String oaiArtifactContent = resourceToString("openapi-empty.json"); + createArtifact("testUpdateArtifactOwner", "testUpdateArtifactOwner/EmptyAPI/1",ArtifactType.OPENAPI, oaiArtifactContent); + + ArtifactOwner artifactOwner = new ArtifactOwner("newOwner"); + + given() + .when() + .contentType(CT_JSON) + .pathParam("groupId", "testUpdateArtifactOwner") + .pathParam("artifactId", "testUpdateArtifactOwner/EmptyAPI/1") + .body(artifactOwner) + .put("/registry/v2/groups/{groupId}/artifacts/{artifactId}/owner") + .then() + .statusCode(204); + } + + @Test + public void testUpdateEmptyArtifactOwner() throws Exception { + String oaiArtifactContent = resourceToString("openapi-empty.json"); + createArtifact("testUpdateEmptyArtifactOwner", "testUpdateEmptyArtifactOwner/EmptyAPI/1",ArtifactType.OPENAPI, oaiArtifactContent); + + ArtifactOwner artifactOwner = new ArtifactOwner(""); + + given() + .when() + .contentType(CT_JSON) + .pathParam("groupId", "testUpdateEmptyArtifactOwner") + .pathParam("artifactId", "testUpdateEmptyArtifactOwner/EmptyAPI/1") + .body(artifactOwner) + .put("/registry/v2/groups/{groupId}/artifacts/{artifactId}/owner") + .then() + .statusCode(400); + } @Test public void testMultipleGroups() throws Exception { @@ -601,6 +639,93 @@ public void testUpdateArtifact() throws Exception { } + @Test + public void testUpdateArtifactState() throws Exception { + String oaiArtifactContent = resourceToString("openapi-empty.json"); + createArtifact("testUpdateArtifactState", "testUpdateArtifactState/EmptyAPI/1",ArtifactType.OPENAPI, oaiArtifactContent); + + UpdateState updateState = new UpdateState(); + updateState.setState(ArtifactState.DEPRECATED); + + // Update the artifact state to DEPRECATED. + given() + .when() + .contentType(CT_JSON) + .pathParam("groupId", "testUpdateArtifactState") + .pathParam("artifactId", "testUpdateArtifactState/EmptyAPI/1") + .body(updateState) + .put("/registry/v2/groups/{groupId}/artifacts/{artifactId}/state") + .then() + .statusCode(204); + + // Update the artifact state to DEPRECATED again. + given() + .when() + .contentType(CT_JSON) + .pathParam("groupId", "testUpdateArtifactState") + .pathParam("artifactId", "testUpdateArtifactState/EmptyAPI/1") + .body(updateState) + .put("/registry/v2/groups/{groupId}/artifacts/{artifactId}/state") + .then() + .statusCode(204); + + // Send a GET request to check if the artifact state is DEPRECATED. + given() + .when() + .contentType(CT_JSON) + .pathParam("groupId", "testUpdateArtifactState") + .pathParam("artifactId", "testUpdateArtifactState/EmptyAPI/1") + .get("/registry/v2/groups/{groupId}/artifacts/{artifactId}") + .then() + .statusCode(200) + .header("X-Registry-Deprecated", "true"); + } + + @Test + public void testUpdateArtifactVersionState() throws Exception { + String oaiArtifactContent = resourceToString("openapi-empty.json"); + createArtifact("testUpdateArtifactVersionState", "testUpdateArtifactVersionState/EmptyAPI",ArtifactType.OPENAPI, oaiArtifactContent); + + UpdateState updateState = new UpdateState(); + updateState.setState(ArtifactState.DEPRECATED); + + // Update the artifact state to DEPRECATED. + given() + .when() + .contentType(CT_JSON) + .pathParam("groupId", "testUpdateArtifactVersionState") + .pathParam("artifactId", "testUpdateArtifactVersionState/EmptyAPI") + .pathParam("versionId", "1") + .body(updateState) + .put("/registry/v2/groups/{groupId}/artifacts/{artifactId}/versions/{versionId}/state") + .then() + .statusCode(204); + + // Update the artifact state to DEPRECATED again. + given() + .when() + .contentType(CT_JSON) + .pathParam("groupId", "testUpdateArtifactVersionState") + .pathParam("artifactId", "testUpdateArtifactVersionState/EmptyAPI") + .pathParam("versionId", "1") + .body(updateState) + .put("/registry/v2/groups/{groupId}/artifacts/{artifactId}/versions/{versionId}/state") + .then() + .statusCode(204); + + // Send a GET request to check if the artifact state is DEPRECATED. + given() + .when() + .contentType(CT_JSON) + .pathParam("groupId", "testUpdateArtifactVersionState") + .pathParam("artifactId", "testUpdateArtifactVersionState/EmptyAPI") + .pathParam("versionId", "1") + .get("/registry/v2/groups/{groupId}/artifacts/{artifactId}/versions/{versionId}") + .then() + .statusCode(200) + .header("X-Registry-Deprecated", "true"); + } + @Test @DisabledIfEnvironmentVariable(named = CURRENT_ENV, matches = CURRENT_ENV_MAS_REGEX) @DisabledOnOs(OS.WINDOWS) diff --git a/app/src/test/java/io/apicurio/registry/rbac/AdminResourceTest.java b/app/src/test/java/io/apicurio/registry/rbac/AdminResourceTest.java index 4d2ef67531..68c1223282 100644 --- a/app/src/test/java/io/apicurio/registry/rbac/AdminResourceTest.java +++ b/app/src/test/java/io/apicurio/registry/rbac/AdminResourceTest.java @@ -101,6 +101,47 @@ public void testGlobalRulesEndpoint() { .statusCode(200) .body(anything()); } + + @Test + public void testCreateGlobalRule() throws Exception + { + //Test Rule type null + Rule nullType = new Rule(); + nullType.setType(null); + nullType.setConfig("TestConfig"); + given() + .when() + .contentType(CT_JSON) + .body(nullType) + .post("/registry/v2/admin/rules") + .then() + .statusCode(400); + + //Test Rule config null + Rule nullConfig = new Rule(); + nullConfig.setType(RuleType.VALIDITY); + nullConfig.setConfig(null); + given() + .when() + .contentType(CT_JSON) + .body(nullConfig) + .post("/registry/v2/admin/rules") + .then() + .statusCode(400); + + //Test Rule config empty + Rule emptyConfig = new Rule(); + emptyConfig.setType(RuleType.VALIDITY); + emptyConfig.setConfig(""); + given() + .when() + .contentType(CT_JSON) + .body(emptyConfig) + .post("/registry/v2/admin/rules") + .then() + .statusCode(400); + + } @Test public void testGlobalRules() throws Exception { @@ -888,6 +929,16 @@ public void testRoleMappings() throws Exception { .body("error_code", equalTo(404)) .body("message", equalTo("Role mapping not found for principal.")); + //Update a mapping with null RoleType + update.setRole(null); + given() + .when() + .contentType(CT_JSON) + .body(update) + .put("/registry/v2/admin/roleMappings/TestUser") + .then() + .statusCode(400); + // Delete a role mapping given() .when() diff --git a/integration-tests/src/test/java/io/apicurio/tests/converters/RegistryConverterIT.java b/integration-tests/src/test/java/io/apicurio/tests/converters/RegistryConverterIT.java index 5f4070d0a7..2911de719c 100644 --- a/integration-tests/src/test/java/io/apicurio/tests/converters/RegistryConverterIT.java +++ b/integration-tests/src/test/java/io/apicurio/tests/converters/RegistryConverterIT.java @@ -18,9 +18,6 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; -import io.apicurio.tests.ApicurioRegistryBaseIT; -import io.apicurio.tests.utils.AvroGenericRecordSchemaFactory; -import io.apicurio.tests.utils.Constants; import io.apicurio.registry.rest.client.RegistryClient; import io.apicurio.registry.serde.AbstractKafkaSerDe; import io.apicurio.registry.serde.SerdeConfig; @@ -33,10 +30,14 @@ import io.apicurio.registry.utils.converter.AvroConverter; import io.apicurio.registry.utils.converter.ExtJsonConverter; import io.apicurio.registry.utils.converter.SerdeBasedConverter; +import io.apicurio.registry.utils.converter.avro.AvroData; import io.apicurio.registry.utils.converter.json.CompactFormatStrategy; import io.apicurio.registry.utils.converter.json.FormatStrategy; import io.apicurio.registry.utils.converter.json.PrettyFormatStrategy; import io.apicurio.registry.utils.tests.TestUtils; +import io.apicurio.tests.ApicurioRegistryBaseIT; +import io.apicurio.tests.utils.AvroGenericRecordSchemaFactory; +import io.apicurio.tests.utils.Constants; import io.quarkus.test.junit.QuarkusIntegrationTest; import org.apache.avro.Schema; import org.apache.avro.generic.GenericData.Record; @@ -102,6 +103,98 @@ record = (Record) converter.toConnectData(topic, bytes).value(); } + @Test + public void testAvroIntDefaultValue() throws Exception { + String expectedSchema = "{\n" + + " \"type\" : \"record\",\n" + + " \"name\" : \"ConnectDefault\",\n" + + " \"namespace\" : \"io.confluent.connect.avro\",\n" + + " \"fields\" : [ {\n" + + " \"name\" : \"int16Test\",\n" + + " \"type\" : [ {\n" + + " \"type\" : \"int\",\n" + + " \"connect.doc\" : \"int16test field\",\n" + + " \"connect.default\" : 2,\n" + + " \"connect.type\" : \"int16\"\n" + + " }, \"null\" ],\n" + + " \"default\" : 2\n" + + " } ]\n" + + "}"; + + try (AvroConverter converter = new AvroConverter<>()) { + + Map config = new HashMap<>(); + config.put(SerdeConfig.REGISTRY_URL, getRegistryV2ApiUrl()); + config.put(SerdeConfig.AUTO_REGISTER_ARTIFACT, "true"); + converter.configure(config, false); + + org.apache.kafka.connect.data.Schema sc = SchemaBuilder.struct() + .field("int16Test", SchemaBuilder.int16().optional().defaultValue((short) 2).doc("int16test field") + .build()); + Struct struct = new Struct(sc); + struct.put("int16Test", (short) 3); + + String subject = TestUtils.generateArtifactId(); + + byte[] bytes = converter.fromConnectData(subject, sc, struct); + + // some impl details ... + TestUtils.waitForSchema(globalId -> registryClient.getContentByGlobalId(globalId) != null, bytes); + + Struct ir = (Struct) converter.toConnectData(subject, bytes).value(); + Assertions.assertEquals((short) 3, ir.get("int16Test")); + + AvroData avroData = new AvroData(10); + Assertions.assertEquals(expectedSchema, avroData.fromConnectSchema(ir.schema()).toString(true)); + } + } + + @Test + public void testAvroBytesDefaultValue() throws Exception { + String expectedSchema = "{\n" + + " \"type\" : \"record\",\n" + + " \"name\" : \"ConnectDefault\",\n" + + " \"namespace\" : \"io.confluent.connect.avro\",\n" + + " \"fields\" : [ {\n" + + " \"name\" : \"bytesTest\",\n" + + " \"type\" : [ {\n" + + " \"type\" : \"bytes\",\n" + + " \"connect.parameters\" : {\n" + + " \"lenght\" : \"10\"\n" + + " },\n" + + " \"connect.default\" : \"test\"\n" + + " }, \"null\" ],\n" + + " \"default\" : \"test\"\n" + + " } ]\n" + + "}"; + + try (AvroConverter converter = new AvroConverter<>()) { + + Map config = new HashMap<>(); + config.put(SerdeConfig.REGISTRY_URL, getRegistryV2ApiUrl()); + config.put(SerdeConfig.AUTO_REGISTER_ARTIFACT, "true"); + converter.configure(config, false); + + org.apache.kafka.connect.data.Schema sc = SchemaBuilder.struct() + .field("bytesTest", SchemaBuilder.bytes().optional().parameters(Map.of("lenght", "10")).defaultValue("test".getBytes()) + .build()); + Struct struct = new Struct(sc); + + struct.put("bytesTest", "testingBytes".getBytes()); + + String subject = TestUtils.generateArtifactId(); + + byte[] bytes = converter.fromConnectData(subject, sc, struct); + + // some impl details ... + TestUtils.waitForSchema(globalId -> registryClient.getContentByGlobalId(globalId) != null, bytes); + Struct ir = (Struct) converter.toConnectData(subject, bytes).value(); + AvroData avroData = new AvroData(10); + Assertions.assertEquals(expectedSchema, avroData.fromConnectSchema(ir.schema()).toString(true)); + } + + } + @Test public void testAvro() throws Exception { try (AvroConverter converter = new AvroConverter<>()) { @@ -112,8 +205,8 @@ public void testAvro() throws Exception { converter.configure(config, false); org.apache.kafka.connect.data.Schema sc = SchemaBuilder.struct() - .field("bar", org.apache.kafka.connect.data.Schema.STRING_SCHEMA) - .build(); + .field("bar", org.apache.kafka.connect.data.Schema.STRING_SCHEMA) + .build(); Struct struct = new Struct(sc); struct.put("bar", "somebar"); @@ -132,29 +225,29 @@ public void testAvro() throws Exception { @Test public void testPrettyJson() throws Exception { testJson( - createRegistryClient(), - new PrettyFormatStrategy(), - input -> { - try { - ObjectMapper mapper = new ObjectMapper(); - JsonNode root = mapper.readTree(input); - return root.get("schemaId").asLong(); - } catch (IOException e) { - throw new UncheckedIOException(e); + createRegistryClient(), + new PrettyFormatStrategy(), + input -> { + try { + ObjectMapper mapper = new ObjectMapper(); + JsonNode root = mapper.readTree(input); + return root.get("schemaId").asLong(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } } - } ); } @Test public void testCompactJson() throws Exception { testJson( - createRegistryClient(), - new CompactFormatStrategy(), - input -> { - ByteBuffer buffer = AbstractKafkaSerDe.getByteBuffer(input); - return buffer.getLong(); - } + createRegistryClient(), + new CompactFormatStrategy(), + input -> { + ByteBuffer buffer = AbstractKafkaSerDe.getByteBuffer(input); + return buffer.getLong(); + } ); } @@ -166,8 +259,8 @@ private void testJson(RegistryClient restClient, FormatStrategy formatStrategy, converter.configure(config, false); org.apache.kafka.connect.data.Schema sc = SchemaBuilder.struct() - .field("bar", org.apache.kafka.connect.data.Schema.STRING_SCHEMA) - .build(); + .field("bar", org.apache.kafka.connect.data.Schema.STRING_SCHEMA) + .build(); Struct struct = new Struct(sc); struct.put("bar", "somebar"); diff --git a/ui/src/app/pages/settings/settings.tsx b/ui/src/app/pages/settings/settings.tsx index e4a98d4752..86246ec4e9 100644 --- a/ui/src/app/pages/settings/settings.tsx +++ b/ui/src/app/pages/settings/settings.tsx @@ -213,7 +213,7 @@ export class SettingsPage extends PageComponent { + private onSearchCriteria = (_evt: any, criteria: string): void => { this.setSingleState("searchCriteria", criteria); }; @@ -225,7 +225,7 @@ export class SettingsPage extends PageComponent { property.value = newValue; diff --git a/utils/converter/src/main/java/io/apicurio/registry/utils/converter/ConnectEnum.java b/utils/converter/src/main/java/io/apicurio/registry/utils/converter/ConnectEnum.java new file mode 100644 index 0000000000..a4408ac1b5 --- /dev/null +++ b/utils/converter/src/main/java/io/apicurio/registry/utils/converter/ConnectEnum.java @@ -0,0 +1,136 @@ +/* + * Copyright 2021 Confluent Inc. + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.apicurio.registry.utils.converter; + +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaBuilder; +import org.apache.kafka.connect.errors.DataException; + +import java.util.List; +import java.util.Map; + +public class ConnectEnum { + + public static final String LOGICAL_PARAMETER = "org.apache.kafka.connect.data.Enum"; + + /** + * Returns a SchemaBuilder for an Enum. + * + * @param annotation an arbitrary annotation to be associated with the enum + * @param symbols the enum symbols + * @return a SchemaBuilder + */ + public static SchemaBuilder builder(String annotation, List symbols) { + SchemaBuilder builder = SchemaBuilder.string().parameter(LOGICAL_PARAMETER, annotation); + for (int i = 0; i < symbols.size(); i++) { + builder.parameter(LOGICAL_PARAMETER + "." + symbols.get(i), String.valueOf(i)); + } + return builder; + } + + /** + * Returns a SchemaBuilder for an Enum. + * + * @param annotation an arbitrary annotation to be associated with the enum + * @param symbols a map of enum symbol to its ordinal + * @return a SchemaBuilder + */ + public static SchemaBuilder builder(String annotation, Map symbols) { + SchemaBuilder builder = SchemaBuilder.string().parameter(LOGICAL_PARAMETER, annotation); + for (Map.Entry symbol : symbols.entrySet()) { + builder.parameter(LOGICAL_PARAMETER + "." + symbol.getKey(), + String.valueOf(symbol.getValue())); + } + return builder; + } + + /** + * Returns whether a schema represents an Enum. + * + * @param schema the schema + * @return whether the schema represents an Enum + */ + public static boolean isEnum(Schema schema) { + return schema != null + && schema.parameters() != null + && schema.parameters().containsKey(LOGICAL_PARAMETER); + } + + /** + * Returns whether a schema has an Enum symbol. + * + * @param schema the schema + * @param symbol the enum symbol + * @return whether the schema represents an Enum + */ + public static boolean hasEnumSymbol(Schema schema, String symbol) { + return schema != null + && schema.parameters() != null + && schema.parameters().containsKey(LOGICAL_PARAMETER) + && schema.parameters().containsKey(LOGICAL_PARAMETER + "." + symbol); + } + + /** + * Convert a value from its logical format (Enum) to its encoded format. + * + * @param schema the schema + * @param value the logical value + * @return the encoded value + */ + public static > String fromLogical(Schema schema, T value) { + if (!hasEnumSymbol(schema, value.name())) { + throw new DataException( + "Requested conversion of Enum object but the schema does not match."); + } + return value.name(); + } + + /** + * Convert a value from its encoded format to its logical format (Enum). + * + * @param schema the schema + * @param cls the class of the logical value + * @param symbol the enum symbol + * @return the logical value + */ + public static > T toLogical(Schema schema, Class cls, + String symbol) { + if (!hasEnumSymbol(schema, symbol)) { + throw new DataException( + "Requested conversion of Enum object but the schema does not match."); + } + return Enum.valueOf(cls, symbol); + } + + /** + * Convert a value from its encoded format to its ordinal. + * + * @param schema the schema + * @param symbol the enum symbol + * @return the ordinal + */ + public static int toOrdinal(Schema schema, String symbol) { + if (!hasEnumSymbol(schema, symbol)) { + throw new DataException( + "Requested conversion of Enum object but the schema does not match."); + } + return Integer.parseInt(schema.parameters().get(LOGICAL_PARAMETER + "." + symbol)); + } +} diff --git a/utils/converter/src/main/java/io/apicurio/registry/utils/converter/ConnectUnion.java b/utils/converter/src/main/java/io/apicurio/registry/utils/converter/ConnectUnion.java new file mode 100644 index 0000000000..f57a9e8581 --- /dev/null +++ b/utils/converter/src/main/java/io/apicurio/registry/utils/converter/ConnectUnion.java @@ -0,0 +1,131 @@ +/* + * Copyright 2021 Confluent Inc. + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.apicurio.registry.utils.converter; + +import org.apache.kafka.connect.data.ConnectSchema; +import org.apache.kafka.connect.data.Field; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaBuilder; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.errors.DataException; + +public class ConnectUnion { + + public static final String LOGICAL_PARAMETER = "org.apache.kafka.connect.data.Union"; + + /** + * Returns a SchemaBuilder for a Union. + * + * @param annotation an arbitrary annotation to be associated with the union + * @return a SchemaBuilder + */ + public static SchemaBuilder builder(String annotation) { + return SchemaBuilder.struct().parameter(LOGICAL_PARAMETER, annotation); + } + + /** + * Returns whether a schema represents a Union. + * + * @param schema the schema + * @return whether the schema represents a Union + */ + public static boolean isUnion(Schema schema) { + return schema != null + && schema.parameters() != null + && schema.parameters().containsKey(LOGICAL_PARAMETER); + } + + /** + * Convert a value from its logical format (Union) to it's encoded format. + * + * @param schema the schema + * @param value the logical value + * @return the encoded value + */ + public static Object fromLogical(Schema schema, Struct value) { + if (!isUnion(schema)) { + throw new DataException( + "Requested conversion of Union object but the schema does not match."); + } + for (Field field : schema.fields()) { + Object object = value.get(field); + if (object != null) { + return object; + } + } + return null; + } + + /** + * Convert a value from its encoded format to its logical format (Union). + * The value is associated with the field whose schema matches the given value. + * + * @param schema the schema + * @param value the encoded value + * @return the logical value + */ + public static Struct toLogical(Schema schema, Object value) { + if (!isUnion(schema)) { + throw new DataException( + "Requested conversion of Union object but the schema does not match."); + } + Struct struct = new Struct(schema); + for (Field field : schema.fields()) { + if (validate(field.schema(), value)) { + struct.put(field, value); + break; + } + } + return struct; + } + + private static boolean validate(Schema schema, Object value) { + try { + ConnectSchema.validateValue(schema, value); + } catch (DataException e) { + return false; + } + return true; + } + + /** + * Convert a value from its encoded format to its logical format (Union). + * The value is associated with the field with the given field name. + * + * @param schema the schema + * @param fieldName the field name + * @param value the encoded value + * @return the logical value + */ + public static Struct toLogicalUsingName(Schema schema, String fieldName, Object value) { + if (!isUnion(schema)) { + throw new DataException( + "Requested conversion of Union object but the schema does not match."); + } + Struct struct = new Struct(schema); + for (Field field : schema.fields()) { + if (field.name().equals(fieldName)) { + struct.put(field, value); + break; + } + } + return struct; + } +} diff --git a/utils/converter/src/main/java/io/apicurio/registry/utils/converter/avro/AvroData.java b/utils/converter/src/main/java/io/apicurio/registry/utils/converter/avro/AvroData.java index ea4497b7c5..337fe147b1 100644 --- a/utils/converter/src/main/java/io/apicurio/registry/utils/converter/avro/AvroData.java +++ b/utils/converter/src/main/java/io/apicurio/registry/utils/converter/avro/AvroData.java @@ -21,10 +21,12 @@ import com.fasterxml.jackson.databind.node.ArrayNode; import com.fasterxml.jackson.databind.node.IntNode; import com.fasterxml.jackson.databind.node.JsonNodeFactory; +import com.fasterxml.jackson.databind.node.NullNode; import com.fasterxml.jackson.databind.node.ObjectNode; - import io.apicurio.registry.serde.avro.NonRecordContainer; - +import io.apicurio.registry.utils.converter.ConnectEnum; +import io.apicurio.registry.utils.converter.ConnectUnion; +import org.apache.avro.AvroTypeException; import org.apache.avro.JsonProperties; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericEnumSymbol; @@ -36,33 +38,20 @@ import org.apache.kafka.common.cache.Cache; import org.apache.kafka.common.cache.LRUCache; import org.apache.kafka.common.cache.SynchronizedCache; -import org.apache.kafka.connect.data.ConnectSchema; import org.apache.kafka.connect.data.Date; -import org.apache.kafka.connect.data.Decimal; -import org.apache.kafka.connect.data.Field; -import org.apache.kafka.connect.data.Schema; -import org.apache.kafka.connect.data.SchemaAndValue; -import org.apache.kafka.connect.data.SchemaBuilder; -import org.apache.kafka.connect.data.Struct; -import org.apache.kafka.connect.data.Time; -import org.apache.kafka.connect.data.Timestamp; +import org.apache.kafka.connect.data.*; import org.apache.kafka.connect.errors.DataException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; +import java.io.UnsupportedEncodingException; import java.math.BigDecimal; +import java.net.URLEncoder; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.HashMap; -import java.util.HashSet; -import java.util.IdentityHashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Set; +import java.util.*; +import java.util.regex.Pattern; /** @@ -71,6 +60,8 @@ @SuppressWarnings({"unused", "rawtypes", "unchecked"}) public class AvroData { + private static final Logger log = LoggerFactory.getLogger(AvroData.class); + public static final String NAMESPACE = "io.confluent.connect.avro"; // Avro does not permit empty schema names, which might be the ideal default since we also are // not permitted to simply omit the name. Instead, make it very clear where the default is @@ -89,6 +80,13 @@ public class AvroData { public static final String CONNECT_DEFAULT_VALUE_PROP = "connect.default"; public static final String CONNECT_PARAMETERS_PROP = "connect.parameters"; public static final String CONNECT_INTERNAL_TYPE_NAME = "connect.internal.type"; + public static final String AVRO_RECORD_DOC_PROP = NAMESPACE + ".record.doc"; + public static final String AVRO_ENUM_DOC_PREFIX_PROP = NAMESPACE + ".enum.doc."; + public static final String AVRO_FIELD_DOC_PREFIX_PROP = NAMESPACE + ".field.doc."; + //This property is used to determine whether a default value in the Connect schema originated + //from an Avro field default + public static final String AVRO_FIELD_DEFAULT_FLAG_PROP = NAMESPACE + ".field.default"; + public static final String AVRO_ENUM_DEFAULT_PREFIX_PROP = NAMESPACE + ".enum.default."; public static final String CONNECT_TYPE_PROP = "connect.type"; @@ -100,8 +98,17 @@ public class AvroData { public static final String AVRO_TYPE_ANYTHING = NAMESPACE + ".Anything"; + public static final String GENERALIZED_TYPE_UNION = ConnectUnion.LOGICAL_PARAMETER; + public static final String GENERALIZED_TYPE_ENUM = ConnectEnum.LOGICAL_PARAMETER; + public static final String GENERALIZED_TYPE_UNION_PREFIX = "connect_union_"; + public static final String GENERALIZED_TYPE_UNION_FIELD_PREFIX = + GENERALIZED_TYPE_UNION_PREFIX + "field_"; + private static final Map NON_AVRO_TYPES_BY_TYPE_CODE = new HashMap<>(); + private static Pattern NAME_START_CHAR = Pattern.compile("^[A-Za-z_]"); + private static Pattern NAME_INVALID_CHARS = Pattern.compile("[^A-Za-z0-9_]"); + static { NON_AVRO_TYPES_BY_TYPE_CODE.put(CONNECT_TYPE_INT8, Schema.Type.INT8); NON_AVRO_TYPES_BY_TYPE_CODE.put(CONNECT_TYPE_INT16, Schema.Type.INT16); @@ -251,6 +258,7 @@ public Object convert(Schema schema, Object value) { static final String AVRO_LOGICAL_DECIMAL = "decimal"; static final String AVRO_LOGICAL_DECIMAL_SCALE_PROP = "scale"; static final String AVRO_LOGICAL_DECIMAL_PRECISION_PROP = "precision"; + static final String CONNECT_AVRO_FIXED_SIZE_PROP = "connect.fixed.size"; static final String CONNECT_AVRO_DECIMAL_PRECISION_PROP = "connect.decimal.precision"; static final Integer CONNECT_AVRO_DECIMAL_PRECISION_DEFAULT = 64; @@ -274,7 +282,7 @@ public Object convert(Schema schema, Object value) { public Object convert(Schema schema, Object value) { if (!(value instanceof java.util.Date)) { throw new DataException( - "Invalid type for Date, expected Date but was " + value.getClass()); + "Invalid type for Date, expected Date but was " + value.getClass()); } return Date.fromLogical(schema, (java.util.Date) value); } @@ -303,10 +311,17 @@ public Object convert(Schema schema, Object value) { }); } + private int unionIndex = 0; + private Cache fromConnectSchemaCache; private Cache toConnectSchemaCache; private boolean connectMetaData; + private boolean generalizedSumTypeSupport; + private boolean ignoreDefaultForNullables; private boolean enhancedSchemaSupport; + private boolean scrubInvalidNames; + private boolean discardTypeDocDefault; + private boolean allowOptionalMapKey; private static class AvroSchemaAndVersion { private org.apache.avro.Schema schema; @@ -358,7 +373,12 @@ public AvroData(AvroDataConfig avroDataConfig) { new SynchronizedCache<>(new LRUCache<>( avroDataConfig.getSchemasCacheSize())); this.connectMetaData = avroDataConfig.isConnectMetaData(); + //this.generalizedSumTypeSupport = avroDataConfig.isGeneralizedSumTypeSupport(); + //this.ignoreDefaultForNullables = avroDataConfig.ignoreDefaultForNullables(); this.enhancedSchemaSupport = avroDataConfig.isEnhancedAvroSchemaSupport(); + //this.scrubInvalidNames = avroDataConfig.isScrubInvalidNames(); + //this.discardTypeDocDefault = avroDataConfig.isDiscardTypeDocDefault(); + //this.allowOptionalMapKey = avroDataConfig.isAllowOptionalMapKeys(); } /** @@ -366,7 +386,11 @@ public AvroData(AvroDataConfig avroDataConfig) { */ public Object fromConnectData(Schema schema, Object value) { org.apache.avro.Schema avroSchema = fromConnectSchema(schema); - return fromConnectData(schema, avroSchema, value, true, false, enhancedSchemaSupport); + return fromConnectData(schema, avroSchema, value); + } + + protected Object fromConnectData(Schema schema, org.apache.avro.Schema avroSchema, Object value) { + return fromConnectData(schema, avroSchema, value, true, false); } /** @@ -386,15 +410,16 @@ public Object fromConnectData(Schema schema, Object value) { * null * @return the converted data */ - @SuppressWarnings("unused") - private static Object fromConnectData( - Schema schema, org.apache.avro.Schema avroSchema, - Object logicalValue, boolean requireContainer, - boolean requireSchemalessContainerNull, boolean enhancedSchemaSupport + private Object fromConnectData( + Schema schema, + org.apache.avro.Schema avroSchema, + Object logicalValue, + boolean requireContainer, + boolean requireSchemalessContainerNull ) { Schema.Type schemaType = schema != null - ? schema.type() - : schemaTypeForSchemalessJavaType(logicalValue); + ? schema.type() + : schemaTypeForSchemalessJavaType(logicalValue); if (schemaType == null) { // Schemaless null data since schema is null and we got a null schema type from the value if (requireSchemalessContainerNull) { @@ -420,7 +445,7 @@ private static Object fromConnectData( Object value = logicalValue; if (schema != null && schema.name() != null) { LogicalTypeConverter logicalConverter = TO_AVRO_LOGICAL_CONVERTERS.get(schema.name()); - if (logicalConverter != null && logicalValue != null) { + if (logicalConverter != null) { value = logicalConverter.convert(schema, logicalValue); } } @@ -475,32 +500,50 @@ private static Object fromConnectData( maybeWrapSchemaless(schema, value, ANYTHING_SCHEMA_BOOLEAN_FIELD), requireContainer); case STRING: - if (enhancedSchemaSupport && schema != null && schema.parameters() != null - && schema.parameters().containsKey(AVRO_TYPE_ENUM)) { + if (generalizedSumTypeSupport && ConnectEnum.isEnum(schema)) { + String enumSchemaName = schema.parameters().get(GENERALIZED_TYPE_ENUM); + value = enumSymbol(avroSchema, value, enumSchemaName); + } else if (enhancedSchemaSupport && schema != null && schema.parameters() != null + && schema.parameters().containsKey(AVRO_TYPE_ENUM)) { String enumSchemaName = schema.parameters().get(AVRO_TYPE_ENUM); - org.apache.avro.Schema enumSchema; - if (avroSchema.getType() == org.apache.avro.Schema.Type.UNION) { - int enumIndex = avroSchema.getIndexNamed(enumSchemaName); - enumSchema = avroSchema.getTypes().get(enumIndex); - } else { - enumSchema = avroSchema; - } - value = new GenericData.EnumSymbol(enumSchema, (String) value); + value = enumSymbol(avroSchema, value, enumSchemaName); } else { String stringValue = (String) value; // Check for correct type } return maybeAddContainer( - avroSchema, - maybeWrapSchemaless(schema, value, ANYTHING_SCHEMA_STRING_FIELD), - requireContainer); + avroSchema, + maybeWrapSchemaless(schema, value, ANYTHING_SCHEMA_STRING_FIELD), + requireContainer); case BYTES: { - ByteBuffer bytesValue = value instanceof byte[] ? ByteBuffer.wrap((byte[]) value) : - (ByteBuffer) value; + value = value instanceof byte[] ? ByteBuffer.wrap((byte[]) value) : + (ByteBuffer) value; + if (schema != null && isFixedSchema(schema)) { + int size = Integer.parseInt(schema.parameters().get(CONNECT_AVRO_FIXED_SIZE_PROP)); + org.apache.avro.Schema fixedSchema = null; + if (avroSchema.getType() == org.apache.avro.Schema.Type.UNION) { + int index = 0; + for (org.apache.avro.Schema memberSchema : avroSchema.getTypes()) { + if (memberSchema.getType() == org.apache.avro.Schema.Type.FIXED + && memberSchema.getFixedSize() == size + && unionMemberFieldName(memberSchema, index) + .equals(unionMemberFieldName(schema, index))) { + fixedSchema = memberSchema; + } + index++; + } + if (fixedSchema == null) { + throw new DataException("Fixed size " + size + " not in union " + avroSchema); + } + } else { + fixedSchema = avroSchema; + } + value = new GenericData.Fixed(fixedSchema, ((ByteBuffer)value).array()); + } return maybeAddContainer( - avroSchema, - maybeWrapSchemaless(schema, bytesValue, ANYTHING_SCHEMA_BYTES_FIELD), - requireContainer); + avroSchema, + maybeWrapSchemaless(schema, value, ANYTHING_SCHEMA_BYTES_FIELD), + requireContainer); } case ARRAY: { @@ -509,19 +552,18 @@ private static Object fromConnectData( List converted = new ArrayList<>(list.size()); Schema elementSchema = schema != null ? schema.valueSchema() : null; org.apache.avro.Schema underlyingAvroSchema = avroSchemaForUnderlyingTypeIfOptional( - schema, avroSchema); + schema, avroSchema, scrubInvalidNames); org.apache.avro.Schema elementAvroSchema = - schema != null ? underlyingAvroSchema.getElementType() : ANYTHING_SCHEMA; + schema != null ? underlyingAvroSchema.getElementType() : ANYTHING_SCHEMA; for (Object val : list) { converted.add( - fromConnectData( - elementSchema, - elementAvroSchema, - val, - false, - true, - enhancedSchemaSupport - ) + fromConnectData( + elementSchema, + elementAvroSchema, + val, + false, + true + ) ); } return maybeAddContainer( @@ -534,16 +576,17 @@ private static Object fromConnectData( Map map = (Map) value; org.apache.avro.Schema underlyingAvroSchema; if (schema != null && schema.keySchema().type() == Schema.Type.STRING - && !schema.keySchema().isOptional()) { + && (!schema.keySchema().isOptional() || allowOptionalMapKey)) { // TODO most types don't need a new converted object since types pass through - underlyingAvroSchema = avroSchemaForUnderlyingTypeIfOptional(schema, avroSchema); + underlyingAvroSchema = avroSchemaForUnderlyingTypeIfOptional( + schema, avroSchema, scrubInvalidNames); Map converted = new HashMap<>(); for (Map.Entry entry : map.entrySet()) { // Key is a String, no conversion needed Object convertedValue = fromConnectData(schema.valueSchema(), - underlyingAvroSchema.getValueType(), - entry.getValue(), false, true, enhancedSchemaSupport + underlyingAvroSchema.getValueType(), + entry.getValue(), false, true ); converted.put((String) entry.getKey(), convertedValue); } @@ -559,21 +602,20 @@ private static Object fromConnectData( org.apache.avro.Schema avroValueSchema = elementSchema.getField(VALUE_FIELD).schema(); for (Map.Entry entry : map.entrySet()) { Object keyConverted = fromConnectData(schema != null ? schema.keySchema() : null, - avroKeySchema, entry.getKey(), false, true, - enhancedSchemaSupport); + avroKeySchema, entry.getKey(), false, true); Object valueConverted = fromConnectData(schema != null ? schema.valueSchema() : null, - avroValueSchema, entry.getValue(), false, - true, enhancedSchemaSupport); + avroValueSchema, entry.getValue(), false, + true); converted.add( - new GenericRecordBuilder(elementSchema) - .set(KEY_FIELD, keyConverted) - .set(VALUE_FIELD, valueConverted) - .build() + new GenericRecordBuilder(elementSchema) + .set(KEY_FIELD, keyConverted) + .set(VALUE_FIELD, valueConverted) + .build() ); } return maybeAddContainer( - avroSchema, maybeWrapSchemaless(schema, converted, ANYTHING_SCHEMA_MAP_FIELD), - requireContainer); + avroSchema, maybeWrapSchemaless(schema, converted, ANYTHING_SCHEMA_MAP_FIELD), + requireContainer); } } @@ -584,32 +626,34 @@ avroSchema, maybeWrapSchemaless(schema, converted, ANYTHING_SCHEMA_MAP_FIELD), } //This handles the inverting of a union which is held as a struct, where each field is // one of the union types. - if (AVRO_TYPE_UNION.equals(schema.name())) { + if (isUnionSchema(schema)) { for (Field field : schema.fields()) { - Object object = struct.get(field); + Object object = ignoreDefaultForNullables + ? struct.getWithoutDefault(field.name()) : struct.get(field); if (object != null) { return fromConnectData( - field.schema(), - avroSchema, - object, - false, - true, - enhancedSchemaSupport + field.schema(), + avroSchema, + object, + false, + true ); } } - return fromConnectData(schema, avroSchema, null, false, true, enhancedSchemaSupport); + return fromConnectData(schema, avroSchema, null, false, true); } else { org.apache.avro.Schema underlyingAvroSchema = avroSchemaForUnderlyingTypeIfOptional( - schema, avroSchema); + schema, avroSchema, scrubInvalidNames); GenericRecordBuilder convertedBuilder = new GenericRecordBuilder(underlyingAvroSchema); for (Field field : schema.fields()) { - org.apache.avro.Schema.Field theField = underlyingAvroSchema.getField(field.name()); + String fieldName = scrubName(field.name(), scrubInvalidNames); + org.apache.avro.Schema.Field theField = underlyingAvroSchema.getField(fieldName); org.apache.avro.Schema fieldAvroSchema = theField.schema(); + Object fieldValue = ignoreDefaultForNullables + ? struct.getWithoutDefault(field.name()) : struct.get(field); convertedBuilder.set( - field.name(), - fromConnectData(field.schema(), fieldAvroSchema, struct.get(field), false, - true, enhancedSchemaSupport) + fieldName, + fromConnectData(field.schema(), fieldAvroSchema, fieldValue, false, true) ); } return convertedBuilder.build(); @@ -624,6 +668,18 @@ avroSchema, maybeWrapSchemaless(schema, converted, ANYTHING_SCHEMA_MAP_FIELD), } } + private GenericData.EnumSymbol enumSymbol( + org.apache.avro.Schema avroSchema, Object value, String enumSchemaName) { + org.apache.avro.Schema enumSchema; + if (avroSchema.getType() == org.apache.avro.Schema.Type.UNION) { + int enumIndex = avroSchema.getIndexNamed(enumSchemaName); + enumSchema = avroSchema.getTypes().get(enumIndex); + } else { + enumSchema = avroSchema; + } + return new GenericData.EnumSymbol(enumSchema, (String) value); + } + /** * MapEntry types in connect Schemas are represented as Arrays of record. * Return the array type from the union instead of the union itself. @@ -650,10 +706,12 @@ private static org.apache.avro.Schema avroSchemaForUnderlyingMapEntryType( } private static boolean crossReferenceSchemaNames(final Schema schema, - final org.apache.avro.Schema avroSchema) { - return Objects.equals(avroSchema.getFullName(), schema.name()) - || Objects.equals(avroSchema.getType().getName(), schema.type().getName()) - || (schema.name() == null && avroSchema.getFullName().equals(DEFAULT_SCHEMA_FULL_NAME)); + final org.apache.avro.Schema avroSchema, + final boolean scrubInvalidNames) { + String fullName = scrubFullName(schema.name(), scrubInvalidNames); + return Objects.equals(avroSchema.getFullName(), fullName) + || Objects.equals(avroSchema.getType().getName(), schema.type().getName()) + || (schema.name() == null && avroSchema.getFullName().startsWith(DEFAULT_SCHEMA_FULL_NAME)); } /** @@ -661,14 +719,14 @@ private static boolean crossReferenceSchemaNames(final Schema schema, * Return the Avro schema of the actual type in the Union (instead of the union itself) */ private static org.apache.avro.Schema avroSchemaForUnderlyingTypeIfOptional( - Schema schema, org.apache.avro.Schema avroSchema) { + Schema schema, org.apache.avro.Schema avroSchema, boolean scrubInvalidNames) { if (schema != null && schema.isOptional()) { if (avroSchema.getType() == org.apache.avro.Schema.Type.UNION) { for (org.apache.avro.Schema typeSchema : avroSchema - .getTypes()) { + .getTypes()) { if (!typeSchema.getType().equals(org.apache.avro.Schema.Type.NULL) - && crossReferenceSchemaNames(schema, typeSchema)) { + && crossReferenceSchemaNames(schema, typeSchema, scrubInvalidNames)) { return typeSchema; } } @@ -731,8 +789,19 @@ public org.apache.avro.Schema fromConnectSchema(Schema schema) { public org.apache.avro.Schema fromConnectSchema(Schema schema, Map schemaMap) { + if (schema == null) { + return ANYTHING_SCHEMA; + } + + org.apache.avro.Schema cached = fromConnectSchemaCache.get(schema); + if (cached != null) { + return cached; + } + FromConnectContext fromConnectContext = new FromConnectContext(schemaMap); - return fromConnectSchema(schema, fromConnectContext, false); + org.apache.avro.Schema finalSchema = fromConnectSchema(schema, fromConnectContext, false); + fromConnectSchemaCache.put(schema, finalSchema); + return finalSchema; } /** @@ -754,21 +823,11 @@ public org.apache.avro.Schema fromConnectSchema(Schema schema, return ANYTHING_SCHEMA; } - org.apache.avro.Schema cached = fromConnectSchemaCache.get(schema); - - if (cached == null && !AVRO_TYPE_UNION.equals(schema.name()) && !schema.isOptional()) { - cached = fromConnectContext.schemaMap.get(schema); - } - if (cached != null) { - return cached; - } - - String namespace = NAMESPACE; - String name = DEFAULT_SCHEMA_NAME; - if (schema.name() != null) { - String[] split = splitName(schema.name()); - namespace = split[0]; - name = split[1]; + if (!isUnionSchema(schema) && !schema.isOptional()) { + org.apache.avro.Schema cached = fromConnectContext.schemaMap.get(schema); + if (cached != null) { + return cached; + } } // Extra type annotation information for otherwise lossy conversions @@ -800,25 +859,47 @@ public org.apache.avro.Schema fromConnectSchema(Schema schema, baseSchema = org.apache.avro.SchemaBuilder.builder().booleanType(); break; case STRING: - if (enhancedSchemaSupport && schema.parameters() != null - && schema.parameters().containsKey(AVRO_TYPE_ENUM)) { + if ((generalizedSumTypeSupport || enhancedSchemaSupport) + && schema.parameters() != null + && (schema.parameters().containsKey(GENERALIZED_TYPE_ENUM) + || schema.parameters().containsKey(AVRO_TYPE_ENUM))) { + String paramName = generalizedSumTypeSupport ? GENERALIZED_TYPE_ENUM : AVRO_TYPE_ENUM; List symbols = new ArrayList<>(); for (Map.Entry entry : schema.parameters().entrySet()) { - if (entry.getKey().startsWith(AVRO_TYPE_ENUM + ".")) { - symbols.add(entry.getValue()); + if (entry.getKey().startsWith(paramName + ".")) { + String enumSymbol = entry.getKey().substring(paramName.length() + 1); + symbols.add(enumSymbol); } } - baseSchema = - org.apache.avro.SchemaBuilder.builder().enumeration( - schema.parameters().get(AVRO_TYPE_ENUM)) - .doc(schema.parameters().get(CONNECT_ENUM_DOC_PROP)) - .symbols(symbols.toArray(new String[symbols.size()])); + Pair names = getNameOrDefault(fromConnectContext, schema.name()); + String name = names.getValue(); + String enumName = schema.parameters().get(paramName); + String enumDoc = schema.parameters().get(AVRO_ENUM_DOC_PREFIX_PROP + name); + String enumDefault = schema.parameters().get(AVRO_ENUM_DEFAULT_PREFIX_PROP + name); + baseSchema = discardTypeDocDefault + ? org.apache.avro.SchemaBuilder.builder().enumeration(enumName) + .doc(schema.parameters().get(CONNECT_ENUM_DOC_PROP)) + .symbols(symbols.toArray(new String[symbols.size()])) + : org.apache.avro.SchemaBuilder.builder().enumeration(enumName) + .doc(enumDoc) + .defaultSymbol(enumDefault) + .symbols(symbols.toArray(new String[symbols.size()])); } else { baseSchema = org.apache.avro.SchemaBuilder.builder().stringType(); } break; case BYTES: - baseSchema = org.apache.avro.SchemaBuilder.builder().bytesType(); + if (isFixedSchema(schema)) { + Pair names = getNameOrDefault(fromConnectContext, schema.name()); + String namespace = names.getKey(); + String name = names.getValue(); + baseSchema = org.apache.avro.SchemaBuilder.builder() + .fixed(name) + .namespace(namespace) + .size(Integer.parseInt(schema.parameters().get(CONNECT_AVRO_FIXED_SIZE_PROP))); + } else { + baseSchema = org.apache.avro.SchemaBuilder.builder().bytesType(); + } if (Decimal.LOGICAL_NAME.equalsIgnoreCase(schema.name())) { int scale = Integer.parseInt(schema.parameters().get(Decimal.SCALE_FIELD)); baseSchema.addProp(AVRO_LOGICAL_DECIMAL_SCALE_PROP, new IntNode(scale)); @@ -828,8 +909,8 @@ public org.apache.avro.Schema fromConnectSchema(Schema schema, baseSchema.addProp(AVRO_LOGICAL_DECIMAL_PRECISION_PROP, new IntNode(precision)); } else { baseSchema - .addProp(AVRO_LOGICAL_DECIMAL_PRECISION_PROP, - new IntNode(CONNECT_AVRO_DECIMAL_PRECISION_DEFAULT)); + .addProp(AVRO_LOGICAL_DECIMAL_PRECISION_PROP, + new IntNode(CONNECT_AVRO_DECIMAL_PRECISION_DEFAULT)); } } break; @@ -840,67 +921,81 @@ public org.apache.avro.Schema fromConnectSchema(Schema schema, case MAP: // Avro only supports string keys, so we match the representation when possible, but // otherwise fall back on a record representation - if (schema.keySchema().type() == Schema.Type.STRING && !schema.keySchema().isOptional()) { + if (schema.keySchema().type() == Schema.Type.STRING + && (!schema.keySchema().isOptional() || allowOptionalMapKey)) { baseSchema = org.apache.avro.SchemaBuilder.builder().map().values( - fromConnectSchemaWithCycle(schema.valueSchema(), fromConnectContext, false)); + fromConnectSchemaWithCycle(schema.valueSchema(), fromConnectContext, false)); } else { // Special record name indicates format List fields = new ArrayList<>(); final org.apache.avro.Schema mapSchema; if (schema.name() == null) { mapSchema = org.apache.avro.Schema.createRecord( - MAP_ENTRY_TYPE_NAME, - null, - namespace, - false + MAP_ENTRY_TYPE_NAME, + null, + NAMESPACE, + false ); } else { + Pair names = getNameOrDefault(fromConnectContext, schema.name()); + String namespace = names.getKey(); + String name = names.getValue(); mapSchema = org.apache.avro.Schema.createRecord(name, null, namespace, false); mapSchema.addProp(CONNECT_INTERNAL_TYPE_NAME, MAP_ENTRY_TYPE_NAME); } addAvroRecordField( - fields, - KEY_FIELD, - schema.keySchema(), - fromConnectContext); + fields, + KEY_FIELD, + schema.keySchema(), + null, + fromConnectContext); addAvroRecordField( - fields, - VALUE_FIELD, - schema.valueSchema(), - fromConnectContext); + fields, + VALUE_FIELD, + schema.valueSchema(), + null, + fromConnectContext); mapSchema.setFields(fields); baseSchema = org.apache.avro.Schema.createArray(mapSchema); } break; case STRUCT: - if (AVRO_TYPE_UNION.equals(schema.name())) { + if (isUnionSchema(schema)) { List unionSchemas = new ArrayList<>(); if (schema.isOptional()) { unionSchemas.add(org.apache.avro.SchemaBuilder.builder().nullType()); } for (Field field : schema.fields()) { unionSchemas.add( - fromConnectSchemaWithCycle(nonOptional(field.schema()), fromConnectContext, true)); + fromConnectSchemaWithCycle(nonOptional(field.schema()), fromConnectContext, true)); } baseSchema = org.apache.avro.Schema.createUnion(unionSchemas); } else if (schema.isOptional()) { List unionSchemas = new ArrayList<>(); unionSchemas.add(org.apache.avro.SchemaBuilder.builder().nullType()); unionSchemas.add( - fromConnectSchemaWithCycle(nonOptional(schema), fromConnectContext, false)); + fromConnectSchemaWithCycle(nonOptional(schema), fromConnectContext, false)); baseSchema = org.apache.avro.Schema.createUnion(unionSchemas); } else { + Pair names = getNameOrDefault(fromConnectContext, schema.name()); + String namespace = names.getKey(); + String name = names.getValue(); String doc = schema.parameters() != null - ? schema.parameters().get(CONNECT_RECORD_DOC_PROP) - : null; - baseSchema = org.apache.avro.Schema.createRecord( - name != null ? name : DEFAULT_SCHEMA_NAME, doc, namespace, false); + ? schema.parameters() + .get(discardTypeDocDefault ? CONNECT_RECORD_DOC_PROP : AVRO_RECORD_DOC_PROP) + : null; + baseSchema = org.apache.avro.Schema.createRecord(name, doc, namespace, false); if (schema.name() != null) { fromConnectContext.cycleReferences.put(schema.name(), baseSchema); } List fields = new ArrayList<>(); for (Field field : schema.fields()) { - addAvroRecordField(fields, field.name(), field.schema(), fromConnectContext); + String fieldName = scrubName(field.name()); + String fieldDoc = null; + if (!discardTypeDocDefault && schema.parameters() != null) { + fieldDoc = schema.parameters().get(AVRO_FIELD_DOC_PREFIX_PROP + field.name()); + } + addAvroRecordField(fields, fieldName, field.schema(), fieldDoc, fromConnectContext); } baseSchema.setFields(fields); } @@ -917,14 +1012,20 @@ public org.apache.avro.Schema fromConnectSchema(Schema schema, } if (schema.version() != null) { baseSchema.addProp(CONNECT_VERSION_PROP, - JsonNodeFactory.instance.numberNode(schema.version())); + JsonNodeFactory.instance.numberNode(schema.version())); } if (schema.parameters() != null) { - baseSchema.addProp(CONNECT_PARAMETERS_PROP, parametersFromConnect(schema.parameters())); + JsonNode params = parametersFromConnect(schema.parameters()); + if (!params.isEmpty()) { + baseSchema.addProp(CONNECT_PARAMETERS_PROP, params); + } } if (schema.defaultValue() != null) { - baseSchema.addProp(CONNECT_DEFAULT_VALUE_PROP, - defaultValueFromConnect(schema, schema.defaultValue())); + if (discardTypeDocDefault || schema.parameters() == null + || !schema.parameters().containsKey(AVRO_FIELD_DEFAULT_FLAG_PROP)) { + baseSchema.addProp(CONNECT_DEFAULT_VALUE_PROP, + defaultValueFromConnect(schema, schema.defaultValue())); + } } if (schema.name() != null) { baseSchema.addProp(CONNECT_NAME_PROP, schema.name()); @@ -936,15 +1037,33 @@ public org.apache.avro.Schema fromConnectSchema(Schema schema, } } + boolean forceLegacyDecimal = false; // the new and correct way to handle logical types if (schema.name() != null) { if (Decimal.LOGICAL_NAME.equalsIgnoreCase(schema.name())) { String precisionString = schema.parameters().get(CONNECT_AVRO_DECIMAL_PRECISION_PROP); String scaleString = schema.parameters().get(Decimal.SCALE_FIELD); int precision = precisionString == null ? CONNECT_AVRO_DECIMAL_PRECISION_DEFAULT : - Integer.parseInt(precisionString); + Integer.parseInt(precisionString); int scale = scaleString == null ? 0 : Integer.parseInt(scaleString); - org.apache.avro.LogicalTypes.decimal(precision, scale).addToSchema(baseSchema); + if (scale < 0 || scale > precision) { + log.trace( + "Scale and precision of {} and {} cannot be serialized as native Avro logical " + + "decimal type; reverting to legacy serialization method", + scale, + precision + ); + // We cannot use the Avro Java library's support for the decimal logical type when the + // scale is either negative or greater than the precision as this violates the Avro spec + // and causes the Avro library to throw an exception, so we fall back in this case to + // using the legacy method for encoding decimal logical type information. + // Can't add a key/value pair with the CONNECT_AVRO_DECIMAL_PRECISION_PROP key to the + // schema's parameters since the parameters for Connect schemas are immutable, so we + // just track this in a local boolean variable instead. + forceLegacyDecimal = true; + } else { + org.apache.avro.LogicalTypes.decimal(precision, scale).addToSchema(baseSchema); + } } else if (Time.LOGICAL_NAME.equalsIgnoreCase(schema.name())) { org.apache.avro.LogicalTypes.timeMillis().addToSchema(baseSchema); } else if (Timestamp.LOGICAL_NAME.equalsIgnoreCase(schema.name())) { @@ -969,7 +1088,8 @@ public org.apache.avro.Schema fromConnectSchema(Schema schema, // and name(). if (schema.name() != null) { if (Decimal.LOGICAL_NAME.equalsIgnoreCase(schema.name()) - && schema.parameters().containsKey(CONNECT_AVRO_DECIMAL_PRECISION_PROP)) { + && (schema.parameters().containsKey(CONNECT_AVRO_DECIMAL_PRECISION_PROP) + || forceLegacyDecimal)) { baseSchema.addProp(AVRO_LOGICAL_TYPE_PROP, AVRO_LOGICAL_DECIMAL); } else if (Time.LOGICAL_NAME.equalsIgnoreCase(schema.name())) { baseSchema.addProp(AVRO_LOGICAL_TYPE_PROP, AVRO_LOGICAL_TIME_MILLIS); @@ -992,35 +1112,90 @@ public org.apache.avro.Schema fromConnectSchema(Schema schema, // can't store any metadata on the actual top-level schema when it's a union because of Avro // constraints on the format of schemas. if (!ignoreOptional) { - if (schema.isOptional()) { - if (schema.defaultValue() != null) { - finalSchema = org.apache.avro.SchemaBuilder.builder().unionOf() - .type(baseSchema).and() - .nullType() - .endUnion(); - } else { - finalSchema = org.apache.avro.SchemaBuilder.builder().unionOf() - .nullType().and() - .type(baseSchema) - .endUnion(); - } - } + finalSchema = maybeMakeOptional(schema, baseSchema); } } if (!schema.isOptional()) { fromConnectContext.schemaMap.put(schema, finalSchema); } - fromConnectSchemaCache.put(schema, finalSchema); return finalSchema; } + private Pair getNameOrDefault(FromConnectContext ctx, String name) { + if (name != null) { + String[] split = splitName(name); + return new Pair<>(split[0], split[1]); + } else { + int nameIndex = ctx.incrementAndGetNameIndex(); + return new Pair<>(NAMESPACE, DEFAULT_SCHEMA_NAME + (nameIndex > 1 ? nameIndex : "")); + } + } + + private org.apache.avro.Schema maybeMakeOptional( + Schema schema, org.apache.avro.Schema baseSchema) { + if (!schema.isOptional()) { + return baseSchema; + } + if (schema.defaultValue() != null) { + return org.apache.avro.SchemaBuilder.builder().unionOf() + .type(baseSchema).and() + .nullType() + .endUnion(); + } else { + return org.apache.avro.SchemaBuilder.builder().unionOf() + .nullType().and() + .type(baseSchema) + .endUnion(); + } + } + + private static String scrubFullName(String name, boolean scrubInvalidNames) { + if (name == null || !scrubInvalidNames) { + return name; + } + String[] split = splitName(name, scrubInvalidNames); + if (split[0] == null) { + return split[1]; + } else { + return split[0] + "." + split[1]; + } + } + + private String scrubName(String name) { + return scrubName(name, scrubInvalidNames); + } + + private static String scrubName(String name, boolean scrubInvalidNames) { + return scrubInvalidNames ? doScrubName(name) : name; + } + + // Visible for testing + protected static String doScrubName(String name) { + try { + if (name == null) { + return name; + } + String encoded = URLEncoder.encode(name, "UTF-8"); + if (!NAME_START_CHAR.matcher(encoded).lookingAt()) { + encoded = "x" + encoded; // use an arbitrary valid prefix + } + encoded = NAME_INVALID_CHARS.matcher(encoded).replaceAll("_"); + return encoded; + } catch (UnsupportedEncodingException e) { + return name; + } + } + public org.apache.avro.Schema fromConnectSchemaWithCycle( - Schema schema, - FromConnectContext fromConnectContext, boolean ignoreOptional) { + Schema schema, + FromConnectContext fromConnectContext, boolean ignoreOptional) { org.apache.avro.Schema resolvedSchema; if (fromConnectContext.cycleReferences.containsKey(schema.name())) { resolvedSchema = fromConnectContext.cycleReferences.get(schema.name()); + if (!ignoreOptional) { + resolvedSchema = maybeMakeOptional(schema, resolvedSchema); + } } else { resolvedSchema = fromConnectSchema(schema, fromConnectContext, ignoreOptional); } @@ -1028,43 +1203,37 @@ public org.apache.avro.Schema fromConnectSchemaWithCycle( } private void addAvroRecordField( - List fields, - String fieldName, Schema fieldSchema, - FromConnectContext fromConnectContext) { + List fields, + String fieldName, Schema fieldSchema, + String fieldDoc, + FromConnectContext fromConnectContext) { Object defaultVal = null; if (fieldSchema.defaultValue() != null) { - defaultVal = fieldSchema.defaultValue(); - - // If this is a logical, convert to the primitive form for the Avro default value - defaultVal = toAvroLogical(fieldSchema, defaultVal); - - // Avro doesn't handle a few types that Connect uses, so convert those explicitly here - if (defaultVal instanceof Byte) { - // byte are mapped to integers in Avro - defaultVal = ((Byte) defaultVal).intValue(); - } else if (defaultVal instanceof Short) { - // Shorts are mapped to integers in Avro - defaultVal = ((Short) defaultVal).intValue(); - } else if (defaultVal instanceof ByteBuffer) { - // Avro doesn't handle ByteBuffer directly, but does handle 'byte[]' - // Copy the contents of the byte buffer without side effects on the buffer - ByteBuffer buffer = (ByteBuffer) defaultVal; - byte[] bytes = new byte[buffer.remaining()]; - buffer.duplicate().get(bytes); - defaultVal = bytes; - } + defaultVal = JacksonUtils.toObject( + defaultValueFromConnect(fieldSchema, fieldSchema.defaultValue())); } else if (fieldSchema.isOptional()) { defaultVal = JsonProperties.NULL_VALUE; } - org.apache.avro.Schema.Field field = new org.apache.avro.Schema.Field( - fieldName, - fromConnectSchema(fieldSchema, fromConnectContext, false), - fieldSchema.doc(), - defaultVal); + org.apache.avro.Schema.Field field; + org.apache.avro.Schema schema = fromConnectSchema(fieldSchema, fromConnectContext, false); + try { + field = new org.apache.avro.Schema.Field( + fieldName, + schema, + discardTypeDocDefault ? fieldSchema.doc() : fieldDoc, + defaultVal); + } catch (AvroTypeException e) { + field = new org.apache.avro.Schema.Field( + fieldName, + schema, + discardTypeDocDefault ? fieldSchema.doc() : fieldDoc); + log.warn("Ignoring invalid default for field " + fieldName, e); + } fields.add(field); } + private static Object toAvroLogical(Schema schema, Object value) { if (schema != null && schema.name() != null) { LogicalTypeConverter logicalConverter = TO_AVRO_LOGICAL_CONVERTERS.get(schema.name()); @@ -1090,18 +1259,21 @@ private static Object toConnectLogical(Schema schema, Object value) { // though you can get a default value from the schema, default values for complex structures need // to perform the same translation but those defaults will be part of the original top-level // (complex type) default value, not part of the child schema. - @SuppressWarnings("deprecation") - private static JsonNode defaultValueFromConnect(Schema schema, Object value) { + private JsonNode defaultValueFromConnect(Schema schema, Object value) { try { + if (value == null) { + return NullNode.getInstance(); + } + // If this is a logical type, convert it from the convenient Java type to the underlying // serializeable format Object defaultVal = toAvroLogical(schema, value); switch (schema.type()) { case INT8: - return JsonNodeFactory.instance.numberNode((Byte) defaultVal); + return JsonNodeFactory.instance.numberNode(((Byte) defaultVal).intValue()); case INT16: - return JsonNodeFactory.instance.numberNode((Short) defaultVal); + return JsonNodeFactory.instance.numberNode(((Short) defaultVal).intValue()); case INT32: return JsonNodeFactory.instance.numberNode((Integer) defaultVal); case INT64: @@ -1116,9 +1288,11 @@ private static JsonNode defaultValueFromConnect(Schema schema, Object value) { return JsonNodeFactory.instance.textNode((String) defaultVal); case BYTES: if (defaultVal instanceof byte[]) { - return JsonNodeFactory.instance.binaryNode((byte[]) defaultVal); + return JsonNodeFactory.instance.textNode(new String((byte[]) defaultVal, + StandardCharsets.ISO_8859_1)); } else { - return JsonNodeFactory.instance.binaryNode(((ByteBuffer) defaultVal).array()); + return JsonNodeFactory.instance.textNode(new String(((ByteBuffer) defaultVal).array(), + StandardCharsets.ISO_8859_1)); } case ARRAY: { ArrayNode array = JsonNodeFactory.instance.arrayNode(); @@ -1128,7 +1302,8 @@ private static JsonNode defaultValueFromConnect(Schema schema, Object value) { return array; } case MAP: - if (schema.keySchema().type() == Schema.Type.STRING && !schema.keySchema().isOptional()) { + if (schema.keySchema().type() == Schema.Type.STRING + && (!schema.keySchema().isOptional() || allowOptionalMapKey)) { ObjectNode node = JsonNodeFactory.instance.objectNode(); for (Map.Entry entry : ((Map) defaultVal).entrySet()) { JsonNode entryDef = defaultValueFromConnect(schema.valueSchema(), entry.getValue()); @@ -1148,11 +1323,16 @@ private static JsonNode defaultValueFromConnect(Schema schema, Object value) { return array; } case STRUCT: { + boolean isUnion = isUnionSchema(schema); ObjectNode node = JsonNodeFactory.instance.objectNode(); Struct struct = ((Struct) defaultVal); for (Field field : (schema.fields())) { + String fieldName = scrubName(field.name()); JsonNode fieldDef = defaultValueFromConnect(field.schema(), struct.get(field)); - node.put(field.name(), fieldDef); + if (isUnion) { + return fieldDef; + } + node.put(fieldName, fieldDef); } return node; } @@ -1167,10 +1347,13 @@ private static JsonNode defaultValueFromConnect(Schema schema, Object value) { } } - private static JsonNode parametersFromConnect(Map params) { + + private JsonNode parametersFromConnect(Map params) { ObjectNode result = JsonNodeFactory.instance.objectNode(); for (Map.Entry entry : params.entrySet()) { - result.put(entry.getKey(), entry.getValue()); + if (discardTypeDocDefault || !entry.getKey().equals(AVRO_FIELD_DEFAULT_FLAG_PROP)) { + result.put(entry.getKey(), entry.getValue()); + } } return result; } @@ -1234,7 +1417,7 @@ private Object toConnectData(Schema schema, Object value, ToConnectContext toCon private Object toConnectData(Schema schema, Object value, ToConnectContext toConnectContext, boolean doLogicalConversion) { validateSchemaValue(schema, value); - if (value == null) { + if (value == null || value == JsonProperties.NULL_VALUE) { return null; } try { @@ -1246,29 +1429,29 @@ private Object toConnectData(Schema schema, Object value, ToConnectContext toCon IndexedRecord recordValue = (IndexedRecord) value; Object - boolVal = - recordValue.get(ANYTHING_SCHEMA.getField(ANYTHING_SCHEMA_BOOLEAN_FIELD).pos()); + boolVal = + recordValue.get(ANYTHING_SCHEMA.getField(ANYTHING_SCHEMA_BOOLEAN_FIELD).pos()); if (boolVal != null) { return toConnectData(Schema.BOOLEAN_SCHEMA, boolVal, toConnectContext); } Object - bytesVal = - recordValue.get(ANYTHING_SCHEMA.getField(ANYTHING_SCHEMA_BYTES_FIELD).pos()); + bytesVal = + recordValue.get(ANYTHING_SCHEMA.getField(ANYTHING_SCHEMA_BYTES_FIELD).pos()); if (bytesVal != null) { return toConnectData(Schema.BYTES_SCHEMA, bytesVal, toConnectContext); } Object - dblVal = - recordValue.get(ANYTHING_SCHEMA.getField(ANYTHING_SCHEMA_DOUBLE_FIELD).pos()); + dblVal = + recordValue.get(ANYTHING_SCHEMA.getField(ANYTHING_SCHEMA_DOUBLE_FIELD).pos()); if (dblVal != null) { return toConnectData(Schema.FLOAT64_SCHEMA, dblVal, toConnectContext); } Object - fltVal = - recordValue.get(ANYTHING_SCHEMA.getField(ANYTHING_SCHEMA_FLOAT_FIELD).pos()); + fltVal = + recordValue.get(ANYTHING_SCHEMA.getField(ANYTHING_SCHEMA_FLOAT_FIELD).pos()); if (fltVal != null) { return toConnectData(Schema.FLOAT32_SCHEMA, fltVal, toConnectContext); } @@ -1279,29 +1462,29 @@ private Object toConnectData(Schema schema, Object value, ToConnectContext toCon } Object - longVal = - recordValue.get(ANYTHING_SCHEMA.getField(ANYTHING_SCHEMA_LONG_FIELD).pos()); + longVal = + recordValue.get(ANYTHING_SCHEMA.getField(ANYTHING_SCHEMA_LONG_FIELD).pos()); if (longVal != null) { return toConnectData(Schema.INT64_SCHEMA, longVal, toConnectContext); } Object - stringVal = - recordValue.get(ANYTHING_SCHEMA.getField(ANYTHING_SCHEMA_STRING_FIELD).pos()); + stringVal = + recordValue.get(ANYTHING_SCHEMA.getField(ANYTHING_SCHEMA_STRING_FIELD).pos()); if (stringVal != null) { return toConnectData(Schema.STRING_SCHEMA, stringVal, toConnectContext); } Object - arrayVal = - recordValue.get(ANYTHING_SCHEMA.getField(ANYTHING_SCHEMA_ARRAY_FIELD).pos()); + arrayVal = + recordValue.get(ANYTHING_SCHEMA.getField(ANYTHING_SCHEMA_ARRAY_FIELD).pos()); if (arrayVal != null) { // We cannot reuse the logic like we do in other cases because it is not possible to // construct an array schema with a null item schema, but the items have no schema. if (!(arrayVal instanceof Collection)) { throw new DataException( - "Expected a Collection for schemaless array field but found a " - + arrayVal.getClass().getName() + "Expected a Collection for schemaless array field but found a " + + arrayVal.getClass().getName() ); } Collection original = (Collection) arrayVal; @@ -1318,8 +1501,8 @@ private Object toConnectData(Schema schema, Object value, ToConnectContext toCon // construct a map schema with a null item schema, but the items have no schema. if (!(mapVal instanceof Collection)) { throw new DataException( - "Expected a List for schemaless map field but found a " - + mapVal.getClass().getName() + "Expected a List for schemaless map field but found a " + + mapVal.getClass().getName() ); } Collection original = (Collection) mapVal; @@ -1328,9 +1511,9 @@ private Object toConnectData(Schema schema, Object value, ToConnectContext toCon int avroKeyFieldIndex = entry.getSchema().getField(KEY_FIELD).pos(); int avroValueFieldIndex = entry.getSchema().getField(VALUE_FIELD).pos(); Object convertedKey = toConnectData( - null, entry.get(avroKeyFieldIndex), toConnectContext); + null, entry.get(avroKeyFieldIndex), toConnectContext); Object convertedValue = toConnectData( - null, entry.get(avroValueFieldIndex), toConnectContext); + null, entry.get(avroValueFieldIndex), toConnectContext); result.put(convertedKey, convertedValue); } return result; @@ -1349,13 +1532,8 @@ private Object toConnectData(Schema schema, Object value, ToConnectContext toCon break; } case INT64: { - long longValue; - if (value instanceof Integer) { // Convert up - longValue = ((Integer) value).longValue(); - } else { // Validate type - longValue = (Long) value; - } - converted = longValue; + Long longValue = (Long) value; // Validate type + converted = value; break; } case FLOAT32: { @@ -1453,35 +1631,48 @@ private Object toConnectData(Schema schema, Object value, ToConnectContext toCon case STRUCT: { // Special case support for union types - if (schema.name() != null && schema.name().equals(AVRO_TYPE_UNION)) { + if (isUnionSchema(schema)) { Schema valueRecordSchema = null; if (value instanceof IndexedRecord) { IndexedRecord valueRecord = ((IndexedRecord) value); valueRecordSchema = toConnectSchemaWithCycles( - valueRecord.getSchema(), true, null, null, toConnectContext); + valueRecord.getSchema(), true, null, null, toConnectContext); } + int index = 0; for (Field field : schema.fields()) { Schema fieldSchema = field.schema(); - - if (isInstanceOfAvroSchemaTypeForSimpleSchema(fieldSchema, value) - || (valueRecordSchema != null && schemaEquals(valueRecordSchema, fieldSchema))) { + if (isInstanceOfAvroSchemaTypeForSimpleSchema(fieldSchema, value, index) + || (valueRecordSchema != null && schemaEquals(valueRecordSchema, fieldSchema))) { converted = new Struct(schema).put( - unionMemberFieldName(fieldSchema), - toConnectData(fieldSchema, value, toConnectContext)); + unionMemberFieldName(fieldSchema, index), + toConnectData(fieldSchema, value, toConnectContext)); break; } + index++; } if (converted == null) { - throw new DataException( - "Did not find matching union field for data: " + value.toString()); + throw new DataException("Did not find matching union field for data"); } + } else if (value instanceof Map) { + // Default values from Avro are returned as Map + Map original = (Map) value; + Struct result = new Struct(schema); + for (Field field : schema.fields()) { + String fieldName = scrubName(field.name()); + Object convertedFieldValue = toConnectData(field.schema(), + original.getOrDefault(fieldName, field.schema().defaultValue()), + toConnectContext); + result.put(field, convertedFieldValue); + } + return result; } else { IndexedRecord original = (IndexedRecord) value; Struct result = new Struct(schema); for (Field field : schema.fields()) { - int avroFieldIndex = original.getSchema().getField(field.name()).pos(); + String fieldName = scrubName(field.name()); + int avroFieldIndex = original.getSchema().getField(fieldName).pos(); Object convertedFieldValue = - toConnectData(field.schema(), original.get(avroFieldIndex), toConnectContext); + toConnectData(field.schema(), original.get(avroFieldIndex), toConnectContext); result.put(field, convertedFieldValue); } converted = result; @@ -1493,7 +1684,7 @@ private Object toConnectData(Schema schema, Object value, ToConnectContext toCon throw new DataException("Unknown Connect schema type: " + schema.type()); } - if (schema != null && schema.name() != null && doLogicalConversion) { + if (schema.name() != null && doLogicalConversion) { LogicalTypeConverter logicalConverter = TO_CONNECT_LOGICAL_CONVERTERS.get(schema.name()); if (logicalConverter != null) { converted = logicalConverter.convert(schema, converted); @@ -1501,7 +1692,8 @@ private Object toConnectData(Schema schema, Object value, ToConnectContext toCon } return converted; } catch (ClassCastException e) { - throw new DataException("Invalid type for " + schema.type() + ": " + value.getClass()); + String schemaType = schema != null ? schema.type().toString() : "null"; + throw new DataException("Invalid type for " + schemaType + ": " + value.getClass()); } } @@ -1525,6 +1717,12 @@ private Schema toConnectSchema(org.apache.avro.Schema schema, AvroSchemaAndVersion schemaAndVersion = new AvroSchemaAndVersion(schema, version); Schema cachedSchema = toConnectSchemaCache.get(schemaAndVersion); if (cachedSchema != null) { + if (schema.getType() == org.apache.avro.Schema.Type.RECORD) { + // cycleReferences is only populated with record type schemas. We need to initialize it here + // with the top-level record schema, as would happen if we did not hit the cache. This + // schema has the version information set, thus it properly works with schemaEquals. + toConnectContext.cycleReferences.put(schema, new CyclicSchemaWrapper(cachedSchema)); + } return cachedSchema; } @@ -1577,28 +1775,29 @@ private Schema toConnectSchema(org.apache.avro.Schema schema, case FIXED: if (AVRO_LOGICAL_DECIMAL.equalsIgnoreCase(logicalType)) { Object scaleNode = schema.getObjectProp(AVRO_LOGICAL_DECIMAL_SCALE_PROP); - if (null == scaleNode || !(scaleNode instanceof Number)) { - throw new DataException("scale must be specified and must be a number."); - } - Number scale = (Number) scaleNode; - builder = Decimal.builder(scale.intValue()); + // In Avro the scale is optional and should default to 0 + int scale = scaleNode instanceof Number ? ((Number) scaleNode).intValue() : 0; + builder = Decimal.builder(scale); Object precisionNode = schema.getObjectProp(AVRO_LOGICAL_DECIMAL_PRECISION_PROP); if (null != precisionNode) { if (!(precisionNode instanceof Number)) { throw new DataException(AVRO_LOGICAL_DECIMAL_PRECISION_PROP - + " property must be a JSON Integer." - + " https://avro.apache.org/docs/1.9.1/spec.html#Decimal"); + + " property must be a JSON Integer." + + " https://avro.apache.org/docs/1.9.1/spec.html#Decimal"); } // Capture the precision as a parameter only if it is not the default - Integer precision = ((Number) precisionNode).intValue(); - if (! precision.equals(CONNECT_AVRO_DECIMAL_PRECISION_DEFAULT)) { - builder.parameter(CONNECT_AVRO_DECIMAL_PRECISION_PROP, precision.toString()); + int precision = ((Number) precisionNode).intValue(); + if (precision != CONNECT_AVRO_DECIMAL_PRECISION_DEFAULT) { + builder.parameter(CONNECT_AVRO_DECIMAL_PRECISION_PROP, String.valueOf(precision)); } } } else { builder = SchemaBuilder.bytes(); } + if (schema.getType() == org.apache.avro.Schema.Type.FIXED) { + builder.parameter(CONNECT_AVRO_FIXED_SIZE_PROP, String.valueOf(schema.getFixedSize())); + } break; case DOUBLE: builder = SchemaBuilder.float64(); @@ -1670,10 +1869,21 @@ private Schema toConnectSchema(org.apache.avro.Schema schema, case RECORD: { builder = SchemaBuilder.struct(); toConnectContext.cycleReferences.put(schema, new CyclicSchemaWrapper(builder)); + if (!discardTypeDocDefault && connectMetaData && schema.getDoc() != null) { + builder.parameter(AVRO_RECORD_DOC_PROP, schema.getDoc()); + } for (org.apache.avro.Schema.Field field : schema.getFields()) { - + if (!discardTypeDocDefault && connectMetaData && field.doc() != null) { + builder.parameter(AVRO_FIELD_DOC_PREFIX_PROP + field.name(), field.doc()); + } + Object defaultVal = null; + try { + defaultVal = field.defaultVal(); + } catch (Exception e) { + log.warn("Ignoring invalid default for field " + field, e); + } Schema fieldSchema = toConnectSchema(field.schema(), getForceOptionalDefault(), - field.defaultVal(), field.doc(), toConnectContext); + defaultVal, field.doc(), toConnectContext); builder.field(field.name(), fieldSchema); } break; @@ -1682,12 +1892,28 @@ private Schema toConnectSchema(org.apache.avro.Schema schema, case ENUM: // enums are unwrapped to strings and the original enum is not preserved builder = SchemaBuilder.string(); - if (connectMetaData && schema.getDoc() != null) { - builder.parameter(CONNECT_ENUM_DOC_PROP, schema.getDoc()); + if (connectMetaData) { + if (schema.getDoc() != null) { + builder.parameter(discardTypeDocDefault + ? CONNECT_ENUM_DOC_PROP + : AVRO_ENUM_DOC_PREFIX_PROP + schema.getName(), + schema.getDoc()); + } + if (!discardTypeDocDefault && schema.getEnumDefault() != null) { + builder.parameter(AVRO_ENUM_DEFAULT_PREFIX_PROP + schema.getName(), + schema.getEnumDefault()); + } } - builder.parameter(AVRO_TYPE_ENUM, schema.getFullName()); + String paramName = generalizedSumTypeSupport ? GENERALIZED_TYPE_ENUM : AVRO_TYPE_ENUM; + builder.parameter(paramName, schema.getFullName()); + int symbolIndex = 0; for (String enumSymbol : schema.getEnumSymbols()) { - builder.parameter(AVRO_TYPE_ENUM + "." + enumSymbol, enumSymbol); + if (generalizedSumTypeSupport) { + builder.parameter(paramName + "." + enumSymbol, String.valueOf(symbolIndex)); + } else { + builder.parameter(paramName + "." + enumSymbol, enumSymbol); + } + symbolIndex++; } break; @@ -1702,13 +1928,20 @@ private Schema toConnectSchema(org.apache.avro.Schema schema, } } } - builder = SchemaBuilder.struct().name(AVRO_TYPE_UNION); + String unionName = generalizedSumTypeSupport + ? GENERALIZED_TYPE_UNION_PREFIX + (unionIndex++) + : AVRO_TYPE_UNION; + builder = SchemaBuilder.struct().name(unionName); + if (generalizedSumTypeSupport) { + builder.parameter(GENERALIZED_TYPE_UNION, unionName); + } Set fieldNames = new HashSet<>(); + int fieldIndex = 0; for (org.apache.avro.Schema memberSchema : schema.getTypes()) { if (memberSchema.getType() == org.apache.avro.Schema.Type.NULL) { builder.optional(); } else { - String fieldName = unionMemberFieldName(memberSchema); + String fieldName = unionMemberFieldName(memberSchema, fieldIndex); if (fieldNames.contains(fieldName)) { throw new DataException("Multiple union schemas map to the Connect union field name"); } @@ -1718,6 +1951,7 @@ private Schema toConnectSchema(org.apache.avro.Schema schema, toConnectSchemaWithCycles(memberSchema, true, null, null, toConnectContext) ); } + fieldIndex++; } break; } @@ -1733,13 +1967,21 @@ private Schema toConnectSchema(org.apache.avro.Schema schema, + schema.getType().getName() + "."); } - String docVal = docDefaultVal != null ? docDefaultVal : - (schema.getDoc() != null ? schema.getDoc() : schema.getProp(CONNECT_DOC_PROP)); - if (docVal != null) { - builder.doc(docVal); - } - if (connectMetaData && schema.getDoc() != null) { - builder.parameter(CONNECT_RECORD_DOC_PROP, schema.getDoc()); + if (discardTypeDocDefault) { + String docVal = docDefaultVal != null ? docDefaultVal : + (schema.getDoc() != null ? schema.getDoc() : schema.getProp(CONNECT_DOC_PROP)); + if (docVal != null) { + builder.doc(docVal); + } + if (connectMetaData && schema.getDoc() != null) { + builder.parameter(CONNECT_RECORD_DOC_PROP, schema.getDoc()); + } + + } else { + String docVal = schema.getProp(CONNECT_DOC_PROP); + if (connectMetaData && docVal != null) { + builder.doc(docVal); + } } // Included Kafka Connect version takes priority, fall back to schema registry version @@ -1793,12 +2035,19 @@ private Schema toConnectSchema(org.apache.avro.Schema schema, } } + Object connectDefault = schema.getObjectProp(CONNECT_DEFAULT_VALUE_PROP); if (fieldDefaultVal == null) { - fieldDefaultVal = JacksonUtils.toJsonNode(schema.getObjectProp(CONNECT_DEFAULT_VALUE_PROP)); + fieldDefaultVal = JacksonUtils.toJsonNode(connectDefault); + } else if (!discardTypeDocDefault && connectMetaData && connectDefault == null) { + builder.parameter(AVRO_FIELD_DEFAULT_FLAG_PROP, "true"); } if (fieldDefaultVal != null) { - builder.defaultValue( - defaultValueFromAvro(builder, schema, fieldDefaultVal, toConnectContext)); + try { + builder.defaultValue( + defaultValueFromAvro(builder, schema, fieldDefaultVal, toConnectContext)); + } catch (DataException e) { + log.warn("Ignoring invalid default for schema " + schema.getName(), e); + } } Object connectNameJson = schema.getObjectProp(CONNECT_NAME_PROP); @@ -1810,10 +2059,11 @@ private Schema toConnectSchema(org.apache.avro.Schema schema, name = (String) connectNameJson; } else if (schema.getType() == org.apache.avro.Schema.Type.RECORD - || schema.getType() == org.apache.avro.Schema.Type.ENUM) { + || schema.getType() == org.apache.avro.Schema.Type.ENUM + || schema.getType() == org.apache.avro.Schema.Type.FIXED) { name = schema.getFullName(); } - if (name != null && !name.equals(DEFAULT_SCHEMA_FULL_NAME)) { + if (name != null && !name.startsWith(DEFAULT_SCHEMA_FULL_NAME)) { if (builder.name() != null) { if (!name.equals(builder.name())) { throw new DataException("Mismatched names: name already added to SchemaBuilder (" @@ -1934,19 +2184,20 @@ private Object defaultValueFromAvroWithoutLogical(Schema schema, case ARRAY: { if (!jsonValue.isArray()) { - throw new DataException("Invalid JSON for array default value: " + jsonValue.toString()); + throw new DataException("Invalid JSON for array default value"); } List result = new ArrayList<>(jsonValue.size()); for (JsonNode elem : jsonValue) { - result.add( - defaultValueFromAvro(schema, avroSchema.getElementType(), elem, toConnectContext)); + Object converted = defaultValueFromAvro( + schema.valueSchema(), avroSchema.getElementType(), elem, toConnectContext); + result.add(converted); } return result; } case MAP: { if (!jsonValue.isObject()) { - throw new DataException("Invalid JSON for map default value: " + jsonValue.toString()); + throw new DataException("Invalid JSON for map default value"); } Map result = new HashMap<>(jsonValue.size()); Iterator> fieldIt = jsonValue.fields(); @@ -1961,7 +2212,7 @@ private Object defaultValueFromAvroWithoutLogical(Schema schema, case RECORD: { if (!jsonValue.isObject()) { - throw new DataException("Invalid JSON for record default value: " + jsonValue.toString()); + throw new DataException("Invalid JSON for record default value"); } Struct result = new Struct(schema); @@ -1982,10 +2233,10 @@ private Object defaultValueFromAvroWithoutLogical(Schema schema, return null; } else { return defaultValueFromAvro( - schema.field(unionMemberFieldName(memberAvroSchema)).schema(), - memberAvroSchema, - value, - toConnectContext); + schema.field(unionMemberFieldName(memberAvroSchema, 0)).schema(), + memberAvroSchema, + value, + toConnectContext); } } default: { @@ -1996,9 +2247,13 @@ private Object defaultValueFromAvroWithoutLogical(Schema schema, } - private String unionMemberFieldName(org.apache.avro.Schema schema) { + private String unionMemberFieldName(org.apache.avro.Schema schema, int index) { + if (generalizedSumTypeSupport) { + return GENERALIZED_TYPE_UNION_FIELD_PREFIX + index; + } if (schema.getType() == org.apache.avro.Schema.Type.RECORD - || schema.getType() == org.apache.avro.Schema.Type.ENUM) { + || schema.getType() == org.apache.avro.Schema.Type.ENUM + || schema.getType() == org.apache.avro.Schema.Type.FIXED) { if (enhancedSchemaSupport) { return schema.getFullName(); } else { @@ -2008,10 +2263,13 @@ private String unionMemberFieldName(org.apache.avro.Schema schema) { return schema.getType().getName(); } - private String unionMemberFieldName(Schema schema) { - if (schema.type() == Schema.Type.STRUCT || isEnumSchema(schema)) { + private String unionMemberFieldName(Schema schema, int index) { + if (generalizedSumTypeSupport) { + return GENERALIZED_TYPE_UNION_FIELD_PREFIX + index; + } + if (schema.type() == Schema.Type.STRUCT || isEnumSchema(schema) || isFixedSchema(schema)) { if (enhancedSchemaSupport) { - return schema.name(); + return scrubFullName(schema.name(), scrubInvalidNames); } else { return splitName(schema.name())[1]; } @@ -2019,30 +2277,84 @@ private String unionMemberFieldName(Schema schema) { return CONNECT_TYPES_TO_AVRO_TYPES.get(schema.type()).getName(); } + private static boolean isUnionSchema(Schema schema) { + return AVRO_TYPE_UNION.equals(schema.name()) || ConnectUnion.isUnion(schema); + } + private static boolean isEnumSchema(Schema schema) { return schema.type() == Schema.Type.STRING - && schema.name() != null - && schema.name().equals(AVRO_TYPE_ENUM); + && schema.parameters() != null + && (schema.parameters().containsKey(GENERALIZED_TYPE_ENUM) + || schema.parameters().containsKey(AVRO_TYPE_ENUM)); + } + + private static boolean isFixedSchema(Schema schema) { + return schema.type() == Schema.Type.BYTES + && schema.name() != null + && schema.parameters() != null + && schema.parameters().containsKey(CONNECT_AVRO_FIXED_SIZE_PROP); } - private static boolean isInstanceOfAvroSchemaTypeForSimpleSchema(Schema fieldSchema, - Object value) { + private boolean isInstanceOfAvroSchemaTypeForSimpleSchema(Schema fieldSchema, + Object value, + int index) { + if (isEnumSchema(fieldSchema)) { + String paramName = generalizedSumTypeSupport ? GENERALIZED_TYPE_ENUM : AVRO_TYPE_ENUM; + String enumSchemaName = fieldSchema.parameters().get(paramName); + if (value instanceof GenericData.EnumSymbol) { + return ((GenericData.EnumSymbol) value).getSchema().getFullName().equals(enumSchemaName); + } else { + return value.getClass().getName().equals(enumSchemaName); + } + } List classes = SIMPLE_AVRO_SCHEMA_TYPES.get(fieldSchema.type()); if (classes == null) { return false; } for (Class type : classes) { if (type.isInstance(value)) { - return true; + if (isFixedSchema(fieldSchema)) { + if (fixedValueSizeMatch(fieldSchema, value, + Integer.parseInt(fieldSchema.parameters().get(CONNECT_AVRO_FIXED_SIZE_PROP)), + index)) { + return true; + } + } else { + return true; + } } } return false; } + /** + * Returns true if the fixed value size of the value matches the expected size + */ + private boolean fixedValueSizeMatch(Schema fieldSchema, + Object value, + int size, + int index) { + if (value instanceof byte[]) { + return ((byte[]) value).length == size; + } else if (value instanceof ByteBuffer) { + return ((ByteBuffer)value).remaining() == size; + } else if (value instanceof GenericFixed) { + return unionMemberFieldName(((GenericFixed) value).getSchema(), index) + .equals(unionMemberFieldName(fieldSchema, index)); + } else { + throw new DataException("Invalid class for fixed, expecting GenericFixed, byte[]" + + " or ByteBuffer but found " + value.getClass()); + } + } + /** * Split a full dotted-syntax name into a namespace and a single-component name. */ - private static String[] splitName(String fullName) { + private String[] splitName(String fullName) { + return splitName(fullName, scrubInvalidNames); + } + + private static String[] splitName(String fullName, boolean scrubInvalidNames) { String[] result = new String[2]; int indexLastDot = fullName.lastIndexOf('.'); if (indexLastDot >= 0) { @@ -2052,6 +2364,7 @@ private static String[] splitName(String fullName) { result[0] = null; result[1] = fullName; } + result[1] = scrubName(result[1], scrubInvalidNames); return result; } @@ -2096,26 +2409,128 @@ public static Schema valueSchema(Schema schema) { } } + private static boolean fieldListEquals(List one, List two, + Map, Boolean> cache) { + if (one == two) { + return true; + } else if (one == null || two == null) { + return false; + } else { + ListIterator itOne = one.listIterator(); + ListIterator itTwo = two.listIterator(); + while (itOne.hasNext() && itTwo.hasNext()) { + if (!fieldEquals(itOne.next(), itTwo.next(), cache)) { + return false; + } + } + return itOne.hasNext() == itTwo.hasNext(); + } + } + + private static boolean fieldEquals( + Field one, Field two, Map, Boolean> cache) { + if (one == two) { + return true; + } else if (one == null || two == null) { + return false; + } else { + return one.getClass() == two.getClass() + && Objects.equals(one.index(), two.index()) + && Objects.equals(one.name(), two.name()) + && schemaEquals(one.schema(), two.schema(), cache); + } + } + + static class Pair { + private K key; + private V value; + + public Pair(K key, V value) { + this.key = key; + this.value = value; + } + + public K getKey() { + return key; + } + + public V getValue() { + return value; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + Pair pair = (Pair) o; + return Objects.equals(key, pair.key) + && Objects.equals(value, pair.value); + } + + @Override + public int hashCode() { + return Objects.hash(key, value); + } + + @Override + public String toString() { + return "Pair{" + + "key=" + key + + ", value=" + value + + '}'; + } + } + private static boolean schemaEquals(Schema src, Schema that) { + return schemaEquals(src, that, new HashMap<>()); + } + + private static boolean schemaEquals( + Schema src, Schema that, Map, Boolean> cache) { + if (src == that) { + return true; + } else if (src == null || that == null) { + return false; + } + + // Add a temporary value to the cache to avoid cycles. As long as we recurse only at the end of + // the method, we can safely default to true here. The cache is updated at the end of the method + // with the actual comparison result. + Pair sp = new Pair<>(src, that); + Boolean cacheHit = cache.putIfAbsent(sp, true); + if (cacheHit != null) { + return cacheHit; + } + boolean equals = Objects.equals(src.isOptional(), that.isOptional()) - && Objects.equals(src.version(), that.version()) - && Objects.equals(src.name(), that.name()) - && Objects.equals(src.doc(), that.doc()) - && Objects.equals(src.type(), that.type()) - && Objects.deepEquals(src.defaultValue(), that.defaultValue()) - && Objects.equals(src.fields(), that.fields()) - && Objects.equals(src.parameters(), that.parameters()); + && Objects.equals(src.version(), that.version()) + && Objects.equals(src.name(), that.name()) + && Objects.equals(src.doc(), that.doc()) + && Objects.equals(src.type(), that.type()) + && Objects.deepEquals(src.defaultValue(), that.defaultValue()) + && Objects.equals(src.parameters(), that.parameters()); switch (src.type()) { + case STRUCT: + equals = equals && fieldListEquals(src.fields(), that.fields(), cache); + break; case ARRAY: - return equals && Objects.equals(src.valueSchema(), that.valueSchema()); + equals = equals && schemaEquals(src.valueSchema(), that.valueSchema(), cache); + break; case MAP: - return equals - && Objects.equals(src.valueSchema(), that.valueSchema()) - && Objects.equals(src.keySchema(), that.keySchema()); + equals = equals + && schemaEquals(src.valueSchema(), that.valueSchema(), cache) + && schemaEquals(src.keySchema(), that.keySchema(), cache); + break; default: - return equals; + break; } + cache.put(sp, equals); + return equals; } private static class CyclicSchemaWrapper implements Schema { @@ -2192,6 +2607,24 @@ public Schema schema() { return schema; } + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + + if (o == null || getClass() != o.getClass()) { + return false; + } + + CyclicSchemaWrapper other = (CyclicSchemaWrapper) o; + return Objects.equals(optional, other.optional) && Objects.equals(schema, other.schema); + } + + @Override + public int hashCode() { + return Objects.hashCode(optional) + Objects.hashCode(schema); + } } /** @@ -2219,11 +2652,15 @@ private static class FromConnectContext { private final Map schemaMap; //schema name to Schema reference to resolve cycles private final Map cycleReferences; + private int defaultSchemaNameIndex = 0; private FromConnectContext(Map schemaMap) { this.schemaMap = schemaMap; this.cycleReferences = new IdentityHashMap<>(); } + public int incrementAndGetNameIndex() { + return ++defaultSchemaNameIndex; + } } } diff --git a/utils/converter/src/test/java/io/apicurio/registry/utils/converter/avro/AvroDataTest.java b/utils/converter/src/test/java/io/apicurio/registry/utils/converter/avro/AvroDataTest.java index 57551922af..a4d71c65d1 100644 --- a/utils/converter/src/test/java/io/apicurio/registry/utils/converter/avro/AvroDataTest.java +++ b/utils/converter/src/test/java/io/apicurio/registry/utils/converter/avro/AvroDataTest.java @@ -11,20 +11,20 @@ public class AvroDataTest { @Test public void testIntWithConnectDefault() { final String s = "{" - + " \"type\": \"record\"," - + " \"name\": \"sample\"," - + " \"namespace\": \"io.apicurio\"," - + " \"fields\": [" - + " {" - + " \"name\": \"prop\"," - + " \"type\": {" - + " \"type\": \"int\"," - + " \"connect.default\": 42," - + " \"connect.version\": 1" - + " }" - + " }" - + " ]" - + "}"; + + " \"type\": \"record\"," + + " \"name\": \"sample\"," + + " \"namespace\": \"io.apicurio\"," + + " \"fields\": [" + + " {" + + " \"name\": \"prop\"," + + " \"type\": {" + + " \"type\": \"int\"," + + " \"connect.default\": 42," + + " \"connect.version\": 1" + + " }" + + " }" + + " ]" + + "}"; org.apache.avro.Schema avroSchema = new org.apache.avro.Schema.Parser().parse(s); @@ -37,20 +37,20 @@ public void testIntWithConnectDefault() { @Test public void testLongWithConnectDefault() { final String s = "{" - + " \"type\": \"record\"," - + " \"name\": \"sample\"," - + " \"namespace\": \"io.apicurio\"," - + " \"fields\": [" - + " {" - + " \"name\": \"prop\"," - + " \"type\": {" - + " \"type\": \"long\"," - + " \"connect.default\": 42," - + " \"connect.version\": 1" - + " }" - + " }" - + " ]" - + "}"; + + " \"type\": \"record\"," + + " \"name\": \"sample\"," + + " \"namespace\": \"io.apicurio\"," + + " \"fields\": [" + + " {" + + " \"name\": \"prop\"," + + " \"type\": {" + + " \"type\": \"long\"," + + " \"connect.default\": 42," + + " \"connect.version\": 1" + + " }" + + " }" + + " ]" + + "}"; org.apache.avro.Schema avroSchema = new org.apache.avro.Schema.Parser().parse(s); @@ -63,20 +63,57 @@ public void testLongWithConnectDefault() { @Test public void testAvroInt64WithInteger() { final String s = "{" - + " \"type\": \"record\"," - + " \"name\": \"sample\"," - + " \"namespace\": \"io.apicurio\"," - + " \"fields\": [" - + " {" - + " \"name\": \"someprop\"," - + " \"type\": [\"long\",\"null\"]" - + " }" - + " ]" - + "}"; + + " \"type\": \"record\"," + + " \"name\": \"sample\"," + + " \"namespace\": \"io.apicurio\"," + + " \"fields\": [" + + " {" + + " \"name\": \"someprop\"," + + " \"type\": [\"long\",\"null\"]" + + " }" + + " ]" + + "}"; org.apache.avro.Schema avroSchema = new org.apache.avro.Schema.Parser().parse(s); - GenericRecord outputRecord = new GenericRecordBuilder(avroSchema).set("someprop", 42).build(); + GenericRecord outputRecord = new GenericRecordBuilder(avroSchema).set("someprop", (long) 42).build(); AvroData avroData = new AvroData(0); Assertions.assertDoesNotThrow(() -> avroData.toConnectData(avroSchema, outputRecord)); } + + @Test + public void testDecimal() { + final String s = "{" + + " \"type\": \"record\"," + + " \"name\": \"sample\"," + + " \"namespace\": \"io.apicurio\"," + + " \"fields\": [" + + " {" + + " \"name\": \"somedecimal\"," + + " \"type\": [\n" + + " {\n" + + " \"type\": \"bytes\",\n" + + " \"scale\": 4,\n" + + " \"precision\": 4,\n" + + " \"connect.version\": 1,\n" + + " \"connect.parameters\": {\n" + + " \"scale\": \"4\",\n" + + " \"connect.decimal.precision\": \"4\"\n" + + " },\n" + + " \"connect.default\": \"AA==\",\n" + + " \"connect.name\": \"org.apache.kafka.connect.data.Decimal\",\n" + + " \"logicalType\": \"decimal\"\n" + + " },\n" + + " \"null\"\n" + + " ],\n" + + " \"default\": \"AA==\"" + + " }" + + " ]," + + "\"connect.name\":\"io.apicurio.sample\"\n" + + "}"; + + org.apache.avro.Schema bSchema = new org.apache.avro.Schema.Parser().parse(s); + AvroData avroData = new AvroData(0); + org.apache.avro.Schema aSchema = avroData.fromConnectSchema(avroData.toConnectSchema(bSchema)); + Assertions.assertEquals(bSchema.toString(), aSchema.toString()); + } }