From 353732cc6815fad653c616be61e028e0b4d4f67c Mon Sep 17 00:00:00 2001 From: Grigorii Sokolik Date: Mon, 9 Jan 2023 15:20:18 +0200 Subject: [PATCH] Support JSON Schema converter If the source uses `io.confluent.connect.json.JsonSchemaConverter`, there is no suitable converter for snowflake sink. Adding it. --- .../records/SnowflakeJsonSchemaConverter.java | 62 +++++++++++++++++++ .../connector/records/ConverterTest.java | 40 +++++++++--- 2 files changed, 93 insertions(+), 9 deletions(-) create mode 100644 src/main/java/com/snowflake/kafka/connector/records/SnowflakeJsonSchemaConverter.java diff --git a/src/main/java/com/snowflake/kafka/connector/records/SnowflakeJsonSchemaConverter.java b/src/main/java/com/snowflake/kafka/connector/records/SnowflakeJsonSchemaConverter.java new file mode 100644 index 000000000..16b0d4295 --- /dev/null +++ b/src/main/java/com/snowflake/kafka/connector/records/SnowflakeJsonSchemaConverter.java @@ -0,0 +1,62 @@ +/* + * Copyright (c) 2019 Snowflake Inc. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.snowflake.kafka.connector.records; + +import java.nio.ByteBuffer; + +import com.snowflake.kafka.connector.internal.SnowflakeErrors; +import org.apache.kafka.connect.data.SchemaAndValue; + +public class SnowflakeJsonSchemaConverter extends SnowflakeConverter { + /** + * cast bytes array to Json array + * + * @param s topic name. unused + * @param bytes input bytes array, only support single json record now + * @return JSON array + */ + @Override + public SchemaAndValue toConnectData(final String s, final byte[] bytes) { + if (bytes == null) { + return new SchemaAndValue(new SnowflakeJsonSchema(), new SnowflakeRecordContent()); + } + ByteBuffer buffer; + int id; + try { + buffer = ByteBuffer.wrap(bytes); + if (buffer.get() != 0) { + throw SnowflakeErrors.ERROR_0010.getException("unknown bytes"); + } + id = buffer.getInt(); + } catch (Exception ex) { + LOGGER.error("Failed to parse schema-prepended JSON record\n" + ex.getMessage()); + return new SchemaAndValue(new SnowflakeJsonSchema(), new SnowflakeRecordContent(bytes)); + } + + try { + int length = buffer.limit() - 1 - 4; + byte[] data = new byte[length]; + buffer.get(data, 0, length); + + return new SchemaAndValue( + new SnowflakeJsonSchema(), new SnowflakeRecordContent(mapper.readTree(data), id)); + } catch (Exception ex) { + LOGGER.error("Failed to parse JSON record\n" + ex.toString()); + return new SchemaAndValue(new SnowflakeJsonSchema(), new SnowflakeRecordContent(bytes)); + } + } +} diff --git a/src/test/java/com/snowflake/kafka/connector/records/ConverterTest.java b/src/test/java/com/snowflake/kafka/connector/records/ConverterTest.java index aa03d82b1..e765c0ef8 100644 --- a/src/test/java/com/snowflake/kafka/connector/records/ConverterTest.java +++ b/src/test/java/com/snowflake/kafka/connector/records/ConverterTest.java @@ -18,6 +18,7 @@ import static com.snowflake.kafka.connector.records.RecordService.ISO_DATE_TIME_FORMAT; +import com.google.common.collect.Sets; import com.snowflake.kafka.connector.internal.SnowflakeKafkaConnectorException; import com.snowflake.kafka.connector.mock.MockSchemaRegistryClient; import io.confluent.connect.avro.AvroConverter; @@ -28,20 +29,13 @@ import java.math.BigDecimal; import java.math.BigInteger; import java.net.URL; +import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Paths; import java.text.ParseException; import java.text.SimpleDateFormat; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Calendar; -import java.util.Collections; -import java.util.GregorianCalendar; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.TimeZone; +import java.util.*; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; @@ -109,6 +103,34 @@ public void testJsonConverter() { assert ((SnowflakeRecordContent) sv.value()).getData()[0].toString().equals("{}"); } + @Test + public void testConnectJsonSchemaConverter() { + SnowflakeJsonSchemaConverter converter = new SnowflakeJsonSchemaConverter(); + ByteBuffer buffer = ByteBuffer.allocate(29); + int schemaId = 33; + buffer.put((byte)0).putInt(schemaId).put("{\"str\":\"test\",\"num\":123}".getBytes(StandardCharsets.UTF_8)); + + SchemaAndValue sv = + converter.toConnectData("test", buffer.array()); + + assert sv.schema().name().equals(SnowflakeJsonSchema.NAME); + + assert sv.value() instanceof SnowflakeRecordContent; + + SnowflakeRecordContent content = (SnowflakeRecordContent) sv.value(); + assert content.getSchemaID() == schemaId; + + JsonNode[] jsonNodes = content.getData(); + + assert jsonNodes.length == 1; + assert jsonNodes[0].size() == 2; + assert Sets.newHashSet(jsonNodes[0].fieldNames()).containsAll(Arrays.asList("str", "num")); + assert jsonNodes[0].get("str").isTextual(); + assert jsonNodes[0].get("str").asText().equals("test"); + assert jsonNodes[0].get("num").isInt(); + assert jsonNodes[0].get("num").asInt() == 123; + } + @Test public void testAvroConverter() throws IOException { // todo: test schema registry