Merge pull request #306 from ClickHouse/add-zonedtime-support
Adding some simple String date support
Paultagoras authored Jan 30, 2024
2 parents 693f3c4 + ce9ac31 commit 0e052b3
Showing 6 changed files with 108 additions and 10 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
@@ -1,7 +1,8 @@
## 1.1.0 2024-01-26
## 1.0.11 2024-01-29
* Added support for RowBinaryWithDefaults
* Updated dependencies
* Adjusting default values for some settings (like insert_quorum)
* Added string support for DateTime64

## 1.0.10 2023-12-11
* Fixed writing into nullable Decimal column by @mlivirov in #276
2 changes: 1 addition & 1 deletion VERSION
@@ -1 +1 @@
v1.1.0
v1.0.11
ClickHouseWriter.java
@@ -13,16 +13,16 @@
import com.clickhouse.kafka.connect.sink.db.mapping.Column;
import com.clickhouse.kafka.connect.sink.db.mapping.Table;
import com.clickhouse.kafka.connect.sink.db.mapping.Type;
import com.clickhouse.kafka.connect.sink.dlq.DuplicateException;
import com.clickhouse.kafka.connect.sink.dlq.ErrorReporter;
import com.clickhouse.kafka.connect.util.Mask;

import com.clickhouse.kafka.connect.util.QueryIdentifier;
import com.clickhouse.kafka.connect.util.Utils;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;

import java.math.BigDecimal;
import java.time.ZonedDateTime;
import java.time.temporal.ChronoField;
import java.util.*;
import java.util.concurrent.TimeUnit;

@@ -195,7 +195,7 @@ private boolean validateDataSchema(Table table, Record record, boolean onlyField
return validSchema;
}

private void doWriteDates(Type type, ClickHousePipedOutputStream stream, Data value) throws IOException {
private void doWriteDates(Type type, ClickHousePipedOutputStream stream, Data value, int precision) throws IOException {
// TODO: develop more specific tests to have better coverage
if (value.getObject() == null) {
BinaryStreamUtils.writeNull(stream);
@@ -238,8 +238,16 @@ private void doWriteDates(Type type, ClickHousePipedOutputStream stream, Data value, int precision) throws IOException {
} else {
BinaryStreamUtils.writeUnsignedInt32(stream, (Long) value.getObject());
}
} else if (value.getFieldType().equals(Schema.Type.STRING)) {
try {
ZonedDateTime zonedDateTime = ZonedDateTime.parse((String) value.getObject());
LOGGER.trace("Writing epoch seconds: {}", zonedDateTime.toInstant().getEpochSecond());
BinaryStreamUtils.writeUnsignedInt32(stream, zonedDateTime.toInstant().getEpochSecond());
} catch (Exception e) {
LOGGER.error("Error parsing date time string: {}", value.getObject());
unsupported = true;
}
} else {

unsupported = true;
}
break;
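For context, the new STRING branch above leans on `ZonedDateTime.parse`, which accepts ISO-8601 strings carrying a zone offset or region id. A minimal standalone sketch of that parsing step (class name and sample timestamp are illustrative, not from the commit):

```java
import java.time.ZonedDateTime;

public class DateTimeStringSketch {
    public static void main(String[] args) {
        // An ISO-8601 string with a zone offset, e.g. as produced by
        // OffsetDateTime.toString() or ZonedDateTime.toString().
        ZonedDateTime zdt = ZonedDateTime.parse("2024-01-30T12:34:56.789+01:00");

        // DateTime columns hold whole seconds (written as UInt32 above),
        // so the fractional part is simply dropped.
        System.out.println(zdt.toInstant().getEpochSecond()); // 1706614496
    }
}
```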
@@ -252,6 +260,31 @@ private void doWriteDates(Type type, ClickHousePipedOutputStream stream, Data value, int precision) throws IOException {
} else {
BinaryStreamUtils.writeInt64(stream, (Long) value.getObject());
}
} else if (value.getFieldType().equals(Schema.Type.STRING)) {
try {
ZonedDateTime zonedDateTime = ZonedDateTime.parse((String) value.getObject());
long seconds = zonedDateTime.toInstant().getEpochSecond();
long milliSeconds = zonedDateTime.toInstant().toEpochMilli();
long microSeconds = TimeUnit.MICROSECONDS.convert(seconds, TimeUnit.SECONDS) + zonedDateTime.get(ChronoField.MICRO_OF_SECOND);
long nanoSeconds = TimeUnit.NANOSECONDS.convert(seconds, TimeUnit.SECONDS) + zonedDateTime.getNano();

if (precision == 3) {
LOGGER.trace("Writing epoch milliseconds: {}", milliSeconds);
BinaryStreamUtils.writeInt64(stream, milliSeconds);
} else if (precision == 6) {
LOGGER.trace("Writing epoch microseconds: {}", microSeconds);
BinaryStreamUtils.writeInt64(stream, microSeconds);
} else if (precision == 9) {
LOGGER.trace("Writing epoch nanoseconds: {}", nanoSeconds);
BinaryStreamUtils.writeInt64(stream, nanoSeconds);
} else {
LOGGER.trace("Writing epoch seconds: {}", seconds);
BinaryStreamUtils.writeInt64(stream, seconds);
}
} catch (Exception e) {
LOGGER.error("Error parsing date time string: {}", value.getObject());
unsupported = true;
}
} else {
unsupported = true;
}
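The precision argument comes from the column definition and selects the epoch unit: DateTime64(3) stores milliseconds, DateTime64(6) microseconds, DateTime64(9) nanoseconds, with whole seconds as the fallback. A sketch of the same arithmetic (helper name and sample values are illustrative):

```java
import java.time.ZonedDateTime;
import java.time.temporal.ChronoField;
import java.util.concurrent.TimeUnit;

public class DateTime64ScaleSketch {
    // Mirrors the precision handling in the patch: 3 -> millis, 6 -> micros,
    // 9 -> nanos, anything else -> whole seconds.
    static long toEpoch(ZonedDateTime zdt, int precision) {
        long seconds = zdt.toInstant().getEpochSecond();
        switch (precision) {
            case 3:  return zdt.toInstant().toEpochMilli();
            // TimeUnit.SECONDS.toMicros(s) is equivalent to the
            // TimeUnit.MICROSECONDS.convert(s, TimeUnit.SECONDS) used in the patch.
            case 6:  return TimeUnit.SECONDS.toMicros(seconds) + zdt.get(ChronoField.MICRO_OF_SECOND);
            case 9:  return TimeUnit.SECONDS.toNanos(seconds) + zdt.getNano();
            default: return seconds;
        }
    }

    public static void main(String[] args) {
        ZonedDateTime zdt = ZonedDateTime.parse("2024-01-30T11:34:56.789Z");
        System.out.println(toEpoch(zdt, 3)); // 1706614496789
        System.out.println(toEpoch(zdt, 6)); // 1706614496789000
        System.out.println(toEpoch(zdt, 9)); // 1706614496789000000
        System.out.println(toEpoch(zdt, 0)); // 1706614496
    }
}
```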
@@ -396,7 +429,7 @@ private void doWriteCol(Record record, Column col, ClickHousePipedOutputStream stream
case Date32:
case DateTime:
case DateTime64:
doWriteDates(colType, stream, value);
doWriteDates(colType, stream, value, col.getPrecision());
break;
case Decimal:
if (value.getObject() == null) {
Column.java
@@ -30,6 +30,7 @@ private Column(String name, Type type, boolean isNullable, boolean hasDefaultValue) {
this.isNullable = isNullable;
this.hasDefaultValue = hasDefaultValue;
this.subType = null;
this.precision = 0;
}

private Column(String name, Type type, boolean isNullable, boolean hasDefaultValue, Type mapKeyType, Type mapValueType) {
@@ -40,6 +41,7 @@ private Column(String name, Type type, boolean isNullable, boolean hasDefaultValue, Type mapKeyType, Type mapValueType) {
this.subType = null;
this.mapKeyType = mapKeyType;
this.mapValueType = mapValueType;
this.precision = 0;
}

private Column(String name, Type type, boolean isNullable, boolean hasDefaultValue, Column subType) {
@@ -48,6 +50,7 @@ private Column(String name, Type type, boolean isNullable, boolean hasDefaultValue, Column subType) {
this.isNullable = isNullable;
this.hasDefaultValue = hasDefaultValue;
this.subType = subType;
this.precision = 0;
}

private Column(String name, Type type, boolean isNullable, boolean hasDefaultValue, int precision, int scale) {
@@ -186,6 +189,11 @@ public static Column extractColumn(String name, String valueType, boolean isNull, boolean hasDefaultValue) {
return extractColumn(name, valueType.substring("LowCardinality".length() + 1, valueType.length() - 1), isNull, hasDefaultValue);
} else if (valueType.startsWith("Nullable")) {
return extractColumn(name, valueType.substring("Nullable".length() + 1, valueType.length() - 1), true, hasDefaultValue);
} else if (type == Type.DateTime64) {
String[] scaleAndTimezone = valueType.substring("DateTime64".length() + 1, valueType.length() - 1).split(",");
int precision = Integer.parseInt(scaleAndTimezone[0].trim());
LOGGER.trace("Precision is {}", precision);
return new Column(name, type, isNull, hasDefaultValue, precision, 0);
} else if (type == Type.Decimal) {
final Pattern patter = Pattern.compile("Decimal(?<size>\\d{2,3})?\\s*(\\((?<a1>\\d{1,}\\s*)?,*\\s*(?<a2>\\d{1,})?\\))?");
Matcher match = patter.matcher(valueType);
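The new extractColumn branch derives that precision from the type string ClickHouse reports: strip "DateTime64(" and the closing ")", split on the comma, and parse the first element. A standalone sketch (class name and sample type string are illustrative):

```java
public class DateTime64TypeParseSketch {
    public static void main(String[] args) {
        // Mirrors the extractColumn branch: the first element is the precision,
        // the optional second element is the timezone.
        String valueType = "DateTime64(3, 'UTC')";
        String[] scaleAndTimezone = valueType
                .substring("DateTime64".length() + 1, valueType.length() - 1)
                .split(",");
        int precision = Integer.parseInt(scaleAndTimezone[0].trim());
        System.out.println(precision); // 3

        // A column declared as bare DateTime64 (as in the test below) should be
        // reported back by ClickHouse with its default precision, DateTime64(3).
    }
}
```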
ClickHouseSinkTaskWithSchemaTest.java
@@ -275,6 +275,27 @@ public void detectUnsupportedDataConversions() {
chst.stop();
}


@Test
public void supportZonedDatesStringTest() {
Map<String, String> props = getTestProperties();
ClickHouseHelperClient chc = createClient(props);

String topic = "support-dates-string-test";
ClickHouseTestHelpers.dropTable(chc, topic);
ClickHouseTestHelpers.createTable(chc, topic, "CREATE TABLE `%s` ( `off16` Int16, zoned_date DateTime64, offset_date DateTime64) Engine = MergeTree ORDER BY off16");
Collection<SinkRecord> sr = SchemaTestData.createZonedTimestampConversions(topic, 1);

ClickHouseSinkTask chst = new ClickHouseSinkTask();
chst.start(props);
chst.put(sr);
chst.stop();

assertEquals(sr.size(), ClickHouseTestHelpers.countRows(chc, topic));
}



@Test
public void withEmptyDataRecordsTest() {
Map<String, String> props = getTestProperties();
SchemaTestData.java
@@ -5,9 +5,7 @@
import org.apache.kafka.connect.sink.SinkRecord;

import java.math.BigDecimal;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.*;
import java.util.*;
import java.util.Date;
import java.util.stream.LongStream;
@@ -652,4 +650,41 @@ public static Collection<SinkRecord> createDecimalValueDataWithNulls(String topic

return array;
}


public static Collection<SinkRecord> createZonedTimestampConversions(String topic, int partition) {
return createZonedTimestampConversions(topic, partition, DEFAULT_TOTAL_RECORDS);
}
public static Collection<SinkRecord> createZonedTimestampConversions(String topic, int partition, int totalRecords) {

Schema NESTED_SCHEMA = SchemaBuilder.struct()
.field("off16", Schema.INT16_SCHEMA)
.field("zoned_date", Schema.STRING_SCHEMA)
.field("offset_date", Schema.STRING_SCHEMA)
.build();


List<SinkRecord> array = new ArrayList<>();
LongStream.range(0, totalRecords).forEachOrdered(n -> {
Struct value_struct = new Struct(NESTED_SCHEMA)
.put("off16", (short)n)
.put("zoned_date", ZonedDateTime.now().toString())
.put("offset_date", OffsetDateTime.now().toString());


SinkRecord sr = new SinkRecord(
topic,
partition,
null,
null, NESTED_SCHEMA,
value_struct,
n,
System.currentTimeMillis(),
TimestampType.CREATE_TIME
);

array.add(sr);
});
return array;
}
}
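The generator above exercises both string shapes java.time produces: ZonedDateTime.toString() appends a bracketed region id, while OffsetDateTime.toString() carries only the numeric offset, and ZonedDateTime.parse in the writer accepts both. A quick illustration (fixed sample instant and class name are illustrative):

```java
import java.time.OffsetDateTime;
import java.time.ZoneId;
import java.time.ZonedDateTime;

public class TimestampStringFormsSketch {
    public static void main(String[] args) {
        ZonedDateTime zoned = ZonedDateTime.of(
                2024, 1, 30, 12, 34, 56, 789_000_000, ZoneId.of("Europe/Berlin"));
        OffsetDateTime offset = zoned.toOffsetDateTime();

        // ZonedDateTime.toString() appends the bracketed region id...
        System.out.println(zoned);  // 2024-01-30T12:34:56.789+01:00[Europe/Berlin]
        // ...while OffsetDateTime.toString() carries only the numeric offset.
        System.out.println(offset); // 2024-01-30T12:34:56.789+01:00

        // ZonedDateTime.parse(...) in the writer handles both shapes.
        System.out.println(ZonedDateTime.parse(zoned.toString()).toInstant().getEpochSecond());  // 1706614496
        System.out.println(ZonedDateTime.parse(offset.toString()).toInstant().getEpochSecond()); // 1706614496
    }
}
```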
