From 9eaf40bed0808c9e3560892d0741ecefb3912b29 Mon Sep 17 00:00:00 2001 From: Yurii Bidiuk Date: Fri, 8 Jul 2022 14:34:31 +0300 Subject: [PATCH 1/9] 13608 & 12026 - align regular and CDC integration tests and data mappers --- ...bstractJdbcCompatibleSourceOperations.java | 6 +- .../debezium/internals/DateTimeConverter.java | 74 +++ .../debezium/internals/PostgresConverter.java | 84 ++- .../postgres/PostgresSourceOperations.java | 43 +- .../AbstractPostgresSourceDatatypeTest.java | 526 ++++++++++++++++++ .../CdcPostgresSourceDatatypeTest.java | 477 +--------------- .../sources/PostgresSourceDatatypeTest.java | 513 +---------------- docs/integrations/sources/postgres.md | 6 +- 8 files changed, 729 insertions(+), 1000 deletions(-) create mode 100644 airbyte-integrations/bases/debezium-v1-9-2/src/main/java/io/airbyte/integrations/debezium/internals/DateTimeConverter.java create mode 100644 airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/AbstractPostgresSourceDatatypeTest.java diff --git a/airbyte-db/db-lib/src/main/java/io/airbyte/db/jdbc/AbstractJdbcCompatibleSourceOperations.java b/airbyte-db/db-lib/src/main/java/io/airbyte/db/jdbc/AbstractJdbcCompatibleSourceOperations.java index ea4910c16518..435f7281a93d 100644 --- a/airbyte-db/db-lib/src/main/java/io/airbyte/db/jdbc/AbstractJdbcCompatibleSourceOperations.java +++ b/airbyte-db/db-lib/src/main/java/io/airbyte/db/jdbc/AbstractJdbcCompatibleSourceOperations.java @@ -253,17 +253,17 @@ public String getFullyQualifiedTableNameWithQuoting(final Connection connection, return schemaName != null ? enquoteIdentifier(connection, schemaName) + "." + quotedTableName : quotedTableName; } - protected DateTime getDateTimeObject(ResultSet resultSet, int index, Class clazz) throws SQLException { + protected ObjectType getObject(ResultSet resultSet, int index, Class clazz) throws SQLException { return resultSet.getObject(index, clazz); } protected void putTimeWithTimezone(ObjectNode node, String columnName, ResultSet resultSet, int index) throws SQLException { - OffsetTime timetz = getDateTimeObject(resultSet, index, OffsetTime.class); + OffsetTime timetz = getObject(resultSet, index, OffsetTime.class); node.put(columnName, timetz.format(TIMETZ_FORMATTER)); } protected void putTimestampWithTimezone(ObjectNode node, String columnName, ResultSet resultSet, int index) throws SQLException { - OffsetDateTime timestamptz = getDateTimeObject(resultSet, index, OffsetDateTime.class); + OffsetDateTime timestamptz = getObject(resultSet, index, OffsetDateTime.class); LocalDate localDate = timestamptz.toLocalDate(); node.put(columnName, resolveEra(localDate, timestamptz.format(TIMESTAMPTZ_FORMATTER))); } diff --git a/airbyte-integrations/bases/debezium-v1-9-2/src/main/java/io/airbyte/integrations/debezium/internals/DateTimeConverter.java b/airbyte-integrations/bases/debezium-v1-9-2/src/main/java/io/airbyte/integrations/debezium/internals/DateTimeConverter.java new file mode 100644 index 000000000000..f6062b8d8ff1 --- /dev/null +++ b/airbyte-integrations/bases/debezium-v1-9-2/src/main/java/io/airbyte/integrations/debezium/internals/DateTimeConverter.java @@ -0,0 +1,74 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.debezium.internals; + +import static io.airbyte.db.DataTypeUtils.TIMESTAMPTZ_FORMATTER; +import static io.airbyte.db.DataTypeUtils.TIMESTAMP_FORMATTER; +import static io.airbyte.db.DataTypeUtils.TIMETZ_FORMATTER; +import static io.airbyte.db.DataTypeUtils.TIME_FORMATTER; +import static io.airbyte.db.jdbc.AbstractJdbcCompatibleSourceOperations.isBCE; + +import java.sql.Date; +import java.sql.Timestamp; +import java.time.Instant; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.time.OffsetDateTime; +import java.time.OffsetTime; +import java.time.ZoneOffset; +import java.time.format.DateTimeFormatter; + +public class DateTimeConverter { + + public static final DateTimeFormatter TIME_WITH_TIMEZONE_FORMATTER = DateTimeFormatter.ofPattern( + "HH:mm:ss[.][SSSSSSSSS][SSSSSSS][SSSSSS][SSSSS][SSSS][SSS][SS][S][''][XXX][XX][X]"); + + public static String convertToTimeWithTimezone(Object time) { + OffsetTime timetz = OffsetTime.parse(time.toString(), TIME_WITH_TIMEZONE_FORMATTER); + return timetz.format(TIMETZ_FORMATTER); + } + + public static String convertToTimestampWithTimezone(Object timestamp) { + OffsetDateTime timestamptz = OffsetDateTime.ofInstant(toInstant(timestamp), ZoneOffset.UTC); + LocalDate localDate = timestamptz.toLocalDate(); + return resolveEra(localDate, timestamptz.format(TIMESTAMPTZ_FORMATTER)); + } + + public static String convertToTimestamp(Object timestamp) { + final LocalDateTime localDateTime = LocalDateTime.ofInstant(toInstant(timestamp), + ZoneOffset.UTC); + final LocalDate date = localDateTime.toLocalDate(); + return resolveEra(date, localDateTime.format(TIMESTAMP_FORMATTER)); + } + + public static Object convertToDate(Object date) { + LocalDate localDate; + if (date instanceof Date) { + localDate = ((Date) date).toLocalDate(); + } else { + localDate = LocalDate.parse(date.toString()); + } + return resolveEra(localDate, localDate.toString()); + } + + public static String convertToTime(Object time) { + LocalTime localTime = LocalTime.parse(time.toString()); + return localTime.format(TIME_FORMATTER); + } + + public static String resolveEra(LocalDate date, String value) { + return isBCE(date) ? value.substring(1) + " BC" : value; + } + + private static Instant toInstant(Object timestamp) { + if (timestamp instanceof Timestamp) { + return ((Timestamp) timestamp).toInstant(); + } else { + return Instant.parse(timestamp.toString()); + } + } + +} diff --git a/airbyte-integrations/bases/debezium-v1-9-2/src/main/java/io/airbyte/integrations/debezium/internals/PostgresConverter.java b/airbyte-integrations/bases/debezium-v1-9-2/src/main/java/io/airbyte/integrations/debezium/internals/PostgresConverter.java index aee741b6aaca..a7f67855f9ae 100644 --- a/airbyte-integrations/bases/debezium-v1-9-2/src/main/java/io/airbyte/integrations/debezium/internals/PostgresConverter.java +++ b/airbyte-integrations/bases/debezium-v1-9-2/src/main/java/io/airbyte/integrations/debezium/internals/PostgresConverter.java @@ -4,12 +4,26 @@ package io.airbyte.integrations.debezium.internals; +import static io.airbyte.protocol.models.JsonSchemaType.AIRBYTE_TYPE; +import static io.airbyte.protocol.models.JsonSchemaType.DATE; +import static io.airbyte.protocol.models.JsonSchemaType.DATE_TIME; +import static io.airbyte.protocol.models.JsonSchemaType.FORMAT; +import static io.airbyte.protocol.models.JsonSchemaType.TIME; +import static io.airbyte.protocol.models.JsonSchemaType.TIMESTAMP_WITHOUT_TIMEZONE; +import static io.airbyte.protocol.models.JsonSchemaType.TIMESTAMP_WITH_TIMEZONE; +import static io.airbyte.protocol.models.JsonSchemaType.TIME_WITHOUT_TIMEZONE; +import static io.airbyte.protocol.models.JsonSchemaType.TIME_WITH_TIMEZONE; + import io.debezium.spi.converter.CustomConverter; import io.debezium.spi.converter.RelationalColumn; import java.math.BigDecimal; import java.nio.charset.StandardCharsets; +import java.sql.Date; +import java.sql.Timestamp; import java.util.Arrays; +import java.util.Locale; import java.util.Properties; +import org.apache.commons.codec.binary.Hex; import org.apache.kafka.connect.data.SchemaBuilder; import org.postgresql.util.PGInterval; import org.slf4j.Logger; @@ -19,12 +33,14 @@ public class PostgresConverter implements CustomConverter s.equalsIgnoreCase(field.typeName()))) { registerMoney(field, registration); + } else if (BYTEA_TYPE.equalsIgnoreCase(field.typeName())) { + registerBytea(field, registration); + } + else if (Arrays.stream(NUMERIC_TYPES).anyMatch(s -> s.equalsIgnoreCase(field.typeName()))) { + registerNumber(field, registration); } } + private void registerNumber(final RelationalColumn field, final ConverterRegistration registration) { + registration.register(SchemaBuilder.string().optional(), x -> { + if (x == null) { + return DebeziumConverterUtils.convertDefaultValue(field); + } +// Bad solution +// We applied a solution like this for several reasons: +// 1. Regarding #13608, CDC and nor-CDC data output format should be the same. +// 2. In the non-CDC mode 'decimal' and 'numeric' values are put to JSON node as BigDecimal value. +// According to Jackson Object mapper configuration, all trailing zeros are omitted and +// numbers with decimal places are deserialized with exponent. (e.g. 1234567890.1234567 would +// be deserialized as 1.2345678901234567E9). +// 3. In the CDC mode 'decimal' and 'numeric' values are deserialized as a regular number (e.g. +// 1234567890.1234567 would be deserialized as 1234567890.1234567). Numbers without +// decimal places (e.g 1, 24, 354) are represented with trailing zero (e.g 1.0, 24.0, 354.0). +// One of solution to align deserialization for these 2 modes is setting +// DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS as true for ObjectMapper. But this breaks +// deserialization for other data-types. +// A worked solution was to keep deserialization for non-CDC mode as it is and change it for CDC one. +// The code below strips trailing zeros for integer numbers and represents number with exponent +// if this number has decimals point. + final double doubleValue = Double.parseDouble(x.toString()); + var valueWithTruncatedZero = BigDecimal.valueOf(doubleValue).stripTrailingZeros().toString(); + return valueWithTruncatedZero.contains(".") ? String.valueOf(doubleValue) : valueWithTruncatedZero; + }); + } + + private void registerBytea(final RelationalColumn field, final ConverterRegistration registration) { + registration.register(SchemaBuilder.string().optional(), x -> { + if (x == null) { + return DebeziumConverterUtils.convertDefaultValue(field); + } + return "\\x" + Hex.encodeHexString((byte[]) x); + }); + } + private void registerText(final RelationalColumn field, final ConverterRegistration registration) { registration.register(SchemaBuilder.string().optional(), x -> { if (x == null) { @@ -57,14 +114,23 @@ private void registerText(final RelationalColumn field, final ConverterRegistrat } private void registerDate(final RelationalColumn field, final ConverterRegistration registration) { + final var fieldType = field.typeName(); + registration.register(SchemaBuilder.string().optional(), x -> { if (x == null) { return DebeziumConverterUtils.convertDefaultValue(field); - } else if (x instanceof PGInterval) { - return convertInterval((PGInterval) x); - } else { - return DebeziumConverterUtils.convertDate(x); } + return switch (fieldType.toUpperCase(Locale.ROOT)) { + case "TIMETZ" -> DateTimeConverter.convertToTimeWithTimezone(x); + case "TIMESTAMPTZ" -> DateTimeConverter.convertToTimestampWithTimezone(x); + case "TIMESTAMP" -> DateTimeConverter.convertToTimestamp(x); + // Debezium doesn't handle era indicators + // https://github.com/airbytehq/airbyte/issues/14590 + case "DATE" -> DateTimeConverter.convertToDate(x); + case "TIME" -> DateTimeConverter.convertToTime(x); + case "INTERVAL" -> convertInterval((PGInterval) x); + default -> DebeziumConverterUtils.convertDate(x); + }; }); } @@ -84,11 +150,7 @@ private void registerMoney(final RelationalColumn field, final ConverterRegistra return DebeziumConverterUtils.convertDefaultValue(field); } else if (x instanceof Double) { final BigDecimal result = BigDecimal.valueOf((Double) x); - if (result.compareTo(new BigDecimal("999999999999999")) == 1 - || result.compareTo(new BigDecimal("-999999999999999")) == -1) { - return null; - } - return result.toString(); + return Double.toString(result.doubleValue()); } else { return x.toString(); } diff --git a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceOperations.java b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceOperations.java index 481f82304436..658b02fcd4e4 100644 --- a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceOperations.java +++ b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceOperations.java @@ -23,6 +23,7 @@ import io.airbyte.db.jdbc.JdbcSourceOperations; import io.airbyte.protocol.models.JsonSchemaType; import java.math.BigDecimal; +import java.math.RoundingMode; import java.sql.JDBCType; import java.sql.PreparedStatement; import java.sql.ResultSet; @@ -35,7 +36,15 @@ import java.time.OffsetTime; import java.time.format.DateTimeParseException; import java.util.Collections; +import org.postgresql.geometric.PGbox; +import org.postgresql.geometric.PGcircle; +import org.postgresql.geometric.PGline; +import org.postgresql.geometric.PGlseg; +import org.postgresql.geometric.PGpath; +import org.postgresql.geometric.PGpoint; +import org.postgresql.geometric.PGpolygon; import org.postgresql.jdbc.PgResultSetMetaData; +import org.postgresql.util.PGobject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -173,6 +182,14 @@ public void setJsonField(final ResultSet resultSet, final int colIndex, final Ob case TIMETZ -> putTimeWithTimezone(json, columnName, resultSet, colIndex); case TIMESTAMPTZ -> putTimestampWithTimezone(json, columnName, resultSet, colIndex); case "hstore" -> putHstoreAsJson(json, columnName, resultSet, colIndex); + case "circle" -> putObject(json, columnName, resultSet, colIndex, PGcircle.class); + case "box" -> putObject(json, columnName, resultSet, colIndex, PGbox.class); + case "double precision", "float", "float8" -> putFloat8(json, columnName, resultSet, colIndex); + case "line" -> putObject(json, columnName, resultSet, colIndex, PGline.class); + case "lseg" -> putObject(json, columnName, resultSet, colIndex, PGlseg.class); + case "path" -> putObject(json, columnName, resultSet, colIndex, PGpath.class); + case "point" -> putObject(json, columnName, resultSet, colIndex, PGpoint.class); + case "polygon" -> putObject(json, columnName, resultSet, colIndex, PGpolygon.class); default -> { switch (columnType) { case BOOLEAN -> putBoolean(json, columnName, resultSet, colIndex); @@ -198,19 +215,19 @@ public void setJsonField(final ResultSet resultSet, final int colIndex, final Ob @Override protected void putDate(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index) throws SQLException { - final LocalDate date = getDateTimeObject(resultSet, index, LocalDate.class); + final LocalDate date = getObject(resultSet, index, LocalDate.class); node.put(columnName, resolveEra(date, date.toString())); } @Override protected void putTime(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index) throws SQLException { - final LocalTime time = getDateTimeObject(resultSet, index, LocalTime.class); + final LocalTime time = getObject(resultSet, index, LocalTime.class); node.put(columnName, time.format(TIME_FORMATTER)); } @Override protected void putTimestamp(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index) throws SQLException { - final LocalDateTime timestamp = getDateTimeObject(resultSet, index, LocalDateTime.class); + final LocalDateTime timestamp = getObject(resultSet, index, LocalDateTime.class); final LocalDate date = timestamp.toLocalDate(); node.put(columnName, resolveEra(date, timestamp.format(TIMESTAMP_FORMATTER))); } @@ -264,6 +281,26 @@ protected void putBoolean(final ObjectNode node, final String columnName, final } @Override + protected void putFloat8(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index) + throws SQLException { + final BigDecimal bigDecimal = DataTypeUtils.returnNullIfInvalid(() -> resultSet.getBigDecimal(index)); + if (bigDecimal != null) { + node.put(columnName, bigDecimal.setScale(resultSet.getMetaData().getScale(index), RoundingMode.HALF_EVEN).doubleValue()); + } else { + node.put(columnName, (BigDecimal) null); + } + } + + protected void putObject(final ObjectNode node, + final String columnName, + final ResultSet resultSet, + final int index, + Class clazz) + throws SQLException { + final T object = getObject(resultSet, index, clazz); + node.put(columnName, object.getValue()); + } + protected void putBigDecimal(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index) { final BigDecimal bigDecimal = DataTypeUtils.returnNullIfInvalid(() -> resultSet.getBigDecimal(index)); if (bigDecimal != null) { diff --git a/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/AbstractPostgresSourceDatatypeTest.java b/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/AbstractPostgresSourceDatatypeTest.java new file mode 100644 index 000000000000..d383ad927f5c --- /dev/null +++ b/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/AbstractPostgresSourceDatatypeTest.java @@ -0,0 +1,526 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.io.airbyte.integration_tests.sources; + +import com.fasterxml.jackson.databind.JsonNode; +import io.airbyte.integrations.standardtest.source.AbstractSourceDatabaseTypeTest; +import io.airbyte.integrations.standardtest.source.TestDataHolder; +import io.airbyte.protocol.models.JsonSchemaType; +import java.util.Set; +import org.jooq.DSLContext; +import org.testcontainers.containers.PostgreSQLContainer; + +public abstract class AbstractPostgresSourceDatatypeTest extends AbstractSourceDatabaseTypeTest { + + protected PostgreSQLContainer container; + protected JsonNode config; + protected DSLContext dslContext; + protected static final String SCHEMA_NAME = "test"; + + @Override + protected String getNameSpace() { + return SCHEMA_NAME; + } + + @Override + protected String getImageName() { + return "airbyte/source-postgres:dev"; + } + + @Override + protected JsonNode getConfig() { + return config; + } + + @Override + public boolean testCatalog() { + return true; + } + + // Test cases are sorted alphabetically based on the source type + // See https://www.postgresql.org/docs/14/datatype.html + @Override + protected void initTests() { + addDataTypeTestData( + TestDataHolder.builder() + .sourceType("bigint") + .airbyteType(JsonSchemaType.NUMBER) + .addInsertValues("-9223372036854775808", "9223372036854775807", "0", "null") + .addExpectedValues("-9223372036854775808", "9223372036854775807", "0", null) + .build()); + + addDataTypeTestData( + TestDataHolder.builder() + .sourceType("bigserial") + .airbyteType(JsonSchemaType.NUMBER) + .addInsertValues("1", "9223372036854775807", "0", "-9223372036854775808") + .addExpectedValues("1", "9223372036854775807", "0", "-9223372036854775808") + .build()); + + addDataTypeTestData( + TestDataHolder.builder() + .sourceType("bit") + .fullSourceDataType("BIT(1)") + .airbyteType(JsonSchemaType.STRING) + .addInsertValues("B'0'") + .addExpectedValues("0") + .build()); + + addDataTypeTestData( + TestDataHolder.builder() + .sourceType("bit") + .fullSourceDataType("BIT(3)") + .airbyteType(JsonSchemaType.STRING) + .addInsertValues("B'101'") + .addExpectedValues("101") + .build()); + + for (final String type : Set.of("bit varying", "varbit")) { + addDataTypeTestData( + TestDataHolder.builder() + .sourceType("bit_varying") + .fullSourceDataType("BIT VARYING(5)") + .airbyteType(JsonSchemaType.STRING) + .addInsertValues("B'101'", "null") + .addExpectedValues("101", null) + .build()); + } + + for (final String type : Set.of("boolean", "bool")) { + addDataTypeTestData( + TestDataHolder.builder() + .sourceType(type) + .airbyteType(JsonSchemaType.BOOLEAN) + .addInsertValues("true", "'yes'", "'1'", "false", "'no'", "'0'", "null") + .addExpectedValues("true", "true", "true", "false", "false", "false", null) + .build()); + } + + addDataTypeTestData( + TestDataHolder.builder() + .sourceType("box") + .airbyteType(JsonSchemaType.STRING) + .addInsertValues("'((3,7),(15,18))'", "'((0,0),(0,0))'", "null") + .addExpectedValues("(15.0,18.0),(3.0,7.0)", "(0.0,0.0),(0.0,0.0)", null) + .build()); + + // bytea stores variable length binary string + // https://www.postgresql.org/docs/14/datatype-binary.html + addDataTypeTestData( + TestDataHolder.builder() + .sourceType("bytea") + .airbyteType(JsonSchemaType.STRING) + .addInsertValues("null", "decode('1234', 'hex')", "'1234'", "'abcd'", "'\\xabcd'") + .addExpectedValues(null, "\\x1234", "\\x31323334", "\\x61626364", "\\xabcd") + .build()); + + for (final String type : Set.of("character", "char")) { + addDataTypeTestData( + TestDataHolder.builder() + .sourceType(type) + .airbyteType(JsonSchemaType.STRING) + .addInsertValues("'a'", "'*'", "null") + .addExpectedValues("a", "*", null) + .build()); + + addDataTypeTestData( + TestDataHolder.builder() + .sourceType(type) + .fullSourceDataType(type + "(8)") + .airbyteType(JsonSchemaType.STRING) + .addInsertValues("'{asb123}'", "'{asb12}'") + .addExpectedValues("{asb123}", "{asb12} ") + .build()); + } + + for (final String type : Set.of("varchar", "text")) { + addDataTypeTestData( + TestDataHolder.builder() + .sourceType(type) + .airbyteType(JsonSchemaType.STRING) + .addInsertValues("'a'", "'abc'", "'Миші йдуть на південь, не питай чому;'", "'櫻花分店'", + "''", "null", "'\\xF0\\x9F\\x9A\\x80'") + .addExpectedValues("a", "abc", "Миші йдуть на південь, не питай чому;", "櫻花分店", "", + null, "\\xF0\\x9F\\x9A\\x80") + .build()); + } + + addDataTypeTestData( + TestDataHolder.builder() + .sourceType("varchar") + .fullSourceDataType("character varying(10)") + .airbyteType(JsonSchemaType.STRING) + .addInsertValues("'{asb123}'", "'{asb12}'") + .addExpectedValues("{asb123}", "{asb12}") + .build()); + + addDataTypeTestData( + TestDataHolder.builder() + .sourceType("cidr") + .airbyteType(JsonSchemaType.STRING) + .addInsertValues("null", "'192.168.100.128/25'", "'192.168/24'", "'192.168.1'", + "'128.1'", "'2001:4f8:3:ba::/64'") + .addExpectedValues(null, "192.168.100.128/25", "192.168.0.0/24", "192.168.1.0/24", + "128.1.0.0/16", "2001:4f8:3:ba::/64") + .build()); + + addDataTypeTestData( + TestDataHolder.builder() + .sourceType("circle") + .airbyteType(JsonSchemaType.STRING) + .addInsertValues("'(5,7),10'", "'(0,0),0'", "'(-10,-4),10'", "null") + .addExpectedValues("<(5.0,7.0),10.0>", "<(0.0,0.0),0.0>", "<(-10.0,-4.0),10.0>", null) + .build()); + + // Debezium does not handle era indicators (AD nd BC) + // https://github.com/airbytehq/airbyte/issues/14590 + // addDataTypeTestData( + // TestDataHolder.builder() + // .sourceType("date") + // .airbyteType(JsonSchemaType.STRING_DATE) + // .addInsertValues("'1999-01-08'", "'1991-02-10 BC'", "null") + // .addExpectedValues("1999-01-08", "1990-02-10 BC", null) + // .build()); + + for (final String type : Set.of("double precision", "float", "float8")) { + addDataTypeTestData( + TestDataHolder.builder() + .sourceType(type) + .airbyteType(JsonSchemaType.NUMBER) + .addInsertValues("'123'", "'1234567890.1234567'", "null") + // Postgres source does not support these special values yet + // https://github.com/airbytehq/airbyte/issues/8902 + // "'-Infinity'", "'Infinity'", "'NaN'", "null") + .addExpectedValues("123.0", "1.2345678901234567E9", null) + // "-Infinity", "Infinity", "NaN", null) + .build()); + } + + addDataTypeTestData( + TestDataHolder.builder() + .sourceType("inet") + .airbyteType(JsonSchemaType.STRING) + .addInsertValues("'198.24.10.0/24'", "'198.24.10.0'", "'198.10/8'", "null") + .addExpectedValues("198.24.10.0/24", "198.24.10.0", "198.10.0.0/8", null) + .build()); + + for (final String type : Set.of("integer", "int", "int4")) { + addDataTypeTestData( + TestDataHolder.builder() + .sourceType(type) + .airbyteType(JsonSchemaType.NUMBER) + .addInsertValues("null", "1001", "-2147483648", "2147483647") + .addExpectedValues(null, "1001", "-2147483648", "2147483647") + .build()); + } + + addDataTypeTestData( + TestDataHolder.builder() + .sourceType("interval") + .airbyteType(JsonSchemaType.STRING) + .addInsertValues("null", "'P1Y2M3DT4H5M6S'", "'-178000000'", "'178000000'") + .addExpectedValues(null, "1 year 2 mons 3 days 04:05:06", "-49444:26:40", "49444:26:40") + .build()); + + addDataTypeTestData( + TestDataHolder.builder() + .sourceType("json") + .airbyteType(JsonSchemaType.STRING) + .addInsertValues("null", "'{\"a\": 10, \"b\": 15}'") + .addExpectedValues(null, "{\"a\": 10, \"b\": 15}") + .build()); + + addDataTypeTestData( + TestDataHolder.builder() + .sourceType("jsonb") + .airbyteType(JsonSchemaType.STRING) + .addInsertValues("null", "'[1, 2, 3]'::jsonb") + .addExpectedValues(null, "[1, 2, 3]") + .build()); + + addDataTypeTestData( + TestDataHolder.builder() + .sourceType("line") + .airbyteType(JsonSchemaType.STRING) + .addInsertValues("'{4,5,6}'", "'{0,1,0}'", "null") + .addExpectedValues("{4.0,5.0,6.0}", "{0.0,1.0,0.0}", null) + .build()); + + addDataTypeTestData( + TestDataHolder.builder() + .sourceType("lseg") + .airbyteType(JsonSchemaType.STRING) + .addInsertValues("'((3,7),(15,18))'", "'((0,0),(0,0))'", "null") + .addExpectedValues("[(3.0,7.0),(15.0,18.0)]", "[(0.0,0.0),(0.0,0.0)]", null) + .build()); + + addDataTypeTestData( + TestDataHolder.builder() + .sourceType("macaddr") + .airbyteType(JsonSchemaType.STRING) + .addInsertValues("null", "'08:00:2b:01:02:03'", "'08-00-2b-01-02-04'", + "'08002b:010205'") + .addExpectedValues(null, "08:00:2b:01:02:03", "08:00:2b:01:02:04", "08:00:2b:01:02:05") + .build()); + + addDataTypeTestData( + TestDataHolder.builder() + .sourceType("macaddr8") + .airbyteType(JsonSchemaType.STRING) + .addInsertValues("null", "'08:00:2b:01:02:03:04:05'", "'08-00-2b-01-02-03-04-06'", + "'08002b:0102030407'") + .addExpectedValues(null, "08:00:2b:01:02:03:04:05", "08:00:2b:01:02:03:04:06", + "08:00:2b:01:02:03:04:07") + .build()); + + addDataTypeTestData( + TestDataHolder.builder() + .sourceType("money") + .airbyteType(JsonSchemaType.NUMBER) + .addInsertValues( + "null", + "'999.99'", "'1,001.01'", "'-1,000'", + "'$999.99'", "'$1001.01'", "'-$1,000'", + // max values for Money type: "-92233720368547758.08", "92233720368547758.07" + "'-92233720368547758.08'", "'92233720368547758.07'") + .addExpectedValues( + null, + // Double#toString method is necessary here because sometimes the output + // has unexpected decimals, e.g. Double.toString(-1000) is -1000.0 + "999.99", "1001.01", Double.toString(-1000), + "999.99", "1001.01", Double.toString(-1000), + Double.toString(-92233720368547758.08), Double.toString(92233720368547758.07)) + .build()); + + // Blocked by https://github.com/airbytehq/airbyte/issues/8902 + for (final String type : Set.of("numeric", "decimal")) { + addDataTypeTestData( + TestDataHolder.builder() + .sourceType(type) + .airbyteType(JsonSchemaType.NUMBER) + .addInsertValues( + "'123'", "null", "'1234567890.1234567'") + // Postgres source does not support these special values yet + // https://github.com/airbytehq/airbyte/issues/8902 + // "'infinity'", "'-infinity'", "'nan'" + .addExpectedValues("123", null, "1.2345678901234567E9") + .build()); + } + + addDataTypeTestData( + TestDataHolder.builder() + .sourceType("path") + .airbyteType(JsonSchemaType.STRING) + .addInsertValues("'((3,7),(15,18))'", "'((0,0),(0,0))'", "null") + .addExpectedValues("((3.0,7.0),(15.0,18.0))", "((0.0,0.0),(0.0,0.0))", null) + .build()); + + addDataTypeTestData( + TestDataHolder.builder() + .sourceType("pg_lsn") + .airbyteType(JsonSchemaType.STRING) + .addInsertValues("'7/A25801C8'::pg_lsn", "'0/0'::pg_lsn", "null") + .addExpectedValues("7/A25801C8", "0/0", null) + .build()); + + addDataTypeTestData( + TestDataHolder.builder() + .sourceType("point") + .airbyteType(JsonSchemaType.STRING) + .addInsertValues("'(3,7)'", "'(0,0)'", "'(999999999999999999999999,0)'", "null") + .addExpectedValues("(3.0,7.0)", "(0.0,0.0)", "(1.0E24,0.0)", null) + .build()); + + addDataTypeTestData( + TestDataHolder.builder() + .sourceType("polygon") + .airbyteType(JsonSchemaType.STRING) + .addInsertValues("'((3,7),(15,18))'", "'((0,0),(0,0))'", + "'((0,0),(999999999999999999999999,0))'", "null") + .addExpectedValues("((3.0,7.0),(15.0,18.0))", "((0.0,0.0),(0.0,0.0))", "((0.0,0.0),(1.0E24,0.0))", null) + .build()); + + for (final String type : Set.of("real", "float4")) { + addDataTypeTestData( + TestDataHolder.builder() + .sourceType(type) + .airbyteType(JsonSchemaType.NUMBER) + .addInsertValues("null", "3.4145") + .addExpectedValues(null, "3.4145") + .build()); + } + + for (final String type : Set.of("smallint", "int2")) { + addDataTypeTestData( + TestDataHolder.builder() + .sourceType(type) + .airbyteType(JsonSchemaType.NUMBER) + .addInsertValues("null", "-32768", "32767") + .addExpectedValues(null, "-32768", "32767") + .build()); + } + + for (final String type : Set.of("smallserial", "serial2")) { + addDataTypeTestData( + TestDataHolder.builder() + .sourceType(type) + .airbyteType(JsonSchemaType.NUMBER) + .addInsertValues("1", "32767", "0", "-32767") + .addExpectedValues("1", "32767", "0", "-32767") + .build()); + } + + for (final String type : Set.of("serial", "serial4")) { + addDataTypeTestData( + TestDataHolder.builder() + .sourceType(type) + .airbyteType(JsonSchemaType.NUMBER) + .addInsertValues("1", "2147483647", "0", "-2147483647") + .addExpectedValues("1", "2147483647", "0", "-2147483647") + .build()); + } + + // time without time zone + for (final String fullSourceType : Set.of("time", "time without time zone")) { + addDataTypeTestData( + TestDataHolder.builder() + .sourceType("time") + .fullSourceDataType(fullSourceType) + .airbyteType(JsonSchemaType.STRING_TIME_WITHOUT_TIMEZONE) + // time column will ignore time zone + .addInsertValues("null", "'13:00:01'", "'13:00:02+8'", "'13:00:03-8'", "'13:00:04Z'", "'13:00:05.01234Z+8'", "'13:00:00Z-8'") + .addExpectedValues(null, "13:00:01.000000", "13:00:02.000000", "13:00:03.000000", "13:00:04.000000", "13:00:05.012340", + "13:00:00.000000") + .build()); + } + + // time with time zone + for (final String fullSourceType : Set.of("timetz", "time with time zone")) { + addDataTypeTestData( + TestDataHolder.builder() + .sourceType("timetz") + .fullSourceDataType(fullSourceType) + .airbyteType(JsonSchemaType.STRING_TIME_WITH_TIMEZONE) + .addInsertValues("null", "'13:00:01'", "'13:00:00+8'", "'13:00:03-8'", "'13:00:04Z'", "'13:00:05.012345Z+8'", "'13:00:06.00000Z-8'") + // A time value without time zone will use the time zone set on the database, which is Z-7, + // so 13:00:01 is returned as 13:00:01-07. + .addExpectedValues(null, "13:00:01.000000-07:00", "13:00:00.000000+08:00", "13:00:03.000000-08:00", "13:00:04.000000Z", + "13:00:05.012345-08:00", "13:00:06.000000+08:00") + .build()); + } + + // timestamp without time zone + for (final String fullSourceType : Set.of("timestamp", "timestamp without time zone")) { + addDataTypeTestData( + TestDataHolder.builder() + .sourceType("timestamp") + .fullSourceDataType(fullSourceType) + .airbyteType(JsonSchemaType.STRING_TIMESTAMP_WITHOUT_TIMEZONE) + .addInsertValues("TIMESTAMP '2004-10-19 10:23:00'", "TIMESTAMP '2004-10-19 10:23:54.123456'", "null") + .addExpectedValues("2004-10-19T10:23:00.000000", "2004-10-19T10:23:54.123456", null) + .build()); + } + + // timestamp with time zone + for (final String fullSourceType : Set.of("timestamptz", "timestamp with time zone")) { + addDataTypeTestData( + TestDataHolder.builder() + .sourceType("timestamptz") + .fullSourceDataType(fullSourceType) + .airbyteType(JsonSchemaType.STRING_TIMESTAMP_WITH_TIMEZONE) + .addInsertValues("TIMESTAMP '2004-10-19 10:23:00-08'", "TIMESTAMP '2004-10-19 10:23:54.123456-08'", "null") + // 2004-10-19T10:23:54Z-8 = 2004-10-19T17:23:54Z + .addExpectedValues("2004-10-19T17:23:00.000000Z", "2004-10-19T17:23:54.123456Z", null) + .build()); + } + + addDataTypeTestData( + TestDataHolder.builder() + .sourceType("tsquery") + .airbyteType(JsonSchemaType.STRING) + .addInsertValues("null", "'fat & (rat | cat)'::tsquery", "'fat:ab & cat'::tsquery") + .addExpectedValues(null, "'fat' & ( 'rat' | 'cat' )", "'fat':AB & 'cat'") + .build()); + + addDataTypeTestData( + TestDataHolder.builder() + .sourceType("tsvector") + .airbyteType(JsonSchemaType.STRING) + .addInsertValues("to_tsvector('The quick brown fox jumped over the lazy dog.')") + .addExpectedValues("'brown':3 'dog':9 'fox':4 'jumped':5 'lazy':8 'over':6 'quick':2 'the':1,7") + .build()); + + addDataTypeTestData( + TestDataHolder.builder() + .sourceType("uuid") + .airbyteType(JsonSchemaType.STRING) + .addInsertValues("'a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11'", "null") + .addExpectedValues("a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11", null) + .build()); + + addDataTypeTestData( + TestDataHolder.builder() + .sourceType("xml") + .airbyteType(JsonSchemaType.STRING) + .addInsertValues( + "XMLPARSE (DOCUMENT 'Manual...')", + "null", "''") + .addExpectedValues("Manual...", null, "") + .build()); + + // enum type + addDataTypeTestData( + TestDataHolder.builder() + .sourceType("mood") + .airbyteType(JsonSchemaType.STRING) + .addInsertValues("'happy'", "null") + .addExpectedValues("happy", null) + .build()); + + // range + addDataTypeTestData( + TestDataHolder.builder() + .sourceType("tsrange") + .airbyteType(JsonSchemaType.STRING) + .addInsertValues("'(2010-01-01 14:30, 2010-01-01 15:30)'", "null") + .addExpectedValues("(\"2010-01-01 14:30:00\",\"2010-01-01 15:30:00\")", null) + .build()); + + // array + addDataTypeTestData( + TestDataHolder.builder() + .sourceType("text") + .fullSourceDataType("text[]") + .airbyteType(JsonSchemaType.ARRAY) + .addInsertValues("'{10001, 10002, 10003, 10004}'", "null") + .addExpectedValues("[\"10001\",\"10002\",\"10003\",\"10004\"]", null) + .build()); + + // composite type + addDataTypeTestData( + TestDataHolder.builder() + .sourceType("inventory_item") + .airbyteType(JsonSchemaType.STRING) + .addInsertValues("ROW('fuzzy dice', 42, 1.99)", "null") + .addExpectedValues("(\"fuzzy dice\",42,1.99)", null) + .build()); + + addDataTypeTestData( + TestDataHolder.builder() + .sourceType("hstore") + .airbyteType(JsonSchemaType.STRING) + .addInsertValues(""" + '"paperback" => "243","publisher" => "postgresqltutorial.com", + "language" => "English","ISBN-13" => "978-1449370000", + "weight" => "11.2 ounces"' + """, null) + .addExpectedValues( + """ + {"ISBN-13":"978-1449370000","weight":"11.2 ounces","paperback":"243","publisher":"postgresqltutorial.com","language":"English"}""", + null) + .build()); + } + +} diff --git a/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/CdcPostgresSourceDatatypeTest.java b/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/CdcPostgresSourceDatatypeTest.java index c31991f42c92..8f8ae91cd7b5 100644 --- a/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/CdcPostgresSourceDatatypeTest.java +++ b/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/CdcPostgresSourceDatatypeTest.java @@ -11,31 +11,25 @@ import io.airbyte.db.factory.DSLContextFactory; import io.airbyte.db.factory.DatabaseDriver; import io.airbyte.db.jdbc.JdbcUtils; -import io.airbyte.integrations.standardtest.source.AbstractSourceDatabaseTypeTest; -import io.airbyte.integrations.standardtest.source.TestDataHolder; import io.airbyte.integrations.standardtest.source.TestDestinationEnv; import io.airbyte.integrations.util.HostPortResolver; import io.airbyte.protocol.models.JsonSchemaType; import java.util.List; -import org.jooq.DSLContext; import org.jooq.SQLDialect; import org.testcontainers.containers.PostgreSQLContainer; import org.testcontainers.utility.MountableFile; -public class CdcPostgresSourceDatatypeTest extends AbstractSourceDatabaseTypeTest { +public class CdcPostgresSourceDatatypeTest extends AbstractPostgresSourceDatatypeTest { private static final String SCHEMA_NAME = "test"; private static final String SLOT_NAME_BASE = "debezium_slot"; private static final String PUBLICATION = "publication"; private static final int INITIAL_WAITING_SECONDS = 5; - private PostgreSQLContainer container; - private JsonNode config; - private DSLContext dslContext; @Override protected Database setupDatabase() throws Exception { - container = new PostgreSQLContainer<>("postgres:13-alpine") + container = new PostgreSQLContainer<>("postgres:14-alpine") .withCopyFileToContainer(MountableFile.forClasspathResource("postgresql.conf"), "/etc/postgresql/postgresql.conf") .withCommand("postgres -c config_file=/etc/postgresql/postgresql.conf"); @@ -75,10 +69,10 @@ protected Database setupDatabase() throws Exception { final Database database = new Database(dslContext); database.query(ctx -> { - ctx.execute("SELECT pg_create_logical_replication_slot('" + SLOT_NAME_BASE + "', 'pgoutput');"); + ctx.execute( + "SELECT pg_create_logical_replication_slot('" + SLOT_NAME_BASE + "', 'pgoutput');"); ctx.execute("CREATE PUBLICATION " + PUBLICATION + " FOR ALL TABLES;"); ctx.execute("CREATE EXTENSION hstore;"); - return null; }); @@ -94,473 +88,10 @@ protected Database setupDatabase() throws Exception { return database; } - @Override - protected String getNameSpace() { - return SCHEMA_NAME; - } - - @Override - protected String getImageName() { - return "airbyte/source-postgres:dev"; - } - - @Override - protected JsonNode getConfig() { - return config; - } - @Override protected void tearDown(final TestDestinationEnv testEnv) { dslContext.close(); container.close(); } - @Override - protected void initTests() { - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("bigint") - .airbyteType(JsonSchemaType.INTEGER) - .addInsertValues("-9223372036854775808", "9223372036854775807", "0", "null") - .addExpectedValues("-9223372036854775808", "9223372036854775807", "0", null) - .build()); - - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("bigserial") - .airbyteType(JsonSchemaType.INTEGER) - .addInsertValues("1", "9223372036854775807", "0", "-9223372036854775808") - .addExpectedValues("1", "9223372036854775807", "0", "-9223372036854775808") - .build()); - - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("serial") - .airbyteType(JsonSchemaType.INTEGER) - .addInsertValues("1", "2147483647", "0", "-2147483647") - .addExpectedValues("1", "2147483647", "0", "-2147483647") - .build()); - - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("smallserial") - .airbyteType(JsonSchemaType.INTEGER) - .addInsertValues("1", "32767", "0", "-32767") - .addExpectedValues("1", "32767", "0", "-32767") - .build()); - - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("bit") - .fullSourceDataType("BIT(3)") - .airbyteType(JsonSchemaType.NUMBER) - .addInsertValues("B'101'", "B'111'", "null") - .addExpectedValues("101", "111", null) - .build()); - - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("bit_varying") - .fullSourceDataType("BIT VARYING(5)") - .airbyteType(JsonSchemaType.NUMBER) - .addInsertValues("B'101'", "null") - .addExpectedValues("101", null) - .build()); - - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("boolean") - .airbyteType(JsonSchemaType.BOOLEAN) - .addInsertValues("true", "'yes'", "'1'", "false", "'no'", "'0'", "null") - .addExpectedValues("true", "true", "true", "false", "false", "false", null) - .build()); - - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("bytea") - .airbyteType(JsonSchemaType.OBJECT) - .addInsertValues("decode('1234', 'hex')") - .addExpectedValues("EjQ=") - .build()); - - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("character") - .airbyteType(JsonSchemaType.STRING) - .addInsertValues("'a'", "'*'", "null") - .addExpectedValues("a", "*", null) - .build()); - - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("character") - .fullSourceDataType("character(8)") - .airbyteType(JsonSchemaType.STRING) - .addInsertValues("'{asb123}'", "'{asb12}'") - .addExpectedValues("{asb123}", "{asb12} ") - .build()); - - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("varchar") - .airbyteType(JsonSchemaType.STRING) - .addInsertValues("'a'", "'abc'", "'Миші йдуть на південь, не питай чому;'", "'櫻花分店'", - "''", "null", "'\\xF0\\x9F\\x9A\\x80'") - .addExpectedValues("a", "abc", "Миші йдуть на південь, не питай чому;", "櫻花分店", "", - null, "\\xF0\\x9F\\x9A\\x80") - .build()); - - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("varchar") - .fullSourceDataType("character(12)") - .airbyteType(JsonSchemaType.STRING) - .addInsertValues("'a'", "'abc'", "'Миші йдуть;'", "'櫻花分店'", - "''", "null") - .addExpectedValues("a ", "abc ", "Миші йдуть; ", "櫻花分店 ", - " ", null) - .build()); - - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("cidr") - .airbyteType(JsonSchemaType.STRING) - .addInsertValues("null", "'192.168.100.128/25'", "'192.168/24'", "'192.168.1'", - "'128.1'", "'2001:4f8:3:ba::/64'") - .addExpectedValues(null, "192.168.100.128/25", "192.168.0.0/24", "192.168.1.0/24", - "128.1.0.0/16", "2001:4f8:3:ba::/64") - .build()); - - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("date") - .airbyteType(JsonSchemaType.STRING) - .addInsertValues("'January 7, 1999'", "'1999-01-08'", "'1/9/1999'", "'January 10, 99 BC'", "'January 11, 99 AD'", "null") - .addExpectedValues("1999-01-07T00:00:00Z", "1999-01-08T00:00:00Z", "1999-01-09T00:00:00Z", "0099-01-10T00:00:00Z", "1999-01-11T00:00:00Z", - null) - .build()); - - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("float8") - .airbyteType(JsonSchemaType.NUMBER) - .addInsertValues("'123'", "'1234567890.1234567'", "'-Infinity'", "'Infinity'", "'NaN'", "null") - .addExpectedValues("123.0", "1.2345678901234567E9", "-Infinity", "Infinity", "NaN", null) - .build()); - - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("float") - .airbyteType(JsonSchemaType.NUMBER) - .addInsertValues("'123'", "'1234567890.1234567'", "'-Infinity'", "'Infinity'", "'NaN'", "null") - .addExpectedValues("123.0", "1.2345678901234567E9", "-Infinity", "Infinity", "NaN", null) - .build()); - - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("inet") - .airbyteType(JsonSchemaType.STRING) - .addInsertValues("'198.24.10.0/24'", "'198.24.10.0'", "'198.10/8'", "null") - .addExpectedValues("198.24.10.0/24", "198.24.10.0", "198.10.0.0/8", null) - .build()); - - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("int") - .airbyteType(JsonSchemaType.NUMBER) - .addInsertValues("null", "-2147483648", "2147483647") - .addExpectedValues(null, "-2147483648", "2147483647") - .build()); - - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("interval") - .airbyteType(JsonSchemaType.STRING) - .addInsertValues("null", "'P1Y2M3DT4H5M6S'", "'PT4H5M6S'", "'-300'", "'-178000000'", - "'178000000'", "'1-2'", "'3 4:05:06'", "'P0002-02-03T04:05:06'") - .addExpectedValues(null, "1 year 2 mons 3 days 04:05:06", "04:05:06", "-00:05:00", "-49444:26:40", - "49444:26:40", "1 year 2 mons 00:00:00", "3 days 04:05:06", "2 year 2 mons 3 days 04:05:06") - .build()); - - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("json") - .airbyteType(JsonSchemaType.STRING) - .addInsertValues("null", "'{\"a\": 10, \"b\": 15}'") - .addExpectedValues(null, "{\"a\": 10, \"b\": 15}") - .build()); - - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("jsonb") - .airbyteType(JsonSchemaType.STRING) - .addInsertValues("null", "'[1, 2, 3]'::jsonb") - .addExpectedValues(null, "[1, 2, 3]") - .build()); - - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("macaddr") - .airbyteType(JsonSchemaType.STRING) - .addInsertValues("null", "'08:00:2b:01:02:03'", "'08-00-2b-01-02-04'", - "'08002b:010205'") - .addExpectedValues(null, "08:00:2b:01:02:03", "08:00:2b:01:02:04", "08:00:2b:01:02:05") - .build()); - - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("macaddr8") - .airbyteType(JsonSchemaType.STRING) - .addInsertValues("null", "'08:00:2b:01:02:03:04:05'", "'08-00-2b-01-02-03-04-06'", - "'08002b:0102030407'") - .addExpectedValues(null, "08:00:2b:01:02:03:04:05", "08:00:2b:01:02:03:04:06", - "08:00:2b:01:02:03:04:07") - .build()); - - // Max values for Money type should be: "-92233720368547758.08", "92233720368547758.07", - // debezium return rounded value for values more than 999999999999999 and less than - // -999999999999999, - // we map these value as null; - // opened issue https://github.com/airbytehq/airbyte/issues/7338 - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("money") - .airbyteType(JsonSchemaType.STRING) - .addInsertValues("null", "'999.99'", "'1,000.01'", "'-999999999999.99'", "'-999999999999999'", "'999999999999.99'", "'999999999999999'", - "'-92233720368547758.08'", "'92233720368547758.07'") - .addExpectedValues(null, "999.99", "1000.01", "-999999999999.99", "-999999999999999", "999999999999.99", "999999999999999", - null, null) - .build()); - - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("numeric") - .airbyteType(JsonSchemaType.NUMBER) - .addInsertValues("'99999'", "'NAN'", "10000000000000000000000000000000000000", null) - .addExpectedValues("99999", "NAN", "10000000000000000000000000000000000000", null) - .build()); - - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("decimal") - .airbyteType(JsonSchemaType.NUMBER) - .addInsertValues("99999", "5.1", "0", "'NAN'", "null") - .addExpectedValues("99999", "5.1", "0", "NAN", null) - .build()); - - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("numeric") - .fullSourceDataType("numeric(13,4)") - .airbyteType(JsonSchemaType.NUMBER) - .addInsertValues("0.1880", "10.0000", "5213.3468", "'NAN'", "null") - .addExpectedValues("0.1880", "10.0000", "5213.3468", "NAN", null) - .build()); - - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("smallint") - .airbyteType(JsonSchemaType.NUMBER) - .addInsertValues("null", "-32768", "32767") - .addExpectedValues(null, "-32768", "32767") - .build()); - - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("text") - .airbyteType(JsonSchemaType.STRING) - .addInsertValues("'a'", "'abc'", "'Миші йдуть;'", "'櫻花分店'", - "''", "null", "'\\xF0\\x9F\\x9A\\x80'") - .addExpectedValues("a", "abc", "Миші йдуть;", "櫻花分店", "", null, "\\xF0\\x9F\\x9A\\x80") - .build()); - - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("time") - .airbyteType(JsonSchemaType.STRING) - .addInsertValues("null", "'04:05:06'", "'2021-04-12 05:06:07'", "'04:05 PM'") - .addExpectedValues(null, "04:05:06", "05:06:07", "16:05:00") - .build()); - - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("timetz") - .airbyteType(JsonSchemaType.STRING) - .addInsertValues("null", "'04:05:06+03'", "'2021-04-12 05:06:07+00'", "'060708-03'") - .addExpectedValues(null, "04:05:06+03", "05:06:07+00", "06:07:08-03") - .build()); - - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("timestamp") - .airbyteType(JsonSchemaType.STRING) - .addInsertValues("TIMESTAMP '2004-10-19 10:23:54'", "TIMESTAMP '2004-10-19 10:23:54.123456'", "null") - .addExpectedValues("2004-10-19T10:23:54.000000Z", "2004-10-19T10:23:54.123456Z", null) - .build()); - - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("timestamptz") - .airbyteType(JsonSchemaType.STRING) - .addInsertValues("TIMESTAMP WITH TIME ZONE '2004-10-19 10:23:54+03'", "TIMESTAMP WITH TIME ZONE '2004-10-19 10:23:54.123456+03'", "null") - .addExpectedValues("2004-10-19T07:23:54Z", "2004-10-19T07:23:54.123456Z", null) - .build()); - - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("tsvector") - .airbyteType(JsonSchemaType.STRING) - .addInsertValues("to_tsvector('The quick brown fox jumped over the lazy dog.')") - .addExpectedValues("'brown':3 'dog':9 'fox':4 'jumped':5 'lazy':8 'over':6 'quick':2 'the':1,7") - .build()); - - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("uuid") - .airbyteType(JsonSchemaType.STRING) - .addInsertValues("'a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11'", "null") - .addExpectedValues("a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11", null) - .build()); - - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("xml") - .airbyteType(JsonSchemaType.STRING) - .addInsertValues( - "XMLPARSE (DOCUMENT 'Manual...')", - "null", "''") - .addExpectedValues("Manual...", null, "") - .build()); - - // preconditions for this test are set at the time of database creation (setupDatabase method) - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("mood") - .airbyteType(JsonSchemaType.STRING) - .addInsertValues("'happy'", "null") - .addExpectedValues("happy", null) - .build()); - - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("text") - .fullSourceDataType("text[]") - .airbyteType(JsonSchemaType.ARRAY) - .addInsertValues("'{10000, 10000, 10000, 10000}'", "null") - .addExpectedValues("[\"10000\",\"10000\",\"10000\",\"10000\"]", null) - .build()); - - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("inventory_item") - .airbyteType(JsonSchemaType.STRING) - .addInsertValues("ROW('fuzzy dice', 42, 1.99)", "null") - .addExpectedValues("(\"fuzzy dice\",42,1.99)", null) - .build()); - - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("tsrange") - .airbyteType(JsonSchemaType.STRING) - .addInsertValues("'(2010-01-01 14:30, 2010-01-01 15:30)'", "null") - .addExpectedValues("(\"2010-01-01 14:30:00\",\"2010-01-01 15:30:00\")", null) - .build()); - - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("box") - .airbyteType(JsonSchemaType.STRING) - .addInsertValues("'((3,7),(15,18))'", "'((0,0),(0,0))'", "null") - .addExpectedValues("(15.0,18.0),(3.0,7.0)", "(0.0,0.0),(0.0,0.0)", null) - .build()); - - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("circle") - .airbyteType(JsonSchemaType.STRING) - .addInsertValues("'(5,7),10'", "'(0,0),0'", "'(-10,-4),10'", "null") - .addExpectedValues("<(5.0,7.0),10.0>", "<(0.0,0.0),0.0>", "<(-10.0,-4.0),10.0>", null) - .build()); - - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("line") - .airbyteType(JsonSchemaType.STRING) - .addInsertValues("'{4,5,6}'", "'{0,1,0}'", "null") - .addExpectedValues("{4.0,5.0,6.0}", "{0.0,1.0,0.0}", null) - .build()); - - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("lseg") - .airbyteType(JsonSchemaType.STRING) - .addInsertValues("'((3,7),(15,18))'", "'((0,0),(0,0))'", "null") - .addExpectedValues("[(3.0,7.0),(15.0,18.0)]", "[(0.0,0.0),(0.0,0.0)]", null) - .build()); - - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("path") - .airbyteType(JsonSchemaType.STRING) - .addInsertValues("'((3,7),(15.5,18.2))'", "'((0,0),(0,0))'", "null") - .addExpectedValues("((3.0,7.0),(15.5,18.2))", "((0.0,0.0),(0.0,0.0))", null) - .build()); - - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("point") - .airbyteType(JsonSchemaType.NUMBER) - .addInsertValues("'(3,7)'", "'(0,0)'", "'(999999999999999999999999,0)'", "null") - .addExpectedValues("(3.0,7.0)", "(0.0,0.0)", "(1.0E24,0.0)", null) - .build()); - - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("polygon") - .airbyteType(JsonSchemaType.STRING) - .addInsertValues("'((3,7),(15,18))'", "'((0,0),(0,0))'", - "'((0,0),(999999999999999999999999,0))'", "null") - .addExpectedValues("((3.0,7.0),(15.0,18.0))", "((0.0,0.0),(0.0,0.0))", "((0.0,0.0),(1.0E24,0.0))", null) - .build()); - - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("real") - .airbyteType(JsonSchemaType.STRING) - .addInsertValues("'123'", "'1234567890.1234567'", "null") - .addExpectedValues("123.0", "1.23456794E9", null) - .build()); - - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("tsvector") - .airbyteType(JsonSchemaType.STRING) - .addInsertValues("to_tsvector('The quick brown fox jumped over the lazy dog.')") - .addExpectedValues("'brown':3 'dog':9 'fox':4 'jumped':5 'lazy':8 'over':6 'quick':2 'the':1,7") - .build()); - - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("tsquery") - .airbyteType(JsonSchemaType.STRING) - .addInsertValues("null", "'fat & (rat | cat)'::tsquery", "'fat:ab & cat'::tsquery") - .addExpectedValues(null, "'fat' & ( 'rat' | 'cat' )", "'fat':AB & 'cat'") - .build()); - - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("hstore") - .airbyteType(JsonSchemaType.STRING) - .addInsertValues(""" - '"paperback" => "243","publisher" => "postgresqltutorial.com", - "language" => "English","ISBN-13" => "978-1449370000", - "weight" => "11.2 ounces"' - """, null) - .addExpectedValues( - """ - {"ISBN-13":"978-1449370000","weight":"11.2 ounces","paperback":"243","publisher":"postgresqltutorial.com","language":"English"}""", - null) - .build()); - } - } diff --git a/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/PostgresSourceDatatypeTest.java b/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/PostgresSourceDatatypeTest.java index b3945702cf36..34ff8d39ebea 100644 --- a/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/PostgresSourceDatatypeTest.java +++ b/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/PostgresSourceDatatypeTest.java @@ -11,27 +11,22 @@ import io.airbyte.db.factory.DSLContextFactory; import io.airbyte.db.factory.DatabaseDriver; import io.airbyte.db.jdbc.JdbcUtils; -import io.airbyte.integrations.standardtest.source.AbstractSourceDatabaseTypeTest; -import io.airbyte.integrations.standardtest.source.TestDataHolder; import io.airbyte.integrations.standardtest.source.TestDestinationEnv; import io.airbyte.integrations.util.HostPortResolver; import io.airbyte.protocol.models.JsonSchemaType; import java.sql.SQLException; -import java.util.Set; -import org.jooq.DSLContext; import org.jooq.SQLDialect; import org.testcontainers.containers.PostgreSQLContainer; +import org.testcontainers.utility.MountableFile; -public class PostgresSourceDatatypeTest extends AbstractSourceDatabaseTypeTest { - - private PostgreSQLContainer container; - private JsonNode config; - private DSLContext dslContext; - private static final String SCHEMA_NAME = "test"; +public class PostgresSourceDatatypeTest extends AbstractPostgresSourceDatatypeTest { @Override protected Database setupDatabase() throws SQLException { - container = new PostgreSQLContainer<>("postgres:14-alpine"); + container = new PostgreSQLContainer<>("postgres:14-alpine") + .withCopyFileToContainer(MountableFile.forClasspathResource("postgresql.conf"), + "/etc/postgresql/postgresql.conf") + .withCommand("postgres -c config_file=/etc/postgresql/postgresql.conf"); container.start(); final JsonNode replicationMethod = Jsons.jsonNode(ImmutableMap.builder() .put("method", "Standard") @@ -77,21 +72,6 @@ protected Database setupDatabase() throws SQLException { return database; } - @Override - protected String getNameSpace() { - return SCHEMA_NAME; - } - - @Override - protected String getImageName() { - return "airbyte/source-postgres:dev"; - } - - @Override - protected JsonNode getConfig() { - return config; - } - @Override protected void tearDown(final TestDestinationEnv testEnv) { dslContext.close(); @@ -103,485 +83,4 @@ public boolean testCatalog() { return true; } - // Test cases are sorted alphabetically based on the source type - // See https://www.postgresql.org/docs/14/datatype.html - @Override - protected void initTests() { - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("bigint") - .airbyteType(JsonSchemaType.INTEGER) - .addInsertValues("-9223372036854775808", "9223372036854775807", "0", "null") - .addExpectedValues("-9223372036854775808", "9223372036854775807", "0", null) - .build()); - - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("bigserial") - .airbyteType(JsonSchemaType.INTEGER) - .addInsertValues("1", "9223372036854775807", "0", "-9223372036854775808") - .addExpectedValues("1", "9223372036854775807", "0", "-9223372036854775808") - .build()); - - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("bit") - .fullSourceDataType("BIT(1)") - .airbyteType(JsonSchemaType.STRING) - .addInsertValues("B'0'") - .addExpectedValues("0") - .build()); - - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("bit") - .fullSourceDataType("BIT(3)") - .airbyteType(JsonSchemaType.STRING) - .addInsertValues("B'101'") - .addExpectedValues("101") - .build()); - - for (final String type : Set.of("bit varying", "varbit")) { - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("bit_varying") - .fullSourceDataType("BIT VARYING(5)") - .airbyteType(JsonSchemaType.STRING) - .addInsertValues("B'101'", "null") - .addExpectedValues("101", null) - .build()); - } - - for (final String type : Set.of("boolean", "bool")) { - addDataTypeTestData( - TestDataHolder.builder() - .sourceType(type) - .airbyteType(JsonSchemaType.BOOLEAN) - .addInsertValues("true", "'yes'", "'1'", "false", "'no'", "'0'", "null") - .addExpectedValues("true", "true", "true", "false", "false", "false", null) - .build()); - } - - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("box") - .airbyteType(JsonSchemaType.STRING) - .addInsertValues("'((3,7),(15,18))'", "'((0,0),(0,0))'", "null") - .addExpectedValues("(15,18),(3,7)", "(0,0),(0,0)", null) - .build()); - - // bytea stores variable length binary string - // https://www.postgresql.org/docs/14/datatype-binary.html - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("bytea") - .airbyteType(JsonSchemaType.STRING) - .addInsertValues("null", "decode('1234', 'hex')", "'1234'", "'abcd'", "'\\xabcd'") - .addExpectedValues(null, "\\x1234", "\\x31323334", "\\x61626364", "\\xabcd") - .build()); - - for (final String type : Set.of("character", "char")) { - addDataTypeTestData( - TestDataHolder.builder() - .sourceType(type) - .airbyteType(JsonSchemaType.STRING) - .addInsertValues("'a'", "'*'", "null") - .addExpectedValues("a", "*", null) - .build()); - - addDataTypeTestData( - TestDataHolder.builder() - .sourceType(type) - .fullSourceDataType(type + "(8)") - .airbyteType(JsonSchemaType.STRING) - .addInsertValues("'{asb123}'", "'{asb12}'") - .addExpectedValues("{asb123}", "{asb12} ") - .build()); - } - - for (final String type : Set.of("varchar", "text")) { - addDataTypeTestData( - TestDataHolder.builder() - .sourceType(type) - .airbyteType(JsonSchemaType.STRING) - .addInsertValues("'a'", "'abc'", "'Миші йдуть на південь, не питай чому;'", "'櫻花分店'", - "''", "null", "'\\xF0\\x9F\\x9A\\x80'") - .addExpectedValues("a", "abc", "Миші йдуть на південь, не питай чому;", "櫻花分店", "", - null, "\\xF0\\x9F\\x9A\\x80") - .build()); - } - - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("varchar") - .fullSourceDataType("character varying(10)") - .airbyteType(JsonSchemaType.STRING) - .addInsertValues("'{asb123}'", "'{asb12}'") - .addExpectedValues("{asb123}", "{asb12}") - .build()); - - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("cidr") - .airbyteType(JsonSchemaType.STRING) - .addInsertValues("null", "'192.168.100.128/25'", "'192.168/24'", "'192.168.1'", - "'128.1'", "'2001:4f8:3:ba::/64'") - .addExpectedValues(null, "192.168.100.128/25", "192.168.0.0/24", "192.168.1.0/24", - "128.1.0.0/16", "2001:4f8:3:ba::/64") - .build()); - - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("circle") - .airbyteType(JsonSchemaType.STRING) - .addInsertValues("'(5,7),10'", "'(0,0),0'", "'(-10,-4),10'", "null") - .addExpectedValues("<(5,7),10>", "<(0,0),0>", "<(-10,-4),10>", null) - .build()); - - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("date") - .airbyteType(JsonSchemaType.STRING_DATE) - .addInsertValues("'1999-01-08'", "'1991-02-10 BC'", "null") - .addExpectedValues("1999-01-08", "1990-02-10 BC", null) - .build()); - - for (final String type : Set.of("double precision", "float", "float8")) { - addDataTypeTestData( - TestDataHolder.builder() - .sourceType(type) - .airbyteType(JsonSchemaType.NUMBER) - .addInsertValues( - "null", "'123'", "'1234567890.1234567'", - // Postgres source does not support these special values yet - // https://github.com/airbytehq/airbyte/issues/8902 - "'infinity'", "'-infinity'", "'nan'") - .addExpectedValues(null, "123.0", "1.2345678901234567E9", null, null, null) - .build()); - } - - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("inet") - .airbyteType(JsonSchemaType.STRING) - .addInsertValues("'198.24.10.0/24'", "'198.24.10.0'", "'198.10/8'", "null") - .addExpectedValues("198.24.10.0/24", "198.24.10.0", "198.10.0.0/8", null) - .build()); - - for (final String type : Set.of("integer", "int", "int4")) { - addDataTypeTestData( - TestDataHolder.builder() - .sourceType(type) - .airbyteType(JsonSchemaType.INTEGER) - .addInsertValues("null", "1001", "-2147483648", "2147483647") - .addExpectedValues(null, "1001", "-2147483648", "2147483647") - .build()); - } - - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("interval") - .airbyteType(JsonSchemaType.STRING) - .addInsertValues("null", "'P1Y2M3DT4H5M6S'", "'-178000000'", "'178000000'") - .addExpectedValues(null, "1 year 2 mons 3 days 04:05:06", "-49444:26:40", "49444:26:40") - .build()); - - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("json") - .airbyteType(JsonSchemaType.STRING) - .addInsertValues("null", "'{\"a\": 10, \"b\": 15}'") - .addExpectedValues(null, "{\"a\": 10, \"b\": 15}") - .build()); - - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("jsonb") - .airbyteType(JsonSchemaType.STRING) - .addInsertValues("null", "'[1, 2, 3]'::jsonb") - .addExpectedValues(null, "[1, 2, 3]") - .build()); - - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("line") - .airbyteType(JsonSchemaType.STRING) - .addInsertValues("'{4,5,6}'", "'{0,1,0}'", "null") - .addExpectedValues("{4,5,6}", "{0,1,0}", null) - .build()); - - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("lseg") - .airbyteType(JsonSchemaType.STRING) - .addInsertValues("'((3,7),(15,18))'", "'((0,0),(0,0))'", "null") - .addExpectedValues("[(3,7),(15,18)]", "[(0,0),(0,0)]", null) - .build()); - - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("macaddr") - .airbyteType(JsonSchemaType.STRING) - .addInsertValues("null", "'08:00:2b:01:02:03'", "'08-00-2b-01-02-04'", - "'08002b:010205'") - .addExpectedValues(null, "08:00:2b:01:02:03", "08:00:2b:01:02:04", "08:00:2b:01:02:05") - .build()); - - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("macaddr8") - .airbyteType(JsonSchemaType.STRING) - .addInsertValues("null", "'08:00:2b:01:02:03:04:05'", "'08-00-2b-01-02-03-04-06'", - "'08002b:0102030407'") - .addExpectedValues(null, "08:00:2b:01:02:03:04:05", "08:00:2b:01:02:03:04:06", - "08:00:2b:01:02:03:04:07") - .build()); - - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("money") - .airbyteType(JsonSchemaType.NUMBER) - .addInsertValues( - "null", - "'999.99'", "'1,001.01'", "'-1,000'", - "'$999.99'", "'$1001.01'", "'-$1,000'", - // max values for Money type: "-92233720368547758.08", "92233720368547758.07" - "'-92233720368547758.08'", "'92233720368547758.07'") - .addExpectedValues( - null, - // Double#toString method is necessary here because sometimes the output - // has unexpected decimals, e.g. Double.toString(-1000) is -1000.0 - "999.99", "1001.01", Double.toString(-1000), - "999.99", "1001.01", Double.toString(-1000), - Double.toString(-92233720368547758.08), Double.toString(92233720368547758.07)) - .build()); - - for (final String type : Set.of("numeric", "decimal")) { - addDataTypeTestData( - TestDataHolder.builder() - .sourceType(type) - .airbyteType(JsonSchemaType.NUMBER) - .addInsertValues( - "'123'", "null", "'1234567890.1234567'", - // Postgres source does not support these special values yet - // https://github.com/airbytehq/airbyte/issues/8902 - "'infinity'", "'-infinity'", "'nan'") - .addExpectedValues("123", null, "1.2345678901234567E9", null, null, null) - .build()); - } - - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("path") - .airbyteType(JsonSchemaType.STRING) - .addInsertValues("'((3,7),(15,18))'", "'((0,0),(0,0))'", "null") - .addExpectedValues("((3,7),(15,18))", "((0,0),(0,0))", null) - .build()); - - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("pg_lsn") - .airbyteType(JsonSchemaType.STRING) - .addInsertValues("'7/A25801C8'::pg_lsn", "'0/0'::pg_lsn", "null") - .addExpectedValues("7/A25801C8", "0/0", null) - .build()); - - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("point") - .airbyteType(JsonSchemaType.STRING) - .addInsertValues("'(3,7)'", "'(0,0)'", "'(999999999999999999999999,0)'", "null") - .addExpectedValues("(3,7)", "(0,0)", "(1e+24,0)", null) - .build()); - - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("polygon") - .airbyteType(JsonSchemaType.STRING) - .addInsertValues("'((3,7),(15,18))'", "'((0,0),(0,0))'", - "'((0,0),(999999999999999999999999,0))'", "null") - .addExpectedValues("((3,7),(15,18))", "((0,0),(0,0))", "((0,0),(1e+24,0))", null) - .build()); - - for (final String type : Set.of("real", "float4")) { - addDataTypeTestData( - TestDataHolder.builder() - .sourceType(type) - .airbyteType(JsonSchemaType.NUMBER) - .addInsertValues("null", "3.4145") - .addExpectedValues(null, "3.4145") - .build()); - } - - for (final String type : Set.of("smallint", "int2")) { - addDataTypeTestData( - TestDataHolder.builder() - .sourceType(type) - .airbyteType(JsonSchemaType.INTEGER) - .addInsertValues("null", "-32768", "32767") - .addExpectedValues(null, "-32768", "32767") - .build()); - } - - for (final String type : Set.of("smallserial", "serial2")) { - addDataTypeTestData( - TestDataHolder.builder() - .sourceType(type) - .airbyteType(JsonSchemaType.INTEGER) - .addInsertValues("1", "32767", "0", "-32767") - .addExpectedValues("1", "32767", "0", "-32767") - .build()); - } - - for (final String type : Set.of("serial", "serial4")) { - addDataTypeTestData( - TestDataHolder.builder() - .sourceType(type) - .airbyteType(JsonSchemaType.INTEGER) - .addInsertValues("1", "2147483647", "0", "-2147483647") - .addExpectedValues("1", "2147483647", "0", "-2147483647") - .build()); - } - - // time without time zone - for (final String fullSourceType : Set.of("time", "time without time zone")) { - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("time") - .fullSourceDataType(fullSourceType) - .airbyteType(JsonSchemaType.STRING_TIME_WITHOUT_TIMEZONE) - // time column will ignore time zone - .addInsertValues("null", "'13:00:01'", "'13:00:02+8'", "'13:00:03-8'", "'13:00:04Z'", "'13:00:05.01234Z+8'", "'13:00:00Z-8'") - .addExpectedValues(null, "13:00:01.000000", "13:00:02.000000", "13:00:03.000000", "13:00:04.000000", "13:00:05.012340", - "13:00:00.000000") - .build()); - } - - // time with time zone - for (final String fullSourceType : Set.of("timetz", "time with time zone")) { - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("timetz") - .fullSourceDataType(fullSourceType) - .airbyteType(JsonSchemaType.STRING_TIME_WITH_TIMEZONE) - .addInsertValues("null", "'13:00:01'", "'13:00:00+8'", "'13:00:03-8'", "'13:00:04Z'", "'13:00:05.012345Z+8'", "'13:00:06.00000Z-8'") - // A time value without time zone will use the time zone set on the database, which is Z-7, - // so 13:00:01 is returned as 13:00:01-07. - .addExpectedValues(null, "13:00:01.000000-07:00", "13:00:00.000000+08:00", "13:00:03.000000-08:00", "13:00:04.000000Z", - "13:00:05.012345-08:00", "13:00:06.000000+08:00") - .build()); - } - - // timestamp without time zone - for (final String fullSourceType : Set.of("timestamp", "timestamp without time zone")) { - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("timestamp") - .fullSourceDataType(fullSourceType) - .airbyteType(JsonSchemaType.STRING_TIMESTAMP_WITHOUT_TIMEZONE) - .addInsertValues("TIMESTAMP '2004-10-19 10:23:00'", "TIMESTAMP '2004-10-19 10:23:54.123456'", "null") - .addExpectedValues("2004-10-19T10:23:00.000000", "2004-10-19T10:23:54.123456", null) - .build()); - } - - // timestamp with time zone - for (final String fullSourceType : Set.of("timestamptz", "timestamp with time zone")) { - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("timestamptz") - .fullSourceDataType(fullSourceType) - .airbyteType(JsonSchemaType.STRING_TIMESTAMP_WITH_TIMEZONE) - .addInsertValues("TIMESTAMP '2004-10-19 10:23:00-08'", "TIMESTAMP '2004-10-19 10:23:54.123456-08'", "null") - // 2004-10-19T10:23:54Z-8 = 2004-10-19T17:23:54Z - .addExpectedValues("2004-10-19T17:23:00.000000Z", "2004-10-19T17:23:54.123456Z", null) - .build()); - } - - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("tsquery") - .airbyteType(JsonSchemaType.STRING) - .addInsertValues("null", "'fat & (rat | cat)'::tsquery", "'fat:ab & cat'::tsquery") - .addExpectedValues(null, "'fat' & ( 'rat' | 'cat' )", "'fat':AB & 'cat'") - .build()); - - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("tsvector") - .airbyteType(JsonSchemaType.STRING) - .addInsertValues("to_tsvector('The quick brown fox jumped over the lazy dog.')") - .addExpectedValues("'brown':3 'dog':9 'fox':4 'jump':5 'lazi':8 'quick':2") - .build()); - - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("uuid") - .airbyteType(JsonSchemaType.STRING) - .addInsertValues("'a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11'", "null") - .addExpectedValues("a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11", null) - .build()); - - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("xml") - .airbyteType(JsonSchemaType.STRING) - .addInsertValues( - "XMLPARSE (DOCUMENT 'Manual...')", - "null", "''") - .addExpectedValues("Manual...", null, "") - .build()); - - // enum type - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("mood") - .airbyteType(JsonSchemaType.STRING) - .addInsertValues("'happy'", "null") - .addExpectedValues("happy", null) - .build()); - - // range - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("tsrange") - .airbyteType(JsonSchemaType.STRING) - .addInsertValues("'(2010-01-01 14:30, 2010-01-01 15:30)'", "null") - .addExpectedValues("(\"2010-01-01 14:30:00\",\"2010-01-01 15:30:00\")", null) - .build()); - - // array - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("text") - .fullSourceDataType("text[]") - .airbyteType(JsonSchemaType.ARRAY) - .addInsertValues("'{10001, 10002, 10003, 10004}'", "null") - .addExpectedValues("[\"10001\",\"10002\",\"10003\",\"10004\"]", null) - .build()); - - // composite type - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("inventory_item") - .airbyteType(JsonSchemaType.STRING) - .addInsertValues("ROW('fuzzy dice', 42, 1.99)", "null") - .addExpectedValues("(\"fuzzy dice\",42,1.99)", null) - .build()); - - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("hstore") - .airbyteType(JsonSchemaType.STRING) - .addInsertValues(""" - '"paperback" => "243","publisher" => "postgresqltutorial.com", - "language" => "English","ISBN-13" => "978-1449370000", - "weight" => "11.2 ounces"' - """, null) - .addExpectedValues( - """ - {"ISBN-13":"978-1449370000","weight":"11.2 ounces","paperback":"243","publisher":"postgresqltutorial.com","language":"English"}""", - null) - .build()); - } - } diff --git a/docs/integrations/sources/postgres.md b/docs/integrations/sources/postgres.md index fb5174ae8d35..b58c25ada472 100644 --- a/docs/integrations/sources/postgres.md +++ b/docs/integrations/sources/postgres.md @@ -286,7 +286,7 @@ According to Postgres [documentation](https://www.postgresql.org/docs/14/datatyp | `character varying`, `varchar` | string | | | `cidr` | string | | | `circle` | string | | -| `date` | string | Parsed as ISO8601 date time at midnight | +| `date` | string | Parsed as ISO8601 date time at midnight. CDC mode doesn't support era indicators. Issue: [#14590](https://github.com/airbytehq/airbyte/issues/14590) | | `double precision`, `float`, `float8` | number | `Infinity`, `-Infinity`, and `NaN` are not supported and converted to `null`. Issue: [#8902](https://github.com/airbytehq/airbyte/issues/8902). | | `hstore` | string | | | `inet` | string | | @@ -310,9 +310,9 @@ According to Postgres [documentation](https://www.postgresql.org/docs/14/datatyp | `serial`, `serial4` | number | | | `text` | string | | | `time` | string | Parsed as a time string without a time-zone in the ISO-8601 calendar system. | -| `timetz` | string | Parsed as a time string with time-zone in the ISO-8601 calendar system. | +| `timetz` | string | Parsed as a time string with time-zone in the ISO-8601 calendar system. | | `timestamp` | string | Parsed as a date-time string without a time-zone in the ISO-8601 calendar system. | -| `timestamptz` | string | Parsed as a date-time string with time-zone in the ISO-8601 calendar system. | +| `timestamptz` | string | Parsed as a date-time string with time-zone in the ISO-8601 calendar system. | | `tsquery` | string | | | `tsvector` | string | | | `uuid` | string | | From 58e74972975a8685451f69a578291a64d26f5ff7 Mon Sep 17 00:00:00 2001 From: Yurii Bidiuk Date: Thu, 14 Jul 2022 17:22:37 +0300 Subject: [PATCH 2/9] format code --- .../debezium/internals/PostgresConverter.java | 48 +++++++------------ 1 file changed, 18 insertions(+), 30 deletions(-) diff --git a/airbyte-integrations/bases/debezium-v1-9-2/src/main/java/io/airbyte/integrations/debezium/internals/PostgresConverter.java b/airbyte-integrations/bases/debezium-v1-9-2/src/main/java/io/airbyte/integrations/debezium/internals/PostgresConverter.java index a7f67855f9ae..6e329463493f 100644 --- a/airbyte-integrations/bases/debezium-v1-9-2/src/main/java/io/airbyte/integrations/debezium/internals/PostgresConverter.java +++ b/airbyte-integrations/bases/debezium-v1-9-2/src/main/java/io/airbyte/integrations/debezium/internals/PostgresConverter.java @@ -4,22 +4,10 @@ package io.airbyte.integrations.debezium.internals; -import static io.airbyte.protocol.models.JsonSchemaType.AIRBYTE_TYPE; -import static io.airbyte.protocol.models.JsonSchemaType.DATE; -import static io.airbyte.protocol.models.JsonSchemaType.DATE_TIME; -import static io.airbyte.protocol.models.JsonSchemaType.FORMAT; -import static io.airbyte.protocol.models.JsonSchemaType.TIME; -import static io.airbyte.protocol.models.JsonSchemaType.TIMESTAMP_WITHOUT_TIMEZONE; -import static io.airbyte.protocol.models.JsonSchemaType.TIMESTAMP_WITH_TIMEZONE; -import static io.airbyte.protocol.models.JsonSchemaType.TIME_WITHOUT_TIMEZONE; -import static io.airbyte.protocol.models.JsonSchemaType.TIME_WITH_TIMEZONE; - import io.debezium.spi.converter.CustomConverter; import io.debezium.spi.converter.RelationalColumn; import java.math.BigDecimal; import java.nio.charset.StandardCharsets; -import java.sql.Date; -import java.sql.Timestamp; import java.util.Arrays; import java.util.Locale; import java.util.Properties; @@ -57,8 +45,7 @@ public void converterFor(final RelationalColumn field, final ConverterRegistrati registerMoney(field, registration); } else if (BYTEA_TYPE.equalsIgnoreCase(field.typeName())) { registerBytea(field, registration); - } - else if (Arrays.stream(NUMERIC_TYPES).anyMatch(s -> s.equalsIgnoreCase(field.typeName()))) { + } else if (Arrays.stream(NUMERIC_TYPES).anyMatch(s -> s.equalsIgnoreCase(field.typeName()))) { registerNumber(field, registration); } } @@ -68,22 +55,23 @@ private void registerNumber(final RelationalColumn field, final ConverterRegistr if (x == null) { return DebeziumConverterUtils.convertDefaultValue(field); } -// Bad solution -// We applied a solution like this for several reasons: -// 1. Regarding #13608, CDC and nor-CDC data output format should be the same. -// 2. In the non-CDC mode 'decimal' and 'numeric' values are put to JSON node as BigDecimal value. -// According to Jackson Object mapper configuration, all trailing zeros are omitted and -// numbers with decimal places are deserialized with exponent. (e.g. 1234567890.1234567 would -// be deserialized as 1.2345678901234567E9). -// 3. In the CDC mode 'decimal' and 'numeric' values are deserialized as a regular number (e.g. -// 1234567890.1234567 would be deserialized as 1234567890.1234567). Numbers without -// decimal places (e.g 1, 24, 354) are represented with trailing zero (e.g 1.0, 24.0, 354.0). -// One of solution to align deserialization for these 2 modes is setting -// DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS as true for ObjectMapper. But this breaks -// deserialization for other data-types. -// A worked solution was to keep deserialization for non-CDC mode as it is and change it for CDC one. -// The code below strips trailing zeros for integer numbers and represents number with exponent -// if this number has decimals point. + // Bad solution + // We applied a solution like this for several reasons: + // 1. Regarding #13608, CDC and nor-CDC data output format should be the same. + // 2. In the non-CDC mode 'decimal' and 'numeric' values are put to JSON node as BigDecimal value. + // According to Jackson Object mapper configuration, all trailing zeros are omitted and + // numbers with decimal places are deserialized with exponent. (e.g. 1234567890.1234567 would + // be deserialized as 1.2345678901234567E9). + // 3. In the CDC mode 'decimal' and 'numeric' values are deserialized as a regular number (e.g. + // 1234567890.1234567 would be deserialized as 1234567890.1234567). Numbers without + // decimal places (e.g 1, 24, 354) are represented with trailing zero (e.g 1.0, 24.0, 354.0). + // One of solution to align deserialization for these 2 modes is setting + // DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS as true for ObjectMapper. But this breaks + // deserialization for other data-types. + // A worked solution was to keep deserialization for non-CDC mode as it is and change it for CDC + // one. + // The code below strips trailing zeros for integer numbers and represents number with exponent + // if this number has decimals point. final double doubleValue = Double.parseDouble(x.toString()); var valueWithTruncatedZero = BigDecimal.valueOf(doubleValue).stripTrailingZeros().toString(); return valueWithTruncatedZero.contains(".") ? String.valueOf(doubleValue) : valueWithTruncatedZero; From ad72cae065609dd4c38bcec8dc3ac7dc87de6e88 Mon Sep 17 00:00:00 2001 From: Edward Gao Date: Wed, 27 Jul 2022 11:02:06 -0700 Subject: [PATCH 3/9] update int handling --- .../sources/AbstractPostgresSourceDatatypeTest.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/AbstractPostgresSourceDatatypeTest.java b/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/AbstractPostgresSourceDatatypeTest.java index d383ad927f5c..b0213d5f0d93 100644 --- a/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/AbstractPostgresSourceDatatypeTest.java +++ b/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/AbstractPostgresSourceDatatypeTest.java @@ -46,7 +46,7 @@ protected void initTests() { addDataTypeTestData( TestDataHolder.builder() .sourceType("bigint") - .airbyteType(JsonSchemaType.NUMBER) + .airbyteType(JsonSchemaType.INTEGER) .addInsertValues("-9223372036854775808", "9223372036854775807", "0", "null") .addExpectedValues("-9223372036854775808", "9223372036854775807", "0", null) .build()); @@ -54,7 +54,7 @@ protected void initTests() { addDataTypeTestData( TestDataHolder.builder() .sourceType("bigserial") - .airbyteType(JsonSchemaType.NUMBER) + .airbyteType(JsonSchemaType.INTEGER) .addInsertValues("1", "9223372036854775807", "0", "-9223372036854775808") .addExpectedValues("1", "9223372036854775807", "0", "-9223372036854775808") .build()); @@ -210,7 +210,7 @@ protected void initTests() { addDataTypeTestData( TestDataHolder.builder() .sourceType(type) - .airbyteType(JsonSchemaType.NUMBER) + .airbyteType(JsonSchemaType.INTEGER) .addInsertValues("null", "1001", "-2147483648", "2147483647") .addExpectedValues(null, "1001", "-2147483648", "2147483647") .build()); @@ -356,7 +356,7 @@ protected void initTests() { addDataTypeTestData( TestDataHolder.builder() .sourceType(type) - .airbyteType(JsonSchemaType.NUMBER) + .airbyteType(JsonSchemaType.INTEGER) .addInsertValues("null", "-32768", "32767") .addExpectedValues(null, "-32768", "32767") .build()); @@ -366,7 +366,7 @@ protected void initTests() { addDataTypeTestData( TestDataHolder.builder() .sourceType(type) - .airbyteType(JsonSchemaType.NUMBER) + .airbyteType(JsonSchemaType.INTEGER) .addInsertValues("1", "32767", "0", "-32767") .addExpectedValues("1", "32767", "0", "-32767") .build()); @@ -376,7 +376,7 @@ protected void initTests() { addDataTypeTestData( TestDataHolder.builder() .sourceType(type) - .airbyteType(JsonSchemaType.NUMBER) + .airbyteType(JsonSchemaType.INTEGER) .addInsertValues("1", "2147483647", "0", "-2147483647") .addExpectedValues("1", "2147483647", "0", "-2147483647") .build()); From 146081eb82a63f3002002910870190c1d7aa9ca3 Mon Sep 17 00:00:00 2001 From: Edward Gao Date: Wed, 27 Jul 2022 15:22:13 -0700 Subject: [PATCH 4/9] fix build --- .../destination/clickhouse/ClickhouseTestSourceOperations.java | 2 +- .../destination/clickhouse/ClickhouseTestSourceOperations.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/airbyte-integrations/connectors/destination-clickhouse-strict-encrypt/src/test-integration/java/io/airbyte/integrations/destination/clickhouse/ClickhouseTestSourceOperations.java b/airbyte-integrations/connectors/destination-clickhouse-strict-encrypt/src/test-integration/java/io/airbyte/integrations/destination/clickhouse/ClickhouseTestSourceOperations.java index cc90ad1e528f..b35dd3be4b16 100644 --- a/airbyte-integrations/connectors/destination-clickhouse-strict-encrypt/src/test-integration/java/io/airbyte/integrations/destination/clickhouse/ClickhouseTestSourceOperations.java +++ b/airbyte-integrations/connectors/destination-clickhouse-strict-encrypt/src/test-integration/java/io/airbyte/integrations/destination/clickhouse/ClickhouseTestSourceOperations.java @@ -22,7 +22,7 @@ protected void putDate(final ObjectNode node, final String columnName, final Res @Override protected void putTimestamp(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index) throws SQLException { - final LocalDateTime timestamp = getDateTimeObject(resultSet, index, LocalDateTime.class); + final LocalDateTime timestamp = getObject(resultSet, index, LocalDateTime.class); final LocalDate date = timestamp.toLocalDate(); DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern( diff --git a/airbyte-integrations/connectors/destination-clickhouse/src/test-integration/java/io/airbyte/integrations/destination/clickhouse/ClickhouseTestSourceOperations.java b/airbyte-integrations/connectors/destination-clickhouse/src/test-integration/java/io/airbyte/integrations/destination/clickhouse/ClickhouseTestSourceOperations.java index cc90ad1e528f..b35dd3be4b16 100644 --- a/airbyte-integrations/connectors/destination-clickhouse/src/test-integration/java/io/airbyte/integrations/destination/clickhouse/ClickhouseTestSourceOperations.java +++ b/airbyte-integrations/connectors/destination-clickhouse/src/test-integration/java/io/airbyte/integrations/destination/clickhouse/ClickhouseTestSourceOperations.java @@ -22,7 +22,7 @@ protected void putDate(final ObjectNode node, final String columnName, final Res @Override protected void putTimestamp(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index) throws SQLException { - final LocalDateTime timestamp = getDateTimeObject(resultSet, index, LocalDateTime.class); + final LocalDateTime timestamp = getObject(resultSet, index, LocalDateTime.class); final LocalDate date = timestamp.toLocalDate(); DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern( From 242b773b7f7d28126e6f32ba9c4fb0792ca7c30f Mon Sep 17 00:00:00 2001 From: Yurii Bidiuk Date: Thu, 28 Jul 2022 14:41:12 +0300 Subject: [PATCH 5/9] fix PR remarks --- .../debezium/internals/DateTimeConverter.java | 7 +++++++ .../source/postgres/PostgresSourceOperations.java | 2 +- .../AbstractPostgresSourceDatatypeTest.java | 14 +++++++------- 3 files changed, 15 insertions(+), 8 deletions(-) diff --git a/airbyte-integrations/bases/debezium-v1-9-2/src/main/java/io/airbyte/integrations/debezium/internals/DateTimeConverter.java b/airbyte-integrations/bases/debezium-v1-9-2/src/main/java/io/airbyte/integrations/debezium/internals/DateTimeConverter.java index f6062b8d8ff1..27b0cc85ab51 100644 --- a/airbyte-integrations/bases/debezium-v1-9-2/src/main/java/io/airbyte/integrations/debezium/internals/DateTimeConverter.java +++ b/airbyte-integrations/bases/debezium-v1-9-2/src/main/java/io/airbyte/integrations/debezium/internals/DateTimeConverter.java @@ -59,6 +59,13 @@ public static String convertToTime(Object time) { return localTime.format(TIME_FORMATTER); } + /** + * This method converts '-YYYY-mm-dd' to 'YYYY-mm-dd BC' if date is before current era. + * + * @param date local date + * @param value formatted date + * @return value with era indicator if date is before current era + */ public static String resolveEra(LocalDate date, String value) { return isBCE(date) ? value.substring(1) + " BC" : value; } diff --git a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceOperations.java b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceOperations.java index 658b02fcd4e4..84db799f9fa7 100644 --- a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceOperations.java +++ b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceOperations.java @@ -184,7 +184,7 @@ public void setJsonField(final ResultSet resultSet, final int colIndex, final Ob case "hstore" -> putHstoreAsJson(json, columnName, resultSet, colIndex); case "circle" -> putObject(json, columnName, resultSet, colIndex, PGcircle.class); case "box" -> putObject(json, columnName, resultSet, colIndex, PGbox.class); - case "double precision", "float", "float8" -> putFloat8(json, columnName, resultSet, colIndex); + case "double precision", "float", "float8" -> putDouble(json, columnName, resultSet, colIndex); case "line" -> putObject(json, columnName, resultSet, colIndex, PGline.class); case "lseg" -> putObject(json, columnName, resultSet, colIndex, PGlseg.class); case "path" -> putObject(json, columnName, resultSet, colIndex, PGpath.class); diff --git a/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/AbstractPostgresSourceDatatypeTest.java b/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/AbstractPostgresSourceDatatypeTest.java index b0213d5f0d93..8c07db50ac2c 100644 --- a/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/AbstractPostgresSourceDatatypeTest.java +++ b/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/AbstractPostgresSourceDatatypeTest.java @@ -176,13 +176,13 @@ protected void initTests() { // Debezium does not handle era indicators (AD nd BC) // https://github.com/airbytehq/airbyte/issues/14590 - // addDataTypeTestData( - // TestDataHolder.builder() - // .sourceType("date") - // .airbyteType(JsonSchemaType.STRING_DATE) - // .addInsertValues("'1999-01-08'", "'1991-02-10 BC'", "null") - // .addExpectedValues("1999-01-08", "1990-02-10 BC", null) - // .build()); + addDataTypeTestData( + TestDataHolder.builder() + .sourceType("date") + .airbyteType(JsonSchemaType.STRING_DATE) + .addInsertValues("'1999-01-08'", /* "'1991-02-10 BC'",*/ "null") + .addExpectedValues("1999-01-08", /* "1990-02-10 BC", */ null) + .build()); for (final String type : Set.of("double precision", "float", "float8")) { addDataTypeTestData( From 3c1be9b186cf95ddcba783d1b55cdf77d181736c Mon Sep 17 00:00:00 2001 From: Yurii Bidiuk Date: Mon, 1 Aug 2022 14:51:51 +0300 Subject: [PATCH 6/9] revert changes for money type that are broken by #7338 --- .../debezium/internals/PostgresConverter.java | 6 +++++- .../sources/AbstractPostgresSourceDatatypeTest.java | 10 ++++++---- 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/airbyte-integrations/bases/debezium-v1-9-2/src/main/java/io/airbyte/integrations/debezium/internals/PostgresConverter.java b/airbyte-integrations/bases/debezium-v1-9-2/src/main/java/io/airbyte/integrations/debezium/internals/PostgresConverter.java index 6e329463493f..4df42d5f8f6b 100644 --- a/airbyte-integrations/bases/debezium-v1-9-2/src/main/java/io/airbyte/integrations/debezium/internals/PostgresConverter.java +++ b/airbyte-integrations/bases/debezium-v1-9-2/src/main/java/io/airbyte/integrations/debezium/internals/PostgresConverter.java @@ -138,7 +138,11 @@ private void registerMoney(final RelationalColumn field, final ConverterRegistra return DebeziumConverterUtils.convertDefaultValue(field); } else if (x instanceof Double) { final BigDecimal result = BigDecimal.valueOf((Double) x); - return Double.toString(result.doubleValue()); + if (result.compareTo(new BigDecimal("999999999999999")) == 1 + || result.compareTo(new BigDecimal("-999999999999999")) == -1) { + return null; + } + return result.toString(); } else { return x.toString(); } diff --git a/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/AbstractPostgresSourceDatatypeTest.java b/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/AbstractPostgresSourceDatatypeTest.java index 8c07db50ac2c..909f08627997 100644 --- a/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/AbstractPostgresSourceDatatypeTest.java +++ b/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/AbstractPostgresSourceDatatypeTest.java @@ -282,16 +282,18 @@ protected void initTests() { .addInsertValues( "null", "'999.99'", "'1,001.01'", "'-1,000'", - "'$999.99'", "'$1001.01'", "'-$1,000'", + "'$999.99'", "'$1001.01'", "'-$1,000'" // max values for Money type: "-92233720368547758.08", "92233720368547758.07" - "'-92233720368547758.08'", "'92233720368547758.07'") + // Debezium has wrong parsing for values more than 999999999999999 and less than -999999999999999 + // https://github.com/airbytehq/airbyte/issues/7338 + /*"'-92233720368547758.08'", "'92233720368547758.07'"*/) .addExpectedValues( null, // Double#toString method is necessary here because sometimes the output // has unexpected decimals, e.g. Double.toString(-1000) is -1000.0 "999.99", "1001.01", Double.toString(-1000), - "999.99", "1001.01", Double.toString(-1000), - Double.toString(-92233720368547758.08), Double.toString(92233720368547758.07)) + "999.99", "1001.01", Double.toString(-1000) + /*"-92233720368547758.08", "92233720368547758.07"*/) .build()); // Blocked by https://github.com/airbytehq/airbyte/issues/8902 From dd31e2ff8eff4359cd94a6040c95da7db946d576 Mon Sep 17 00:00:00 2001 From: Yurii Bidiuk Date: Wed, 3 Aug 2022 11:59:28 +0300 Subject: [PATCH 7/9] bump version --- .../connectors/source-postgres/Dockerfile | 2 +- .../source/postgres/PostgresSourceOperations.java | 11 ----------- docs/integrations/sources/postgres.md | 3 ++- 3 files changed, 3 insertions(+), 13 deletions(-) diff --git a/airbyte-integrations/connectors/source-postgres/Dockerfile b/airbyte-integrations/connectors/source-postgres/Dockerfile index 5c18da098743..8b5e5df204ca 100644 --- a/airbyte-integrations/connectors/source-postgres/Dockerfile +++ b/airbyte-integrations/connectors/source-postgres/Dockerfile @@ -16,5 +16,5 @@ ENV APPLICATION source-postgres COPY --from=build /airbyte /airbyte -LABEL io.airbyte.version=0.4.38 +LABEL io.airbyte.version=0.4.39 LABEL io.airbyte.name=airbyte/source-postgres diff --git a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceOperations.java b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceOperations.java index 84db799f9fa7..953a948baff7 100644 --- a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceOperations.java +++ b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceOperations.java @@ -280,17 +280,6 @@ protected void putBoolean(final ObjectNode node, final String columnName, final node.put(columnName, resultSet.getString(index).equalsIgnoreCase("t")); } - @Override - protected void putFloat8(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index) - throws SQLException { - final BigDecimal bigDecimal = DataTypeUtils.returnNullIfInvalid(() -> resultSet.getBigDecimal(index)); - if (bigDecimal != null) { - node.put(columnName, bigDecimal.setScale(resultSet.getMetaData().getScale(index), RoundingMode.HALF_EVEN).doubleValue()); - } else { - node.put(columnName, (BigDecimal) null); - } - } - protected void putObject(final ObjectNode node, final String columnName, final ResultSet resultSet, diff --git a/docs/integrations/sources/postgres.md b/docs/integrations/sources/postgres.md index b58c25ada472..136f9d14d302 100644 --- a/docs/integrations/sources/postgres.md +++ b/docs/integrations/sources/postgres.md @@ -354,7 +354,8 @@ Possible solutions include: | Version | Date | Pull Request | Subject | |:--------|:-----------|:---------------------------------------------------------|:------------------------------------------------------------------------------------------------------------------| -| 0.5.0 | 2022-07-26 | [14362](https://github.com/airbytehq/airbyte/pull/14362) | Integral columns are now discovered as int64 fields. | +| 0.4.39 | 2022-08-03 | [14534](https://github.com/airbytehq/airbyte/pull/14534) | Align regular and CDC integration tests and data mappers | +| 0.4.38 | 2022-07-26 | [14362](https://github.com/airbytehq/airbyte/pull/14362) | Integral columns are now discovered as int64 fields. | | 0.4.37 | 2022-07-22 | [14714](https://github.com/airbytehq/airbyte/pull/14714) | Clarified error message when invalid cursor column selected | | 0.4.36 | 2022-07-21 | [14451](https://github.com/airbytehq/airbyte/pull/14451) | Make initial CDC waiting time configurable | | 0.4.35 | 2022-07-14 | [14574](https://github.com/airbytehq/airbyte/pull/14574) | Removed additionalProperties:false from JDBC source connectors | From 755aa12f3c117da81bd6cf1f2c46956dd4d1e983 Mon Sep 17 00:00:00 2001 From: Edward Gao Date: Wed, 3 Aug 2022 08:09:00 -0700 Subject: [PATCH 8/9] =?UTF-8?q?=F0=9F=90=9B=20Source=20Postgres:=20Improve?= =?UTF-8?q?=20BCE=20date=20handling=20(#15187)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * 13608 & 12026 - align regular and CDC integration tests and data mappers * format code * update int handling * borked merge - re-delete deleted methods * enable catalog tests for postgres * fix build * fix PR remarks * revert changes for money type that are broken by #7338 * update BCE handling in JDBC * reuse existing method * handle bce dates * inline methods * fix JDBC BCE year inconsistency * use correct data type in test * format * Update airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/AbstractPostgresSourceDatatypeTest.java Co-authored-by: Edward Gao * pmd fix * use class.getname() * fix pmd * format * bump version * handle incremental mode * clean up diff * more comments * unused imports * format * versions+changelog Co-authored-by: Yurii Bidiuk Co-authored-by: Yurii Bidiuk <35812734+yurii-bidiuk@users.noreply.github.com> --- ...bstractJdbcCompatibleSourceOperations.java | 52 ++++++++- .../debezium/internals/DateTimeConverter.java | 105 ++++++++++++------ .../debezium/internals/PostgresConverter.java | 2 - .../source-postgres-strict-encrypt/Dockerfile | 2 +- .../connectors/source-postgres/Dockerfile | 2 +- .../postgres/PostgresCdcProperties.java | 3 +- .../postgres/PostgresSourceOperations.java | 8 +- .../AbstractPostgresSourceDatatypeTest.java | 44 ++++++-- .../CdcPostgresSourceDatatypeTest.java | 3 + .../sources/PostgresSourceDatatypeTest.java | 1 - docs/integrations/sources/postgres.md | 3 +- 11 files changed, 174 insertions(+), 51 deletions(-) diff --git a/airbyte-db/db-lib/src/main/java/io/airbyte/db/jdbc/AbstractJdbcCompatibleSourceOperations.java b/airbyte-db/db-lib/src/main/java/io/airbyte/db/jdbc/AbstractJdbcCompatibleSourceOperations.java index 435f7281a93d..8d4560cf4335 100644 --- a/airbyte-db/db-lib/src/main/java/io/airbyte/db/jdbc/AbstractJdbcCompatibleSourceOperations.java +++ b/airbyte-db/db-lib/src/main/java/io/airbyte/db/jdbc/AbstractJdbcCompatibleSourceOperations.java @@ -37,6 +37,11 @@ */ public abstract class AbstractJdbcCompatibleSourceOperations implements JdbcCompatibleSourceOperations { + /** + * A Date representing the earliest date in CE. Any date before this is in BCE. + */ + private static final Date ONE_CE = Date.valueOf("0001-01-01"); + @Override public JsonNode rowToJson(final ResultSet queryContext) throws SQLException { // the first call communicates with the database. after that the result is cached. @@ -268,12 +273,53 @@ protected void putTimestampWithTimezone(ObjectNode node, String columnName, Resu node.put(columnName, resolveEra(localDate, timestamptz.format(TIMESTAMPTZ_FORMATTER))); } - protected String resolveEra(LocalDate date, String value) { - return isBCE(date) ? value.substring(1) + " BC" : value; + /** + * Modifies a string representation of a date/timestamp and normalizes its era indicator. + * Specifically, if this is a BCE value: + *
    + *
  • The leading negative sign will be removed if present
  • + *
  • The "BC" suffix will be appended, if not already present
  • + *
+ * + * You most likely would prefer to call one of the overloaded methods, which accept temporal types. + */ + public static String resolveEra(boolean isBce, String value) { + String mangledValue = value; + if (isBce) { + if (mangledValue.startsWith("-")) { + mangledValue = mangledValue.substring(1); + } + if (!mangledValue.endsWith(" BC")) { + mangledValue += " BC"; + } + } + return mangledValue; } - public static boolean isBCE(LocalDate date) { + public static boolean isBce(LocalDate date) { return date.getEra().equals(IsoEra.BCE); } + public static String resolveEra(LocalDate date, String value) { + return resolveEra(isBce(date), value); + } + + /** + * java.sql.Date objects don't properly represent their era (for example, using toLocalDate() always + * returns an object in CE). So to determine the era, we just check whether the date is before 1 AD. + * + * This is technically kind of sketchy due to ancient timestamps being weird (leap years, etc.), but + * my understanding is that {@link #ONE_CE} has the same weirdness, so it cancels out. + */ + public static String resolveEra(Date date, String value) { + return resolveEra(date.before(ONE_CE), value); + } + + /** + * See {@link #resolveEra(Date, String)} for explanation. + */ + public static String resolveEra(Timestamp timestamp, String value) { + return resolveEra(timestamp.before(ONE_CE), value); + } + } diff --git a/airbyte-integrations/bases/debezium-v1-9-2/src/main/java/io/airbyte/integrations/debezium/internals/DateTimeConverter.java b/airbyte-integrations/bases/debezium-v1-9-2/src/main/java/io/airbyte/integrations/debezium/internals/DateTimeConverter.java index 27b0cc85ab51..719eda0995fc 100644 --- a/airbyte-integrations/bases/debezium-v1-9-2/src/main/java/io/airbyte/integrations/debezium/internals/DateTimeConverter.java +++ b/airbyte-integrations/bases/debezium-v1-9-2/src/main/java/io/airbyte/integrations/debezium/internals/DateTimeConverter.java @@ -8,7 +8,8 @@ import static io.airbyte.db.DataTypeUtils.TIMESTAMP_FORMATTER; import static io.airbyte.db.DataTypeUtils.TIMETZ_FORMATTER; import static io.airbyte.db.DataTypeUtils.TIME_FORMATTER; -import static io.airbyte.db.jdbc.AbstractJdbcCompatibleSourceOperations.isBCE; +import static io.airbyte.db.jdbc.AbstractJdbcCompatibleSourceOperations.isBce; +import static io.airbyte.db.jdbc.AbstractJdbcCompatibleSourceOperations.resolveEra; import java.sql.Date; import java.sql.Timestamp; @@ -19,6 +20,7 @@ import java.time.OffsetDateTime; import java.time.OffsetTime; import java.time.ZoneOffset; +import java.time.ZonedDateTime; import java.time.format.DateTimeFormatter; public class DateTimeConverter { @@ -32,26 +34,84 @@ public static String convertToTimeWithTimezone(Object time) { } public static String convertToTimestampWithTimezone(Object timestamp) { - OffsetDateTime timestamptz = OffsetDateTime.ofInstant(toInstant(timestamp), ZoneOffset.UTC); - LocalDate localDate = timestamptz.toLocalDate(); - return resolveEra(localDate, timestamptz.format(TIMESTAMPTZ_FORMATTER)); + if (timestamp instanceof Timestamp t) { + // In snapshot mode, debezium produces a java.sql.Timestamp object for the TIMESTAMPTZ type. + // Conceptually, a timestamp with timezone is an Instant. But t.toInstant() actually mangles the + // value for ancient dates, because leap years weren't applied consistently in ye olden days. + // Additionally, toInstant() (and toLocalDateTime()) actually lose the era indicator, so we can't + // rely on their getEra() methods. + // So we have special handling for this case, which sidesteps the toInstant conversion. + ZonedDateTime timestamptz = t.toLocalDateTime().atZone(ZoneOffset.UTC); + String value = timestamptz.format(TIMESTAMPTZ_FORMATTER); + return resolveEra(t, value); + } else if (timestamp instanceof OffsetDateTime t) { + // In incremental mode, debezium emits java.time.OffsetDateTime objects. + // java.time classes have a year 0, but the standard AD/BC system does not. For example, + // "0001-01-01 BC" is represented as LocalDate("0000-01-01"). + // We just subtract one year to hack around this difference. + LocalDate localDate = t.toLocalDate(); + if (isBce(localDate)) { + t = t.minusYears(1); + } + return resolveEra(localDate, t.toString()); + } else { + // This case probably isn't strictly necessary, but I'm leaving it just in case there's some weird + // situation that I'm not aware of. + Instant instant = Instant.parse(timestamp.toString()); + OffsetDateTime offsetDateTime = OffsetDateTime.ofInstant(instant, ZoneOffset.UTC); + ZonedDateTime timestamptz = ZonedDateTime.from(offsetDateTime); + LocalDate localDate = timestamptz.toLocalDate(); + String value = timestamptz.format(TIMESTAMPTZ_FORMATTER); + return resolveEra(localDate, value); + } } + /** + * See {@link #convertToTimestampWithTimezone(Object)} for explanation of the weird things happening + * here. + */ public static String convertToTimestamp(Object timestamp) { - final LocalDateTime localDateTime = LocalDateTime.ofInstant(toInstant(timestamp), - ZoneOffset.UTC); - final LocalDate date = localDateTime.toLocalDate(); - return resolveEra(date, localDateTime.format(TIMESTAMP_FORMATTER)); + if (timestamp instanceof Timestamp t) { + // Snapshot mode + LocalDateTime localDateTime = t.toLocalDateTime(); + String value = localDateTime.format(TIMESTAMP_FORMATTER); + return resolveEra(t, value); + } else if (timestamp instanceof Instant i) { + // Incremental mode + LocalDate localDate = i.atZone(ZoneOffset.UTC).toLocalDate(); + if (isBce(localDate)) { + // i.minus(1, ChronoUnit.YEARS) would be nice, but it throws an exception because you can't subtract + // YEARS from an Instant + i = i.atZone(ZoneOffset.UTC).minusYears(1).toInstant(); + } + return resolveEra(localDate, i.toString()); + } else { + LocalDateTime localDateTime = LocalDateTime.parse(timestamp.toString()); + final LocalDate date = localDateTime.toLocalDate(); + String value = localDateTime.format(TIMESTAMP_FORMATTER); + return resolveEra(date, value); + } } + /** + * See {@link #convertToTimestampWithTimezone(Object)} for explanation of the weird things happening + * here. + */ public static Object convertToDate(Object date) { - LocalDate localDate; - if (date instanceof Date) { - localDate = ((Date) date).toLocalDate(); + if (date instanceof Date d) { + // Snapshot mode + LocalDate localDate = ((Date) date).toLocalDate(); + return resolveEra(d, localDate.toString()); + } else if (date instanceof LocalDate d) { + // Incremental mode + if (isBce(d)) { + d = d.minusYears(1); + } + return resolveEra(d, d.toString()); } else { - localDate = LocalDate.parse(date.toString()); + LocalDate localDate = LocalDate.parse(date.toString()); + return resolveEra(localDate, localDate.toString()); } - return resolveEra(localDate, localDate.toString()); } public static String convertToTime(Object time) { @@ -59,23 +119,4 @@ public static String convertToTime(Object time) { return localTime.format(TIME_FORMATTER); } - /** - * This method converts '-YYYY-mm-dd' to 'YYYY-mm-dd BC' if date is before current era. - * - * @param date local date - * @param value formatted date - * @return value with era indicator if date is before current era - */ - public static String resolveEra(LocalDate date, String value) { - return isBCE(date) ? value.substring(1) + " BC" : value; - } - - private static Instant toInstant(Object timestamp) { - if (timestamp instanceof Timestamp) { - return ((Timestamp) timestamp).toInstant(); - } else { - return Instant.parse(timestamp.toString()); - } - } - } diff --git a/airbyte-integrations/bases/debezium-v1-9-2/src/main/java/io/airbyte/integrations/debezium/internals/PostgresConverter.java b/airbyte-integrations/bases/debezium-v1-9-2/src/main/java/io/airbyte/integrations/debezium/internals/PostgresConverter.java index 4df42d5f8f6b..e3b7889a2c84 100644 --- a/airbyte-integrations/bases/debezium-v1-9-2/src/main/java/io/airbyte/integrations/debezium/internals/PostgresConverter.java +++ b/airbyte-integrations/bases/debezium-v1-9-2/src/main/java/io/airbyte/integrations/debezium/internals/PostgresConverter.java @@ -112,8 +112,6 @@ private void registerDate(final RelationalColumn field, final ConverterRegistrat case "TIMETZ" -> DateTimeConverter.convertToTimeWithTimezone(x); case "TIMESTAMPTZ" -> DateTimeConverter.convertToTimestampWithTimezone(x); case "TIMESTAMP" -> DateTimeConverter.convertToTimestamp(x); - // Debezium doesn't handle era indicators - // https://github.com/airbytehq/airbyte/issues/14590 case "DATE" -> DateTimeConverter.convertToDate(x); case "TIME" -> DateTimeConverter.convertToTime(x); case "INTERVAL" -> convertInterval((PGInterval) x); diff --git a/airbyte-integrations/connectors/source-postgres-strict-encrypt/Dockerfile b/airbyte-integrations/connectors/source-postgres-strict-encrypt/Dockerfile index 43c49007b434..0f6f37e5b26e 100644 --- a/airbyte-integrations/connectors/source-postgres-strict-encrypt/Dockerfile +++ b/airbyte-integrations/connectors/source-postgres-strict-encrypt/Dockerfile @@ -16,5 +16,5 @@ ENV APPLICATION source-postgres-strict-encrypt COPY --from=build /airbyte /airbyte -LABEL io.airbyte.version=0.4.38 +LABEL io.airbyte.version=0.4.40 LABEL io.airbyte.name=airbyte/source-postgres-strict-encrypt diff --git a/airbyte-integrations/connectors/source-postgres/Dockerfile b/airbyte-integrations/connectors/source-postgres/Dockerfile index 8b5e5df204ca..5d19399e67e2 100644 --- a/airbyte-integrations/connectors/source-postgres/Dockerfile +++ b/airbyte-integrations/connectors/source-postgres/Dockerfile @@ -16,5 +16,5 @@ ENV APPLICATION source-postgres COPY --from=build /airbyte /airbyte -LABEL io.airbyte.version=0.4.39 +LABEL io.airbyte.version=0.4.40 LABEL io.airbyte.name=airbyte/source-postgres diff --git a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresCdcProperties.java b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresCdcProperties.java index 874bb310b815..2e47c6bc1487 100644 --- a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresCdcProperties.java +++ b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresCdcProperties.java @@ -5,6 +5,7 @@ package io.airbyte.integrations.source.postgres; import com.fasterxml.jackson.databind.JsonNode; +import io.airbyte.integrations.debezium.internals.PostgresConverter; import java.util.Properties; public class PostgresCdcProperties { @@ -27,7 +28,7 @@ private static Properties commonProperties() { props.setProperty("connector.class", "io.debezium.connector.postgresql.PostgresConnector"); props.setProperty("converters", "datetime"); - props.setProperty("datetime.type", "io.airbyte.integrations.debezium.internals.PostgresConverter"); + props.setProperty("datetime.type", PostgresConverter.class.getName()); return props; } diff --git a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceOperations.java b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceOperations.java index 953a948baff7..8e9be8666cee 100644 --- a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceOperations.java +++ b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceOperations.java @@ -23,7 +23,6 @@ import io.airbyte.db.jdbc.JdbcSourceOperations; import io.airbyte.protocol.models.JsonSchemaType; import java.math.BigDecimal; -import java.math.RoundingMode; import java.sql.JDBCType; import java.sql.PreparedStatement; import java.sql.ResultSet; @@ -215,7 +214,11 @@ public void setJsonField(final ResultSet resultSet, final int colIndex, final Ob @Override protected void putDate(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index) throws SQLException { - final LocalDate date = getObject(resultSet, index, LocalDate.class); + LocalDate date = getObject(resultSet, index, LocalDate.class); + if (isBce(date)) { + // java.time uses a year 0, but the standard AD/BC system does not. So we just subtract one to hack around this difference. + date = date.minusYears(1); + } node.put(columnName, resolveEra(date, date.toString())); } @@ -290,6 +293,7 @@ protected void putObject(final ObjectNode node, node.put(columnName, object.getValue()); } + @Override protected void putBigDecimal(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index) { final BigDecimal bigDecimal = DataTypeUtils.returnNullIfInvalid(() -> resultSet.getBigDecimal(index)); if (bigDecimal != null) { diff --git a/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/AbstractPostgresSourceDatatypeTest.java b/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/AbstractPostgresSourceDatatypeTest.java index 909f08627997..3f85a5f3c637 100644 --- a/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/AbstractPostgresSourceDatatypeTest.java +++ b/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/AbstractPostgresSourceDatatypeTest.java @@ -180,8 +180,8 @@ protected void initTests() { TestDataHolder.builder() .sourceType("date") .airbyteType(JsonSchemaType.STRING_DATE) - .addInsertValues("'1999-01-08'", /* "'1991-02-10 BC'",*/ "null") - .addExpectedValues("1999-01-08", /* "1990-02-10 BC", */ null) + .addInsertValues("'1999-01-08'", "'1991-02-10 BC'", "null") + .addExpectedValues("1999-01-08", "1991-02-10 BC", null) .build()); for (final String type : Set.of("double precision", "float", "float8")) { @@ -420,8 +420,23 @@ protected void initTests() { .sourceType("timestamp") .fullSourceDataType(fullSourceType) .airbyteType(JsonSchemaType.STRING_TIMESTAMP_WITHOUT_TIMEZONE) - .addInsertValues("TIMESTAMP '2004-10-19 10:23:00'", "TIMESTAMP '2004-10-19 10:23:54.123456'", "null") - .addExpectedValues("2004-10-19T10:23:00.000000", "2004-10-19T10:23:54.123456", null) + .addInsertValues( + "TIMESTAMP '2004-10-19 10:23:00'", + "TIMESTAMP '2004-10-19 10:23:54.123456'", + // A random BCE date. Old enough that converting it to/from an Instant results in discrepancies from inconsistent leap year handling + "TIMESTAMP '3004-10-19 10:23:54.123456 BC'", + // The earliest possible timestamp in CE + "TIMESTAMP '0001-01-01 00:00:00.000000'", + // The last possible timestamp in BCE + "TIMESTAMP '0001-12-31 23:59:59.999999 BC'", + "null") + .addExpectedValues( + "2004-10-19T10:23:00.000000", + "2004-10-19T10:23:54.123456", + "3004-10-19T10:23:54.123456 BC", + "0001-01-01T00:00:00.000000", + "0001-12-31T23:59:59.999999 BC", + null) .build()); } @@ -432,9 +447,24 @@ protected void initTests() { .sourceType("timestamptz") .fullSourceDataType(fullSourceType) .airbyteType(JsonSchemaType.STRING_TIMESTAMP_WITH_TIMEZONE) - .addInsertValues("TIMESTAMP '2004-10-19 10:23:00-08'", "TIMESTAMP '2004-10-19 10:23:54.123456-08'", "null") - // 2004-10-19T10:23:54Z-8 = 2004-10-19T17:23:54Z - .addExpectedValues("2004-10-19T17:23:00.000000Z", "2004-10-19T17:23:54.123456Z", null) + .addInsertValues( + // 10:23-08 == 18:23Z + "TIMESTAMP WITH TIME ZONE '2004-10-19 10:23:00-08'", + "TIMESTAMP WITH TIME ZONE '2004-10-19 10:23:54.123456-08'", + // A random BCE date. Old enough that converting it to/from an Instant results in discrepancies from inconsistent leap year handling + "TIMESTAMP WITH TIME ZONE '3004-10-19 10:23:54.123456-08 BC'", + // The earliest possible timestamp in CE (16:00-08 == 00:00Z) + "TIMESTAMP WITH TIME ZONE '0001-12-31 16:00:00.000000-08 BC'", + // The last possible timestamp in BCE (15:59-08 == 23:59Z) + "TIMESTAMP WITH TIME ZONE '0001-12-31 15:59:59.999999-08 BC'", + "null") + .addExpectedValues( + "2004-10-19T18:23:00.000000Z", + "2004-10-19T18:23:54.123456Z", + "3004-10-19T18:23:54.123456Z BC", + "0001-01-01T00:00:00.000000Z", + "0001-12-31T23:59:59.999999Z BC", + null) .build()); } diff --git a/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/CdcPostgresSourceDatatypeTest.java b/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/CdcPostgresSourceDatatypeTest.java index 8f8ae91cd7b5..f6b980e42c21 100644 --- a/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/CdcPostgresSourceDatatypeTest.java +++ b/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/CdcPostgresSourceDatatypeTest.java @@ -94,4 +94,7 @@ protected void tearDown(final TestDestinationEnv testEnv) { container.close(); } + public boolean testCatalog() { + return true; + } } diff --git a/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/PostgresSourceDatatypeTest.java b/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/PostgresSourceDatatypeTest.java index 34ff8d39ebea..370ff22f893f 100644 --- a/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/PostgresSourceDatatypeTest.java +++ b/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/PostgresSourceDatatypeTest.java @@ -13,7 +13,6 @@ import io.airbyte.db.jdbc.JdbcUtils; import io.airbyte.integrations.standardtest.source.TestDestinationEnv; import io.airbyte.integrations.util.HostPortResolver; -import io.airbyte.protocol.models.JsonSchemaType; import java.sql.SQLException; import org.jooq.SQLDialect; import org.testcontainers.containers.PostgreSQLContainer; diff --git a/docs/integrations/sources/postgres.md b/docs/integrations/sources/postgres.md index 136f9d14d302..477ff580b5d5 100644 --- a/docs/integrations/sources/postgres.md +++ b/docs/integrations/sources/postgres.md @@ -354,7 +354,8 @@ Possible solutions include: | Version | Date | Pull Request | Subject | |:--------|:-----------|:---------------------------------------------------------|:------------------------------------------------------------------------------------------------------------------| -| 0.4.39 | 2022-08-03 | [14534](https://github.com/airbytehq/airbyte/pull/14534) | Align regular and CDC integration tests and data mappers | +| 0.4.40 | 2022-08-03 | [15187](https://github.com/airbytehq/airbyte/pull/15187) | Add support for BCE dates/timestamps | +| unpublished | 2022-08-03 | [14534](https://github.com/airbytehq/airbyte/pull/14534) | Align regular and CDC integration tests and data mappers | | 0.4.38 | 2022-07-26 | [14362](https://github.com/airbytehq/airbyte/pull/14362) | Integral columns are now discovered as int64 fields. | | 0.4.37 | 2022-07-22 | [14714](https://github.com/airbytehq/airbyte/pull/14714) | Clarified error message when invalid cursor column selected | | 0.4.36 | 2022-07-21 | [14451](https://github.com/airbytehq/airbyte/pull/14451) | Make initial CDC waiting time configurable | From 12e96a423bad2bf02bcb10739998cd5fc415db9e Mon Sep 17 00:00:00 2001 From: Octavia Squidington III Date: Wed, 3 Aug 2022 18:36:59 +0000 Subject: [PATCH 9/9] auto-bump connector version [ci skip] --- .../init/src/main/resources/seed/source_definitions.yaml | 2 +- airbyte-config/init/src/main/resources/seed/source_specs.yaml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index 699a1ce2f9a7..9c902a016ed7 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -762,7 +762,7 @@ - name: Postgres sourceDefinitionId: decd338e-5647-4c0b-adf4-da0e75f5a750 dockerRepository: airbyte/source-postgres - dockerImageTag: 0.4.39 + dockerImageTag: 0.4.40 documentationUrl: https://docs.airbyte.io/integrations/sources/postgres icon: postgresql.svg sourceType: database diff --git a/airbyte-config/init/src/main/resources/seed/source_specs.yaml b/airbyte-config/init/src/main/resources/seed/source_specs.yaml index 02e89b721df2..86d653b423f4 100644 --- a/airbyte-config/init/src/main/resources/seed/source_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_specs.yaml @@ -7134,7 +7134,7 @@ supportsNormalization: false supportsDBT: false supported_destination_sync_modes: [] -- dockerImage: "airbyte/source-postgres:0.4.39" +- dockerImage: "airbyte/source-postgres:0.4.40" spec: documentationUrl: "https://docs.airbyte.com/integrations/sources/postgres" connectionSpecification: