diff --git a/Makefile b/Makefile index 3317be1..35b411a 100644 --- a/Makefile +++ b/Makefile @@ -147,6 +147,11 @@ strimzi-connector-restart: wait | xargs -I{} make --no-print-directory kubectl exec svc/$(DATAPLANE_CHART)-connect-api -- -it -n $(DATAPLANE_NS) -- \ curl -s -XPOST http://localhost:8083/connectors/\{\}/tasks/0/restart 2>/dev/null +# Render the chart with values.minimal.yml; bin/test runs this before the molecule scenario +.PHONY: template +template: + helm template $(CURDIR)/charts/dataplane --values $(CURDIR)/values.minimal.yml --namespace $(DATAPLANE_NS) --debug + dataplane: @: diff --git a/README.md b/README.md index 1be1337..cbfbdf2 100644 --- a/README.md +++ b/README.md @@ -1,9 +1,11 @@ # nephelaiio.dataplane -A helm chart to deploy a CDC replication stack integrating the following components +A helm chart to deploy a set of CDC replication connectors to create a data lake from a set of distributed databases + +Deployment integrates the following components +* Metabase Data Reporting * Strimzi Kafka Broker * Zalando PostgreSQL Data Warehouse -* Metabase Data Reporting * Strimzi Kafka Connect cluster * Strimzi Kafka Schema Registry * Strimzi Kafka Connect PostgreSQL sink @@ -17,13 +19,47 @@ helm repo update helm install dataplane/dataplane ``` -## TODO +## Values + +This is an example values definition for replicating pagila db: + +``` +metabase: + admin: + email: metabase@nephelai.io + password: dataplane + ingress: + enabled: true + className: nginx-private + hostName: metabase.nephelai.io +cdc: + postgres: + - hostname: pagilahost + port: 5432 + id: pagila + dbname: pagila + signaling: True +strimzi: + connect: + secret: "metabase-pagila-db" + kafka: + storage: + class: standard + zookeeper: + storage: + class: standard +zalando: + metabase: + class: standard +``` + +## Roadmap In order of priority -* Add support for partitioned PostgreSQL tables +* Create python package for maintenance operations +* Create and publish Topic Reroute transform * Add support for MySQL sources -* Add config options for warehouse backups * Add monitoring for Kafka topics -* Add table 
exclude support for cdc connectors +* Add Opendistro deployment ## Dependencies Chart depends on the following cluster levels components being deployed in the target cluster @@ -36,7 +72,7 @@ Chart depends on the following cluster levels components being deployed in the t Cluster dependencies are provisioned with role [nephelaiio.k8s](https://github.com/nephelaiio/ansible-role-k8s) in testing environment ## Testing -Testing is performed using molecule against a local single-node kind cluster using Github Actions and can be replicated locally for the latest supported cluster version using the following commands: +Testing is performed using molecule against a local cluster using GitHub Actions and can be replicated locally for the latest supported cluster version using the following commands: ``` sh ./bin/test diff --git a/bin/test b/bin/test index bea9cf6..e2d703f 100755 --- a/bin/test +++ b/bin/test @@ -14,12 +14,13 @@ function debug { trap debug EXIT -make molecule destroy && \ - make molecule create && \ - make images && \ - make molecule converge && \ - make wait && \ - make molecule verify && \ - make molecule side-effect && \ - make molecule verify && \ +make template && + make molecule destroy && + make molecule create && + make images && + make molecule converge && + make wait && + make molecule verify && + make molecule side-effect && + make molecule verify && trap - EXIT diff --git a/charts/dataplane/Chart.yaml b/charts/dataplane/Chart.yaml index 6393091..8a807a5 100644 --- a/charts/dataplane/Chart.yaml +++ b/charts/dataplane/Chart.yaml @@ -16,10 +16,10 @@ type: application # This is the chart version. This version number should be incremented each time you make changes # to the chart and its templates, including the app version. # Versions are expected to follow Semantic Versioning (https://semver.org/) -version: 0.1.12 +version: 0.1.13 # This is the version number of the application being deployed. 
This version number should be # incremented each time you make changes to the application. Versions are not expected to # follow Semantic Versioning. They should reflect the version the application is using. # It is recommended to use it with quotes. -appVersion: 0.1.12 +appVersion: 0.1.13 diff --git a/charts/dataplane/templates/strimzi/connector.yaml b/charts/dataplane/templates/strimzi/connector.yaml index 113666e..56df41f 100644 --- a/charts/dataplane/templates/strimzi/connector.yaml +++ b/charts/dataplane/templates/strimzi/connector.yaml @@ -1,10 +1,29 @@ -{{- $fullName := include "dataplane.strimzi.connect.fullname" . -}} -{{- $registryName := include "dataplane.registry.fullname" . -}} -{{- $registryPort := .Values.registry.service.port -}} +{{- $fullName := include "dataplane.strimzi.connect.fullname" . }} +{{- $registryName := include "dataplane.registry.fullname" . }} +{{- $registryPort := .Values.registry.service.port }} {{- range $db := .Values.cdc.postgres }} -{{- $dbConnectorName := lower (printf "%s-%s-%s" $fullName $db.servername $db.dbname) }} -{{- $dbSlotName := $dbConnectorName | replace "-" "_" }} +{{- $dbId := required "database id is required" $db.id }} +{{- $dbConnectorName := lower (printf "%s-%s-%s" $fullName $dbId $db.dbname) }} +{{- $dbSlotName := $dbConnectorName | replace "-" "_" }} +{{- $exclude := list }} + +{{- if hasKey $db "exclude" }} +{{- if $db.exclude }} +{{- range $item := $db.exclude }} +{{- $exclude = append $exclude $item }} +{{- end }} +{{- end }} +{{- end }} + +{{- if hasKey $db "partitions" }} +{{- if $db.partitions }} +{{- range $item := $db.partitions }} +{{- $exclude = append $exclude $item.source }} +{{- end }} +{{- end }} +{{- end }} + --- apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnector @@ -20,14 +39,14 @@ spec: database.port: "{{ $db.port | default 5432 }}" database.user: "${env:POSTGRES_CDC_USER}" database.password: "${env:POSTGRES_CDC_PASS}" - database.server.name: "{{ 
$db.servername }}" + database.server.name: "{{ $dbId }}" database.dbname: "{{ $db.dbname }}" snapshot.new.tables: parallel slot.name: "{{ $dbSlotName }}" plugin.name: pgoutput tasks.max: "1" - {{- if and (hasKey $db "partitions") (ge (len $db.partitions) 1) }} - table.exclude.list: "{{- range $i, $p := $db.partitions -}}{{- if $i }},{{- end -}}{{ $p.source }}{{- end }}" + {{- if ge (len $exclude) 1 }} + table.exclude.list: "{{- range $i, $p := $exclude -}}{{- if $i }},{{- end -}}{{ $p }}{{- end }}" {{- end }} key.converter: io.confluent.connect.avro.AvroConverter key.converter.schema.registry.url: http://{{ $registryName }}:{{ $registryPort }} @@ -41,12 +60,13 @@ spec: transforms.reroute.topic.regex: (.*) transforms.reroute.topic.replacement: cdc.$1 -{{- if and (hasKey $db "partitions") (ge (len $db.partitions) 1) }} -{{- range $partition := $db.partitions }} -{{- $partitionSource := required "partition source is required" $partition.source }} -{{- $partitionSink := required "partition sink is required" $partition.sink }} -{{- $partitionConnectorName := lower (printf "%s-%s" $dbConnectorName $partitionSink) | replace "." "-" }} -{{- $partitionSlotName := $partitionConnectorName | replace "-" "_" }} +{{- if (hasKey $db "partitions") }} +{{- if $db.partitions }} +{{- range $partition := $db.partitions }} +{{- $partitionSource := required "partition source is required" $partition.source }} +{{- $partitionSink := required "partition sink is required" $partition.sink }} +{{- $partitionConnectorName := lower (printf "%s-%s" $dbConnectorName $partitionSink) | replace "." 
"-" }} +{{- $partitionSlotName := $partitionConnectorName | replace "-" "_" }} --- apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnector @@ -62,7 +82,7 @@ spec: database.port: "{{ $db.port | default 5432 }}" database.user: "${env:POSTGRES_CDC_USER}" database.password: "${env:POSTGRES_CDC_PASS}" - database.server.name: "{{ $db.servername }}" + database.server.name: "{{ $dbId }}" database.dbname: "{{ $db.dbname }}" snapshot.new.tables: parallel slot.name: {{ $partitionSlotName }} @@ -80,8 +100,9 @@ spec: transforms.reroute.type: io.debezium.transforms.ByLogicalTableRouter transforms.reroute.topic.regex: "^([^\\.]+)\\.([^\\.]+)\\..*" transforms.reroute.topic.replacement: "cdc.$1.$2.{{ $partitionSink }}" -{{- end }} -{{- end }} +{{- end }} +{{- end }} +{{- end }} {{- end }} --- diff --git a/charts/dataplane/values.yaml b/charts/dataplane/values.yaml index c146f08..e327b45 100644 --- a/charts/dataplane/values.yaml +++ b/charts/dataplane/values.yaml @@ -7,7 +7,7 @@ util: image: repository: nephelaiio/dataplane-util pullPolicy: IfNotPresent - tag: dataplane-0.1.12 + tag: dataplane-0.1.13 resources: {} cdc: @@ -15,7 +15,7 @@ cdc: # postgres: # - hostname: "network.hostname" # port: 5432 - # servername: "host.identifier" + # id: "dbId" # dbname: "sourcedb" # signaling: "signaling_table" # partitions: @@ -45,7 +45,7 @@ strimzi: connect: image: repository: nephelaiio/dataplane-connect - tag: dataplane-0.1.12 + tag: dataplane-0.1.13 replicas: 1 config: group.id: connect-cluster @@ -100,7 +100,7 @@ metabase: image: repository: nephelaiio/dataplane-util pullPolicy: IfNotPresent - tag: dataplane-0.1.12 + tag: dataplane-0.1.13 securityContext: {} image: diff --git a/molecule/default/molecule.yml b/molecule/default/molecule.yml index e524135..54054de 100644 --- a/molecule/default/molecule.yml +++ b/molecule/default/molecule.yml @@ -90,9 +90,11 @@ provisioner: postgres: - hostname: "{{ dataplane_pagila_team }}-{{ dataplane_pagila_db }}" port: 5432 - servername: pagila + id: 
pagila dbname: "{{ dataplane_pagila_db }}" signaling: "{{ dataplane_pagila_signaling }}" + exclude: + - "public.staff" partitions: - source: "public.payment.*" sink: "payment" diff --git a/molecule/default/verify.yml b/molecule/default/verify.yml index 77a61f5..2496481 100644 --- a/molecule/default/verify.yml +++ b/molecule/default/verify.yml @@ -62,7 +62,7 @@ - name: record source table data ansible.builtin.set_fact: pagila_all_tables: "{{ _table_names | list }}" - pagila_target_tables: "{{ _table_names | reject('regex', '^payment_.*') }}" + pagila_target_tables: "{{ _table_names | reject('regex', '^payment_.*') | reject('regex', '^staff$') }}" vars: _table_names: "{{ pagila_table_query.query_result | map(attribute='table_name') }}" @@ -123,7 +123,9 @@ register: warehouse_table_query retries: 20 delay: 30 - until: warehouse_tables_expected | difference(warehouse_tables_found) | length == 0 + until: + - warehouse_tables_expected | difference(warehouse_tables_found) | length == 0 + - warehouse_tables_found | difference(warehouse_tables_expected) | length == 0 rescue: - name: debug table mismatches @@ -132,7 +134,9 @@ vars: warehouse_tables_expected: "{{ pagila_target_tables| map('map_format', 'pagila_%s') | list }}" warehouse_tables_found: "{{ warehouse_table_query.query_result | map(attribute='table_name') | list }}" - warehouse_tables_diff: "{{ warehouse_tables_expected | difference(warehouse_tables_found) }}" + warehouse_tables_diff_expected: "{{ warehouse_tables_expected | difference(warehouse_tables_found) }}" + warehouse_tables_diff_found: "{{ warehouse_tables_found | difference(warehouse_tables_expected) }}" + warehouse_tables_diff: "{{ warehouse_tables_diff_expected + warehouse_tables_diff_found }}" - name: fail verification ansible.builtin.fail: diff --git a/pyproject.toml b/pyproject.toml index 13a6b1c..d7c2166 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "dataplane" -version = "0.1.12" +version = "0.1.13" 
description = "" authors = ["Teodoro Cook "] diff --git a/values.minimal.yml b/values.minimal.yml new file mode 100644 index 0000000..0025a63 --- /dev/null +++ b/values.minimal.yml @@ -0,0 +1,31 @@ +--- +metabase: + admin: + email: metabase@nephelai.io + password: dataplane + ingress: + enabled: true + className: nginx-private + hostName: metabase.nephelai.io +cdc: + postgres: + - hostname: pagilahost + id: pagila + dbname: pagila + exclude: + - "public.staff" + partitions: + - source: "public.payment.*" + sink: "payment" +strimzi: + connect: + secret: "metabase-pagila-db" + kafka: + storage: + class: standard + zookeeper: + storage: + class: standard +zalando: + metabase: + class: standard