Skip to content

Commit

Permalink
13608 & 12026 - align regular and CDC integration tests and data mappers
Browse files Browse the repository at this point in the history
  • Loading branch information
yurii-bidiuk committed Jul 12, 2022
1 parent 743e6c2 commit 0a6b99a
Show file tree
Hide file tree
Showing 8 changed files with 710 additions and 1,004 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -253,17 +253,17 @@ public String getFullyQualifiedTableNameWithQuoting(final Connection connection,
return schemaName != null ? enquoteIdentifier(connection, schemaName) + "." + quotedTableName : quotedTableName;
}

protected <DateTime> DateTime getDateTimeObject(ResultSet resultSet, int index, Class<DateTime> clazz) throws SQLException {
protected <ObjectType> ObjectType getObject(ResultSet resultSet, int index, Class<ObjectType> clazz) throws SQLException {
return resultSet.getObject(index, clazz);
}

protected void putTimeWithTimezone(ObjectNode node, String columnName, ResultSet resultSet, int index) throws SQLException {
OffsetTime timetz = getDateTimeObject(resultSet, index, OffsetTime.class);
OffsetTime timetz = getObject(resultSet, index, OffsetTime.class);
node.put(columnName, timetz.format(TIMETZ_FORMATTER));
}

protected void putTimestampWithTimezone(ObjectNode node, String columnName, ResultSet resultSet, int index) throws SQLException {
OffsetDateTime timestamptz = getDateTimeObject(resultSet, index, OffsetDateTime.class);
OffsetDateTime timestamptz = getObject(resultSet, index, OffsetDateTime.class);
LocalDate localDate = timestamptz.toLocalDate();
node.put(columnName, resolveEra(localDate, timestamptz.format(TIMESTAMPTZ_FORMATTER)));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.debezium.internals;

import static io.airbyte.db.DataTypeUtils.TIMESTAMPTZ_FORMATTER;
import static io.airbyte.db.DataTypeUtils.TIMESTAMP_FORMATTER;
import static io.airbyte.db.DataTypeUtils.TIMETZ_FORMATTER;
import static io.airbyte.db.DataTypeUtils.TIME_FORMATTER;
import static io.airbyte.db.jdbc.AbstractJdbcCompatibleSourceOperations.isBCE;

import java.sql.Date;
import java.sql.Timestamp;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.OffsetDateTime;
import java.time.OffsetTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

public class DateTimeConverter {

public static final DateTimeFormatter TIME_WITH_TIMEZONE_FORMATTER = DateTimeFormatter.ofPattern(
"HH:mm:ss[.][SSSSSSSSS][SSSSSSS][SSSSSS][SSSSS][SSSS][SSS][SS][S][''][XXX][XX][X]");

public static String convertToTimeWithTimezone(String time) {
OffsetTime timetz = OffsetTime.parse(time, TIME_WITH_TIMEZONE_FORMATTER);
return timetz.format(TIMETZ_FORMATTER);
}

public static String convertToTimestampWithTimezone(Timestamp timestamp) {
OffsetDateTime timestamptz = OffsetDateTime.ofInstant(timestamp.toInstant(), ZoneOffset.UTC);
LocalDate localDate = timestamptz.toLocalDate();
return resolveEra(localDate, timestamptz.format(TIMESTAMPTZ_FORMATTER));
}

public static String convertToTimestamp(Timestamp timestamp) {
final LocalDateTime localDateTime = LocalDateTime.ofInstant(timestamp.toInstant(),
ZoneOffset.UTC);
final LocalDate date = localDateTime.toLocalDate();
return resolveEra(date, localDateTime.format(TIMESTAMP_FORMATTER));
}

public static Object convertToDate(Date date) {
LocalDate localDate = date.toLocalDate();
return resolveEra(localDate, localDate.toString());
}

public static String convertToTime(String time) {
LocalTime localTime = LocalTime.parse(time);
return localTime.format(TIME_FORMATTER);
}

public static String resolveEra(LocalDate date, String value) {
return isBCE(date) ? value.substring(1) + " BC" : value;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,26 @@

package io.airbyte.integrations.debezium.internals;

import static io.airbyte.protocol.models.JsonSchemaType.AIRBYTE_TYPE;
import static io.airbyte.protocol.models.JsonSchemaType.DATE;
import static io.airbyte.protocol.models.JsonSchemaType.DATE_TIME;
import static io.airbyte.protocol.models.JsonSchemaType.FORMAT;
import static io.airbyte.protocol.models.JsonSchemaType.TIME;
import static io.airbyte.protocol.models.JsonSchemaType.TIMESTAMP_WITHOUT_TIMEZONE;
import static io.airbyte.protocol.models.JsonSchemaType.TIMESTAMP_WITH_TIMEZONE;
import static io.airbyte.protocol.models.JsonSchemaType.TIME_WITHOUT_TIMEZONE;
import static io.airbyte.protocol.models.JsonSchemaType.TIME_WITH_TIMEZONE;

import io.debezium.spi.converter.CustomConverter;
import io.debezium.spi.converter.RelationalColumn;
import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import java.sql.Date;
import java.sql.Timestamp;
import java.util.Arrays;
import java.util.Locale;
import java.util.Properties;
import org.apache.commons.codec.binary.Hex;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.postgresql.util.PGInterval;
import org.slf4j.Logger;
Expand All @@ -19,12 +33,14 @@ public class PostgresConverter implements CustomConverter<SchemaBuilder, Relatio

private static final Logger LOGGER = LoggerFactory.getLogger(PostgresConverter.class);

private final String[] DATE_TYPES = {"DATE", "DATETIME", "TIME", "TIMETZ", "INTERVAL", "TIMESTAMP"};
private final String[] DATE_TYPES = {"DATE", "DATETIME", "TIME", "TIMETZ", "INTERVAL", "TIMESTAMP", "TIMESTAMPTZ"};
private final String[] BIT_TYPES = {"BIT", "VARBIT"};
private final String[] MONEY_ITEM_TYPE = {"MONEY"};
private final String[] GEOMETRICS_TYPES = {"BOX", "CIRCLE", "LINE", "LSEG", "POINT", "POLYGON", "PATH"};
private final String[] TEXT_TYPES =
{"VARCHAR", "VARBINARY", "BLOB", "TEXT", "LONGTEXT", "TINYTEXT", "MEDIUMTEXT", "INVENTORY_ITEM", "TSVECTOR", "TSQUERY"};
{"VARCHAR", "VARBINARY", "BLOB", "TEXT", "LONGTEXT", "TINYTEXT", "MEDIUMTEXT", "INVENTORY_ITEM", "TSVECTOR", "TSQUERY", "PG_LSN"};
private final String[] NUMERIC_TYPES = {"NUMERIC", "DECIMAL"};
private final String BYTEA_TYPE = "BYTEA";

@Override
public void configure(final Properties props) {}
Expand All @@ -39,9 +55,31 @@ public void converterFor(final RelationalColumn field, final ConverterRegistrati
registerText(field, registration);
} else if (Arrays.stream(MONEY_ITEM_TYPE).anyMatch(s -> s.equalsIgnoreCase(field.typeName()))) {
registerMoney(field, registration);
} else if (BYTEA_TYPE.equalsIgnoreCase(field.typeName())) {
registerBytea(field, registration);
} else if (Arrays.stream(NUMERIC_TYPES).anyMatch(s -> s.equalsIgnoreCase(field.typeName()))) {
registerNumber(field, registration);
}
}

private void registerNumber(final RelationalColumn field, final ConverterRegistration<SchemaBuilder> registration) {
registration.register(SchemaBuilder.float64().optional(), x -> {
if (x == null) {
return DebeziumConverterUtils.convertDefaultValue(field);
}
return new BigDecimal(x.toString()).doubleValue();
});
}

private void registerBytea(final RelationalColumn field, final ConverterRegistration<SchemaBuilder> registration) {
registration.register(SchemaBuilder.string().optional(), x -> {
if (x == null) {
return DebeziumConverterUtils.convertDefaultValue(field);
}
return "\\x" + Hex.encodeHexString((byte[]) x);
});
}

private void registerText(final RelationalColumn field, final ConverterRegistration<SchemaBuilder> registration) {
registration.register(SchemaBuilder.string().optional(), x -> {
if (x == null) {
Expand All @@ -57,14 +95,23 @@ private void registerText(final RelationalColumn field, final ConverterRegistrat
}

private void registerDate(final RelationalColumn field, final ConverterRegistration<SchemaBuilder> registration) {
registration.register(SchemaBuilder.string().optional(), x -> {
final var fieldType = field.typeName();

registration.register(getJsonSchema(fieldType).optional(), x -> {
if (x == null) {
return DebeziumConverterUtils.convertDefaultValue(field);
} else if (x instanceof PGInterval) {
return convertInterval((PGInterval) x);
} else {
return DebeziumConverterUtils.convertDate(x);
}
return switch (fieldType.toUpperCase(Locale.ROOT)) {
case "TIMETZ" -> DateTimeConverter.convertToTimeWithTimezone(x.toString());
case "TIMESTAMPTZ" -> DateTimeConverter.convertToTimestampWithTimezone((Timestamp) x);
case "TIMESTAMP" -> DateTimeConverter.convertToTimestamp((Timestamp) x);
// Debezium doesn't handle era indicators
// https://github.com/airbytehq/airbyte/issues/14590
case "DATE" -> DateTimeConverter.convertToDate((Date) x);
case "TIME" -> DateTimeConverter.convertToTime(x.toString());
case "INTERVAL" -> convertInterval((PGInterval) x);
default -> DebeziumConverterUtils.convertDate(x);
};
});
}

Expand All @@ -84,11 +131,7 @@ private void registerMoney(final RelationalColumn field, final ConverterRegistra
return DebeziumConverterUtils.convertDefaultValue(field);
} else if (x instanceof Double) {
final BigDecimal result = BigDecimal.valueOf((Double) x);
if (result.compareTo(new BigDecimal("999999999999999")) == 1
|| result.compareTo(new BigDecimal("-999999999999999")) == -1) {
return null;
}
return result.toString();
return Double.toString(result.doubleValue());
} else {
return x.toString();
}
Expand Down Expand Up @@ -131,4 +174,15 @@ private boolean isNegativeTime(final PGInterval pgInterval) {
|| pgInterval.getWholeSeconds() < 0;
}

private SchemaBuilder getJsonSchema(final String fieldType) {
return switch (fieldType.toUpperCase(Locale.ROOT)) {
case "TIMETZ" -> SchemaBuilder.string().parameter(FORMAT, TIME).parameter(AIRBYTE_TYPE, TIME_WITH_TIMEZONE);
case "TIMESTAMPTZ" -> SchemaBuilder.string().parameter(FORMAT, DATE_TIME).parameter(AIRBYTE_TYPE, TIMESTAMP_WITH_TIMEZONE);
case "TIMESTAMP" -> SchemaBuilder.string().parameter(FORMAT, DATE_TIME).parameter(AIRBYTE_TYPE, TIMESTAMP_WITHOUT_TIMEZONE);
case "DATE" -> SchemaBuilder.string().parameter(FORMAT, DATE);
case "TIME" -> SchemaBuilder.string().parameter(FORMAT, TIME).parameter(AIRBYTE_TYPE, TIME_WITHOUT_TIMEZONE);
default -> SchemaBuilder.string();
};
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.airbyte.db.jdbc.JdbcSourceOperations;
import io.airbyte.protocol.models.JsonSchemaType;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.sql.JDBCType;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
Expand All @@ -35,7 +36,15 @@
import java.time.OffsetTime;
import java.time.format.DateTimeParseException;
import java.util.Collections;
import org.postgresql.geometric.PGbox;
import org.postgresql.geometric.PGcircle;
import org.postgresql.geometric.PGline;
import org.postgresql.geometric.PGlseg;
import org.postgresql.geometric.PGpath;
import org.postgresql.geometric.PGpoint;
import org.postgresql.geometric.PGpolygon;
import org.postgresql.jdbc.PgResultSetMetaData;
import org.postgresql.util.PGobject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -173,6 +182,14 @@ public void setJsonField(final ResultSet resultSet, final int colIndex, final Ob
case TIMETZ -> putTimeWithTimezone(json, columnName, resultSet, colIndex);
case TIMESTAMPTZ -> putTimestampWithTimezone(json, columnName, resultSet, colIndex);
case "hstore" -> putHstoreAsJson(json, columnName, resultSet, colIndex);
case "circle" -> putObject(json, columnName, resultSet, colIndex, PGcircle.class);
case "box" -> putObject(json, columnName, resultSet, colIndex, PGbox.class);
case "double precision", "float", "float8" -> putFloat8(json, columnName, resultSet, colIndex);
case "line" -> putObject(json, columnName, resultSet, colIndex, PGline.class);
case "lseg" -> putObject(json, columnName, resultSet, colIndex, PGlseg.class);
case "path" -> putObject(json, columnName, resultSet, colIndex, PGpath.class);
case "point" -> putObject(json, columnName, resultSet, colIndex, PGpoint.class);
case "polygon" -> putObject(json, columnName, resultSet, colIndex, PGpolygon.class);
default -> {
switch (columnType) {
case BOOLEAN -> putBoolean(json, columnName, resultSet, colIndex);
Expand All @@ -198,19 +215,19 @@ public void setJsonField(final ResultSet resultSet, final int colIndex, final Ob

@Override
protected void putDate(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index) throws SQLException {
final LocalDate date = getDateTimeObject(resultSet, index, LocalDate.class);
final LocalDate date = getObject(resultSet, index, LocalDate.class);
node.put(columnName, resolveEra(date, date.toString()));
}

@Override
protected void putTime(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index) throws SQLException {
final LocalTime time = getDateTimeObject(resultSet, index, LocalTime.class);
final LocalTime time = getObject(resultSet, index, LocalTime.class);
node.put(columnName, time.format(TIME_FORMATTER));
}

@Override
protected void putTimestamp(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index) throws SQLException {
final LocalDateTime timestamp = getDateTimeObject(resultSet, index, LocalDateTime.class);
final LocalDateTime timestamp = getObject(resultSet, index, LocalDateTime.class);
final LocalDate date = timestamp.toLocalDate();
node.put(columnName, resolveEra(date, timestamp.format(TIMESTAMP_FORMATTER)));
}
Expand Down Expand Up @@ -261,10 +278,30 @@ protected void putBoolean(final ObjectNode node, final String columnName, final
node.put(columnName, resultSet.getString(index).equalsIgnoreCase("t"));
}

protected void putFloat8(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index)
throws SQLException {
final BigDecimal bigDecimal = DataTypeUtils.returnNullIfInvalid(() -> resultSet.getBigDecimal(index));
if (bigDecimal != null) {
node.put(columnName, bigDecimal.setScale(resultSet.getMetaData().getScale(index), RoundingMode.HALF_EVEN).doubleValue());
} else {
node.put(columnName, (BigDecimal) null);
}
}

protected <T extends PGobject> void putObject(final ObjectNode node,
final String columnName,
final ResultSet resultSet,
final int index,
Class<T> clazz)
throws SQLException {
final T object = getObject(resultSet, index, clazz);
node.put(columnName, object.getValue());
}

protected void putBigDecimal(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index) {
final BigDecimal bigDecimal = DataTypeUtils.returnNullIfInvalid(() -> resultSet.getBigDecimal(index));
if (bigDecimal != null) {
node.put(columnName, bigDecimal);
node.put(columnName, bigDecimal.doubleValue());
} else {
// Special values (Infinity, -Infinity, and NaN) is default to null for now.
// https://github.com/airbytehq/airbyte/issues/8902
Expand Down
Loading

0 comments on commit 0a6b99a

Please sign in to comment.