From 7d4865b3dac76421eec77574d99b8f0a0af0ec31 Mon Sep 17 00:00:00 2001 From: Teodoro Cook Date: Thu, 22 Jun 2023 23:33:31 -0600 Subject: [PATCH] Add support for table partition rerouting (#36) --- README.md | 2 - charts/dataplane/Chart.yaml | 4 +- .../templates/strimzi/connector.yaml | 54 +++++++++++++++++- charts/dataplane/values.yaml | 6 +- molecule/default/converge.yml | 55 +++++++++++++++---- molecule/default/molecule.yml | 4 +- molecule/default/verify.yml | 11 ++-- pyproject.toml | 2 +- 8 files changed, 109 insertions(+), 29 deletions(-) diff --git a/README.md b/README.md index 2f1090b..1be1337 100644 --- a/README.md +++ b/README.md @@ -1,7 +1,5 @@ # nephelaiio.dataplane -[![Build Status](https://github.com/nephelaiio/helm-dataplane/workflows/molecule/badge.svg)](https://github.com/nephelaiio/helm-dataplane/actions) - A helm chart to deploy a CDC replication stack integrating the following components * Strimzi Kafka Broker * Zalando PostgreSQL Data Warehouse diff --git a/charts/dataplane/Chart.yaml b/charts/dataplane/Chart.yaml index 70cb601..6393091 100644 --- a/charts/dataplane/Chart.yaml +++ b/charts/dataplane/Chart.yaml @@ -16,10 +16,10 @@ type: application # This is the chart version. This version number should be incremented each time you make changes # to the chart and its templates, including the app version. # Versions are expected to follow Semantic Versioning (https://semver.org/) -version: 0.1.11 +version: 0.1.12 # This is the version number of the application being deployed. This version number should be # incremented each time you make changes to the application. Versions are not expected to # follow Semantic Versioning. They should reflect the version the application is using. # It is recommended to use it with quotes. -appVersion: 0.1.11 +appVersion: 0.1.12 diff --git a/charts/dataplane/templates/strimzi/connector.yaml b/charts/dataplane/templates/strimzi/connector.yaml index 7e910dc..113666e 100644 --- a/charts/dataplane/templates/strimzi/connector.yaml +++ b/charts/dataplane/templates/strimzi/connector.yaml @@ -3,11 +3,13 @@ {{- $registryPort := .Values.registry.service.port -}} {{- range $db := .Values.cdc.postgres }} +{{- $dbConnectorName := lower (printf "%s-%s-%s" $fullName $db.servername $db.dbname) }} +{{- $dbSlotName := $dbConnectorName | replace "-" "_" }} --- apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnector metadata: - name: {{ $fullName }}-{{ lower $db.servername }}-{{ lower $db.dbname }} + name: {{ $dbConnectorName }} labels: strimzi.io/cluster: {{ $fullName }} spec: @@ -21,7 +23,12 @@ spec: database.server.name: "{{ $db.servername }}" database.dbname: "{{ $db.dbname }}" snapshot.new.tables: parallel + slot.name: "{{ $dbSlotName }}" + plugin.name: pgoutput tasks.max: "1" + {{- if and (hasKey $db "partitions") (ge (len $db.partitions) 1) }} + table.exclude.list: "{{- range $i, $p := $db.partitions -}}{{- if $i }},{{- end -}}{{ $p.source }}{{- end }}" + {{- end }} key.converter: io.confluent.connect.avro.AvroConverter key.converter.schema.registry.url: http://{{ $registryName }}:{{ $registryPort }} value.converter: io.confluent.connect.avro.AvroConverter @@ -33,7 +40,50 @@ spec: transforms.reroute.type: io.debezium.transforms.ByLogicalTableRouter transforms.reroute.topic.regex: (.*) transforms.reroute.topic.replacement: cdc.$1 + +{{- if and (hasKey $db "partitions") (ge (len $db.partitions) 1) }} +{{- range $partition := $db.partitions }} +{{- $partitionSource := required "partition source is required" $partition.source }} +{{- $partitionSink := required "partition sink is required" $partition.sink }} +{{- $partitionConnectorName := lower (printf "%s-%s" $dbConnectorName $partitionSink) | replace "." "-" }} +{{- $partitionSlotName := $partitionConnectorName | replace "-" "_" }} +--- +apiVersion: kafka.strimzi.io/v1beta2 +kind: KafkaConnector +metadata: + name: {{ $partitionConnectorName }} + labels: + strimzi.io/cluster: {{ $fullName }} +spec: + class: io.debezium.connector.postgresql.PostgresConnector + tasksMax: 1 + config: + database.hostname: {{ $db.hostname }} + database.port: "{{ $db.port | default 5432 }}" + database.user: "${env:POSTGRES_CDC_USER}" + database.password: "${env:POSTGRES_CDC_PASS}" + database.server.name: "{{ $db.servername }}" + database.dbname: "{{ $db.dbname }}" + snapshot.new.tables: parallel + slot.name: {{ $partitionSlotName }} + plugin.name: pgoutput + tasks.max: "1" + table.include.list: "{{ $partitionSource }}" + key.converter: io.confluent.connect.avro.AvroConverter + key.converter.schema.registry.url: http://{{ $registryName }}:{{ $registryPort }} + value.converter: io.confluent.connect.avro.AvroConverter + value.converter.schema.registry.url: http://{{ $registryName }}:{{ $registryPort }} + {{- if $db.signaling }} + signal.data.collection: debezium.signaling + {{- end }} + transforms: reroute + transforms.reroute.type: io.debezium.transforms.ByLogicalTableRouter + transforms.reroute.topic.regex: "^([^\\.]+)\\.([^\\.]+)\\..*" + transforms.reroute.topic.replacement: "cdc.$1.$2.{{ $partitionSink }}" +{{- end }} {{- end }} +{{- end }} + --- apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnector @@ -60,5 +110,5 @@ spec: transforms.unwrap.drop.tombstones: false transforms.unwrap.delete.handling.mode: rewrite transforms.rename.type: org.apache.kafka.connect.transforms.RegexRouter - transforms.rename.regex: "^cdc\\.(.*)\\.(.*)\\.(.*)" + transforms.rename.regex: "^cdc\\.([^\\.]*)\\.([^\\.]*)\\.([^\\.]*)" transforms.rename.replacement: "$1_$3" diff --git a/charts/dataplane/values.yaml b/charts/dataplane/values.yaml index 634208f..c146f08 100644 --- a/charts/dataplane/values.yaml +++ b/charts/dataplane/values.yaml @@ -7,7 +7,7 @@ util: image: repository: nephelaiio/dataplane-util pullPolicy: IfNotPresent - tag: dataplane-0.1.11 + tag: dataplane-0.1.12 resources: {} cdc: @@ -45,7 +45,7 @@ strimzi: connect: image: repository: nephelaiio/dataplane-connect - tag: dataplane-0.1.11 + tag: dataplane-0.1.12 replicas: 1 config: group.id: connect-cluster @@ -100,7 +100,7 @@ metabase: image: repository: nephelaiio/dataplane-util pullPolicy: IfNotPresent - tag: dataplane-0.1.11 + tag: dataplane-0.1.12 securityContext: {} image: diff --git a/molecule/default/converge.yml b/molecule/default/converge.yml index 19783c3..a7972f3 100644 --- a/molecule/default/converge.yml +++ b/molecule/default/converge.yml @@ -99,6 +99,49 @@ - (cluster_status | length) != (cluster_query | length) - cluster_failed | length > 0 + - name: query pagila service data + ansible.builtin.set_fact: + pagila_host: "{{ _db_svc_data.status.loadBalancer.ingress[0].ip }}" + vars: + _db_svc_name: "{{ dataplane_pagila_team }}-{{ dataplane_pagila_db }}" + _db_svc_data: "{{ service_query | selectattr('metadata.name', 'equalto', _db_svc_name) | first }}" + service_query: "{{ query( + 'kubernetes.core.k8s', + namespace=dataplane_pagila_namespace, + kind='Service', + kubeconfig=k8s_kubeconfig) }}" + + - name: query pagila owner data + ansible.builtin.set_fact: + pagila_owner_user: "{{ _db_secret_data.data.username | b64decode }}" + pagila_owner_pass: "{{ _db_secret_data.data.password | b64decode }}" + vars: + _db_secret_name: "zalando-{{ dataplane_pagila_team }}-{{ dataplane_pagila_db }}" + _db_secret_data: "{{ secret_query | selectattr('metadata.name', 'equalto', _db_secret_name) | first }}" + secret_query: "{{ query( + 'kubernetes.core.k8s', + namespace=dataplane_pagila_namespace, + kind='Secret', + kubeconfig=k8s_kubeconfig) }}" + + - name: query pagila publications + community.postgresql.postgresql_query: + db: "{{ dataplane_pagila_db }}" + query: "SELECT * FROM pg_publication where pubname = 'dbz_publication'" + login_user: "{{ pagila_owner_user }}" + login_password: "{{ pagila_owner_pass }}" + login_host: "{{ pagila_host }}" + register: pagila_publication_query + + - name: create pagila publication + community.postgresql.postgresql_query: + db: "{{ dataplane_pagila_db }}" + query: "CREATE PUBLICATION dbz_publication FOR ALL TABLES" + login_user: "{{ pagila_owner_user }}" + login_password: "{{ pagila_owner_pass }}" + login_host: "{{ pagila_host }}" + when: pagila_publication_query.query_all_results | flatten | length == 0 + - name: deploy dataplane helm chart kubernetes.core.helm: state: present @@ -144,18 +187,6 @@ kind='Secret', kubeconfig=k8s_kubeconfig) }}" - - name: query pagila service data - ansible.builtin.set_fact: - pagila_host: "{{ _db_svc_data.status.loadBalancer.ingress[0].ip }}" - vars: - _db_svc_name: "{{ dataplane_pagila_team }}-{{ dataplane_pagila_db }}" - _db_svc_data: "{{ service_query | selectattr('metadata.name', 'equalto', _db_svc_name) | first }}" - service_query: "{{ query( - 'kubernetes.core.k8s', - namespace=dataplane_pagila_namespace, - kind='Service', - kubeconfig=k8s_kubeconfig) }}" - - name: wait for database port open ansible.builtin.wait_for: host: "{{ pagila_host }}" diff --git a/molecule/default/molecule.yml b/molecule/default/molecule.yml index a301088..e524135 100644 --- a/molecule/default/molecule.yml +++ b/molecule/default/molecule.yml @@ -94,8 +94,8 @@ provisioner: dbname: "{{ dataplane_pagila_db }}" signaling: "{{ dataplane_pagila_signaling }}" partitions: - - source: "payments.*" - sink: "payments" + - source: "public.payment.*" + sink: "payment" strimzi: connect: image: diff --git a/molecule/default/verify.yml b/molecule/default/verify.yml index 967a9fc..77a61f5 100644 --- a/molecule/default/verify.yml +++ b/molecule/default/verify.yml @@ -61,7 +61,8 @@ - name: record source table data ansible.builtin.set_fact: - pagila_tables: "{{ _table_names | difference(['payment']) | list }}" + pagila_all_tables: "{{ _table_names | list }}" + pagila_target_tables: "{{ _table_names | reject('regex', '^payment_.*') }}" vars: _table_names: "{{ pagila_table_query.query_result | map(attribute='table_name') }}" @@ -72,7 +73,7 @@ login_user: "{{ pagila_user }}" login_password: "{{ pagila_pass }}" login_host: "{{ pagila_host }}" - loop: "{{ pagila_tables }}" + loop: "{{ pagila_all_tables }}" register: pagila_data_query - name: record pagila table data @@ -117,7 +118,7 @@ login_password: "{{ warehouse_pass }}" login_host: "{{ warehouse_host }}" vars: - warehouse_tables_expected: "{{ pagila_tables | map('map_format', 'pagila_%s') | list }}" + warehouse_tables_expected: "{{ pagila_target_tables | map('map_format', 'pagila_%s') | list }}" warehouse_tables_found: "{{ warehouse_table_query.query_result | map(attribute='table_name') | list }}" register: warehouse_table_query retries: 20 @@ -129,7 +130,7 @@ ansible.builtin.debug: msg: "table diff=[{{ ', '.join(warehouse_tables_diff) }}]" vars: - warehouse_tables_expected: "{{ pagila_tables | map('map_format', 'pagila_%s') | list }}" + warehouse_tables_expected: "{{ pagila_target_tables| map('map_format', 'pagila_%s') | list }}" warehouse_tables_found: "{{ warehouse_table_query.query_result | map(attribute='table_name') | list }}" warehouse_tables_diff: "{{ warehouse_tables_expected | difference(warehouse_tables_found) }}" @@ -168,7 +169,7 @@ source_table_records: "{{ (pagila_data[source_table].query_result | list)[0].count }}" loop_control: label: "{{ source_table }}" - loop: "{{ (pagila_data.keys() | list) }}" + loop: "{{ (pagila_target_tables | list) }}" when: source_table_records > warehouse_table_records - name: verify metabase app database diff --git a/pyproject.toml b/pyproject.toml index 62799b8..13a6b1c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "dataplane" -version = "0.1.11" +version = "0.1.12" description = "" authors = ["Teodoro Cook "]