diff --git a/go.mod b/go.mod index 745b1d27..d50af7af 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,7 @@ go 1.22 require ( github.com/DataDog/datadog-go/v5 v5.5.0 - github.com/artie-labs/transfer v1.25.23 + github.com/artie-labs/transfer v1.25.26 github.com/aws/aws-sdk-go v1.44.327 github.com/aws/aws-sdk-go-v2 v1.18.1 github.com/aws/aws-sdk-go-v2/config v1.18.19 diff --git a/go.sum b/go.sum index 5a6356d9..3b27dc84 100644 --- a/go.sum +++ b/go.sum @@ -95,8 +95,8 @@ github.com/apache/thrift v0.0.0-20181112125854-24918abba929/go.mod h1:cp2SuWMxlE github.com/apache/thrift v0.14.2/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ= github.com/apache/thrift v0.17.0 h1:cMd2aj52n+8VoAtvSvLn4kDC3aZ6IAkBuqWQ2IDu7wo= github.com/apache/thrift v0.17.0/go.mod h1:OLxhMRJxomX+1I/KUw03qoV3mMz16BwaKI+d4fPBx7Q= -github.com/artie-labs/transfer v1.25.23 h1:9pL6wO/87H6vmFyN8NfOuLAP28w3CjmxeELIrvWKePk= -github.com/artie-labs/transfer v1.25.23/go.mod h1:PxZjjW1+OnZDgRRJwVXUoiGY2iPsLnY2TUMrdcY3zfg= +github.com/artie-labs/transfer v1.25.26 h1:lgNRw9Esx7iuxLL0kg2NIvm+Y1HH7sEpSgLdS4yjdmk= +github.com/artie-labs/transfer v1.25.26/go.mod h1:PxZjjW1+OnZDgRRJwVXUoiGY2iPsLnY2TUMrdcY3zfg= github.com/aws/aws-sdk-go v1.30.19/go.mod h1:5zCpMtNQVjRREroY7sYe8lOMRSxkhG6MZveU8YkpAk0= github.com/aws/aws-sdk-go v1.44.327 h1:ZS8oO4+7MOBLhkdwIhgtVeDzCeWOlTfKJS7EgggbIEY= github.com/aws/aws-sdk-go v1.44.327/go.mod h1:aVsgQcEevwlmQ7qHE9I3h+dtQgpqhFB+i8Phjh7fkwI= diff --git a/writers/transfer/writer.go b/writers/transfer/writer.go index 0876e60b..57e3a5e7 100644 --- a/writers/transfer/writer.go +++ b/writers/transfer/writer.go @@ -7,6 +7,7 @@ import ( "log/slog" "time" + bqDialect "github.com/artie-labs/transfer/clients/bigquery/dialect" "github.com/artie-labs/transfer/clients/mssql/dialect" "github.com/artie-labs/transfer/lib/artie" "github.com/artie-labs/transfer/lib/cdc/mongo" @@ -208,7 +209,7 @@ func (w *Writer) flush(reason string) error { tableData.InMemoryColumns().DeleteColumn(constants.DeleteColumnMarker) } - if err = w.destination.Append(tableData.TableData); err != nil { + if err = w.destination.Append(tableData.TableData, isBigQuery(w.destination)); err != nil { tags["what"] = "merge_fail" tags["retryable"] = fmt.Sprint(w.destination.IsRetryableError(err)) return fmt.Errorf("failed to append data to destination: %w", err) @@ -264,3 +265,13 @@ func isMicrosoftSQLServer(baseline destination.Baseline) bool { _, isOk = dwh.Dialect().(dialect.MSSQLDialect) return isOk } + +func isBigQuery(baseline destination.Baseline) bool { + dwh, isOk := baseline.(destination.DataWarehouse) + if !isOk { + return false + } + + _, isOk = dwh.Dialect().(bqDialect.BigQueryDialect) + return isOk +}