Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Delete topic settings from broker if they are not present in the topic config #198

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -523,6 +523,8 @@ The `apply` subcommand can make changes, but under the following conditions:
8. Before applying, the tool checks the cluster ID against the expected value in the
cluster config. This can help prevent errors around applying in the wrong cluster when multiple
clusters are accessed through the same address, e.g `localhost:2181`.
9. If the `destructive` CLI argument is passed, `apply` deletes the settings that are
set on the broker but not set in configuration.

The `reset-offsets` command can also make changes in the cluster and should be used carefully.

Expand Down
8 changes: 8 additions & 0 deletions cmd/topicctl/subcmd/apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type applyCmdConfig struct {
retentionDropStepDurationStr string
skipConfirm bool
ignoreFewerPartitionsError bool
destructive bool
sleepLoopDuration time.Duration
failFast bool

Expand Down Expand Up @@ -107,6 +108,12 @@ func init() {
false,
"Don't return error when topic's config specifies fewer partitions than it currently has",
)
applyCmd.Flags().BoolVar(
&applyConfig.destructive,
"destructive",
false,
"Deletes topic settings from the broker if the settings are present on the broker but not in the config",
)
applyCmd.Flags().DurationVar(
&applyConfig.sleepLoopDuration,
"sleep-loop-duration",
Expand Down Expand Up @@ -259,6 +266,7 @@ func applyTopic(
RetentionDropStepDuration: applyConfig.retentionDropStepDuration,
SkipConfirm: applyConfig.skipConfirm,
IgnoreFewerPartitionsError: applyConfig.ignoreFewerPartitionsError,
Destructive: applyConfig.destructive,
SleepLoopDuration: applyConfig.sleepLoopDuration,
TopicConfig: topicConfig,
}
Expand Down
1 change: 1 addition & 0 deletions cmd/topicctl/subcmd/rebalance.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,7 @@ func rebalanceApplyTopic(
AutoContinueRebalance: true, // to continue without prompts
RetentionDropStepDuration: retentionDropStepDuration, // not needed for rebalance
SkipConfirm: true, // to enforce action: rebalance
Destructive: false, // Irrelevant here
SleepLoopDuration: rebalanceConfig.sleepLoopDuration,
TopicConfig: topicConfig,
}
Expand Down
27 changes: 21 additions & 6 deletions pkg/apply/apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type TopicApplierConfig struct {
RetentionDropStepDuration time.Duration
SkipConfirm bool
IgnoreFewerPartitionsError bool
Destructive bool
SleepLoopDuration time.Duration
TopicConfig config.TopicConfig
}
Expand Down Expand Up @@ -392,6 +393,8 @@ func (t *TopicApplier) updateSettings(
return err
}

configEntries := []kafka.ConfigEntry{}

if len(diffKeys) > 0 {
diffsTable, err := FormatSettingsDiff(topicSettings, topicInfo.Config, diffKeys)
if err != nil {
Expand All @@ -416,6 +419,23 @@ func (t *TopicApplier) updateSettings(
)
}

configEntries, err = topicSettings.ToConfigEntries(diffKeys)
if err != nil {
return err
}
}

if len(missingKeys) > 0 && t.config.Destructive {
log.Infof(
"Found %d key(s) set in cluster but missing from config to be deleted:\n%s",
len(missingKeys),
FormatMissingKeys(topicInfo.Config, missingKeys),
)

configEntries = append(configEntries, topicSettings.ToEmptyConfigEntries(missingKeys)...)
}

if len(configEntries) > 0 {
if t.config.DryRun {
log.Infof("Skipping update because dryRun is set to true")
return nil
Expand All @@ -430,11 +450,6 @@ func (t *TopicApplier) updateSettings(
}
log.Infof("OK, updating")

configEntries, err := topicSettings.ToConfigEntries(diffKeys)
if err != nil {
return err
}

_, err = t.adminClient.UpdateTopicConfig(
ctx,
t.topicName,
Expand All @@ -446,7 +461,7 @@ func (t *TopicApplier) updateSettings(
}
}

if len(missingKeys) > 0 {
if len(missingKeys) > 0 && !t.config.Destructive {
log.Warnf(
"Found %d key(s) set in cluster but missing from config:\n%s\nThese will be left as-is.",
len(missingKeys),
Expand Down
21 changes: 21 additions & 0 deletions pkg/apply/apply_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,27 @@ func TestApplyBasicUpdates(t *testing.T) {
applier.topicConfig.Spec.ReplicationFactor = 3
err = applier.Apply(ctx)
require.NotNil(t, err)
applier.topicConfig.Spec.ReplicationFactor = 2

// Settings are not deleted if Destructive is false. They are
// if it is true
delete(applier.topicConfig.Spec.Settings, "cleanup.policy")
err = applier.Apply(ctx)
require.NoError(t, err)
topicInfo, err = applier.adminClient.GetTopic(ctx, topicName, true)
require.NoError(t, err)

assert.Equal(t, "delete", topicInfo.Config["cleanup.policy"])

applier.config.Destructive = true
err = applier.Apply(ctx)
require.NoError(t, err)
topicInfo, err = applier.adminClient.GetTopic(ctx, topicName, true)
require.NoError(t, err)

_, present := topicInfo.Config["cleanup.policy"]
assert.False(t, present)

}

func TestApplyPlacementUpdates(t *testing.T) {
Expand Down
16 changes: 16 additions & 0 deletions pkg/config/settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,22 @@ func (t TopicSettings) ToConfigEntries(keys []string) ([]kafka.ConfigEntry, erro
return entries, nil
}

// Produces a slice of kafka-go config entries with empty value. Thus used
// for deletion of the setting.
func (t TopicSettings) ToEmptyConfigEntries(keys []string) []kafka.ConfigEntry {
entries := []kafka.ConfigEntry{}

if keys != nil {
for _, key := range keys {
entries = append(
entries,
kafka.ConfigEntry{ConfigName: key, ConfigValue: ""},
)
}
}
return entries
}

// HasKey returns whether the current settings instance contains the argument key.
func (t TopicSettings) HasKey(key string) bool {
_, ok := t[key]
Expand Down