Skip to content

Commit

Permalink
13608 & 12026 - align regular and CDC integration tests and data mappers
Browse files Browse the repository at this point in the history
  • Loading branch information
yurii-bidiuk committed Jul 8, 2022
1 parent 806c939 commit 52cd91b
Show file tree
Hide file tree
Showing 7 changed files with 705 additions and 999 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -253,17 +253,17 @@ public String getFullyQualifiedTableNameWithQuoting(final Connection connection,
return schemaName != null ? enquoteIdentifier(connection, schemaName) + "." + quotedTableName : quotedTableName;
}

// Diff view: the two signature lines below appear to be the old and the new form of a single
// method (getDateTimeObject renamed to getObject, type parameter DateTime renamed to ObjectType).
// The body delegates to ResultSet#getObject(int, Class) and returns the column at `index`
// as an instance of `clazz`.
protected <DateTime> DateTime getDateTimeObject(ResultSet resultSet, int index, Class<DateTime> clazz) throws SQLException {
protected <ObjectType> ObjectType getObject(ResultSet resultSet, int index, Class<ObjectType> clazz) throws SQLException {
return resultSet.getObject(index, clazz);
}

// Reads the column at `index` as an OffsetTime and stores it in `node` under `columnName`,
// formatted with TIMETZ_FORMATTER.
// Diff view: the two reads below appear to be the old (getDateTimeObject) and the new (getObject)
// version of the same statement.
// NOTE(review): a null read would make timetz.format(...) throw; confirm callers filter SQL NULL first.
protected void putTimeWithTimezone(ObjectNode node, String columnName, ResultSet resultSet, int index) throws SQLException {
OffsetTime timetz = getDateTimeObject(resultSet, index, OffsetTime.class);
OffsetTime timetz = getObject(resultSet, index, OffsetTime.class);
node.put(columnName, timetz.format(TIMETZ_FORMATTER));
}

// Reads the column at `index` as an OffsetDateTime, formats it with TIMESTAMPTZ_FORMATTER, passes
// the text through resolveEra together with the value's local date, and stores the result in
// `node` under `columnName`.
// Diff view: the two reads below appear to be the old (getDateTimeObject) and the new (getObject)
// version of the same statement.
protected void putTimestampWithTimezone(ObjectNode node, String columnName, ResultSet resultSet, int index) throws SQLException {
OffsetDateTime timestamptz = getDateTimeObject(resultSet, index, OffsetDateTime.class);
OffsetDateTime timestamptz = getObject(resultSet, index, OffsetDateTime.class);
LocalDate localDate = timestamptz.toLocalDate();
node.put(columnName, resolveEra(localDate, timestamptz.format(TIMESTAMPTZ_FORMATTER)));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package io.airbyte.integrations.debezium.internals;

import static io.airbyte.db.DataTypeUtils.TIMESTAMPTZ_FORMATTER;
import static io.airbyte.db.DataTypeUtils.TIMESTAMP_FORMATTER;
import static io.airbyte.db.DataTypeUtils.TIMETZ_FORMATTER;
import static io.airbyte.db.DataTypeUtils.TIME_FORMATTER;
import static java.time.temporal.ChronoField.HOUR_OF_DAY;
import static java.time.temporal.ChronoField.MINUTE_OF_HOUR;
import static java.time.temporal.ChronoField.NANO_OF_SECOND;
import static java.time.temporal.ChronoField.SECOND_OF_MINUTE;

import java.sql.Date;
import java.sql.Timestamp;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.OffsetDateTime;
import java.time.OffsetTime;
import java.time.ZoneId;
import java.time.chrono.IsoEra;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeFormatterBuilder;

/**
 * Static helpers that render temporal values as strings using the formatters statically imported
 * from {@code DataTypeUtils}, marking BCE values with a trailing {@code " BC"}.
 */
public class DateTimeConverter {

  /**
   * Formatter used to parse time-with-offset text: two-digit hour and minute, optional
   * {@code :ss}, optional fraction of up to nine digits, followed by an offset in the
   * hour-only {@code +HH} pattern (with {@code "0"} as the no-offset text). Parsing is
   * case-insensitive.
   */
  public static final DateTimeFormatter TIME_WITH_TIMEZONE_FORMATTER = new DateTimeFormatterBuilder()
      .parseCaseInsensitive()
      .append(new DateTimeFormatterBuilder()
          .appendValue(HOUR_OF_DAY, 2)
          .appendLiteral(':')
          .appendValue(MINUTE_OF_HOUR, 2)
          .optionalStart()
          .appendLiteral(':')
          .appendValue(SECOND_OF_MINUTE, 2)
          .optionalStart()
          .appendFraction(NANO_OF_SECOND, 0, 9, true).toFormatter())
      .appendOffset("+HH", "0")
      .toFormatter();

  /**
   * Parses {@code time} with {@link #TIME_WITH_TIMEZONE_FORMATTER} and re-formats it with
   * {@code TIMETZ_FORMATTER}.
   *
   * @param time time-with-offset text
   * @return the re-formatted time
   * @throws java.time.format.DateTimeParseException if {@code time} cannot be parsed
   */
  public static String convertToTimeWithTimezone(String time) {
    final OffsetTime timetz = OffsetTime.parse(time, TIME_WITH_TIMEZONE_FORMATTER);
    return timetz.format(TIMETZ_FORMATTER);
  }

  /**
   * Interprets the instant of {@code timestamp} in UTC, formats it with
   * {@code TIMESTAMPTZ_FORMATTER} and applies {@link #resolveEra(LocalDate, String)}.
   *
   * @param timestamp value to convert
   * @return the formatted timestamp, suffixed with {@code " BC"} for BCE dates
   */
  public static String convertToTimestampWithTimezone(Timestamp timestamp) {
    final OffsetDateTime timestamptz = OffsetDateTime.ofInstant(timestamp.toInstant(), ZoneId.of("UTC"));
    final LocalDate localDate = timestamptz.toLocalDate();
    return resolveEra(localDate, timestamptz.format(TIMESTAMPTZ_FORMATTER));
  }

  /**
   * Interprets the instant of {@code timestamp} in UTC as a local date-time, formats it with
   * {@code TIMESTAMP_FORMATTER} and applies {@link #resolveEra(LocalDate, String)}.
   *
   * @param timestamp value to convert
   * @return the formatted timestamp, suffixed with {@code " BC"} for BCE dates
   */
  public static String convertToTimestamp(Timestamp timestamp) {
    final LocalDateTime localDateTime = LocalDateTime.ofInstant(timestamp.toInstant(), ZoneId.of("UTC"));
    final LocalDate date = localDateTime.toLocalDate();
    return resolveEra(date, localDateTime.format(TIMESTAMP_FORMATTER));
  }

  /**
   * Appends {@code " BC"} to {@code value} when {@code date} lies in the BCE era, after removing a
   * leading minus sign if one is present; values for CE dates are returned unchanged.
   *
   * @param date date used only to determine the era
   * @param value already formatted text belonging to {@code date}
   * @return {@code value}, adjusted for the era
   */
  public static String resolveEra(LocalDate date, String value) {
    if (!isBCE(date)) {
      return value;
    }
    // Only a sign may be stripped. Dropping the first character unconditionally cut a digit off
    // unsigned values: ISO year 0 is BCE and LocalDate prints it as "0000-01-01".
    // NOTE(review): the year digits are passed through as formatted; whether they are proleptic
    // years or years-of-era depends on the formatter that produced `value` - confirm.
    final String unsigned = value.startsWith("-") ? value.substring(1) : value;
    return unsigned + " BC";
  }

  /**
   * @param date date to inspect
   * @return {@code true} when the era of {@code date} is {@link IsoEra#BCE}
   */
  public static boolean isBCE(LocalDate date) {
    return date.getEra().equals(IsoEra.BCE);
  }

  /**
   * Converts {@code date} to a {@link LocalDate}, renders it with {@link LocalDate#toString()} and
   * applies {@link #resolveEra(LocalDate, String)}.
   *
   * @param date SQL date to convert
   * @return the resulting text (a {@code String}, declared as {@code Object})
   */
  public static Object convertToDate(Date date) {
    final LocalDate localDate = date.toLocalDate();
    return resolveEra(localDate, localDate.toString());
  }

  /**
   * Parses {@code time} with {@link LocalTime#parse(CharSequence)} and re-formats it with
   * {@code TIME_FORMATTER}.
   *
   * @param time time text
   * @return the re-formatted time
   * @throws java.time.format.DateTimeParseException if {@code time} cannot be parsed
   */
  public static String convertToTime(String time) {
    final LocalTime localTime = LocalTime.parse(time);
    return localTime.format(TIME_FORMATTER);
  }

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,26 @@

package io.airbyte.integrations.debezium.internals;

import static io.airbyte.protocol.models.JsonSchemaType.AIRBYTE_TYPE;
import static io.airbyte.protocol.models.JsonSchemaType.DATE;
import static io.airbyte.protocol.models.JsonSchemaType.DATE_TIME;
import static io.airbyte.protocol.models.JsonSchemaType.FORMAT;
import static io.airbyte.protocol.models.JsonSchemaType.TIME;
import static io.airbyte.protocol.models.JsonSchemaType.TIMESTAMP_WITHOUT_TIMEZONE;
import static io.airbyte.protocol.models.JsonSchemaType.TIMESTAMP_WITH_TIMEZONE;
import static io.airbyte.protocol.models.JsonSchemaType.TIME_WITHOUT_TIMEZONE;
import static io.airbyte.protocol.models.JsonSchemaType.TIME_WITH_TIMEZONE;

import io.debezium.spi.converter.CustomConverter;
import io.debezium.spi.converter.RelationalColumn;
import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import java.sql.Date;
import java.sql.Timestamp;
import java.util.Arrays;
import java.util.Locale;
import java.util.Properties;
import org.apache.commons.codec.binary.Hex;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.postgresql.util.PGInterval;
import org.slf4j.Logger;
Expand All @@ -19,12 +33,13 @@ public class PostgresConverter implements CustomConverter<SchemaBuilder, Relatio

private static final Logger LOGGER = LoggerFactory.getLogger(PostgresConverter.class);

// Column type names, grouped by the kind of converter registered for them. The visible part of
// converterFor compares MONEY_ITEM_TYPE and BYTEA_TYPE against field.typeName() ignoring case;
// the other groups are presumably matched the same way (their use is outside this view).
// Diff view: DATE_TYPES and the TEXT_TYPES initializer each appear twice below - the first
// occurrence looks like the pre-change line, the second adds "TIMESTAMPTZ" / "PG_LSN".
private final String[] DATE_TYPES = {"DATE", "DATETIME", "TIME", "TIMETZ", "INTERVAL", "TIMESTAMP"};
private final String[] DATE_TYPES = {"DATE", "DATETIME", "TIME", "TIMETZ", "INTERVAL", "TIMESTAMP", "TIMESTAMPTZ"};
private final String[] BIT_TYPES = {"BIT", "VARBIT"};
private final String[] MONEY_ITEM_TYPE = {"MONEY"};
private final String[] GEOMETRICS_TYPES = {"BOX", "CIRCLE", "LINE", "LSEG", "POINT", "POLYGON", "PATH"};
private final String[] TEXT_TYPES =
{"VARCHAR", "VARBINARY", "BLOB", "TEXT", "LONGTEXT", "TINYTEXT", "MEDIUMTEXT", "INVENTORY_ITEM", "TSVECTOR", "TSQUERY"};
{"VARCHAR", "VARBINARY", "BLOB", "TEXT", "LONGTEXT", "TINYTEXT", "MEDIUMTEXT", "INVENTORY_ITEM", "TSVECTOR", "TSQUERY", "PG_LSN"};
// Single type name routed to registerBytea by converterFor.
private final String BYTEA_TYPE = "BYTEA";

/**
 * No-op: this converter does not read anything from {@code props}.
 */
@Override
public void configure(final Properties props) {
  // Intentionally empty.
}
Expand All @@ -39,9 +54,20 @@ public void converterFor(final RelationalColumn field, final ConverterRegistrati
registerText(field, registration);
} else if (Arrays.stream(MONEY_ITEM_TYPE).anyMatch(s -> s.equalsIgnoreCase(field.typeName()))) {
registerMoney(field, registration);
} else if (BYTEA_TYPE.equalsIgnoreCase(field.typeName())) {
registerBytea(field, registration);
}
}

/**
 * Registers an optional-string converter for {@code field}: a null value is replaced by the result
 * of {@code DebeziumConverterUtils.convertDefaultValue(field)}; any other value is cast to
 * {@code byte[]} and rendered as {@code \x} followed by its hex encoding.
 */
private void registerBytea(final RelationalColumn field, final ConverterRegistration<SchemaBuilder> registration) {
  registration.register(SchemaBuilder.string().optional(), rawValue -> {
    if (rawValue != null) {
      final byte[] bytes = (byte[]) rawValue;
      return "\\x" + Hex.encodeHexString(bytes);
    }
    return DebeziumConverterUtils.convertDefaultValue(field);
  });
}

private void registerText(final RelationalColumn field, final ConverterRegistration<SchemaBuilder> registration) {
registration.register(SchemaBuilder.string().optional(), x -> {
if (x == null) {
Expand All @@ -57,14 +83,21 @@ private void registerText(final RelationalColumn field, final ConverterRegistrat
}

// Registers the converter used for date/time-like columns.
// Diff view: this hunk interleaves removed and added lines. The first `registration.register(`
// line and the `else if (x instanceof PGInterval) ... else ...` chain appear to be the pre-change
// code; the `fieldType` local, the second `registration.register(` line and the `switch` appear
// to be the replacement.
private void registerDate(final RelationalColumn field, final ConverterRegistration<SchemaBuilder> registration) {
registration.register(SchemaBuilder.string().optional(), x -> {
final var fieldType = field.typeName();

// Schema is chosen per type name by getJsonSchema and marked optional.
registration.register(getJsonSchema(fieldType).optional(), x -> {
if (x == null) {
return DebeziumConverterUtils.convertDefaultValue(field);
} else if (x instanceof PGInterval) {
return convertInterval((PGInterval) x);
} else {
return DebeziumConverterUtils.convertDate(x);
}
// Dispatch on the upper-cased type name; unlisted names fall back to convertDate.
// NOTE(review): the Timestamp/Date/PGInterval casts assume the incoming value has exactly that
// runtime type for the given column type - confirm, otherwise they throw ClassCastException.
return switch (fieldType.toUpperCase(Locale.ROOT)) {
case "TIMETZ" -> DateTimeConverter.convertToTimeWithTimezone(x.toString());
case "TIMESTAMPTZ" -> DateTimeConverter.convertToTimestampWithTimezone((Timestamp) x);
case "TIMESTAMP" -> DateTimeConverter.convertToTimestamp((Timestamp) x);
case "DATE" -> DateTimeConverter.convertToDate((Date) x);
case "TIME" -> DateTimeConverter.convertToTime(x.toString());
case "INTERVAL" -> convertInterval((PGInterval) x);
default -> DebeziumConverterUtils.convertDate(x);
};
});
}

Expand All @@ -84,11 +117,7 @@ private void registerMoney(final RelationalColumn field, final ConverterRegistra
return DebeziumConverterUtils.convertDefaultValue(field);
} else if (x instanceof Double) {
final BigDecimal result = BigDecimal.valueOf((Double) x);
if (result.compareTo(new BigDecimal("999999999999999")) == 1
|| result.compareTo(new BigDecimal("-999999999999999")) == -1) {
return null;
}
return result.toString();
return Double.toString(result.doubleValue());
} else {
return x.toString();
}
Expand Down Expand Up @@ -131,4 +160,15 @@ private boolean isNegativeTime(final PGInterval pgInterval) {
|| pgInterval.getWholeSeconds() < 0;
}

/**
 * Builds the string schema for a date/time column type. The type name is upper-cased with
 * {@link Locale#ROOT} before matching; TIMETZ, TIMESTAMPTZ, TIMESTAMP, DATE and TIME get a
 * {@code FORMAT} parameter (and, except DATE, an {@code AIRBYTE_TYPE} parameter), every other
 * name gets a plain string schema.
 *
 * @param fieldType column type name, matched case-insensitively
 * @return the schema builder for that type
 */
private SchemaBuilder getJsonSchema(final String fieldType) {
  final String normalizedType = fieldType.toUpperCase(Locale.ROOT);
  final SchemaBuilder stringSchema = SchemaBuilder.string();

  if ("TIMETZ".equals(normalizedType)) {
    return stringSchema.parameter(FORMAT, TIME).parameter(AIRBYTE_TYPE, TIME_WITH_TIMEZONE);
  }
  if ("TIMESTAMPTZ".equals(normalizedType)) {
    return stringSchema.parameter(FORMAT, DATE_TIME).parameter(AIRBYTE_TYPE, TIMESTAMP_WITH_TIMEZONE);
  }
  if ("TIMESTAMP".equals(normalizedType)) {
    return stringSchema.parameter(FORMAT, DATE_TIME).parameter(AIRBYTE_TYPE, TIMESTAMP_WITHOUT_TIMEZONE);
  }
  if ("DATE".equals(normalizedType)) {
    return stringSchema.parameter(FORMAT, DATE);
  }
  if ("TIME".equals(normalizedType)) {
    return stringSchema.parameter(FORMAT, TIME).parameter(AIRBYTE_TYPE, TIME_WITHOUT_TIMEZONE);
  }
  return stringSchema;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.airbyte.db.jdbc.JdbcSourceOperations;
import io.airbyte.protocol.models.JsonSchemaType;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.sql.JDBCType;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
Expand All @@ -35,7 +36,15 @@
import java.time.OffsetTime;
import java.time.format.DateTimeParseException;
import java.util.Collections;
import org.postgresql.geometric.PGbox;
import org.postgresql.geometric.PGcircle;
import org.postgresql.geometric.PGline;
import org.postgresql.geometric.PGlseg;
import org.postgresql.geometric.PGpath;
import org.postgresql.geometric.PGpoint;
import org.postgresql.geometric.PGpolygon;
import org.postgresql.jdbc.PgResultSetMetaData;
import org.postgresql.util.PGobject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -173,6 +182,15 @@ public void setJsonField(final ResultSet resultSet, final int colIndex, final Ob
case TIMETZ -> putTimeWithTimezone(json, columnName, resultSet, colIndex);
case TIMESTAMPTZ -> putTimestampWithTimezone(json, columnName, resultSet, colIndex);
case "hstore" -> putHstoreAsJson(json, columnName, resultSet, colIndex);
case "circle" -> putObject(json, columnName, resultSet, colIndex, PGcircle.class);
case "box" -> putObject(json, columnName, resultSet, colIndex, PGbox.class);
case "double precision", "float", "float8" -> putFloat8(json, columnName, resultSet, colIndex);
case "line" -> putObject(json, columnName, resultSet, colIndex, PGline.class);
case "lseg" -> putObject(json, columnName, resultSet, colIndex, PGlseg.class);
case "path" -> putObject(json, columnName, resultSet, colIndex, PGpath.class);
case "point" -> putObject(json, columnName, resultSet, colIndex, PGpoint.class);
case "polygon" -> putObject(json, columnName, resultSet, colIndex, PGpolygon.class);
// case "tsvector" -> putString(json, columnName, resultSet, colIndex);
default -> {
switch (columnType) {
case BOOLEAN -> putBoolean(json, columnName, resultSet, colIndex);
Expand All @@ -198,19 +216,19 @@ public void setJsonField(final ResultSet resultSet, final int colIndex, final Ob

// Reads the column at `index` as a LocalDate and stores date.toString(), passed through
// resolveEra, in `node` under `columnName`.
// Diff view: the two reads below appear to be the old (getDateTimeObject) and the new (getObject)
// version of the same statement.
@Override
protected void putDate(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index) throws SQLException {
final LocalDate date = getDateTimeObject(resultSet, index, LocalDate.class);
final LocalDate date = getObject(resultSet, index, LocalDate.class);
node.put(columnName, resolveEra(date, date.toString()));
}

// Reads the column at `index` as a LocalTime and stores it in `node` under `columnName`,
// formatted with TIME_FORMATTER.
// Diff view: the two reads below appear to be the old (getDateTimeObject) and the new (getObject)
// version of the same statement.
@Override
protected void putTime(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index) throws SQLException {
final LocalTime time = getDateTimeObject(resultSet, index, LocalTime.class);
final LocalTime time = getObject(resultSet, index, LocalTime.class);
node.put(columnName, time.format(TIME_FORMATTER));
}

// Reads the column at `index` as a LocalDateTime, formats it with TIMESTAMP_FORMATTER, passes the
// text through resolveEra together with the value's date part, and stores the result in `node`
// under `columnName`.
// Diff view: the two reads below appear to be the old (getDateTimeObject) and the new (getObject)
// version of the same statement.
@Override
protected void putTimestamp(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index) throws SQLException {
final LocalDateTime timestamp = getDateTimeObject(resultSet, index, LocalDateTime.class);
final LocalDateTime timestamp = getObject(resultSet, index, LocalDateTime.class);
final LocalDate date = timestamp.toLocalDate();
node.put(columnName, resolveEra(date, timestamp.format(TIMESTAMP_FORMATTER)));
}
Expand Down Expand Up @@ -261,6 +279,34 @@ protected void putBoolean(final ObjectNode node, final String columnName, final
node.put(columnName, resultSet.getString(index).equalsIgnoreCase("t"));
}

/**
 * Writes a float8 column into {@code node} under {@code columnName}.
 *
 * <p>A value readable as {@link BigDecimal} is rescaled to the column's metadata scale
 * (half-even rounding) and stored as a double. Otherwise the column is re-read as text:
 * {@code infinity}, {@code -infinity} and {@code nan} (any case) map to the corresponding
 * {@code double} constants, and anything else - including a null string - is stored as JSON null.
 *
 * @param node target JSON object
 * @param columnName key to write
 * @param resultSet result set positioned on the current row
 * @param index column index passed to the {@code ResultSet} getters
 * @throws SQLException if reading the column or its metadata fails
 */
protected void putFloat8(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index)
    throws SQLException {
  final BigDecimal bigDecimal = DataTypeUtils.returnNullIfInvalid(() -> resultSet.getBigDecimal(index));
  if (bigDecimal != null) {
    node.put(columnName, bigDecimal.setScale(resultSet.getMetaData().getScale(index), RoundingMode.HALF_EVEN).doubleValue());
    return;
  }

  // ResultSet#getBigDecimal cannot handle Infinity, -Infinity, or NaN, and will throw exception,
  // which becomes null. So we need to check these special values as string.
  // ResultSet#getString returns null for SQL NULL, so the literal is the receiver of each
  // comparison: a null value then reaches the final branch instead of throwing.
  final String value = resultSet.getString(index);
  if ("infinity".equalsIgnoreCase(value)) {
    node.put(columnName, Double.POSITIVE_INFINITY);
  } else if ("-infinity".equalsIgnoreCase(value)) {
    node.put(columnName, Double.NEGATIVE_INFINITY);
  } else if ("nan".equalsIgnoreCase(value)) {
    node.put(columnName, Double.NaN);
  } else {
    node.put(columnName, (BigDecimal) null);
  }
}

/**
 * Reads the column at {@code index} as the given {@link PGobject} subtype and stores the text
 * returned by its {@code getValue()} in {@code node} under {@code columnName}.
 *
 * @throws SQLException if the column cannot be read as {@code clazz}
 */
protected <T extends PGobject> void putObject(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index, Class<T> clazz)
    throws SQLException {
  final String textValue = getObject(resultSet, index, clazz).getValue();
  node.put(columnName, textValue);
}

protected void putBigDecimal(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index) {
final BigDecimal bigDecimal = DataTypeUtils.returnNullIfInvalid(() -> resultSet.getBigDecimal(index));
if (bigDecimal != null) {
Expand Down
Loading

0 comments on commit 52cd91b

Please sign in to comment.