diff --git a/go.mod b/go.mod index c39adfbf..3d0c601b 100644 --- a/go.mod +++ b/go.mod @@ -4,22 +4,20 @@ go 1.22.4 require ( github.com/DataDog/datadog-go/v5 v5.5.0 - github.com/artie-labs/transfer v1.25.32 + github.com/artie-labs/transfer v1.25.39 github.com/aws/aws-sdk-go v1.44.327 github.com/aws/aws-sdk-go-v2 v1.18.1 - github.com/aws/aws-sdk-go-v2/config v1.18.19 github.com/cockroachdb/apd/v3 v3.2.1 github.com/getsentry/sentry-go v0.27.0 github.com/go-sql-driver/mysql v1.7.1 github.com/google/uuid v1.6.0 - github.com/jackc/pgx/v5 v5.5.5 + github.com/jackc/pgx/v5 v5.6.0 github.com/lmittmann/tint v1.0.4 github.com/mattn/go-isatty v0.0.20 github.com/microsoft/go-mssqldb v1.7.1 github.com/samber/slog-multi v1.0.2 github.com/samber/slog-sentry/v2 v2.5.0 github.com/segmentio/kafka-go v0.4.47 - github.com/segmentio/kafka-go/sasl/aws_msk_iam_v2 v0.1.0 github.com/stretchr/testify v1.9.0 go.mongodb.org/mongo-driver v1.15.0 gopkg.in/yaml.v3 v3.0.1 @@ -46,6 +44,7 @@ require ( github.com/apache/arrow/go/v15 v15.0.2 // indirect github.com/apache/thrift v0.17.0 // indirect github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.4.10 // indirect + github.com/aws/aws-sdk-go-v2/config v1.18.19 // indirect github.com/aws/aws-sdk-go-v2/credentials v1.13.18 // indirect github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.13.1 // indirect github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.11.59 // indirect @@ -100,7 +99,6 @@ require ( github.com/lestrrat-go/iter v1.0.2 // indirect github.com/lestrrat-go/jwx v1.2.29 // indirect github.com/lestrrat-go/option v1.0.1 // indirect - github.com/lib/pq v1.10.9 // indirect github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 // indirect github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 // indirect github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe // indirect @@ -111,6 +109,7 @@ require ( github.com/pmezard/go-difflib v1.0.0 // indirect github.com/samber/lo v1.39.0 // indirect github.com/samber/slog-common v0.16.0 // indirect + github.com/segmentio/kafka-go/sasl/aws_msk_iam_v2 v0.1.0 // indirect github.com/sirupsen/logrus v1.9.0 // indirect github.com/snowflakedb/gosnowflake v1.6.23-0.20230622060049-8cb65fd3db4a // indirect github.com/twpayne/go-geom v1.5.3 // indirect diff --git a/go.sum b/go.sum index 14d0595e..5c0524f4 100644 --- a/go.sum +++ b/go.sum @@ -95,8 +95,8 @@ github.com/apache/thrift v0.0.0-20181112125854-24918abba929/go.mod h1:cp2SuWMxlE github.com/apache/thrift v0.14.2/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ= github.com/apache/thrift v0.17.0 h1:cMd2aj52n+8VoAtvSvLn4kDC3aZ6IAkBuqWQ2IDu7wo= github.com/apache/thrift v0.17.0/go.mod h1:OLxhMRJxomX+1I/KUw03qoV3mMz16BwaKI+d4fPBx7Q= -github.com/artie-labs/transfer v1.25.32 h1:M2g+lvEPoPzdRLk559hU9coK/1LxckTSPQDPdXm0kbQ= -github.com/artie-labs/transfer v1.25.32/go.mod h1:pMn7/nkM2gQVw4rgjNIWIGUKtCkCO7yS6Y1IVDgrS3k= +github.com/artie-labs/transfer v1.25.39 h1:hFK1BhlXt4JCxcZF42MgLkpjjWW6wcSzrpKEly1qeOA= +github.com/artie-labs/transfer v1.25.39/go.mod h1:BClIu43kgZgqyx0Rq/vrm9EP7tdz61ou9a3adHrzcOc= github.com/aws/aws-sdk-go v1.30.19/go.mod h1:5zCpMtNQVjRREroY7sYe8lOMRSxkhG6MZveU8YkpAk0= github.com/aws/aws-sdk-go v1.44.327 h1:ZS8oO4+7MOBLhkdwIhgtVeDzCeWOlTfKJS7EgggbIEY= github.com/aws/aws-sdk-go v1.44.327/go.mod h1:aVsgQcEevwlmQ7qHE9I3h+dtQgpqhFB+i8Phjh7fkwI= @@ -314,8 +314,8 @@ github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsI github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= github.com/jackc/pgservicefile v0.0.0-20231201235250-de7065d80cb9 h1:L0QtFUgDarD7Fpv9jeVMgy/+Ec0mtnmYuImjTz6dtDA= github.com/jackc/pgservicefile v0.0.0-20231201235250-de7065d80cb9/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM= -github.com/jackc/pgx/v5 v5.5.5 h1:amBjrZVmksIdNjxGW/IiIMzxMKZFelXbUoPNb+8sjQw= -github.com/jackc/pgx/v5 v5.5.5/go.mod h1:ez9gk+OAat140fv9ErkZDYFWmXLfV+++K0uAOiwgm1A= +github.com/jackc/pgx/v5 v5.6.0 h1:SWJzexBzPL5jb0GEsrPMLIsi/3jOo7RHlzTjcAeDrPY= +github.com/jackc/pgx/v5 v5.6.0/go.mod h1:DNZ/vlrUnhWCoFGxHAG8U2ljioxukquj7utPDgtQdTw= github.com/jackc/puddle/v2 v2.2.1 h1:RhxXJtFG022u4ibrCSMSiu5aOq1i77R3OHKNJj77OAk= github.com/jackc/puddle/v2 v2.2.1/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= github.com/jcmturner/gofork v0.0.0-20180107083740-2aebee971930/go.mod h1:MK8+TM0La+2rjBD4jE12Kj1pCCxK7d2LK/UM3ncEo0o= diff --git a/lib/kafkalib/writer.go b/lib/kafkalib/writer.go index 537a207a..cc312d1f 100644 --- a/lib/kafkalib/writer.go +++ b/lib/kafkalib/writer.go @@ -2,22 +2,19 @@ package kafkalib import ( "context" - "crypto/tls" "encoding/json" "fmt" "log/slog" "time" - awsCfg "github.com/aws/aws-sdk-go-v2/config" + "github.com/artie-labs/transfer/lib/jitter" + "github.com/artie-labs/transfer/lib/kafkalib" + "github.com/artie-labs/transfer/lib/size" "github.com/segmentio/kafka-go" - "github.com/segmentio/kafka-go/sasl/aws_msk_iam_v2" - "github.com/segmentio/kafka-go/sasl/scram" "github.com/artie-labs/reader/config" "github.com/artie-labs/reader/lib" "github.com/artie-labs/reader/lib/mtr" - "github.com/artie-labs/transfer/lib/jitter" - "github.com/artie-labs/transfer/lib/size" ) const ( @@ -27,62 +24,25 @@ const ( func newWriter(ctx context.Context, cfg config.Kafka) (*kafka.Writer, error) { slog.Info("Setting kafka bootstrap URLs", slog.Any("urls", cfg.BootstrapAddresses())) + kafkaConn := kafkalib.NewConnection(cfg.AwsEnabled, cfg.DisableTLS, cfg.Username, cfg.Password) + transport, err := kafkaConn.Transport(ctx) + if err != nil { + return nil, fmt.Errorf("failed to create kafka transport: %w", err) + } + writer := &kafka.Writer{ Addr: kafka.TCP(cfg.BootstrapAddresses()...), - Compression: kafka.Gzip, + AllowAutoTopicCreation: true, Balancer: &kafka.LeastBytes{}, + Compression: kafka.Gzip, + Transport: transport, WriteTimeout: 5 * time.Second, - AllowAutoTopicCreation: true, } if cfg.MaxRequestSize > 0 { writer.BatchBytes = int64(cfg.MaxRequestSize) } - switch cfg.Mechanism() { - case config.AwsMskIam: - // If using AWS MSK IAM, we expect this to be set in the ENV VAR - // (AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY and AWS_REGION, or the AWS Profile should be called default.) - saslCfg, err := awsCfg.LoadDefaultConfig(ctx) - if err != nil { - return nil, fmt.Errorf("failed to load AWS configuration: %w", err) - } - - transport := &kafka.Transport{ - DialTimeout: 10 * time.Second, - SASL: aws_msk_iam_v2.NewMechanism(saslCfg), - // Enable TLS by default - TLS: &tls.Config{}, - } - - if cfg.DisableTLS { - transport.TLS = nil - } - - writer.Transport = transport - case config.ScramSha512: - mechanism, err := scram.Mechanism(scram.SHA512, cfg.Username, cfg.Password) - if err != nil { - return nil, fmt.Errorf("failed to create scram mechanism: %w", err) - } - - transport := &kafka.Transport{ - DialTimeout: 10 * time.Second, - SASL: mechanism, - TLS: &tls.Config{}, - } - - if cfg.DisableTLS { - transport.TLS = nil - } - - writer.Transport = transport - case config.None: - // No mechanism - default: - return nil, fmt.Errorf("unsupported kafka mechanism: %s", cfg.Mechanism()) - } - return writer, nil }