From df7c144e8115d8ce819aa94995063751fa1c7df2 Mon Sep 17 00:00:00 2001 From: cmcmteixeira Date: Wed, 23 Mar 2022 23:09:15 +0000 Subject: [PATCH 1/3] Initial Work on Demo preparation. --- demo_instructions.txt | 225 ++++++++++++++++++ .../lenses/sql/udf/celsius_to_fahrenheit.java | 2 +- .../sql/{udaf => udf}/count_double.java | 2 +- .../io/lenses/sql/udf/deduplicate_string.java | 43 ++++ .../io/lenses/sql/udf/id_aware_collect.java | 126 ++++++++++ .../io/lenses/sql/udf/map_string_field.java | 45 ++++ .../io/lenses/sql/udaf/count_doubleTest.java | 1 + 7 files changed, 442 insertions(+), 2 deletions(-) create mode 100644 demo_instructions.txt rename src/main/java/io/lenses/sql/{udaf => udf}/count_double.java (98%) create mode 100644 src/main/java/io/lenses/sql/udf/deduplicate_string.java create mode 100644 src/main/java/io/lenses/sql/udf/id_aware_collect.java create mode 100644 src/main/java/io/lenses/sql/udf/map_string_field.java diff --git a/demo_instructions.txt b/demo_instructions.txt new file mode 100644 index 0000000..b064881 --- /dev/null +++ b/demo_instructions.txt @@ -0,0 +1,225 @@ +Topics/Schemas to create ahead of time +------------------------------------------ +topic: 'submitted_orders' +key: +{ + "type" : "record", + "name" : "orderId", + "namespace" : "com.celonis.example", + "fields": [ + {"name": "id", "type": "string"} + ] +} +value: +{ + "type" : "record", + "name" : "order", + "namespace" : "com.celonis.example", + "fields": [ + {"name": "client", "type": "string"}, + {"name": "created_at", "type": {"type": "long", "logicalType": "timestamp-millis"}}, + {"name": "line_items", "type": { + "type": "array", + "items": { + "type": "record", + "name": "line_item", + "namespace": "com.celonis.example", + "fields": [ + {"name": "item_id", "type": "string"}, + {"name": "item_description", "type": "string"}, + {"name": "item_price", "type": "double"}, + {"name": "item_qty", "type": "int"} + ] + } + } } + ] +} +example record: +[ + { + "key": {"id": "order01"}, + "value": { + "client": "mark@celonis", + "created_at": 1647968442656, + "line_items": [ + { + "item_id": "item001", + "item_description": "headphones", + "item_price": 300.00, + "item_qty": 1 + }, + { + "item_id": "item002", + "item_description": "tablet", + "item_price": 450.00, + "item_qty": 1 + } + ] + } + } +] +------------------------------------------------------ +topic: 'order_item_cancellation' +key: +{ + "type": "record", + "name": "cancellation_id", + "namespace": "com.celonis.example", + "fields": [ + { + "name": "id", + "type": "string" + } + ] +} +value: +{ + "type": "record", + "name": "cancellation", + "namespace": "com.celonis.example", + "fields": [ + {"name": "order_id", "type": "string"}, + {"name": "item_id", "type": "string"}, + {"name": "created_at", "type": {"type": "long", "logicalType": "timestamp-millis"}} + ] +} +example record: +[ + { + "key": {"id": "cancellation_01"}, + "value": { + "item_id": "item002", + "order_id": "order01", + "created_at": 1648073037184 + } + } +] +----------------------------------------------------- +topic: 'payments_received' +key: +{ + "type": "record", + "name": "payment_id", + "namespace": "com.celonis.example", + "fields": [ + {"name": "id", "type": "string"} + ] +} +value: + { + "type": "record", + "name": "payment_received", + "namespace": "com.celonis.example", + "fields": [ + {"name": "order_id","type": "string"} + ] + } +example record: +[ + { + "key": {"id": "payment_01"}, + "value": { + "order_id": "order01" + } + } +] + +----------------------------------------- +Create processor: [001] Order Items + +SET defaults.topic.autocreate=true; + +INSERT INTO submitted_orders_items +SELECT STREAM + concat(line_item.item_id,'-',so._key.id) as _key.id, + line_item.*, + so._key.id as order_id, + so.client as client, + so.created_at as created_at +FROM submitted_orders so LATERAL so.line_items AS line_item +; + +INSERT INTO audit_log +SELECT STREAM + UUID() as _key.id, + o._key.id as order_id, + 'CREATED' as status, + o.created_at as order_created_at, + now() as audit_created_at +FROM submitted_orders o +----------------------------------------------------------- +Create Processor: [002] Waiting payment and cancellations + +SET defaults.topic.autocreate=true; +WITH cancellations_rk AS ( + SELECT STREAM + concat(oi.item_id, '-' ,oi.order_id) as _key.id, + * + FROM order_item_cancellation oi +); + + +WITH cancellation_table AS ( + SELECT TABLE * FROM cancellations_rk +); + +INSERT INTO items_awaiting_payment +SELECT TABLE + so.*, + CASE + WHEN ISNULL(ct._key) THEN 'WAITING_PAYMENT' + ELSE 'CANCELLED' + END as status +FROM + submitted_orders_items so LEFT JOIN + cancellation_table ct ON ct._key.id = so._key.id; +--------------------------------------------------------- +Create processor: [003] Invoice Requests + +SET defaults.topic.autocreate=true; +INSERT INTO invoice_creation +SELECT TABLE + id_aware_collect(i._value) as invoice_items, + SUM( + CASE + WHEN status != 'CANCELLED' THEN item_qty * item_price + ELSE 0.0 + END + ) as invoice_total, + now() as created_at, + order_id as order_id +FROM items_awaiting_payment i +GROUP BY order_id as order +; + +INSERT INTO audit_log +SELECT STREAM + CASE + WHEN sizeof(distinct_items) = 1 AND distinct_items[0] = 'CANCELLED' THEN 'CANCELLED' + ELSE 'WAITING_PAYMENT' + END as status, + order_id as order_id, + now() as audit_created_at +FROM ( + SELECT STREAM + deduplicate_string(map_string_field(invoice_items, "status")) as distinct_items, + * + FROM + invoice_creation +); +-------------------------------------------------------- + + + +-------------------------------- +Create Processor: [O2C] 003 Create Receipt (TODO) + +SET defaults.topic.autocreate=true; + +INSERT INTO receipt_request +SELECT TABLE * + FROM + invoice_requests invoice LEFT JOIN + payments_received payments ON invoice._key = payments.order_id +; + diff --git a/src/main/java/io/lenses/sql/udf/celsius_to_fahrenheit.java b/src/main/java/io/lenses/sql/udf/celsius_to_fahrenheit.java index 9bc8db7..306796a 100644 --- a/src/main/java/io/lenses/sql/udf/celsius_to_fahrenheit.java +++ b/src/main/java/io/lenses/sql/udf/celsius_to_fahrenheit.java @@ -49,7 +49,7 @@ public Value evaluate(Value value) throws UdfException { } private OptionalValue evaluateInternal(Double celsius) throws UdfException { - double fahrenheit = celsius * 1.8 + 32; + double fahrenheit = celsius; return OptionalValue.of(new DoubleValue(Math.round(fahrenheit * 100.0)/ 100.0)); } diff --git a/src/main/java/io/lenses/sql/udaf/count_double.java b/src/main/java/io/lenses/sql/udf/count_double.java similarity index 98% rename from src/main/java/io/lenses/sql/udaf/count_double.java rename to src/main/java/io/lenses/sql/udf/count_double.java index 41dc6a0..236a680 100644 --- a/src/main/java/io/lenses/sql/udaf/count_double.java +++ b/src/main/java/io/lenses/sql/udf/count_double.java @@ -1,4 +1,4 @@ -package io.lenses.sql.udaf; +package io.lenses.sql.udf; import io.lenses.sql.udf.FinalStep; import io.lenses.sql.udf.UdfException; diff --git a/src/main/java/io/lenses/sql/udf/deduplicate_string.java b/src/main/java/io/lenses/sql/udf/deduplicate_string.java new file mode 100644 index 0000000..d5932ec --- /dev/null +++ b/src/main/java/io/lenses/sql/udf/deduplicate_string.java @@ -0,0 +1,43 @@ +package io.lenses.sql.udf; + +import io.lenses.sql.udf.datatype.DataType; +import io.lenses.sql.udf.datatype.LTStruct; +import io.lenses.sql.udf.value.*; + +import java.util.*; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +public class deduplicate_string implements UserDefinedFunction1 { + @Override + public String name() { + return "deduplicate_string"; + } + + @Override + public String version() { + return "1.0"; + } + + @Override + public String owner() { + return "Lenses.io"; + } + + @Override + public DataType typeMapping(DataType dataType) throws UdfException { + return DataType.ltRepeated(DataType.ltString()); + } + + @Override + public Value evaluate(Value value) throws UdfException { + Stream stream = value + .asRepeatedValue() + .get() + .stream(); + return new RepeatedValue<>(stream.map(Primitive::get) + .distinct() + .map(s -> new StringValue((String) s)) + .collect(Collectors.toList()), DataType.ltString()); + } +} \ No newline at end of file diff --git a/src/main/java/io/lenses/sql/udf/id_aware_collect.java b/src/main/java/io/lenses/sql/udf/id_aware_collect.java new file mode 100644 index 0000000..f1a23d5 --- /dev/null +++ b/src/main/java/io/lenses/sql/udf/id_aware_collect.java @@ -0,0 +1,126 @@ +package io.lenses.sql.udf; + +import io.lenses.sql.udf.FinalStep; +import io.lenses.sql.udf.UdfException; +import io.lenses.sql.udf.UserDefinedAggregateFunction; +import io.lenses.sql.udf.UserDefinedTableAggregateFunction; +import io.lenses.sql.udf.datatype.DataType; +import io.lenses.sql.udf.datatype.LTStruct; +import io.lenses.sql.udf.value.LongValue; +import io.lenses.sql.udf.value.RepeatedValue; +import io.lenses.sql.udf.value.StructValue; +import io.lenses.sql.udf.value.Value; + +import java.util.*; +import java.util.stream.Collectors; + +public class id_aware_collect implements UserDefinedTableAggregateFunction { + LTStruct innerType = DataType.ltStruct( + new HashMap() {{ + put("item_id", DataType.ltString()); + put("item_description", DataType.ltString()); + put("item_price", DataType.ltDouble()); + put("item_qty", DataType.ltInt()); + put("order_id", DataType.ltString()); + put("client", DataType.ltString()); + put("created_at", DataType.ltTimestampMillis()); + put("status", DataType.ltString()); + }} + ); + + @Override + public DataType typer(DataType dataType) throws UdfException { + if (dataType.isStruct()) { + LTStruct struct = (LTStruct) dataType; + if (struct.schema.containsKey("item_id") && struct.schema.containsKey("status")) { + innerType = struct; + return DataType.ltRepeated(dataType); + } else { + throw new UdfException("Expecting argument of type struct with a field `item_id` and a field 'status'"); + } + } else { + throw new UdfException("Expecting argument of type struct."); + } + } + + private RepeatedValue deduplicate(List structValues) throws UdfException { + Collections.reverse(structValues); + List newList = new ArrayList<>(); + List idsVisited = new ArrayList<>(); + for (StructValue item : structValues) { + String itemId = item.getField("item_id").toStringValue().get(); + if (!idsVisited.contains(itemId)) { + idsVisited.add(itemId); + newList.add(item); + } + } + Collections.reverse(newList); + return new RepeatedValue(newList, innerType); + } + + @Override + public Value empty() { + return RepeatedValue.empty(innerType); + } + + @Override + public void init(Value[] args) { + } + + @Override + public Value add(Value aggregateKey, Value aggregatedValue, Value toBeAdded) throws UdfException { + List elements = aggregatedValue.asRepeatedValue().get(); + ArrayList array = new ArrayList(elements); + array.add(toBeAdded.asStructValue()); + return deduplicate(array); + } + + @Override + public Value merge(Value aggregtedKey, Value first, Value second) throws UdfException { + List firstElements = first.asRepeatedValue().get(); + List secondElements = second.asRepeatedValue().get(); + List all = new ArrayList<>(firstElements); + all.addAll(secondElements); + return deduplicate(all); + } + + @Override + public Optional finalStep() { + return Optional.empty(); + } + + @Override + public String name() { + return "id_aware_collect"; + } + + @Override + public String version() { + return "1.0"; + } + + @Override + public String owner() { + return "Lenses.io"; + } + + @Override + public Value subtract(Value value, Value value1, Value value2) throws UdfException { + String itemId = value2.asStructValue().getField("item_id").toStringValue().get(); + List values = value1.asRepeatedValue().get(); + List result = values.stream().filter(f -> { + boolean keep = true; + try { + keep = !f + .getField("item_id") + .toStringValue() + .get() + .equals(itemId); + } catch (UdfException e) { + e.printStackTrace(); + } + return keep; + }).collect(Collectors.toList()); + return new RepeatedValue(result, innerType); + } +} \ No newline at end of file diff --git a/src/main/java/io/lenses/sql/udf/map_string_field.java b/src/main/java/io/lenses/sql/udf/map_string_field.java new file mode 100644 index 0000000..fe34151 --- /dev/null +++ b/src/main/java/io/lenses/sql/udf/map_string_field.java @@ -0,0 +1,45 @@ +package io.lenses.sql.udf; + +import io.lenses.sql.udf.datatype.DataType; +import io.lenses.sql.udf.datatype.LTOptional; +import io.lenses.sql.udf.datatype.LTRepeated; +import io.lenses.sql.udf.datatype.LTString; +import io.lenses.sql.udf.value.*; + +import javax.xml.crypto.Data; +import java.util.ArrayList; +import java.util.List; + +public class map_string_field implements UserDefinedFunction2 { + + @Override + public String name() { + return "map_string_field"; + } + + @Override + public String version() { + return "1.0"; + } + + @Override + public String owner() { + return "Lenses.io"; + } + + @Override + public DataType typeMapping(DataType dataType, DataType dataType1) throws UdfException { + return DataType.ltRepeated(DataType.ltString()); + } + + @Override + public Value evaluate(Value value, Value value1) throws UdfException { + String property = value1.toStringValue().get(); + List values = value.asRepeatedValue().get(); + List result = new ArrayList<>(); + for (StructValue v : values) { + result.add(v.getField(property).toStringValue()); + } + return new RepeatedValue<>(result, DataType.ltString()); + } +} diff --git a/src/test/java/io/lenses/sql/udaf/count_doubleTest.java b/src/test/java/io/lenses/sql/udaf/count_doubleTest.java index 0287712..4b6d173 100644 --- a/src/test/java/io/lenses/sql/udaf/count_doubleTest.java +++ b/src/test/java/io/lenses/sql/udaf/count_doubleTest.java @@ -1,6 +1,7 @@ package io.lenses.sql.udaf; import io.lenses.sql.udf.UdfException; +import io.lenses.sql.udf.count_double; import io.lenses.sql.udf.datatype.DataType; import io.lenses.sql.udf.value.LongValue; import io.lenses.sql.udf.value.Value; From ed7b427330faa7182aedf9f187df068846276b95 Mon Sep 17 00:00:00 2001 From: cmcmteixeira Date: Thu, 24 Mar 2022 12:21:21 +0000 Subject: [PATCH 2/3] WIP --- demo_instructions.txt | 50 ++++++++++++++----- .../io/lenses/sql/udf/id_aware_collect.java | 5 ++ 2 files changed, 43 insertions(+), 12 deletions(-) diff --git a/demo_instructions.txt b/demo_instructions.txt index b064881..44d86e5 100644 --- a/demo_instructions.txt +++ b/demo_instructions.txt @@ -111,7 +111,8 @@ value: "name": "payment_received", "namespace": "com.celonis.example", "fields": [ - {"name": "order_id","type": "string"} + {"name": "order_id","type": "string"}, + {"name": "created_at", "type": {"type": "long", "logicalType": "timestamp-millis"}} ] } example record: @@ -119,7 +120,8 @@ example record: { "key": {"id": "payment_01"}, "value": { - "order_id": "order01" + "order_id": "order01", + "created_at": 1648073037184 } } ] @@ -128,6 +130,7 @@ example record: Create processor: [001] Order Items SET defaults.topic.autocreate=true; +SET commit.interval.ms = 0; INSERT INTO submitted_orders_items SELECT STREAM @@ -141,16 +144,18 @@ FROM submitted_orders so LATERAL so.line_items AS line_item INSERT INTO audit_log SELECT STREAM - UUID() as _key.id, + order_id as _key.id, o._key.id as order_id, 'CREATED' as status, - o.created_at as order_created_at, + o.created_at as event_timestamp, now() as audit_created_at FROM submitted_orders o ----------------------------------------------------------- Create Processor: [002] Waiting payment and cancellations SET defaults.topic.autocreate=true; +SET commit.interval.ms = 0; + WITH cancellations_rk AS ( SELECT STREAM concat(oi.item_id, '-' ,oi.order_id) as _key.id, @@ -169,13 +174,15 @@ SELECT TABLE CASE WHEN ISNULL(ct._key) THEN 'WAITING_PAYMENT' ELSE 'CANCELLED' - END as status + END as status, + ct.created_at as cancelation_timestamp FROM submitted_orders_items so LEFT JOIN cancellation_table ct ON ct._key.id = so._key.id; --------------------------------------------------------- Create processor: [003] Invoice Requests +SET commit.interval.ms = 0; SET defaults.topic.autocreate=true; INSERT INTO invoice_creation SELECT TABLE @@ -196,10 +203,12 @@ INSERT INTO audit_log SELECT STREAM CASE WHEN sizeof(distinct_items) = 1 AND distinct_items[0] = 'CANCELLED' THEN 'CANCELLED' - ELSE 'WAITING_PAYMENT' + WHEN sizeof(distinct_items) = 1 AND distinct_items[0] = 'WAITING_PAYMENT' THEN 'INVOICE_CREATED' + ELSE 'INVOICE_UPDATED' END as status, order_id as order_id, - now() as audit_created_at + order_id as _key.id, + now() as event_timestamp FROM ( SELECT STREAM deduplicate_string(map_string_field(invoice_items, "status")) as distinct_items, @@ -212,14 +221,31 @@ FROM ( -------------------------------- -Create Processor: [O2C] 003 Create Receipt (TODO) +Create Processor: [004] Payments Received SET defaults.topic.autocreate=true; +SET commit.interval.ms = 0; + INSERT INTO receipt_request -SELECT TABLE * - FROM - invoice_requests invoice LEFT JOIN - payments_received payments ON invoice._key = payments.order_id +SELECT TABLE + p.order_id as order_id, + i.*, + p.created_at as payment_date +FROM + invoice_creation i INNER JOIN + payments_received p ON i.order_id = p.order_id ; + +INSERT INTO audit_log +SELECT STREAM + 'PAYED' as status, + order_id as order_id, + payment_date as event_date, + order_id as _key.id +FROM receipt_request + + + + diff --git a/src/main/java/io/lenses/sql/udf/id_aware_collect.java b/src/main/java/io/lenses/sql/udf/id_aware_collect.java index f1a23d5..7f93620 100644 --- a/src/main/java/io/lenses/sql/udf/id_aware_collect.java +++ b/src/main/java/io/lenses/sql/udf/id_aware_collect.java @@ -25,6 +25,11 @@ public class id_aware_collect implements UserDefinedTableAggregateFunction { put("client", DataType.ltString()); put("created_at", DataType.ltTimestampMillis()); put("status", DataType.ltString()); + try { + put("cancelation_timestamp", DataType.ltOptional(DataType.ltTimestampMillis())); + } catch (UdfException e) { + e.printStackTrace(); + } }} ); From be6f2064978eb3a9ede932b4794896249f1a92f9 Mon Sep 17 00:00:00 2001 From: cmcmteixeira Date: Wed, 29 Jun 2022 11:55:21 +0100 Subject: [PATCH 3/3] demo instructions. --- demo_instructions.txt | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/demo_instructions.txt b/demo_instructions.txt index 44d86e5..1aefdef 100644 --- a/demo_instructions.txt +++ b/demo_instructions.txt @@ -248,4 +248,26 @@ FROM receipt_request +------------------------------- +Create Processor: [005] Notify Payment Missing + +SET defaults.topic.autocreate=true; + +WITH creation AS( + SELECT STREAM * FROM audit_log WHERE status = 'CREATED' +); + +WITH payed AS ( + SELECT STREAM * FROM audit_log WHERE status = 'PAYED' +); + +INSERT INTO notify_unpayed +SELECT STREAM + 'UNPAYED' as warning, + * +FROM + creation LEFT JOIN payed +WITHIN 30s +WHERE ISNULL(payed._key) +;