Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initial Work on Demo preparation. #6

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
273 changes: 273 additions & 0 deletions demo_instructions.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,273 @@
Topics/Schemas to create ahead of time
------------------------------------------
topic: 'submitted_orders'
key:
{
"type" : "record",
"name" : "orderId",
"namespace" : "com.celonis.example",
"fields": [
{"name": "id", "type": "string"}
]
}
value:
{
"type" : "record",
"name" : "order",
"namespace" : "com.celonis.example",
"fields": [
{"name": "client", "type": "string"},
{"name": "created_at", "type": {"type": "long", "logicalType": "timestamp-millis"}},
{"name": "line_items", "type": {
"type": "array",
"items": {
"type": "record",
"name": "line_item",
"namespace": "com.celonis.example",
"fields": [
{"name": "item_id", "type": "string"},
{"name": "item_description", "type": "string"},
{"name": "item_price", "type": "double"},
{"name": "item_qty", "type": "int"}
]
}
} }
]
}
example record:
[
{
"key": {"id": "order01"},
"value": {
"client": "mark@celonis",
"created_at": 1647968442656,
"line_items": [
{
"item_id": "item001",
"item_description": "headphones",
"item_price": 300.00,
"item_qty": 1
},
{
"item_id": "item002",
"item_description": "tablet",
"item_price": 450.00,
"item_qty": 1
}
]
}
}
]
------------------------------------------------------
topic: 'order_item_cancellation'
key:
{
"type": "record",
"name": "cancellation_id",
"namespace": "com.celonis.example",
"fields": [
{
"name": "id",
"type": "string"
}
]
}
value:
{
"type": "record",
"name": "cancellation",
"namespace": "com.celonis.example",
"fields": [
{"name": "order_id", "type": "string"},
{"name": "item_id", "type": "string"},
{"name": "created_at", "type": {"type": "long", "logicalType": "timestamp-millis"}}
]
}
example record:
[
{
"key": {"id": "cancellation_01"},
"value": {
"item_id": "item002",
"order_id": "order01",
"created_at": 1648073037184
}
}
]
-----------------------------------------------------
topic: 'payments_received'
key:
{
"type": "record",
"name": "payment_id",
"namespace": "com.celonis.example",
"fields": [
{"name": "id", "type": "string"}
]
}
value:
{
"type": "record",
"name": "payment_received",
"namespace": "com.celonis.example",
"fields": [
{"name": "order_id","type": "string"},
{"name": "created_at", "type": {"type": "long", "logicalType": "timestamp-millis"}}
]
}
example record:
[
{
"key": {"id": "payment_01"},
"value": {
"order_id": "order01",
"created_at": 1648073037184
}
}
]

-----------------------------------------
Create processor: [001] Order Items

SET defaults.topic.autocreate=true;
SET commit.interval.ms = 0;

INSERT INTO submitted_orders_items
SELECT STREAM
concat(line_item.item_id,'-',so._key.id) as _key.id,
line_item.*,
so._key.id as order_id,
so.client as client,
so.created_at as created_at
FROM submitted_orders so LATERAL so.line_items AS line_item
;

INSERT INTO audit_log
SELECT STREAM
order_id as _key.id,
o._key.id as order_id,
'CREATED' as status,
o.created_at as event_timestamp,
now() as audit_created_at
FROM submitted_orders o
-----------------------------------------------------------
Create Processor: [002] Waiting payment and cancellations

SET defaults.topic.autocreate=true;
SET commit.interval.ms = 0;

WITH cancellations_rk AS (
SELECT STREAM
concat(oi.item_id, '-' ,oi.order_id) as _key.id,
*
FROM order_item_cancellation oi
);


WITH cancellation_table AS (
SELECT TABLE * FROM cancellations_rk
);

INSERT INTO items_awaiting_payment
SELECT TABLE
so.*,
CASE
WHEN ISNULL(ct._key) THEN 'WAITING_PAYMENT'
ELSE 'CANCELLED'
END as status,
ct.created_at as cancelation_timestamp
FROM
submitted_orders_items so LEFT JOIN
cancellation_table ct ON ct._key.id = so._key.id;
---------------------------------------------------------
Create processor: [003] Invoice Requests

SET commit.interval.ms = 0;
SET defaults.topic.autocreate=true;
INSERT INTO invoice_creation
SELECT TABLE
id_aware_collect(i._value) as invoice_items,
SUM(
CASE
WHEN status != 'CANCELLED' THEN item_qty * item_price
ELSE 0.0
END
) as invoice_total,
now() as created_at,
order_id as order_id
FROM items_awaiting_payment i
GROUP BY order_id as order
;

INSERT INTO audit_log
SELECT STREAM
CASE
WHEN sizeof(distinct_items) = 1 AND distinct_items[0] = 'CANCELLED' THEN 'CANCELLED'
WHEN sizeof(distinct_items) = 1 AND distinct_items[0] = 'WAITING_PAYMENT' THEN 'INVOICE_CREATED'
ELSE 'INVOICE_UPDATED'
END as status,
order_id as order_id,
order_id as _key.id,
now() as event_timestamp
FROM (
SELECT STREAM
deduplicate_string(map_string_field(invoice_items, "status")) as distinct_items,
*
FROM
invoice_creation
);
--------------------------------------------------------



--------------------------------
Create Processor: [004] Payments Received

SET defaults.topic.autocreate=true;
SET commit.interval.ms = 0;


INSERT INTO receipt_request
SELECT TABLE
p.order_id as order_id,
i.*,
p.created_at as payment_date
FROM
invoice_creation i INNER JOIN
payments_received p ON i.order_id = p.order_id
;


INSERT INTO audit_log
SELECT STREAM
'PAYED' as status,
order_id as order_id,
payment_date as event_date,
order_id as _key.id
FROM receipt_request



-------------------------------
Create Processor: [005] Notify Payment Missing

SET defaults.topic.autocreate=true;

WITH creation AS(
SELECT STREAM * FROM audit_log WHERE status = 'CREATED'
);

WITH payed AS (
SELECT STREAM * FROM audit_log WHERE status = 'PAYED'
);

INSERT INTO notify_unpayed
SELECT STREAM
'UNPAYED' as warning,
*
FROM
creation LEFT JOIN payed
WITHIN 30s
WHERE ISNULL(payed._key)
;

2 changes: 1 addition & 1 deletion src/main/java/io/lenses/sql/udf/celsius_to_fahrenheit.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public Value evaluate(Value value) throws UdfException {
}

private OptionalValue evaluateInternal(Double celsius) throws UdfException {
double fahrenheit = celsius * 1.8 + 32;
double fahrenheit = celsius;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

did you remove this in purpose?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nope.. must have happened :)

return OptionalValue.of(new DoubleValue(Math.round(fahrenheit * 100.0)/ 100.0));
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package io.lenses.sql.udaf;
package io.lenses.sql.udf;

import io.lenses.sql.udf.FinalStep;
import io.lenses.sql.udf.UdfException;
Expand Down
43 changes: 43 additions & 0 deletions src/main/java/io/lenses/sql/udf/deduplicate_string.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package io.lenses.sql.udf;

import io.lenses.sql.udf.datatype.DataType;
import io.lenses.sql.udf.datatype.LTStruct;
import io.lenses.sql.udf.value.*;

import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class deduplicate_string implements UserDefinedFunction1 {
@Override
public String name() {
return "deduplicate_string";
}

@Override
public String version() {
return "1.0";
}

@Override
public String owner() {
return "Lenses.io";
}

@Override
public DataType typeMapping(DataType dataType) throws UdfException {
return DataType.ltRepeated(DataType.ltString());
}

@Override
public Value evaluate(Value value) throws UdfException {
Stream<StringValue> stream = value
.asRepeatedValue()
.get()
.stream();
return new RepeatedValue<>(stream.map(Primitive::get)
.distinct()
.map(s -> new StringValue((String) s))
.collect(Collectors.toList()), DataType.ltString());
}
}
Loading