Merge pull request #63 from Emrehzl94/release-5.1.5
Release 5.1.5
Emrehzl94 authored Oct 30, 2024
2 parents 479f510 + 5514521 commit a8ad063
Showing 6 changed files with 220 additions and 1 deletion.
2 changes: 1 addition & 1 deletion antora.yml
@@ -12,7 +12,7 @@ asciidoc:
page-product: Neo4j Connector for Kafka
kafka-connect-version: 3.0
connector-version: '5.1'
exact-connector-version: '5.1.4'
exact-connector-version: '5.1.5'
page-pagination: true
product-name: Neo4j Connector for Kafka
url-common-license-page: https://neo4j.com/docs/license/
1 change: 1 addition & 0 deletions modules/ROOT/content-nav.adoc
@@ -15,6 +15,7 @@
** xref:source/query.adoc[Query]
* xref:source/schema-registry.adoc[Schema Registry]
* xref:source/configuration.adoc[Settings]
* xref:source/payload-mode.adoc[Payload Mode]
* *Sink connector*
* xref::sink.adoc[Configuration]
23 changes: 23 additions & 0 deletions modules/ROOT/pages/changelog.adoc
@@ -2,6 +2,29 @@

This page lists changes to the {product-name}.

== Version 5.1.5

=== New and updated features

[cols="1,2", options="header"]
|===
| Feature | Details

a|
label:functionality[]
label:new[]

Added `neo4j.payload-mode` configuration property for source connector.
| Introduced the `neo4j.payload-mode` option to define the structure of change messages. Available values are `COMPACT` and `EXTENDED`. `COMPACT` produces simpler messages but is subject to schema compatibility issues when property types change, while `EXTENDED` includes type information to avoid such issues. The default is `EXTENDED`.

a|
label:bug[]
label:fixed[]

Prevented exception caused by adding duplicate fields in schema generation for CDC source events.
| Resolved an issue in the `ChangeEvent` schema generation process where duplicate fields caused an exception (`org.apache.kafka.connect.errors.SchemaBuilderException: Cannot create field because of field name duplication id`). When building the schema for key array elements, a field name that appeared in more than one map was added multiple times, triggering the exception. Duplicate fields are now handled appropriately to avoid this issue.
|===

== Version 5.1.4

=== New and updated features
4 changes: 4 additions & 0 deletions modules/ROOT/pages/source/configuration.adoc
@@ -35,6 +35,10 @@ Default: `false`

Default: `1000`

| neo4j.payload-mode
| Defines the structure of change messages. One of `COMPACT` or `EXTENDED`. `COMPACT` provides simpler messages but is subject to schema compatibility issues if property types change. `EXTENDED` includes type information to avoid such issues.

Default: `EXTENDED`
|===

== CDC Strategy Settings
185 changes: 185 additions & 0 deletions modules/ROOT/pages/source/payload-mode.adoc
@@ -0,0 +1,185 @@
= Kafka Source Connector: Payload Mode Configuration

The Kafka Source Connector for Neo4j supports two payload modes that control the format of the data serialized and published to Kafka topics: `EXTENDED` and `COMPACT`. The mode is configured through the `neo4j.payload-mode` property, letting you select the serialization format that best fits your data requirements.

== Payload Modes

The `neo4j.payload-mode` configuration offers the following options:

* **`EXTENDED` (Default)**: Provides a detailed structure for each property, supporting schema compatibility and consistency. This format is especially useful when schema changes (such as property type changes) occur or temporal types are present, keeping the published data consistent across such changes.

* **`COMPACT`**: Produces a simpler format that includes only the essential fields. This format is lighter and may be preferable when schema compatibility or complex data types are not required.

[WARNING]
====
*Limitations of `COMPACT` Mode*

* **Property Type Changes**: `COMPACT` mode does not support changes in property types. If a property type changes in Neo4j (e.g., from integer to string), it can break the schema.
* **Protobuf Compatibility**: `COMPACT` mode is not supported with Protobuf, and it does not support serialization of temporal types (e.g., `LocalDate`, `LocalDateTime`).
====


== Configuration

The payload mode can be configured in the source connector's settings as follows (use `COMPACT` instead of `EXTENDED` as required):

[source,json]
----
"neo4j.payload-mode": "EXTENDED"
----
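
For orientation, here is a hedged sketch of where the property sits in a complete connector creation request; apart from `neo4j.payload-mode` and the standard Kafka Connect converter keys, the connector class, URI, and credential properties below are illustrative placeholders and should be checked against the xref:source/configuration.adoc[Settings] page:

[source,json]
----
{
  "name": "my-neo4j-source",
  "config": {
    "connector.class": "org.neo4j.connectors.kafka.source.Neo4jConnector",
    "neo4j.uri": "neo4j://localhost:7687",
    "neo4j.authentication.type": "BASIC",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "password",
    "neo4j.payload-mode": "EXTENDED",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter"
  }
}
----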

== Example Data Formats

The following examples show how data will be published in each payload mode.

=== `COMPACT` Mode Example

The `COMPACT` mode produces a minimalistic payload with only the essential fields:

[source,json]
----
{
"name": "mary",
"surname": "doe",
"timestamp": 1729779296311
}
----

This mode is useful when performance and simplicity are priorities, and it is suitable for scenarios where schema evolution and temporal consistency are not a primary concern.

=== `EXTENDED` Mode Example

The `EXTENDED` mode includes additional structure and metadata to support complex types and schema consistency, preventing issues when property types change over time:

[source,json]
----
{
"name": {
"type": "S",
"B": null,
"I64": null,
"F64": null,
"S": "mary",
"BA": null,
"TLD": null,
"TLDT": null,
"TLT": null,
"TZDT": null,
"TOT": null,
"TD": null,
"SP": null,
"LB": null,
"LI64": null,
"LF64": null,
"LS": null,
"LTLD": null,
"LTLDT": null,
"LTLT": null,
"LZDT": null,
"LTOT": null,
"LTD": null,
"LSP": null
},
"surname": {
"type": "S",
"B": null,
"I64": null,
"F64": null,
"S": "doe",
"BA": null,
"TLD": null,
"TLDT": null,
"TLT": null,
"TZDT": null,
"TOT": null,
"TD": null,
"SP": null,
"LB": null,
"LI64": null,
"LF64": null,
"LS": null,
"LTLD": null,
"LTLDT": null,
"LTLT": null,
"LZDT": null,
"LTOT": null,
"LTD": null,
"LSP": null
},
"timestamp": {
"type": "I64",
"B": null,
"I64": 1729779365447,
"F64": null,
"S": null,
"BA": null,
"TLD": null,
"TLDT": null,
"TLT": null,
"TZDT": null,
"TOT": null,
"TD": null,
"SP": null,
"LB": null,
"LI64": null,
"LF64": null,
"LS": null,
"LTLD": null,
"LTLDT": null,
"LTLT": null,
"LZDT": null,
"LTOT": null,
"LTD": null,
"LSP": null
}
}
----

This mode is especially beneficial for data with complex schema requirements, as it ensures compatibility even if property types change on the Neo4j side.

== Understanding the `EXTENDED` Payload Structure

In `EXTENDED` mode, each property includes fields for every supported Neo4j type. Only the field corresponding to the actual property type will contain a non-null value, while all others are set to null. This structure ensures that any change in the type of a property does not cause schema enforcement errors at either the source or sink connector.

[cols="1,2"]
|===
| Field | Description

| type | Indicates the type of the property. Possible values include: `B`, `I64`, `F64`, `S`, `BA`, `TLD`, `TLDT`, `TLT`, `TZDT`, `TOT`, `TD`, `SP`, or their list equivalents (e.g., `LB`, `LI64`, `LF64`, `LS`, `LTLD`, etc.).
| B | Boolean type (true or false)
| I64 | 64-bit integer
| F64 | 64-bit floating point
| S | String
| BA | Byte array
| TLD | Temporal Local Date
| TLDT | Temporal Local DateTime
| TLT | Temporal Local Time
| TZDT | Temporal Zoned DateTime
| TOT | Temporal Offset Time
| TD | Temporal Duration
| SP | Spatial Point
| LB, LI64, LF64, LS, LTLD, etc. | Lists of each corresponding type
|===

For example, a string field will be represented as:

[source,json]
----
{
"type": "S",
"B": null,
"I64": null,
"F64": null,
"S": "actual_value",
...
}
----
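
Following the same pattern, a list-valued property populates the corresponding list field. The snippet below is a hedged sketch (not taken from an actual change event) of how a list of strings, reported as type `LS`, would appear, with the remaining fields elided:

[source,json]
----
{
  "type": "LS",
  "B": null,
  "I64": null,
  "F64": null,
  "S": null,
  "LS": ["value1", "value2"],
  ...
}
----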

== Configuration Recommendations

`COMPACT` mode is useful and easier to work with when the generated messages are consumed by other connectors or applications and you can relax the schema compatibility mode on the target topics, as illustrated below. If your environment requires schema compatibility or temporal data types, or you have strong type safety requirements with different converters (`AVRO`, `JSON Schema`, `PROTOBUF` or `JSON Embedded`), `EXTENDED` mode should be preferred.
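
As an illustration of the relaxed-compatibility case, a hedged sketch of the relevant properties for `COMPACT` mode paired with a schemaless JSON converter might look as follows; the converter keys are standard Kafka Connect settings, and whether they suit your pipeline depends on your consumers:

[source,json]
----
{
  "neo4j.payload-mode": "COMPACT",
  "key.converter": "org.apache.kafka.connect.json.JsonConverter",
  "key.converter.schemas.enable": "false",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "value.converter.schemas.enable": "false"
}
----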

== Compatibility with Sink Connectors

The `EXTENDED` format was introduced in connector version 5.1.0 to ensure that all data published to Kafka topics adheres to a consistent schema. This prevents issues when a property changes type on the Neo4j side (e.g., a name property changes from integer to string), enabling smooth data processing across connectors and Kafka consumers. When a Neo4j sink connector is fed by a Neo4j source connector, it’s recommended to use `EXTENDED` mode, as the Neo4j sink connector can seamlessly handle the `EXTENDED` data type.
6 changes: 6 additions & 0 deletions modules/ROOT/pages/whats-new.adoc
@@ -25,6 +25,12 @@ It is no longer possible to turn off this behavior in the connector itself, and

* It is now possible to ignore stored offsets by setting `neo4j.ignore-stored-offset` to `true` if required.

* The new `neo4j.payload-mode` configuration property provides options to control the payload structure:

** **`EXTENDED`**: Provides detailed data and type information, ensuring compatibility even if property types change.

** **`COMPACT`**: Provides a simpler, lightweight format with only essential fields, best used when schema compatibility or complex types aren’t needed.

== Sink

* Changes are now applied in the order they are received from Kafka Connect, grouped by their topics.
