Skip to content

Commit

Permalink
Upgrading Artie Transfer + using Transfer's kafkalib.NewConnection (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 authored Jul 12, 2024
1 parent 85fbcef commit 3960063
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 61 deletions.
9 changes: 4 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,20 @@ go 1.22.4

require (
github.com/DataDog/datadog-go/v5 v5.5.0
github.com/artie-labs/transfer v1.25.32
github.com/artie-labs/transfer v1.25.39
github.com/aws/aws-sdk-go v1.44.327
github.com/aws/aws-sdk-go-v2 v1.18.1
github.com/aws/aws-sdk-go-v2/config v1.18.19
github.com/cockroachdb/apd/v3 v3.2.1
github.com/getsentry/sentry-go v0.27.0
github.com/go-sql-driver/mysql v1.7.1
github.com/google/uuid v1.6.0
github.com/jackc/pgx/v5 v5.5.5
github.com/jackc/pgx/v5 v5.6.0
github.com/lmittmann/tint v1.0.4
github.com/mattn/go-isatty v0.0.20
github.com/microsoft/go-mssqldb v1.7.1
github.com/samber/slog-multi v1.0.2
github.com/samber/slog-sentry/v2 v2.5.0
github.com/segmentio/kafka-go v0.4.47
github.com/segmentio/kafka-go/sasl/aws_msk_iam_v2 v0.1.0
github.com/stretchr/testify v1.9.0
go.mongodb.org/mongo-driver v1.15.0
gopkg.in/yaml.v3 v3.0.1
Expand All @@ -46,6 +44,7 @@ require (
github.com/apache/arrow/go/v15 v15.0.2 // indirect
github.com/apache/thrift v0.17.0 // indirect
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.4.10 // indirect
github.com/aws/aws-sdk-go-v2/config v1.18.19 // indirect
github.com/aws/aws-sdk-go-v2/credentials v1.13.18 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.13.1 // indirect
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.11.59 // indirect
Expand Down Expand Up @@ -100,7 +99,6 @@ require (
github.com/lestrrat-go/iter v1.0.2 // indirect
github.com/lestrrat-go/jwx v1.2.29 // indirect
github.com/lestrrat-go/option v1.0.1 // indirect
github.com/lib/pq v1.10.9 // indirect
github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 // indirect
github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 // indirect
github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe // indirect
Expand All @@ -111,6 +109,7 @@ require (
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/samber/lo v1.39.0 // indirect
github.com/samber/slog-common v0.16.0 // indirect
github.com/segmentio/kafka-go/sasl/aws_msk_iam_v2 v0.1.0 // indirect
github.com/sirupsen/logrus v1.9.0 // indirect
github.com/snowflakedb/gosnowflake v1.6.23-0.20230622060049-8cb65fd3db4a // indirect
github.com/twpayne/go-geom v1.5.3 // indirect
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,8 @@ github.com/apache/thrift v0.0.0-20181112125854-24918abba929/go.mod h1:cp2SuWMxlE
github.com/apache/thrift v0.14.2/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ=
github.com/apache/thrift v0.17.0 h1:cMd2aj52n+8VoAtvSvLn4kDC3aZ6IAkBuqWQ2IDu7wo=
github.com/apache/thrift v0.17.0/go.mod h1:OLxhMRJxomX+1I/KUw03qoV3mMz16BwaKI+d4fPBx7Q=
github.com/artie-labs/transfer v1.25.32 h1:M2g+lvEPoPzdRLk559hU9coK/1LxckTSPQDPdXm0kbQ=
github.com/artie-labs/transfer v1.25.32/go.mod h1:pMn7/nkM2gQVw4rgjNIWIGUKtCkCO7yS6Y1IVDgrS3k=
github.com/artie-labs/transfer v1.25.39 h1:hFK1BhlXt4JCxcZF42MgLkpjjWW6wcSzrpKEly1qeOA=
github.com/artie-labs/transfer v1.25.39/go.mod h1:BClIu43kgZgqyx0Rq/vrm9EP7tdz61ou9a3adHrzcOc=
github.com/aws/aws-sdk-go v1.30.19/go.mod h1:5zCpMtNQVjRREroY7sYe8lOMRSxkhG6MZveU8YkpAk0=
github.com/aws/aws-sdk-go v1.44.327 h1:ZS8oO4+7MOBLhkdwIhgtVeDzCeWOlTfKJS7EgggbIEY=
github.com/aws/aws-sdk-go v1.44.327/go.mod h1:aVsgQcEevwlmQ7qHE9I3h+dtQgpqhFB+i8Phjh7fkwI=
Expand Down Expand Up @@ -314,8 +314,8 @@ github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsI
github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
github.com/jackc/pgservicefile v0.0.0-20231201235250-de7065d80cb9 h1:L0QtFUgDarD7Fpv9jeVMgy/+Ec0mtnmYuImjTz6dtDA=
github.com/jackc/pgservicefile v0.0.0-20231201235250-de7065d80cb9/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM=
github.com/jackc/pgx/v5 v5.5.5 h1:amBjrZVmksIdNjxGW/IiIMzxMKZFelXbUoPNb+8sjQw=
github.com/jackc/pgx/v5 v5.5.5/go.mod h1:ez9gk+OAat140fv9ErkZDYFWmXLfV+++K0uAOiwgm1A=
github.com/jackc/pgx/v5 v5.6.0 h1:SWJzexBzPL5jb0GEsrPMLIsi/3jOo7RHlzTjcAeDrPY=
github.com/jackc/pgx/v5 v5.6.0/go.mod h1:DNZ/vlrUnhWCoFGxHAG8U2ljioxukquj7utPDgtQdTw=
github.com/jackc/puddle/v2 v2.2.1 h1:RhxXJtFG022u4ibrCSMSiu5aOq1i77R3OHKNJj77OAk=
github.com/jackc/puddle/v2 v2.2.1/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4=
github.com/jcmturner/gofork v0.0.0-20180107083740-2aebee971930/go.mod h1:MK8+TM0La+2rjBD4jE12Kj1pCCxK7d2LK/UM3ncEo0o=
Expand Down
64 changes: 12 additions & 52 deletions lib/kafkalib/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,19 @@ package kafkalib

import (
"context"
"crypto/tls"
"encoding/json"
"fmt"
"log/slog"
"time"

awsCfg "github.com/aws/aws-sdk-go-v2/config"
"github.com/artie-labs/transfer/lib/jitter"
"github.com/artie-labs/transfer/lib/kafkalib"
"github.com/artie-labs/transfer/lib/size"
"github.com/segmentio/kafka-go"
"github.com/segmentio/kafka-go/sasl/aws_msk_iam_v2"
"github.com/segmentio/kafka-go/sasl/scram"

"github.com/artie-labs/reader/config"
"github.com/artie-labs/reader/lib"
"github.com/artie-labs/reader/lib/mtr"
"github.com/artie-labs/transfer/lib/jitter"
"github.com/artie-labs/transfer/lib/size"
)

const (
Expand All @@ -27,62 +24,25 @@ const (

func newWriter(ctx context.Context, cfg config.Kafka) (*kafka.Writer, error) {
slog.Info("Setting kafka bootstrap URLs", slog.Any("urls", cfg.BootstrapAddresses()))
kafkaConn := kafkalib.NewConnection(cfg.AwsEnabled, cfg.DisableTLS, cfg.Username, cfg.Password)
transport, err := kafkaConn.Transport(ctx)
if err != nil {
return nil, fmt.Errorf("failed to create kafka transport: %w", err)
}

writer := &kafka.Writer{
Addr: kafka.TCP(cfg.BootstrapAddresses()...),
Compression: kafka.Gzip,
AllowAutoTopicCreation: true,
Balancer: &kafka.LeastBytes{},
Compression: kafka.Gzip,
Transport: transport,
WriteTimeout: 5 * time.Second,
AllowAutoTopicCreation: true,
}

if cfg.MaxRequestSize > 0 {
writer.BatchBytes = int64(cfg.MaxRequestSize)
}

switch cfg.Mechanism() {
case config.AwsMskIam:
// If using AWS MSK IAM, we expect this to be set in the ENV VAR
// (AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY and AWS_REGION, or the AWS Profile should be called default.)
saslCfg, err := awsCfg.LoadDefaultConfig(ctx)
if err != nil {
return nil, fmt.Errorf("failed to load AWS configuration: %w", err)
}

transport := &kafka.Transport{
DialTimeout: 10 * time.Second,
SASL: aws_msk_iam_v2.NewMechanism(saslCfg),
// Enable TLS by default
TLS: &tls.Config{},
}

if cfg.DisableTLS {
transport.TLS = nil
}

writer.Transport = transport
case config.ScramSha512:
mechanism, err := scram.Mechanism(scram.SHA512, cfg.Username, cfg.Password)
if err != nil {
return nil, fmt.Errorf("failed to create scram mechanism: %w", err)
}

transport := &kafka.Transport{
DialTimeout: 10 * time.Second,
SASL: mechanism,
TLS: &tls.Config{},
}

if cfg.DisableTLS {
transport.TLS = nil
}

writer.Transport = transport
case config.None:
// No mechanism
default:
return nil, fmt.Errorf("unsupported kafka mechanism: %s", cfg.Mechanism())
}

return writer, nil
}

Expand Down

0 comments on commit 3960063

Please sign in to comment.