Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🎉Source Postgres: 13608, 12026, 14590 - Align regular and CDC integration tests and data mappers; improve BCE date handling #14534

Merged
merged 11 commits into from
Aug 3, 2022
Original file line number Diff line number Diff line change
Expand Up @@ -762,7 +762,7 @@
- name: Postgres
sourceDefinitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
dockerRepository: airbyte/source-postgres
dockerImageTag: 0.4.39
dockerImageTag: 0.4.40
documentationUrl: https://docs.airbyte.io/integrations/sources/postgres
icon: postgresql.svg
sourceType: database
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7140,7 +7140,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-postgres:0.4.39"
- dockerImage: "airbyte/source-postgres:0.4.40"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/sources/postgres"
connectionSpecification:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@
*/
public abstract class AbstractJdbcCompatibleSourceOperations<Datatype> implements JdbcCompatibleSourceOperations<Datatype> {

/**
* A Date representing the earliest date in CE. Any date before this is in BCE.
*/
private static final Date ONE_CE = Date.valueOf("0001-01-01");

@Override
public JsonNode rowToJson(final ResultSet queryContext) throws SQLException {
// the first call communicates with the database. after that the result is cached.
Expand Down Expand Up @@ -253,27 +258,68 @@ public String getFullyQualifiedTableNameWithQuoting(final Connection connection,
return schemaName != null ? enquoteIdentifier(connection, schemaName) + "." + quotedTableName : quotedTableName;
}

protected <DateTime> DateTime getDateTimeObject(ResultSet resultSet, int index, Class<DateTime> clazz) throws SQLException {
protected <ObjectType> ObjectType getObject(ResultSet resultSet, int index, Class<ObjectType> clazz) throws SQLException {
return resultSet.getObject(index, clazz);
}

protected void putTimeWithTimezone(ObjectNode node, String columnName, ResultSet resultSet, int index) throws SQLException {
OffsetTime timetz = getDateTimeObject(resultSet, index, OffsetTime.class);
OffsetTime timetz = getObject(resultSet, index, OffsetTime.class);
node.put(columnName, timetz.format(TIMETZ_FORMATTER));
}

protected void putTimestampWithTimezone(ObjectNode node, String columnName, ResultSet resultSet, int index) throws SQLException {
OffsetDateTime timestamptz = getDateTimeObject(resultSet, index, OffsetDateTime.class);
OffsetDateTime timestamptz = getObject(resultSet, index, OffsetDateTime.class);
LocalDate localDate = timestamptz.toLocalDate();
node.put(columnName, resolveEra(localDate, timestamptz.format(TIMESTAMPTZ_FORMATTER)));
}

protected String resolveEra(LocalDate date, String value) {
return isBCE(date) ? value.substring(1) + " BC" : value;
/**
* Modifies a string representation of a date/timestamp and normalizes its era indicator.
* Specifically, if this is a BCE value:
* <ul>
* <li>The leading negative sign will be removed if present</li>
* <li>The "BC" suffix will be appended, if not already present</li>
* </ul>
*
* You most likely would prefer to call one of the overloaded methods, which accept temporal types.
*/
public static String resolveEra(boolean isBce, String value) {
String mangledValue = value;
if (isBce) {
if (mangledValue.startsWith("-")) {
mangledValue = mangledValue.substring(1);
}
if (!mangledValue.endsWith(" BC")) {
mangledValue += " BC";
}
}
return mangledValue;
}

public static boolean isBCE(LocalDate date) {
public static boolean isBce(LocalDate date) {
return date.getEra().equals(IsoEra.BCE);
}

public static String resolveEra(LocalDate date, String value) {
return resolveEra(isBce(date), value);
}

/**
* java.sql.Date objects don't properly represent their era (for example, using toLocalDate() always
* returns an object in CE). So to determine the era, we just check whether the date is before 1 AD.
*
* This is technically kind of sketchy due to ancient timestamps being weird (leap years, etc.), but
* my understanding is that {@link #ONE_CE} has the same weirdness, so it cancels out.
*/
public static String resolveEra(Date date, String value) {
return resolveEra(date.before(ONE_CE), value);
}

/**
* See {@link #resolveEra(Date, String)} for explanation.
*/
public static String resolveEra(Timestamp timestamp, String value) {
return resolveEra(timestamp.before(ONE_CE), value);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.debezium.internals;

import static io.airbyte.db.DataTypeUtils.TIMESTAMPTZ_FORMATTER;
import static io.airbyte.db.DataTypeUtils.TIMESTAMP_FORMATTER;
import static io.airbyte.db.DataTypeUtils.TIMETZ_FORMATTER;
import static io.airbyte.db.DataTypeUtils.TIME_FORMATTER;
import static io.airbyte.db.jdbc.AbstractJdbcCompatibleSourceOperations.isBce;
import static io.airbyte.db.jdbc.AbstractJdbcCompatibleSourceOperations.resolveEra;

import java.sql.Date;
import java.sql.Timestamp;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.OffsetDateTime;
import java.time.OffsetTime;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;

public class DateTimeConverter {

public static final DateTimeFormatter TIME_WITH_TIMEZONE_FORMATTER = DateTimeFormatter.ofPattern(
"HH:mm:ss[.][SSSSSSSSS][SSSSSSS][SSSSSS][SSSSS][SSSS][SSS][SS][S][''][XXX][XX][X]");

public static String convertToTimeWithTimezone(Object time) {
OffsetTime timetz = OffsetTime.parse(time.toString(), TIME_WITH_TIMEZONE_FORMATTER);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nonblocking, somewhat nitpicky; @subodh1810 I'd be interested in your opinion here: this toString -> parse -> format makes me a little nervous - it seems weird to take an object that might already be a Timestamp (or Date/Datetime/whatever) and convert it back-and-forth.

But also, checking a bunch of instanceof would be really gross :/ so I'm pretty conflicted about which option is less bad

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hello @subodh1810. Could you please share your opinion about that?

edgao marked this conversation as resolved.
Show resolved Hide resolved
return timetz.format(TIMETZ_FORMATTER);
}

public static String convertToTimestampWithTimezone(Object timestamp) {
if (timestamp instanceof Timestamp t) {
// In snapshot mode, debezium produces a java.sql.Timestamp object for the TIMESTAMPTZ type.
// Conceptually, a timestamp with timezone is an Instant. But t.toInstant() actually mangles the
// value for ancient dates, because leap years weren't applied consistently in ye olden days.
// Additionally, toInstant() (and toLocalDateTime()) actually lose the era indicator, so we can't
// rely on their getEra() methods.
// So we have special handling for this case, which sidesteps the toInstant conversion.
ZonedDateTime timestamptz = t.toLocalDateTime().atZone(ZoneOffset.UTC);
String value = timestamptz.format(TIMESTAMPTZ_FORMATTER);
return resolveEra(t, value);
} else if (timestamp instanceof OffsetDateTime t) {
// In incremental mode, debezium emits java.time.OffsetDateTime objects.
// java.time classes have a year 0, but the standard AD/BC system does not. For example,
// "0001-01-01 BC" is represented as LocalDate("0000-01-01").
// We just subtract one year to hack around this difference.
LocalDate localDate = t.toLocalDate();
if (isBce(localDate)) {
t = t.minusYears(1);
}
return resolveEra(localDate, t.toString());
} else {
// This case probably isn't strictly necessary, but I'm leaving it just in case there's some weird
// situation that I'm not aware of.
Instant instant = Instant.parse(timestamp.toString());
OffsetDateTime offsetDateTime = OffsetDateTime.ofInstant(instant, ZoneOffset.UTC);
ZonedDateTime timestamptz = ZonedDateTime.from(offsetDateTime);
LocalDate localDate = timestamptz.toLocalDate();
String value = timestamptz.format(TIMESTAMPTZ_FORMATTER);
return resolveEra(localDate, value);
}
}

/**
* See {@link #convertToTimestampWithTimezone(Object)} for explanation of the weird things happening
* here.
*/
public static String convertToTimestamp(Object timestamp) {
if (timestamp instanceof Timestamp t) {
// Snapshot mode
LocalDateTime localDateTime = t.toLocalDateTime();
String value = localDateTime.format(TIMESTAMP_FORMATTER);
return resolveEra(t, value);
} else if (timestamp instanceof Instant i) {
// Incremental mode
LocalDate localDate = i.atZone(ZoneOffset.UTC).toLocalDate();
if (isBce(localDate)) {
// i.minus(1, ChronoUnit.YEARS) would be nice, but it throws an exception because you can't subtract
// YEARS from an Instant
i = i.atZone(ZoneOffset.UTC).minusYears(1).toInstant();
}
return resolveEra(localDate, i.toString());
} else {
LocalDateTime localDateTime = LocalDateTime.parse(timestamp.toString());
final LocalDate date = localDateTime.toLocalDate();
String value = localDateTime.format(TIMESTAMP_FORMATTER);
return resolveEra(date, value);
}
}

/**
* See {@link #convertToTimestampWithTimezone(Object)} for explanation of the weird things happening
* here.
*/
public static Object convertToDate(Object date) {
if (date instanceof Date d) {
// Snapshot mode
LocalDate localDate = ((Date) date).toLocalDate();
return resolveEra(d, localDate.toString());
} else if (date instanceof LocalDate d) {
// Incremental mode
if (isBce(d)) {
d = d.minusYears(1);
}
return resolveEra(d, d.toString());
} else {
LocalDate localDate = LocalDate.parse(date.toString());
return resolveEra(localDate, localDate.toString());
}
}

public static String convertToTime(Object time) {
LocalTime localTime = LocalTime.parse(time.toString());
return localTime.format(TIME_FORMATTER);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@
import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Locale;
import java.util.Properties;
import org.apache.commons.codec.binary.Hex;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.postgresql.util.PGInterval;
import org.slf4j.Logger;
Expand All @@ -19,12 +21,14 @@ public class PostgresConverter implements CustomConverter<SchemaBuilder, Relatio

private static final Logger LOGGER = LoggerFactory.getLogger(PostgresConverter.class);

private final String[] DATE_TYPES = {"DATE", "DATETIME", "TIME", "TIMETZ", "INTERVAL", "TIMESTAMP"};
private final String[] DATE_TYPES = {"DATE", "DATETIME", "TIME", "TIMETZ", "INTERVAL", "TIMESTAMP", "TIMESTAMPTZ"};
private final String[] BIT_TYPES = {"BIT", "VARBIT"};
private final String[] MONEY_ITEM_TYPE = {"MONEY"};
private final String[] GEOMETRICS_TYPES = {"BOX", "CIRCLE", "LINE", "LSEG", "POINT", "POLYGON", "PATH"};
private final String[] TEXT_TYPES =
{"VARCHAR", "VARBINARY", "BLOB", "TEXT", "LONGTEXT", "TINYTEXT", "MEDIUMTEXT", "INVENTORY_ITEM", "TSVECTOR", "TSQUERY"};
{"VARCHAR", "VARBINARY", "BLOB", "TEXT", "LONGTEXT", "TINYTEXT", "MEDIUMTEXT", "INVENTORY_ITEM", "TSVECTOR", "TSQUERY", "PG_LSN"};
private final String[] NUMERIC_TYPES = {"NUMERIC", "DECIMAL"};
private final String BYTEA_TYPE = "BYTEA";

@Override
public void configure(final Properties props) {}
Expand All @@ -39,9 +43,50 @@ public void converterFor(final RelationalColumn field, final ConverterRegistrati
registerText(field, registration);
} else if (Arrays.stream(MONEY_ITEM_TYPE).anyMatch(s -> s.equalsIgnoreCase(field.typeName()))) {
registerMoney(field, registration);
} else if (BYTEA_TYPE.equalsIgnoreCase(field.typeName())) {
registerBytea(field, registration);
} else if (Arrays.stream(NUMERIC_TYPES).anyMatch(s -> s.equalsIgnoreCase(field.typeName()))) {
registerNumber(field, registration);
}
}

private void registerNumber(final RelationalColumn field, final ConverterRegistration<SchemaBuilder> registration) {
registration.register(SchemaBuilder.string().optional(), x -> {
if (x == null) {
return DebeziumConverterUtils.convertDefaultValue(field);
}
// Bad solution
// We applied a solution like this for several reasons:
// 1. Regarding #13608, CDC and nor-CDC data output format should be the same.
// 2. In the non-CDC mode 'decimal' and 'numeric' values are put to JSON node as BigDecimal value.
// According to Jackson Object mapper configuration, all trailing zeros are omitted and
// numbers with decimal places are deserialized with exponent. (e.g. 1234567890.1234567 would
// be deserialized as 1.2345678901234567E9).
// 3. In the CDC mode 'decimal' and 'numeric' values are deserialized as a regular number (e.g.
// 1234567890.1234567 would be deserialized as 1234567890.1234567). Numbers without
// decimal places (e.g 1, 24, 354) are represented with trailing zero (e.g 1.0, 24.0, 354.0).
// One of solution to align deserialization for these 2 modes is setting
// DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS as true for ObjectMapper. But this breaks
// deserialization for other data-types.
// A worked solution was to keep deserialization for non-CDC mode as it is and change it for CDC
// one.
// The code below strips trailing zeros for integer numbers and represents number with exponent
// if this number has decimals point.
final double doubleValue = Double.parseDouble(x.toString());
var valueWithTruncatedZero = BigDecimal.valueOf(doubleValue).stripTrailingZeros().toString();
return valueWithTruncatedZero.contains(".") ? String.valueOf(doubleValue) : valueWithTruncatedZero;
});
}

private void registerBytea(final RelationalColumn field, final ConverterRegistration<SchemaBuilder> registration) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a property binary.handling.mode that we can use

Specifies how binary (bytea) columns should be represented in change events:

bytes represents binary data as byte array.

base64 represents binary data as base64-encoded strings.

hex represents binary data as hex-encoded (base16) strings.

registration.register(SchemaBuilder.string().optional(), x -> {
if (x == null) {
return DebeziumConverterUtils.convertDefaultValue(field);
}
return "\\x" + Hex.encodeHexString((byte[]) x);
});
}

private void registerText(final RelationalColumn field, final ConverterRegistration<SchemaBuilder> registration) {
registration.register(SchemaBuilder.string().optional(), x -> {
if (x == null) {
Expand All @@ -57,14 +102,21 @@ private void registerText(final RelationalColumn field, final ConverterRegistrat
}

private void registerDate(final RelationalColumn field, final ConverterRegistration<SchemaBuilder> registration) {
final var fieldType = field.typeName();

registration.register(SchemaBuilder.string().optional(), x -> {
if (x == null) {
return DebeziumConverterUtils.convertDefaultValue(field);
} else if (x instanceof PGInterval) {
return convertInterval((PGInterval) x);
} else {
return DebeziumConverterUtils.convertDate(x);
}
return switch (fieldType.toUpperCase(Locale.ROOT)) {
case "TIMETZ" -> DateTimeConverter.convertToTimeWithTimezone(x);
edgao marked this conversation as resolved.
Show resolved Hide resolved
case "TIMESTAMPTZ" -> DateTimeConverter.convertToTimestampWithTimezone(x);
case "TIMESTAMP" -> DateTimeConverter.convertToTimestamp(x);
case "DATE" -> DateTimeConverter.convertToDate(x);
case "TIME" -> DateTimeConverter.convertToTime(x);
case "INTERVAL" -> convertInterval((PGInterval) x);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have a debezium property which we can specify so that debezium can handle interval data type differently Please take a look at this and let me know what you think of it. https://debezium.io/documentation/reference/stable/connectors/postgresql.html#postgresql-property-interval-handling-mode

default -> DebeziumConverterUtils.convertDate(x);
};
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ protected void putDate(final ObjectNode node, final String columnName, final Res

@Override
protected void putTimestamp(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index) throws SQLException {
final LocalDateTime timestamp = getDateTimeObject(resultSet, index, LocalDateTime.class);
final LocalDateTime timestamp = getObject(resultSet, index, LocalDateTime.class);
final LocalDate date = timestamp.toLocalDate();

DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ protected void putDate(final ObjectNode node, final String columnName, final Res

@Override
protected void putTimestamp(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index) throws SQLException {
final LocalDateTime timestamp = getDateTimeObject(resultSet, index, LocalDateTime.class);
final LocalDateTime timestamp = getObject(resultSet, index, LocalDateTime.class);
final LocalDate date = timestamp.toLocalDate();

DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION source-postgres-strict-encrypt

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.4.39
LABEL io.airbyte.version=0.4.40
LABEL io.airbyte.name=airbyte/source-postgres-strict-encrypt
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION source-postgres

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.4.39
LABEL io.airbyte.version=0.4.40
LABEL io.airbyte.name=airbyte/source-postgres
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package io.airbyte.integrations.source.postgres;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.integrations.debezium.internals.PostgresConverter;
import java.util.Properties;

public class PostgresCdcProperties {
Expand All @@ -27,7 +28,7 @@ private static Properties commonProperties() {
props.setProperty("connector.class", "io.debezium.connector.postgresql.PostgresConnector");

props.setProperty("converters", "datetime");
props.setProperty("datetime.type", "io.airbyte.integrations.debezium.internals.PostgresConverter");
props.setProperty("datetime.type", PostgresConverter.class.getName());
return props;
}

Expand Down
Loading