Skip to content

Commit

Permalink
Adding fixed_string and addressing date bug (#326)
Browse files Browse the repository at this point in the history
* Adding fixed_string and addressing date bug

* Update CHANGELOG.md

* Adding test, adjusting test, and simplifying test

* Update VERSION
  • Loading branch information
Paultagoras authored Feb 21, 2024
1 parent 0e64c14 commit 03e3268
Show file tree
Hide file tree
Showing 8 changed files with 127 additions and 8 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
* Added support for multiple databases in single stream using a virtual topic #41
* Add support for configuring JDBC properties in connection URL (i.e. `?auto_discovery=true`)
* Added minimum version check for ClickHouse
* Added support for fixed_string type

## 1.0.11 2024-01-29
* Added support for RowBinaryWithDefaults
Expand Down
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
v1.0.11
v1.0.12
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,8 @@ private boolean validateDataSchema(Table table, Record record, boolean onlyField
break;//I notice we just break here, rather than actually validate the type
default:
if (!colTypeName.equals(dataTypeName)) {
if (!(colTypeName.equals("STRING") && dataTypeName.equals("BYTES"))) {
if (!((colTypeName.equals("STRING") || colTypeName.equalsIgnoreCase("FIXED_STRING")) && dataTypeName.equals("BYTES"))) {
LOGGER.debug("Data schema name: {}", objSchema.name());
if (!("DECIMAL".equalsIgnoreCase(colTypeName) && objSchema.name().equals("org.apache.kafka.connect.data.Decimal"))) {
validSchema = false;
LOGGER.error(String.format("Table column name [%s] type [%s] is not matching data column type [%s]", col.getName(), colTypeName, dataTypeName));
Expand Down Expand Up @@ -256,10 +257,9 @@ private void doWriteDates(Type type, ClickHousePipedOutputStream stream, Data va
if (value.getFieldType().equals(Schema.Type.INT32) || value.getFieldType().equals(Schema.Type.INT64)) {
if (value.getObject().getClass().getName().endsWith(".Date")) {
Date date = (Date) value.getObject();
long epochSecond = date.toInstant().getEpochSecond();
BinaryStreamUtils.writeUnsignedInt32(stream, epochSecond);
BinaryStreamUtils.writeUnsignedInt32(stream, date.toInstant().getEpochSecond());
} else {
BinaryStreamUtils.writeUnsignedInt32(stream, (Long) value.getObject());
BinaryStreamUtils.writeUnsignedInt32(stream, Long.parseLong(String.valueOf(value.getObject())));
}
} else if (value.getFieldType().equals(Schema.Type.STRING)) {
try {
Expand Down Expand Up @@ -339,6 +339,9 @@ private void doWriteColValue(Column col, ClickHousePipedOutputStream stream, Dat
case STRING:
doWritePrimitive(columnType, value.getFieldType(), stream, value.getObject());
break;
case FIXED_STRING:
doWriteFixedString(columnType, stream, value.getObject(), col.getPrecision());
break;
case Date:
case Date32:
case DateTime:
Expand Down Expand Up @@ -392,6 +395,29 @@ private void doWriteColValue(Column col, ClickHousePipedOutputStream stream, Dat
}
}

private void doWriteFixedString(Type columnType, ClickHousePipedOutputStream stream, Object value, int length) throws IOException {
LOGGER.trace("Writing fixed string type: {}, value: {}", columnType, value);

if (value == null) {
BinaryStreamUtils.writeNull(stream);
return;
}

if (Objects.requireNonNull(columnType) == Type.FIXED_STRING) {
if (value instanceof byte[]) {
byte[] bytes = (byte[]) value;
if (bytes.length != length) {
throw new DataException(String.format("Fixed string length mismatch: expected %d, got %d", length, bytes.length));
}
BinaryStreamUtils.writeFixedString(stream, new String(bytes, StandardCharsets.UTF_8), length, StandardCharsets.UTF_8);
} else {
String msg = String.format("Not implemented conversion from %s to %s", value.getClass(), columnType);
LOGGER.error(msg);
throw new DataException(msg);
}
}
}

private void doWritePrimitive(Type columnType, Schema.Type dataType, ClickHousePipedOutputStream stream, Object value) throws IOException {
LOGGER.trace("Writing primitive type: {}, value: {}", columnType, value);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,10 +162,11 @@ private static Type dispatchPrimitive(String valueType) {
type = Type.DateTime64;
} else if (valueType.startsWith("Decimal")) {
type = Type.Decimal;
} else if (valueType.startsWith("FixedString")) {
type = Type.FIXED_STRING;
}

break;

}
return type;
}
Expand All @@ -190,6 +191,9 @@ public static Column extractColumn(String name, String valueType, boolean isNull
return extractColumn(name, valueType.substring("LowCardinality".length() + 1, valueType.length() - 1), isNull, hasDefaultValue);
} else if (valueType.startsWith("Nullable")) {
return extractColumn(name, valueType.substring("Nullable".length() + 1, valueType.length() - 1), true, hasDefaultValue);
} else if (type == Type.FIXED_STRING) {
int length = Integer.parseInt(valueType.substring("FixedString".length() + 1, valueType.length() - 1).trim());
return new Column(name, type, isNull, hasDefaultValue, length, 0);
} else if (type == Type.DateTime64) {
String[] scaleAndTimezone = valueType.substring("DateTime64".length() + 1, valueType.length() - 1).split(",");
int precision = Integer.parseInt(scaleAndTimezone[0].trim());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,6 @@ public enum Type {
UINT64,
UINT128,
UINT256,
Decimal
Decimal,
FIXED_STRING
}
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,7 @@ public void schemaWithBytesTest() {
assertEquals(sr.size(), ClickHouseTestHelpers.countRows(chc, topic));
}


@AfterAll
protected static void tearDown() {
db.stop();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.testcontainers.clickhouse.ClickHouseContainer;
import org.testcontainers.shaded.org.apache.commons.lang3.RandomUtils;

import java.util.*;

Expand Down Expand Up @@ -278,7 +279,7 @@ public void detectUnsupportedDataConversions() {
try {
chst.put(sr);
} catch (RuntimeException e) {
assertTrue(Utils.getRootCause(e) instanceof DataException, "Did not detect wrong date conversion ");
assertInstanceOf(DataException.class, Utils.getRootCause(e), "Did not detect wrong date conversion ");
}
chst.stop();
}
Expand Down Expand Up @@ -412,6 +413,48 @@ public void schemaWithDecimalTest() {
assertEquals(499700, ClickHouseTestHelpers.sumRows(chc, topic, "decimal_14_2"));
}

@Test
public void schemaWithFixedStringTest() {
Map<String, String> props = getTestProperties();
ClickHouseHelperClient chc = createClient(props);

String topic = "fixed-string-value-table-test";
int fixedStringSize = RandomUtils.nextInt(1, 100);
ClickHouseTestHelpers.dropTable(chc, topic);
ClickHouseTestHelpers.createTable(chc, topic, "CREATE TABLE `%s` ( `off16` Int16, " +
"`fixed_string` FixedString("+fixedStringSize+") ) Engine = MergeTree ORDER BY off16");

Collection<SinkRecord> sr = SchemaTestData.createFixedStringData(topic, 1, fixedStringSize);
ClickHouseSinkTask chst = new ClickHouseSinkTask();
chst.start(props);
chst.put(sr);
chst.stop();

assertEquals(sr.size(), ClickHouseTestHelpers.countRows(chc, topic));
}

@Test
public void schemaWithFixedStringMismatchTest() {
Map<String, String> props = getTestProperties();
ClickHouseHelperClient chc = createClient(props);

String topic = "fixed-string-mismatch-table-test";
int fixedStringSize = RandomUtils.nextInt(1, 100);
ClickHouseTestHelpers.dropTable(chc, topic);
ClickHouseTestHelpers.createTable(chc, topic, "CREATE TABLE `%s` ( `off16` Int16, " +
"`fixed_string` FixedString(" + (fixedStringSize + 1 ) + ") ) Engine = MergeTree ORDER BY off16");

Collection<SinkRecord> sr = SchemaTestData.createFixedStringData(topic, 1, fixedStringSize);
ClickHouseSinkTask chst = new ClickHouseSinkTask();
chst.start(props);
try {
chst.put(sr);
} catch (RuntimeException e) {
assertInstanceOf(DataException.class, Utils.getRootCause(e), "Size mismatch for FixedString");
}
chst.stop();
}

@Test
public void schemaWithNullableDecimalTest() {
Map<String, String> props = getTestProperties();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.connect.data.*;
import org.apache.kafka.connect.sink.SinkRecord;
import org.testcontainers.shaded.org.apache.commons.lang3.RandomStringUtils;

import java.math.BigDecimal;
import java.time.*;
Expand Down Expand Up @@ -478,6 +479,7 @@ public static Collection<SinkRecord> createDateType(String topic, int partition,
.field("off16", Schema.INT16_SCHEMA)
.field("date_number", Schema.OPTIONAL_INT32_SCHEMA)
.field("date32_number", Schema.OPTIONAL_INT32_SCHEMA)
.field("datetime_int", Schema.INT32_SCHEMA)
.field("datetime_number", Schema.INT64_SCHEMA)
.field("datetime64_number", Schema.INT64_SCHEMA)
.field("timestamp_int64", Timestamp.SCHEMA)
Expand All @@ -500,11 +502,13 @@ public static Collection<SinkRecord> createDateType(String topic, int partition,

LocalDateTime localDateTime = LocalDateTime.now();
long localDateTimeLong = localDateTime.toEpochSecond(ZoneOffset.UTC);
int localDateTimeInt = (int)localDateTime.toEpochSecond(ZoneOffset.UTC);

Struct value_struct = new Struct(NESTED_SCHEMA)
.put("off16", (short)n)
.put("date_number", localDateInt)
.put("date32_number", localDateInt)
.put("datetime_int", localDateTimeInt)
.put("datetime_number", localDateTimeLong)
.put("datetime64_number", currentTime)
.put("timestamp_int64", new Date(System.currentTimeMillis()))
Expand Down Expand Up @@ -721,4 +725,43 @@ public static Collection<SinkRecord> createZonedTimestampConversions(String topi
});
return array;
}



public static Collection<SinkRecord> createFixedStringData(String topic, int partition, int fixedSize) {
return createFixedStringData(topic, partition, DEFAULT_TOTAL_RECORDS, fixedSize);
}
public static Collection<SinkRecord> createFixedStringData(String topic, int partition, int totalRecords, int fixedSize) {

Schema NESTED_SCHEMA = SchemaBuilder.struct()
.field("off16", Schema.INT16_SCHEMA)
.field("fixed_string", Schema.BYTES_SCHEMA)
.build();


List<SinkRecord> array = new ArrayList<>();
LongStream.range(0, totalRecords).forEachOrdered(n -> {
Struct value_struct = new Struct(NESTED_SCHEMA)
.put("off16", (short)n)
.put("fixed_string", RandomStringUtils.random(fixedSize, true, true).getBytes());


SinkRecord sr = new SinkRecord(
topic,
partition,
null,
null, NESTED_SCHEMA,
value_struct,
n,
System.currentTimeMillis(),
TimestampType.CREATE_TIME
);

array.add(sr);
});
return array;
}



}

0 comments on commit 03e3268

Please sign in to comment.