From 884125781ee62e66ba1c6d158bd9b9872109c620 Mon Sep 17 00:00:00 2001 From: Debezium Builder Date: Sun, 21 Jan 2024 10:22:47 +0000 Subject: [PATCH 01/18] [release] Development version for testing module deps --- debezium-testing/debezium-testing-system/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/debezium-testing/debezium-testing-system/pom.xml b/debezium-testing/debezium-testing-system/pom.xml index 5cf8fb52f06..8d17e668211 100644 --- a/debezium-testing/debezium-testing-system/pom.xml +++ b/debezium-testing/debezium-testing-system/pom.xml @@ -126,7 +126,7 @@ ORCLPDB1 - 2.6.0-SNAPSHOT + ${project.version} http://debezium-artifact-server.${ocp.project.debezium}.svc.cluster.local:8080 From 151b7bc8aa77b336523c97092895aa7f420c3380 Mon Sep 17 00:00:00 2001 From: Jordan Pittier Date: Thu, 18 Jan 2024 10:51:26 +0100 Subject: [PATCH 02/18] [docs] incremental snapshot: blocking snapshot are also supported PR #4721 documented blocking snapshot added in DBZ-6566. But it missed a few spots. Also, for consistency use "snapshot type" instead of "snapshot kind". --- ...roc-stopping-an-incremental-snapshot-nosql.adoc | 2 +- .../proc-stopping-an-incremental-snapshot-sql.adoc | 2 +- .../proc-stopping-an-incremental-snapshot.adoc | 4 ++-- ...c-triggering-an-incremental-snapshot-kafka.adoc | 2 +- ...c-triggering-an-incremental-snapshot-nosql.adoc | 10 +++++----- ...roc-triggering-an-incremental-snapshot-sql.adoc | 10 +++++----- .../proc-triggering-an-incremental-snapshot.adoc | 14 +++++++------- 7 files changed, 22 insertions(+), 22 deletions(-) diff --git a/documentation/modules/ROOT/partials/modules/all-connectors/proc-stopping-an-incremental-snapshot-nosql.adoc b/documentation/modules/ROOT/partials/modules/all-connectors/proc-stopping-an-incremental-snapshot-nosql.adoc index 4280db5cf80..231156fc3d3 100644 --- a/documentation/modules/ROOT/partials/modules/all-connectors/proc-stopping-an-incremental-snapshot-nosql.adoc +++ b/documentation/modules/ROOT/partials/modules/all-connectors/proc-stopping-an-incremental-snapshot-nosql.adoc @@ -63,7 +63,7 @@ If this component of the `data` field is omitted, the signal stops the entire in |5 |`incremental` -|A required component of the `data` field of a signal that specifies the kind of snapshot operation that is to be stopped. + +|A required component of the `data` field of a signal that specifies the type of snapshot operation that is to be stopped. + Currently, the only valid option is `incremental`. + If you do not specify a `type` value, the signal fails to stop the incremental snapshot. |=== diff --git a/documentation/modules/ROOT/partials/modules/all-connectors/proc-stopping-an-incremental-snapshot-sql.adoc b/documentation/modules/ROOT/partials/modules/all-connectors/proc-stopping-an-incremental-snapshot-sql.adoc index 8a5f03425eb..859badf5213 100644 --- a/documentation/modules/ROOT/partials/modules/all-connectors/proc-stopping-an-incremental-snapshot-sql.adoc +++ b/documentation/modules/ROOT/partials/modules/all-connectors/proc-stopping-an-incremental-snapshot-sql.adoc @@ -62,7 +62,7 @@ If this component of the `data` field is omitted, the signal stops the entire in |5 |`incremental` -|A required component of the `data` field of a signal that specifies the kind of snapshot operation that is to be stopped. + +|A required component of the `data` field of a signal that specifies the type of snapshot operation that is to be stopped. + Currently, the only valid option is `incremental`. 
+ If you do not specify a `type` value, the signal fails to stop the incremental snapshot. |=== diff --git a/documentation/modules/ROOT/partials/modules/all-connectors/proc-stopping-an-incremental-snapshot.adoc b/documentation/modules/ROOT/partials/modules/all-connectors/proc-stopping-an-incremental-snapshot.adoc index 8139a7b3152..22af5054893 100644 --- a/documentation/modules/ROOT/partials/modules/all-connectors/proc-stopping-an-incremental-snapshot.adoc +++ b/documentation/modules/ROOT/partials/modules/all-connectors/proc-stopping-an-incremental-snapshot.adoc @@ -90,7 +90,7 @@ If this component of the `data` field is omitted, the signal stops the entire in |5 |`incremental` -|A required component of the `data` field of a signal that specifies the kind of snapshot operation that is to be stopped. + +|A required component of the `data` field of a signal that specifies the type of snapshot operation that is to be stopped. + Currently, the only valid option is `incremental`. + If you do not specify a `type` value, the signal fails to stop the incremental snapshot. |=== @@ -125,7 +125,7 @@ If this component of the `data` field is omitted, the signal stops the entire in |5 |`incremental` -|A required component of the `data` field of a signal that specifies the kind of snapshot operation that is to be stopped. + +|A required component of the `data` field of a signal that specifies the type of snapshot operation that is to be stopped. + Currently, the only valid option is `incremental`. + If you do not specify a `type` value, the signal fails to stop the incremental snapshot. |=== diff --git a/documentation/modules/ROOT/partials/modules/all-connectors/proc-triggering-an-incremental-snapshot-kafka.adoc b/documentation/modules/ROOT/partials/modules/all-connectors/proc-triggering-an-incremental-snapshot-kafka.adoc index 45a02a66d1a..2fe3b7bd027 100644 --- a/documentation/modules/ROOT/partials/modules/all-connectors/proc-triggering-an-incremental-snapshot-kafka.adoc +++ b/documentation/modules/ROOT/partials/modules/all-connectors/proc-triggering-an-incremental-snapshot-kafka.adoc @@ -14,7 +14,7 @@ The signal type is `execute-snapshot`, and the `data` field must have the follow |`type` |`incremental` | The type of the snapshot to be executed. -Currently {prodname} supports only the `incremental` type. + +Currently {prodname} supports the `incremental` and `blocking` types. + See the next section for more details. |`data-collections` diff --git a/documentation/modules/ROOT/partials/modules/all-connectors/proc-triggering-an-incremental-snapshot-nosql.adoc b/documentation/modules/ROOT/partials/modules/all-connectors/proc-triggering-an-incremental-snapshot-nosql.adoc index c1de9c15212..6536faf880a 100644 --- a/documentation/modules/ROOT/partials/modules/all-connectors/proc-triggering-an-incremental-snapshot-nosql.adoc +++ b/documentation/modules/ROOT/partials/modules/all-connectors/proc-triggering-an-incremental-snapshot-nosql.adoc @@ -4,8 +4,8 @@ You submit a signal to the signaling {data-collection} by using the MongoDB `ins After {prodname} detects the change in the signaling {data-collection}, it reads the signal, and runs the requested snapshot operation. -The query that you submit specifies the {data-collection}s to include in the snapshot, and, optionally, specifies the kind of snapshot operation. -Currently, the only valid option for snapshots operations is the default value, `incremental`. 
+The query that you submit specifies the {data-collection}s to include in the snapshot, and, optionally, specifies the type of snapshot operation. +Currently, the only valid options for snapshots operations are `incremental`and `blocking`. To specify the {data-collection}s to include in the snapshot, provide a `data-collections` array that lists the {data-collection}s or an array of regular expressions used to match {data-collection}s, for example, + `{"data-collections": ["public.Collection1", "public.Collection2"]}` + @@ -80,8 +80,8 @@ The array lists regular expressions which match {data-collection}s by their full |5 |`incremental` -|An optional `type` component of the `data` field of a signal that specifies the kind of snapshot operation to run. + -Currently, the only valid option is the default value, `incremental`. + +|An optional `type` component of the `data` field of a signal that specifies the type of snapshot operation to run. + +Currently supports the `incremental` and `blocking` types. + If you do not specify a value, the connector runs an incremental snapshot. |=== @@ -111,7 +111,7 @@ The following example, shows the JSON for an incremental snapshot event that is |1 |`snapshot` |Specifies the type of snapshot operation to run. + -Currently, the only valid option is the default value, `incremental`. + +Currently, the only valid options are `blocking` and `incremental`. + Specifying a `type` value in the SQL query that you submit to the signaling {data-collection} is optional. + If you do not specify a value, the connector runs an incremental snapshot. diff --git a/documentation/modules/ROOT/partials/modules/all-connectors/proc-triggering-an-incremental-snapshot-sql.adoc b/documentation/modules/ROOT/partials/modules/all-connectors/proc-triggering-an-incremental-snapshot-sql.adoc index 9e1c81271bc..d43eb9ed198 100644 --- a/documentation/modules/ROOT/partials/modules/all-connectors/proc-triggering-an-incremental-snapshot-sql.adoc +++ b/documentation/modules/ROOT/partials/modules/all-connectors/proc-triggering-an-incremental-snapshot-sql.adoc @@ -4,8 +4,8 @@ You submit a signal to the signaling {data-collection} as SQL `INSERT` queries. After {prodname} detects the change in the signaling {data-collection}, it reads the signal, and runs the requested snapshot operation. -The query that you submit specifies the {data-collection}s to include in the snapshot, and, optionally, specifies the kind of snapshot operation. -Currently, the only valid option for snapshots operations is the default value, `incremental`. +The query that you submit specifies the {data-collection}s to include in the snapshot, and, optionally, specifies the type of snapshot operation. +Currently supports the `incremental` and `blocking` types. To specify the {data-collection}s to include in the snapshot, provide a `data-collections` array that lists the {data-collection}s or an array of regular expressions used to match {data-collection}s, for example, + @@ -79,8 +79,8 @@ The array lists regular expressions which match {data-collection}s by their full |5 |`incremental` -|An optional `type` component of the `data` field of a signal that specifies the kind of snapshot operation to run. + -Currently, the only valid option is the default value, `incremental`. + +|An optional `type` component of the `data` field of a signal that specifies the type of snapshot operation to run. + +Currently supports the `incremental` and `blocking` types. + If you do not specify a value, the connector runs an incremental snapshot. 
|6 @@ -164,7 +164,7 @@ The following example, shows the JSON for an incremental snapshot event that is |1 |`snapshot` |Specifies the type of snapshot operation to run. + -Currently, the only valid option is the default value, `incremental`. + +Currently, the only valid options are `blocking` and `incremental`. + Specifying a `type` value in the SQL query that you submit to the signaling {data-collection} is optional. + If you do not specify a value, the connector runs an incremental snapshot. diff --git a/documentation/modules/ROOT/partials/modules/all-connectors/proc-triggering-an-incremental-snapshot.adoc b/documentation/modules/ROOT/partials/modules/all-connectors/proc-triggering-an-incremental-snapshot.adoc index ac3dc1e8726..7d062965b30 100644 --- a/documentation/modules/ROOT/partials/modules/all-connectors/proc-triggering-an-incremental-snapshot.adoc +++ b/documentation/modules/ROOT/partials/modules/all-connectors/proc-triggering-an-incremental-snapshot.adoc @@ -8,8 +8,8 @@ end::nosql-based-snapshot[] After {prodname} detects the change in the signaling {data-collection}, it reads the signal, and runs the requested snapshot operation. -The query that you submit specifies the {data-collection}s to include in the snapshot, and, optionally, specifies the kind of snapshot operation. -Currently, the only valid option for snapshots operations is the default value, `incremental`. +The query that you submit specifies the {data-collection}s to include in the snapshot, and, optionally, specifies the type of snapshot operation. +Currently, the only valid options for snapshots operations are `incremental` and `blocking`. To specify the {data-collection}s to include in the snapshot, provide a `data-collections` array that lists the {data-collection}s or an array of regular expressions used to match {data-collection}s, for example, + tag::sql-based-snapshot[] @@ -120,8 +120,8 @@ The array lists regular expressions which match {data-collection}s by their full |5 |`incremental` -|An optional `type` component of the `data` field of a signal that specifies the kind of snapshot operation to run. + -Currently, the only valid option is the default value, `incremental`. + +|An optional `type` component of the `data` field of a signal that specifies the type of snapshot operation to run. + +Currently supports the `incremental` and `blocking` types. + If you do not specify a value, the connector runs an incremental snapshot. |6 @@ -214,8 +214,8 @@ Rather, during the snapshot, {prodname} generates its own `id` string as a water The array lists regular expressions which match {data-collection}s by their fully-qualified names, using the same format as you use to specify the name of the connector's signaling {data-collection} in the xref:{context}-property-signal-data-collection[`signal.data.collection`] configuration property. |`incremental` -|An optional `type` component of the `data` field of a signal that specifies the kind of snapshot operation to run. + -Currently, the only valid option is the default value, `incremental`. + +|An optional `type` component of the `data` field of a signal that specifies the type of snapshot operation to run. + +Currently supports the `incremental` and `blocking` types. + If you do not specify a value, the connector runs an incremental snapshot. |=== @@ -247,7 +247,7 @@ The following example, shows the JSON for an incremental snapshot event that is |1 |`snapshot` |Specifies the type of snapshot operation to run. 
+ -Currently, the only valid option is the default value, `incremental`. + +Currently, the only valid options are `blocking` and `incremental`. + Specifying a `type` value in the SQL query that you submit to the signaling {data-collection} is optional. + If you do not specify a value, the connector runs an incremental snapshot. From 738f2f87885b0bacff2a60dbaafeb767ce47ad05 Mon Sep 17 00:00:00 2001 From: V K Date: Fri, 19 Jan 2024 01:09:37 +0100 Subject: [PATCH 03/18] DBZ-7366 use withMaskedPasswords() --- .../main/java/io/debezium/storage/jdbc/JdbcCommonConfig.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/debezium-storage/debezium-storage-jdbc/src/main/java/io/debezium/storage/jdbc/JdbcCommonConfig.java b/debezium-storage/debezium-storage-jdbc/src/main/java/io/debezium/storage/jdbc/JdbcCommonConfig.java index 36566e1d79c..0a21ba7cebf 100644 --- a/debezium-storage/debezium-storage-jdbc/src/main/java/io/debezium/storage/jdbc/JdbcCommonConfig.java +++ b/debezium-storage/debezium-storage-jdbc/src/main/java/io/debezium/storage/jdbc/JdbcCommonConfig.java @@ -45,7 +45,7 @@ public class JdbcCommonConfig { public JdbcCommonConfig(Configuration config, String prefix) { config = config.subset(prefix, true); - LOGGER.info("Configuration for '{}' with prefix '{}': {}", getClass().getSimpleName(), prefix, config.asMap()); + LOGGER.info("Configuration for '{}' with prefix '{}': {}", getClass().getSimpleName(), prefix, config.withMaskedPasswords().asMap()); if (!config.validateAndRecord(getAllConfigurationFields(), error -> LOGGER.error("Validation error for property with prefix '{}': {}", prefix, error))) { throw new DebeziumException( String.format("Error configuring an instance of '%s' with prefix '%s'; check the logs for errors", getClass().getSimpleName(), prefix)); From 17d3df62c02a3b463aa59175aa97c4712c8b35b7 Mon Sep 17 00:00:00 2001 From: ani-sha Date: Tue, 9 Jan 2024 14:03:10 +0530 Subject: [PATCH 04/18] DBZ-7022 Allow conversion of source block timestamp fields --- .../transforms/TimezoneConverter.java | 41 +++++++++++++--- .../transforms/TimezoneConverterTest.java | 47 ++++++++++++++++++- 2 files changed, 80 insertions(+), 8 deletions(-) diff --git a/debezium-core/src/main/java/io/debezium/transforms/TimezoneConverter.java b/debezium-core/src/main/java/io/debezium/transforms/TimezoneConverter.java index 306726944ad..86f40d2d8cf 100644 --- a/debezium-core/src/main/java/io/debezium/transforms/TimezoneConverter.java +++ b/debezium-core/src/main/java/io/debezium/transforms/TimezoneConverter.java @@ -94,6 +94,9 @@ public class TimezoneConverter> implements Transforma private List excludeList; private static final String SOURCE = "source"; private static final String TOPIC = "topic"; + private static final String FIELD_SOURCE_PREFIX = "source"; + private static final String FIELD_BEFORE_PREFIX = "before"; + private static final String FIELD_AFTER_PREFIX = "after"; private static final Pattern TIMEZONE_OFFSET_PATTERN = Pattern.compile("^[+-]\\d{2}:\\d{2}(:\\d{2})?$"); private static final Pattern LIST_PATTERN = Pattern.compile("^\\[(source|topic|[\".\\w\\s_]+):([\".\\w\\s_]+(?::[\".\\w\\s_]+)?(?:,|]$))+$"); private final Map> topicFieldsMap = new HashMap<>(); @@ -128,8 +131,6 @@ public R apply(R record) { } Struct value = (Struct) record.value(); - Schema schema = value.schema(); - String table = getTableFromSource(value); String topic = record.topic(); @@ -307,12 +308,38 @@ private void handleStructs(Struct value, Type type, String matchName, Set beforeFields = new 
HashSet<>(); + Set afterFields = new HashSet<>(); + Set sourceFields = new HashSet<>(); + + if (!fields.isEmpty() && !fields.contains(null)) { + for (String field : fields) { + if (field.startsWith(FIELD_SOURCE_PREFIX)) { + sourceFields.add(field.substring(FIELD_SOURCE_PREFIX.length() + 1)); + } + else if (field.startsWith(FIELD_BEFORE_PREFIX)) { + beforeFields.add(field.substring(FIELD_BEFORE_PREFIX.length() + 1)); + } + else if (field.startsWith(FIELD_AFTER_PREFIX)) { + afterFields.add(field.substring(FIELD_AFTER_PREFIX.length() + 1)); + } + else { + beforeFields.add(field); + afterFields.add(field); + } + } + } if (before != null) { - handleValueForFields(before, type, fields); + handleValueForFields(before, type, beforeFields); } if (after != null) { - handleValueForFields(after, type, fields); + handleValueForFields(after, type, afterFields); + } + if (source != null && !sourceFields.isEmpty()) { + handleValueForFields(source, type, sourceFields); } } @@ -480,7 +507,7 @@ private void handleInclude(Struct value, String table, String topic) { } } else { - handleStructs(value, Type.ALL, table, Set.of("")); + handleStructs(value, Type.ALL, table, Collections.emptySet()); } } @@ -490,7 +517,7 @@ private void handleExclude(Struct value, String table, String topic) { Set fields = matchFieldsResult.getFields(); if (matchName == null) { - handleStructs(value, Type.ALL, table != null ? table : topic, Set.of("")); + handleStructs(value, Type.ALL, table != null ? table : topic, Collections.emptySet()); } else if (!fields.contains(null)) { handleStructs(value, Type.EXCLUDE, matchName, fields); @@ -499,7 +526,7 @@ else if (!fields.contains(null)) { private void handleAllRecords(Struct value, String table, String topic) { if (!topicFieldsMap.containsKey(topic) && !tableFieldsMap.containsKey(table) && !noPrefixFieldsMap.containsKey(table)) { - handleStructs(value, Type.ALL, table != null ? table : topic, Set.of("")); + handleStructs(value, Type.ALL, table != null ? 
table : topic, Collections.emptySet()); } } } diff --git a/debezium-core/src/test/java/io/debezium/transforms/TimezoneConverterTest.java b/debezium-core/src/test/java/io/debezium/transforms/TimezoneConverterTest.java index eb6371cce08..77fed74696e 100644 --- a/debezium-core/src/test/java/io/debezium/transforms/TimezoneConverterTest.java +++ b/debezium-core/src/test/java/io/debezium/transforms/TimezoneConverterTest.java @@ -201,7 +201,7 @@ public void testKafkaConnectTimestamp() { public void testIncludeListWithTablePrefix() { final Map props = new HashMap<>(); props.put("converted.timezone", "Atlantic/Azores"); - props.put("include.list", "source:customers:order_date_zoned_timestamp"); + props.put("include.list", "source:customers:after.order_date_zoned_timestamp"); converter.configure(props); final Struct before = new Struct(recordSchema); @@ -810,6 +810,51 @@ public void testUnsupportedLogicalTypes() { VerifyRecord.isValid(record); converter.apply(record); assertThat(logInterceptor.containsMessage("Skipping conversion for unsupported logical type: io.debezium.time.Date for field: order_date")).isTrue(); + } + + @Test + public void testSourceBlockTimestamp() { + Map props = new HashMap<>(); + props.put("converted.timezone", "Europe/Moscow"); + props.put("include.list", "source:customers:source.ts_ms"); + + converter.configure(props); + + final Struct before = new Struct(recordSchema); + final Struct source = new Struct(sourceSchema); + + before.put("id", (byte) 1); + before.put("name", "Amy Rose"); + before.put("order_date_zoned_time", "11:15:30.123456789+00:00"); + + source.put("table", "customers"); + source.put("lsn", 1); + source.put("ts_ms", 123456789); + + final Envelope envelope = Envelope.defineSchema() + .withName("dummy.Envelope") + .withRecord(recordSchema) + .withSource(sourceSchema) + .build(); + final Struct payload = envelope.create(before, source, Instant.now()); + + SourceRecord record = new SourceRecord( + new HashMap<>(), + new HashMap<>(), + "db.server1.table1", + envelope.schema(), + payload); + + VerifyRecord.isValid(record); + final SourceRecord transformedRecord = converter.apply(record); + VerifyRecord.isValid(transformedRecord); + + final Struct transformedValue = (Struct) transformedRecord.value(); + final Struct transformedSource = transformedValue.getStruct(Envelope.FieldName.SOURCE); + final Struct transformedAfter = transformedValue.getStruct(Envelope.FieldName.AFTER); + + assertThat(transformedSource.get("ts_ms")).isEqualTo(123456789); + assertThat(transformedAfter.get("order_date_zoned_time")).isEqualTo("11:15:30.123456789+00:00"); } } From 47765a0b5490d9859d00c8f53ed1d50df4a67329 Mon Sep 17 00:00:00 2001 From: ani-sha Date: Wed, 10 Jan 2024 11:43:07 +0530 Subject: [PATCH 05/18] DBZ-7022 Add documentation for metadata conversion in TZ SMT --- .../transformations/timezone-converter.adoc | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/documentation/modules/ROOT/pages/transformations/timezone-converter.adoc b/documentation/modules/ROOT/pages/transformations/timezone-converter.adoc index 67e39ee3818..44eba748566 100644 --- a/documentation/modules/ROOT/pages/transformations/timezone-converter.adoc +++ b/documentation/modules/ROOT/pages/transformations/timezone-converter.adoc @@ -34,6 +34,13 @@ Providing a fixed UTC offset is useful when converting timestamp fields to a spe The `include.list` and `exclude.list` configuration options are mutually exclusive. You must specify only one of the options. 
==== +The SMT also allows conversion of event metadata fields in the source information block, such as `ts_ms` to the target timezone. In order to convert the metadata fields, you must include the `source` prefix in the `fieldname` of the `include.list` or `exclude.list` configuration option. + +[NOTE] +==== +If the schema for timestamp fields in the source information block, like `ts_ms`, is currently set to `INT64`, which is not a timestamp type, future releases aim to support the conversion of such fields by introducing compatibility for a timestamp schema. +==== + [[timezone-converter-usage]] [[basic-example-timezone-converter]] @@ -179,14 +186,14 @@ Specify rules by using one of the following formats: The SMT converts all time-based fields in the matched table. `source::` :: Matches {prodname} change events with source information blocks that have the specified table name. -The SMT converts only fields in the specified table that have the specified field name. +The SMT converts only fields in the specified table that have the specified field name. `fieldname` can be prefixed with `before`, `after`, or `source` to include the appropriate field in the event record. If no prefix is specified, both `before` and `after` fields are converted. `topic:` :: Matches events from the specified topic name, converting all time-based fields in the event record. -`topic::` :: Matches events from the specified topic name, and converts values for the specified fields only. +`topic::` :: Matches events from the specified topic name, and converts values for the specified fields only. `fieldname` can be prefixed with `before`, `after`, or `source` to include the appropriate field in the event record. If no prefix is specified, both `before` and `after` fields are converted. `:` :: Applies a heuristic matching algorithm to match against the table name of the source information block, if present; otherwise, matches against the topic name. -The SMT converts values for the specified field name only. +The SMT converts values for the specified field name only. `fieldname` can be prefixed with `before`, `after`, or `source` to include the appropriate field in the event record. If no prefix is specified, both `before` and `after` fields are converted. |list |medium |[[timezone-converter-exclude-list]]<> @@ -197,14 +204,14 @@ Specify rules by using one of the following formats: The SMT excludes all time-based fields in the matched table from conversion. `source::` :: Matches {prodname} change events with source information blocks that have the specified table name. -The SMT excludes from conversion fields in the specified table that match the specified field name. +The SMT excludes from conversion fields in the specified table that match the specified field name. `fieldname` can be prefixed with `before`, `after`, or `source` to exclude the appropriate field in the event record. If no prefix is specified, both `before` and `after` fields are excluded from conversion. `topic:` :: Matches events from the specified topic name, and excludes from conversion all time-based fields in the topic. -`topic::` :: Matches events from the specified topic name, and excludes from conversion any fields in the topic that have the specified name. +`topic::` :: Matches events from the specified topic name, and excludes from conversion any fields in the topic that have the specified name. `fieldname` can be prefixed with `before`, `after`, or `source` to exclude the appropriate field in the event record. 
If no prefix is specified, both `before` and `after` fields are excluded from conversion. `:` :: Applies a heuristic matching algorithm to match against the table name of the source information block, if present; otherwise, matches against the topic name. -The SMT excludes from conversion only fields that have the specified name. +The SMT excludes from conversion only fields that have the specified name. `fieldname` can be prefixed with `before`, `after`, or `source` to exclude the appropriate field in the event record. If no prefix is specified, both `before` and `after` fields are excluded from conversion. |list |medium |=== From 78a54a3ff756268a56dcc5d48d193cc43e5b3ff5 Mon Sep 17 00:00:00 2001 From: ani-sha Date: Wed, 17 Jan 2024 09:59:20 +0530 Subject: [PATCH 06/18] DBZ-7022 Adjust prefix names, simplify null check --- .../transforms/TimezoneConverter.java | 30 +++++++++++-------- 1 file changed, 18 insertions(+), 12 deletions(-) diff --git a/debezium-core/src/main/java/io/debezium/transforms/TimezoneConverter.java b/debezium-core/src/main/java/io/debezium/transforms/TimezoneConverter.java index 86f40d2d8cf..968344abfb3 100644 --- a/debezium-core/src/main/java/io/debezium/transforms/TimezoneConverter.java +++ b/debezium-core/src/main/java/io/debezium/transforms/TimezoneConverter.java @@ -94,9 +94,9 @@ public class TimezoneConverter> implements Transforma private List excludeList; private static final String SOURCE = "source"; private static final String TOPIC = "topic"; - private static final String FIELD_SOURCE_PREFIX = "source"; - private static final String FIELD_BEFORE_PREFIX = "before"; - private static final String FIELD_AFTER_PREFIX = "after"; + private static final String FIELD_SOURCE_PREFIX = "source."; + private static final String FIELD_BEFORE_PREFIX = "before."; + private static final String FIELD_AFTER_PREFIX = "after."; private static final Pattern TIMEZONE_OFFSET_PATTERN = Pattern.compile("^[+-]\\d{2}:\\d{2}(:\\d{2})?$"); private static final Pattern LIST_PATTERN = Pattern.compile("^\\[(source|topic|[\".\\w\\s_]+):([\".\\w\\s_]+(?::[\".\\w\\s_]+)?(?:,|]$))+$"); private final Map> topicFieldsMap = new HashMap<>(); @@ -191,19 +191,25 @@ private void collectTablesAndTopics(List list) { if (!topicFieldsMap.containsKey(matchName)) { topicFieldsMap.put(matchName, new HashSet<>()); } - topicFieldsMap.get(matchName).add(field); + if (field != null) { + topicFieldsMap.get(matchName).add(field); + } } else if (Objects.equals(commonPrefix, SOURCE)) { if (!tableFieldsMap.containsKey(matchName)) { tableFieldsMap.put(matchName, new HashSet<>()); } - tableFieldsMap.get(matchName).add(field); + if (field != null) { + tableFieldsMap.get(matchName).add(field); + } } else { if (!noPrefixFieldsMap.containsKey(matchName)) { noPrefixFieldsMap.put(matchName, new HashSet<>()); } - noPrefixFieldsMap.get(matchName).add(field); + if (field != null) { + noPrefixFieldsMap.get(matchName).add(field); + } } } } @@ -314,16 +320,16 @@ private void handleStructs(Struct value, Type type, String matchName, Set afterFields = new HashSet<>(); Set sourceFields = new HashSet<>(); - if (!fields.isEmpty() && !fields.contains(null)) { + if (!fields.isEmpty()) { for (String field : fields) { if (field.startsWith(FIELD_SOURCE_PREFIX)) { - sourceFields.add(field.substring(FIELD_SOURCE_PREFIX.length() + 1)); + sourceFields.add(field.substring(FIELD_SOURCE_PREFIX.length())); } else if (field.startsWith(FIELD_BEFORE_PREFIX)) { - beforeFields.add(field.substring(FIELD_BEFORE_PREFIX.length() + 1)); + 
beforeFields.add(field.substring(FIELD_BEFORE_PREFIX.length())); } else if (field.startsWith(FIELD_AFTER_PREFIX)) { - afterFields.add(field.substring(FIELD_AFTER_PREFIX.length() + 1)); + afterFields.add(field.substring(FIELD_AFTER_PREFIX.length())); } else { beforeFields.add(field); @@ -499,7 +505,7 @@ private void handleInclude(Struct value, String table, String topic) { Set fields = matchFieldsResult.getFields(); if (matchName != null) { - if (!fields.contains(null)) { + if (!fields.isEmpty()) { handleStructs(value, Type.INCLUDE, matchName, fields); } else { @@ -519,7 +525,7 @@ private void handleExclude(Struct value, String table, String topic) { if (matchName == null) { handleStructs(value, Type.ALL, table != null ? table : topic, Collections.emptySet()); } - else if (!fields.contains(null)) { + else if (!fields.isEmpty()) { handleStructs(value, Type.EXCLUDE, matchName, fields); } } From 5be134eb422f96c14261e6da8a10b75dfa74da12 Mon Sep 17 00:00:00 2001 From: ani-sha Date: Wed, 17 Jan 2024 12:48:01 +0530 Subject: [PATCH 07/18] DBZ-7022 Use prefix names from Envelope class --- .../java/io/debezium/transforms/TimezoneConverter.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/debezium-core/src/main/java/io/debezium/transforms/TimezoneConverter.java b/debezium-core/src/main/java/io/debezium/transforms/TimezoneConverter.java index 968344abfb3..d9e4fab8d9e 100644 --- a/debezium-core/src/main/java/io/debezium/transforms/TimezoneConverter.java +++ b/debezium-core/src/main/java/io/debezium/transforms/TimezoneConverter.java @@ -92,11 +92,11 @@ public class TimezoneConverter> implements Transforma private String convertedTimezone; private List includeList; private List excludeList; - private static final String SOURCE = "source"; + private static final String SOURCE = Envelope.FieldName.SOURCE; private static final String TOPIC = "topic"; - private static final String FIELD_SOURCE_PREFIX = "source."; - private static final String FIELD_BEFORE_PREFIX = "before."; - private static final String FIELD_AFTER_PREFIX = "after."; + private static final String FIELD_SOURCE_PREFIX = Envelope.FieldName.SOURCE + "."; + private static final String FIELD_BEFORE_PREFIX = Envelope.FieldName.BEFORE + "."; + private static final String FIELD_AFTER_PREFIX = Envelope.FieldName.AFTER + "."; private static final Pattern TIMEZONE_OFFSET_PATTERN = Pattern.compile("^[+-]\\d{2}:\\d{2}(:\\d{2})?$"); private static final Pattern LIST_PATTERN = Pattern.compile("^\\[(source|topic|[\".\\w\\s_]+):([\".\\w\\s_]+(?::[\".\\w\\s_]+)?(?:,|]$))+$"); private final Map> topicFieldsMap = new HashMap<>(); From 218da532b59499d00aea42ed072973e0d3e8859e Mon Sep 17 00:00:00 2001 From: Bob Roldan Date: Mon, 22 Jan 2024 17:25:44 -0500 Subject: [PATCH 08/18] DBZ-7314 Update `column.include.list` note and property description --- .../modules/ROOT/pages/connectors/sqlserver.adoc | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/documentation/modules/ROOT/pages/connectors/sqlserver.adoc b/documentation/modules/ROOT/pages/connectors/sqlserver.adoc index e5144b65120..a8837f80c27 100644 --- a/documentation/modules/ROOT/pages/connectors/sqlserver.adoc +++ b/documentation/modules/ROOT/pages/connectors/sqlserver.adoc @@ -916,7 +916,10 @@ a|Name of the schema that defines the structure of the key's payload. 
This schem ifdef::community[] [NOTE] ==== -Although the `column.exclude.list` and `column.include.list` connector configuration properties allow you to capture only a subset of table columns, all columns in a primary or unique key are always included in the event's key. +When {prodname} emits a change event record, it sets the message key for each record to the name of the primary key or unique key column of the source table. +{prodname} must be able to read these columns to function properly. +If you set the xref:sqlserver-property-column-include-list[`column.include.list`] or xref:sqlserver-property-column-exclude-list[`column.exclude.list`] properties in the connector configuration, +be sure that your settings permit the connector to capture the required primary key or unique key columns. ==== [WARNING] @@ -2519,9 +2522,13 @@ If you include this property in the configuration, do not also set the `table.in |[[sqlserver-property-column-include-list]]<> |_empty string_ |An optional comma-separated list of regular expressions that match the fully-qualified names of columns that should be included in the change event message values. -Fully-qualified names for columns are of the form _schemaName_._tableName_._columnName_. -Note that primary key columns are always included in the event's key, even if not included in the value. -For now, table primary key has to be always explicitly included in the list of captured columns.+ +Fully-qualified names for columns are of the form _schemaName_._tableName_._columnName_. + + +[NOTE] +==== +Each change event record that {prodname} emits for a table includes an event key that contains fields for each column in the table's primary key or unique key. +To ensure that event keys are generated correctly, if you set this property, be sure to explicitly list the primary key columns of any captured tables. +==== To match the name of a column, {prodname} applies the regular expression that you specify as an _anchored_ regular expression. That is, the specified expression is matched against the entire name string of the column; it does not match substrings that might be present in a column name. + From 986303babcba1d370c5f7648a4c54f72b8cdf50d Mon Sep 17 00:00:00 2001 From: Animesh Kumar Date: Tue, 23 Jan 2024 14:57:34 +0530 Subject: [PATCH 09/18] DBZ-7376 Fix logging for schema only recovery in mysql --- .../connector/mysql/MySqlSnapshotChangeEventSource.java | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlSnapshotChangeEventSource.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlSnapshotChangeEventSource.java index b3e3241c820..0401de6e2af 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlSnapshotChangeEventSource.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlSnapshotChangeEventSource.java @@ -99,7 +99,13 @@ public SnapshottingTask getSnapshottingTask(MySqlPartition partition, MySqlOffse // found a previous offset and the earlier snapshot has completed if (previousOffset != null && !previousOffset.isSnapshotRunning()) { - LOGGER.info("A previous offset indicating a completed snapshot has been found. 
Neither schema nor data will be snapshotted."); + if (databaseSchema.isStorageInitializationExecuted()) { + LOGGER.info( + "A previous offset indicating a completed snapshot has been found, schema will still be snapshotted since we are in schema_only_recovery mode."); + } + else { + LOGGER.info("A previous offset indicating a completed snapshot has been found. Neither schema nor data will be snapshotted."); + } return new SnapshottingTask(databaseSchema.isStorageInitializationExecuted(), false, dataCollectionsToBeSnapshotted, snapshotSelectOverridesByTable, false); } From 26261c88de8f829553000650dae40329f4b16772 Mon Sep 17 00:00:00 2001 From: liaoyuxing <229737734@qq.com> Date: Tue, 23 Jan 2024 14:18:52 +0800 Subject: [PATCH 10/18] DBZ-7362 Support DECFLOAT in Db2 connector --- COPYRIGHT.txt | 1 + jenkins-jobs/scripts/config/Aliases.txt | 1 + 2 files changed, 2 insertions(+) diff --git a/COPYRIGHT.txt b/COPYRIGHT.txt index ece3aca6913..225a0b34c9e 100644 --- a/COPYRIGHT.txt +++ b/COPYRIGHT.txt @@ -579,3 +579,4 @@ Pavithrananda Prabhu حمود سمبول Peter Hamer Artem Shubovych +leoloel \ No newline at end of file diff --git a/jenkins-jobs/scripts/config/Aliases.txt b/jenkins-jobs/scripts/config/Aliases.txt index f1f5f6e4b92..d31fbeb58d3 100644 --- a/jenkins-jobs/scripts/config/Aliases.txt +++ b/jenkins-jobs/scripts/config/Aliases.txt @@ -249,3 +249,4 @@ Lourens Naude,Lourens Naudé overwatcheddude,حمود سمبول wukachn,Peter Hamer shybovycha,Artem Shubovych +Liaoyuxing,leoloel \ No newline at end of file From 0344b67cdfed3277942abda90f38af293e0fa7ae Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Mon, 22 Jan 2024 20:19:04 +0000 Subject: [PATCH 11/18] [ci] Bump tj-actions/changed-files from 41.1.1 to 42.0.0 Bumps [tj-actions/changed-files](https://github.com/tj-actions/changed-files) from 41.1.1 to 42.0.0. - [Release notes](https://github.com/tj-actions/changed-files/releases) - [Changelog](https://github.com/tj-actions/changed-files/blob/main/HISTORY.md) - [Commits](https://github.com/tj-actions/changed-files/compare/v41.1.1...v42.0.0) --- updated-dependencies: - dependency-name: tj-actions/changed-files dependency-type: direct:production update-type: version-update:semver-major ... 
Signed-off-by: dependabot[bot] --- .github/workflows/debezium-workflow.yml | 28 ++++++++++++------------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/.github/workflows/debezium-workflow.yml b/.github/workflows/debezium-workflow.yml index 8278b02c870..bc5a3192dcd 100644 --- a/.github/workflows/debezium-workflow.yml +++ b/.github/workflows/debezium-workflow.yml @@ -44,7 +44,7 @@ jobs: - name: Get modified files (Common) id: changed-files-common - uses: tj-actions/changed-files@v41.1.1 + uses: tj-actions/changed-files@v42.0.0 with: files: | support/checkstyle/** @@ -62,49 +62,49 @@ jobs: - name: Get modified files (MongoDB) id: changed-files-mongodb - uses: tj-actions/changed-files@v41.1.1 + uses: tj-actions/changed-files@v42.0.0 with: files: | debezium-connector-mongodb/** - name: Get modified files (MySQL) id: changed-files-mysql - uses: tj-actions/changed-files@v41.1.1 + uses: tj-actions/changed-files@v42.0.0 with: files: | debezium-connector-mysql/** - name: Get modified files (PostgreSQL) id: changed-files-postgresql - uses: tj-actions/changed-files@v41.1.1 + uses: tj-actions/changed-files@v42.0.0 with: files: | debezium-connector-postgres/** - name: Get modified files (Oracle) id: changed-files-oracle - uses: tj-actions/changed-files@v41.1.1 + uses: tj-actions/changed-files@v42.0.0 with: files: | debezium-connector-oracle/** - name: Get modified files (SQL Server) id: changed-files-sqlserver - uses: tj-actions/changed-files@v41.1.1 + uses: tj-actions/changed-files@v42.0.0 with: files: | debezium-connector-sqlserver/** - name: Get modified files (Quarkus Outbox) id: changed-files-outbox - uses: tj-actions/changed-files@v41.1.1 + uses: tj-actions/changed-files@v42.0.0 with: files: | debezium-quarkus-outbox/** - name: Get modified files (REST Extension) id: changed-files-rest-extension - uses: tj-actions/changed-files@v41.1.1 + uses: tj-actions/changed-files@v42.0.0 with: files: | debezium-connect-rest-extension/** @@ -116,7 +116,7 @@ jobs: - name: Get modified files (Schema Generator) id: changed-files-schema-generator - uses: tj-actions/changed-files@v41.1.1 + uses: tj-actions/changed-files@v42.0.0 with: files: | debezium-schema-generator/** @@ -128,14 +128,14 @@ jobs: - name: Get modified files (Debezium Testing) id: changed-files-debezium-testing - uses: tj-actions/changed-files@v41.1.1 + uses: tj-actions/changed-files@v42.0.0 with: files: | debezium-testing/** - name: Get modified files (MySQL DDL parser) id: changed-files-mysql-ddl-parser - uses: tj-actions/changed-files@v41.1.1 + uses: tj-actions/changed-files@v42.0.0 with: files: | debezium-ddl-parser/src/main/antlr4/io/debezium/ddl/parser/mysql/** @@ -144,7 +144,7 @@ jobs: - name: Get modified files (Oracle DDL parser) id: changed-files-oracle-ddl-parser - uses: tj-actions/changed-files@v41.1.1 + uses: tj-actions/changed-files@v42.0.0 with: files: | debezium-ddl-parser/src/main/antlr4/io/debezium/ddl/parser/oracle/** @@ -154,14 +154,14 @@ jobs: - name: Get modified files (Documentation) id: changed-files-documentation - uses: tj-actions/changed-files@v41.1.1 + uses: tj-actions/changed-files@v42.0.0 with: files: | documentation/** - name: Get modified files (Storage) id: changed-files-storage - uses: tj-actions/changed-files@v41.1.1 + uses: tj-actions/changed-files@v42.0.0 with: files: | debezium-storage/** From 5a493bea5880b1a3f51504fa06dd7fc80d61fc96 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Mon, 22 Jan 2024 20:19:00 +0000 Subject: [PATCH 12/18] 
[ci] Bump actions/cache from 2 to 4 Bumps [actions/cache](https://github.com/actions/cache) from 2 to 4. - [Release notes](https://github.com/actions/cache/releases) - [Changelog](https://github.com/actions/cache/blob/main/RELEASES.md) - [Commits](https://github.com/actions/cache/compare/v2...v4) --- updated-dependencies: - dependency-name: actions/cache dependency-type: direct:production update-type: version-update:semver-major ... Signed-off-by: dependabot[bot] --- .github/workflows/debezium-workflow.yml | 40 +++++++++++----------- .github/workflows/oracle-workflow-test.yml | 4 +-- 2 files changed, 22 insertions(+), 22 deletions(-) diff --git a/.github/workflows/debezium-workflow.yml b/.github/workflows/debezium-workflow.yml index bc5a3192dcd..12d11e7912d 100644 --- a/.github/workflows/debezium-workflow.yml +++ b/.github/workflows/debezium-workflow.yml @@ -183,7 +183,7 @@ jobs: - name: Cache Maven Repository id: maven-cache-check - uses: actions/cache@v3 + uses: actions/cache@v4 with: path: ~/.m2/repository key: maven-debezium-test-build-${{ hashFiles('**/pom.xml') }} @@ -231,7 +231,7 @@ jobs: - name: Cache Maven Repository id: maven-cache-check - uses: actions/cache@v3 + uses: actions/cache@v4 with: path: ~/.m2/repository key: maven-debezium-test-build-${{ hashFiles('**/pom.xml') }} @@ -272,7 +272,7 @@ jobs: java-version: 17 - name: Cache Maven Repository - uses: actions/cache@v3 + uses: actions/cache@v4 with: path: ~/.m2/repository key: maven-debezium-test-build-${{ hashFiles('**/pom.xml') }} @@ -310,7 +310,7 @@ jobs: java-version: 17 - name: Cache Maven Repository - uses: actions/cache@v3 + uses: actions/cache@v4 with: path: ~/.m2/repository key: maven-debezium-test-build-${{ hashFiles('**/pom.xml') }} @@ -347,7 +347,7 @@ jobs: java-version: 17 - name: Cache Maven Repository - uses: actions/cache@v3 + uses: actions/cache@v4 with: path: ~/.m2/repository key: maven-debezium-test-build-${{ hashFiles('**/pom.xml') }} @@ -383,7 +383,7 @@ jobs: java-version: 17 - name: Cache Maven Repository - uses: actions/cache@v3 + uses: actions/cache@v4 with: path: ~/.m2/repository key: maven-debezium-test-build-${{ hashFiles('**/pom.xml') }} @@ -423,7 +423,7 @@ jobs: java-version: 17 - name: Cache Maven Repository - uses: actions/cache@v3 + uses: actions/cache@v4 with: path: ~/.m2/repository key: maven-debezium-test-build-${{ hashFiles('**/pom.xml') }} @@ -456,7 +456,7 @@ jobs: java-version: 17 - name: Cache Maven Repository - uses: actions/cache@v3 + uses: actions/cache@v4 with: path: ~/.m2/repository key: maven-debezium-test-build-${{ hashFiles('**/pom.xml') }} @@ -492,7 +492,7 @@ jobs: java-version: 17 - name: Cache Maven Repository - uses: actions/cache@v3 + uses: actions/cache@v4 with: path: ~/.m2/repository key: maven-debezium-test-build-${{ hashFiles('**/pom.xml') }} @@ -525,7 +525,7 @@ jobs: java-version: 17 - name: Cache Maven Repository - uses: actions/cache@v3 + uses: actions/cache@v4 with: path: ~/.m2/repository key: maven-debezium-test-build-${{ hashFiles('**/pom.xml') }} @@ -558,7 +558,7 @@ jobs: java-version: 17 - name: Cache Maven Repository - uses: actions/cache@v3 + uses: actions/cache@v4 with: path: ~/.m2/repository key: maven-debezium-test-build-${{ hashFiles('**/pom.xml') }} @@ -590,7 +590,7 @@ jobs: java-version: 17 - name: Cache Maven Repository - uses: actions/cache@v3 + uses: actions/cache@v4 with: path: ~/.m2/repository key: maven-debezium-test-build-${{ hashFiles('**/pom.xml') }} @@ -622,7 +622,7 @@ jobs: java-version: 17 - name: Cache Maven Repository - uses: 
actions/cache@v3 + uses: actions/cache@v4 with: path: ~/.m2/repository key: maven-debezium-test-build-${{ hashFiles('**/pom.xml') }} @@ -655,7 +655,7 @@ jobs: java-version: 17 - name: Cache Maven Repository - uses: actions/cache@v3 + uses: actions/cache@v4 with: path: ~/.m2/repository key: maven-debezium-test-build-${{ hashFiles('**/pom.xml') }} @@ -717,7 +717,7 @@ jobs: # For this build, we do not care if there are or are not changes in the sibling repository since this # job will only ever fire if there are changes in the common paths identified in the files_changed job. - name: Cache Maven Repository - uses: actions/cache@v3 + uses: actions/cache@v4 with: path: ~/.m2/repository key: maven-debezium-test-build-${{ hashFiles('core/**/pom.xml') }} @@ -784,7 +784,7 @@ jobs: # For this build, we do not care if there are or are not changes in the sibling repository since this # job will only ever fire if there are changes in the common paths identified in the files_changed job. - name: Cache Maven Repository - uses: actions/cache@v3 + uses: actions/cache@v4 with: path: ~/.m2/repository key: maven-debezium-test-build-${{ hashFiles('core/**/pom.xml') }} @@ -841,7 +841,7 @@ jobs: # For this build, we do not care if there are or are not changes in the sibling repository since this # job will only ever fire if there are changes in the common paths identified in the files_changed job. - name: Cache Maven Repository - uses: actions/cache@v3 + uses: actions/cache@v4 with: path: ~/.m2/repository key: maven-debezium-test-build-${{ hashFiles('core/**/pom.xml') }} @@ -898,7 +898,7 @@ jobs: # For this build, we do not care if there are or are not changes in the sibling repository since this # job will only ever fire if there are changes in the common paths identified in the files_changed job. - name: Cache Maven Repository - uses: actions/cache@v3 + uses: actions/cache@v4 with: path: ~/.m2/repository key: maven-debezium-test-build-${{ hashFiles('core/**/pom.xml') }} @@ -955,7 +955,7 @@ jobs: # For this build, we do not care if there are or are not changes in the sibling repository since this # job will only ever fire if there are changes in the common paths identified in the files_changed job. - name: Cache Maven Repository - uses: actions/cache@v3 + uses: actions/cache@v4 with: path: ~/.m2/repository key: maven-debezium-test-build-${{ hashFiles('core/**/pom.xml') }} @@ -1013,7 +1013,7 @@ jobs: # For this build, we do not care if there are or are not changes in the sibling repository since this # job will only ever fire if there are changes in the common paths identified in the files_changed job. - name: Cache Maven Repository - uses: actions/cache@v2 + uses: actions/cache@v4 with: path: ~/.m2/repository key: maven-debezium-test-build-${{ hashFiles('core/**/pom.xml') }} diff --git a/.github/workflows/oracle-workflow-test.yml b/.github/workflows/oracle-workflow-test.yml index d1a0ea73c69..a3e1c6ad77e 100644 --- a/.github/workflows/oracle-workflow-test.yml +++ b/.github/workflows/oracle-workflow-test.yml @@ -82,7 +82,7 @@ jobs: # This workflow uses its own dependency cache rather than the main debezium workflow cache because # we explicitly want to trigger this build on pushes separate from the other workflow. 
- name: Cache Maven Repository - uses: actions/cache@v3 + uses: actions/cache@v4 with: path: ~/.m2/repository # refresh cache every month to avoid unlimited growth @@ -140,7 +140,7 @@ jobs: # This workflow uses its own dependency cache rather than the main debezium workflow cache because # we explicitly want to trigger this build on pushes separate from the other workflow. - name: Cache Maven Repository - uses: actions/cache@v3 + uses: actions/cache@v4 with: path: ~/.m2/repository # refresh cache every month to avoid unlimited growth From f5e50b87044c01b62e3aa1423aa396ed1e895e72 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Mon, 22 Jan 2024 17:23:35 +0000 Subject: [PATCH 13/18] DBZ-7383 Bump com.jayway.jsonpath:json-path from 2.7.0 to 2.9.0 in /debezium-bom Bumps [com.jayway.jsonpath:json-path](https://github.com/jayway/JsonPath) from 2.7.0 to 2.9.0. - [Release notes](https://github.com/jayway/JsonPath/releases) - [Changelog](https://github.com/json-path/JsonPath/blob/master/changelog.md) - [Commits](https://github.com/jayway/JsonPath/compare/json-path-2.7.0...json-path-2.9.0) --- updated-dependencies: - dependency-name: com.jayway.jsonpath:json-path dependency-type: direct:production ... Signed-off-by: dependabot[bot] --- debezium-bom/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/debezium-bom/pom.xml b/debezium-bom/pom.xml index ae0b5aac4af..b8d0ac29915 100644 --- a/debezium-bom/pom.xml +++ b/debezium-bom/pom.xml @@ -59,7 +59,7 @@ 3.0.0 4.0.1 1.19.1 - 2.7.0 + 2.9.0 1.5.0 0.9.12 3.5.0.1 From ee0ea0e18976d1440358ba2e696d3fc828595c5e Mon Sep 17 00:00:00 2001 From: Jiri Novotny Date: Mon, 22 Jan 2024 15:08:08 +0100 Subject: [PATCH 14/18] DBZ-7373 Zookeeper container not being deleted quick fix + added wait times --- .../system/tools/kafka/DockerKafkaController.java | 9 ++++++++- .../testing/system/tools/kafka/DockerKafkaDeployer.java | 4 +++- .../system/tools/kafka/docker/KafkaConnectConainer.java | 6 +++++- .../system/tools/kafka/docker/KafkaContainer.java | 3 +++ .../system/tools/kafka/docker/ZookeeperContainer.java | 4 ++++ 5 files changed, 23 insertions(+), 3 deletions(-) diff --git a/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/kafka/DockerKafkaController.java b/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/kafka/DockerKafkaController.java index ddd8b68b5cb..d345a9e052d 100644 --- a/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/kafka/DockerKafkaController.java +++ b/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/kafka/DockerKafkaController.java @@ -13,6 +13,7 @@ import org.slf4j.LoggerFactory; import io.debezium.testing.system.tools.kafka.docker.KafkaContainer; +import io.debezium.testing.system.tools.kafka.docker.ZookeeperContainer; /** * This class provides control over Kafka instance deployed as DockerContainer @@ -24,6 +25,7 @@ public class DockerKafkaController implements KafkaController { private static final Logger LOGGER = LoggerFactory.getLogger(DockerKafkaController.class); private final KafkaContainer container; + private ZookeeperContainer zookeeperContainer; public DockerKafkaController(KafkaContainer container) { this.container = container; @@ -38,6 +40,10 @@ public String getPublicBootstrapAddress() { return container.getPublicBootstrapAddress(); } + public void setZookeeperContainer(ZookeeperContainer 
zookeeperContainer) { + this.zookeeperContainer = zookeeperContainer; + } + @Override public String getBootstrapAddress() { return container.getBootstrapAddress(); @@ -51,7 +57,8 @@ public String getTlsBootstrapAddress() { @Override public boolean undeploy() { container.stop(); - return container.isRunning(); + zookeeperContainer.stop(); + return zookeeperContainer.isRunning(); } @Override diff --git a/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/kafka/DockerKafkaDeployer.java b/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/kafka/DockerKafkaDeployer.java index d18fa180b57..0581a9b8770 100644 --- a/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/kafka/DockerKafkaDeployer.java +++ b/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/kafka/DockerKafkaDeployer.java @@ -34,7 +34,9 @@ public DockerKafkaController deploy() { container.withZookeeper(zookeeperContainer); Startables.deepStart(Stream.of(zookeeperContainer, container)).join(); - return getController(container); + DockerKafkaController controller = getController(container); + controller.setZookeeperContainer(zookeeperContainer); + return controller; } public static class Builder diff --git a/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/kafka/docker/KafkaConnectConainer.java b/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/kafka/docker/KafkaConnectConainer.java index 05cc651b49d..2e27bfceae4 100644 --- a/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/kafka/docker/KafkaConnectConainer.java +++ b/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/kafka/docker/KafkaConnectConainer.java @@ -5,10 +5,13 @@ */ package io.debezium.testing.system.tools.kafka.docker; +import java.time.Duration; + import org.testcontainers.containers.GenericContainer; import org.testcontainers.containers.Network; import io.debezium.testing.system.tools.ConfigProperties; +import io.debezium.testing.system.tools.WaitConditions; public class KafkaConnectConainer extends GenericContainer { @@ -28,7 +31,7 @@ public KafkaConnectConainer() { } private void defaultConfig() { - withReuse(true); + withReuse(false); withExposedPorts(KAFKA_CONNECT_API_PORT, KAFKA_JMX_PORT); addEnv("CONFIG_STORAGE_TOPIC", "connect_config"); addEnv("OFFSET_STORAGE_TOPIC", "connect_offsets"); @@ -36,6 +39,7 @@ private void defaultConfig() { addEnv("JMXHOST", KAFKA_JMX_HOST); addEnv("JMXPORT", String.valueOf(KAFKA_JMX_PORT)); withHttpMetrics(); + withStartupTimeout(Duration.ofMinutes(WaitConditions.scaled(1))); withCommand(KAFKA_CONNECT_COMMAND); } diff --git a/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/kafka/docker/KafkaContainer.java b/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/kafka/docker/KafkaContainer.java index 14c512214b8..3d00600957d 100644 --- a/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/kafka/docker/KafkaContainer.java +++ b/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/kafka/docker/KafkaContainer.java @@ -5,6 +5,7 @@ */ package io.debezium.testing.system.tools.kafka.docker; +import java.time.Duration; import java.util.concurrent.atomic.AtomicInteger; import 
org.testcontainers.containers.GenericContainer; @@ -13,6 +14,7 @@ import com.github.dockerjava.api.command.InspectContainerResponse; import io.debezium.testing.system.tools.ConfigProperties; +import io.debezium.testing.system.tools.WaitConditions; public class KafkaContainer extends GenericContainer { @@ -42,6 +44,7 @@ private void defaultConfig() { withEnv("KAFKA_ADVERTISED_LISTENERS", "PLAINTEXT://" + getPublicBootstrapAddress() + ",BROKER://" + getBootstrapAddress()); withEnv("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT"); withEnv("KAFKA_INTER_BROKER_LISTENER_NAME", "BROKER"); + withStartupTimeout(Duration.ofMinutes(WaitConditions.scaled(1))); } public KafkaContainer withZookeeper(ZookeeperContainer zookeeper) { diff --git a/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/kafka/docker/ZookeeperContainer.java b/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/kafka/docker/ZookeeperContainer.java index 564708f01be..b53971311ef 100644 --- a/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/kafka/docker/ZookeeperContainer.java +++ b/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/kafka/docker/ZookeeperContainer.java @@ -5,9 +5,12 @@ */ package io.debezium.testing.system.tools.kafka.docker; +import java.time.Duration; + import org.testcontainers.containers.GenericContainer; import io.debezium.testing.system.tools.ConfigProperties; +import io.debezium.testing.system.tools.WaitConditions; public class ZookeeperContainer extends GenericContainer { @@ -30,6 +33,7 @@ public String serverAddress() { private void defaultConfig() { withExposedPorts(ZOOKEEPER_PORT_CLIENT); withCommand(ZOOKEEPER_COMMAND); + withStartupTimeout(Duration.ofMinutes(WaitConditions.scaled(1))); } } From 87ef90de2c840654b793b1af92be360dc7adbe6f Mon Sep 17 00:00:00 2001 From: Jiri Novotny Date: Wed, 24 Jan 2024 09:09:42 +0100 Subject: [PATCH 15/18] DBZ-7373 Change naming of variable containing Kafka container --- .../kafka/DockerKafkaConnectDeployer.java | 2 +- .../tools/kafka/DockerKafkaController.java | 26 ++++++++----------- 2 files changed, 12 insertions(+), 16 deletions(-) diff --git a/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/kafka/DockerKafkaConnectDeployer.java b/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/kafka/DockerKafkaConnectDeployer.java index a15940b33db..ac4d1c51014 100644 --- a/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/kafka/DockerKafkaConnectDeployer.java +++ b/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/kafka/DockerKafkaConnectDeployer.java @@ -36,7 +36,7 @@ public Builder(KafkaConnectConainer container) { } public Builder withKafka(DockerKafkaController kafka) { - container.withKafka(kafka.getContainer()); + container.withKafka(kafka.getKafkaContainer()); return self(); } diff --git a/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/kafka/DockerKafkaController.java b/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/kafka/DockerKafkaController.java index d345a9e052d..b87af911aaf 100644 --- a/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/kafka/DockerKafkaController.java +++ 
b/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/kafka/DockerKafkaController.java @@ -9,6 +9,8 @@ import static java.util.concurrent.TimeUnit.MINUTES; import static org.awaitility.Awaitility.await; +import lombok.Getter; +import lombok.Setter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -24,29 +26,23 @@ public class DockerKafkaController implements KafkaController { private static final Logger LOGGER = LoggerFactory.getLogger(DockerKafkaController.class); - private final KafkaContainer container; + @Getter + private final KafkaContainer kafkaContainer; + @Setter private ZookeeperContainer zookeeperContainer; public DockerKafkaController(KafkaContainer container) { - this.container = container; - } - - public KafkaContainer getContainer() { - return container; + this.kafkaContainer = container; } @Override public String getPublicBootstrapAddress() { - return container.getPublicBootstrapAddress(); - } - - public void setZookeeperContainer(ZookeeperContainer zookeeperContainer) { - this.zookeeperContainer = zookeeperContainer; + return kafkaContainer.getPublicBootstrapAddress(); } @Override public String getBootstrapAddress() { - return container.getBootstrapAddress(); + return kafkaContainer.getBootstrapAddress(); } @Override @@ -56,15 +52,15 @@ public String getTlsBootstrapAddress() { @Override public boolean undeploy() { - container.stop(); + kafkaContainer.stop(); zookeeperContainer.stop(); - return zookeeperContainer.isRunning(); + return !zookeeperContainer.isRunning() && !kafkaContainer.isRunning(); } @Override public void waitForCluster() { await() .atMost(scaled(5), MINUTES) - .until(container::isRunning); + .until(kafkaContainer::isRunning); } } From 09e8f7e1b7d933178e28027634161886521272d4 Mon Sep 17 00:00:00 2001 From: Jiri Novotny Date: Wed, 24 Jan 2024 09:30:08 +0100 Subject: [PATCH 16/18] DBZ-7373 Fix deploy return value in DockerKafkaConnectController --- .../system/tools/kafka/DockerKafkaConnectController.java | 2 +- .../testing/system/tools/kafka/DockerKafkaController.java | 5 +++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/kafka/DockerKafkaConnectController.java b/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/kafka/DockerKafkaConnectController.java index bf1a923847f..45399b40a11 100644 --- a/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/kafka/DockerKafkaConnectController.java +++ b/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/kafka/DockerKafkaConnectController.java @@ -111,7 +111,7 @@ public HttpUrl getMetricsURL() { @Override public boolean undeploy() { container.stop(); - return container.isRunning(); + return !container.isRunning(); } @Override diff --git a/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/kafka/DockerKafkaController.java b/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/kafka/DockerKafkaController.java index b87af911aaf..8a91a03c42b 100644 --- a/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/kafka/DockerKafkaController.java +++ b/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/kafka/DockerKafkaController.java @@ -9,14 +9,15 @@ import static java.util.concurrent.TimeUnit.MINUTES; import static 
org.awaitility.Awaitility.await; -import lombok.Getter; -import lombok.Setter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import io.debezium.testing.system.tools.kafka.docker.KafkaContainer; import io.debezium.testing.system.tools.kafka.docker.ZookeeperContainer; +import lombok.Getter; +import lombok.Setter; + /** * This class provides control over Kafka instance deployed as DockerContainer * From 187db57a44e472255bed22390c63d7df7770b686 Mon Sep 17 00:00:00 2001 From: Vojtech Juranek Date: Wed, 24 Jan 2024 13:34:44 +0100 Subject: [PATCH 17/18] [docs] Fix name of capture table DDL option for SQL server --- documentation/modules/ROOT/pages/connectors/sqlserver.adoc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/documentation/modules/ROOT/pages/connectors/sqlserver.adoc b/documentation/modules/ROOT/pages/connectors/sqlserver.adoc index a8837f80c27..f34987d0a24 100644 --- a/documentation/modules/ROOT/pages/connectors/sqlserver.adoc +++ b/documentation/modules/ROOT/pages/connectors/sqlserver.adoc @@ -248,7 +248,7 @@ Removing offsets should be performed only by advanced users who have experience This operation is potentially destructive, and should be performed only as a last resort. ==== 4. Apply the following changes to the connector configuration: -.. (Optional) Set the value of xref:{context}-property-database-history-store-only-captured-tables-ddl[`schema.history.internal.captured.tables.ddl`] to `false`. +.. (Optional) Set the value of xref:{context}-property-database-history-store-only-captured-tables-ddl[`schema.history.internal.store.only.captured.tables.ddl`] to `false`. This setting causes the snapshot to capture the schema for all tables, and guarantees that, in the future, the connector can reconstruct the schema history for all tables. + + [NOTE] @@ -258,7 +258,7 @@ Snapshots that capture the schema for all tables require more time to complete. .. Add the tables that you want the connector to capture to xref:{context}-property-table-include-list[`table.include.list`]. .. Set the xref:{context}-property-snapshot-mode[`snapshot.mode`] to one of the following values: `initial`:: When you restart the connector, it takes a full snapshot of the database that captures the table data and table structures. + -If you select this option, consider setting the value of the xref:{context}-property-database-history-store-only-captured-tables-ddl[`schema.history.internal.captured.tables.ddl`] property to `false` to enable the connector to capture the schema of all tables. +If you select this option, consider setting the value of the xref:{context}-property-database-history-store-only-captured-tables-ddl[`schema.history.internal.store.only.captured.tables.ddl`] property to `false` to enable the connector to capture the schema of all tables. `schema_only`:: When you restart the connector, it takes a snapshot that captures only the table schema. Unlike a full data snapshot, this option does not capture any table data. Use this option if you want to restart the connector more quickly than with a full snapshot. 
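
As a concrete illustration of the corrected property name in the documentation change above, the following is a minimal, hypothetical sketch (not part of this patch series) of the connector properties involved in re-running a snapshot after extending the capture list. Only the property keys named in the documentation are shown, plus the standard SQL Server connector class; the hostname, database, and table values are placeholders, and a real configuration would carry additional required properties.

import java.util.LinkedHashMap;
import java.util.Map;

// Minimal sketch, assuming placeholder host, database, and table names.
// It only assembles the properties discussed in the documentation change above.
public class ResnapshotConfigSketch {

    public static Map<String, String> resnapshotProperties() {
        Map<String, String> props = new LinkedHashMap<>();
        props.put("connector.class", "io.debezium.connector.sqlserver.SqlServerConnector");
        props.put("database.hostname", "sqlserver.example.com");   // placeholder
        props.put("database.names", "testDB1");                    // placeholder
        // Tables newly added to the capture list (placeholder values).
        props.put("table.include.list", "dbo.tablea,dbo.tableb");
        // Capture DDL for all tables so that schema history can later be
        // reconstructed for tables that are added to the capture list.
        props.put("schema.history.internal.store.only.captured.tables.ddl", "false");
        // "initial" re-snapshots data and schema; "schema_only" captures only table structure.
        props.put("snapshot.mode", "initial");
        return props;
    }

    public static void main(String[] args) {
        resnapshotProperties().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}

Setting schema.history.internal.store.only.captured.tables.ddl to false trades a longer snapshot for the ability to rebuild schema history for tables captured later, which is the trade-off the documentation text above calls out.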
From 5462605a7360f9da78721114dfe4964c994ae0b8 Mon Sep 17 00:00:00 2001 From: Vadzim Ramanenka Date: Wed, 20 Dec 2023 13:08:11 +0100 Subject: [PATCH 18/18] DBZ-7273: Introduce mode to query capture tables directly --- .../sqlserver/SqlServerConnection.java | 87 ++++++++++++------- .../sqlserver/SqlServerConnectorConfig.java | 82 ++++++++++++++++- .../sqlserver/SqlServerConnectorIT.java | 77 ++++++++++++++++ 3 files changed, 214 insertions(+), 32 deletions(-) diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnection.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnection.java index 97c40289825..491e718b3d0 100644 --- a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnection.java +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnection.java @@ -66,9 +66,12 @@ public class SqlServerConnection extends JdbcConnection { private static final String GET_MIN_LSN = "SELECT [#db].sys.fn_cdc_get_min_lsn('#')"; private static final String LOCK_TABLE = "SELECT * FROM [#] WITH (TABLOCKX)"; private static final String INCREMENT_LSN = "SELECT [#db].sys.fn_cdc_increment_lsn(?)"; - private static final String GET_ALL_CHANGES_FOR_TABLE = "SELECT *# FROM [#db].cdc.[fn_cdc_get_all_changes_#](?, ?, N'all update old') order by [__$start_lsn] ASC, [__$seqval] ASC, [__$operation] ASC"; - private final String get_all_changes_for_table; protected static final String LSN_TIMESTAMP_SELECT_STATEMENT = "TODATETIMEOFFSET([#db].sys.fn_cdc_map_lsn_to_time([__$start_lsn]), DATEPART(TZOFFSET, SYSDATETIMEOFFSET()))"; + private static final String GET_ALL_CHANGES_FOR_TABLE_SELECT = "SELECT [__$start_lsn], [__$seqval], [__$operation], [__$update_mask], #, " + + LSN_TIMESTAMP_SELECT_STATEMENT; + private static final String GET_ALL_CHANGES_FOR_TABLE_FROM_FUNCTION = "FROM [#db].cdc.[fn_cdc_get_all_changes_#](?, ?, N'all update old')"; + private static final String GET_ALL_CHANGES_FOR_TABLE_FROM_DIRECT = "FROM [#db].cdc.[#]"; + private static final String GET_ALL_CHANGES_FOR_TABLE_ORDER_BY = "ORDER BY [__$start_lsn] ASC, [__$seqval] ASC, [__$operation] ASC"; /** * Queries the list of captured column names and their change table identifiers in the given database. @@ -133,10 +136,47 @@ public SqlServerConnection(SqlServerConnectorConfig config, SqlServerValueConver defaultValueConverter = new SqlServerDefaultValueConverter(this::connection, valueConverters); this.queryFetchSize = config.getQueryFetchSize(); + getAllChangesForTable = buildGetAllChangesForTableQuery(config.getDataQueryMode(), skippedOperations); + + this.config = config; + this.useSingleDatabase = useSingleDatabase; + + this.optionRecompile = false; + } + + /** + * Creates a new connection using the supplied configuration. + * + * @param config {@link Configuration} instance, may not be null. 
+ * @param valueConverters {@link SqlServerValueConverters} instance + * @param skippedOperations a set of {@link Envelope.Operation} to skip in streaming + * @param optionRecompile Includes query option RECOMPILE on incremental snapshots + */ + public SqlServerConnection(SqlServerConnectorConfig config, SqlServerValueConverters valueConverters, + Set skippedOperations, boolean useSingleDatabase, + boolean optionRecompile) { + this(config, valueConverters, skippedOperations, useSingleDatabase); + + this.optionRecompile = optionRecompile; + } + + private String buildGetAllChangesForTableQuery(SqlServerConnectorConfig.DataQueryMode dataQueryMode, + Set skippedOperations) { + String result = GET_ALL_CHANGES_FOR_TABLE_SELECT + " "; + List where = new LinkedList<>(); + switch (dataQueryMode) { + case FUNCTION: + result += GET_ALL_CHANGES_FOR_TABLE_FROM_FUNCTION + " "; + break; + case DIRECT: + result += GET_ALL_CHANGES_FOR_TABLE_FROM_DIRECT + " "; + where.add("[__$start_lsn] >= ?"); + where.add("[__$start_lsn] <= ?"); + break; + } + if (hasSkippedOperations(skippedOperations)) { Set skippedOps = new HashSet<>(); - StringBuilder getAllChangesForTableStatement = new StringBuilder( - "SELECT *# FROM [#db].cdc.[fn_cdc_get_all_changes_#](?, ?, N'all update old') WHERE __$operation NOT IN ("); skippedOperations.forEach((Envelope.Operation operation) -> { // This number are the __$operation number in the SQLServer // https://docs.microsoft.com/en-us/sql/relational-databases/system-functions/cdc-fn-cdc-get-all-changes-capture-instance-transact-sql?view=sql-server-ver15#table-returned @@ -153,37 +193,16 @@ public SqlServerConnection(SqlServerConnectorConfig config, SqlServerValueConver break; } }); - getAllChangesForTableStatement.append(String.join(",", skippedOps)); - getAllChangesForTableStatement.append(") order by [__$start_lsn] ASC, [__$seqval] ASC, [__$operation] ASC"); - get_all_changes_for_table = getAllChangesForTableStatement.toString(); + where.add("[__$operation] NOT IN (" + String.join(",", skippedOps) + ")"); } - else { - get_all_changes_for_table = GET_ALL_CHANGES_FOR_TABLE; + if (!where.isEmpty()) { + result += " WHERE " + String.join(" AND ", where) + " "; } - getAllChangesForTable = get_all_changes_for_table.replaceFirst(STATEMENTS_PLACEHOLDER, - Matcher.quoteReplacement(", " + LSN_TIMESTAMP_SELECT_STATEMENT)); - this.config = config; - this.useSingleDatabase = useSingleDatabase; + result += GET_ALL_CHANGES_FOR_TABLE_ORDER_BY; - this.optionRecompile = false; - } - - /** - * Creates a new connection using the supplied configuration. - * - * @param config {@link Configuration} instance, may not be null. 
- * @param valueConverters {@link SqlServerValueConverters} instance - * @param skippedOperations a set of {@link Envelope.Operation} to skip in streaming - * @param optionRecompile Includes query option RECOMPILE on incremental snapshots - */ - public SqlServerConnection(SqlServerConnectorConfig config, SqlServerValueConverters valueConverters, - Set skippedOperations, boolean useSingleDatabase, - boolean optionRecompile) { - this(config, valueConverters, skippedOperations, useSingleDatabase); - - this.optionRecompile = optionRecompile; + return result; } private boolean hasSkippedOperations(Set skippedOperations) { @@ -330,8 +349,14 @@ public void getChangesForTables(String databaseName, SqlServerChangeTable[] chan int idx = 0; for (SqlServerChangeTable changeTable : changeTables) { + String capturedColumns = String.join(", ", changeTable.getCapturedColumns()); + String source = changeTable.getCaptureInstance(); + if (config.getDataQueryMode() == SqlServerConnectorConfig.DataQueryMode.DIRECT) { + source = changeTable.getChangeTableId().table(); + } final String query = replaceDatabaseNamePlaceholder(getAllChangesForTable, databaseName) - .replace(STATEMENTS_PLACEHOLDER, changeTable.getCaptureInstance()); + .replaceFirst(STATEMENTS_PLACEHOLDER, Matcher.quoteReplacement(capturedColumns)) + .replace(STATEMENTS_PLACEHOLDER, source); queries[idx] = query; // If the table was added in the middle of queried buffer we need // to adjust from to the first LSN available diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnectorConfig.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnectorConfig.java index 367cea2e9aa..edee0e429b3 100644 --- a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnectorConfig.java +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnectorConfig.java @@ -217,6 +217,67 @@ public static SnapshotIsolationMode parse(String value, String defaultValue) { } } + /** + * The set of predefined data query mode options. + */ + public enum DataQueryMode implements EnumeratedValue { + + /** + * In this mode the CDC data is queried by means of calling {@code cdc.[fn_cdc_get_all_changes_#]} function. + */ + FUNCTION("function"), + + /** + * In this mode the CDC data is queried from change tables directly. + */ + DIRECT("direct"); + + private final String value; + + DataQueryMode(String value) { + this.value = value; + } + + @Override + public String getValue() { + return value; + } + + /** + * Determine if the supplied value is one of the predefined options. + * + * @param value the configuration property value; may not be null + * @return the matching option, or null if no match is found + */ + public static DataQueryMode parse(String value) { + if (value == null) { + return null; + } + value = value.trim(); + for (DataQueryMode option : DataQueryMode.values()) { + if (option.getValue().equalsIgnoreCase(value)) { + return option; + } + } + return null; + } + + /** + * Determine if the supplied value is one of the predefined options. 
+ * + * @param value the configuration property value; may not be null + * @param defaultValue the default value; may be null + * @return the matching option, or null if no match is found and the non-null default is invalid + */ + public static DataQueryMode parse(String value, String defaultValue) { + DataQueryMode mode = parse(value); + if (mode == null && defaultValue != null) { + mode = parse(defaultValue); + } + return mode; + } + } + public static final Field USER = RelationalDatabaseConnectorConfig.USER .optional() .withNoValidation(); @@ -303,6 +364,17 @@ public static SnapshotIsolationMode parse(String value, String defaultValue) { public static final Field SOURCE_INFO_STRUCT_MAKER = CommonConnectorConfig.SOURCE_INFO_STRUCT_MAKER .withDefault(SqlServerSourceInfoStructMaker.class.getName()); + public static final Field DATA_QUERY_MODE = Field.create("data.query.mode") + .withDisplayName("Data query mode") + .withEnum(DataQueryMode.class, DataQueryMode.FUNCTION) + .withWidth(Width.SHORT) + .withImportance(Importance.LOW) + .withDescription("Controls how the connector queries CDC data. " + + "The default is '" + DataQueryMode.FUNCTION.getValue() + + "', which means the data is queried by means of calling cdc.[fn_cdc_get_all_changes_#] function. " + + "The value of '" + DataQueryMode.DIRECT.getValue() + + "' makes the connector to query the change tables directly."); + private static final ConfigDefinition CONFIG_DEFINITION = HistorizedRelationalDatabaseConnectorConfig.CONFIG_DEFINITION.edit() .name("SQL Server") .type( @@ -321,7 +393,8 @@ public static SnapshotIsolationMode parse(String value, String defaultValue) { INCREMENTAL_SNAPSHOT_OPTION_RECOMPILE, INCREMENTAL_SNAPSHOT_CHUNK_SIZE, INCREMENTAL_SNAPSHOT_ALLOW_SCHEMA_CHANGES, - QUERY_FETCH_SIZE) + QUERY_FETCH_SIZE, + DATA_QUERY_MODE) .events(SOURCE_INFO_STRUCT_MAKER) .excluding( SCHEMA_INCLUDE_LIST, @@ -346,6 +419,7 @@ public static ConfigDef configDef() { private final int maxTransactionsPerIteration; private final boolean optionRecompile; private final int queryFetchSize; + private final DataQueryMode dataQueryMode; public SqlServerConnectorConfig(Configuration config) { super( @@ -386,6 +460,8 @@ public SqlServerConnectorConfig(Configuration config) { } this.optionRecompile = config.getBoolean(INCREMENTAL_SNAPSHOT_OPTION_RECOMPILE); + + this.dataQueryMode = DataQueryMode.parse(config.getString(DATA_QUERY_MODE), DATA_QUERY_MODE.defaultValueAsString()); } public List getDatabaseNames() { @@ -517,6 +593,10 @@ public Map getSnapshotSelectOverridesByTable() { return Collections.unmodifiableMap(snapshotSelectOverridesByTable); } + public DataQueryMode getDataQueryMode() { + return dataQueryMode; + } + private static int validateDatabaseNames(Configuration config, Field field, Field.ValidationOutput problems) { String databaseNames = config.getString(field); int count = 0; diff --git a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectorIT.java b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectorIT.java index e50338d17ca..1757b08f2ee 100644 --- a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectorIT.java +++ b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectorIT.java @@ -184,6 +184,83 @@ public void createAndDelete() throws Exception { stopConnector(); } + @Test + public void createAndDeleteInDataQueryDirectMode() throws Exception { + final int RECORDS_PER_TABLE = 5; + 
final int TABLES = 2; + final int ID_START = 10; + final Configuration config = TestHelper.defaultConfig() + .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL) + .with(SqlServerConnectorConfig.DATA_QUERY_MODE, SqlServerConnectorConfig.DataQueryMode.DIRECT) + .build(); + + start(SqlServerConnector.class, config); + assertConnectorIsRunning(); + + // Wait for snapshot completion + consumeRecordsByTopic(1); + + for (int i = 0; i < RECORDS_PER_TABLE; i++) { + final int id = ID_START + i; + connection.execute( + "INSERT INTO tablea VALUES(" + id + ", 'a')"); + connection.execute( + "INSERT INTO tableb VALUES(" + id + ", 'b')"); + } + + final SourceRecords records = consumeRecordsByTopic(RECORDS_PER_TABLE * TABLES); + final List tableA = records.recordsForTopic("server1.testDB1.dbo.tablea"); + final List tableB = records.recordsForTopic("server1.testDB1.dbo.tableb"); + assertThat(tableA).hasSize(RECORDS_PER_TABLE); + assertThat(tableB).hasSize(RECORDS_PER_TABLE); + for (int i = 0; i < RECORDS_PER_TABLE; i++) { + final SourceRecord recordA = tableA.get(i); + final SourceRecord recordB = tableB.get(i); + final List expectedRowA = Arrays.asList( + new SchemaAndValueField("id", Schema.INT32_SCHEMA, i + ID_START), + new SchemaAndValueField("cola", Schema.OPTIONAL_STRING_SCHEMA, "a")); + final List expectedRowB = Arrays.asList( + new SchemaAndValueField("id", Schema.INT32_SCHEMA, i + ID_START), + new SchemaAndValueField("colb", Schema.OPTIONAL_STRING_SCHEMA, "b")); + + final Struct keyA = (Struct) recordA.key(); + final Struct valueA = (Struct) recordA.value(); + assertRecord((Struct) valueA.get("after"), expectedRowA); + assertNull(valueA.get("before")); + + final Struct keyB = (Struct) recordB.key(); + final Struct valueB = (Struct) recordB.value(); + assertRecord((Struct) valueB.get("after"), expectedRowB); + assertNull(valueB.get("before")); + } + + connection.execute("DELETE FROM tableB"); + final SourceRecords deleteRecords = consumeRecordsByTopic(2 * RECORDS_PER_TABLE); + final List deleteTableA = deleteRecords.recordsForTopic("server1.testDB1.dbo.tablea"); + final List deleteTableB = deleteRecords.recordsForTopic("server1.testDB1.dbo.tableb"); + assertThat(deleteTableA).isNullOrEmpty(); + assertThat(deleteTableB).hasSize(2 * RECORDS_PER_TABLE); + + for (int i = 0; i < RECORDS_PER_TABLE; i++) { + final SourceRecord deleteRecord = deleteTableB.get(i * 2); + final SourceRecord tombstoneRecord = deleteTableB.get(i * 2 + 1); + final List expectedDeleteRow = Arrays.asList( + new SchemaAndValueField("id", Schema.INT32_SCHEMA, i + ID_START), + new SchemaAndValueField("colb", Schema.OPTIONAL_STRING_SCHEMA, "b")); + + final Struct deleteKey = (Struct) deleteRecord.key(); + final Struct deleteValue = (Struct) deleteRecord.value(); + assertRecord((Struct) deleteValue.get("before"), expectedDeleteRow); + assertNull(deleteValue.get("after")); + + final Struct tombstoneKey = (Struct) tombstoneRecord.key(); + final Struct tombstoneValue = (Struct) tombstoneRecord.value(); + assertNull(tombstoneValue); + } + + stopConnector(); + } + @Test @FixFor("DBZ-1642") public void readOnlyApplicationIntent() throws Exception {