From 41ce8b8c1f4e4128fecf7d05217484967e1a6892 Mon Sep 17 00:00:00 2001 From: Filippo Pacifici Date: Thu, 6 Jun 2024 17:48:28 -0700 Subject: [PATCH 1/6] Support deletion --- cmd/topicctl/subcmd/apply.go | 8 ++++++++ cmd/topicctl/subcmd/rebalance.go | 1 + pkg/apply/apply.go | 27 +++++++++++++++++++++------ pkg/config/settings.go | 16 ++++++++++++++++ 4 files changed, 46 insertions(+), 6 deletions(-) diff --git a/cmd/topicctl/subcmd/apply.go b/cmd/topicctl/subcmd/apply.go index 5e4e6ee4..13d7f520 100644 --- a/cmd/topicctl/subcmd/apply.go +++ b/cmd/topicctl/subcmd/apply.go @@ -36,6 +36,7 @@ type applyCmdConfig struct { retentionDropStepDurationStr string skipConfirm bool ignoreFewerPartitionsError bool + allowSettingsDeletion bool sleepLoopDuration time.Duration failFast bool @@ -107,6 +108,12 @@ func init() { false, "Don't return error when topic's config specifies fewer partitions than it currently has", ) + applyCmd.Flags().BoolVar( + &applyConfig.allowSettingsDeletion, + "allow-settings-deletion", + false, + "Deletes topic settings from the broker if the setting is set on the broker but not in config", + ) applyCmd.Flags().DurationVar( &applyConfig.sleepLoopDuration, "sleep-loop-duration", @@ -259,6 +266,7 @@ func applyTopic( RetentionDropStepDuration: applyConfig.retentionDropStepDuration, SkipConfirm: applyConfig.skipConfirm, IgnoreFewerPartitionsError: applyConfig.ignoreFewerPartitionsError, + AllowSettingsDeletion: applyConfig.allowSettingsDeletion, SleepLoopDuration: applyConfig.sleepLoopDuration, TopicConfig: topicConfig, } diff --git a/cmd/topicctl/subcmd/rebalance.go b/cmd/topicctl/subcmd/rebalance.go index 7823a251..d5c22da0 100644 --- a/cmd/topicctl/subcmd/rebalance.go +++ b/cmd/topicctl/subcmd/rebalance.go @@ -307,6 +307,7 @@ func rebalanceApplyTopic( AutoContinueRebalance: true, // to continue without prompts RetentionDropStepDuration: retentionDropStepDuration, // not needed for rebalance SkipConfirm: true, // to enforce action: rebalance + AllowSettingsDeletion: false, // Irrelevant here SleepLoopDuration: rebalanceConfig.sleepLoopDuration, TopicConfig: topicConfig, } diff --git a/pkg/apply/apply.go b/pkg/apply/apply.go index 20eede18..babbe9ba 100644 --- a/pkg/apply/apply.go +++ b/pkg/apply/apply.go @@ -37,6 +37,7 @@ type TopicApplierConfig struct { RetentionDropStepDuration time.Duration SkipConfirm bool IgnoreFewerPartitionsError bool + AllowSettingsDeletion bool SleepLoopDuration time.Duration TopicConfig config.TopicConfig } @@ -392,6 +393,8 @@ func (t *TopicApplier) updateSettings( return err } + configEntries := []kafka.ConfigEntry{} + if len(diffKeys) > 0 { diffsTable, err := FormatSettingsDiff(topicSettings, topicInfo.Config, diffKeys) if err != nil { @@ -416,6 +419,23 @@ func (t *TopicApplier) updateSettings( ) } + configEntries, err = topicSettings.ToConfigEntries(diffKeys) + if err != nil { + return err + } + } + + if len(missingKeys) > 0 && t.config.AllowSettingsDeletion { + log.Infof( + "Found %d key(s) set in cluster but missing from config for deletion:\n%s", + len(missingKeys), + FormatMissingKeys(topicInfo.Config, missingKeys), + ) + + configEntries = append(configEntries, topicSettings.ToEmptyConfigEntries(missingKeys)...) + } + + if len(configEntries) > 0 { if t.config.DryRun { log.Infof("Skipping update because dryRun is set to true") return nil @@ -430,11 +450,6 @@ func (t *TopicApplier) updateSettings( } log.Infof("OK, updating") - configEntries, err := topicSettings.ToConfigEntries(diffKeys) - if err != nil { - return err - } - _, err = t.adminClient.UpdateTopicConfig( ctx, t.topicName, @@ -446,7 +461,7 @@ func (t *TopicApplier) updateSettings( } } - if len(missingKeys) > 0 { + if len(missingKeys) > 0 && !t.config.AllowSettingsDeletion { log.Warnf( "Found %d key(s) set in cluster but missing from config:\n%s\nThese will be left as-is.", len(missingKeys), diff --git a/pkg/config/settings.go b/pkg/config/settings.go index 1a5c6621..bfbe6b1a 100644 --- a/pkg/config/settings.go +++ b/pkg/config/settings.go @@ -346,6 +346,22 @@ func (t TopicSettings) ToConfigEntries(keys []string) ([]kafka.ConfigEntry, erro return entries, nil } +// Produces a slice of kafka-go config entries with empty value. Thus used +// for deletion of the setting. +func (t TopicSettings) ToEmptyConfigEntries(keys []string) []kafka.ConfigEntry { + entries := []kafka.ConfigEntry{} + + if keys != nil { + for _, key := range keys { + entries = append( + entries, + kafka.ConfigEntry{ConfigName: key, ConfigValue: ""}, + ) + } + } + return entries +} + // HasKey returns whether the current settings instance contains the argument key. func (t TopicSettings) HasKey(key string) bool { _, ok := t[key] From 34895e555d01f026499e299b8d1f1abc267392e7 Mon Sep 17 00:00:00 2001 From: Filippo Pacifici Date: Fri, 7 Jun 2024 11:49:05 -0700 Subject: [PATCH 2/6] Add test --- pkg/apply/apply_test.go | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/pkg/apply/apply_test.go b/pkg/apply/apply_test.go index 033cdff9..b3ac6de0 100644 --- a/pkg/apply/apply_test.go +++ b/pkg/apply/apply_test.go @@ -80,6 +80,27 @@ func TestApplyBasicUpdates(t *testing.T) { applier.topicConfig.Spec.ReplicationFactor = 3 err = applier.Apply(ctx) require.NotNil(t, err) + applier.topicConfig.Spec.ReplicationFactor = 2 + + // Settings are not deleted if AllowSettingsDeletion is false. They are + // if it is true + delete(applier.topicConfig.Spec.Settings, "cleanup.policy") + err = applier.Apply(ctx) + require.NoError(t, err) + topicInfo, err = applier.adminClient.GetTopic(ctx, topicName, true) + require.NoError(t, err) + + assert.Equal(t, "delete", topicInfo.Config["cleanup.policy"]) + + applier.config.AllowSettingsDeletion = true + err = applier.Apply(ctx) + require.NoError(t, err) + topicInfo, err = applier.adminClient.GetTopic(ctx, topicName, true) + require.NoError(t, err) + + _, present := topicInfo.Config["cleanup.policy"] + assert.False(t, present) + } func TestApplyPlacementUpdates(t *testing.T) { From b1cf1150d7ebc94a3790a257cc1030854f349a16 Mon Sep 17 00:00:00 2001 From: Filippo Pacifici Date: Fri, 7 Jun 2024 14:26:18 -0700 Subject: [PATCH 3/6] Rename flag --- cmd/topicctl/subcmd/apply.go | 9 ++++----- cmd/topicctl/subcmd/rebalance.go | 2 +- pkg/apply/apply.go | 6 +++--- pkg/apply/apply_test.go | 4 ++-- 4 files changed, 10 insertions(+), 11 deletions(-) diff --git a/cmd/topicctl/subcmd/apply.go b/cmd/topicctl/subcmd/apply.go index 13d7f520..ee789dd1 100644 --- a/cmd/topicctl/subcmd/apply.go +++ b/cmd/topicctl/subcmd/apply.go @@ -36,7 +36,7 @@ type applyCmdConfig struct { retentionDropStepDurationStr string skipConfirm bool ignoreFewerPartitionsError bool - allowSettingsDeletion bool + destructive bool sleepLoopDuration time.Duration failFast bool @@ -109,8 +109,8 @@ func init() { "Don't return error when topic's config specifies fewer partitions than it currently has", ) applyCmd.Flags().BoolVar( - &applyConfig.allowSettingsDeletion, - "allow-settings-deletion", + &applyConfig.destructive, + "destructive", false, "Deletes topic settings from the broker if the setting is set on the broker but not in config", ) @@ -266,8 +266,7 @@ func applyTopic( RetentionDropStepDuration: applyConfig.retentionDropStepDuration, SkipConfirm: applyConfig.skipConfirm, IgnoreFewerPartitionsError: applyConfig.ignoreFewerPartitionsError, - AllowSettingsDeletion: applyConfig.allowSettingsDeletion, - SleepLoopDuration: applyConfig.sleepLoopDuration, + Destructive: applyConfig.destructive, TopicConfig: topicConfig, } diff --git a/cmd/topicctl/subcmd/rebalance.go b/cmd/topicctl/subcmd/rebalance.go index d5c22da0..2b629023 100644 --- a/cmd/topicctl/subcmd/rebalance.go +++ b/cmd/topicctl/subcmd/rebalance.go @@ -307,7 +307,7 @@ func rebalanceApplyTopic( AutoContinueRebalance: true, // to continue without prompts RetentionDropStepDuration: retentionDropStepDuration, // not needed for rebalance SkipConfirm: true, // to enforce action: rebalance - AllowSettingsDeletion: false, // Irrelevant here + Destructive: false, // Irrelevant here SleepLoopDuration: rebalanceConfig.sleepLoopDuration, TopicConfig: topicConfig, } diff --git a/pkg/apply/apply.go b/pkg/apply/apply.go index babbe9ba..61a9dc45 100644 --- a/pkg/apply/apply.go +++ b/pkg/apply/apply.go @@ -37,7 +37,7 @@ type TopicApplierConfig struct { RetentionDropStepDuration time.Duration SkipConfirm bool IgnoreFewerPartitionsError bool - AllowSettingsDeletion bool + Destructive bool SleepLoopDuration time.Duration TopicConfig config.TopicConfig } @@ -425,7 +425,7 @@ func (t *TopicApplier) updateSettings( } } - if len(missingKeys) > 0 && t.config.AllowSettingsDeletion { + if len(missingKeys) > 0 && t.config.Destructive { log.Infof( "Found %d key(s) set in cluster but missing from config for deletion:\n%s", len(missingKeys), @@ -461,7 +461,7 @@ func (t *TopicApplier) updateSettings( } } - if len(missingKeys) > 0 && !t.config.AllowSettingsDeletion { + if len(missingKeys) > 0 && !t.config.Destructive { log.Warnf( "Found %d key(s) set in cluster but missing from config:\n%s\nThese will be left as-is.", len(missingKeys), diff --git a/pkg/apply/apply_test.go b/pkg/apply/apply_test.go index b3ac6de0..78d4c9ad 100644 --- a/pkg/apply/apply_test.go +++ b/pkg/apply/apply_test.go @@ -82,7 +82,7 @@ func TestApplyBasicUpdates(t *testing.T) { require.NotNil(t, err) applier.topicConfig.Spec.ReplicationFactor = 2 - // Settings are not deleted if AllowSettingsDeletion is false. They are + // Settings are not deleted if Destructive is false. They are // if it is true delete(applier.topicConfig.Spec.Settings, "cleanup.policy") err = applier.Apply(ctx) @@ -92,7 +92,7 @@ func TestApplyBasicUpdates(t *testing.T) { assert.Equal(t, "delete", topicInfo.Config["cleanup.policy"]) - applier.config.AllowSettingsDeletion = true + applier.config.Destructive = true err = applier.Apply(ctx) require.NoError(t, err) topicInfo, err = applier.adminClient.GetTopic(ctx, topicName, true) From a968418ef59ca203dd1c8871639b809c9dd5064d Mon Sep 17 00:00:00 2001 From: Filippo Pacifici Date: Fri, 7 Jun 2024 16:47:30 -0700 Subject: [PATCH 4/6] Add notes on the README --- README.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/README.md b/README.md index eb1bf995..bbc618dc 100644 --- a/README.md +++ b/README.md @@ -523,6 +523,8 @@ The `apply` subcommand can make changes, but under the following conditions: 8. Before applying, the tool checks the cluster ID against the expected value in the cluster config. This can help prevent errors around applying in the wrong cluster when multiple clusters are accessed through the same address, e.g `localhost:2181`. +9. If the `destructive` CLI argument is passed, `apply` deletes the settings that are + set on the broker but not set in configuration. The `reset-offsets` command can also make changes in the cluster and should be used carefully. From 66f419cb38f88120c5c1f8f22327f497a4cee654 Mon Sep 17 00:00:00 2001 From: Filippo Pacifici Date: Fri, 7 Jun 2024 17:12:27 -0700 Subject: [PATCH 5/6] Revert accidental change --- cmd/topicctl/subcmd/apply.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/cmd/topicctl/subcmd/apply.go b/cmd/topicctl/subcmd/apply.go index ee789dd1..9044268e 100644 --- a/cmd/topicctl/subcmd/apply.go +++ b/cmd/topicctl/subcmd/apply.go @@ -112,7 +112,7 @@ func init() { &applyConfig.destructive, "destructive", false, - "Deletes topic settings from the broker if the setting is set on the broker but not in config", + "Deletes topic settings from the broker if the settings are present on the broker but not in the config", ) applyCmd.Flags().DurationVar( &applyConfig.sleepLoopDuration, @@ -267,6 +267,7 @@ func applyTopic( SkipConfirm: applyConfig.skipConfirm, IgnoreFewerPartitionsError: applyConfig.ignoreFewerPartitionsError, Destructive: applyConfig.destructive, + SleepLoopDuration: applyConfig.sleepLoopDuration, TopicConfig: topicConfig, } From 6972a4cc3df32c335138117f62adc13634f158a2 Mon Sep 17 00:00:00 2001 From: Filippo Pacifici Date: Fri, 7 Jun 2024 17:14:37 -0700 Subject: [PATCH 6/6] Change info message --- pkg/apply/apply.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/apply/apply.go b/pkg/apply/apply.go index 61a9dc45..408dbf8b 100644 --- a/pkg/apply/apply.go +++ b/pkg/apply/apply.go @@ -427,7 +427,7 @@ func (t *TopicApplier) updateSettings( if len(missingKeys) > 0 && t.config.Destructive { log.Infof( - "Found %d key(s) set in cluster but missing from config for deletion:\n%s", + "Found %d key(s) set in cluster but missing from config to be deleted:\n%s", len(missingKeys), FormatMissingKeys(topicInfo.Config, missingKeys), )