diff --git a/backend/pkg/connector/interceptor/iceberg_sink_hook.go b/backend/pkg/connector/interceptor/iceberg_sink_hook.go new file mode 100644 index 000000000..a160f06b7 --- /dev/null +++ b/backend/pkg/connector/interceptor/iceberg_sink_hook.go @@ -0,0 +1,194 @@ +package interceptor + +import ( + "github.com/redpanda-data/console/backend/pkg/connector/model" +) + +// KafkaConnectToConsoleIcebergSinkHook adds Iceberg sink specific config options +// missing in Validate Kafka Connect response +func KafkaConnectToConsoleIcebergSinkHook(response model.ValidationResponse, config map[string]any) model.ValidationResponse { + response.Configs = append(response.Configs, model.ConfigDefinition{ + Definition: model.ConfigDefinitionKey{ + Name: "iceberg.catalog.type", + Type: "STRING", + DefaultValue: "rest", + Importance: model.ConfigDefinitionImportanceMedium, + Required: false, + DisplayName: "Iceberg catalog type", + Documentation: "The Iceberg catalog type", + Dependents: []string{}, + }, + Value: model.ConfigDefinitionValue{ + Name: "iceberg.catalog.type", + Value: "rest", + RecommendedValues: []string{}, + Visible: true, + Errors: []string{}, + }, + Metadata: model.ConfigDefinitionMetadata{ + ComponentType: model.ComponentRadioGroup, + RecommendedValues: []model.RecommendedValueWithMetadata{ + {Value: "rest", DisplayName: "REST"}, + {Value: "hive", DisplayName: "HIVE"}, + {Value: "hadoop", DisplayName: "HADOOP"}, + }, + }, + }, + model.ConfigDefinition{ + Definition: model.ConfigDefinitionKey{ + Name: "iceberg.catalog.uri", + Type: "STRING", + DefaultValue: "", + Importance: model.ConfigDefinitionImportanceHigh, + Required: true, + DisplayName: "Iceberg catalog uri", + Documentation: "Iceberg catalog uri", + Dependents: []string{}, + }, + Value: model.ConfigDefinitionValue{ + Name: "iceberg.catalog.uri", + Value: "", + RecommendedValues: []string{}, + Visible: true, + Errors: []string{}, + }, + }, + model.ConfigDefinition{ + Definition: model.ConfigDefinitionKey{ + Name: "iceberg.catalog.s3.secret-access-key", + Type: "STRING", + DefaultValue: "", + Importance: model.ConfigDefinitionImportanceHigh, + Required: true, + DisplayName: "iceberg.catalog.s3.secret-access-key", + Documentation: "iceberg.catalog.s3.secret-access-key", + Dependents: []string{}, + }, + Value: model.ConfigDefinitionValue{ + Name: "iceberg.catalog.s3.secret-access-key", + Value: "", + RecommendedValues: []string{}, + Visible: true, + Errors: []string{}, + }, + }, + + model.ConfigDefinition{ + Definition: model.ConfigDefinitionKey{ + Name: "iceberg.catalog.s3.access-key-id", + Type: "STRING", + DefaultValue: "", + Importance: model.ConfigDefinitionImportanceHigh, + Required: true, + DisplayName: "iceberg.catalog.s3.access-key-id", + Documentation: "iceberg.catalog.s3.access-key-id", + Dependents: []string{}, + }, + Value: model.ConfigDefinitionValue{ + Name: "iceberg.catalog.s3.access-key-id", + Value: "", + RecommendedValues: []string{}, + Visible: true, + Errors: []string{}, + }, + }, + + model.ConfigDefinition{ + Definition: model.ConfigDefinitionKey{ + Name: "iceberg.catalog.s3.endpoint", + Type: "STRING", + DefaultValue: "", + Importance: model.ConfigDefinitionImportanceHigh, + Required: true, + DisplayName: "iceberg.catalog.s3.endpoint", + Documentation: "iceberg.catalog.s3.endpoint", + Dependents: []string{}, + }, + Value: model.ConfigDefinitionValue{ + Name: "iceberg.catalog.s3.endpoint", + Value: "", + RecommendedValues: []string{}, + Visible: true, + Errors: []string{}, + }, + }, + model.ConfigDefinition{ + Definition: model.ConfigDefinitionKey{ + Name: "iceberg.catalog.s3.path-style-access", + Type: "STRING", + DefaultValue: "", + Importance: model.ConfigDefinitionImportanceHigh, + Required: true, + DisplayName: "iceberg.catalog.s3.path-style-access", + Documentation: "iceberg.catalog.s3.path-style-access", + Dependents: []string{}, + }, + Value: model.ConfigDefinitionValue{ + Name: "iceberg.catalog.s3.path-style-access", + Value: "", + RecommendedValues: []string{}, + Visible: true, + Errors: []string{}, + }, + }, + model.ConfigDefinition{ + Definition: model.ConfigDefinitionKey{ + Name: "iceberg.catalog.client.region", + Type: "STRING", + DefaultValue: "", + Importance: model.ConfigDefinitionImportanceHigh, + Required: true, + DisplayName: "iceberg.catalog.client.region", + Documentation: "iceberg.catalog.client.region", + Dependents: []string{}, + }, + Value: model.ConfigDefinitionValue{ + Name: "iceberg.catalog.client.region", + Value: "", + RecommendedValues: []string{}, + Visible: true, + Errors: []string{}, + }, + }, + model.ConfigDefinition{ + Definition: model.ConfigDefinitionKey{ + Name: "iceberg.catalog.credential", + Type: "STRING", + DefaultValue: "", + Importance: model.ConfigDefinitionImportanceHigh, + Required: true, + DisplayName: "Iceberg catalog credential", + Documentation: "Iceberg catalog credential", + Dependents: []string{}, + }, + Value: model.ConfigDefinitionValue{ + Name: "iceberg.catalog.credential", + Value: "", + RecommendedValues: []string{}, + Visible: true, + Errors: []string{}, + }, + }, + model.ConfigDefinition{ + Definition: model.ConfigDefinitionKey{ + Name: "iceberg.catalog.warehouse", + Type: "STRING", + DefaultValue: "", + Importance: model.ConfigDefinitionImportanceHigh, + Required: true, + DisplayName: "Iceberg catalog warehouse", + Documentation: "Iceberg catalog warehouse", + Dependents: []string{}, + }, + Value: model.ConfigDefinitionValue{ + Name: "iceberg.catalog.warehouse", + Value: "", + RecommendedValues: []string{}, + Visible: true, + Errors: []string{}, + }, + }, + ) + + return KafkaConnectToConsoleJSONSchemaHook(response, config) +} diff --git a/backend/pkg/connector/interceptor/interceptor.go b/backend/pkg/connector/interceptor/interceptor.go index d00e1f3c7..46b3b7678 100644 --- a/backend/pkg/connector/interceptor/interceptor.go +++ b/backend/pkg/connector/interceptor/interceptor.go @@ -57,6 +57,7 @@ func CommunityPatches() []patch.ConfigPatch { patch.NewConfigPatchJdbcSink(), patch.NewConfigPatchJdbcSource(), patch.NewConfigPatchHTTPSource(), + patch.NewConfigPatchIcebergSink(), patch.NewConfigPatchMirrorSource(), patch.NewConfigPatchMirrorHeartbeat(), patch.NewConfigPatchMongoDB(), @@ -88,6 +89,7 @@ func CommunityGuides(opts ...guide.Option) []guide.Guide { guide.NewJdbcSourceGuide(guide.WithKafkaConnectToConsoleHookFn(KafkaConnectToConsoleTopicCreationHook)), guide.NewHTTPSourceGuide(guide.WithConsoleToKafkaConnectHookFn(ConsoleToKafkaConnectHTTPSourceHook), guide.WithKafkaConnectToConsoleHookFn(KafkaConnectToConsoleHTTPSourceHook)), + guide.NewIcebergSinkGuide(guide.WithKafkaConnectToConsoleHookFn(KafkaConnectToConsoleIcebergSinkHook)), guide.NewMirrorSourceGuide(guide.WithConsoleToKafkaConnectHookFn(ConsoleToKafkaConnectMirrorSourceHook), guide.WithKafkaConnectToConsoleHookFn(KafkaConnectToConsoleMirrorSourceHook)), guide.NewMirrorCheckpointGuide(guide.WithConsoleToKafkaConnectHookFn(ConsoleToKafkaConnectMirrorSourceHook), diff --git a/backend/pkg/connector/patch/iceberg_sink.go b/backend/pkg/connector/patch/iceberg_sink.go index 63af00c14..a8f792653 100644 --- a/backend/pkg/connector/patch/iceberg_sink.go +++ b/backend/pkg/connector/patch/iceberg_sink.go @@ -50,10 +50,12 @@ func (*ConfigPatchIcebergSink) PatchDefinition(d model.ConfigDefinition, _ strin // Misc patches switch d.Definition.Name { case keyConverter: - d.SetImportance(model.ConfigDefinitionImportanceHigh) + d.SetImportance(model.ConfigDefinitionImportanceHigh). + ClearRecommendedValuesWithMetadata(). + AddRecommendedValueWithMetadata("org.apache.kafka.connect.json.JsonConverter", "JSON"). + SetDefaultValue("org.apache.kafka.connect.json.JsonConverter") case valueConverter: d.ClearRecommendedValuesWithMetadata(). - AddRecommendedValueWithMetadata("io.confluent.connect.avro.AvroConverter", "AVRO"). AddRecommendedValueWithMetadata("org.apache.kafka.connect.json.JsonConverter", "JSON"). SetDefaultValue("org.apache.kafka.connect.json.JsonConverter") case name: