From 5a82f8b4045a3b54cd14a0c2872c3b85e4745c69 Mon Sep 17 00:00:00 2001 From: colmsnowplow Date: Fri, 1 Jul 2022 16:51:23 +0100 Subject: [PATCH] Add ability to configure TLS for Kafka and HTTP --- .gitignore | 3 + Makefile | 3 - cmd/cli/cli.go | 6 + config/component_test.go | 43 ++++--- config/config_test.go | 1 + .../failure-targets/http-extended.hcl | 8 +- .../failure-targets/kafka-extended.hcl | 6 +- config/examples/targets/http-extended.hcl | 8 +- config/examples/targets/kafka-extended.hcl | 6 +- config/test-fixtures/target-http-extended.hcl | 6 +- .../test-fixtures/target-kafka-extended.hcl | 6 +- integration/http/localhost.key | 2 +- integration/http/rootCA.crt | 2 +- pkg/common/helpers.go | 118 ++++++++++++++++-- pkg/common/helpers_test.go | 86 ++++++++++++- pkg/target/http.go | 27 ++-- pkg/target/http_test.go | 38 +++++- pkg/target/kafka.go | 16 ++- pkg/target/targetutil.go | 42 ------- 19 files changed, 310 insertions(+), 117 deletions(-) delete mode 100644 pkg/target/targetutil.go diff --git a/.gitignore b/.gitignore index 21bf590a..c414211c 100644 --- a/.gitignore +++ b/.gitignore @@ -12,3 +12,6 @@ vendor/ build/ dist/ .localstack/ + +#temporary directory created by tests +tmp_replicator/ \ No newline at end of file diff --git a/Makefile b/Makefile index 6c132d60..6bf6cc67 100644 --- a/Makefile +++ b/Makefile @@ -11,8 +11,6 @@ go_dirs = `go list ./... | grep -v /build/ | grep -v /vendor/` build_dir = build vendor_dir = vendor integration_dir = integration -cert_dir = $(integration_dir)/http -abs_cert_dir = $$(pwd)/$(cert_dir) ngrok_path = ${NGROK_DIR}ngrok # Set NGROK_DIR to `/path/to/directory/` for local setup coverage_dir = $(build_dir)/coverage @@ -144,7 +142,6 @@ test: test-setup GO111MODULE=on go tool cover -func=$(coverage_out) integration-test: test-setup - export CERT_DIR=$(abs_cert_dir); \ GO111MODULE=on go test $(go_dirs) -v -covermode=count -coverprofile=$(coverage_out) GO111MODULE=on go tool cover -html=$(coverage_out) -o $(coverage_html) GO111MODULE=on go tool cover -func=$(coverage_out) diff --git a/cmd/cli/cli.go b/cmd/cli/cli.go index 5608195f..08036017 100644 --- a/cmd/cli/cli.go +++ b/cmd/cli/cli.go @@ -22,6 +22,7 @@ import ( _ "net/http/pprof" "github.com/snowplow-devops/stream-replicator/cmd" + "github.com/snowplow-devops/stream-replicator/pkg/common" "github.com/snowplow-devops/stream-replicator/pkg/failure/failureiface" "github.com/snowplow-devops/stream-replicator/pkg/models" "github.com/snowplow-devops/stream-replicator/pkg/observer" @@ -118,6 +119,11 @@ func RunCli(supportedSourceConfigPairs []sourceconfig.ConfigPair) { stop <- struct{}{} }() + err := common.DeleteTemporaryDir() + if err != nil { + log.Debugf(`error deleting tmp directory: %v`, err) + } + select { case <-stop: log.Debug("source.Stop() finished successfully!") diff --git a/config/component_test.go b/config/component_test.go index f9515d12..c76adb30 100644 --- a/config/component_test.go +++ b/config/component_test.go @@ -8,6 +8,7 @@ package config import ( "errors" + "os" "path/filepath" "reflect" "testing" @@ -19,6 +20,10 @@ import ( "github.com/snowplow-devops/stream-replicator/pkg/target" ) +func init() { + os.Clearenv() +} + func TestCreateTargetComponentHCL(t *testing.T) { testCases := []struct { File string @@ -73,9 +78,9 @@ func TestCreateTargetComponentHCL(t *testing.T) { Headers: "", BasicAuthUsername: "", BasicAuthPassword: "", - CertFile: "", - KeyFile: "", - CaFile: "", + TLSCert: "", + TLSKey: "", + TLSCa: "", SkipVerifyTLS: false, }, }, @@ -90,9 +95,9 @@ func TestCreateTargetComponentHCL(t *testing.T) { Headers: "{\"Accept-Language\":\"en-US\"}", BasicAuthUsername: "testUsername", BasicAuthPassword: "testPass", - CertFile: "test.cert", - KeyFile: "test.key", - CaFile: "test.ca", + TLSCert: "dGVzdC5jZXJ0", + TLSKey: "dGVzdC5rZXkK", + TLSCa: "dGVzdC5jYQ==", SkipVerifyTLS: true, }, }, @@ -112,9 +117,9 @@ func TestCreateTargetComponentHCL(t *testing.T) { SASLUsername: "", SASLPassword: "", SASLAlgorithm: "sha512", - CertFile: "", - KeyFile: "", - CaFile: "", + TLSCert: "", + TLSKey: "", + TLSCa: "", SkipVerifyTLS: false, ForceSync: false, FlushFrequency: 0, @@ -138,9 +143,9 @@ func TestCreateTargetComponentHCL(t *testing.T) { SASLUsername: "testUsername", SASLPassword: "testPass", SASLAlgorithm: "sha256", - CertFile: "test.cert", - KeyFile: "test.key", - CaFile: "test.ca", + TLSCert: "dGVzdC5jZXJ0", + TLSKey: "dGVzdC5rZXkK", + TLSCa: "dGVzdC5jYQ==", SkipVerifyTLS: true, ForceSync: true, FlushFrequency: 2, @@ -217,9 +222,9 @@ func TestCreateFailureTargetComponentENV(t *testing.T) { SASLUsername: "testUsername", SASLPassword: "testPass", SASLAlgorithm: "sha256", - CertFile: "test.cert", - KeyFile: "test.key", - CaFile: "test.ca", + TLSCert: "dGVzdA==", + TLSKey: "dGVzdA==", + TLSCa: "dGVzdA==", SkipVerifyTLS: true, ForceSync: true, FlushFrequency: 2, @@ -230,7 +235,7 @@ func TestCreateFailureTargetComponentENV(t *testing.T) { t.Run(testCase.Name, func(t *testing.T) { assert := assert.New(t) - + t.Setenv("STREAM_REPLICATOR_CONFIG_FILE", "") t.Setenv("FAILURE_TARGET_NAME", "kafka") t.Setenv("FAILURE_TARGET_KAFKA_BROKERS", "testBrokers") t.Setenv("FAILURE_TARGET_KAFKA_TOPIC_NAME", "testTopic") @@ -244,9 +249,9 @@ func TestCreateFailureTargetComponentENV(t *testing.T) { t.Setenv("FAILURE_TARGET_KAFKA_SASL_USERNAME", "testUsername") t.Setenv("FAILURE_TARGET_KAFKA_SASL_PASSWORD", "testPass") t.Setenv("FAILURE_TARGET_KAFKA_SASL_ALGORITHM", "sha256") - t.Setenv("FAILURE_TARGET_KAFKA_TLS_CERT_FILE", "test.cert") - t.Setenv("FAILURE_TARGET_KAFKA_TLS_KEY_FILE", "test.key") - t.Setenv("FAILURE_TARGET_KAFKA_TLS_CA_FILE", "test.ca") + t.Setenv("FAILURE_TARGET_KAFKA_TLS_CERT_B64", "dGVzdA==") + t.Setenv("FAILURE_TARGET_KAFKA_TLS_KEY_B64", "dGVzdA==") + t.Setenv("FAILURE_TARGET_KAFKA_TLS_CA_B64", "dGVzdA==") t.Setenv("FAILURE_TARGET_KAFKA_TLS_SKIP_VERIFY_TLS", "true") t.Setenv("FAILURE_TARGET_KAFKA_FORCE_SYNC_PRODUCER", "true") t.Setenv("FAILURE_TARGET_KAFKA_FLUSH_FREQUENCY", "2") diff --git a/config/config_test.go b/config/config_test.go index c42d2f32..ec242f8e 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -44,6 +44,7 @@ func TestNewConfig(t *testing.T) { observer, err := c.GetObserver(map[string]string{}) assert.NotNil(observer) assert.Nil(err) + os.RemoveAll(`tmp_replicator`) } func TestNewConfig_FromEnv(t *testing.T) { diff --git a/config/examples/failure-targets/http-extended.hcl b/config/examples/failure-targets/http-extended.hcl index 68f319f4..9f267f6a 100644 --- a/config/examples/failure-targets/http-extended.hcl +++ b/config/examples/failure-targets/http-extended.hcl @@ -27,16 +27,16 @@ failure_target { basic_auth_password = env.MY_AUTH_PASSWORD # The optional certificate file for client authentication - cert_file = "myLocalhost.crt" + tls_cert = "dGVzdCBzdHJpbmc=" # The optional key file for client authentication - key_file = "MyLocalhost.key" + tls_key = "c29tZSBzdHJpbmc=" # The optional certificate authority file for TLS client authentication - ca_file = "myRootCA.crt" + tls_ca = "b3RoZXIgc3RyaW5ncw==" # Whether to skip verifying ssl certificates chain (default: false) - # If cert_file and key_file are not provided, this setting is not applied. + # If tls_cert and tls_key are not provided, this setting is not applied. skip_verify_tls = true } } diff --git a/config/examples/failure-targets/kafka-extended.hcl b/config/examples/failure-targets/kafka-extended.hcl index 4ef2ba60..12c93e26 100644 --- a/config/examples/failure-targets/kafka-extended.hcl +++ b/config/examples/failure-targets/kafka-extended.hcl @@ -39,13 +39,13 @@ failure_target { sasl_algorithm = "sha256" # The optional certificate file for client authentication - cert_file = "myLocalhost.crt" + tls_cert = "dGVzdCBzdHJpbmc=" # The optional key file for client authentication - key_file = "MyLocalhost.key" + tls_key = "c29tZSBzdHJpbmc=" # The optional certificate authority file for TLS client authentication - ca_file = "myRootCA.crt" + tls_ca = "b3RoZXIgc3RyaW5ncw==" # Whether to skip verifying ssl certificates chain (default: false) skip_verify_tls = true diff --git a/config/examples/targets/http-extended.hcl b/config/examples/targets/http-extended.hcl index a70ca33e..a02be9b6 100644 --- a/config/examples/targets/http-extended.hcl +++ b/config/examples/targets/http-extended.hcl @@ -27,16 +27,16 @@ target { basic_auth_password = env.MY_AUTH_PASSWORD # The optional certificate file for client authentication - cert_file = "myLocalhost.crt" + tls_cert = "dGVzdCBzdHJpbmc=" # The optional key file for client authentication - key_file = "MyLocalhost.key" + tls_key = "c29tZSBzdHJpbmc=" # The optional certificate authority file for TLS client authentication - ca_file = "myRootCA.crt" + tls_ca = "b3RoZXIgc3RyaW5ncw==" # Whether to skip verifying ssl certificates chain (default: false) - # If cert_file and key_file are not provided, this setting is not applied. + # If tls_cert and tls_key are not provided, this setting is not applied. skip_verify_tls = true } } diff --git a/config/examples/targets/kafka-extended.hcl b/config/examples/targets/kafka-extended.hcl index 16bdcc9c..8e699d67 100644 --- a/config/examples/targets/kafka-extended.hcl +++ b/config/examples/targets/kafka-extended.hcl @@ -39,13 +39,13 @@ target { sasl_algorithm = "sha256" # The optional certificate file for client authentication - cert_file = "myLocalhost.crt" + tls_cert = "dGVzdCBzdHJpbmc=" # The optional key file for client authentication - key_file = "MyLocalhost.key" + tls_key = "c29tZSBzdHJpbmc=" # The optional certificate authority file for TLS client authentication - ca_file = "myRootCA.crt" + tls_ca = "b3RoZXIgc3RyaW5ncw==" # Whether to skip verifying ssl certificates chain (default: false) skip_verify_tls = true diff --git a/config/test-fixtures/target-http-extended.hcl b/config/test-fixtures/target-http-extended.hcl index 7dfadbf4..9f6ee249 100644 --- a/config/test-fixtures/target-http-extended.hcl +++ b/config/test-fixtures/target-http-extended.hcl @@ -9,9 +9,9 @@ target { headers = "{\"Accept-Language\":\"en-US\"}" basic_auth_username = "testUsername" basic_auth_password = "testPass" - cert_file = "test.cert" - key_file = "test.key" - ca_file = "test.ca" + tls_cert = "dGVzdC5jZXJ0" + tls_key = "dGVzdC5rZXkK" + tls_ca = "dGVzdC5jYQ==" skip_verify_tls = true } } diff --git a/config/test-fixtures/target-kafka-extended.hcl b/config/test-fixtures/target-kafka-extended.hcl index d0faa9f8..e44b8f18 100644 --- a/config/test-fixtures/target-kafka-extended.hcl +++ b/config/test-fixtures/target-kafka-extended.hcl @@ -14,9 +14,9 @@ target { sasl_username = "testUsername" sasl_password = "testPass" sasl_algorithm = "sha256" - cert_file = "test.cert" - key_file = "test.key" - ca_file = "test.ca" + tls_cert = "dGVzdC5jZXJ0" + tls_key = "dGVzdC5rZXkK" + tls_ca = "dGVzdC5jYQ==" skip_verify_tls = true force_sync_producer = true flush_frequency = 2 diff --git a/integration/http/localhost.key b/integration/http/localhost.key index 93424361..11dec0e6 100644 --- a/integration/http/localhost.key +++ b/integration/http/localhost.key @@ -24,4 +24,4 @@ itxdXvoomlhwDKZv0Y+vPm4V9SBx/36ubf6bM6vKoZTSuv2+ktA/uInFW+y/1mLH KD1JlQKBgQDNZry00fN3iJ9stUNEYVaAtXQ1a0/LY/r2NuC04IwemwOyFUvzY7G9 sNXeIxTYjQ9OCp9+EE1n6Q3yg63MmTrNuD51f0h2tftokYBaoYBny34HuQf0N7qF laOI6yiORZ4eGdYrpCq+q+J0fAkRca0M4Nq/lDEw4bric38WpPxV3Q== ------END RSA PRIVATE KEY----- +-----END RSA PRIVATE KEY----- \ No newline at end of file diff --git a/integration/http/rootCA.crt b/integration/http/rootCA.crt index 79bbfc13..df8b1a27 100644 --- a/integration/http/rootCA.crt +++ b/integration/http/rootCA.crt @@ -27,4 +27,4 @@ Ggg1Qo5z0+XT2l+2KhOC02ydgHV1/tT6cVVX3ZkBvvb/WPHmVp9bT8zqeJzrMQkM 9DaKEyZKw+LYy7sZp4p4giE/JAzBLidsfIdznhYguPjKgboPMfiJvapzyZPEJsDu ShYb5uIlytHwAVlGiUgjx+z/YXBQN1vWsCm5pVL4RGdXdcq5HZzZRaJxAUBrfmiU uCJPEnUJ1emIqakgSy3yA+9WtQ== ------END CERTIFICATE----- +-----END CERTIFICATE----- \ No newline at end of file diff --git a/pkg/common/helpers.go b/pkg/common/helpers.go index 0c44df1a..05a7068f 100644 --- a/pkg/common/helpers.go +++ b/pkg/common/helpers.go @@ -7,8 +7,11 @@ package common import ( + "crypto/tls" + "crypto/x509" "encoding/base64" "fmt" + "io/ioutil" "os" "time" @@ -26,25 +29,47 @@ import ( // and attempt to create a JSON file on disk within the /tmp directory // for later use in creating GCP clients. func GetGCPServiceAccountFromBase64(serviceAccountB64 string) (string, error) { - sDec, err := base64.StdEncoding.DecodeString(serviceAccountB64) + targetFile := fmt.Sprintf(`tmp_replicator/stream-replicator-service-account-%s.json`, uuid.NewV4().String()) + err := DecodeB64ToFile(serviceAccountB64, targetFile) if err != nil { - return "", errors.Wrap(err, "Failed to Base64 decode service account") + return ``, err } + return targetFile, nil +} - targetFile := fmt.Sprintf("/tmp/stream-replicator-service-account-%s.json", uuid.NewV4().String()) +// DeleteTemporaryDir deletes the temp directory we created to store credentials +func DeleteTemporaryDir() error { + err := os.RemoveAll(`tmp_replicator`) + return err +} + +// DecodeB64ToFile takes a B64-encoded credential, decodes it, and writes it to a file +func DecodeB64ToFile(b64String, filename string) error { + tls, decodeErr := base64.StdEncoding.DecodeString(b64String) + if decodeErr != nil { + return errors.Wrap(decodeErr, "Failed to Base64 decode for creating file "+filename) + } - f, err := os.Create(targetFile) + err := createTempDir(`tmp_replicator`) if err != nil { - return "", errors.Wrap(err, fmt.Sprintf("Failed to create target file '%s' for service account", targetFile)) + return err } - defer f.Close() - _, err2 := f.WriteString(string(sDec)) - if err2 != nil { - return "", errors.Wrap(err, fmt.Sprintf("Failed to write decoded service account to target file '%s'", targetFile)) + f, createErr := os.Create(filename) + if createErr != nil { + return errors.Wrap(createErr, fmt.Sprintf("Failed to create file '%s'", filename)) } - return targetFile, nil + _, writeErr := f.WriteString(string(tls)) + if writeErr != nil { + return errors.Wrap(decodeErr, fmt.Sprintf("Failed to write decoded base64 string to target file '%s'", filename)) + } + err = f.Close() + if err != nil { + return err + } + + return nil } // GetAWSSession is a general tool to handle generating an AWS session @@ -87,3 +112,76 @@ func GetAverageFromDuration(sum time.Duration, total int64) time.Duration { } return time.Duration(0) } + +func createTempDir(dirName string) error { + dir, statErr := os.Stat(dirName) + if statErr != nil && !errors.Is(statErr, os.ErrNotExist) { + return errors.Wrap(statErr, fmt.Sprintf("Failed checking for existence of %s dir", dirName)) + } + + if dir == nil { + dirErr := os.Mkdir(dirName, 0700) + if dirErr != nil && !errors.Is(dirErr, os.ErrExist) { + return errors.Wrap(dirErr, fmt.Sprintf("Failed to create %s directory", dirName)) + } + } + return nil +} + +// CreateTLSConfiguration creates a TLS configuration for use in a target +func CreateTLSConfiguration(crt, key, ca, destination string, skipVerify bool) (*tls.Config, error) { + if crt == `` || key == `` || ca == `` { + return nil, nil + } + tlsStrings := map[string]string{ + `.crt`: crt, + `.key`: key, + `_ca.crt`: ca, + } + // try to create /tmp_replicator/tls directory + err := createTempDir(`tmp_replicator`) + if err != nil { + return nil, err + } + + err = createTempDir(`tmp_replicator/tls`) + if err != nil { + return nil, err + } + + // create destination directory + err = os.Mkdir(`tmp_replicator/tls/`+destination, 0700) + if err != nil { + return nil, err + } + for key, tlsString := range tlsStrings { + err := DecodeB64ToFile(tlsString, fmt.Sprintf(`tmp_replicator/tls/%s/%s%s`, destination, destination, key)) + if err != nil { + return nil, err + } + } + + cert, err := tls.LoadX509KeyPair( + fmt.Sprintf(`tmp_replicator/tls/%s/%s.crt`, destination, destination), + fmt.Sprintf(`tmp_replicator/tls/%s/%s.key`, destination, destination)) + if err != nil { + return nil, err + } + + caCert, err := ioutil.ReadFile(fmt.Sprintf(`tmp_replicator/tls/%s/%s_ca.crt`, destination, destination)) + if err != nil { + return nil, err + } + + caCertPool, err := x509.SystemCertPool() + if err != nil { + return nil, err + } + caCertPool.AppendCertsFromPEM(caCert) + + return &tls.Config{ + Certificates: []tls.Certificate{cert}, + RootCAs: caCertPool, + InsecureSkipVerify: skipVerify, + }, nil +} diff --git a/pkg/common/helpers_test.go b/pkg/common/helpers_test.go index ee06b640..fa68a331 100644 --- a/pkg/common/helpers_test.go +++ b/pkg/common/helpers_test.go @@ -7,6 +7,10 @@ package common import ( + "bytes" + "encoding/base64" + "io/ioutil" + "os" "strings" "testing" "time" @@ -14,16 +18,21 @@ import ( "github.com/stretchr/testify/assert" ) +func init() { + os.Clearenv() +} + // --- Cloud Helpers func TestGetGCPServiceAccountFromBase64(t *testing.T) { assert := assert.New(t) + defer DeleteTemporaryDir() path, err := GetGCPServiceAccountFromBase64("ewogICJoZWxsbyI6IndvcmxkIgp9") assert.NotEqual(path, "") assert.Nil(err) - assert.True(strings.HasPrefix(path, "/tmp/stream-replicator-service-account-")) + assert.True(strings.HasPrefix(path, "tmp_replicator/stream-replicator-service-account-")) assert.True(strings.HasSuffix(path, ".json")) } @@ -34,12 +43,13 @@ func TestGetGCPServiceAccountFromBase64_NotBase64(t *testing.T) { assert.Equal(path, "") assert.NotNil(err) - assert.True(strings.HasPrefix(err.Error(), "Failed to Base64 decode service account: ")) + assert.True(strings.HasPrefix(err.Error(), "Failed to Base64 decode")) } func TestGetAWSSession(t *testing.T) { assert := assert.New(t) + t.Setenv("AWS_SHARED_CREDENTIALS_FILE", "") sess, cfg, accID, err := GetAWSSession("us-east-1", "") assert.NotNil(sess) assert.Nil(cfg) @@ -64,3 +74,75 @@ func TestGetAverageFromDuration(t *testing.T) { duration2 := GetAverageFromDuration(time.Duration(10)*time.Second, 2) assert.Equal(time.Duration(5)*time.Second, duration2) } + +func TestCreateTLSConfiguration(t *testing.T) { + assert := assert.New(t) + crt, err := os.ReadFile(`../../integration/http/localhost.crt`) + if err != nil { + return + } + encodedCrt := base64.StdEncoding.EncodeToString(crt) + key, err := os.ReadFile(`../../integration/http/localhost.key`) + if err != nil { + return + } + encodedKey := base64.StdEncoding.EncodeToString(key) + ca, err := os.ReadFile(`../../integration/http/rootCA.crt`) + if err != nil { + return + } + encodedCa := base64.StdEncoding.EncodeToString(ca) + _, err = CreateTLSConfiguration(encodedCrt, encodedKey, encodedCa, `kafka`, false) + files, readErr := ioutil.ReadDir("./tmp_replicator/tls/kafka") + if readErr != nil { + return + } + + assert.Nil(err) + assert.Equal(len(files), 3) + assert.Equal(files[0].Name(), `kafka.crt`) + f, err := os.ReadFile(`tmp_replicator/tls/kafka/kafka.crt`) + + assert.Nil(err) + assert.True(bytes.Equal(f, crt)) + + f, err = os.ReadFile(`tmp_replicator/tls/kafka/kafka.key`) + + assert.Nil(err) + assert.True(bytes.Equal(f, key)) + + f, err = os.ReadFile(`tmp_replicator/tls/kafka/kafka_ca.crt`) + + assert.Nil(err) + assert.True(bytes.Equal(f, ca)) + os.RemoveAll(`tmp_replicator`) +} + +func TestCreateTLSConfiguration_DirExists(t *testing.T) { + os.MkdirAll(`tmp_replicator/tls/kafka`, 0777) + assert := assert.New(t) + _, err := CreateTLSConfiguration("dGVzdA==", "dGVzdA==", "dGVzdA==", `kafka`, false) + files, readErr := ioutil.ReadDir("./tmp_replicator/tls/kafka") + if readErr != nil { + return + } + + assert.Error(err) + assert.Equal(len(files), 0) + + os.RemoveAll(`tmp_replicator`) +} + +func TestCreateTLSConfiguration_NotB64(t *testing.T) { + assert := assert.New(t) + _, err := CreateTLSConfiguration("helloworld", "helloworld", "helloworld", `kafka`, false) + files, readErr := ioutil.ReadDir("./tmp_replicator/tls/kafka") + if readErr != nil { + return + } + + assert.True(strings.HasPrefix(err.Error(), `Failed to Base64 decode for creating file `)) + assert.Equal(len(files), 0) + + os.RemoveAll(`tmp_replicator`) +} diff --git a/pkg/target/http.go b/pkg/target/http.go index 98c33937..523f0f1c 100644 --- a/pkg/target/http.go +++ b/pkg/target/http.go @@ -10,6 +10,7 @@ import ( "bytes" "encoding/json" "fmt" + "github.com/snowplow-devops/stream-replicator/pkg/common" "io/ioutil" "net/http" "net/url" @@ -18,9 +19,14 @@ import ( "github.com/hashicorp/go-multierror" "github.com/pkg/errors" log "github.com/sirupsen/logrus" + "github.com/snowplow-devops/stream-replicator/pkg/models" ) +const ( + httpTarget = `http_target` +) + // HTTPTargetConfig configures the destination for records consumed type HTTPTargetConfig struct { HTTPURL string `hcl:"url" env:"TARGET_HTTP_URL"` @@ -30,9 +36,9 @@ type HTTPTargetConfig struct { Headers string `hcl:"headers,optional" env:"TARGET_HTTP_HEADERS" ` BasicAuthUsername string `hcl:"basic_auth_username,optional" env:"TARGET_HTTP_BASICAUTH_USERNAME"` BasicAuthPassword string `hcl:"basic_auth_password,optional" env:"TARGET_HTTP_BASICAUTH_PASSWORD"` - CertFile string `hcl:"cert_file,optional" env:"TARGET_HTTP_TLS_CERT_FILE"` - KeyFile string `hcl:"key_file,optional" env:"TARGET_HTTP_TLS_KEY_FILE"` - CaFile string `hcl:"ca_file,optional" env:"TARGET_HTTP_TLS_CA_FILE"` + TLSCert string `hcl:"tls_cert,optional" env:"TARGET_HTTP_TLS_CERT_B64"` + TLSKey string `hcl:"tls_key,optional" env:"TARGET_HTTP_TLS_KEY_B64"` + TLSCa string `hcl:"tls_ca,optional" env:"TARGET_HTTP_TLS_CA_B64"` SkipVerifyTLS bool `hcl:"skip_verify_tls,optional" env:"TARGET_HTTP_TLS_SKIP_VERIFY_TLS"` // false } @@ -96,11 +102,16 @@ func NewHTTPTarget(httpURL string, requestTimeout int, byteLimit int, contentTyp if err1 != nil { return nil, err1 } - tlsConfig, err2 := CreateTLSConfiguration(certFile, keyFile, caFile, skipVerifyTLS) + transport := &http.Transport{} + + tlsConfig, err2 := common.CreateTLSConfiguration(certFile, keyFile, caFile, httpTarget, skipVerifyTLS) if err2 != nil { return nil, err2 } - transport := &http.Transport{TLSClientConfig: tlsConfig} + if tlsConfig != nil { + transport.TLSClientConfig = tlsConfig + } + return &HTTPTarget{ client: &http.Client{ Transport: transport, @@ -126,9 +137,9 @@ func HTTPTargetConfigFunction(c *HTTPTargetConfig) (*HTTPTarget, error) { c.Headers, c.BasicAuthUsername, c.BasicAuthPassword, - c.CertFile, - c.KeyFile, - c.CaFile, + c.TLSCert, + c.TLSKey, + c.TLSCa, c.SkipVerifyTLS, ) } diff --git a/pkg/target/http_test.go b/pkg/target/http_test.go index 6a5ce820..4a252f97 100644 --- a/pkg/target/http_test.go +++ b/pkg/target/http_test.go @@ -8,6 +8,7 @@ package target import ( "bytes" + "encoding/base64" "encoding/json" "io/ioutil" "net/http" @@ -287,6 +288,24 @@ func TestHttpWrite_TLS(t *testing.T) { } assert := assert.New(t) + defer os.RemoveAll(`tmp_replicator`) + + crt, err := os.ReadFile(`../../integration/http/localhost.crt`) + if err != nil { + return + } + encodedCrt := base64.StdEncoding.EncodeToString(crt) + key, err := os.ReadFile(`../../integration/http/localhost.key`) + if err != nil { + return + } + encodedKey := base64.StdEncoding.EncodeToString(key) + ca, err := os.ReadFile(`../../integration/http/rootCA.crt`) + if err != nil { + return + } + encodedCa := base64.StdEncoding.EncodeToString(ca) + // Test that https requests work with manually provided certs target, err := NewHTTPTarget("https://localhost:8999/hello", 5, @@ -295,9 +314,9 @@ func TestHttpWrite_TLS(t *testing.T) { "", "", "", - os.Getenv("CERT_DIR")+"/localhost.crt", - os.Getenv("CERT_DIR")+"/localhost.key", - os.Getenv("CERT_DIR")+"/rootCA.crt", + string(encodedCrt), + string(encodedKey), + string(encodedCa), false) if err != nil { panic(err) @@ -319,6 +338,8 @@ func TestHttpWrite_TLS(t *testing.T) { ngrokAddress := getNgrokAddress() + "/hello" + os.RemoveAll(`tmp_replicator`) + // Test that https requests work for different endpoints when different certs are provided manually target2, err2 := NewHTTPTarget(ngrokAddress, 5, @@ -327,11 +348,12 @@ func TestHttpWrite_TLS(t *testing.T) { "", "", "", - os.Getenv("CERT_DIR")+"/localhost.crt", - os.Getenv("CERT_DIR")+"/localhost.key", - os.Getenv("CERT_DIR")+"/rootCA.crt", + string(encodedCrt), + string(encodedKey), + string(encodedCa), false) if err2 != nil { + os.RemoveAll(`tmp_replicator`) panic(err2) } @@ -342,6 +364,8 @@ func TestHttpWrite_TLS(t *testing.T) { assert.Equal(int64(20), ackOps) + os.RemoveAll(`tmp_replicator`) + // Test that https works when certs aren't manually provided // Test that https requests work for different endpoints when different certs are provided manually @@ -357,6 +381,7 @@ func TestHttpWrite_TLS(t *testing.T) { "", false) if err4 != nil { + os.RemoveAll(`tmp_replicator`) panic(err4) } @@ -366,6 +391,7 @@ func TestHttpWrite_TLS(t *testing.T) { assert.Equal(10, len(writeResult3.Sent)) assert.Equal(int64(30), ackOps) + os.RemoveAll(`tmp_replicator`) } type ngrokAPIObject struct { diff --git a/pkg/target/kafka.go b/pkg/target/kafka.go index 61247151..af2e7b91 100644 --- a/pkg/target/kafka.go +++ b/pkg/target/kafka.go @@ -10,6 +10,7 @@ import ( "crypto/sha256" "crypto/sha512" "fmt" + "github.com/snowplow-devops/stream-replicator/pkg/common" "hash" "strings" "time" @@ -18,8 +19,13 @@ import ( "github.com/hashicorp/go-multierror" "github.com/pkg/errors" log "github.com/sirupsen/logrus" - "github.com/snowplow-devops/stream-replicator/pkg/models" "github.com/xdg/scram" + + "github.com/snowplow-devops/stream-replicator/pkg/models" +) + +const ( + kafkaTarget = `kafka_target` ) // KafkaConfig contains configurable options for the kafka target @@ -36,9 +42,9 @@ type KafkaConfig struct { SASLUsername string `hcl:"sasl_username,optional" env:"TARGET_KAFKA_SASL_USERNAME" ` SASLPassword string `hcl:"sasl_password,optional" env:"TARGET_KAFKA_SASL_PASSWORD"` SASLAlgorithm string `hcl:"sasl_algorithm,optional" env:"TARGET_KAFKA_SASL_ALGORITHM"` - CertFile string `hcl:"cert_file,optional" env:"TARGET_KAFKA_TLS_CERT_FILE"` - KeyFile string `hcl:"key_file,optional" env:"TARGET_KAFKA_TLS_KEY_FILE"` - CaFile string `hcl:"ca_file,optional" env:"TARGET_KAFKA_TLS_CA_FILE"` + TLSCert string `hcl:"tls_cert,optional" env:"TARGET_KAFKA_TLS_CERT_B64"` + TLSKey string `hcl:"tls_key,optional" env:"TARGET_KAFKA_TLS_KEY_B64"` + TLSCa string `hcl:"tls_ca,optional" env:"TARGET_KAFKA_TLS_CA_B64"` SkipVerifyTLS bool `hcl:"skip_verify_tls,optional" env:"TARGET_KAFKA_TLS_SKIP_VERIFY_TLS"` ForceSync bool `hcl:"force_sync_producer,optional" env:"TARGET_KAFKA_FORCE_SYNC_PRODUCER"` FlushFrequency int `hcl:"flush_frequency,optional" env:"TARGET_KAFKA_FLUSH_FREQUENCY"` @@ -116,7 +122,7 @@ func NewKafkaTarget(cfg *KafkaConfig) (*KafkaTarget, error) { } } - tlsConfig, err := CreateTLSConfiguration(cfg.CertFile, cfg.KeyFile, cfg.CaFile, cfg.SkipVerifyTLS) + tlsConfig, err := common.CreateTLSConfiguration(cfg.TLSCert, cfg.TLSKey, cfg.TLSCa, kafkaTarget, cfg.SkipVerifyTLS) if err != nil { return nil, err } diff --git a/pkg/target/targetutil.go b/pkg/target/targetutil.go deleted file mode 100644 index 7b920ae1..00000000 --- a/pkg/target/targetutil.go +++ /dev/null @@ -1,42 +0,0 @@ -// PROPRIETARY AND CONFIDENTIAL -// -// Unauthorized copying of this file via any medium is strictly prohibited. -// -// Copyright (c) 2020-2022 Snowplow Analytics Ltd. All rights reserved. - -package target - -import ( - "crypto/tls" - "crypto/x509" - "io/ioutil" -) - -// CreateTLSConfiguration creates a TLS configuration for use in a target -func CreateTLSConfiguration(certFile string, keyFile string, caFile string, skipVerify bool) (*tls.Config, error) { - if certFile == "" || keyFile == "" { - return nil, nil - } - - cert, err := tls.LoadX509KeyPair(certFile, keyFile) - if err != nil { - return nil, err - } - - caCert, err := ioutil.ReadFile(caFile) - if err != nil { - return nil, err - } - - caCertPool, err := x509.SystemCertPool() - if err != nil { - return nil, err - } - caCertPool.AppendCertsFromPEM(caCert) - - return &tls.Config{ - Certificates: []tls.Certificate{cert}, - RootCAs: caCertPool, - InsecureSkipVerify: skipVerify, - }, nil -}