Skip to content

Commit

Permalink
Add ksql kafka message transformer
Browse files Browse the repository at this point in the history
  • Loading branch information
pvannierop committed Apr 24, 2024
1 parent 7816250 commit 78f1709
Show file tree
Hide file tree
Showing 5 changed files with 115 additions and 0 deletions.
4 changes: 4 additions & 0 deletions etc/base.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,10 @@ radar_jdbc_connector:
# Change the list of topics if you have dashboards that read other data or if you don't have certain topics available on your cluster.
topics: android_phone_relative_location, android_phone_battery_level, connect_fitbit_intraday_heart_rate, connect_fitbit_intraday_steps

# ksqlDB-based Kafka message transformer (kafka-data-transformer release in
# helmfile.d/20-dashboard.yaml). Queries are supplied via base.yaml.gotmpl.
kafka_data_transformer:
  _install: false
  _chart_version: 0.3.1
  # Extra seconds added to base_timeout for this release. The helmfile
  # release computes `add .Values.base_timeout ._extra_timeout`, so this
  # key must exist even when zero.
  _extra_timeout: 0

# --------------------------------------------------------- 20-ingestion.yaml ---------------------------------------------------------

radar_gateway:
Expand Down
9 changes: 9 additions & 0 deletions etc/base.yaml.gotmpl
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,21 @@
# # file by setting
# {{/* keystore: {{ readFile "certificate.pem" | b64enc | quote }} */}}

# If radar_grafana is used, please remove the Go template comments and yaml comments.
radar_grafana:
  dashboards:
    allprojects:
      home:
        # readFile inlines the dashboard JSON; `quote` keeps the whole
        # document a single YAML scalar so embedded quotes/newlines are safe.
        json: {{ readFile "radar-grafana/dashboards/allprojects/home.json" | quote }}

# If radar_jdbc_connector is used, and data transformation is needed, please remove the Go template comments and yaml comments.
# An example transformation is exploding an array field in a Kafka message into multiple database rows.
# Make sure to define the transformation logic in the queries.sql file as needed for the use case.
# NOTE: the key below must match the base.yaml/helmfile key `kafka_data_transformer`
# so the override is merged into the kafka-data-transformer release values.
# kafka_data_transformer:
#   ksql:
#     queries: |
# {{/* {{- readFile "../etc/kafka-data-transformer/queries.sql" | nindent 8 }} */}}

# If appserver is used, please remove the Go template comments and yaml comments.
# Again, like with management_portal, if you want to store the credentials in a
# less secure location, please encrypt the JSON file it and read it with sops,
Expand Down
80 changes: 80 additions & 0 deletions etc/kafka-data-transformer/queries.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
-- Process the source topic from the beginning so that messages published
-- before this transformer was deployed are transformed as well.
SET 'auto.offset.reset' = 'earliest';

-- Define stream from questionnaire_response topic
-- This way it can be read by ksqlDB
-- NOTE(review): the column list must mirror the topic's Avro key/value
-- schema (RADAR questionnaire_response) — confirm names and types against it.
CREATE STREAM questionnaire_response (
  projectId VARCHAR KEY,
  userId VARCHAR KEY,
  sourceId VARCHAR KEY,
  time DOUBLE,
  timeCompleted DOUBLE,
  timeNotification DOUBLE,
  name VARCHAR,
  version VARCHAR,
  -- Per-question answers; `value` is a union-like struct where presumably
  -- exactly one of int/string/double is set per answer — verify in schema.
  answers ARRAY<STRUCT<questionId VARCHAR, value STRUCT<int INT, string VARCHAR, double DOUBLE>, startTime DOUBLE, endTime DOUBLE>>
) WITH (
  kafka_topic = 'questionnaire_response',
  partitions = 3,
  format = 'avro'
);

-- Convert all questionnaires answers to single observation records
-- Several EXPLODEs over parallel TRANSFORMs of the same `answers` array are
-- used so the exploded columns stay aligned element-by-element.
CREATE STREAM outcomes_observations_unindexed
  WITH (
    kafka_topic = 'ksql_outcomes_observations_unindexed',
    partitions = 3,
    format = 'avro'
  )
  AS SELECT
    EXPLODE(TRANSFORM(q.answers, a => a->questionId)) as `variable`,
    -- q.time appears to be epoch seconds: it is scaled by 1000 for
    -- FROM_UNIXTIME, which expects milliseconds — confirm against schema.
    FROM_UNIXTIME(CAST(q.time * 1000 AS BIGINT)) as `date`,
    q.userId as `subject_id`,
    CAST(NULL as TIMESTAMP) as `end_date`,
    -- If present, take the int response; otherwise try to convert the
    -- string response to a numeric value (CAST yields NULL if non-numeric).
    EXPLODE(TRANSFORM(q.answers, a => COALESCE(a->value->int, CAST(a->value->string as INT)))) as `value_numeric`,
    EXPLODE(TRANSFORM(q.answers, a => a->value->string)) as `value_textual`
  FROM questionnaire_response q
  PARTITION BY q.userId
  EMIT CHANGES;

-- Read select statements.
-- Multi-select answers carry a comma-separated list in `value_textual`;
-- each selected option becomes its own observation row with value 1.
-- NOTE(review): this stream writes back into the SAME topic it reads from
-- (ksql_outcomes_observations_unindexed), so its output re-enters the
-- pipeline. Reprocessing presumably terminates because the split variables
-- no longer match the 'select:%' filter — confirm this is intentional.
CREATE STREAM outcomes_observations_unindexed_select
  WITH (
    kafka_topic = 'ksql_outcomes_observations_unindexed',
    partitions = 3,
    format = 'avro'
  )
  AS SELECT
    EXPLODE(SPLIT(`value_textual`, ',')) as `variable`,
    `subject_id`,
    `date`,
    `end_date`,
    -- Selection is encoded as a presence flag rather than a free value.
    1 as `value_numeric`,
    CAST(NULL as VARCHAR) as `value_textual`
  FROM outcomes_observations_unindexed
  WHERE `variable` LIKE 'select:%'
    AND `value_textual` IS NOT NULL
    AND `value_textual` != ''
  EMIT CHANGES;

-- Read the correct variable ID from the database and join it with the question ID.
-- Non-existing variables will not be joined so questionIds that are not used in H2O
-- will not be converted.
-- NOTE(review): `outcomes_variable` is not created in this file — it must be
-- a ksqlDB table/stream defined elsewhere (e.g. sourced from the database);
-- verify it exists before deploying these queries.
CREATE STREAM outcomes_observations
  WITH (
    kafka_topic = 'ksql_outcomes_observations',
    partitions = 3,
    format = 'avro'
  )
  AS SELECT
    o.`variable`,
    v.id as `variable_id`,
    o.`subject_id`,
    o.`date`,
    o.`end_date`,
    o.`value_numeric`,
    o.`value_textual`
  FROM outcomes_observations_unindexed o
  -- this will force o.variable to be the record key
  JOIN outcomes_variable v ON o.`variable` = v.name
  EMIT CHANGES;
12 changes: 12 additions & 0 deletions etc/kafka-data-transformer/values.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# Helm values for the cp-ksql-server release (kafka-data-transformer).
# A single server is enough for these stateless transformation queries.
replicaCount: 1

kafka:
  # NOTE(review): assumes the Confluent Platform headless broker service name
  # used elsewhere in this stack — confirm it matches this cluster.
  bootstrapServers: PLAINTEXT://cp-kafka-headless:9092

cp-schema-registry:
  url: http://cp-schema-registry:8081

configurationOverrides:
  # Silence noisy Kafka client / serializer / ksqlDB config loggers to WARN.
  log4j.loggers: org.apache.kafka.streams.processor.internals.StreamThread=WARN,org.apache.kafka.clients.producer.ProducerConfig=WARN,org.apache.kafka.clients.consumer.ConsumerConfig=WARN,org.apache.kafka.clients.admin.AdminClientConfig=WARN,io.confluent.kafka.serializers.KafkaAvroSerializerConfig=WARN,io.confluent.kafka.serializers.KafkaAvroDeserializerConfig=WARN,io.confluent.connect.avro.AvroConverterConfig=WARN,io.confluent.connect.avro.AvroDataConfig=WARN,org.apache.kafka.streams.StreamsConfig=WARN,io.confluent.ksql.util.KsqlConfig=WARN
  # Quoted deliberately: the chart passes these through as string properties,
  # not YAML booleans.
  confluent.telemetry.enabled: "false"
  confluent.support.metrics.enable: "false"
10 changes: 10 additions & 0 deletions helmfile.d/20-dashboard.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,16 @@ releases:
- name: jdbc.url
value: {{ dig "jdbc" "url" (printf "jdbc:postgresql://timescaledb-postgresql-hl:5432/%s" .Values.timescaledb_db_name) .Values.data_dashboard_backend }}

# ksqlDB server that transforms questionnaire Kafka messages (see
# etc/kafka-data-transformer/queries.sql) before the JDBC connector persists them.
- name: kafka-data-transformer
  chart: cp-radar/cp-ksql-server
  version: {{ .Values.kafka_data_transformer._chart_version }}
  # `default 0` guards against _extra_timeout being undefined in base.yaml,
  # which would otherwise break the `add` during template rendering.
  timeout: {{ add .Values.base_timeout (.Values.kafka_data_transformer._extra_timeout | default 0) }}
  wait: false
  installed: {{ .Values.kafka_data_transformer._install }}
  values:
    # Fixed path: the committed directory is etc/kafka-data-transformer
    # (hyphens), not etc/kafka_data_transformer.
    - "../etc/kafka-data-transformer/values.yaml"
    - {{ .Values.kafka_data_transformer | toYaml | indent 8 | trim }}

- name: radar-jdbc-connector
chart: radar/radar-jdbc-connector
version: {{ .Values.radar_jdbc_connector._chart_version }}
Expand Down

0 comments on commit 78f1709

Please sign in to comment.