Skip to content

Commit

Permalink
13608 & 12026 - align regular and CDC integration tests and data mappers
Browse files Browse the repository at this point in the history
  • Loading branch information
yurii-bidiuk committed Jul 14, 2022
1 parent 743e6c2 commit 7b852e5
Show file tree
Hide file tree
Showing 8 changed files with 730 additions and 1,002 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -253,17 +253,17 @@ public String getFullyQualifiedTableNameWithQuoting(final Connection connection,
return schemaName != null ? enquoteIdentifier(connection, schemaName) + "." + quotedTableName : quotedTableName;
}

/**
 * Reads the column at {@code index} as an instance of {@code clazz} by delegating to
 * {@link ResultSet#getObject(int, Class)}.
 *
 * NOTE(review): two signatures are shown back to back here; this looks like the before/after of a
 * rename from getDateTimeObject to getObject in a diff view — confirm only one exists in the file.
 *
 * @param resultSet result set to read from
 * @param index column index, passed through unchanged
 * @param clazz type requested from the driver
 * @return the value returned by the driver for that column and type
 * @throws SQLException propagated from the driver call
 */
protected <DateTime> DateTime getDateTimeObject(ResultSet resultSet, int index, Class<DateTime> clazz) throws SQLException {
protected <ObjectType> ObjectType getObject(ResultSet resultSet, int index, Class<ObjectType> clazz) throws SQLException {
return resultSet.getObject(index, clazz);
}

/**
 * Writes a time-with-timezone column into {@code node} as a string: the value is read as an
 * {@link OffsetTime} and rendered with {@code TIMETZ_FORMATTER}.
 *
 * NOTE(review): the value is dereferenced without a null check — presumably SQL NULL is handled by
 * the caller; confirm. The two consecutive {@code timetz} declarations look like the removed and
 * added lines of a diff view.
 *
 * @param node JSON object receiving the field
 * @param columnName name of the field to write
 * @param resultSet result set to read from
 * @param index column index to read
 * @throws SQLException propagated from reading the column
 */
protected void putTimeWithTimezone(ObjectNode node, String columnName, ResultSet resultSet, int index) throws SQLException {
OffsetTime timetz = getDateTimeObject(resultSet, index, OffsetTime.class);
OffsetTime timetz = getObject(resultSet, index, OffsetTime.class);
node.put(columnName, timetz.format(TIMETZ_FORMATTER));
}

/**
 * Writes a timestamp-with-timezone column into {@code node} as a string: the value is read as an
 * {@link OffsetDateTime}, rendered with {@code TIMESTAMPTZ_FORMATTER}, and passed through
 * {@code resolveEra} together with its local date.
 *
 * NOTE(review): the value is dereferenced without a null check — presumably SQL NULL is handled by
 * the caller; confirm. The two consecutive {@code timestamptz} declarations look like the removed
 * and added lines of a diff view.
 *
 * @param node JSON object receiving the field
 * @param columnName name of the field to write
 * @param resultSet result set to read from
 * @param index column index to read
 * @throws SQLException propagated from reading the column
 */
protected void putTimestampWithTimezone(ObjectNode node, String columnName, ResultSet resultSet, int index) throws SQLException {
OffsetDateTime timestamptz = getDateTimeObject(resultSet, index, OffsetDateTime.class);
OffsetDateTime timestamptz = getObject(resultSet, index, OffsetDateTime.class);
LocalDate localDate = timestamptz.toLocalDate();
node.put(columnName, resolveEra(localDate, timestamptz.format(TIMESTAMPTZ_FORMATTER)));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.debezium.internals;

import static io.airbyte.db.DataTypeUtils.TIMESTAMPTZ_FORMATTER;
import static io.airbyte.db.DataTypeUtils.TIMESTAMP_FORMATTER;
import static io.airbyte.db.DataTypeUtils.TIMETZ_FORMATTER;
import static io.airbyte.db.DataTypeUtils.TIME_FORMATTER;
import static io.airbyte.db.jdbc.AbstractJdbcCompatibleSourceOperations.isBCE;

import java.sql.Date;
import java.sql.Timestamp;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.OffsetDateTime;
import java.time.OffsetTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

/**
 * Static helpers that render date/time values as strings using the formatters statically imported
 * from {@code DataTypeUtils}, adding a {@code " BC"} suffix for dates that {@code isBCE} reports as
 * BCE.
 *
 * <p>Every method dereferences its argument, so callers must handle {@code null} before calling.
 */
public class DateTimeConverter {

  /**
   * Lenient parse pattern for time-with-timezone text: {@code HH:mm:ss}, then optional fractional
   * seconds in several widths, then an optional zone offset in {@code XXX}, {@code XX} or {@code X}
   * form.
   */
  public static final DateTimeFormatter TIME_WITH_TIMEZONE_FORMATTER = DateTimeFormatter.ofPattern(
      "HH:mm:ss[.][SSSSSSSSS][SSSSSSS][SSSSSS][SSSSS][SSSS][SSS][SS][S][''][XXX][XX][X]");

  /**
   * Parses {@code time.toString()} with {@link #TIME_WITH_TIMEZONE_FORMATTER} and re-renders it with
   * {@code TIMETZ_FORMATTER}.
   *
   * @param time value whose string form is a time with zone offset
   * @return the time formatted with {@code TIMETZ_FORMATTER}
   * @throws java.time.format.DateTimeParseException if the text cannot be parsed as an offset time
   */
  public static String convertToTimeWithTimezone(Object time) {
    final OffsetTime parsed = OffsetTime.parse(time.toString(), TIME_WITH_TIMEZONE_FORMATTER);
    return parsed.format(TIMETZ_FORMATTER);
  }

  /**
   * Renders a timestamp as a UTC offset date-time using {@code TIMESTAMPTZ_FORMATTER}.
   *
   * @param timestamp a {@link Timestamp}, or any object whose string form {@link Instant#parse}
   *        accepts
   * @return the formatted value, era-adjusted by {@link #resolveEra(LocalDate, String)}
   */
  public static String convertToTimestampWithTimezone(Object timestamp) {
    final OffsetDateTime utcDateTime = toInstant(timestamp).atOffset(ZoneOffset.UTC);
    return resolveEra(utcDateTime.toLocalDate(), utcDateTime.format(TIMESTAMPTZ_FORMATTER));
  }

  /**
   * Renders a timestamp as a local date-time (interpreted at UTC) using {@code TIMESTAMP_FORMATTER}.
   *
   * @param timestamp a {@link Timestamp}, or any object whose string form {@link Instant#parse}
   *        accepts
   * @return the formatted value, era-adjusted by {@link #resolveEra(LocalDate, String)}
   */
  public static String convertToTimestamp(Object timestamp) {
    final LocalDateTime utcDateTime = LocalDateTime.ofInstant(toInstant(timestamp), ZoneOffset.UTC);
    return resolveEra(utcDateTime.toLocalDate(), utcDateTime.format(TIMESTAMP_FORMATTER));
  }

  /**
   * Renders a date in {@link LocalDate#toString()} form.
   *
   * @param date a {@link Date}, or any object whose string form {@link LocalDate#parse} accepts
   * @return the date string (declared as {@code Object} for existing callers), era-adjusted by
   *         {@link #resolveEra(LocalDate, String)}
   */
  public static Object convertToDate(Object date) {
    final LocalDate localDate;
    if (date instanceof Date) {
      localDate = ((Date) date).toLocalDate();
    } else {
      localDate = LocalDate.parse(date.toString());
    }
    return resolveEra(localDate, localDate.toString());
  }

  /**
   * Parses {@code time.toString()} as a {@link LocalTime} and re-renders it with
   * {@code TIME_FORMATTER}.
   *
   * @param time value whose string form is an ISO local time
   * @return the time formatted with {@code TIME_FORMATTER}
   */
  public static String convertToTime(Object time) {
    final LocalTime parsed = LocalTime.parse(time.toString());
    return parsed.format(TIME_FORMATTER);
  }

  /**
   * Normalises the era marker of an already formatted date/timestamp string.
   *
   * <p>For a date that {@code isBCE} reports as BCE, a leading {@code '-'} sign is removed when
   * present and {@code " BC"} is appended. A value without a leading sign keeps all of its
   * characters; dropping the first character unconditionally would eat a digit in that case (for
   * example {@link LocalDate#toString()} prints proleptic year 0 as {@code 0000-...}).
   *
   * @param date the date the string was rendered from; only used to decide the era
   * @param value the rendered string
   * @return {@code value} unchanged for non-BCE dates, otherwise the unsigned value plus {@code " BC"}
   */
  public static String resolveEra(LocalDate date, String value) {
    if (!isBCE(date)) {
      return value;
    }
    final String unsigned = value.startsWith("-") ? value.substring(1) : value;
    return unsigned + " BC";
  }

  /**
   * Converts the supported timestamp representations to an {@link Instant}: a {@link Timestamp}
   * directly, anything else by parsing its string form with {@link Instant#parse}.
   */
  private static Instant toInstant(Object timestamp) {
    if (timestamp instanceof Timestamp) {
      return ((Timestamp) timestamp).toInstant();
    }
    return Instant.parse(timestamp.toString());
  }

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,26 @@

package io.airbyte.integrations.debezium.internals;

import static io.airbyte.protocol.models.JsonSchemaType.AIRBYTE_TYPE;
import static io.airbyte.protocol.models.JsonSchemaType.DATE;
import static io.airbyte.protocol.models.JsonSchemaType.DATE_TIME;
import static io.airbyte.protocol.models.JsonSchemaType.FORMAT;
import static io.airbyte.protocol.models.JsonSchemaType.TIME;
import static io.airbyte.protocol.models.JsonSchemaType.TIMESTAMP_WITHOUT_TIMEZONE;
import static io.airbyte.protocol.models.JsonSchemaType.TIMESTAMP_WITH_TIMEZONE;
import static io.airbyte.protocol.models.JsonSchemaType.TIME_WITHOUT_TIMEZONE;
import static io.airbyte.protocol.models.JsonSchemaType.TIME_WITH_TIMEZONE;

import io.debezium.spi.converter.CustomConverter;
import io.debezium.spi.converter.RelationalColumn;
import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import java.sql.Date;
import java.sql.Timestamp;
import java.util.Arrays;
import java.util.Locale;
import java.util.Properties;
import org.apache.commons.codec.binary.Hex;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.postgresql.util.PGInterval;
import org.slf4j.Logger;
Expand All @@ -19,12 +33,14 @@ public class PostgresConverter implements CustomConverter<SchemaBuilder, Relatio

private static final Logger LOGGER = LoggerFactory.getLogger(PostgresConverter.class);

private final String[] DATE_TYPES = {"DATE", "DATETIME", "TIME", "TIMETZ", "INTERVAL", "TIMESTAMP"};
private final String[] DATE_TYPES = {"DATE", "DATETIME", "TIME", "TIMETZ", "INTERVAL", "TIMESTAMP", "TIMESTAMPTZ"};
private final String[] BIT_TYPES = {"BIT", "VARBIT"};
private final String[] MONEY_ITEM_TYPE = {"MONEY"};
private final String[] GEOMETRICS_TYPES = {"BOX", "CIRCLE", "LINE", "LSEG", "POINT", "POLYGON", "PATH"};
private final String[] TEXT_TYPES =
{"VARCHAR", "VARBINARY", "BLOB", "TEXT", "LONGTEXT", "TINYTEXT", "MEDIUMTEXT", "INVENTORY_ITEM", "TSVECTOR", "TSQUERY"};
{"VARCHAR", "VARBINARY", "BLOB", "TEXT", "LONGTEXT", "TINYTEXT", "MEDIUMTEXT", "INVENTORY_ITEM", "TSVECTOR", "TSQUERY", "PG_LSN"};
private final String[] NUMERIC_TYPES = {"NUMERIC", "DECIMAL"};
private final String BYTEA_TYPE = "BYTEA";

@Override
public void configure(final Properties props) {}
Expand All @@ -39,9 +55,50 @@ public void converterFor(final RelationalColumn field, final ConverterRegistrati
registerText(field, registration);
} else if (Arrays.stream(MONEY_ITEM_TYPE).anyMatch(s -> s.equalsIgnoreCase(field.typeName()))) {
registerMoney(field, registration);
} else if (BYTEA_TYPE.equalsIgnoreCase(field.typeName())) {
registerBytea(field, registration);
}
else if (Arrays.stream(NUMERIC_TYPES).anyMatch(s -> s.equalsIgnoreCase(field.typeName()))) {
registerNumber(field, registration);
}
}

/**
 * Registers a string-schema converter for the column so that numeric values emitted in CDC mode
 * have the same textual shape as the values produced in non-CDC mode (issue #13608).
 *
 * <p>A {@code null} value is replaced by {@code DebeziumConverterUtils.convertDefaultValue(field)}.
 * Any other value is parsed as a {@code double}; whole numbers are returned as plain digits without
 * a trailing {@code .0}, and numbers with a fractional part are returned in
 * {@link String#valueOf(double)} form.
 *
 * @param field column the converter is registered for
 * @param registration Debezium registration callback
 */
private void registerNumber(final RelationalColumn field, final ConverterRegistration<SchemaBuilder> registration) {
  registration.register(SchemaBuilder.string().optional(), x -> {
    if (x == null) {
      return DebeziumConverterUtils.convertDefaultValue(field);
    }
    // Known compromise, kept for these reasons:
    // 1. Per #13608, CDC and non-CDC output formats must be the same.
    // 2. In non-CDC mode 'decimal' and 'numeric' values are put into the JSON node as BigDecimal.
    //    With the current Jackson ObjectMapper configuration trailing zeros are dropped and numbers
    //    with decimal places are deserialized with an exponent (e.g. 1234567890.1234567 becomes
    //    1.2345678901234567E9).
    // 3. In CDC mode the same values are deserialized as regular numbers (1234567890.1234567 stays
    //    1234567890.1234567) and numbers without decimal places get a trailing zero (1 -> 1.0,
    //    24 -> 24.0, 354 -> 354.0).
    // Setting DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS on the ObjectMapper would align the
    // two modes but breaks deserialization of other data types, so non-CDC is left as it is and the
    // CDC value is reshaped here instead.
    // NOTE(review): going through double loses precision beyond what a double can hold, and
    // BigDecimal.valueOf(double) rejects NaN/Infinity — confirm such values cannot reach this point.
    final double doubleValue = Double.parseDouble(x.toString());
    // toPlainString(), not toString(): stripTrailingZeros() turns 600.0 into 6E+2 and 1.0E-7 into
    // 1E-7, so toString() would emit whole numbers divisible by ten in scientific notation and would
    // hide the decimal point of small fractions from the check below.
    final String plainValue = BigDecimal.valueOf(doubleValue).stripTrailingZeros().toPlainString();
    return plainValue.contains(".") ? String.valueOf(doubleValue) : plainValue;
  });
}

/**
 * Registers a string-schema converter for the column that renders a byte array as {@code \x}
 * followed by its hex encoding.
 *
 * <p>A {@code null} value is replaced by {@code DebeziumConverterUtils.convertDefaultValue(field)}.
 * Any other value is cast to {@code byte[]}.
 *
 * @param field column the converter is registered for
 * @param registration Debezium registration callback
 */
private void registerBytea(final RelationalColumn field, final ConverterRegistration<SchemaBuilder> registration) {
  registration.register(SchemaBuilder.string().optional(), rawValue -> {
    if (rawValue != null) {
      final byte[] bytes = (byte[]) rawValue;
      return "\\x" + Hex.encodeHexString(bytes);
    }
    return DebeziumConverterUtils.convertDefaultValue(field);
  });
}

private void registerText(final RelationalColumn field, final ConverterRegistration<SchemaBuilder> registration) {
registration.register(SchemaBuilder.string().optional(), x -> {
if (x == null) {
Expand All @@ -57,14 +114,23 @@ private void registerText(final RelationalColumn field, final ConverterRegistrat
}

/**
 * Registers a string-schema converter for a date/time column.
 *
 * <p>The column's type name is captured once. For each value, {@code null} is replaced by
 * {@code DebeziumConverterUtils.convertDefaultValue(field)}; otherwise the upper-cased type name
 * selects the matching {@code DateTimeConverter} method, {@code convertInterval} for INTERVAL, or
 * {@code DebeziumConverterUtils.convertDate} as the fallback.
 *
 * NOTE(review): the {@code else if (x instanceof PGInterval)} / {@code else} branches directly below
 * the null check make the switch unreachable as written; they look like the removed lines of a diff
 * view shown next to their replacement — confirm against the real file.
 *
 * @param field column the converter is registered for
 * @param registration Debezium registration callback
 */
private void registerDate(final RelationalColumn field, final ConverterRegistration<SchemaBuilder> registration) {
// Read the type name once; the lambda below dispatches on it for every value.
final var fieldType = field.typeName();

registration.register(SchemaBuilder.string().optional(), x -> {
if (x == null) {
return DebeziumConverterUtils.convertDefaultValue(field);
} else if (x instanceof PGInterval) {
return convertInterval((PGInterval) x);
} else {
return DebeziumConverterUtils.convertDate(x);
}
// Locale.ROOT keeps the upper-casing independent of the JVM's default locale.
return switch (fieldType.toUpperCase(Locale.ROOT)) {
case "TIMETZ" -> DateTimeConverter.convertToTimeWithTimezone(x);
case "TIMESTAMPTZ" -> DateTimeConverter.convertToTimestampWithTimezone(x);
case "TIMESTAMP" -> DateTimeConverter.convertToTimestamp(x);
// Debezium doesn't handle era indicators
// https://github.com/airbytehq/airbyte/issues/14590
case "DATE" -> DateTimeConverter.convertToDate(x);
case "TIME" -> DateTimeConverter.convertToTime(x);
case "INTERVAL" -> convertInterval((PGInterval) x);
default -> DebeziumConverterUtils.convertDate(x);
};
});
}

Expand All @@ -84,11 +150,7 @@ private void registerMoney(final RelationalColumn field, final ConverterRegistra
return DebeziumConverterUtils.convertDefaultValue(field);
} else if (x instanceof Double) {
final BigDecimal result = BigDecimal.valueOf((Double) x);
if (result.compareTo(new BigDecimal("999999999999999")) == 1
|| result.compareTo(new BigDecimal("-999999999999999")) == -1) {
return null;
}
return result.toString();
return Double.toString(result.doubleValue());
} else {
return x.toString();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.airbyte.db.jdbc.JdbcSourceOperations;
import io.airbyte.protocol.models.JsonSchemaType;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.sql.JDBCType;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
Expand All @@ -35,7 +36,15 @@
import java.time.OffsetTime;
import java.time.format.DateTimeParseException;
import java.util.Collections;
import org.postgresql.geometric.PGbox;
import org.postgresql.geometric.PGcircle;
import org.postgresql.geometric.PGline;
import org.postgresql.geometric.PGlseg;
import org.postgresql.geometric.PGpath;
import org.postgresql.geometric.PGpoint;
import org.postgresql.geometric.PGpolygon;
import org.postgresql.jdbc.PgResultSetMetaData;
import org.postgresql.util.PGobject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -173,6 +182,14 @@ public void setJsonField(final ResultSet resultSet, final int colIndex, final Ob
case TIMETZ -> putTimeWithTimezone(json, columnName, resultSet, colIndex);
case TIMESTAMPTZ -> putTimestampWithTimezone(json, columnName, resultSet, colIndex);
case "hstore" -> putHstoreAsJson(json, columnName, resultSet, colIndex);
case "circle" -> putObject(json, columnName, resultSet, colIndex, PGcircle.class);
case "box" -> putObject(json, columnName, resultSet, colIndex, PGbox.class);
case "double precision", "float", "float8" -> putFloat8(json, columnName, resultSet, colIndex);
case "line" -> putObject(json, columnName, resultSet, colIndex, PGline.class);
case "lseg" -> putObject(json, columnName, resultSet, colIndex, PGlseg.class);
case "path" -> putObject(json, columnName, resultSet, colIndex, PGpath.class);
case "point" -> putObject(json, columnName, resultSet, colIndex, PGpoint.class);
case "polygon" -> putObject(json, columnName, resultSet, colIndex, PGpolygon.class);
default -> {
switch (columnType) {
case BOOLEAN -> putBoolean(json, columnName, resultSet, colIndex);
Expand All @@ -198,19 +215,19 @@ public void setJsonField(final ResultSet resultSet, final int colIndex, final Ob

/**
 * Writes a date column into {@code node} as a string: the value is read as a {@link LocalDate} and
 * its {@code toString()} form is passed through {@code resolveEra}.
 *
 * NOTE(review): the value is dereferenced without a null check — presumably SQL NULL is handled by
 * the caller; confirm. The two consecutive {@code date} declarations look like the removed and
 * added lines of a diff view.
 *
 * @throws SQLException propagated from reading the column
 */
@Override
protected void putDate(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index) throws SQLException {
final LocalDate date = getDateTimeObject(resultSet, index, LocalDate.class);
final LocalDate date = getObject(resultSet, index, LocalDate.class);
node.put(columnName, resolveEra(date, date.toString()));
}

/**
 * Writes a time column into {@code node} as a string: the value is read as a {@link LocalTime} and
 * rendered with {@code TIME_FORMATTER}.
 *
 * NOTE(review): the value is dereferenced without a null check — presumably SQL NULL is handled by
 * the caller; confirm. The two consecutive {@code time} declarations look like the removed and
 * added lines of a diff view.
 *
 * @throws SQLException propagated from reading the column
 */
@Override
protected void putTime(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index) throws SQLException {
final LocalTime time = getDateTimeObject(resultSet, index, LocalTime.class);
final LocalTime time = getObject(resultSet, index, LocalTime.class);
node.put(columnName, time.format(TIME_FORMATTER));
}

/**
 * Writes a timestamp column into {@code node} as a string: the value is read as a
 * {@link LocalDateTime}, rendered with {@code TIMESTAMP_FORMATTER}, and passed through
 * {@code resolveEra} together with its date part.
 *
 * NOTE(review): the value is dereferenced without a null check — presumably SQL NULL is handled by
 * the caller; confirm. The two consecutive {@code timestamp} declarations look like the removed and
 * added lines of a diff view.
 *
 * @throws SQLException propagated from reading the column
 */
@Override
protected void putTimestamp(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index) throws SQLException {
final LocalDateTime timestamp = getDateTimeObject(resultSet, index, LocalDateTime.class);
final LocalDateTime timestamp = getObject(resultSet, index, LocalDateTime.class);
final LocalDate date = timestamp.toLocalDate();
node.put(columnName, resolveEra(date, timestamp.format(TIMESTAMP_FORMATTER)));
}
Expand Down Expand Up @@ -261,6 +278,26 @@ protected void putBoolean(final ObjectNode node, final String columnName, final
node.put(columnName, resultSet.getString(index).equalsIgnoreCase("t"));
}

/**
 * Writes a floating-point column into {@code node} as a JSON double.
 *
 * <p>The column is read as a {@link BigDecimal} through {@code DataTypeUtils.returnNullIfInvalid}.
 * A {@code null} result is written as a null BigDecimal field. Otherwise the value is rescaled to
 * the scale reported by the result set metadata, rounding half-even, and stored as a
 * {@code double}.
 *
 * @param node JSON object receiving the field
 * @param columnName name of the field to write
 * @param resultSet result set to read from
 * @param index column index to read
 * @throws SQLException propagated from the metadata lookup
 */
protected void putFloat8(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index)
    throws SQLException {
  final BigDecimal rawValue = DataTypeUtils.returnNullIfInvalid(() -> resultSet.getBigDecimal(index));
  if (rawValue == null) {
    node.put(columnName, (BigDecimal) null);
    return;
  }
  final int columnScale = resultSet.getMetaData().getScale(index);
  final BigDecimal scaledValue = rawValue.setScale(columnScale, RoundingMode.HALF_EVEN);
  node.put(columnName, scaledValue.doubleValue());
}

/**
 * Writes a {@code PGobject}-typed column into {@code node} as the string returned by the object's
 * {@code getValue()}.
 *
 * NOTE(review): the object is dereferenced without a null check — presumably SQL NULL is handled
 * by the caller; confirm.
 *
 * @param node JSON object receiving the field
 * @param columnName name of the field to write
 * @param resultSet result set to read from
 * @param index column index to read
 * @param clazz concrete {@code PGobject} subtype to request from the driver
 * @throws SQLException propagated from reading the column
 */
protected <T extends PGobject> void putObject(final ObjectNode node,
                                              final String columnName,
                                              final ResultSet resultSet,
                                              final int index,
                                              Class<T> clazz)
    throws SQLException {
  node.put(columnName, getObject(resultSet, index, clazz).getValue());
}

protected void putBigDecimal(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index) {
final BigDecimal bigDecimal = DataTypeUtils.returnNullIfInvalid(() -> resultSet.getBigDecimal(index));
if (bigDecimal != null) {
Expand Down
Loading

0 comments on commit 7b852e5

Please sign in to comment.