diff --git a/lib/postgres/cast.go b/lib/postgres/cast.go index 0cb5dce4..a7a9b227 100644 --- a/lib/postgres/cast.go +++ b/lib/postgres/cast.go @@ -34,7 +34,8 @@ func castColumn(col schema.Column) string { schema.UserDefinedText, schema.Text, schema.Money, schema.VariableNumeric, schema.Numeric, schema.Boolean, schema.Bit, - schema.Date, schema.Timestamp, schema.HStore, schema.JSON: + schema.Date, schema.Timestamp, schema.HStore, schema.JSON, + schema.Point, schema.Geography, schema.Geometry: // These are all the columns that do not need to be escaped. return colName default: diff --git a/sources/postgres/integration_test/main.go b/sources/postgres/integration_test/main.go index 34d3142a..2acaf228 100644 --- a/sources/postgres/integration_test/main.go +++ b/sources/postgres/integration_test/main.go @@ -6,6 +6,7 @@ import ( "fmt" "log/slog" "math/rand/v2" + "strings" _ "github.com/jackc/pgx/v5/stdlib" @@ -47,18 +48,32 @@ func rawMessageTimestamp(message lib.RawMessage) int64 { } func checkDifference(name, expected, actual string) bool { - if expected != actual { - fmt.Printf("Expected %s:\n", name) - fmt.Println("--------------------------------------------------------------------------------") - fmt.Println(expected) - fmt.Println("--------------------------------------------------------------------------------") - fmt.Printf("Actual %s:\n", name) - fmt.Println("--------------------------------------------------------------------------------") - fmt.Println(actual) - fmt.Println("--------------------------------------------------------------------------------") - return true + if expected == actual { + return false } - return false + expectedLines := strings.Split(expected, "\n") + actualLines := strings.Split(actual, "\n") + fmt.Printf("Expected %s:\n", name) + fmt.Println("--------------------------------------------------------------------------------") + for i, line := range expectedLines { + prefix := " " + if i >= len(actualLines) || line != actualLines[i] { + prefix = ">" + } + fmt.Println(prefix + line) + } + fmt.Println("--------------------------------------------------------------------------------") + fmt.Printf("Actual %s:\n", name) + fmt.Println("--------------------------------------------------------------------------------") + for i, line := range actualLines { + prefix := " " + if i >= len(expectedLines) || line != expectedLines[i] { + prefix = ">" + } + fmt.Println(prefix + line) + } + fmt.Println("--------------------------------------------------------------------------------") + return true } func readTable(db *sql.DB, tableName string) ([]lib.RawMessage, error) { @@ -133,8 +148,10 @@ CREATE TABLE %s ( -- c_txid_snapshot txid_snapshot, c_uuid uuid, c_xml xml, - -- Additional types - c_hstore hstore + -- User defined + c_hstore hstore, + c_geometry geometry, + c_geography geography(Point) ) ` @@ -225,7 +242,11 @@ INSERT INTO %s VALUES ( -- c_xml 'HelloWorld', -- c_hstore - '"a" => "b", "c" => "d", "e" => "f"' + '"a" => "b", "c" => "d", "e" => "f"', + -- c_geometry + 'POLYGON((0 0, 1 0, 1 1, 0 1, 0 0))', + -- c_geography + 'POINT(-118.4079 33.9434)' ) ` @@ -492,6 +513,22 @@ const expectedPayloadTemplate = `{ "field": "c_hstore", "name": "", "parameters": null + }, + { + "type": "struct", + "optional": false, + "default": null, + "field": "c_geometry", + "name": "io.debezium.data.geometry.Geometry", + "parameters": null + }, + { + "type": "struct", + "optional": false, + "default": null, + "field": "c_geography", + "name": "io.debezium.data.geometry.Geography", + "parameters": null } ], "optional": false, @@ -511,6 +548,14 @@ const expectedPayloadTemplate = `{ "c_cidr": "192.168.100.128/25", "c_date": 18263, "c_double_precision": 123.456, + "c_geography": { + "srid": null, + "wkb": "AQEAACDmEAAAdQKaCBuaXcDwhclUwfhAQA==" + }, + "c_geometry": { + "srid": null, + "wkb": "AQMAAAABAAAABQAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAADwPwAAAAAAAAAAAAAAAAAA8D8AAAAAAADwPwAAAAAAAAAAAAAAAAAA8D8AAAAAAAAAAAAAAAAAAAAA" + }, "c_hstore": { "a": "b", "c": "d", @@ -559,7 +604,12 @@ func testTypes(db *sql.DB) error { if err != nil { return fmt.Errorf("unable to create temporary table: %w", err) } - defer db.Exec(fmt.Sprintf("DROP TABLE %s", tempTableName)) + defer func() { + slog.Info("Dropping temporary table...", slog.String("table", tempTableName)) + if _, err := db.Exec(fmt.Sprintf("DROP TABLE %s", tempTableName)); err != nil { + slog.Error("Failed to drop table", slog.Any("err", err)) + } + }() slog.Info("Inserting data...") _, err = db.Exec(fmt.Sprintf(insertQuery, tempTableName))