Skip to content

Commit

Permalink
Add support for table exclusion (#37)
Browse files Browse the repository at this point in the history
* Update readme

* Bump release version

* Add minimal values example and test

* Add table exclude support
  • Loading branch information
teddyphreak authored Jun 25, 2023
1 parent 7d4865b commit 93eb953
Show file tree
Hide file tree
Showing 10 changed files with 141 additions and 43 deletions.
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,9 @@ strimzi-connector-restart: wait
| xargs -I{} make --no-print-directory kubectl exec svc/$(DATAPLANE_CHART)-connect-api -- -it -n $(DATAPLANE_NS) -- \
curl -s -XPOST http://localhost:8083/connectors/\{\}/tasks/0/restart 2>/dev/null

# Render the dataplane chart with `helm template`, using values.minimal.yml as
# input and $(DATAPLANE_NS) as the namespace; --debug enables helm's debug output.
template:
helm template $(PWD)/charts/dataplane --values values.minimal.yml --namespace $(DATAPLANE_NS) --debug

dataplane:
@:

Expand Down
50 changes: 43 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
# nephelaiio.dataplane

A helm chart to deploy a CDC replication stack integrating the following components
A Helm chart that deploys a set of CDC replication connectors to build a data lake from a set of distributed databases.

The deployment integrates the following components:
* Metabase Data Reporting
* Strimzi Kafka Broker
* Zalando PostgreSQL Data Warehouse
* Metabase Data Reporting
* Strimzi Kafka Connect cluster
* Strimzi Kafka Schema Registry
* Strimzi Kafka Connect PostgreSQL sink
Expand All @@ -17,13 +19,47 @@ helm repo update
helm install dataplane/dataplane
```

## TODO
## Values

The following is an example values definition for replicating the pagila database:

```
metabase:
admin:
email: [email protected]
password: dataplane
ingress:
enabled: true
className: nginx-private
hostName: metabase.nephelai.io
cdc:
postgres:
- hostname: pagilahost
port: 5432
id: pagila
dbname: pagila
signaling: True
strimzi:
connect:
secret: "metabase-pagila-db"
kafka:
storage:
class: standard
zookeeper:
storage:
class: standard
zalando:
metabase:
class: standard
```

## Roadmap
In order of priority:
* Add support for partitioned PostgreSQL tables
* Create python package for maintenance operations
* Create and publish Topic Reroute transform
* Add support for MySQL sources
* Add config options for warehouse backups
* Add monitoring for Kafka topics
* Add table exclude support for cdc connectors
* Add Opendistro deployment

## Dependencies
The chart depends on the following cluster-level components being deployed in the target cluster:
Expand All @@ -36,7 +72,7 @@ Chart depends on the following cluster levels components being deployed in the t
Cluster dependencies are provisioned with the [nephelaiio.k8s](https://github.com/nephelaiio/ansible-role-k8s) role in the testing environment.

## Testing
Testing is performed using molecule against a local single-node kind cluster using Github Actions and can be replicated locally for the latest supported cluster version using the following commands:
Testing is performed with Molecule against a local cluster in GitHub Actions and can be reproduced locally for the latest supported cluster version with the following commands:

``` sh
./bin/test
Expand Down
17 changes: 9 additions & 8 deletions bin/test
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,13 @@ function debug {

trap debug EXIT

make molecule destroy && \
make molecule create && \
make images && \
make molecule converge && \
make wait && \
make molecule verify && \
make molecule side-effect && \
make molecule verify && \
make template &&
make molecule destroy &&
make molecule create &&
make images &&
make molecule converge &&
make wait &&
make molecule verify &&
make molecule side-effect &&
make molecule verify &&
trap - EXIT
4 changes: 2 additions & 2 deletions charts/dataplane/Chart.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@ type: application
# This is the chart version. This version number should be incremented each time you make changes
# to the chart and its templates, including the app version.
# Versions are expected to follow Semantic Versioning (https://semver.org/)
version: 0.1.12
version: 0.1.13

# This is the version number of the application being deployed. This version number should be
# incremented each time you make changes to the application. Versions are not expected to
# follow Semantic Versioning. They should reflect the version the application is using.
# It is recommended to use it with quotes.
appVersion: 0.1.12
appVersion: 0.1.13
55 changes: 38 additions & 17 deletions charts/dataplane/templates/strimzi/connector.yaml
Original file line number Diff line number Diff line change
@@ -1,10 +1,29 @@
{{- $fullName := include "dataplane.strimzi.connect.fullname" . -}}
{{- $registryName := include "dataplane.registry.fullname" . -}}
{{- $registryPort := .Values.registry.service.port -}}
{{- $fullName := include "dataplane.strimzi.connect.fullname" . }}
{{- $registryName := include "dataplane.registry.fullname" . }}
{{- $registryPort := .Values.registry.service.port }}

{{- range $db := .Values.cdc.postgres }}
{{- $dbConnectorName := lower (printf "%s-%s-%s" $fullName $db.servername $db.dbname) }}
{{- $dbSlotName := $dbConnectorName | replace "-" "_" }}
{{- $dbId := required "database id is required" $db.id }}
{{- $dbConnectorName := lower (printf "%s-%s-%s" $fullName $dbId $db.dbname) }}
{{- $dbSlotName := $dbConnectorName | replace "-" "_" }}
{{- $exclude := list }}

{{- if hasKey $db "exclude" }}
{{- if ge (len $db.exclude) 1 }}
{{- range $item := $db.exclude }}
{{- $exclude = append $exclude $item }}
{{- end }}
{{- end }}
{{- end }}

{{- if hasKey $db "partitions" }}
{{- if ge (len $db.partitions) 1 }}
{{- range $item := $db.partitions }}
{{- $exclude = append $exclude $item.source }}
{{- end }}
{{- end }}
{{- end }}

---
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
Expand All @@ -20,14 +39,14 @@ spec:
database.port: "{{ $db.port | default 5432 }}"
database.user: "${env:POSTGRES_CDC_USER}"
database.password: "${env:POSTGRES_CDC_PASS}"
database.server.name: "{{ $db.servername }}"
database.server.name: "{{ $dbId }}"
database.dbname: "{{ $db.dbname }}"
snapshot.new.tables: parallel
slot.name: "{{ $dbSlotName }}"
plugin.name: pgoutput
tasks.max: "1"
{{- if and (hasKey $db "partitions") (ge (len $db.partitions) 1) }}
table.exclude.list: "{{- range $i, $p := $db.partitions -}}{{- if $i }},{{- end -}}{{ $p.source }}{{- end }}"
{{- if ge (len $exclude) 1 }}
table.exclude.list: "{{- range $i, $p := $exclude -}}{{- if $i }},{{- end -}}{{ $p }}{{- end }}"
{{- end }}
key.converter: io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url: http://{{ $registryName }}:{{ $registryPort }}
Expand All @@ -41,12 +60,13 @@ spec:
transforms.reroute.topic.regex: (.*)
transforms.reroute.topic.replacement: cdc.$1

{{- if and (hasKey $db "partitions") (ge (len $db.partitions) 1) }}
{{- range $partition := $db.partitions }}
{{- $partitionSource := required "partition source is required" $partition.source }}
{{- $partitionSink := required "partition sink is required" $partition.sink }}
{{- $partitionConnectorName := lower (printf "%s-%s" $dbConnectorName $partitionSink) | replace "." "-" }}
{{- $partitionSlotName := $partitionConnectorName | replace "-" "_" }}
{{- if (hasKey $db "partitions") }}
{{- if ge (len $db.partitions) 1 }}
{{- range $partition := $db.partitions }}
{{- $partitionSource := required "partition source is required" $partition.source }}
{{- $partitionSink := required "partition sink is required" $partition.sink }}
{{- $partitionConnectorName := lower (printf "%s-%s" $dbConnectorName $partitionSink) | replace "." "-" }}
{{- $partitionSlotName := $partitionConnectorName | replace "-" "_" }}
---
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
Expand All @@ -62,7 +82,7 @@ spec:
database.port: "{{ $db.port | default 5432 }}"
database.user: "${env:POSTGRES_CDC_USER}"
database.password: "${env:POSTGRES_CDC_PASS}"
database.server.name: "{{ $db.servername }}"
database.server.name: "{{ $dbId }}"
database.dbname: "{{ $db.dbname }}"
snapshot.new.tables: parallel
slot.name: {{ $partitionSlotName }}
Expand All @@ -80,8 +100,9 @@ spec:
transforms.reroute.type: io.debezium.transforms.ByLogicalTableRouter
transforms.reroute.topic.regex: "^([^\\.]+)\\.([^\\.]+)\\..*"
transforms.reroute.topic.replacement: "cdc.$1.$2.{{ $partitionSink }}"
{{- end }}
{{- end }}
{{- end }}
{{- end }}
{{- end }}
{{- end }}

---
Expand Down
8 changes: 4 additions & 4 deletions charts/dataplane/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,15 @@ util:
image:
repository: nephelaiio/dataplane-util
pullPolicy: IfNotPresent
tag: dataplane-0.1.12
tag: dataplane-0.1.13
resources: {}

cdc:
postgres: []
# postgres:
# - hostname: "network.hostname"
# port: 5432
# servername: "host.identifier"
# id: "dbId"
# dbname: "sourcedb"
# signaling: "signaling_table"
# partitions:
Expand Down Expand Up @@ -45,7 +45,7 @@ strimzi:
connect:
image:
repository: nephelaiio/dataplane-connect
tag: dataplane-0.1.12
tag: dataplane-0.1.13
replicas: 1
config:
group.id: connect-cluster
Expand Down Expand Up @@ -100,7 +100,7 @@ metabase:
image:
repository: nephelaiio/dataplane-util
pullPolicy: IfNotPresent
tag: dataplane-0.1.12
tag: dataplane-0.1.13
securityContext: {}

image:
Expand Down
4 changes: 3 additions & 1 deletion molecule/default/molecule.yml
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,11 @@ provisioner:
postgres:
- hostname: "{{ dataplane_pagila_team }}-{{ dataplane_pagila_db }}"
port: 5432
servername: pagila
id: pagila
dbname: "{{ dataplane_pagila_db }}"
signaling: "{{ dataplane_pagila_signaling }}"
exclude:
- "public.staff"
partitions:
- source: "public.payment.*"
sink: "payment"
Expand Down
10 changes: 7 additions & 3 deletions molecule/default/verify.yml
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@
- name: record source table data
ansible.builtin.set_fact:
pagila_all_tables: "{{ _table_names | list }}"
pagila_target_tables: "{{ _table_names | reject('regex', '^payment_.*') }}"
pagila_target_tables: "{{ _table_names | reject('regex', '^payment_.*') | reject('regex', '^staff') }}"
vars:
_table_names: "{{ pagila_table_query.query_result | map(attribute='table_name') }}"

Expand Down Expand Up @@ -123,7 +123,9 @@
register: warehouse_table_query
retries: 20
delay: 30
until: warehouse_tables_expected | difference(warehouse_tables_found) | length == 0
until:
- warehouse_tables_expected | difference(warehouse_tables_found) | length == 0
- warehouse_tables_found | difference(warehouse_tables_expected) | length == 0

rescue:
- name: debug table mismatches
Expand All @@ -132,7 +134,9 @@
vars:
warehouse_tables_expected: "{{ pagila_target_tables| map('map_format', 'pagila_%s') | list }}"
warehouse_tables_found: "{{ warehouse_table_query.query_result | map(attribute='table_name') | list }}"
warehouse_tables_diff: "{{ warehouse_tables_expected | difference(warehouse_tables_found) }}"
warehouse_tables_diff_expected: "{{ warehouse_tables_expected | difference(warehouse_tables_found) }}"
warehouse_tables_diff_found: "{{ warehouse_tables_found | difference(warehouse_tables_expected) }}"
warehouse_tables_diff: "{{ warehouse_tables_diff_expected + warehouse_tables_diff_found }}"

- name: fail verification
ansible.builtin.fail:
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "dataplane"
version = "0.1.12"
version = "0.1.13"
description = ""
authors = ["Teodoro Cook <[email protected]>"]

Expand Down
31 changes: 31 additions & 0 deletions values.minimal.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
---
# Minimal example values for the dataplane chart. The Makefile `template`
# target renders the chart with this file.
metabase:
admin:
# NOTE(review): the address below looks like an e-mail obfuscation placeholder - confirm the real value
email: [email protected]
password: dataplane
ingress:
enabled: true
className: nginx-private
hostName: metabase.nephelai.io
cdc:
postgres:
- hostname: pagilahost
# id is mandatory: the connector template aborts rendering without it and
# uses it as database.server.name and as part of the connector name
id: pagila
dbname: pagila
# tables listed here are added to the connector's table.exclude.list
exclude:
- "public.staff"
# each partition source is also added to the main connector's exclude list;
# the template emits a separate KafkaConnector per partition, named after its sink
partitions:
- source: "public.payment.*"
sink: "payment"
strimzi:
connect:
secret: "metabase-pagila-db"
kafka:
storage:
class: standard
zookeeper:
storage:
class: standard
zalando:
metabase:
class: standard

0 comments on commit 93eb953

Please sign in to comment.