Skip to content

Commit

Permalink
Fix cluster config environment interaction (#55)
Browse files Browse the repository at this point in the history
* Fix errors

* Fix version

* Don't check SASL mechanism if running from cluster config

* Update README

* Update README

* Fix README

* Fix README

* Update go version to 1.17

* Fix README

* Bump version
  • Loading branch information
yolken-segment authored Nov 18, 2021
1 parent 17ef91f commit f6bafce
Show file tree
Hide file tree
Showing 24 changed files with 154 additions and 94 deletions.
29 changes: 21 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ that makes it easy to tail and summarize structured data in Kafka.

Either:

1. Run `GO111MODULE="on" go get github.com/segmentio/topicctl/cmd/topicctl`
1. Run `go install github.com/segmentio/topicctl/cmd/topicctl@latest`
2. Clone this repo and run `make install` in the repo root
3. Use the Docker image: `docker pull segment/topicctl`

Expand Down Expand Up @@ -228,7 +228,12 @@ independently of an `apply` workflow.
### Version compatibility

We've tested `topicctl` on Kafka clusters with versions between `0.10.1` and `2.7.1`, inclusive.
If you run into any compatibility issues, please file a bug.

Note, however, that clusters at versions prior to `2.4.0` cannot use broker APIs for applying and
thus also require ZooKeeper API access for full functionality. See the
[cluster access details](#cluster-access-details) section below for more details.

If you run into any unexpected compatibility issues, please file a bug.

## Config formats

Expand All @@ -254,13 +259,15 @@ meta:
spec:
bootstrapAddrs: # One or more broker bootstrap addresses
- my-cluster.example.com:9092
clusterID: abc-123-xyz # Expected cluster ID for cluster (optional, used as safety check only)
clusterID: abc-123-xyz # Expected cluster ID for cluster (optional,
# used as safety check only)

# ZooKeeper access settings (only required for pre-v2 clusters; leave off to force exclusive use
# of broker APIs)
zkAddrs: # One or more cluster zookeeper addresses; if these are
- zk.example.com:2181 # omitted, then the cluster will only be accessed via broker APIs;
# see the section below on cluster access for more details.
- zk.example.com:2181 # omitted, then the cluster will only be accessed via
# broker APIs; see the section below on cluster access for
# more details.
zkPrefix: my-cluster # Prefix for zookeeper nodes if using zookeeper access
zkLockPath: /topicctl/locks # Path used for apply locks (optional)

Expand All @@ -274,16 +281,22 @@ spec:
# SASL settings (optional, not supported if using ZooKeeper)
sasl:
enabled: true # Whether SASL is enabled
mechanism: SCRAM-SHA-512 # Mechanism to use; choices are PLAIN, SCRAM-SHA-256, and SCRAM-SHA-512
username: my-username # Username; can also be set via TOPICCTL_SASL_USERNAME environment variable
password: my-password # Password; can also be set via TOPICCTL_SASL_PASSWORD environment variable
mechanism: SCRAM-SHA-512 # Mechanism to use;
# choices are PLAIN, SCRAM-SHA-256, and SCRAM-SHA-512
username: my-username # SASL username
password: my-password # SASL password
```
Note that the `name`, `environment`, `region`, and `description` fields are used
for description/identification only, and don't appear in any API calls. They can
be set arbitrarily, provided that they match up with the values set in the
associated topic configs.

If the tool is run with the `--expand-env` option, then the cluster config will be prepreocessed
using [`os.ExpandEnv`](https://pkg.go.dev/os#ExpandEnv) at load time. The latter will replace
references of the form `$ENV_VAR_NAME` or `${ENV_VAR_NAME}` with the associated values from the
environment.

### Topics

Each topic is configured in a YAML file. The following is an
Expand Down
2 changes: 1 addition & 1 deletion cmd/topicctl/subcmd/apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ func applyTopic(
return err
}

clusterConfig, err := config.LoadClusterFile(clusterConfigPath)
clusterConfig, err := config.LoadClusterFile(clusterConfigPath, applyConfig.shared.expandEnv)
if err != nil {
return err
}
Expand Down
5 changes: 4 additions & 1 deletion cmd/topicctl/subcmd/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,10 @@ func bootstrapRun(cmd *cobra.Command, args []string) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

clusterConfig, err := config.LoadClusterFile(bootstrapConfig.shared.clusterConfig)
clusterConfig, err := config.LoadClusterFile(
bootstrapConfig.shared.clusterConfig,
bootstrapConfig.shared.expandEnv,
)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/topicctl/subcmd/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func checkTopicFile(
return false, err
}

clusterConfig, err := config.LoadClusterFile(clusterConfigPath)
clusterConfig, err := config.LoadClusterFile(clusterConfigPath, checkConfig.shared.expandEnv)
if err != nil {
return false, err
}
Expand Down
27 changes: 22 additions & 5 deletions cmd/topicctl/subcmd/shared.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
type sharedOptions struct {
brokerAddr string
clusterConfig string
expandEnv bool
saslMechanism string
saslPassword string
saslUsername string
Expand Down Expand Up @@ -47,10 +48,14 @@ func (s sharedOptions) validate() error {
}
if s.clusterConfig != "" &&
(s.zkAddr != "" || s.zkPrefix != "" || s.brokerAddr != "" || s.tlsCACert != "" ||
s.tlsCert != "" || s.tlsKey != "" || s.saslMechanism != "") {
s.tlsCert != "" || s.tlsKey != "" || s.tlsServerName != "" || s.saslMechanism != "") {
log.Warn("Broker and zk flags are ignored when using cluster-config")
}

if s.clusterConfig != "" {
return err
}

useTLS := s.tlsEnabled || s.tlsCACert != "" || s.tlsCert != "" || s.tlsKey != ""
useSASL := s.saslMechanism != "" || s.saslPassword != "" || s.saslUsername != ""

Expand All @@ -76,14 +81,14 @@ func (s sharedOptions) getAdminClient(
readOnly bool,
) (admin.Client, error) {
if s.clusterConfig != "" {
clusterConfig, err := config.LoadClusterFile(s.clusterConfig)
clusterConfig, err := config.LoadClusterFile(s.clusterConfig, s.expandEnv)
if err != nil {
return nil, err
}
return clusterConfig.NewAdminClient(
ctx,
sess,
true,
readOnly,
s.saslUsername,
s.saslPassword,
)
Expand Down Expand Up @@ -125,8 +130,6 @@ func (s sharedOptions) getAdminClient(
ZKAddrs: []string{s.zkAddr},
ZKPrefix: s.zkPrefix,
Sess: sess,
// Run in read-only mode to ensure that tailing doesn't make any changes
// in the cluster
ReadOnly: readOnly,
},
)
Expand All @@ -141,6 +144,13 @@ func addSharedFlags(cmd *cobra.Command, options *sharedOptions) {
"",
"Broker address",
)
cmd.Flags().BoolVarP(
&options.expandEnv,
"expand-env",
"",
false,
"Expand environment in cluster config",
)
cmd.Flags().StringVar(
&options.clusterConfig,
"cluster-config",
Expand Down Expand Up @@ -223,6 +233,13 @@ func addSharedConfigOnlyFlags(cmd *cobra.Command, options *sharedOptions) {
os.Getenv("TOPICCTL_CLUSTER_CONFIG"),
"Cluster config",
)
cmd.Flags().BoolVarP(
&options.expandEnv,
"expand-env",
"",
false,
"Expand environment in cluster config",
)
cmd.Flags().StringVar(
&options.saslPassword,
"sasl-password",
Expand Down
32 changes: 26 additions & 6 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/segmentio/topicctl

go 1.16
go 1.17

// Use something like this to use a local kafka-go (useful for testing purposes).
// replace github.com/segmentio/kafka-go => /Users/benjamin.yolken/dev/src/github.com/segmentio/kafka-go
Expand All @@ -12,20 +12,40 @@ require (
github.com/fatih/color v1.9.0
github.com/ghodss/yaml v1.0.0
github.com/hashicorp/go-multierror v1.1.0
github.com/mattn/go-colorable v0.1.6 // indirect
github.com/mattn/go-tty v0.0.3 // indirect
github.com/mgutz/ansi v0.0.0-20170206155736-9520e82c474b // indirect
github.com/olekukonko/tablewriter v0.0.4
github.com/onsi/gomega v1.5.0 // indirect
github.com/pkg/term v0.0.0-20200520122047-c3ffed290a03 // indirect
github.com/samuel/go-zookeeper v0.0.0-20190923202752-2cc03de413da
github.com/segmentio/kafka-go v0.4.21-0.20211001205616-c03923d67699
github.com/sirupsen/logrus v1.2.0
github.com/spf13/cobra v1.0.0
github.com/stretchr/testify v1.6.1
github.com/x-cray/logrus-prefixed-formatter v0.5.2
golang.org/x/crypto v0.0.0-20200204104054-c9f3fb736b72
)

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/golang/snappy v0.0.1 // indirect
github.com/hashicorp/errwrap v1.0.0 // indirect
github.com/inconshreveable/mousetrap v1.0.0 // indirect
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af // indirect
github.com/klauspost/compress v1.9.8 // indirect
github.com/konsorten/go-windows-terminal-sequences v1.0.1 // indirect
github.com/mattn/go-colorable v0.1.6 // indirect
github.com/mattn/go-isatty v0.0.12 // indirect
github.com/mattn/go-runewidth v0.0.7 // indirect
github.com/mattn/go-tty v0.0.3 // indirect
github.com/mgutz/ansi v0.0.0-20170206155736-9520e82c474b // indirect
github.com/onsi/ginkgo v1.6.0 // indirect
github.com/onsi/gomega v1.5.0 // indirect
github.com/pierrec/lz4 v2.6.0+incompatible // indirect
github.com/pkg/term v0.0.0-20200520122047-c3ffed290a03 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/spf13/pflag v1.0.3 // indirect
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c // indirect
github.com/xdg/stringprep v1.0.0 // indirect
golang.org/x/net v0.0.0-20200202094626-16171245cfb2 // indirect
golang.org/x/sys v0.0.0-20200223170610-d5e6a3e2c0ae // indirect
golang.org/x/text v0.3.0 // indirect
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect
gopkg.in/yaml.v2 v2.2.8 // indirect
gopkg.in/yaml.v3 v3.0.0-20200605160147-a5ece683394c // indirect
Expand Down
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,6 @@ github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6So
github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/samuel/go-zookeeper v0.0.0-20190923202752-2cc03de413da h1:p3Vo3i64TCLY7gIfzeQaUJ+kppEO5WQG3cL8iE8tGHU=
github.com/samuel/go-zookeeper v0.0.0-20190923202752-2cc03de413da/go.mod h1:gi+0XIa01GRL2eRQVjQkKGqKF3SF9vZR/HnPullcV2E=
github.com/segmentio/kafka-go v0.4.21-0.20211001180856-4d75f822c8b8 h1:H45Dpqb99IyjZoq/+WCuVz0UWe2JOrajBVmg6QsYyaQ=
github.com/segmentio/kafka-go v0.4.21-0.20211001180856-4d75f822c8b8/go.mod h1:XzMcoMjSzDGHcIwpWUI7GB43iKZ2fTVmryPSGLf/MPg=
github.com/segmentio/kafka-go v0.4.21-0.20211001205616-c03923d67699 h1:DM1XDA47wY0myfsik7hUz+pkmI2uhkfKsa6ogOTNLxw=
github.com/segmentio/kafka-go v0.4.21-0.20211001205616-c03923d67699/go.mod h1:XzMcoMjSzDGHcIwpWUI7GB43iKZ2fTVmryPSGLf/MPg=
github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc=
Expand Down
4 changes: 2 additions & 2 deletions pkg/admin/throttles_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@ func TestParseBrokerThrottles(t *testing.T) {
},
}
_, _, err = ParseBrokerThrottles(badBrokers)
assert.NotNil(t, err)
assert.Error(t, err)
}

func TestParsePartitionThrottles(t *testing.T) {
Expand Down Expand Up @@ -385,5 +385,5 @@ func TestParsePartitionThrottles(t *testing.T) {
},
}
_, _, err = ParsePartitionThrottles(badTopic)
assert.NotNil(t, err)
assert.Error(t, err)
}
8 changes: 4 additions & 4 deletions pkg/admin/types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,16 +155,16 @@ func TestTopicRackHelpers(t *testing.T) {

brokerRacks := BrokerRacks(testBrokers)
minRacks, maxRacks, err := testTopic.RackCounts(brokerRacks)
assert.Nil(t, err)
assert.NoError(t, err)
assert.Equal(t, minRacks, 1)
assert.Equal(t, maxRacks, 3)

numRacks, err := testTopic.Partitions[0].NumRacks(brokerRacks)
assert.Nil(t, err)
assert.NoError(t, err)
assert.Equal(t, 3, numRacks)

racks, err := testTopic.Partitions[0].Racks(brokerRacks)
assert.Nil(t, err)
assert.NoError(t, err)
assert.Equal(t, []string{"rack1", "rack2", "rack3"}, racks)
}

Expand Down Expand Up @@ -343,7 +343,7 @@ func TestPartitionAssignmentHelpers(t *testing.T) {
result,
)
replicas, err := AssignmentsToReplicas(result)
assert.Nil(t, err)
assert.NoError(t, err)

assert.Equal(
t,
Expand Down
Loading

0 comments on commit f6bafce

Please sign in to comment.