diff --git a/README.md b/README.md index 2460044..2f1090b 100644 --- a/README.md +++ b/README.md @@ -21,10 +21,10 @@ helm install dataplane/dataplane ## TODO In order of priority +* Add support for partitioned PostgreSQL tables * Add support for MySQL sources * Add config options for warehouse backups * Add monitoring for Kafka topics -* Add Apache Flink deployment * Add table exclude support for cdc connectors ## Dependencies diff --git a/charts/dataplane/Chart.yaml b/charts/dataplane/Chart.yaml index 68e4acc..70cb601 100644 --- a/charts/dataplane/Chart.yaml +++ b/charts/dataplane/Chart.yaml @@ -16,10 +16,10 @@ type: application # This is the chart version. This version number should be incremented each time you make changes # to the chart and its templates, including the app version. # Versions are expected to follow Semantic Versioning (https://semver.org/) -version: 0.1.10 +version: 0.1.11 # This is the version number of the application being deployed. This version number should be # incremented each time you make changes to the application. Versions are not expected to # follow Semantic Versioning. They should reflect the version the application is using. # It is recommended to use it with quotes. 
-appVersion: 0.1.10 +appVersion: 0.1.11 diff --git a/charts/dataplane/values.yaml b/charts/dataplane/values.yaml index c6e5a73..634208f 100644 --- a/charts/dataplane/values.yaml +++ b/charts/dataplane/values.yaml @@ -7,11 +7,20 @@ util: image: repository: nephelaiio/dataplane-util pullPolicy: IfNotPresent - tag: dataplane-0.1.10 + tag: dataplane-0.1.11 resources: {} cdc: postgres: [] + # postgres: + # - hostname: "network.hostname" + # port: 5432 + # servername: "host.identifier" + # dbname: "sourcedb" + # signaling: "signaling_table" + # partitions: + # - source: "payments.*" + # sink: "payments" mysql: [] strimzi: @@ -36,7 +45,7 @@ strimzi: connect: image: repository: nephelaiio/dataplane-connect - tag: dataplane-0.1.10 + tag: dataplane-0.1.11 replicas: 1 config: group.id: connect-cluster @@ -91,7 +100,7 @@ metabase: image: repository: nephelaiio/dataplane-util pullPolicy: IfNotPresent - tag: dataplane-0.1.10 + tag: dataplane-0.1.11 securityContext: {} image: diff --git a/molecule/default/converge.yml b/molecule/default/converge.yml index 5018a10..19783c3 100644 --- a/molecule/default/converge.yml +++ b/molecule/default/converge.yml @@ -6,11 +6,9 @@ gather_facts: false roles: - - nephelaiio.plugins tasks: - - name: wait for strimzi crd deployment kubernetes.core.k8s_info: api_version: v1 @@ -86,15 +84,14 @@ ansible.builtin.fail: msg: "zalando clusters failed to come up on time" vars: - cluster_failed: "{{ cluster_status | rejectattr('status.PostgresClusterStatus', 'equalto', 'Running') | map(attribute='metadata.name') }}" + cluster_failed_data: "{{ cluster_status | rejectattr('status.PostgresClusterStatus', 'equalto', 'Running') }}" + cluster_failed: "{{ cluster_failed_data | map(attribute='metadata.name') }}" cluster_status: "{{ cluster_query | selectattr('status', 'defined') }}" - cluster_query: "{{ - query( - 'kubernetes.core.k8s', - api_version='acid.zalan.do/v1', - kind='postgresql', - kubeconfig=k8s_kubeconfig - )}}" + cluster_query: "{{ query( + 
'kubernetes.core.k8s', + api_version='acid.zalan.do/v1', + kind='postgresql', + kubeconfig=k8s_kubeconfig) }}" retries: 15 delay: 30 until: cluster_failed | length == 0 @@ -122,13 +119,11 @@ vars: cluster_failed: "{{ cluster_status | rejectattr('status.PostgresClusterStatus', 'equalto', 'Running') | map(attribute='metadata.name') }}" cluster_status: "{{ cluster_query | selectattr('status', 'defined') }}" - cluster_query: "{{ - query( - 'kubernetes.core.k8s', - api_version='acid.zalan.do/v1', - kind='postgresql', - kubeconfig=k8s_kubeconfig - )}}" + cluster_query: "{{ query( + 'kubernetes.core.k8s', + api_version='acid.zalan.do/v1', + kind='postgresql', + kubeconfig=k8s_kubeconfig) }}" retries: 15 delay: 30 until: cluster_failed | length == 0 @@ -138,32 +133,28 @@ - name: query pagila connection data ansible.builtin.set_fact: - pagila_user: "{{ pagila_db_secret_data.data.username | b64decode }}" - pagila_pass: "{{ pagila_db_secret_data.data.password | b64decode }}" + pagila_user: "{{ _db_secret_data.data.username | b64decode }}" + pagila_pass: "{{ _db_secret_data.data.password | b64decode }}" vars: - pagila_db_secret_name: "{{ dataplane_pagila_user }}-{{ dataplane_pagila_team }}-{{ dataplane_pagila_db }}" - pagila_db_secret_data: "{{ secret_query | selectattr('metadata.name', 'equalto', pagila_db_secret_name) | first }}" - secret_query: "{{ - query( - 'kubernetes.core.k8s', - namespace=dataplane_pagila_namespace, - kind='Secret', - kubeconfig=k8s_kubeconfig - )}}" + _db_secret_name: "{{ dataplane_pagila_user }}-{{ dataplane_pagila_team }}-{{ dataplane_pagila_db }}" + _db_secret_data: "{{ secret_query | selectattr('metadata.name', 'equalto', _db_secret_name) | first }}" + secret_query: "{{ query( + 'kubernetes.core.k8s', + namespace=dataplane_pagila_namespace, + kind='Secret', + kubeconfig=k8s_kubeconfig) }}" - name: query pagila service data ansible.builtin.set_fact: - pagila_host: "{{ pagila_db_svc_data.status.loadBalancer.ingress[0].ip }}" + pagila_host: "{{ 
_db_svc_data.status.loadBalancer.ingress[0].ip }}" vars: - pagila_db_svc_name: "{{ dataplane_pagila_team }}-{{ dataplane_pagila_db }}" - pagila_db_svc_data: "{{ service_query | selectattr('metadata.name', 'equalto', pagila_db_svc_name) | first }}" - service_query: "{{ - query( - 'kubernetes.core.k8s', - namespace=dataplane_pagila_namespace, - kind='Service', - kubeconfig=k8s_kubeconfig - )}}" + _db_svc_name: "{{ dataplane_pagila_team }}-{{ dataplane_pagila_db }}" + _db_svc_data: "{{ service_query | selectattr('metadata.name', 'equalto', _db_svc_name) | first }}" + service_query: "{{ query( + 'kubernetes.core.k8s', + namespace=dataplane_pagila_namespace, + kind='Service', + kubeconfig=k8s_kubeconfig) }}" - name: wait for database port open ansible.builtin.wait_for: @@ -185,8 +176,8 @@ register: datadir changed_when: false - - block: - + - name: load pagila data + block: - name: fetch pagila schema file ansible.builtin.get_url: url: https://github.com/devrimgunduz/pagila/raw/master/pagila-schema.sql @@ -200,7 +191,8 @@ changed_when: false - name: concatenate pagila sql files - ansible.builtin.shell: "cat {{ datadir.path }}/schema.sql {{ datadir.path }}/data.sql > {{ datadir.path }}/pagila.sql" + ansible.builtin.shell: > + cat {{ datadir.path }}/schema.sql {{ datadir.path }}/data.sql > {{ datadir.path }}/pagila.sql - name: load pagila db data community.postgresql.postgresql_db: @@ -213,7 +205,6 @@ changed_when: false always: - - name: destroy data tempdir ansible.builtin.file: state: absent diff --git a/molecule/default/create.yml b/molecule/default/create.yml index d358dfc..7c724b5 100644 --- a/molecule/default/create.yml +++ b/molecule/default/create.yml @@ -6,11 +6,9 @@ gather_facts: false roles: - - nephelaiio.plugins pre_tasks: - - name: deploy kind cluster ansible.builtin.include_role: name: nephelaiio.kind @@ -18,7 +16,6 @@ - name: set helm release metadata when: helm_release is not defined block: - - name: query helm releases ansible.builtin.uri: url: 
https://api.github.com/repos/helm/helm/releases/latest @@ -35,13 +32,11 @@ helm_release: "{{ helm_release_query.json.tag_name }}" rescue: - - name: set kind release to failback ansible.builtin.set_fact: helm_release: v3.11.3 tasks: - - name: create temporary directory ansible.builtin.tempfile: state: directory @@ -51,7 +46,6 @@ - name: install helm binary block: - - name: download helm release package ansible.builtin.unarchive: src: "https://get.helm.sh/helm-{{ helm_release }}-linux-amd64.tar.gz" @@ -70,17 +64,16 @@ recurse: true use_regex: true patterns: - - '^helm$' + - "^helm$" register: helm_bin - name: copy helm executable ansible.builtin.copy: src: "{{ helm_bin.files[0].path }}" dest: "{{ k8s_helm_bin }}" - mode: '0755' + mode: "0755" always: - - name: cleanup temp files ansible.builtin.file: state: absent diff --git a/molecule/default/molecule.yml b/molecule/default/molecule.yml index 43b0eab..a301088 100644 --- a/molecule/default/molecule.yml +++ b/molecule/default/molecule.yml @@ -93,6 +93,9 @@ provisioner: servername: pagila dbname: "{{ dataplane_pagila_db }}" signaling: "{{ dataplane_pagila_signaling }}" + partitions: + - source: "payments.*" + sink: "payments" strimzi: connect: image: diff --git a/molecule/default/prepare.yml b/molecule/default/prepare.yml index 0c09e4f..c2957ad 100644 --- a/molecule/default/prepare.yml +++ b/molecule/default/prepare.yml @@ -6,17 +6,14 @@ gather_facts: false vars: - k8s_deploy: true k8s_verify: false roles: - - nephelaiio.plugins - nephelaiio.k8s tasks: - - name: install postgresql binary ansible.builtin.package: name: postgresql-client diff --git a/molecule/default/verify.yml b/molecule/default/verify.yml index 08b927d..967a9fc 100644 --- a/molecule/default/verify.yml +++ b/molecule/default/verify.yml @@ -6,7 +6,6 @@ gather_facts: false vars: - k8s_deploy: false k8s_verify: "{{ lookup('ansible.builtin.env', 'K8S_VERIFY', default='true') | bool }}" k8s_strimzi_verify: false @@ -16,52 +15,45 @@ verify_pagila_user: 
postgres roles: - - name: nephelaiio.plugins - name: nephelaiio.k8s tasks: - - - when: lookup('ansible.builtin.env', 'CDC_VERIFY', default='true') | bool + - name: verify dataplane sink database + when: lookup('ansible.builtin.env', 'CDC_VERIFY', default='true') | bool block: - - name: query pagila connection data ansible.builtin.set_fact: - pagila_user: "{{ pagila_db_secret_data.data.username | b64decode }}" - pagila_pass: "{{ pagila_db_secret_data.data.password | b64decode }}" + pagila_user: "{{ _db_secret_data.data.username | b64decode }}" + pagila_pass: "{{ _db_secret_data.data.password | b64decode }}" vars: - pagila_db_secret_name: "{{ verify_pagila_user }}-{{ dataplane_pagila_team }}-{{ dataplane_pagila_db }}" - pagila_db_secret_data: "{{ secret_query | selectattr('metadata.name', 'equalto', pagila_db_secret_name) | first }}" - secret_query: "{{ - query( - 'kubernetes.core.k8s', - namespace=dataplane_pagila_namespace, - kind='Secret', - kubeconfig=k8s_kubeconfig - ) - }}" + _db_secret_name: "{{ verify_pagila_user }}-{{ dataplane_pagila_team }}-{{ dataplane_pagila_db }}" + _db_secret_list: "{{ secret_query | selectattr('metadata.name', 'equalto', _db_secret_name) }}" + _db_secret_data: "{{ _db_secret_list | first }}" + secret_query: "{{ query( + 'kubernetes.core.k8s', + namespace=dataplane_pagila_namespace, + kind='Secret', + kubeconfig=k8s_kubeconfig) }}" - name: query pagila service data ansible.builtin.set_fact: - pagila_host: "{{ pagila_db_svc_data.status.loadBalancer.ingress[0].ip }}" + pagila_host: "{{ _db_svc_data.status.loadBalancer.ingress[0].ip }}" vars: - pagila_db_svc_name: "{{ dataplane_pagila_team }}-{{ dataplane_pagila_db }}" - pagila_db_svc_data: "{{ service_query | selectattr('metadata.name', 'equalto', pagila_db_svc_name) | first }}" - service_query: "{{ - query( - 'kubernetes.core.k8s', - namespace=dataplane_pagila_namespace, - kind='Service', - kubeconfig=k8s_kubeconfig - ) - }}" + _db_svc_name: "{{ dataplane_pagila_team }}-{{ 
dataplane_pagila_db }}" + _db_svc_list: "{{ service_query | selectattr('metadata.name', 'equalto', _db_svc_name) }}" + _db_svc_data: "{{ _db_svc_list | first }}" + service_query: "{{ query( + 'kubernetes.core.k8s', + namespace=dataplane_pagila_namespace, + kind='Service', + kubeconfig=k8s_kubeconfig) }}" - name: query pagila tables community.postgresql.postgresql_query: db: "{{ dataplane_pagila_db }}" - query: >- - SELECT table_name FROM information_schema.tables - WHERE table_schema='public' AND table_type='BASE TABLE' + query: | + SELECT table_name FROM information_schema.tables WHERE table_schema='public' AND table_type='BASE TABLE' login_user: "{{ pagila_user }}" login_password: "{{ pagila_pass }}" login_host: "{{ pagila_host }}" @@ -69,7 +61,9 @@ - name: record source table data ansible.builtin.set_fact: - pagila_tables: "{{ pagila_table_query.query_result | map(attribute='table_name') | difference(['payment']) | list }}" + pagila_tables: "{{ _table_names | difference(['payment']) | list }}" + vars: + _table_names: "{{ pagila_table_query.query_result | map(attribute='table_name') }}" - name: query pagila table data community.postgresql.postgresql_query: @@ -87,41 +81,38 @@ - name: query warehouse connection data ansible.builtin.set_fact: - warehouse_user: "{{ warehouse_db_secret_data.data.username | b64decode }}" - warehouse_pass: "{{ warehouse_db_secret_data.data.password | b64decode }}" + warehouse_user: "{{ _db_secret_data.data.username | b64decode }}" + warehouse_pass: "{{ _db_secret_data.data.password | b64decode }}" vars: - warehouse_db_secret_name: "postgres-{{ dataplane_chart }}-warehouse" - warehouse_db_secret_data: "{{ secret_query | selectattr('metadata.name', 'equalto', warehouse_db_secret_name) | first }}" - secret_query: "{{ - query( - 'kubernetes.core.k8s', - namespace=dataplane_namespace, - kind='Secret', - kubeconfig=k8s_kubeconfig - ) - }}" + _db_secret_name: "postgres-{{ dataplane_chart }}-warehouse" + _db_secret_list: "{{ secret_query | 
selectattr('metadata.name', 'equalto', _db_secret_name) }}" + _db_secret_data: "{{ _db_secret_list | first }}" + secret_query: "{{ query( + 'kubernetes.core.k8s', + namespace=dataplane_namespace, + kind='Secret', + kubeconfig=k8s_kubeconfig) }}" - name: query warehouse service data ansible.builtin.set_fact: - warehouse_host: "{{ warehouse_db_svc_data.status.loadBalancer.ingress[0].ip }}" + warehouse_host: "{{ _db_svc_data.status.loadBalancer.ingress[0].ip }}" vars: - warehouse_db_svc_name: "{{ dataplane_chart }}-warehouse" - warehouse_db_svc_data: "{{ service_query | selectattr('metadata.name', 'equalto', warehouse_db_svc_name) | first }}" - service_query: "{{ - query( - 'kubernetes.core.k8s', - namespace=dataplane_namespace, - kind='Service', - kubeconfig=k8s_kubeconfig - ) - }}" - - - block: + _db_svc_name: "{{ dataplane_chart }}-warehouse" + _db_svc_list: "{{ service_query | selectattr('metadata.name', 'equalto', _db_svc_name) }}" + _db_svc_data: "{{ _db_svc_list | first }}" + service_query: "{{ query( + 'kubernetes.core.k8s', + namespace=dataplane_namespace, + kind='Service', + kubeconfig=k8s_kubeconfig) }}" + - name: verify sink database table names + block: - name: verify warehouse tables community.postgresql.postgresql_query: db: warehouse - query: "SELECT table_name FROM information_schema.tables WHERE table_schema='public' AND table_type='BASE TABLE'" + query: | + SELECT table_name FROM information_schema.tables WHERE table_schema='public' AND table_type='BASE TABLE' login_user: "{{ warehouse_user }}" login_password: "{{ warehouse_pass }}" login_host: "{{ warehouse_host }}" @@ -134,7 +125,6 @@ until: warehouse_tables_expected | difference(warehouse_tables_found) | length == 0 rescue: - - name: debug table mismatches ansible.builtin.debug: msg: "table diff=[{{ ', '.join(warehouse_tables_diff) }}]" @@ -168,7 +158,9 @@ - name: verify warehouse table data ansible.builtin.fail: - msg: "warehouse data for table {{ source_table }} is incomplete, {{ 
warehouse_table_records }}/{{ source_table_records }} found" + msg: >- + warehouse data for table {{ source_table }} is incomplete, + {{ warehouse_table_records }}/{{ source_table_records }} found vars: warehouse_table_records: "{{ (warehouse_data[warehouse_table].query_result | list)[0].count }}" warehouse_table: "pagila_{{ item }}" @@ -179,97 +171,94 @@ loop: "{{ (pagila_data.keys() | list) }}" when: source_table_records > warehouse_table_records - - when: lookup('ansible.builtin.env', 'METABASE_VERIFY', default='true') | bool + - name: verify metabase app database + when: lookup('ansible.builtin.env', 'METABASE_VERIFY', default='true') | bool block: - - - name: query metabase service data - ansible.builtin.set_fact: - metabase_host: "{{ metabase_svc_data.status.loadBalancer.ingress[0].ip }}" - vars: - metabase_svc_name: "{{ dataplane_chart }}-metabase-app" - metabase_svc_data: "{{ service_query | selectattr('metadata.name', 'equalto', metabase_svc_name) | first }}" - service_query: "{{ - query( + - name: query metabase service data + ansible.builtin.set_fact: + metabase_host: "{{ _svc_data.status.loadBalancer.ingress[0].ip }}" + vars: + _svc_name: "{{ dataplane_chart }}-metabase-app" + _svc_list: "{{ service_query | selectattr('metadata.name', 'equalto', _svc_name) }}" + _svc_data: "{{ _svc_list | first }}" + service_query: "{{ query( 'kubernetes.core.k8s', namespace=dataplane_namespace, kind='Ingress', - kubeconfig=k8s_kubeconfig - ) - }}" + kubeconfig=k8s_kubeconfig) }}" - - name: query metabase connection data - ansible.builtin.set_fact: - metabase_user: "{{ dataplane_chart_values.metabase.admin.email }}" - metabase_pass: "{{ metabase_admin_secret_data.data.password | b64decode }}" - vars: - metabase_admin_secret_name: "{{ dataplane_chart }}-metabase-admin" - metabase_admin_secret_data: "{{ secret_query | selectattr('metadata.name', 'equalto', metabase_admin_secret_name) | first }}" - secret_query: "{{ - query( + - name: query metabase connection data + 
ansible.builtin.set_fact: + metabase_user: "{{ dataplane_chart_values.metabase.admin.email }}" + metabase_pass: "{{ _admin_secret_data.data.password | b64decode }}" + vars: + _admin_secret_name: "{{ dataplane_chart }}-metabase-admin" + _admin_secret_list: "{{ secret_query | selectattr('metadata.name', 'equalto', _admin_secret_name) }}" + _admin_secret_data: "{{ _admin_secret_list | first }}" + secret_query: "{{ query( 'kubernetes.core.k8s', namespace=dataplane_namespace, kind='Secret', - kubeconfig=k8s_kubeconfig - ) - }}" - - - name: debug metabase connection data - ansible.builtin.debug: - msg: "host={{ metabase_host }}, hostname={{ dataplane_metabase_hostname }}, user={{ metabase_user }}, password={{ metabase_pass }}" - ignore_errors: true - - - name: log in to metabase - ansible.builtin.uri: - url: "https://{{ metabase_host }}/api/session" - method: POST - body_format: json - body: - username: "{{ metabase_user }}" - password: "{{ metabase_pass }}" - headers: - Host: "{{ dataplane_metabase_hostname }}" - status_code: 200 - validate_certs: false - register: metabase_login_query - - - name: record metabase session id - ansible.builtin.set_fact: - metabase_session: "{{ metabase_login_query.json.id }}" - - - - name: verify metabase dbs - block: - - - name: query metabase dbs - ansible.builtin.uri: - url: "https://{{ metabase_host }}/api/database/?include=tables" - method: GET - headers: - Host: "{{ dataplane_metabase_hostname }}" - Content-Type: application/json - X-Metabase-Session: "{{ metabase_session }}" - status_code: 200 - validate_certs: false - register: metabase_db_query - retries: 10 - delay: 30 - until: - - metabase_db_query.json.data | rejectattr('name', 'equalto', 'dataplane') | length == 0 - - metabase_db_query.json.data | selectattr('name', 'equalto', 'dataplane') | length == 1 - - rescue: + kubeconfig=k8s_kubeconfig) }}" + + - name: debug metabase connection data + ansible.builtin.debug: + msg: > + host={{ metabase_host }}, + hostname={{ 
dataplane_metabase_hostname }}, + user={{ metabase_user }}, + password={{ metabase_pass }} + ignore_errors: true + + - name: log in to metabase + ansible.builtin.uri: + url: "https://{{ metabase_host }}/api/session" + method: POST + body_format: json + body: + username: "{{ metabase_user }}" + password: "{{ metabase_pass }}" + headers: + Host: "{{ dataplane_metabase_hostname }}" + status_code: 200 + validate_certs: false + register: metabase_login_query + + - name: record metabase session id + ansible.builtin.set_fact: + metabase_session: "{{ metabase_login_query.json.id }}" + + - name: verify metabase app dbs + block: + - name: query metabase dbs + ansible.builtin.uri: + url: "https://{{ metabase_host }}/api/database/?include=tables" + method: GET + headers: + Host: "{{ dataplane_metabase_hostname }}" + Content-Type: application/json + X-Metabase-Session: "{{ metabase_session }}" + status_code: 200 + validate_certs: false + register: metabase_db_query + retries: 10 + delay: 30 + until: + - metabase_db_query.json.data | rejectattr('name', 'equalto', 'dataplane') | length == 0 + - metabase_db_query.json.data | selectattr('name', 'equalto', 'dataplane') | length == 1 - - name: verify sample db deletion - ansible.builtin.fail: - msg: "Sample databases have not been removed from Metabase" - vars: - extra_dbs: "{{ metabase_db_query.json.data | rejectattr('name', 'equalto', 'dataplane') }}" - when: extra_dbs | length > 0 - ignore_errors: true + rescue: + - name: verify sample db deletion + ansible.builtin.fail: + msg: "Sample databases have not been removed from Metabase" + vars: + extra_dbs: "{{ metabase_db_query.json.data | rejectattr('name', 'equalto', 'dataplane') }}" + when: extra_dbs | length > 0 + ignore_errors: true - - name: verify metabase db creation - ansible.builtin.fail: - msg: "Database 'dataplane' has not been registered to Metabase" - vars: - metabase_dbs: "{{ metabase_db_query.json.data | selectattr('name', 'equalto', 'dataplane') }}" - when: 
metabase_dbs | length < 1 + - name: verify metabase db creation + ansible.builtin.fail: + msg: "Database 'dataplane' has not been registered to Metabase" + vars: + metabase_dbs: "{{ metabase_db_query.json.data | selectattr('name', 'equalto', 'dataplane') }}" + when: metabase_dbs | length < 1 diff --git a/pyproject.toml b/pyproject.toml index be3078b..62799b8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "dataplane" -version = "0.1.10" +version = "0.1.11" description = "" authors = ["Teodoro Cook "]