From 27ae62072bd393c302a1491f5db46d4988cbfe8b Mon Sep 17 00:00:00 2001 From: Martin Bochenek Date: Tue, 15 Oct 2024 00:21:18 +0200 Subject: [PATCH] connectors: SQL Server connector guide enhancements --- .../guide/debezium_sqlserver_source.go | 2 ++ .../debezium_sqlserver_source_hook.go | 18 ++++++++++++++++++ .../connector/patch/debezium_mysql_source.go | 9 +++++++-- .../patch/debezium_sqlserver_source.go | 7 +++++++ 4 files changed, 34 insertions(+), 2 deletions(-) diff --git a/backend/pkg/connector/guide/debezium_sqlserver_source.go b/backend/pkg/connector/guide/debezium_sqlserver_source.go index d745d598f..41188c537 100644 --- a/backend/pkg/connector/guide/debezium_sqlserver_source.go +++ b/backend/pkg/connector/guide/debezium_sqlserver_source.go @@ -47,6 +47,7 @@ func NewDebeziumSQLServerGuide(opts ...Option) Guide { "database.instance", "database.names", "database.query.timeout.ms", + "database.encrypt", }, }, }, @@ -100,6 +101,7 @@ func NewDebeziumSQLServerGuide(opts ...Option) Guide { "snapshot.mode.configuration.based.start.stream", "snapshot.mode.configuration.based.snapshot.on.schema.error", "snapshot.mode.configuration.based.snapshot.on.data.error", + "snapshot.fetch.size", "incremental.snapshot.watermarking.strategy", "internal.log.position.check.enable", "decimal.handling.mode", diff --git a/backend/pkg/connector/interceptor/debezium_sqlserver_source_hook.go b/backend/pkg/connector/interceptor/debezium_sqlserver_source_hook.go index 224729d08..4125d4f50 100644 --- a/backend/pkg/connector/interceptor/debezium_sqlserver_source_hook.go +++ b/backend/pkg/connector/interceptor/debezium_sqlserver_source_hook.go @@ -42,6 +42,24 @@ func KafkaConnectToConsoleDebeziumSQLServerSourceHook(response model.ValidationR Visible: true, Errors: []string{}, }, + }, + model.ConfigDefinition{ + Definition: model.ConfigDefinitionKey{ + Name: "database.encrypt", + Type: "BOOLEAN", + DefaultValue: "true", + Importance: model.ConfigDefinitionImportanceMedium, + Required: false, + DisplayName: "Database encryption", + Documentation: "By default, JDBC connections to Microsoft SQL Server are protected by SSL encryption. If SSL is not enabled for a SQL Server database, or if you want to connect to the database without using SSL, you can disable SSL", + }, + Value: model.ConfigDefinitionValue{ + Name: "database.encrypt", + Value: "true", + RecommendedValues: []string{"true", "false"}, + Visible: true, + Errors: []string{}, + }, }) return KafkaConnectToConsoleTopicCreationHook(KafkaConnectToConsoleJSONSchemaHook(response, config), config) diff --git a/backend/pkg/connector/patch/debezium_mysql_source.go b/backend/pkg/connector/patch/debezium_mysql_source.go index 0790f9720..f73bdc440 100644 --- a/backend/pkg/connector/patch/debezium_mysql_source.go +++ b/backend/pkg/connector/patch/debezium_mysql_source.go @@ -24,6 +24,11 @@ type ConfigPatchDebeziumMysqlSource struct { ConnectorClassSelector IncludeExcludeSelector } +const ( + tableIgnoreBuiltin = "table.ignore.builtin" + tombstonesOnDelete = "tombstones.on.delete" +) + var _ ConfigPatch = (*ConfigPatchDebeziumMysqlSource)(nil) // NewConfigPatchDebeziumMysqlSource returns a new Patch for the Debezium Mysql source connectors. @@ -63,9 +68,9 @@ func (*ConfigPatchDebeziumMysqlSource) PatchDefinition(d model.ConfigDefinition, d.SetDocumentation("A comma-separated list of regular expressions matching fully-qualified names of columns to exclude from change events") case "database.allowPublicKeyRetrieval", "include.schema.changes", - "tombstones.on.delete", + tombstonesOnDelete, "enable.time.adjuster", - "table.ignore.builtin", + tableIgnoreBuiltin, "gtid.source.filter.dml.events": d.SetDefaultValue("true") case "database.ssl.mode": diff --git a/backend/pkg/connector/patch/debezium_sqlserver_source.go b/backend/pkg/connector/patch/debezium_sqlserver_source.go index b82b70d62..0b913c8c1 100644 --- a/backend/pkg/connector/patch/debezium_sqlserver_source.go +++ b/backend/pkg/connector/patch/debezium_sqlserver_source.go @@ -65,6 +65,13 @@ func (*ConfigPatchDebeziumSQLServerSource) PatchDefinition(d model.ConfigDefinit AddRecommendedValueWithMetadata("io.confluent.connect.avro.AvroConverter", "AVRO"). AddRecommendedValueWithMetadata("org.apache.kafka.connect.json.JsonConverter", "JSON"). AddRecommendedValueWithMetadata("io.debezium.converters.CloudEventsConverter", "CloudEvents") + case "transaction.boundary.interval.ms": + d.SetDefaultValue("1000") + case "snapshot.fetch.size": + d.SetDefaultValue("2000") + case "tombstones.on.delete", + "table.ignore.builtin": + d.SetDefaultValue("true") } // Importance Patches