From 78f17090ae9cae6650cd037c024c3531ff219008 Mon Sep 17 00:00:00 2001 From: Pim van Nierop Date: Wed, 24 Apr 2024 10:29:01 +0200 Subject: [PATCH] Add ksql kafka message transformer --- etc/base.yaml | 4 ++ etc/base.yaml.gotmpl | 9 +++ etc/kafka-data-transformer/queries.sql | 80 ++++++++++++++++++++++++++ etc/kafka-data-transformer/values.yaml | 12 ++++ helmfile.d/20-dashboard.yaml | 10 ++++ 5 files changed, 115 insertions(+) create mode 100644 etc/kafka-data-transformer/queries.sql create mode 100644 etc/kafka-data-transformer/values.yaml diff --git a/etc/base.yaml b/etc/base.yaml index 4d9d0d96..8bc089b9 100644 --- a/etc/base.yaml +++ b/etc/base.yaml @@ -323,6 +323,10 @@ radar_jdbc_connector: # Change the list of topics if you have dashboards that read other data or if you don't have certain topics available on your cluster. topics: android_phone_relative_location, android_phone_battery_level, connect_fitbit_intraday_heart_rate, connect_fitbit_intraday_steps +kafka_data_transformer: + _install: false + _chart_version: 0.3.1 + # --------------------------------------------------------- 20-ingestion.yaml --------------------------------------------------------- radar_gateway: diff --git a/etc/base.yaml.gotmpl b/etc/base.yaml.gotmpl index 0bee5a73..b0de346b 100644 --- a/etc/base.yaml.gotmpl +++ b/etc/base.yaml.gotmpl @@ -12,12 +12,21 @@ # # file by setting # {{/* keystore: {{ readFile "certificate.pem" | b64enc | quote }} */}} +# If radar_grafana is used, please remove the Go template comments and yaml comments. radar_grafana: dashboards: allprojects: home: json: {{ readFile "radar-grafana/dashboards/allprojects/home.json" | quote }} +# If radar_jdbc_connector is used, and data transformation is needed, please remove the Go template comments and yaml comments. +# Example transformation is exploding an array field in kafka message into multiple database rows. +# Make sure to defined the transformation logic in the queries.sql file as needed for the use case. 
+# kafka_data_transformer: +# ksql: +# queries: | +# {{- readFile "../etc/kafka-data-transformer/queries.sql" | nindent 8 }} + # If appserver is used, please remove the Go template comments and yaml comments. # Again, like with management_portal, if you want to store the credentials in a # less secure location, please encrypt the JSON file it and read it with sops, diff --git a/etc/kafka-data-transformer/queries.sql b/etc/kafka-data-transformer/queries.sql new file mode 100644 index 00000000..cfab87e4 --- /dev/null +++ b/etc/kafka-data-transformer/queries.sql @@ -0,0 +1,80 @@ +SET 'auto.offset.reset' = 'earliest'; + +-- Define stream from questionnaire_response topic +-- This way it can be read by ksqlDB +CREATE STREAM questionnaire_response ( + projectId VARCHAR KEY, + userId VARCHAR KEY, + sourceId VARCHAR KEY, + time DOUBLE, + timeCompleted DOUBLE, + timeNotification DOUBLE, + name VARCHAR, + version VARCHAR, + answers ARRAY<STRUCT<questionId VARCHAR, value STRUCT<int INT, string VARCHAR>, startTime DOUBLE, endTime DOUBLE>> +) WITH ( + kafka_topic = 'questionnaire_response', + partitions = 3, + format = 'avro' +); + +-- Convert all questionnaire answers to single observation records +CREATE STREAM outcomes_observations_unindexed +WITH ( + kafka_topic = 'ksql_outcomes_observations_unindexed', + partitions = 3, + format = 'avro' +) +AS SELECT + EXPLODE(TRANSFORM(q.answers, a => a->questionId)) as `variable`, + FROM_UNIXTIME(CAST(q.time * 1000 AS BIGINT)) as `date`, + q.userId as `subject_id`, + CAST(NULL as TIMESTAMP) as `end_date`, + -- if present, take the int response, otherwise try to convert the string response to a numeric value. + EXPLODE(TRANSFORM(q.answers, a => COALESCE(a->value->int, CAST(a->value->string as INT)))) as `value_numeric`, + EXPLODE(TRANSFORM(q.answers, a => a->value->string)) as `value_textual` +FROM questionnaire_response q +PARTITION BY q.userId +EMIT CHANGES; + +-- Explode multi-select answers ('select:' prefixed, comma-separated questionIds) into one observation row per selected option.
+CREATE STREAM outcomes_observations_unindexed_select +WITH ( + kafka_topic = 'ksql_outcomes_observations_unindexed', + partitions = 3, + format = 'avro' +) +AS SELECT + EXPLODE(SPLIT(`value_textual`, ',')) as `variable`, + `subject_id`, + `date`, + `end_date`, + 1 as `value_numeric`, + CAST(NULL as VARCHAR) as `value_textual` +FROM outcomes_observations_unindexed +WHERE `variable` LIKE 'select:%' + AND `value_textual` IS NOT NULL + AND `value_textual` != '' +EMIT CHANGES; + +-- Read the correct variable ID from the database and join it with the question ID. +-- Non-existing variables will not be joined so questionIds that are not used in H2O +-- will not be converted. +CREATE STREAM outcomes_observations +WITH ( + kafka_topic = 'ksql_outcomes_observations', + partitions = 3, + format = 'avro' +) +AS SELECT + o.`variable`, + v.id as `variable_id`, + o.`subject_id`, + o.`date`, + o.`end_date`, + o.`value_numeric`, + o.`value_textual` +FROM outcomes_observations_unindexed o +-- this will force o.variable to be the record key +JOIN outcomes_variable v ON o.`variable` = v.name +EMIT CHANGES; diff --git a/etc/kafka-data-transformer/values.yaml b/etc/kafka-data-transformer/values.yaml new file mode 100644 index 00000000..516e9d77 --- /dev/null +++ b/etc/kafka-data-transformer/values.yaml @@ -0,0 +1,12 @@ +replicaCount: 1 + +kafka: + bootstrapServers: PLAINTEXT://cp-kafka-headless:9092 + +cp-schema-registry: + url: http://cp-schema-registry:8081 + +configurationOverrides: + log4j.loggers: 
org.apache.kafka.streams.processor.internals.StreamThread=WARN,org.apache.kafka.clients.producer.ProducerConfig=WARN,org.apache.kafka.clients.consumer.ConsumerConfig=WARN,org.apache.kafka.clients.admin.AdminClientConfig=WARN,io.confluent.kafka.serializers.KafkaAvroSerializerConfig=WARN,io.confluent.kafka.serializers.KafkaAvroDeserializerConfig=WARN,io.confluent.connect.avro.AvroConverterConfig=WARN,io.confluent.connect.avro.AvroDataConfig=WARN,org.apache.kafka.streams.StreamsConfig=WARN,io.confluent.ksql.util.KsqlConfig=WARN + confluent.telemetry.enabled: "false" + confluent.support.metrics.enable: "false" diff --git a/helmfile.d/20-dashboard.yaml b/helmfile.d/20-dashboard.yaml index ddafd75a..7e118cfa 100644 --- a/helmfile.d/20-dashboard.yaml +++ b/helmfile.d/20-dashboard.yaml @@ -85,6 +85,16 @@ releases: - name: jdbc.url value: {{ dig "jdbc" "url" (printf "jdbc:postgresql://timescaledb-postgresql-hl:5432/%s" .Values.timescaledb_db_name) .Values.data_dashboard_backend }} + - name: kafka-data-transformer + chart: cp-radar/cp-ksql-server + version: {{ .Values.kafka_data_transformer._chart_version }} + timeout: {{ add .Values.base_timeout (dig "_extra_timeout" 0 .Values.kafka_data_transformer) }} + wait: false + installed: {{ .Values.kafka_data_transformer._install }} + values: + - "../etc/kafka-data-transformer/values.yaml" + - {{ .Values.kafka_data_transformer | toYaml | indent 8 | trim }} + - name: radar-jdbc-connector chart: radar/radar-jdbc-connector version: {{ .Values.radar_jdbc_connector._chart_version }}