diff --git a/airbyte-db/db-lib/src/main/java/io/airbyte/db/jdbc/AbstractJdbcCompatibleSourceOperations.java b/airbyte-db/db-lib/src/main/java/io/airbyte/db/jdbc/AbstractJdbcCompatibleSourceOperations.java index ea4910c16518..435f7281a93d 100644 --- a/airbyte-db/db-lib/src/main/java/io/airbyte/db/jdbc/AbstractJdbcCompatibleSourceOperations.java +++ b/airbyte-db/db-lib/src/main/java/io/airbyte/db/jdbc/AbstractJdbcCompatibleSourceOperations.java @@ -253,17 +253,17 @@ public String getFullyQualifiedTableNameWithQuoting(final Connection connection, return schemaName != null ? enquoteIdentifier(connection, schemaName) + "." + quotedTableName : quotedTableName; } - protected DateTime getDateTimeObject(ResultSet resultSet, int index, Class clazz) throws SQLException { + protected ObjectType getObject(ResultSet resultSet, int index, Class clazz) throws SQLException { return resultSet.getObject(index, clazz); } protected void putTimeWithTimezone(ObjectNode node, String columnName, ResultSet resultSet, int index) throws SQLException { - OffsetTime timetz = getDateTimeObject(resultSet, index, OffsetTime.class); + OffsetTime timetz = getObject(resultSet, index, OffsetTime.class); node.put(columnName, timetz.format(TIMETZ_FORMATTER)); } protected void putTimestampWithTimezone(ObjectNode node, String columnName, ResultSet resultSet, int index) throws SQLException { - OffsetDateTime timestamptz = getDateTimeObject(resultSet, index, OffsetDateTime.class); + OffsetDateTime timestamptz = getObject(resultSet, index, OffsetDateTime.class); LocalDate localDate = timestamptz.toLocalDate(); node.put(columnName, resolveEra(localDate, timestamptz.format(TIMESTAMPTZ_FORMATTER))); } diff --git a/airbyte-integrations/bases/debezium-v1-9-2/src/main/java/io/airbyte/integrations/debezium/internals/DateTimeConverter.java b/airbyte-integrations/bases/debezium-v1-9-2/src/main/java/io/airbyte/integrations/debezium/internals/DateTimeConverter.java new file mode 100644 index 000000000000..6b10e661f4f5 --- /dev/null +++ b/airbyte-integrations/bases/debezium-v1-9-2/src/main/java/io/airbyte/integrations/debezium/internals/DateTimeConverter.java @@ -0,0 +1,60 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.debezium.internals; + +import static io.airbyte.db.DataTypeUtils.TIMESTAMPTZ_FORMATTER; +import static io.airbyte.db.DataTypeUtils.TIMESTAMP_FORMATTER; +import static io.airbyte.db.DataTypeUtils.TIMETZ_FORMATTER; +import static io.airbyte.db.DataTypeUtils.TIME_FORMATTER; +import static io.airbyte.db.jdbc.AbstractJdbcCompatibleSourceOperations.isBCE; + +import java.sql.Date; +import java.sql.Timestamp; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.time.OffsetDateTime; +import java.time.OffsetTime; +import java.time.ZoneOffset; +import java.time.format.DateTimeFormatter; + +public class DateTimeConverter { + + public static final DateTimeFormatter TIME_WITH_TIMEZONE_FORMATTER = DateTimeFormatter.ofPattern( + "HH:mm:ss[.][SSSSSSSSS][SSSSSSS][SSSSSS][SSSSS][SSSS][SSS][SS][S][''][XXX][XX][X]"); + + public static String convertToTimeWithTimezone(String time) { + OffsetTime timetz = OffsetTime.parse(time, TIME_WITH_TIMEZONE_FORMATTER); + return timetz.format(TIMETZ_FORMATTER); + } + + public static String convertToTimestampWithTimezone(Timestamp timestamp) { + OffsetDateTime timestamptz = OffsetDateTime.ofInstant(timestamp.toInstant(), ZoneOffset.UTC); + LocalDate localDate = timestamptz.toLocalDate(); + return resolveEra(localDate, timestamptz.format(TIMESTAMPTZ_FORMATTER)); + } + + public static String convertToTimestamp(Timestamp timestamp) { + final LocalDateTime localDateTime = LocalDateTime.ofInstant(timestamp.toInstant(), + ZoneOffset.UTC); + final LocalDate date = localDateTime.toLocalDate(); + return resolveEra(date, localDateTime.format(TIMESTAMP_FORMATTER)); + } + + public static Object convertToDate(Date date) { + LocalDate localDate = date.toLocalDate(); + return resolveEra(localDate, localDate.toString()); + } + + public static String convertToTime(String time) { + LocalTime localTime = LocalTime.parse(time); + return localTime.format(TIME_FORMATTER); + } + + public static String resolveEra(LocalDate date, String value) { + return isBCE(date) ? value.substring(1) + " BC" : value; + } + +} diff --git a/airbyte-integrations/bases/debezium-v1-9-2/src/main/java/io/airbyte/integrations/debezium/internals/PostgresConverter.java b/airbyte-integrations/bases/debezium-v1-9-2/src/main/java/io/airbyte/integrations/debezium/internals/PostgresConverter.java index aee741b6aaca..570186af3dea 100644 --- a/airbyte-integrations/bases/debezium-v1-9-2/src/main/java/io/airbyte/integrations/debezium/internals/PostgresConverter.java +++ b/airbyte-integrations/bases/debezium-v1-9-2/src/main/java/io/airbyte/integrations/debezium/internals/PostgresConverter.java @@ -4,12 +4,26 @@ package io.airbyte.integrations.debezium.internals; +import static io.airbyte.protocol.models.JsonSchemaType.AIRBYTE_TYPE; +import static io.airbyte.protocol.models.JsonSchemaType.DATE; +import static io.airbyte.protocol.models.JsonSchemaType.DATE_TIME; +import static io.airbyte.protocol.models.JsonSchemaType.FORMAT; +import static io.airbyte.protocol.models.JsonSchemaType.TIME; +import static io.airbyte.protocol.models.JsonSchemaType.TIMESTAMP_WITHOUT_TIMEZONE; +import static io.airbyte.protocol.models.JsonSchemaType.TIMESTAMP_WITH_TIMEZONE; +import static io.airbyte.protocol.models.JsonSchemaType.TIME_WITHOUT_TIMEZONE; +import static io.airbyte.protocol.models.JsonSchemaType.TIME_WITH_TIMEZONE; + import io.debezium.spi.converter.CustomConverter; import io.debezium.spi.converter.RelationalColumn; import java.math.BigDecimal; import java.nio.charset.StandardCharsets; +import java.sql.Date; +import java.sql.Timestamp; import java.util.Arrays; +import java.util.Locale; import java.util.Properties; +import org.apache.commons.codec.binary.Hex; import org.apache.kafka.connect.data.SchemaBuilder; import org.postgresql.util.PGInterval; import org.slf4j.Logger; @@ -19,12 +33,14 @@ public class PostgresConverter implements CustomConverter s.equalsIgnoreCase(field.typeName()))) { registerMoney(field, registration); + } else if (BYTEA_TYPE.equalsIgnoreCase(field.typeName())) { + registerBytea(field, registration); + } else if (Arrays.stream(NUMERIC_TYPES).anyMatch(s -> s.equalsIgnoreCase(field.typeName()))) { + registerNumber(field, registration); } } + private void registerNumber(final RelationalColumn field, final ConverterRegistration registration) { + registration.register(SchemaBuilder.float64().optional(), x -> { + if (x == null) { + return DebeziumConverterUtils.convertDefaultValue(field); + } + return new BigDecimal(x.toString()).doubleValue(); + }); + } + + private void registerBytea(final RelationalColumn field, final ConverterRegistration registration) { + registration.register(SchemaBuilder.string().optional(), x -> { + if (x == null) { + return DebeziumConverterUtils.convertDefaultValue(field); + } + return "\\x" + Hex.encodeHexString((byte[]) x); + }); + } + private void registerText(final RelationalColumn field, final ConverterRegistration registration) { registration.register(SchemaBuilder.string().optional(), x -> { if (x == null) { @@ -57,14 +95,23 @@ private void registerText(final RelationalColumn field, final ConverterRegistrat } private void registerDate(final RelationalColumn field, final ConverterRegistration registration) { - registration.register(SchemaBuilder.string().optional(), x -> { + final var fieldType = field.typeName(); + + registration.register(getJsonSchema(fieldType).optional(), x -> { if (x == null) { return DebeziumConverterUtils.convertDefaultValue(field); - } else if (x instanceof PGInterval) { - return convertInterval((PGInterval) x); - } else { - return DebeziumConverterUtils.convertDate(x); } + return switch (fieldType.toUpperCase(Locale.ROOT)) { + case "TIMETZ" -> DateTimeConverter.convertToTimeWithTimezone(x.toString()); + case "TIMESTAMPTZ" -> DateTimeConverter.convertToTimestampWithTimezone((Timestamp) x); + case "TIMESTAMP" -> DateTimeConverter.convertToTimestamp((Timestamp) x); + // Debezium doesn't handle era indicators + // https://github.com/airbytehq/airbyte/issues/14590 + case "DATE" -> DateTimeConverter.convertToDate((Date) x); + case "TIME" -> DateTimeConverter.convertToTime(x.toString()); + case "INTERVAL" -> convertInterval((PGInterval) x); + default -> DebeziumConverterUtils.convertDate(x); + }; }); } @@ -84,11 +131,7 @@ private void registerMoney(final RelationalColumn field, final ConverterRegistra return DebeziumConverterUtils.convertDefaultValue(field); } else if (x instanceof Double) { final BigDecimal result = BigDecimal.valueOf((Double) x); - if (result.compareTo(new BigDecimal("999999999999999")) == 1 - || result.compareTo(new BigDecimal("-999999999999999")) == -1) { - return null; - } - return result.toString(); + return Double.toString(result.doubleValue()); } else { return x.toString(); } @@ -131,4 +174,15 @@ private boolean isNegativeTime(final PGInterval pgInterval) { || pgInterval.getWholeSeconds() < 0; } + private SchemaBuilder getJsonSchema(final String fieldType) { + return switch (fieldType.toUpperCase(Locale.ROOT)) { + case "TIMETZ" -> SchemaBuilder.string().parameter(FORMAT, TIME).parameter(AIRBYTE_TYPE, TIME_WITH_TIMEZONE); + case "TIMESTAMPTZ" -> SchemaBuilder.string().parameter(FORMAT, DATE_TIME).parameter(AIRBYTE_TYPE, TIMESTAMP_WITH_TIMEZONE); + case "TIMESTAMP" -> SchemaBuilder.string().parameter(FORMAT, DATE_TIME).parameter(AIRBYTE_TYPE, TIMESTAMP_WITHOUT_TIMEZONE); + case "DATE" -> SchemaBuilder.string().parameter(FORMAT, DATE); + case "TIME" -> SchemaBuilder.string().parameter(FORMAT, TIME).parameter(AIRBYTE_TYPE, TIME_WITHOUT_TIMEZONE); + default -> SchemaBuilder.string(); + }; + } + } diff --git a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceOperations.java b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceOperations.java index 8984fa088b1f..e99fd14ab2de 100644 --- a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceOperations.java +++ b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceOperations.java @@ -23,6 +23,7 @@ import io.airbyte.db.jdbc.JdbcSourceOperations; import io.airbyte.protocol.models.JsonSchemaType; import java.math.BigDecimal; +import java.math.RoundingMode; import java.sql.JDBCType; import java.sql.PreparedStatement; import java.sql.ResultSet; @@ -35,7 +36,15 @@ import java.time.OffsetTime; import java.time.format.DateTimeParseException; import java.util.Collections; +import org.postgresql.geometric.PGbox; +import org.postgresql.geometric.PGcircle; +import org.postgresql.geometric.PGline; +import org.postgresql.geometric.PGlseg; +import org.postgresql.geometric.PGpath; +import org.postgresql.geometric.PGpoint; +import org.postgresql.geometric.PGpolygon; import org.postgresql.jdbc.PgResultSetMetaData; +import org.postgresql.util.PGobject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -173,6 +182,14 @@ public void setJsonField(final ResultSet resultSet, final int colIndex, final Ob case TIMETZ -> putTimeWithTimezone(json, columnName, resultSet, colIndex); case TIMESTAMPTZ -> putTimestampWithTimezone(json, columnName, resultSet, colIndex); case "hstore" -> putHstoreAsJson(json, columnName, resultSet, colIndex); + case "circle" -> putObject(json, columnName, resultSet, colIndex, PGcircle.class); + case "box" -> putObject(json, columnName, resultSet, colIndex, PGbox.class); + case "double precision", "float", "float8" -> putFloat8(json, columnName, resultSet, colIndex); + case "line" -> putObject(json, columnName, resultSet, colIndex, PGline.class); + case "lseg" -> putObject(json, columnName, resultSet, colIndex, PGlseg.class); + case "path" -> putObject(json, columnName, resultSet, colIndex, PGpath.class); + case "point" -> putObject(json, columnName, resultSet, colIndex, PGpoint.class); + case "polygon" -> putObject(json, columnName, resultSet, colIndex, PGpolygon.class); default -> { switch (columnType) { case BOOLEAN -> putBoolean(json, columnName, resultSet, colIndex); @@ -198,19 +215,19 @@ public void setJsonField(final ResultSet resultSet, final int colIndex, final Ob @Override protected void putDate(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index) throws SQLException { - final LocalDate date = getDateTimeObject(resultSet, index, LocalDate.class); + final LocalDate date = getObject(resultSet, index, LocalDate.class); node.put(columnName, resolveEra(date, date.toString())); } @Override protected void putTime(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index) throws SQLException { - final LocalTime time = getDateTimeObject(resultSet, index, LocalTime.class); + final LocalTime time = getObject(resultSet, index, LocalTime.class); node.put(columnName, time.format(TIME_FORMATTER)); } @Override protected void putTimestamp(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index) throws SQLException { - final LocalDateTime timestamp = getDateTimeObject(resultSet, index, LocalDateTime.class); + final LocalDateTime timestamp = getObject(resultSet, index, LocalDateTime.class); final LocalDate date = timestamp.toLocalDate(); node.put(columnName, resolveEra(date, timestamp.format(TIMESTAMP_FORMATTER))); } @@ -261,10 +278,30 @@ protected void putBoolean(final ObjectNode node, final String columnName, final node.put(columnName, resultSet.getString(index).equalsIgnoreCase("t")); } + protected void putFloat8(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index) + throws SQLException { + final BigDecimal bigDecimal = DataTypeUtils.returnNullIfInvalid(() -> resultSet.getBigDecimal(index)); + if (bigDecimal != null) { + node.put(columnName, bigDecimal.setScale(resultSet.getMetaData().getScale(index), RoundingMode.HALF_EVEN).doubleValue()); + } else { + node.put(columnName, (BigDecimal) null); + } + } + + protected void putObject(final ObjectNode node, + final String columnName, + final ResultSet resultSet, + final int index, + Class clazz) + throws SQLException { + final T object = getObject(resultSet, index, clazz); + node.put(columnName, object.getValue()); + } + protected void putBigDecimal(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index) { final BigDecimal bigDecimal = DataTypeUtils.returnNullIfInvalid(() -> resultSet.getBigDecimal(index)); if (bigDecimal != null) { - node.put(columnName, bigDecimal); + node.put(columnName, bigDecimal.doubleValue()); } else { // Special values (Infinity, -Infinity, and NaN) is default to null for now. // https://github.com/airbytehq/airbyte/issues/8902 diff --git a/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/AbstractPostgresSourceDatatypeTest.java b/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/AbstractPostgresSourceDatatypeTest.java new file mode 100644 index 000000000000..f89bcd4c2e5b --- /dev/null +++ b/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/AbstractPostgresSourceDatatypeTest.java @@ -0,0 +1,526 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.io.airbyte.integration_tests.sources; + +import com.fasterxml.jackson.databind.JsonNode; +import io.airbyte.integrations.standardtest.source.AbstractSourceDatabaseTypeTest; +import io.airbyte.integrations.standardtest.source.TestDataHolder; +import io.airbyte.protocol.models.JsonSchemaType; +import java.util.Set; +import org.jooq.DSLContext; +import org.testcontainers.containers.PostgreSQLContainer; + +public abstract class AbstractPostgresSourceDatatypeTest extends AbstractSourceDatabaseTypeTest { + + protected PostgreSQLContainer container; + protected JsonNode config; + protected DSLContext dslContext; + protected static final String SCHEMA_NAME = "test"; + + @Override + protected String getNameSpace() { + return SCHEMA_NAME; + } + + @Override + protected String getImageName() { + return "airbyte/source-postgres:dev"; + } + + @Override + protected JsonNode getConfig() { + return config; + } + + @Override + public boolean testCatalog() { + return true; + } + + // Test cases are sorted alphabetically based on the source type + // See https://www.postgresql.org/docs/14/datatype.html + @Override + protected void initTests() { + addDataTypeTestData( + TestDataHolder.builder() + .sourceType("bigint") + .airbyteType(JsonSchemaType.NUMBER) + .addInsertValues("-9223372036854775808", "9223372036854775807", "0", "null") + .addExpectedValues("-9223372036854775808", "9223372036854775807", "0", null) + .build()); + + addDataTypeTestData( + TestDataHolder.builder() + .sourceType("bigserial") + .airbyteType(JsonSchemaType.NUMBER) + .addInsertValues("1", "9223372036854775807", "0", "-9223372036854775808") + .addExpectedValues("1", "9223372036854775807", "0", "-9223372036854775808") + .build()); + + addDataTypeTestData( + TestDataHolder.builder() + .sourceType("bit") + .fullSourceDataType("BIT(1)") + .airbyteType(JsonSchemaType.STRING) + .addInsertValues("B'0'") + .addExpectedValues("0") + .build()); + + addDataTypeTestData( + TestDataHolder.builder() + .sourceType("bit") + .fullSourceDataType("BIT(3)") + .airbyteType(JsonSchemaType.STRING) + .addInsertValues("B'101'") + .addExpectedValues("101") + .build()); + + for (final String type : Set.of("bit varying", "varbit")) { + addDataTypeTestData( + TestDataHolder.builder() + .sourceType("bit_varying") + .fullSourceDataType("BIT VARYING(5)") + .airbyteType(JsonSchemaType.STRING) + .addInsertValues("B'101'", "null") + .addExpectedValues("101", null) + .build()); + } + + for (final String type : Set.of("boolean", "bool")) { + addDataTypeTestData( + TestDataHolder.builder() + .sourceType(type) + .airbyteType(JsonSchemaType.BOOLEAN) + .addInsertValues("true", "'yes'", "'1'", "false", "'no'", "'0'", "null") + .addExpectedValues("true", "true", "true", "false", "false", "false", null) + .build()); + } + + addDataTypeTestData( + TestDataHolder.builder() + .sourceType("box") + .airbyteType(JsonSchemaType.STRING) + .addInsertValues("'((3,7),(15,18))'", "'((0,0),(0,0))'", "null") + .addExpectedValues("(15.0,18.0),(3.0,7.0)", "(0.0,0.0),(0.0,0.0)", null) + .build()); + + // bytea stores variable length binary string + // https://www.postgresql.org/docs/14/datatype-binary.html + addDataTypeTestData( + TestDataHolder.builder() + .sourceType("bytea") + .airbyteType(JsonSchemaType.STRING) + .addInsertValues("null", "decode('1234', 'hex')", "'1234'", "'abcd'", "'\\xabcd'") + .addExpectedValues(null, "\\x1234", "\\x31323334", "\\x61626364", "\\xabcd") + .build()); + + for (final String type : Set.of("character", "char")) { + addDataTypeTestData( + TestDataHolder.builder() + .sourceType(type) + .airbyteType(JsonSchemaType.STRING) + .addInsertValues("'a'", "'*'", "null") + .addExpectedValues("a", "*", null) + .build()); + + addDataTypeTestData( + TestDataHolder.builder() + .sourceType(type) + .fullSourceDataType(type + "(8)") + .airbyteType(JsonSchemaType.STRING) + .addInsertValues("'{asb123}'", "'{asb12}'") + .addExpectedValues("{asb123}", "{asb12} ") + .build()); + } + + for (final String type : Set.of("varchar", "text")) { + addDataTypeTestData( + TestDataHolder.builder() + .sourceType(type) + .airbyteType(JsonSchemaType.STRING) + .addInsertValues("'a'", "'abc'", "'Миші йдуть на південь, не питай чому;'", "'櫻花分店'", + "''", "null", "'\\xF0\\x9F\\x9A\\x80'") + .addExpectedValues("a", "abc", "Миші йдуть на південь, не питай чому;", "櫻花分店", "", + null, "\\xF0\\x9F\\x9A\\x80") + .build()); + } + + addDataTypeTestData( + TestDataHolder.builder() + .sourceType("varchar") + .fullSourceDataType("character varying(10)") + .airbyteType(JsonSchemaType.STRING) + .addInsertValues("'{asb123}'", "'{asb12}'") + .addExpectedValues("{asb123}", "{asb12}") + .build()); + + addDataTypeTestData( + TestDataHolder.builder() + .sourceType("cidr") + .airbyteType(JsonSchemaType.STRING) + .addInsertValues("null", "'192.168.100.128/25'", "'192.168/24'", "'192.168.1'", + "'128.1'", "'2001:4f8:3:ba::/64'") + .addExpectedValues(null, "192.168.100.128/25", "192.168.0.0/24", "192.168.1.0/24", + "128.1.0.0/16", "2001:4f8:3:ba::/64") + .build()); + + addDataTypeTestData( + TestDataHolder.builder() + .sourceType("circle") + .airbyteType(JsonSchemaType.STRING) + .addInsertValues("'(5,7),10'", "'(0,0),0'", "'(-10,-4),10'", "null") + .addExpectedValues("<(5.0,7.0),10.0>", "<(0.0,0.0),0.0>", "<(-10.0,-4.0),10.0>", null) + .build()); + + // Debezium does not handle era indicators (AD nd BC) + // https://github.com/airbytehq/airbyte/issues/14590 + // addDataTypeTestData( + // TestDataHolder.builder() + // .sourceType("date") + // .airbyteType(JsonSchemaType.STRING_DATE) + // .addInsertValues("'1999-01-08'", "'1991-02-10 BC'", "null") + // .addExpectedValues("1999-01-08", "1990-02-10 BC", null) + // .build()); + + for (final String type : Set.of("double precision", "float", "float8")) { + addDataTypeTestData( + TestDataHolder.builder() + .sourceType(type) + .airbyteType(JsonSchemaType.NUMBER) + .addInsertValues("'123'", "'1234567890.1234567'", "null") + // Postgres source does not support these special values yet + // https://github.com/airbytehq/airbyte/issues/8902 + // "'-Infinity'", "'Infinity'", "'NaN'", "null") + .addExpectedValues("123.0", "1.2345678901234567E9", null) + // "-Infinity", "Infinity", "NaN", null) + .build()); + } + + addDataTypeTestData( + TestDataHolder.builder() + .sourceType("inet") + .airbyteType(JsonSchemaType.STRING) + .addInsertValues("'198.24.10.0/24'", "'198.24.10.0'", "'198.10/8'", "null") + .addExpectedValues("198.24.10.0/24", "198.24.10.0", "198.10.0.0/8", null) + .build()); + + for (final String type : Set.of("integer", "int", "int4")) { + addDataTypeTestData( + TestDataHolder.builder() + .sourceType(type) + .airbyteType(JsonSchemaType.NUMBER) + .addInsertValues("null", "1001", "-2147483648", "2147483647") + .addExpectedValues(null, "1001", "-2147483648", "2147483647") + .build()); + } + + addDataTypeTestData( + TestDataHolder.builder() + .sourceType("interval") + .airbyteType(JsonSchemaType.STRING) + .addInsertValues("null", "'P1Y2M3DT4H5M6S'", "'-178000000'", "'178000000'") + .addExpectedValues(null, "1 year 2 mons 3 days 04:05:06", "-49444:26:40", "49444:26:40") + .build()); + + addDataTypeTestData( + TestDataHolder.builder() + .sourceType("json") + .airbyteType(JsonSchemaType.STRING) + .addInsertValues("null", "'{\"a\": 10, \"b\": 15}'") + .addExpectedValues(null, "{\"a\": 10, \"b\": 15}") + .build()); + + addDataTypeTestData( + TestDataHolder.builder() + .sourceType("jsonb") + .airbyteType(JsonSchemaType.STRING) + .addInsertValues("null", "'[1, 2, 3]'::jsonb") + .addExpectedValues(null, "[1, 2, 3]") + .build()); + + addDataTypeTestData( + TestDataHolder.builder() + .sourceType("line") + .airbyteType(JsonSchemaType.STRING) + .addInsertValues("'{4,5,6}'", "'{0,1,0}'", "null") + .addExpectedValues("{4.0,5.0,6.0}", "{0.0,1.0,0.0}", null) + .build()); + + addDataTypeTestData( + TestDataHolder.builder() + .sourceType("lseg") + .airbyteType(JsonSchemaType.STRING) + .addInsertValues("'((3,7),(15,18))'", "'((0,0),(0,0))'", "null") + .addExpectedValues("[(3.0,7.0),(15.0,18.0)]", "[(0.0,0.0),(0.0,0.0)]", null) + .build()); + + addDataTypeTestData( + TestDataHolder.builder() + .sourceType("macaddr") + .airbyteType(JsonSchemaType.STRING) + .addInsertValues("null", "'08:00:2b:01:02:03'", "'08-00-2b-01-02-04'", + "'08002b:010205'") + .addExpectedValues(null, "08:00:2b:01:02:03", "08:00:2b:01:02:04", "08:00:2b:01:02:05") + .build()); + + addDataTypeTestData( + TestDataHolder.builder() + .sourceType("macaddr8") + .airbyteType(JsonSchemaType.STRING) + .addInsertValues("null", "'08:00:2b:01:02:03:04:05'", "'08-00-2b-01-02-03-04-06'", + "'08002b:0102030407'") + .addExpectedValues(null, "08:00:2b:01:02:03:04:05", "08:00:2b:01:02:03:04:06", + "08:00:2b:01:02:03:04:07") + .build()); + + addDataTypeTestData( + TestDataHolder.builder() + .sourceType("money") + .airbyteType(JsonSchemaType.NUMBER) + .addInsertValues( + "null", + "'999.99'", "'1,001.01'", "'-1,000'", + "'$999.99'", "'$1001.01'", "'-$1,000'", + // max values for Money type: "-92233720368547758.08", "92233720368547758.07" + "'-92233720368547758.08'", "'92233720368547758.07'") + .addExpectedValues( + null, + // Double#toString method is necessary here because sometimes the output + // has unexpected decimals, e.g. Double.toString(-1000) is -1000.0 + "999.99", "1001.01", Double.toString(-1000), + "999.99", "1001.01", Double.toString(-1000), + Double.toString(-92233720368547758.08), Double.toString(92233720368547758.07)) + .build()); + + // Blocked by https://github.com/airbytehq/airbyte/issues/8902 + for (final String type : Set.of("numeric", "decimal")) { + addDataTypeTestData( + TestDataHolder.builder() + .sourceType(type) + .airbyteType(JsonSchemaType.NUMBER) + .addInsertValues( + "'123'", "null", "'1234567890.1234567'") + // Postgres source does not support these special values yet + // https://github.com/airbytehq/airbyte/issues/8902 + // "'infinity'", "'-infinity'", "'nan'" + .addExpectedValues("123.0", null, "1.2345678901234567E9") + .build()); + } + + addDataTypeTestData( + TestDataHolder.builder() + .sourceType("path") + .airbyteType(JsonSchemaType.STRING) + .addInsertValues("'((3,7),(15,18))'", "'((0,0),(0,0))'", "null") + .addExpectedValues("((3.0,7.0),(15.0,18.0))", "((0.0,0.0),(0.0,0.0))", null) + .build()); + + addDataTypeTestData( + TestDataHolder.builder() + .sourceType("pg_lsn") + .airbyteType(JsonSchemaType.STRING) + .addInsertValues("'7/A25801C8'::pg_lsn", "'0/0'::pg_lsn", "null") + .addExpectedValues("7/A25801C8", "0/0", null) + .build()); + + addDataTypeTestData( + TestDataHolder.builder() + .sourceType("point") + .airbyteType(JsonSchemaType.STRING) + .addInsertValues("'(3,7)'", "'(0,0)'", "'(999999999999999999999999,0)'", "null") + .addExpectedValues("(3.0,7.0)", "(0.0,0.0)", "(1.0E24,0.0)", null) + .build()); + + addDataTypeTestData( + TestDataHolder.builder() + .sourceType("polygon") + .airbyteType(JsonSchemaType.STRING) + .addInsertValues("'((3,7),(15,18))'", "'((0,0),(0,0))'", + "'((0,0),(999999999999999999999999,0))'", "null") + .addExpectedValues("((3.0,7.0),(15.0,18.0))", "((0.0,0.0),(0.0,0.0))", "((0.0,0.0),(1.0E24,0.0))", null) + .build()); + + for (final String type : Set.of("real", "float4")) { + addDataTypeTestData( + TestDataHolder.builder() + .sourceType(type) + .airbyteType(JsonSchemaType.NUMBER) + .addInsertValues("null", "3.4145") + .addExpectedValues(null, "3.4145") + .build()); + } + + for (final String type : Set.of("smallint", "int2")) { + addDataTypeTestData( + TestDataHolder.builder() + .sourceType(type) + .airbyteType(JsonSchemaType.NUMBER) + .addInsertValues("null", "-32768", "32767") + .addExpectedValues(null, "-32768", "32767") + .build()); + } + + for (final String type : Set.of("smallserial", "serial2")) { + addDataTypeTestData( + TestDataHolder.builder() + .sourceType(type) + .airbyteType(JsonSchemaType.NUMBER) + .addInsertValues("1", "32767", "0", "-32767") + .addExpectedValues("1", "32767", "0", "-32767") + .build()); + } + + for (final String type : Set.of("serial", "serial4")) { + addDataTypeTestData( + TestDataHolder.builder() + .sourceType(type) + .airbyteType(JsonSchemaType.NUMBER) + .addInsertValues("1", "2147483647", "0", "-2147483647") + .addExpectedValues("1", "2147483647", "0", "-2147483647") + .build()); + } + + // time without time zone + for (final String fullSourceType : Set.of("time", "time without time zone")) { + addDataTypeTestData( + TestDataHolder.builder() + .sourceType("time") + .fullSourceDataType(fullSourceType) + .airbyteType(JsonSchemaType.STRING_TIME_WITHOUT_TIMEZONE) + // time column will ignore time zone + .addInsertValues("null", "'13:00:01'", "'13:00:02+8'", "'13:00:03-8'", "'13:00:04Z'", "'13:00:05.01234Z+8'", "'13:00:00Z-8'") + .addExpectedValues(null, "13:00:01.000000", "13:00:02.000000", "13:00:03.000000", "13:00:04.000000", "13:00:05.012340", + "13:00:00.000000") + .build()); + } + + // time with time zone + for (final String fullSourceType : Set.of("timetz", "time with time zone")) { + addDataTypeTestData( + TestDataHolder.builder() + .sourceType("timetz") + .fullSourceDataType(fullSourceType) + .airbyteType(JsonSchemaType.STRING_TIME_WITH_TIMEZONE) + .addInsertValues("null", "'13:00:01'", "'13:00:00+8'", "'13:00:03-8'", "'13:00:04Z'", "'13:00:05.012345Z+8'", "'13:00:06.00000Z-8'") + // A time value without time zone will use the time zone set on the database, which is Z-7, + // so 13:00:01 is returned as 13:00:01-07. + .addExpectedValues(null, "13:00:01.000000-07:00", "13:00:00.000000+08:00", "13:00:03.000000-08:00", "13:00:04.000000Z", + "13:00:05.012345-08:00", "13:00:06.000000+08:00") + .build()); + } + + // timestamp without time zone + for (final String fullSourceType : Set.of("timestamp", "timestamp without time zone")) { + addDataTypeTestData( + TestDataHolder.builder() + .sourceType("timestamp") + .fullSourceDataType(fullSourceType) + .airbyteType(JsonSchemaType.STRING_TIMESTAMP_WITHOUT_TIMEZONE) + .addInsertValues("TIMESTAMP '2004-10-19 10:23:00'", "TIMESTAMP '2004-10-19 10:23:54.123456'", "null") + .addExpectedValues("2004-10-19T10:23:00.000000", "2004-10-19T10:23:54.123456", null) + .build()); + } + + // timestamp with time zone + for (final String fullSourceType : Set.of("timestamptz", "timestamp with time zone")) { + addDataTypeTestData( + TestDataHolder.builder() + .sourceType("timestamptz") + .fullSourceDataType(fullSourceType) + .airbyteType(JsonSchemaType.STRING_TIMESTAMP_WITH_TIMEZONE) + .addInsertValues("TIMESTAMP '2004-10-19 10:23:00-08'", "TIMESTAMP '2004-10-19 10:23:54.123456-08'", "null") + // 2004-10-19T10:23:54Z-8 = 2004-10-19T17:23:54Z + .addExpectedValues("2004-10-19T17:23:00.000000Z", "2004-10-19T17:23:54.123456Z", null) + .build()); + } + + addDataTypeTestData( + TestDataHolder.builder() + .sourceType("tsquery") + .airbyteType(JsonSchemaType.STRING) + .addInsertValues("null", "'fat & (rat | cat)'::tsquery", "'fat:ab & cat'::tsquery") + .addExpectedValues(null, "'fat' & ( 'rat' | 'cat' )", "'fat':AB & 'cat'") + .build()); + + addDataTypeTestData( + TestDataHolder.builder() + .sourceType("tsvector") + .airbyteType(JsonSchemaType.STRING) + .addInsertValues("to_tsvector('The quick brown fox jumped over the lazy dog.')") + .addExpectedValues("'brown':3 'dog':9 'fox':4 'jumped':5 'lazy':8 'over':6 'quick':2 'the':1,7") + .build()); + + addDataTypeTestData( + TestDataHolder.builder() + .sourceType("uuid") + .airbyteType(JsonSchemaType.STRING) + .addInsertValues("'a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11'", "null") + .addExpectedValues("a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11", null) + .build()); + + addDataTypeTestData( + TestDataHolder.builder() + .sourceType("xml") + .airbyteType(JsonSchemaType.STRING) + .addInsertValues( + "XMLPARSE (DOCUMENT 'Manual...')", + "null", "''") + .addExpectedValues("Manual...", null, "") + .build()); + + // enum type + addDataTypeTestData( + TestDataHolder.builder() + .sourceType("mood") + .airbyteType(JsonSchemaType.STRING) + .addInsertValues("'happy'", "null") + .addExpectedValues("happy", null) + .build()); + + // range + addDataTypeTestData( + TestDataHolder.builder() + .sourceType("tsrange") + .airbyteType(JsonSchemaType.STRING) + .addInsertValues("'(2010-01-01 14:30, 2010-01-01 15:30)'", "null") + .addExpectedValues("(\"2010-01-01 14:30:00\",\"2010-01-01 15:30:00\")", null) + .build()); + + // array + addDataTypeTestData( + TestDataHolder.builder() + .sourceType("text") + .fullSourceDataType("text[]") + .airbyteType(JsonSchemaType.ARRAY) + .addInsertValues("'{10001, 10002, 10003, 10004}'", "null") + .addExpectedValues("[\"10001\",\"10002\",\"10003\",\"10004\"]", null) + .build()); + + // composite type + addDataTypeTestData( + TestDataHolder.builder() + .sourceType("inventory_item") + .airbyteType(JsonSchemaType.STRING) + .addInsertValues("ROW('fuzzy dice', 42, 1.99)", "null") + .addExpectedValues("(\"fuzzy dice\",42,1.99)", null) + .build()); + + addDataTypeTestData( + TestDataHolder.builder() + .sourceType("hstore") + .airbyteType(JsonSchemaType.STRING) + .addInsertValues(""" + '"paperback" => "243","publisher" => "postgresqltutorial.com", + "language" => "English","ISBN-13" => "978-1449370000", + "weight" => "11.2 ounces"' + """, null) + .addExpectedValues( + """ + {"ISBN-13":"978-1449370000","weight":"11.2 ounces","paperback":"243","publisher":"postgresqltutorial.com","language":"English"}""", + null) + .build()); + } + +} diff --git a/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/CdcPostgresSourceDatatypeTest.java b/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/CdcPostgresSourceDatatypeTest.java index 52103a736076..2fd388d10310 100644 --- a/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/CdcPostgresSourceDatatypeTest.java +++ b/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/CdcPostgresSourceDatatypeTest.java @@ -10,29 +10,22 @@ import io.airbyte.db.Database; import io.airbyte.db.factory.DSLContextFactory; import io.airbyte.db.factory.DatabaseDriver; -import io.airbyte.integrations.standardtest.source.AbstractSourceDatabaseTypeTest; -import io.airbyte.integrations.standardtest.source.TestDataHolder; import io.airbyte.integrations.standardtest.source.TestDestinationEnv; -import io.airbyte.protocol.models.JsonSchemaType; import java.util.List; -import org.jooq.DSLContext; import org.jooq.SQLDialect; import org.testcontainers.containers.PostgreSQLContainer; import org.testcontainers.utility.MountableFile; -public class CdcPostgresSourceDatatypeTest extends AbstractSourceDatabaseTypeTest { +public class CdcPostgresSourceDatatypeTest extends AbstractPostgresSourceDatatypeTest { private static final String SCHEMA_NAME = "test"; private static final String SLOT_NAME_BASE = "debezium_slot"; private static final String PUBLICATION = "publication"; - private PostgreSQLContainer container; - private JsonNode config; - private DSLContext dslContext; @Override protected Database setupDatabase() throws Exception { - container = new PostgreSQLContainer<>("postgres:13-alpine") + container = new PostgreSQLContainer<>("postgres:14-alpine") .withCopyFileToContainer(MountableFile.forClasspathResource("postgresql.conf"), "/etc/postgresql/postgresql.conf") .withCommand("postgres -c config_file=/etc/postgresql/postgresql.conf"); @@ -71,10 +64,11 @@ protected Database setupDatabase() throws Exception { final Database database = new Database(dslContext); database.query(ctx -> { - ctx.execute("SELECT pg_create_logical_replication_slot('" + SLOT_NAME_BASE + "', 'pgoutput');"); + ctx.execute( + "SELECT pg_create_logical_replication_slot('" + SLOT_NAME_BASE + "', 'pgoutput');"); ctx.execute("CREATE PUBLICATION " + PUBLICATION + " FOR ALL TABLES;"); ctx.execute("CREATE EXTENSION hstore;"); - + ctx.execute("SET lc_monetary TO 'en_US.utf8';"); return null; }); @@ -90,473 +84,10 @@ protected Database setupDatabase() throws Exception { return database; } - @Override - protected String getNameSpace() { - return SCHEMA_NAME; - } - - @Override - protected String getImageName() { - return "airbyte/source-postgres:dev"; - } - - @Override - protected JsonNode getConfig() { - return config; - } - @Override protected void tearDown(final TestDestinationEnv testEnv) { dslContext.close(); container.close(); } - @Override - protected void initTests() { - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("bigint") - .airbyteType(JsonSchemaType.NUMBER) - .addInsertValues("-9223372036854775808", "9223372036854775807", "0", "null") - .addExpectedValues("-9223372036854775808", "9223372036854775807", "0", null) - .build()); - - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("bigserial") - .airbyteType(JsonSchemaType.NUMBER) - .addInsertValues("1", "9223372036854775807", "0", "-9223372036854775808") - .addExpectedValues("1", "9223372036854775807", "0", "-9223372036854775808") - .build()); - - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("serial") - .airbyteType(JsonSchemaType.NUMBER) - .addInsertValues("1", "2147483647", "0", "-2147483647") - .addExpectedValues("1", "2147483647", "0", "-2147483647") - .build()); - - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("smallserial") - .airbyteType(JsonSchemaType.NUMBER) - .addInsertValues("1", "32767", "0", "-32767") - .addExpectedValues("1", "32767", "0", "-32767") - .build()); - - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("bit") - .fullSourceDataType("BIT(3)") - .airbyteType(JsonSchemaType.NUMBER) - .addInsertValues("B'101'", "B'111'", "null") - .addExpectedValues("101", "111", null) - .build()); - - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("bit_varying") - .fullSourceDataType("BIT VARYING(5)") - .airbyteType(JsonSchemaType.NUMBER) - .addInsertValues("B'101'", "null") - .addExpectedValues("101", null) - .build()); - - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("boolean") - .airbyteType(JsonSchemaType.BOOLEAN) - .addInsertValues("true", "'yes'", "'1'", "false", "'no'", "'0'", "null") - .addExpectedValues("true", "true", "true", "false", "false", "false", null) - .build()); - - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("bytea") - .airbyteType(JsonSchemaType.OBJECT) - .addInsertValues("decode('1234', 'hex')") - .addExpectedValues("EjQ=") - .build()); - - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("character") - .airbyteType(JsonSchemaType.STRING) - .addInsertValues("'a'", "'*'", "null") - .addExpectedValues("a", "*", null) - .build()); - - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("character") - .fullSourceDataType("character(8)") - .airbyteType(JsonSchemaType.STRING) - .addInsertValues("'{asb123}'", "'{asb12}'") - .addExpectedValues("{asb123}", "{asb12} ") - .build()); - - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("varchar") - .airbyteType(JsonSchemaType.STRING) - .addInsertValues("'a'", "'abc'", "'Миші йдуть на південь, не питай чому;'", "'櫻花分店'", - "''", "null", "'\\xF0\\x9F\\x9A\\x80'") - .addExpectedValues("a", "abc", "Миші йдуть на південь, не питай чому;", "櫻花分店", "", - null, "\\xF0\\x9F\\x9A\\x80") - .build()); - - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("varchar") - .fullSourceDataType("character(12)") - .airbyteType(JsonSchemaType.STRING) - .addInsertValues("'a'", "'abc'", "'Миші йдуть;'", "'櫻花分店'", - "''", "null") - .addExpectedValues("a ", "abc ", "Миші йдуть; ", "櫻花分店 ", - " ", null) - .build()); - - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("cidr") - .airbyteType(JsonSchemaType.STRING) - .addInsertValues("null", "'192.168.100.128/25'", "'192.168/24'", "'192.168.1'", - "'128.1'", "'2001:4f8:3:ba::/64'") - .addExpectedValues(null, "192.168.100.128/25", "192.168.0.0/24", "192.168.1.0/24", - "128.1.0.0/16", "2001:4f8:3:ba::/64") - .build()); - - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("date") - .airbyteType(JsonSchemaType.STRING) - .addInsertValues("'January 7, 1999'", "'1999-01-08'", "'1/9/1999'", "'January 10, 99 BC'", "'January 11, 99 AD'", "null") - .addExpectedValues("1999-01-07T00:00:00Z", "1999-01-08T00:00:00Z", "1999-01-09T00:00:00Z", "0099-01-10T00:00:00Z", "1999-01-11T00:00:00Z", - null) - .build()); - - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("float8") - .airbyteType(JsonSchemaType.NUMBER) - .addInsertValues("'123'", "'1234567890.1234567'", "'-Infinity'", "'Infinity'", "'NaN'", "null") - .addExpectedValues("123.0", "1.2345678901234567E9", "-Infinity", "Infinity", "NaN", null) - .build()); - - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("float") - .airbyteType(JsonSchemaType.NUMBER) - .addInsertValues("'123'", "'1234567890.1234567'", "'-Infinity'", "'Infinity'", "'NaN'", "null") - .addExpectedValues("123.0", "1.2345678901234567E9", "-Infinity", "Infinity", "NaN", null) - .build()); - - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("inet") - .airbyteType(JsonSchemaType.STRING) - .addInsertValues("'198.24.10.0/24'", "'198.24.10.0'", "'198.10/8'", "null") - .addExpectedValues("198.24.10.0/24", "198.24.10.0", "198.10.0.0/8", null) - .build()); - - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("int") - .airbyteType(JsonSchemaType.NUMBER) - .addInsertValues("null", "-2147483648", "2147483647") - .addExpectedValues(null, "-2147483648", "2147483647") - .build()); - - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("interval") - .airbyteType(JsonSchemaType.STRING) - .addInsertValues("null", "'P1Y2M3DT4H5M6S'", "'PT4H5M6S'", "'-300'", "'-178000000'", - "'178000000'", "'1-2'", "'3 4:05:06'", "'P0002-02-03T04:05:06'") - .addExpectedValues(null, "1 year 2 mons 3 days 04:05:06", "04:05:06", "-00:05:00", "-49444:26:40", - "49444:26:40", "1 year 2 mons 00:00:00", "3 days 04:05:06", "2 year 2 mons 3 days 04:05:06") - .build()); - - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("json") - .airbyteType(JsonSchemaType.STRING) - .addInsertValues("null", "'{\"a\": 10, \"b\": 15}'") - .addExpectedValues(null, "{\"a\": 10, \"b\": 15}") - .build()); - - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("jsonb") - .airbyteType(JsonSchemaType.STRING) - .addInsertValues("null", "'[1, 2, 3]'::jsonb") - .addExpectedValues(null, "[1, 2, 3]") - .build()); - - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("macaddr") - .airbyteType(JsonSchemaType.STRING) - .addInsertValues("null", "'08:00:2b:01:02:03'", "'08-00-2b-01-02-04'", - "'08002b:010205'") - .addExpectedValues(null, "08:00:2b:01:02:03", "08:00:2b:01:02:04", "08:00:2b:01:02:05") - .build()); - - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("macaddr8") - .airbyteType(JsonSchemaType.STRING) - .addInsertValues("null", "'08:00:2b:01:02:03:04:05'", "'08-00-2b-01-02-03-04-06'", - "'08002b:0102030407'") - .addExpectedValues(null, "08:00:2b:01:02:03:04:05", "08:00:2b:01:02:03:04:06", - "08:00:2b:01:02:03:04:07") - .build()); - - // Max values for Money type should be: "-92233720368547758.08", "92233720368547758.07", - // debezium return rounded value for values more than 999999999999999 and less than - // -999999999999999, - // we map these value as null; - // opened issue https://github.com/airbytehq/airbyte/issues/7338 - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("money") - .airbyteType(JsonSchemaType.STRING) - .addInsertValues("null", "'999.99'", "'1,000.01'", "'-999999999999.99'", "'-999999999999999'", "'999999999999.99'", "'999999999999999'", - "'-92233720368547758.08'", "'92233720368547758.07'") - .addExpectedValues(null, "999.99", "1000.01", "-999999999999.99", "-999999999999999", "999999999999.99", "999999999999999", - null, null) - .build()); - - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("numeric") - .airbyteType(JsonSchemaType.NUMBER) - .addInsertValues("'99999'", "'NAN'", "10000000000000000000000000000000000000", null) - .addExpectedValues("99999", "NAN", "10000000000000000000000000000000000000", null) - .build()); - - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("decimal") - .airbyteType(JsonSchemaType.NUMBER) - .addInsertValues("99999", "5.1", "0", "'NAN'", "null") - .addExpectedValues("99999", "5.1", "0", "NAN", null) - .build()); - - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("numeric") - .fullSourceDataType("numeric(13,4)") - .airbyteType(JsonSchemaType.NUMBER) - .addInsertValues("0.1880", "10.0000", "5213.3468", "'NAN'", "null") - .addExpectedValues("0.1880", "10.0000", "5213.3468", "NAN", null) - .build()); - - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("smallint") - .airbyteType(JsonSchemaType.NUMBER) - .addInsertValues("null", "-32768", "32767") - .addExpectedValues(null, "-32768", "32767") - .build()); - - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("text") - .airbyteType(JsonSchemaType.STRING) - .addInsertValues("'a'", "'abc'", "'Миші йдуть;'", "'櫻花分店'", - "''", "null", "'\\xF0\\x9F\\x9A\\x80'") - .addExpectedValues("a", "abc", "Миші йдуть;", "櫻花分店", "", null, "\\xF0\\x9F\\x9A\\x80") - .build()); - - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("time") - .airbyteType(JsonSchemaType.STRING) - .addInsertValues("null", "'04:05:06'", "'2021-04-12 05:06:07'", "'04:05 PM'") - .addExpectedValues(null, "04:05:06", "05:06:07", "16:05:00") - .build()); - - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("timetz") - .airbyteType(JsonSchemaType.STRING) - .addInsertValues("null", "'04:05:06+03'", "'2021-04-12 05:06:07+00'", "'060708-03'") - .addExpectedValues(null, "04:05:06+03", "05:06:07+00", "06:07:08-03") - .build()); - - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("timestamp") - .airbyteType(JsonSchemaType.STRING) - .addInsertValues("TIMESTAMP '2004-10-19 10:23:54'", "TIMESTAMP '2004-10-19 10:23:54.123456'", "null") - .addExpectedValues("2004-10-19T10:23:54.000000Z", "2004-10-19T10:23:54.123456Z", null) - .build()); - - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("timestamptz") - .airbyteType(JsonSchemaType.STRING) - .addInsertValues("TIMESTAMP WITH TIME ZONE '2004-10-19 10:23:54+03'", "TIMESTAMP WITH TIME ZONE '2004-10-19 10:23:54.123456+03'", "null") - .addExpectedValues("2004-10-19T07:23:54Z", "2004-10-19T07:23:54.123456Z", null) - .build()); - - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("tsvector") - .airbyteType(JsonSchemaType.STRING) - .addInsertValues("to_tsvector('The quick brown fox jumped over the lazy dog.')") - .addExpectedValues("'brown':3 'dog':9 'fox':4 'jumped':5 'lazy':8 'over':6 'quick':2 'the':1,7") - .build()); - - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("uuid") - .airbyteType(JsonSchemaType.STRING) - .addInsertValues("'a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11'", "null") - .addExpectedValues("a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11", null) - .build()); - - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("xml") - .airbyteType(JsonSchemaType.STRING) - .addInsertValues( - "XMLPARSE (DOCUMENT 'Manual...')", - "null", "''") - .addExpectedValues("Manual...", null, "") - .build()); - - // preconditions for this test are set at the time of database creation (setupDatabase method) - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("mood") - .airbyteType(JsonSchemaType.STRING) - .addInsertValues("'happy'", "null") - .addExpectedValues("happy", null) - .build()); - - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("text") - .fullSourceDataType("text[]") - .airbyteType(JsonSchemaType.ARRAY) - .addInsertValues("'{10000, 10000, 10000, 10000}'", "null") - .addExpectedValues("[\"10000\",\"10000\",\"10000\",\"10000\"]", null) - .build()); - - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("inventory_item") - .airbyteType(JsonSchemaType.STRING) - .addInsertValues("ROW('fuzzy dice', 42, 1.99)", "null") - .addExpectedValues("(\"fuzzy dice\",42,1.99)", null) - .build()); - - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("tsrange") - .airbyteType(JsonSchemaType.STRING) - .addInsertValues("'(2010-01-01 14:30, 2010-01-01 15:30)'", "null") - .addExpectedValues("(\"2010-01-01 14:30:00\",\"2010-01-01 15:30:00\")", null) - .build()); - - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("box") - .airbyteType(JsonSchemaType.STRING) - .addInsertValues("'((3,7),(15,18))'", "'((0,0),(0,0))'", "null") - .addExpectedValues("(15.0,18.0),(3.0,7.0)", "(0.0,0.0),(0.0,0.0)", null) - .build()); - - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("circle") - .airbyteType(JsonSchemaType.STRING) - .addInsertValues("'(5,7),10'", "'(0,0),0'", "'(-10,-4),10'", "null") - .addExpectedValues("<(5.0,7.0),10.0>", "<(0.0,0.0),0.0>", "<(-10.0,-4.0),10.0>", null) - .build()); - - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("line") - .airbyteType(JsonSchemaType.STRING) - .addInsertValues("'{4,5,6}'", "'{0,1,0}'", "null") - .addExpectedValues("{4.0,5.0,6.0}", "{0.0,1.0,0.0}", null) - .build()); - - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("lseg") - .airbyteType(JsonSchemaType.STRING) - .addInsertValues("'((3,7),(15,18))'", "'((0,0),(0,0))'", "null") - .addExpectedValues("[(3.0,7.0),(15.0,18.0)]", "[(0.0,0.0),(0.0,0.0)]", null) - .build()); - - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("path") - .airbyteType(JsonSchemaType.STRING) - .addInsertValues("'((3,7),(15.5,18.2))'", "'((0,0),(0,0))'", "null") - .addExpectedValues("((3.0,7.0),(15.5,18.2))", "((0.0,0.0),(0.0,0.0))", null) - .build()); - - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("point") - .airbyteType(JsonSchemaType.NUMBER) - .addInsertValues("'(3,7)'", "'(0,0)'", "'(999999999999999999999999,0)'", "null") - .addExpectedValues("(3.0,7.0)", "(0.0,0.0)", "(1.0E24,0.0)", null) - .build()); - - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("polygon") - .airbyteType(JsonSchemaType.STRING) - .addInsertValues("'((3,7),(15,18))'", "'((0,0),(0,0))'", - "'((0,0),(999999999999999999999999,0))'", "null") - .addExpectedValues("((3.0,7.0),(15.0,18.0))", "((0.0,0.0),(0.0,0.0))", "((0.0,0.0),(1.0E24,0.0))", null) - .build()); - - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("real") - .airbyteType(JsonSchemaType.STRING) - .addInsertValues("'123'", "'1234567890.1234567'", "null") - .addExpectedValues("123.0", "1.23456794E9", null) - .build()); - - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("tsvector") - .airbyteType(JsonSchemaType.STRING) - .addInsertValues("to_tsvector('The quick brown fox jumped over the lazy dog.')") - .addExpectedValues("'brown':3 'dog':9 'fox':4 'jumped':5 'lazy':8 'over':6 'quick':2 'the':1,7") - .build()); - - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("tsquery") - .airbyteType(JsonSchemaType.STRING) - .addInsertValues("null", "'fat & (rat | cat)'::tsquery", "'fat:ab & cat'::tsquery") - .addExpectedValues(null, "'fat' & ( 'rat' | 'cat' )", "'fat':AB & 'cat'") - .build()); - - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("hstore") - .airbyteType(JsonSchemaType.STRING) - .addInsertValues(""" - '"paperback" => "243","publisher" => "postgresqltutorial.com", - "language" => "English","ISBN-13" => "978-1449370000", - "weight" => "11.2 ounces"' - """, null) - .addExpectedValues( - """ - {"ISBN-13":"978-1449370000","weight":"11.2 ounces","paperback":"243","publisher":"postgresqltutorial.com","language":"English"}""", - null) - .build()); - } - } diff --git a/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/PostgresSourceDatatypeTest.java b/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/PostgresSourceDatatypeTest.java index 339c8011736d..937de392c371 100644 --- a/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/PostgresSourceDatatypeTest.java +++ b/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/PostgresSourceDatatypeTest.java @@ -10,26 +10,20 @@ import io.airbyte.db.Database; import io.airbyte.db.factory.DSLContextFactory; import io.airbyte.db.factory.DatabaseDriver; -import io.airbyte.integrations.standardtest.source.AbstractSourceDatabaseTypeTest; -import io.airbyte.integrations.standardtest.source.TestDataHolder; import io.airbyte.integrations.standardtest.source.TestDestinationEnv; -import io.airbyte.protocol.models.JsonSchemaType; import java.sql.SQLException; -import java.util.Set; -import org.jooq.DSLContext; import org.jooq.SQLDialect; import org.testcontainers.containers.PostgreSQLContainer; +import org.testcontainers.utility.MountableFile; -public class PostgresSourceDatatypeTest extends AbstractSourceDatabaseTypeTest { - - private PostgreSQLContainer container; - private JsonNode config; - private DSLContext dslContext; - private static final String SCHEMA_NAME = "test"; +public class PostgresSourceDatatypeTest extends AbstractPostgresSourceDatatypeTest { @Override protected Database setupDatabase() throws SQLException { - container = new PostgreSQLContainer<>("postgres:14-alpine"); + container = new PostgreSQLContainer<>("postgres:14-alpine") + .withCopyFileToContainer(MountableFile.forClasspathResource("postgresql.conf"), + "/etc/postgresql/postgresql.conf") + .withCommand("postgres -c config_file=/etc/postgresql/postgresql.conf"); container.start(); final JsonNode replicationMethod = Jsons.jsonNode(ImmutableMap.builder() .put("method", "Standard") @@ -75,21 +69,6 @@ protected Database setupDatabase() throws SQLException { return database; } - @Override - protected String getNameSpace() { - return SCHEMA_NAME; - } - - @Override - protected String getImageName() { - return "airbyte/source-postgres:dev"; - } - - @Override - protected JsonNode getConfig() { - return config; - } - @Override protected void tearDown(final TestDestinationEnv testEnv) { dslContext.close(); @@ -101,485 +80,4 @@ public boolean testCatalog() { return true; } - // Test cases are sorted alphabetically based on the source type - // See https://www.postgresql.org/docs/14/datatype.html - @Override - protected void initTests() { - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("bigint") - .airbyteType(JsonSchemaType.NUMBER) - .addInsertValues("-9223372036854775808", "9223372036854775807", "0", "null") - .addExpectedValues("-9223372036854775808", "9223372036854775807", "0", null) - .build()); - - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("bigserial") - .airbyteType(JsonSchemaType.NUMBER) - .addInsertValues("1", "9223372036854775807", "0", "-9223372036854775808") - .addExpectedValues("1", "9223372036854775807", "0", "-9223372036854775808") - .build()); - - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("bit") - .fullSourceDataType("BIT(1)") - .airbyteType(JsonSchemaType.STRING) - .addInsertValues("B'0'") - .addExpectedValues("0") - .build()); - - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("bit") - .fullSourceDataType("BIT(3)") - .airbyteType(JsonSchemaType.STRING) - .addInsertValues("B'101'") - .addExpectedValues("101") - .build()); - - for (final String type : Set.of("bit varying", "varbit")) { - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("bit_varying") - .fullSourceDataType("BIT VARYING(5)") - .airbyteType(JsonSchemaType.STRING) - .addInsertValues("B'101'", "null") - .addExpectedValues("101", null) - .build()); - } - - for (final String type : Set.of("boolean", "bool")) { - addDataTypeTestData( - TestDataHolder.builder() - .sourceType(type) - .airbyteType(JsonSchemaType.BOOLEAN) - .addInsertValues("true", "'yes'", "'1'", "false", "'no'", "'0'", "null") - .addExpectedValues("true", "true", "true", "false", "false", "false", null) - .build()); - } - - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("box") - .airbyteType(JsonSchemaType.STRING) - .addInsertValues("'((3,7),(15,18))'", "'((0,0),(0,0))'", "null") - .addExpectedValues("(15,18),(3,7)", "(0,0),(0,0)", null) - .build()); - - // bytea stores variable length binary string - // https://www.postgresql.org/docs/14/datatype-binary.html - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("bytea") - .airbyteType(JsonSchemaType.STRING) - .addInsertValues("null", "decode('1234', 'hex')", "'1234'", "'abcd'", "'\\xabcd'") - .addExpectedValues(null, "\\x1234", "\\x31323334", "\\x61626364", "\\xabcd") - .build()); - - for (final String type : Set.of("character", "char")) { - addDataTypeTestData( - TestDataHolder.builder() - .sourceType(type) - .airbyteType(JsonSchemaType.STRING) - .addInsertValues("'a'", "'*'", "null") - .addExpectedValues("a", "*", null) - .build()); - - addDataTypeTestData( - TestDataHolder.builder() - .sourceType(type) - .fullSourceDataType(type + "(8)") - .airbyteType(JsonSchemaType.STRING) - .addInsertValues("'{asb123}'", "'{asb12}'") - .addExpectedValues("{asb123}", "{asb12} ") - .build()); - } - - for (final String type : Set.of("varchar", "text")) { - addDataTypeTestData( - TestDataHolder.builder() - .sourceType(type) - .airbyteType(JsonSchemaType.STRING) - .addInsertValues("'a'", "'abc'", "'Миші йдуть на південь, не питай чому;'", "'櫻花分店'", - "''", "null", "'\\xF0\\x9F\\x9A\\x80'") - .addExpectedValues("a", "abc", "Миші йдуть на південь, не питай чому;", "櫻花分店", "", - null, "\\xF0\\x9F\\x9A\\x80") - .build()); - } - - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("varchar") - .fullSourceDataType("character varying(10)") - .airbyteType(JsonSchemaType.STRING) - .addInsertValues("'{asb123}'", "'{asb12}'") - .addExpectedValues("{asb123}", "{asb12}") - .build()); - - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("cidr") - .airbyteType(JsonSchemaType.STRING) - .addInsertValues("null", "'192.168.100.128/25'", "'192.168/24'", "'192.168.1'", - "'128.1'", "'2001:4f8:3:ba::/64'") - .addExpectedValues(null, "192.168.100.128/25", "192.168.0.0/24", "192.168.1.0/24", - "128.1.0.0/16", "2001:4f8:3:ba::/64") - .build()); - - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("circle") - .airbyteType(JsonSchemaType.STRING) - .addInsertValues("'(5,7),10'", "'(0,0),0'", "'(-10,-4),10'", "null") - .addExpectedValues("<(5,7),10>", "<(0,0),0>", "<(-10,-4),10>", null) - .build()); - - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("date") - .airbyteType(JsonSchemaType.STRING_DATE) - .addInsertValues("'1999-01-08'", "'1991-02-10 BC'", "null") - .addExpectedValues("1999-01-08", "1990-02-10 BC", null) - .build()); - - for (final String type : Set.of("double precision", "float", "float8")) { - addDataTypeTestData( - TestDataHolder.builder() - .sourceType(type) - .airbyteType(JsonSchemaType.NUMBER) - .addInsertValues( - "null", "'123'", "'1234567890.1234567'", - // Postgres source does not support these special values yet - // https://github.com/airbytehq/airbyte/issues/8902 - "'infinity'", "'-infinity'", "'nan'") - .addExpectedValues(null, "123.0", "1.2345678901234567E9", null, null, null) - .build()); - } - - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("inet") - .airbyteType(JsonSchemaType.STRING) - .addInsertValues("'198.24.10.0/24'", "'198.24.10.0'", "'198.10/8'", "null") - .addExpectedValues("198.24.10.0/24", "198.24.10.0", "198.10.0.0/8", null) - .build()); - - for (final String type : Set.of("integer", "int", "int4")) { - addDataTypeTestData( - TestDataHolder.builder() - .sourceType(type) - .airbyteType(JsonSchemaType.NUMBER) - .addInsertValues("null", "1001", "-2147483648", "2147483647") - .addExpectedValues(null, "1001", "-2147483648", "2147483647") - .build()); - } - - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("interval") - .airbyteType(JsonSchemaType.STRING) - .addInsertValues("null", "'P1Y2M3DT4H5M6S'", "'-178000000'", "'178000000'") - .addExpectedValues(null, "1 year 2 mons 3 days 04:05:06", "-49444:26:40", "49444:26:40") - .build()); - - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("json") - .airbyteType(JsonSchemaType.STRING) - .addInsertValues("null", "'{\"a\": 10, \"b\": 15}'") - .addExpectedValues(null, "{\"a\": 10, \"b\": 15}") - .build()); - - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("jsonb") - .airbyteType(JsonSchemaType.STRING) - .addInsertValues("null", "'[1, 2, 3]'::jsonb") - .addExpectedValues(null, "[1, 2, 3]") - .build()); - - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("line") - .airbyteType(JsonSchemaType.STRING) - .addInsertValues("'{4,5,6}'", "'{0,1,0}'", "null") - .addExpectedValues("{4,5,6}", "{0,1,0}", null) - .build()); - - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("lseg") - .airbyteType(JsonSchemaType.STRING) - .addInsertValues("'((3,7),(15,18))'", "'((0,0),(0,0))'", "null") - .addExpectedValues("[(3,7),(15,18)]", "[(0,0),(0,0)]", null) - .build()); - - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("macaddr") - .airbyteType(JsonSchemaType.STRING) - .addInsertValues("null", "'08:00:2b:01:02:03'", "'08-00-2b-01-02-04'", - "'08002b:010205'") - .addExpectedValues(null, "08:00:2b:01:02:03", "08:00:2b:01:02:04", "08:00:2b:01:02:05") - .build()); - - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("macaddr8") - .airbyteType(JsonSchemaType.STRING) - .addInsertValues("null", "'08:00:2b:01:02:03:04:05'", "'08-00-2b-01-02-03-04-06'", - "'08002b:0102030407'") - .addExpectedValues(null, "08:00:2b:01:02:03:04:05", "08:00:2b:01:02:03:04:06", - "08:00:2b:01:02:03:04:07") - .build()); - - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("money") - .airbyteType(JsonSchemaType.NUMBER) - .addInsertValues( - "null", - "'999.99'", "'1,001.01'", "'-1,000'", - "'$999.99'", "'$1001.01'", "'-$1,000'", - // max values for Money type: "-92233720368547758.08", "92233720368547758.07" - "'-92233720368547758.08'", "'92233720368547758.07'") - .addExpectedValues( - null, - // Double#toString method is necessary here because sometimes the output - // has unexpected decimals, e.g. Double.toString(-1000) is -1000.0 - "999.99", "1001.01", Double.toString(-1000), - "999.99", "1001.01", Double.toString(-1000), - Double.toString(-92233720368547758.08), Double.toString(92233720368547758.07)) - .build()); - - for (final String type : Set.of("numeric", "decimal")) { - addDataTypeTestData( - TestDataHolder.builder() - .sourceType(type) - .airbyteType(JsonSchemaType.NUMBER) - .addInsertValues( - "'123'", "null", "'1234567890.1234567'", - // Postgres source does not support these special values yet - // https://github.com/airbytehq/airbyte/issues/8902 - "'infinity'", "'-infinity'", "'nan'") - .addExpectedValues("123", null, "1.2345678901234567E9", null, null, null) - .build()); - } - - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("path") - .airbyteType(JsonSchemaType.STRING) - .addInsertValues("'((3,7),(15,18))'", "'((0,0),(0,0))'", "null") - .addExpectedValues("((3,7),(15,18))", "((0,0),(0,0))", null) - .build()); - - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("pg_lsn") - .airbyteType(JsonSchemaType.STRING) - .addInsertValues("'7/A25801C8'::pg_lsn", "'0/0'::pg_lsn", "null") - .addExpectedValues("7/A25801C8", "0/0", null) - .build()); - - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("point") - .airbyteType(JsonSchemaType.STRING) - .addInsertValues("'(3,7)'", "'(0,0)'", "'(999999999999999999999999,0)'", "null") - .addExpectedValues("(3,7)", "(0,0)", "(1e+24,0)", null) - .build()); - - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("polygon") - .airbyteType(JsonSchemaType.STRING) - .addInsertValues("'((3,7),(15,18))'", "'((0,0),(0,0))'", - "'((0,0),(999999999999999999999999,0))'", "null") - .addExpectedValues("((3,7),(15,18))", "((0,0),(0,0))", "((0,0),(1e+24,0))", null) - .build()); - - for (final String type : Set.of("real", "float4")) { - addDataTypeTestData( - TestDataHolder.builder() - .sourceType(type) - .airbyteType(JsonSchemaType.NUMBER) - .addInsertValues("null", "3.4145") - .addExpectedValues(null, "3.4145") - .build()); - } - - for (final String type : Set.of("smallint", "int2")) { - addDataTypeTestData( - TestDataHolder.builder() - .sourceType(type) - .airbyteType(JsonSchemaType.NUMBER) - .addInsertValues("null", "-32768", "32767") - .addExpectedValues(null, "-32768", "32767") - .build()); - } - - for (final String type : Set.of("smallserial", "serial2")) { - addDataTypeTestData( - TestDataHolder.builder() - .sourceType(type) - .airbyteType(JsonSchemaType.NUMBER) - .addInsertValues("1", "32767", "0", "-32767") - .addExpectedValues("1", "32767", "0", "-32767") - .build()); - } - - for (final String type : Set.of("serial", "serial4")) { - addDataTypeTestData( - TestDataHolder.builder() - .sourceType(type) - .airbyteType(JsonSchemaType.NUMBER) - .addInsertValues("1", "2147483647", "0", "-2147483647") - .addExpectedValues("1", "2147483647", "0", "-2147483647") - .build()); - } - - // time without time zone - for (final String fullSourceType : Set.of("time", "time without time zone")) { - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("time") - .fullSourceDataType(fullSourceType) - .airbyteType(JsonSchemaType.STRING_TIME_WITHOUT_TIMEZONE) - // time column will ignore time zone - .addInsertValues("null", "'13:00:01'", "'13:00:02+8'", "'13:00:03-8'", "'13:00:04Z'", "'13:00:05.01234Z+8'", "'13:00:00Z-8'") - .addExpectedValues(null, "13:00:01.000000", "13:00:02.000000", "13:00:03.000000", "13:00:04.000000", "13:00:05.012340", - "13:00:00.000000") - .build()); - } - - // time with time zone - for (final String fullSourceType : Set.of("timetz", "time with time zone")) { - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("timetz") - .fullSourceDataType(fullSourceType) - .airbyteType(JsonSchemaType.STRING_TIME_WITH_TIMEZONE) - .addInsertValues("null", "'13:00:01'", "'13:00:00+8'", "'13:00:03-8'", "'13:00:04Z'", "'13:00:05.012345Z+8'", "'13:00:06.00000Z-8'") - // A time value without time zone will use the time zone set on the database, which is Z-7, - // so 13:00:01 is returned as 13:00:01-07. - .addExpectedValues(null, "13:00:01.000000-07:00", "13:00:00.000000+08:00", "13:00:03.000000-08:00", "13:00:04.000000Z", - "13:00:05.012345-08:00", "13:00:06.000000+08:00") - .build()); - } - - // timestamp without time zone - for (final String fullSourceType : Set.of("timestamp", "timestamp without time zone")) { - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("timestamp") - .fullSourceDataType(fullSourceType) - .airbyteType(JsonSchemaType.STRING_TIMESTAMP_WITHOUT_TIMEZONE) - .addInsertValues("TIMESTAMP '2004-10-19 10:23:00'", "TIMESTAMP '2004-10-19 10:23:54.123456'", "null") - .addExpectedValues("2004-10-19T10:23:00.000000", "2004-10-19T10:23:54.123456", null) - .build()); - } - - // timestamp with time zone - for (final String fullSourceType : Set.of("timestamptz", "timestamp with time zone")) { - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("timestamptz") - .fullSourceDataType(fullSourceType) - .airbyteType(JsonSchemaType.STRING_TIMESTAMP_WITH_TIMEZONE) - .addInsertValues("TIMESTAMP '2004-10-19 10:23:00-08'", "TIMESTAMP '2004-10-19 10:23:54.123456-08'", "null") - // 2004-10-19T10:23:54Z-8 = 2004-10-19T17:23:54Z - .addExpectedValues("2004-10-19T17:23:00.000000Z", "2004-10-19T17:23:54.123456Z", null) - .build()); - } - - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("tsquery") - .airbyteType(JsonSchemaType.STRING) - .addInsertValues("null", "'fat & (rat | cat)'::tsquery", "'fat:ab & cat'::tsquery") - .addExpectedValues(null, "'fat' & ( 'rat' | 'cat' )", "'fat':AB & 'cat'") - .build()); - - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("tsvector") - .airbyteType(JsonSchemaType.STRING) - .addInsertValues("to_tsvector('The quick brown fox jumped over the lazy dog.')") - .addExpectedValues("'brown':3 'dog':9 'fox':4 'jump':5 'lazi':8 'quick':2") - .build()); - - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("uuid") - .airbyteType(JsonSchemaType.STRING) - .addInsertValues("'a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11'", "null") - .addExpectedValues("a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11", null) - .build()); - - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("xml") - .airbyteType(JsonSchemaType.STRING) - .addInsertValues( - "XMLPARSE (DOCUMENT 'Manual...')", - "null", "''") - .addExpectedValues("Manual...", null, "") - .build()); - - // enum type - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("mood") - .airbyteType(JsonSchemaType.STRING) - .addInsertValues("'happy'", "null") - .addExpectedValues("happy", null) - .build()); - - // range - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("tsrange") - .airbyteType(JsonSchemaType.STRING) - .addInsertValues("'(2010-01-01 14:30, 2010-01-01 15:30)'", "null") - .addExpectedValues("(\"2010-01-01 14:30:00\",\"2010-01-01 15:30:00\")", null) - .build()); - - // array - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("text") - .fullSourceDataType("text[]") - .airbyteType(JsonSchemaType.ARRAY) - .addInsertValues("'{10001, 10002, 10003, 10004}'", "null") - .addExpectedValues("[\"10001\",\"10002\",\"10003\",\"10004\"]", null) - .build()); - - // composite type - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("inventory_item") - .airbyteType(JsonSchemaType.STRING) - .addInsertValues("ROW('fuzzy dice', 42, 1.99)", "null") - .addExpectedValues("(\"fuzzy dice\",42,1.99)", null) - .build()); - - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("hstore") - .airbyteType(JsonSchemaType.STRING) - .addInsertValues(""" - '"paperback" => "243","publisher" => "postgresqltutorial.com", - "language" => "English","ISBN-13" => "978-1449370000", - "weight" => "11.2 ounces"' - """, null) - .addExpectedValues( - """ - {"ISBN-13":"978-1449370000","weight":"11.2 ounces","paperback":"243","publisher":"postgresqltutorial.com","language":"English"}""", - null) - .build()); - } - } diff --git a/docs/integrations/sources/postgres.md b/docs/integrations/sources/postgres.md index 58090488a5d1..1677af9a12fc 100644 --- a/docs/integrations/sources/postgres.md +++ b/docs/integrations/sources/postgres.md @@ -267,7 +267,7 @@ According to Postgres [documentation](https://www.postgresql.org/docs/14/datatyp | `character varying`, `varchar` | string | | | `cidr` | string | | | `circle` | string | | -| `date` | string | Parsed as ISO8601 date time at midnight | +| `date` | string | Parsed as ISO8601 date time at midnight. CDC mode doesn't support era indicators. Issue: [#14590](https://github.com/airbytehq/airbyte/issues/14590) | | `double precision`, `float`, `float8` | number | `Infinity`, `-Infinity`, and `NaN` are not supported and converted to `null`. Issue: [#8902](https://github.com/airbytehq/airbyte/issues/8902). | | `hstore` | string | | | `inet` | string | | @@ -291,9 +291,9 @@ According to Postgres [documentation](https://www.postgresql.org/docs/14/datatyp | `serial`, `serial4` | number | | | `text` | string | | | `time` | string | Parsed as a time string without a time-zone in the ISO-8601 calendar system. | -| `timetz` | string | Parsed as a time string with time-zone in the ISO-8601 calendar system. | +| `timetz` | string | Parsed as a time string with time-zone in the ISO-8601 calendar system. | | `timestamp` | string | Parsed as a date-time string without a time-zone in the ISO-8601 calendar system. | -| `timestamptz` | string | Parsed as a date-time string with time-zone in the ISO-8601 calendar system. | +| `timestamptz` | string | Parsed as a date-time string with time-zone in the ISO-8601 calendar system. | | `tsquery` | string | | | `tsvector` | string | | | `uuid` | string | |