Skip to content

Commit

Permalink
connectors: Iceberg connector guide
Browse files Browse the repository at this point in the history
  • Loading branch information
bochenekmartin committed Aug 8, 2023
1 parent 94a4cbf commit 81bb903
Show file tree
Hide file tree
Showing 3 changed files with 200 additions and 2 deletions.
194 changes: 194 additions & 0 deletions backend/pkg/connector/interceptor/iceberg_sink_hook.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
package interceptor

import (
"github.com/redpanda-data/console/backend/pkg/connector/model"
)

// KafkaConnectToConsoleIcebergSinkHook adds Iceberg sink specific config options
// missing in Validate Kafka Connect response
func KafkaConnectToConsoleIcebergSinkHook(response model.ValidationResponse, config map[string]any) model.ValidationResponse {
response.Configs = append(response.Configs, model.ConfigDefinition{
Definition: model.ConfigDefinitionKey{
Name: "iceberg.catalog.type",
Type: "STRING",
DefaultValue: "rest",
Importance: model.ConfigDefinitionImportanceMedium,
Required: false,
DisplayName: "Iceberg catalog type",
Documentation: "The Iceberg catalog type",
Dependents: []string{},
},
Value: model.ConfigDefinitionValue{
Name: "iceberg.catalog.type",
Value: "rest",
RecommendedValues: []string{},
Visible: true,
Errors: []string{},
},
Metadata: model.ConfigDefinitionMetadata{
ComponentType: model.ComponentRadioGroup,
RecommendedValues: []model.RecommendedValueWithMetadata{
{Value: "rest", DisplayName: "REST"},
{Value: "hive", DisplayName: "HIVE"},
{Value: "hadoop", DisplayName: "HADOOP"},
},
},
},
model.ConfigDefinition{
Definition: model.ConfigDefinitionKey{
Name: "iceberg.catalog.uri",
Type: "STRING",
DefaultValue: "",
Importance: model.ConfigDefinitionImportanceHigh,
Required: true,
DisplayName: "Iceberg catalog uri",
Documentation: "Iceberg catalog uri",
Dependents: []string{},
},
Value: model.ConfigDefinitionValue{
Name: "iceberg.catalog.uri",
Value: "",
RecommendedValues: []string{},
Visible: true,
Errors: []string{},
},
},
model.ConfigDefinition{
Definition: model.ConfigDefinitionKey{
Name: "iceberg.catalog.s3.secret-access-key",
Type: "STRING",
DefaultValue: "",
Importance: model.ConfigDefinitionImportanceHigh,
Required: true,
DisplayName: "iceberg.catalog.s3.secret-access-key",
Documentation: "iceberg.catalog.s3.secret-access-key",
Dependents: []string{},
},
Value: model.ConfigDefinitionValue{
Name: "iceberg.catalog.s3.secret-access-key",
Value: "",
RecommendedValues: []string{},
Visible: true,
Errors: []string{},
},
},

model.ConfigDefinition{
Definition: model.ConfigDefinitionKey{
Name: "iceberg.catalog.s3.access-key-id",
Type: "STRING",
DefaultValue: "",
Importance: model.ConfigDefinitionImportanceHigh,
Required: true,
DisplayName: "iceberg.catalog.s3.access-key-id",
Documentation: "iceberg.catalog.s3.access-key-id",
Dependents: []string{},
},
Value: model.ConfigDefinitionValue{
Name: "iceberg.catalog.s3.access-key-id",
Value: "",
RecommendedValues: []string{},
Visible: true,
Errors: []string{},
},
},

model.ConfigDefinition{
Definition: model.ConfigDefinitionKey{
Name: "iceberg.catalog.s3.endpoint",
Type: "STRING",
DefaultValue: "",
Importance: model.ConfigDefinitionImportanceHigh,
Required: true,
DisplayName: "iceberg.catalog.s3.endpoint",
Documentation: "iceberg.catalog.s3.endpoint",
Dependents: []string{},
},
Value: model.ConfigDefinitionValue{
Name: "iceberg.catalog.s3.endpoint",
Value: "",
RecommendedValues: []string{},
Visible: true,
Errors: []string{},
},
},
model.ConfigDefinition{
Definition: model.ConfigDefinitionKey{
Name: "iceberg.catalog.s3.path-style-access",
Type: "STRING",
DefaultValue: "",
Importance: model.ConfigDefinitionImportanceHigh,
Required: true,
DisplayName: "iceberg.catalog.s3.path-style-access",
Documentation: "iceberg.catalog.s3.path-style-access",
Dependents: []string{},
},
Value: model.ConfigDefinitionValue{
Name: "iceberg.catalog.s3.path-style-access",
Value: "",
RecommendedValues: []string{},
Visible: true,
Errors: []string{},
},
},
model.ConfigDefinition{
Definition: model.ConfigDefinitionKey{
Name: "iceberg.catalog.client.region",
Type: "STRING",
DefaultValue: "",
Importance: model.ConfigDefinitionImportanceHigh,
Required: true,
DisplayName: "iceberg.catalog.client.region",
Documentation: "iceberg.catalog.client.region",
Dependents: []string{},
},
Value: model.ConfigDefinitionValue{
Name: "iceberg.catalog.client.region",
Value: "",
RecommendedValues: []string{},
Visible: true,
Errors: []string{},
},
},
model.ConfigDefinition{
Definition: model.ConfigDefinitionKey{
Name: "iceberg.catalog.credential",
Type: "STRING",
DefaultValue: "",
Importance: model.ConfigDefinitionImportanceHigh,
Required: true,
DisplayName: "Iceberg catalog credential",
Documentation: "Iceberg catalog credential",
Dependents: []string{},
},
Value: model.ConfigDefinitionValue{
Name: "iceberg.catalog.credential",
Value: "",
RecommendedValues: []string{},
Visible: true,
Errors: []string{},
},
},
model.ConfigDefinition{
Definition: model.ConfigDefinitionKey{
Name: "iceberg.catalog.warehouse",
Type: "STRING",
DefaultValue: "",
Importance: model.ConfigDefinitionImportanceHigh,
Required: true,
DisplayName: "Iceberg catalog warehouse",
Documentation: "Iceberg catalog warehouse",
Dependents: []string{},
},
Value: model.ConfigDefinitionValue{
Name: "iceberg.catalog.warehouse",
Value: "",
RecommendedValues: []string{},
Visible: true,
Errors: []string{},
},
},
)

return KafkaConnectToConsoleJSONSchemaHook(response, config)
}
2 changes: 2 additions & 0 deletions backend/pkg/connector/interceptor/interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ func CommunityPatches() []patch.ConfigPatch {
patch.NewConfigPatchJdbcSink(),
patch.NewConfigPatchJdbcSource(),
patch.NewConfigPatchHTTPSource(),
patch.NewConfigPatchIcebergSink(),
patch.NewConfigPatchMirrorSource(),
patch.NewConfigPatchMirrorHeartbeat(),
patch.NewConfigPatchMongoDB(),
Expand Down Expand Up @@ -88,6 +89,7 @@ func CommunityGuides(opts ...guide.Option) []guide.Guide {
guide.NewJdbcSourceGuide(guide.WithKafkaConnectToConsoleHookFn(KafkaConnectToConsoleTopicCreationHook)),
guide.NewHTTPSourceGuide(guide.WithConsoleToKafkaConnectHookFn(ConsoleToKafkaConnectHTTPSourceHook),
guide.WithKafkaConnectToConsoleHookFn(KafkaConnectToConsoleHTTPSourceHook)),
guide.NewIcebergSinkGuide(guide.WithKafkaConnectToConsoleHookFn(KafkaConnectToConsoleIcebergSinkHook)),
guide.NewMirrorSourceGuide(guide.WithConsoleToKafkaConnectHookFn(ConsoleToKafkaConnectMirrorSourceHook),
guide.WithKafkaConnectToConsoleHookFn(KafkaConnectToConsoleMirrorSourceHook)),
guide.NewMirrorCheckpointGuide(guide.WithConsoleToKafkaConnectHookFn(ConsoleToKafkaConnectMirrorSourceHook),
Expand Down
6 changes: 4 additions & 2 deletions backend/pkg/connector/patch/iceberg_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,12 @@ func (*ConfigPatchIcebergSink) PatchDefinition(d model.ConfigDefinition, _ strin
// Misc patches
switch d.Definition.Name {
case keyConverter:
d.SetImportance(model.ConfigDefinitionImportanceHigh)
d.SetImportance(model.ConfigDefinitionImportanceHigh).
ClearRecommendedValuesWithMetadata().
AddRecommendedValueWithMetadata("org.apache.kafka.connect.json.JsonConverter", "JSON").
SetDefaultValue("org.apache.kafka.connect.json.JsonConverter")
case valueConverter:
d.ClearRecommendedValuesWithMetadata().
AddRecommendedValueWithMetadata("io.confluent.connect.avro.AvroConverter", "AVRO").
AddRecommendedValueWithMetadata("org.apache.kafka.connect.json.JsonConverter", "JSON").
SetDefaultValue("org.apache.kafka.connect.json.JsonConverter")
case name:
Expand Down

0 comments on commit 81bb903

Please sign in to comment.