Skip to content

Commit

Permalink
Clean up tls configuration (closes #177)
Browse files Browse the repository at this point in the history
  • Loading branch information
TiganeteaRobert authored and colmsnowplow committed Aug 12, 2022
1 parent 3ae1054 commit 1a410fb
Show file tree
Hide file tree
Showing 18 changed files with 180 additions and 93 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,6 @@ vendor/
build/
dist/
.localstack/

#temporary directory created by tests
tmp_replicator/
3 changes: 0 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@ go_dirs = `go list ./... | grep -v /build/ | grep -v /vendor/`
build_dir = build
vendor_dir = vendor
integration_dir = integration
cert_dir = $(integration_dir)/http
abs_cert_dir = $$(pwd)/$(cert_dir)
ngrok_path = ${NGROK_DIR}ngrok # Set NGROK_DIR to `/path/to/directory/` for local setup

coverage_dir = $(build_dir)/coverage
Expand Down Expand Up @@ -144,7 +142,6 @@ test: test-setup
GO111MODULE=on go tool cover -func=$(coverage_out)

integration-test: test-setup
export CERT_DIR=$(abs_cert_dir); \
GO111MODULE=on go test $(go_dirs) -v -covermode=count -coverprofile=$(coverage_out)
GO111MODULE=on go tool cover -html=$(coverage_out) -o $(coverage_html)
GO111MODULE=on go tool cover -func=$(coverage_out)
Expand Down
11 changes: 11 additions & 0 deletions cmd/cli/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
_ "net/http/pprof"

"github.com/snowplow-devops/stream-replicator/cmd"
"github.com/snowplow-devops/stream-replicator/pkg/common"
"github.com/snowplow-devops/stream-replicator/pkg/failure/failureiface"
"github.com/snowplow-devops/stream-replicator/pkg/models"
"github.com/snowplow-devops/stream-replicator/pkg/observer"
Expand Down Expand Up @@ -121,13 +122,23 @@ func RunCli(supportedSourceConfigPairs []sourceconfig.ConfigPair) {
select {
case <-stop:
log.Debug("source.Stop() finished successfully!")

err := common.DeleteTemporaryDir()
if err != nil {
log.Debugf(`error deleting tmp directory: %v`, err)
}
case <-time.After(5 * time.Second):
log.Error("source.Stop() took more than 5 seconds, forcing shutdown ...")

t.Close()
ft.Close()
o.Stop()

err := common.DeleteTemporaryDir()
if err != nil {
log.Debugf(`error deleting tmp directory: %v`, err)
}

os.Exit(1)
}
}()
Expand Down
31 changes: 18 additions & 13 deletions config/component_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package config

import (
"errors"
"os"
"path/filepath"
"reflect"
"testing"
Expand All @@ -19,6 +20,10 @@ import (
"github.com/snowplow-devops/stream-replicator/pkg/target"
)

func init() {
os.Clearenv()
}

func TestCreateTargetComponentHCL(t *testing.T) {
testCases := []struct {
File string
Expand Down Expand Up @@ -90,9 +95,9 @@ func TestCreateTargetComponentHCL(t *testing.T) {
Headers: "{\"Accept-Language\":\"en-US\"}",
BasicAuthUsername: "testUsername",
BasicAuthPassword: "testPass",
CertFile: "test.cert",
KeyFile: "test.key",
CaFile: "test.ca",
CertFile: "myLocalhost.crt",
KeyFile: "MyLocalhost.key",
CaFile: "myRootCA.crt",
SkipVerifyTLS: true,
},
},
Expand Down Expand Up @@ -138,9 +143,9 @@ func TestCreateTargetComponentHCL(t *testing.T) {
SASLUsername: "testUsername",
SASLPassword: "testPass",
SASLAlgorithm: "sha256",
CertFile: "test.cert",
KeyFile: "test.key",
CaFile: "test.ca",
CertFile: "myLocalhost.crt",
KeyFile: "MyLocalhost.key",
CaFile: "myRootCA.crt",
SkipVerifyTLS: true,
ForceSync: true,
FlushFrequency: 2,
Expand Down Expand Up @@ -217,9 +222,9 @@ func TestCreateFailureTargetComponentENV(t *testing.T) {
SASLUsername: "testUsername",
SASLPassword: "testPass",
SASLAlgorithm: "sha256",
CertFile: "test.cert",
KeyFile: "test.key",
CaFile: "test.ca",
CertFile: "test/certfile.crt",
KeyFile: "test/keyfile.key",
CaFile: "test/cafile.crt",
SkipVerifyTLS: true,
ForceSync: true,
FlushFrequency: 2,
Expand All @@ -230,7 +235,7 @@ func TestCreateFailureTargetComponentENV(t *testing.T) {

t.Run(testCase.Name, func(t *testing.T) {
assert := assert.New(t)

t.Setenv("STREAM_REPLICATOR_CONFIG_FILE", "")
t.Setenv("FAILURE_TARGET_NAME", "kafka")
t.Setenv("FAILURE_TARGET_KAFKA_BROKERS", "testBrokers")
t.Setenv("FAILURE_TARGET_KAFKA_TOPIC_NAME", "testTopic")
Expand All @@ -244,9 +249,9 @@ func TestCreateFailureTargetComponentENV(t *testing.T) {
t.Setenv("FAILURE_TARGET_KAFKA_SASL_USERNAME", "testUsername")
t.Setenv("FAILURE_TARGET_KAFKA_SASL_PASSWORD", "testPass")
t.Setenv("FAILURE_TARGET_KAFKA_SASL_ALGORITHM", "sha256")
t.Setenv("FAILURE_TARGET_KAFKA_TLS_CERT_FILE", "test.cert")
t.Setenv("FAILURE_TARGET_KAFKA_TLS_KEY_FILE", "test.key")
t.Setenv("FAILURE_TARGET_KAFKA_TLS_CA_FILE", "test.ca")
t.Setenv("FAILURE_TARGET_KAFKA_TLS_CERT_FILE", "test/certfile.crt")
t.Setenv("FAILURE_TARGET_KAFKA_TLS_KEY_FILE", "test/keyfile.key")
t.Setenv("FAILURE_TARGET_KAFKA_TLS_CA_FILE", "test/cafile.crt")
t.Setenv("FAILURE_TARGET_KAFKA_TLS_SKIP_VERIFY_TLS", "true")
t.Setenv("FAILURE_TARGET_KAFKA_FORCE_SYNC_PRODUCER", "true")
t.Setenv("FAILURE_TARGET_KAFKA_FLUSH_FREQUENCY", "2")
Expand Down
1 change: 1 addition & 0 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ func TestNewConfig(t *testing.T) {
observer, err := c.GetObserver(map[string]string{})
assert.NotNil(observer)
assert.Nil(err)
os.RemoveAll(`tmp_replicator`)
}

func TestNewConfig_FromEnv(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion config/examples/failure-targets/http-extended.hcl
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ failure_target {
ca_file = "myRootCA.crt"

# Whether to skip verifying ssl certificates chain (default: false)
# If cert_file and key_file are not provided, this setting is not applied.
# If tls_cert and tls_key are not provided, this setting is not applied.
skip_verify_tls = true
}
}
2 changes: 1 addition & 1 deletion config/examples/targets/http-extended.hcl
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ target {
ca_file = "myRootCA.crt"

# Whether to skip verifying ssl certificates chain (default: false)
# If cert_file and key_file are not provided, this setting is not applied.
# If tls_cert and tls_key are not provided, this setting is not applied.
skip_verify_tls = true
}
}
6 changes: 3 additions & 3 deletions config/examples/targets/kafka-extended.hcl
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,13 @@ target {
sasl_algorithm = "sha256"

# The optional certificate file for client authentication
cert_file = "myLocalhost.crt"
cert_file = "myLocalhost.crt"

# The optional key file for client authentication
key_file = "MyLocalhost.key"
key_file = "MyLocalhost.key"

# The optional certificate authority file for TLS client authentication
ca_file = "myRootCA.crt"
ca_file = "myRootCA.crt"

# Whether to skip verifying ssl certificates chain (default: false)
skip_verify_tls = true
Expand Down
6 changes: 3 additions & 3 deletions config/test-fixtures/target-http-extended.hcl
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ target {
headers = "{\"Accept-Language\":\"en-US\"}"
basic_auth_username = "testUsername"
basic_auth_password = "testPass"
cert_file = "test.cert"
key_file = "test.key"
ca_file = "test.ca"
cert_file = "myLocalhost.crt"
key_file = "MyLocalhost.key"
ca_file = "myRootCA.crt"
skip_verify_tls = true
}
}
6 changes: 3 additions & 3 deletions config/test-fixtures/target-kafka-extended.hcl
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ target {
sasl_username = "testUsername"
sasl_password = "testPass"
sasl_algorithm = "sha256"
cert_file = "test.cert"
key_file = "test.key"
ca_file = "test.ca"
cert_file = "myLocalhost.crt"
key_file = "MyLocalhost.key"
ca_file = "myRootCA.crt"
skip_verify_tls = true
force_sync_producer = true
flush_frequency = 2
Expand Down
2 changes: 1 addition & 1 deletion integration/http/localhost.key
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,4 @@ itxdXvoomlhwDKZv0Y+vPm4V9SBx/36ubf6bM6vKoZTSuv2+ktA/uInFW+y/1mLH
KD1JlQKBgQDNZry00fN3iJ9stUNEYVaAtXQ1a0/LY/r2NuC04IwemwOyFUvzY7G9
sNXeIxTYjQ9OCp9+EE1n6Q3yg63MmTrNuD51f0h2tftokYBaoYBny34HuQf0N7qF
laOI6yiORZ4eGdYrpCq+q+J0fAkRca0M4Nq/lDEw4bric38WpPxV3Q==
-----END RSA PRIVATE KEY-----
-----END RSA PRIVATE KEY-----
2 changes: 1 addition & 1 deletion integration/http/rootCA.crt
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,4 @@ Ggg1Qo5z0+XT2l+2KhOC02ydgHV1/tT6cVVX3ZkBvvb/WPHmVp9bT8zqeJzrMQkM
9DaKEyZKw+LYy7sZp4p4giE/JAzBLidsfIdznhYguPjKgboPMfiJvapzyZPEJsDu
ShYb5uIlytHwAVlGiUgjx+z/YXBQN1vWsCm5pVL4RGdXdcq5HZzZRaJxAUBrfmiU
uCJPEnUJ1emIqakgSy3yA+9WtQ==
-----END CERTIFICATE-----
-----END CERTIFICATE-----
89 changes: 79 additions & 10 deletions pkg/common/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,11 @@
package common

import (
"crypto/tls"
"crypto/x509"
"encoding/base64"
"fmt"
"io/ioutil"
"os"
"time"

Expand All @@ -26,25 +29,47 @@ import (
// and attempt to create a JSON file on disk within the /tmp directory
// for later use in creating GCP clients.
func GetGCPServiceAccountFromBase64(serviceAccountB64 string) (string, error) {
sDec, err := base64.StdEncoding.DecodeString(serviceAccountB64)
targetFile := fmt.Sprintf(`tmp_replicator/stream-replicator-service-account-%s.json`, uuid.NewV4().String())
err := DecodeB64ToFile(serviceAccountB64, targetFile)
if err != nil {
return "", errors.Wrap(err, "Failed to Base64 decode service account")
return ``, err
}
return targetFile, nil
}

// DeleteTemporaryDir deletes the temp directory we created to store credentials
func DeleteTemporaryDir() error {
err := os.RemoveAll(`tmp_replicator`)
return err
}

targetFile := fmt.Sprintf("/tmp/stream-replicator-service-account-%s.json", uuid.NewV4().String())
// DecodeB64ToFile takes a B64-encoded credential, decodes it, and writes it to a file
func DecodeB64ToFile(b64String, filename string) error {
tls, decodeErr := base64.StdEncoding.DecodeString(b64String)
if decodeErr != nil {
return errors.Wrap(decodeErr, "Failed to Base64 decode for creating file "+filename)
}

f, err := os.Create(targetFile)
err := createTempDir(`tmp_replicator`)
if err != nil {
return "", errors.Wrap(err, fmt.Sprintf("Failed to create target file '%s' for service account", targetFile))
return err
}
defer f.Close()

_, err2 := f.WriteString(string(sDec))
if err2 != nil {
return "", errors.Wrap(err, fmt.Sprintf("Failed to write decoded service account to target file '%s'", targetFile))
f, createErr := os.Create(filename)
if createErr != nil {
return errors.Wrap(createErr, fmt.Sprintf("Failed to create file '%s'", filename))
}

return targetFile, nil
_, writeErr := f.WriteString(string(tls))
if writeErr != nil {
return errors.Wrap(decodeErr, fmt.Sprintf("Failed to write decoded base64 string to target file '%s'", filename))
}
err = f.Close()
if err != nil {
return err
}

return nil
}

// GetAWSSession is a general tool to handle generating an AWS session
Expand Down Expand Up @@ -87,3 +112,47 @@ func GetAverageFromDuration(sum time.Duration, total int64) time.Duration {
}
return time.Duration(0)
}

func createTempDir(dirName string) error {
dir, statErr := os.Stat(dirName)
if statErr != nil && !errors.Is(statErr, os.ErrNotExist) {
return errors.Wrap(statErr, fmt.Sprintf("Failed checking for existence of %s dir", dirName))
}

if dir == nil {
dirErr := os.Mkdir(dirName, 0700)
if dirErr != nil && !errors.Is(dirErr, os.ErrExist) {
return errors.Wrap(dirErr, fmt.Sprintf("Failed to create %s directory", dirName))
}
}
return nil
}

// CreateTLSConfiguration creates a TLS configuration for use in a target
func CreateTLSConfiguration(certFile string, keyFile string, caFile string, skipVerify bool) (*tls.Config, error) {
if certFile == "" || keyFile == "" {
return nil, nil
}

cert, err := tls.LoadX509KeyPair(certFile, keyFile)
if err != nil {
return nil, err
}

caCert, err := ioutil.ReadFile(caFile)
if err != nil {
return nil, err
}

caCertPool, err := x509.SystemCertPool()
if err != nil {
return nil, err
}
caCertPool.AppendCertsFromPEM(caCert)

return &tls.Config{
Certificates: []tls.Certificate{cert},
RootCAs: caCertPool,
InsecureSkipVerify: skipVerify,
}, nil
}
21 changes: 19 additions & 2 deletions pkg/common/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,23 +7,30 @@
package common

import (
"crypto/tls"
"os"
"strings"
"testing"
"time"

"github.com/stretchr/testify/assert"
)

func init() {
os.Clearenv()
}

// --- Cloud Helpers

func TestGetGCPServiceAccountFromBase64(t *testing.T) {
assert := assert.New(t)
defer DeleteTemporaryDir()

path, err := GetGCPServiceAccountFromBase64("ewogICJoZWxsbyI6IndvcmxkIgp9")

assert.NotEqual(path, "")
assert.Nil(err)
assert.True(strings.HasPrefix(path, "/tmp/stream-replicator-service-account-"))
assert.True(strings.HasPrefix(path, "tmp_replicator/stream-replicator-service-account-"))
assert.True(strings.HasSuffix(path, ".json"))
}

Expand All @@ -34,12 +41,13 @@ func TestGetGCPServiceAccountFromBase64_NotBase64(t *testing.T) {

assert.Equal(path, "")
assert.NotNil(err)
assert.True(strings.HasPrefix(err.Error(), "Failed to Base64 decode service account: "))
assert.True(strings.HasPrefix(err.Error(), "Failed to Base64 decode"))
}

func TestGetAWSSession(t *testing.T) {
assert := assert.New(t)

t.Setenv("AWS_SHARED_CREDENTIALS_FILE", "")
sess, cfg, accID, err := GetAWSSession("us-east-1", "")
assert.NotNil(sess)
assert.Nil(cfg)
Expand All @@ -64,3 +72,12 @@ func TestGetAverageFromDuration(t *testing.T) {
duration2 := GetAverageFromDuration(time.Duration(10)*time.Second, 2)
assert.Equal(time.Duration(5)*time.Second, duration2)
}

func TestCreateTLSConfiguration(t *testing.T) {
assert := assert.New(t)

conf, err := CreateTLSConfiguration(`../../integration/http/localhost.crt`, `../../integration/http/localhost.key`, `../../integration/http/rootCA.crt`, false)

assert.Nil(err)
assert.IsType(tls.Config{}, *conf)
}
Loading

0 comments on commit 1a410fb

Please sign in to comment.