diff --git a/CHANGELOG.md b/CHANGELOG.md index 761cae0b..ae5b75d8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,7 @@ * Added support for multiple databases in single stream using a virtual topic #41 * Add support for configuring JDBC properties in connection URL (i.e. `?auto_discovery=true`) * Added minimum version check for ClickHouse +* Added support for fixed_string type ## 1.0.11 2024-01-29 * Added support for RowBinaryWithDefaults diff --git a/VERSION b/VERSION index 7c46aa19..7c866b73 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -v1.0.11 +v1.0.12 diff --git a/src/main/java/com/clickhouse/kafka/connect/sink/db/ClickHouseWriter.java b/src/main/java/com/clickhouse/kafka/connect/sink/db/ClickHouseWriter.java index 51b12d6c..99e6db1d 100644 --- a/src/main/java/com/clickhouse/kafka/connect/sink/db/ClickHouseWriter.java +++ b/src/main/java/com/clickhouse/kafka/connect/sink/db/ClickHouseWriter.java @@ -204,7 +204,8 @@ private boolean validateDataSchema(Table table, Record record, boolean onlyField break;//I notice we just break here, rather than actually validate the type default: if (!colTypeName.equals(dataTypeName)) { - if (!(colTypeName.equals("STRING") && dataTypeName.equals("BYTES"))) { + if (!((colTypeName.equals("STRING") || colTypeName.equalsIgnoreCase("FIXED_STRING")) && dataTypeName.equals("BYTES"))) { + LOGGER.debug("Data schema name: {}", objSchema.name()); if (!("DECIMAL".equalsIgnoreCase(colTypeName) && objSchema.name().equals("org.apache.kafka.connect.data.Decimal"))) { validSchema = false; LOGGER.error(String.format("Table column name [%s] type [%s] is not matching data column type [%s]", col.getName(), colTypeName, dataTypeName)); @@ -256,10 +257,9 @@ private void doWriteDates(Type type, ClickHousePipedOutputStream stream, Data va if (value.getFieldType().equals(Schema.Type.INT32) || value.getFieldType().equals(Schema.Type.INT64)) { if (value.getObject().getClass().getName().endsWith(".Date")) { Date date = (Date) value.getObject(); - long epochSecond = date.toInstant().getEpochSecond(); - BinaryStreamUtils.writeUnsignedInt32(stream, epochSecond); + BinaryStreamUtils.writeUnsignedInt32(stream, date.toInstant().getEpochSecond()); } else { - BinaryStreamUtils.writeUnsignedInt32(stream, (Long) value.getObject()); + BinaryStreamUtils.writeUnsignedInt32(stream, Long.parseLong(String.valueOf(value.getObject()))); } } else if (value.getFieldType().equals(Schema.Type.STRING)) { try { @@ -339,6 +339,9 @@ private void doWriteColValue(Column col, ClickHousePipedOutputStream stream, Dat case STRING: doWritePrimitive(columnType, value.getFieldType(), stream, value.getObject()); break; + case FIXED_STRING: + doWriteFixedString(columnType, stream, value.getObject(), col.getPrecision()); + break; case Date: case Date32: case DateTime: @@ -392,6 +395,29 @@ private void doWriteColValue(Column col, ClickHousePipedOutputStream stream, Dat } } + private void doWriteFixedString(Type columnType, ClickHousePipedOutputStream stream, Object value, int length) throws IOException { + LOGGER.trace("Writing fixed string type: {}, value: {}", columnType, value); + + if (value == null) { + BinaryStreamUtils.writeNull(stream); + return; + } + + if (Objects.requireNonNull(columnType) == Type.FIXED_STRING) { + if (value instanceof byte[]) { + byte[] bytes = (byte[]) value; + if (bytes.length != length) { + throw new DataException(String.format("Fixed string length mismatch: expected %d, got %d", length, bytes.length)); + } + BinaryStreamUtils.writeFixedString(stream, new String(bytes, StandardCharsets.UTF_8), length, StandardCharsets.UTF_8); + } else { + String msg = String.format("Not implemented conversion from %s to %s", value.getClass(), columnType); + LOGGER.error(msg); + throw new DataException(msg); + } + } + } + private void doWritePrimitive(Type columnType, Schema.Type dataType, ClickHousePipedOutputStream stream, Object value) throws IOException { LOGGER.trace("Writing primitive type: {}, value: {}", columnType, value); diff --git a/src/main/java/com/clickhouse/kafka/connect/sink/db/mapping/Column.java b/src/main/java/com/clickhouse/kafka/connect/sink/db/mapping/Column.java index d23a4a67..97ccf8ec 100644 --- a/src/main/java/com/clickhouse/kafka/connect/sink/db/mapping/Column.java +++ b/src/main/java/com/clickhouse/kafka/connect/sink/db/mapping/Column.java @@ -162,10 +162,11 @@ private static Type dispatchPrimitive(String valueType) { type = Type.DateTime64; } else if (valueType.startsWith("Decimal")) { type = Type.Decimal; + } else if (valueType.startsWith("FixedString")) { + type = Type.FIXED_STRING; } break; - } return type; } @@ -190,6 +191,9 @@ public static Column extractColumn(String name, String valueType, boolean isNull return extractColumn(name, valueType.substring("LowCardinality".length() + 1, valueType.length() - 1), isNull, hasDefaultValue); } else if (valueType.startsWith("Nullable")) { return extractColumn(name, valueType.substring("Nullable".length() + 1, valueType.length() - 1), true, hasDefaultValue); + } else if (type == Type.FIXED_STRING) { + int length = Integer.parseInt(valueType.substring("FixedString".length() + 1, valueType.length() - 1).trim()); + return new Column(name, type, isNull, hasDefaultValue, length, 0); } else if (type == Type.DateTime64) { String[] scaleAndTimezone = valueType.substring("DateTime64".length() + 1, valueType.length() - 1).split(","); int precision = Integer.parseInt(scaleAndTimezone[0].trim()); diff --git a/src/main/java/com/clickhouse/kafka/connect/sink/db/mapping/Type.java b/src/main/java/com/clickhouse/kafka/connect/sink/db/mapping/Type.java index 33b2d404..c1f63802 100644 --- a/src/main/java/com/clickhouse/kafka/connect/sink/db/mapping/Type.java +++ b/src/main/java/com/clickhouse/kafka/connect/sink/db/mapping/Type.java @@ -25,5 +25,6 @@ public enum Type { UINT64, UINT128, UINT256, - Decimal + Decimal, + FIXED_STRING } diff --git a/src/test/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkTaskWithSchemaProxyTest.java b/src/test/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkTaskWithSchemaProxyTest.java index 52c035a1..233b310a 100644 --- a/src/test/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkTaskWithSchemaProxyTest.java +++ b/src/test/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkTaskWithSchemaProxyTest.java @@ -343,6 +343,7 @@ public void schemaWithBytesTest() { assertEquals(sr.size(), ClickHouseTestHelpers.countRows(chc, topic)); } + @AfterAll protected static void tearDown() { db.stop(); diff --git a/src/test/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkTaskWithSchemaTest.java b/src/test/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkTaskWithSchemaTest.java index cf81caf8..87628414 100644 --- a/src/test/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkTaskWithSchemaTest.java +++ b/src/test/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkTaskWithSchemaTest.java @@ -12,6 +12,7 @@ import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.testcontainers.clickhouse.ClickHouseContainer; +import org.testcontainers.shaded.org.apache.commons.lang3.RandomUtils; import java.util.*; @@ -278,7 +279,7 @@ public void detectUnsupportedDataConversions() { try { chst.put(sr); } catch (RuntimeException e) { - assertTrue(Utils.getRootCause(e) instanceof DataException, "Did not detect wrong date conversion "); + assertInstanceOf(DataException.class, Utils.getRootCause(e), "Did not detect wrong date conversion "); } chst.stop(); } @@ -412,6 +413,48 @@ public void schemaWithDecimalTest() { assertEquals(499700, ClickHouseTestHelpers.sumRows(chc, topic, "decimal_14_2")); } + @Test + public void schemaWithFixedStringTest() { + Map props = getTestProperties(); + ClickHouseHelperClient chc = createClient(props); + + String topic = "fixed-string-value-table-test"; + int fixedStringSize = RandomUtils.nextInt(1, 100); + ClickHouseTestHelpers.dropTable(chc, topic); + ClickHouseTestHelpers.createTable(chc, topic, "CREATE TABLE `%s` ( `off16` Int16, " + + "`fixed_string` FixedString("+fixedStringSize+") ) Engine = MergeTree ORDER BY off16"); + + Collection sr = SchemaTestData.createFixedStringData(topic, 1, fixedStringSize); + ClickHouseSinkTask chst = new ClickHouseSinkTask(); + chst.start(props); + chst.put(sr); + chst.stop(); + + assertEquals(sr.size(), ClickHouseTestHelpers.countRows(chc, topic)); + } + + @Test + public void schemaWithFixedStringMismatchTest() { + Map props = getTestProperties(); + ClickHouseHelperClient chc = createClient(props); + + String topic = "fixed-string-mismatch-table-test"; + int fixedStringSize = RandomUtils.nextInt(1, 100); + ClickHouseTestHelpers.dropTable(chc, topic); + ClickHouseTestHelpers.createTable(chc, topic, "CREATE TABLE `%s` ( `off16` Int16, " + + "`fixed_string` FixedString(" + (fixedStringSize + 1 ) + ") ) Engine = MergeTree ORDER BY off16"); + + Collection sr = SchemaTestData.createFixedStringData(topic, 1, fixedStringSize); + ClickHouseSinkTask chst = new ClickHouseSinkTask(); + chst.start(props); + try { + chst.put(sr); + } catch (RuntimeException e) { + assertInstanceOf(DataException.class, Utils.getRootCause(e), "Size mismatch for FixedString"); + } + chst.stop(); + } + @Test public void schemaWithNullableDecimalTest() { Map props = getTestProperties(); diff --git a/src/test/java/com/clickhouse/kafka/connect/sink/helper/SchemaTestData.java b/src/test/java/com/clickhouse/kafka/connect/sink/helper/SchemaTestData.java index a6ce071f..3242dd64 100644 --- a/src/test/java/com/clickhouse/kafka/connect/sink/helper/SchemaTestData.java +++ b/src/test/java/com/clickhouse/kafka/connect/sink/helper/SchemaTestData.java @@ -3,6 +3,7 @@ import org.apache.kafka.common.record.TimestampType; import org.apache.kafka.connect.data.*; import org.apache.kafka.connect.sink.SinkRecord; +import org.testcontainers.shaded.org.apache.commons.lang3.RandomStringUtils; import java.math.BigDecimal; import java.time.*; @@ -478,6 +479,7 @@ public static Collection createDateType(String topic, int partition, .field("off16", Schema.INT16_SCHEMA) .field("date_number", Schema.OPTIONAL_INT32_SCHEMA) .field("date32_number", Schema.OPTIONAL_INT32_SCHEMA) + .field("datetime_int", Schema.INT32_SCHEMA) .field("datetime_number", Schema.INT64_SCHEMA) .field("datetime64_number", Schema.INT64_SCHEMA) .field("timestamp_int64", Timestamp.SCHEMA) @@ -500,11 +502,13 @@ public static Collection createDateType(String topic, int partition, LocalDateTime localDateTime = LocalDateTime.now(); long localDateTimeLong = localDateTime.toEpochSecond(ZoneOffset.UTC); + int localDateTimeInt = (int)localDateTime.toEpochSecond(ZoneOffset.UTC); Struct value_struct = new Struct(NESTED_SCHEMA) .put("off16", (short)n) .put("date_number", localDateInt) .put("date32_number", localDateInt) + .put("datetime_int", localDateTimeInt) .put("datetime_number", localDateTimeLong) .put("datetime64_number", currentTime) .put("timestamp_int64", new Date(System.currentTimeMillis())) @@ -721,4 +725,43 @@ public static Collection createZonedTimestampConversions(String topi }); return array; } + + + + public static Collection createFixedStringData(String topic, int partition, int fixedSize) { + return createFixedStringData(topic, partition, DEFAULT_TOTAL_RECORDS, fixedSize); + } + public static Collection createFixedStringData(String topic, int partition, int totalRecords, int fixedSize) { + + Schema NESTED_SCHEMA = SchemaBuilder.struct() + .field("off16", Schema.INT16_SCHEMA) + .field("fixed_string", Schema.BYTES_SCHEMA) + .build(); + + + List array = new ArrayList<>(); + LongStream.range(0, totalRecords).forEachOrdered(n -> { + Struct value_struct = new Struct(NESTED_SCHEMA) + .put("off16", (short)n) + .put("fixed_string", RandomStringUtils.random(fixedSize, true, true).getBytes()); + + + SinkRecord sr = new SinkRecord( + topic, + partition, + null, + null, NESTED_SCHEMA, + value_struct, + n, + System.currentTimeMillis(), + TimestampType.CREATE_TIME + ); + + array.add(sr); + }); + return array; + } + + + }