Skip to content

Commit

Permalink
🎉Source Postgres: 13608, 12026, 14590 - Align regular and CDC integra…
Browse files Browse the repository at this point in the history
…tion tests and data mappers; improve BCE date handling (#14534)

* 13608 & 12026 - align regular and CDC integration tests and data mappers

* format code

* update int handling

* fix build

* fix PR remarks

* revert changes for money type that are broken by #7338

* bump version

* 🐛 Source Postgres: Improve BCE date handling (#15187)

* 13608 & 12026 - align regular and CDC integration tests and data mappers

* format code

* update int handling

* borked merge - re-delete deleted methods

* enable catalog tests for postgres

* fix build

* fix PR remarks

* revert changes for money type that are broken by #7338

* update BCE handling in JDBC

* reuse existing method

* handle bce dates

* inline methods

* fix JDBC BCE year inconsistency

* use correct data type in test

* format

* Update airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/AbstractPostgresSourceDatatypeTest.java

Co-authored-by: Edward Gao <[email protected]>

* pmd fix

* use class.getname()

* fix pmd

* format

* bump version

* handle incremental mode

* clean up diff

* more comments

* unused imports

* format

* versions+changelog

Co-authored-by: Yurii Bidiuk <[email protected]>
Co-authored-by: Yurii Bidiuk <[email protected]>

* auto-bump connector version [ci skip]

Co-authored-by: Edward Gao <[email protected]>
Co-authored-by: Octavia Squidington III <[email protected]>
  • Loading branch information
3 people authored Aug 3, 2022
1 parent 316502c commit 708802d
Show file tree
Hide file tree
Showing 15 changed files with 848 additions and 1,005 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -762,7 +762,7 @@
- name: Postgres
sourceDefinitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
dockerRepository: airbyte/source-postgres
dockerImageTag: 0.4.39
dockerImageTag: 0.4.40
documentationUrl: https://docs.airbyte.io/integrations/sources/postgres
icon: postgresql.svg
sourceType: database
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7140,7 +7140,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-postgres:0.4.39"
- dockerImage: "airbyte/source-postgres:0.4.40"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/sources/postgres"
connectionSpecification:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@
*/
public abstract class AbstractJdbcCompatibleSourceOperations<Datatype> implements JdbcCompatibleSourceOperations<Datatype> {

/**
 * The first day of year 1 CE, built with {@code Date.valueOf("0001-01-01")}. The {@code Date} and
 * {@code Timestamp} overloads of {@code resolveEra} treat any value that is before this date as
 * BCE.
 */
private static final Date ONE_CE = Date.valueOf("0001-01-01");

@Override
public JsonNode rowToJson(final ResultSet queryContext) throws SQLException {
// the first call communicates with the database. after that the result is cached.
Expand Down Expand Up @@ -253,27 +258,68 @@ public String getFullyQualifiedTableNameWithQuoting(final Connection connection,
return schemaName != null ? enquoteIdentifier(connection, schemaName) + "." + quotedTableName : quotedTableName;
}

protected <DateTime> DateTime getDateTimeObject(ResultSet resultSet, int index, Class<DateTime> clazz) throws SQLException {
protected <ObjectType> ObjectType getObject(ResultSet resultSet, int index, Class<ObjectType> clazz) throws SQLException {
return resultSet.getObject(index, clazz);
}

protected void putTimeWithTimezone(ObjectNode node, String columnName, ResultSet resultSet, int index) throws SQLException {
OffsetTime timetz = getDateTimeObject(resultSet, index, OffsetTime.class);
OffsetTime timetz = getObject(resultSet, index, OffsetTime.class);
node.put(columnName, timetz.format(TIMETZ_FORMATTER));
}

protected void putTimestampWithTimezone(ObjectNode node, String columnName, ResultSet resultSet, int index) throws SQLException {
OffsetDateTime timestamptz = getDateTimeObject(resultSet, index, OffsetDateTime.class);
OffsetDateTime timestamptz = getObject(resultSet, index, OffsetDateTime.class);
LocalDate localDate = timestamptz.toLocalDate();
node.put(columnName, resolveEra(localDate, timestamptz.format(TIMESTAMPTZ_FORMATTER)));
}

protected String resolveEra(LocalDate date, String value) {
return isBCE(date) ? value.substring(1) + " BC" : value;
/**
 * Normalizes the era indicator of a textual date/timestamp.
 *
 * <p>
 * When {@code isBce} is false the input is returned untouched. When it is true:
 * <ul>
 * <li>a leading {@code "-"} is dropped, if there is one</li>
 * <li>a trailing {@code " BC"} is added, unless the text already ends with it</li>
 * </ul>
 *
 * Callers holding a temporal object will usually want one of the overloads that derive the era
 * flag from that object instead of calling this method directly.
 *
 * @param isBce whether the value lies before the common era
 * @param value the formatted date/timestamp text
 * @return the text with a normalized era indicator
 */
public static String resolveEra(boolean isBce, String value) {
  if (!isBce) {
    return value;
  }
  final String unsigned = value.startsWith("-") ? value.substring(1) : value;
  return unsigned.endsWith(" BC") ? unsigned : unsigned + " BC";
}

public static boolean isBCE(LocalDate date) {
public static boolean isBce(LocalDate date) {
return date.getEra().equals(IsoEra.BCE);
}

/**
 * Normalizes the era indicator of {@code value}, taking the era from {@code date}.
 *
 * @param date the date whose era decides whether {@code value} is treated as BCE
 * @param value the formatted date/timestamp text
 * @return {@code value} with a normalized era indicator
 */
public static String resolveEra(LocalDate date, String value) {
  final boolean beforeCommonEra = isBce(date);
  return resolveEra(beforeCommonEra, value);
}

/**
 * Normalizes the era indicator of {@code value} for a java.sql.Date.
 *
 * <p>
 * java.sql.Date does not carry its era reliably (for instance, toLocalDate() always yields a CE
 * value), so the era is inferred by checking whether the date falls before {@link #ONE_CE}.
 *
 * <p>
 * Comparing ancient instants is somewhat shaky (leap-year irregularities and the like), but as far
 * as we understand {@link #ONE_CE} is subject to the same irregularities, so they cancel out.
 *
 * @param date the SQL date whose position relative to {@link #ONE_CE} decides the era
 * @param value the formatted date text
 * @return {@code value} with a normalized era indicator
 */
public static String resolveEra(Date date, String value) {
  final boolean beforeCommonEra = date.before(ONE_CE);
  return resolveEra(beforeCommonEra, value);
}

/**
 * Timestamp counterpart of {@link #resolveEra(Date, String)}; see that method for why the era is
 * inferred by comparing against {@link #ONE_CE}.
 *
 * @param timestamp the SQL timestamp whose position relative to {@link #ONE_CE} decides the era
 * @param value the formatted timestamp text
 * @return {@code value} with a normalized era indicator
 */
public static String resolveEra(Timestamp timestamp, String value) {
  final boolean beforeCommonEra = timestamp.before(ONE_CE);
  return resolveEra(beforeCommonEra, value);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.debezium.internals;

import static io.airbyte.db.DataTypeUtils.TIMESTAMPTZ_FORMATTER;
import static io.airbyte.db.DataTypeUtils.TIMESTAMP_FORMATTER;
import static io.airbyte.db.DataTypeUtils.TIMETZ_FORMATTER;
import static io.airbyte.db.DataTypeUtils.TIME_FORMATTER;
import static io.airbyte.db.jdbc.AbstractJdbcCompatibleSourceOperations.isBce;
import static io.airbyte.db.jdbc.AbstractJdbcCompatibleSourceOperations.resolveEra;

import java.sql.Date;
import java.sql.Timestamp;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.OffsetDateTime;
import java.time.OffsetTime;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;

/**
 * Static helpers that turn the temporal values handed over by Debezium into formatted strings,
 * normalizing the era indicator of BCE values through {@code resolveEra}.
 */
public class DateTimeConverter {

  public static final DateTimeFormatter TIME_WITH_TIMEZONE_FORMATTER = DateTimeFormatter.ofPattern(
      "HH:mm:ss[.][SSSSSSSSS][SSSSSSS][SSSSSS][SSSSS][SSSS][SSS][SS][S][''][XXX][XX][X]");

  /**
   * Parses the string form of {@code time} with {@link #TIME_WITH_TIMEZONE_FORMATTER} and re-formats
   * it with {@code TIMETZ_FORMATTER}.
   */
  public static String convertToTimeWithTimezone(Object time) {
    return OffsetTime.parse(time.toString(), TIME_WITH_TIMEZONE_FORMATTER).format(TIMETZ_FORMATTER);
  }

  /**
   * Formats a TIMESTAMPTZ value, dispatching on the runtime type of {@code timestamp}.
   */
  public static String convertToTimestampWithTimezone(Object timestamp) {
    if (timestamp instanceof Timestamp sqlTimestamp) {
      // Snapshot mode: debezium hands over a java.sql.Timestamp for TIMESTAMPTZ columns.
      // A timestamp with timezone is conceptually an Instant, but Timestamp#toInstant() distorts
      // ancient values because leap years were not applied consistently back then. On top of that,
      // toInstant() and toLocalDateTime() drop the era indicator, so getEra() cannot be trusted.
      // Hence this branch avoids toInstant() and lets resolveEra inspect the Timestamp itself.
      final String formatted = sqlTimestamp.toLocalDateTime().atZone(ZoneOffset.UTC).format(TIMESTAMPTZ_FORMATTER);
      return resolveEra(sqlTimestamp, formatted);
    }

    if (timestamp instanceof OffsetDateTime offsetDateTime) {
      // Incremental mode: debezium emits java.time.OffsetDateTime.
      // java.time has a year 0 while the AD/BC calendar does not; "0001-01-01 BC" is
      // LocalDate("0000-01-01"). Shifting BCE values back by one year papers over that gap.
      // The era is still decided from the date as it was before the shift.
      final LocalDate originalDate = offsetDateTime.toLocalDate();
      final OffsetDateTime adjusted = isBce(originalDate) ? offsetDateTime.minusYears(1) : offsetDateTime;
      return resolveEra(originalDate, adjusted.toString());
    }

    // Fallback that is probably never needed; kept for unknown producers whose toString() is an
    // ISO instant.
    final Instant parsed = Instant.parse(timestamp.toString());
    final ZonedDateTime utc = ZonedDateTime.from(OffsetDateTime.ofInstant(parsed, ZoneOffset.UTC));
    return resolveEra(utc.toLocalDate(), utc.format(TIMESTAMPTZ_FORMATTER));
  }

  /**
   * Formats a TIMESTAMP value. The per-type handling mirrors
   * {@link #convertToTimestampWithTimezone(Object)}; see the comments there for the rationale.
   */
  public static String convertToTimestamp(Object timestamp) {
    if (timestamp instanceof Timestamp sqlTimestamp) {
      // Snapshot mode
      return resolveEra(sqlTimestamp, sqlTimestamp.toLocalDateTime().format(TIMESTAMP_FORMATTER));
    }

    if (timestamp instanceof Instant instant) {
      // Incremental mode
      final ZonedDateTime utc = instant.atZone(ZoneOffset.UTC);
      final LocalDate originalDate = utc.toLocalDate();
      // Instant does not support subtracting YEARS directly (it throws), so the one-year shift
      // goes through the zoned representation.
      final Instant adjusted = isBce(originalDate) ? utc.minusYears(1).toInstant() : instant;
      return resolveEra(originalDate, adjusted.toString());
    }

    final LocalDateTime parsed = LocalDateTime.parse(timestamp.toString());
    return resolveEra(parsed.toLocalDate(), parsed.format(TIMESTAMP_FORMATTER));
  }

  /**
   * Formats a DATE value. The per-type handling mirrors
   * {@link #convertToTimestampWithTimezone(Object)}; see the comments there for the rationale.
   */
  public static Object convertToDate(Object date) {
    if (date instanceof Date sqlDate) {
      // Snapshot mode
      return resolveEra(sqlDate, sqlDate.toLocalDate().toString());
    }

    if (date instanceof LocalDate localDate) {
      // Incremental mode: BCE values are shifted back one year, and the shifted date is used both
      // for the era check and for the text.
      final LocalDate adjusted = isBce(localDate) ? localDate.minusYears(1) : localDate;
      return resolveEra(adjusted, adjusted.toString());
    }

    final LocalDate parsed = LocalDate.parse(date.toString());
    return resolveEra(parsed, parsed.toString());
  }

  /**
   * Parses the string form of {@code time} as a {@link LocalTime} and re-formats it with
   * {@code TIME_FORMATTER}.
   */
  public static String convertToTime(Object time) {
    return LocalTime.parse(time.toString()).format(TIME_FORMATTER);
  }

}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@
import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Locale;
import java.util.Properties;
import org.apache.commons.codec.binary.Hex;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.postgresql.util.PGInterval;
import org.slf4j.Logger;
Expand All @@ -19,12 +21,14 @@ public class PostgresConverter implements CustomConverter<SchemaBuilder, Relatio

private static final Logger LOGGER = LoggerFactory.getLogger(PostgresConverter.class);

private final String[] DATE_TYPES = {"DATE", "DATETIME", "TIME", "TIMETZ", "INTERVAL", "TIMESTAMP"};
private final String[] DATE_TYPES = {"DATE", "DATETIME", "TIME", "TIMETZ", "INTERVAL", "TIMESTAMP", "TIMESTAMPTZ"};
private final String[] BIT_TYPES = {"BIT", "VARBIT"};
private final String[] MONEY_ITEM_TYPE = {"MONEY"};
private final String[] GEOMETRICS_TYPES = {"BOX", "CIRCLE", "LINE", "LSEG", "POINT", "POLYGON", "PATH"};
private final String[] TEXT_TYPES =
{"VARCHAR", "VARBINARY", "BLOB", "TEXT", "LONGTEXT", "TINYTEXT", "MEDIUMTEXT", "INVENTORY_ITEM", "TSVECTOR", "TSQUERY"};
{"VARCHAR", "VARBINARY", "BLOB", "TEXT", "LONGTEXT", "TINYTEXT", "MEDIUMTEXT", "INVENTORY_ITEM", "TSVECTOR", "TSQUERY", "PG_LSN"};
private final String[] NUMERIC_TYPES = {"NUMERIC", "DECIMAL"};
private final String BYTEA_TYPE = "BYTEA";

@Override
public void configure(final Properties props) {}
Expand All @@ -39,9 +43,50 @@ public void converterFor(final RelationalColumn field, final ConverterRegistrati
registerText(field, registration);
} else if (Arrays.stream(MONEY_ITEM_TYPE).anyMatch(s -> s.equalsIgnoreCase(field.typeName()))) {
registerMoney(field, registration);
} else if (BYTEA_TYPE.equalsIgnoreCase(field.typeName())) {
registerBytea(field, registration);
} else if (Arrays.stream(NUMERIC_TYPES).anyMatch(s -> s.equalsIgnoreCase(field.typeName()))) {
registerNumber(field, registration);
}
}

/**
 * Registers a string-typed, optional converter for {@code field} that renders numeric values so
 * that CDC output lines up with the non-CDC output format. Null values fall back to the column's
 * default as resolved by {@code DebeziumConverterUtils.convertDefaultValue}.
 */
private void registerNumber(final RelationalColumn field, final ConverterRegistration<SchemaBuilder> registration) {
  registration.register(SchemaBuilder.string().optional(), rawValue -> {
    if (rawValue == null) {
      return DebeziumConverterUtils.convertDefaultValue(field);
    }
    // Known-imperfect workaround, kept for these reasons:
    // 1. Per #13608, CDC and non-CDC syncs should produce the same output format.
    // 2. In non-CDC mode, 'decimal' and 'numeric' values are put into the JSON node as BigDecimal.
    // With the Jackson ObjectMapper configuration in use, trailing zeros are dropped and numbers
    // with decimal places are deserialized with an exponent (e.g. 1234567890.1234567 becomes
    // 1.2345678901234567E9).
    // 3. In CDC mode, 'decimal' and 'numeric' values are deserialized as plain numbers (e.g.
    // 1234567890.1234567 stays 1234567890.1234567), and numbers without decimal places
    // (e.g. 1, 24, 354) get a trailing zero (e.g. 1.0, 24.0, 354.0).
    // Enabling DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS on the ObjectMapper would align
    // the two modes, but it breaks deserialization of other data types.
    // The approach that worked was to leave non-CDC deserialization alone and adapt the CDC side.
    // Below: if the zero-stripped BigDecimal text has no decimal point it is emitted as-is;
    // otherwise the double's own string form (which may use an exponent) is emitted.
    final double parsed = Double.parseDouble(rawValue.toString());
    final String stripped = BigDecimal.valueOf(parsed).stripTrailingZeros().toString();
    if (stripped.contains(".")) {
      return String.valueOf(parsed);
    }
    return stripped;
  });
}

/**
 * Registers a string-typed, optional converter for {@code field} that emits the value's bytes as
 * a {@code \x}-prefixed hex string. Null values fall back to the column's default as resolved by
 * {@code DebeziumConverterUtils.convertDefaultValue}.
 */
private void registerBytea(final RelationalColumn field, final ConverterRegistration<SchemaBuilder> registration) {
  registration.register(SchemaBuilder.string().optional(), rawValue -> {
    if (rawValue != null) {
      // The value is cast to byte[] unconditionally; any other runtime type fails with a
      // ClassCastException.
      final byte[] bytes = (byte[]) rawValue;
      return "\\x" + Hex.encodeHexString(bytes);
    }
    return DebeziumConverterUtils.convertDefaultValue(field);
  });
}

private void registerText(final RelationalColumn field, final ConverterRegistration<SchemaBuilder> registration) {
registration.register(SchemaBuilder.string().optional(), x -> {
if (x == null) {
Expand All @@ -57,14 +102,21 @@ private void registerText(final RelationalColumn field, final ConverterRegistrat
}

private void registerDate(final RelationalColumn field, final ConverterRegistration<SchemaBuilder> registration) {
final var fieldType = field.typeName();

registration.register(SchemaBuilder.string().optional(), x -> {
if (x == null) {
return DebeziumConverterUtils.convertDefaultValue(field);
} else if (x instanceof PGInterval) {
return convertInterval((PGInterval) x);
} else {
return DebeziumConverterUtils.convertDate(x);
}
return switch (fieldType.toUpperCase(Locale.ROOT)) {
case "TIMETZ" -> DateTimeConverter.convertToTimeWithTimezone(x);
case "TIMESTAMPTZ" -> DateTimeConverter.convertToTimestampWithTimezone(x);
case "TIMESTAMP" -> DateTimeConverter.convertToTimestamp(x);
case "DATE" -> DateTimeConverter.convertToDate(x);
case "TIME" -> DateTimeConverter.convertToTime(x);
case "INTERVAL" -> convertInterval((PGInterval) x);
default -> DebeziumConverterUtils.convertDate(x);
};
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ protected void putDate(final ObjectNode node, final String columnName, final Res

@Override
protected void putTimestamp(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index) throws SQLException {
final LocalDateTime timestamp = getDateTimeObject(resultSet, index, LocalDateTime.class);
final LocalDateTime timestamp = getObject(resultSet, index, LocalDateTime.class);
final LocalDate date = timestamp.toLocalDate();

DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ protected void putDate(final ObjectNode node, final String columnName, final Res

@Override
protected void putTimestamp(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index) throws SQLException {
final LocalDateTime timestamp = getDateTimeObject(resultSet, index, LocalDateTime.class);
final LocalDateTime timestamp = getObject(resultSet, index, LocalDateTime.class);
final LocalDate date = timestamp.toLocalDate();

DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION source-postgres-strict-encrypt

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.4.39
LABEL io.airbyte.version=0.4.40
LABEL io.airbyte.name=airbyte/source-postgres-strict-encrypt
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-postgres/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION source-postgres

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.4.39
LABEL io.airbyte.version=0.4.40
LABEL io.airbyte.name=airbyte/source-postgres
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package io.airbyte.integrations.source.postgres;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.integrations.debezium.internals.PostgresConverter;
import java.util.Properties;

public class PostgresCdcProperties {
Expand All @@ -27,7 +28,7 @@ private static Properties commonProperties() {
props.setProperty("connector.class", "io.debezium.connector.postgresql.PostgresConnector");

props.setProperty("converters", "datetime");
props.setProperty("datetime.type", "io.airbyte.integrations.debezium.internals.PostgresConverter");
props.setProperty("datetime.type", PostgresConverter.class.getName());
return props;
}

Expand Down
Loading

0 comments on commit 708802d

Please sign in to comment.