# Apache Flink Engine

This is the implementation of the Engine contract of Open Data Fabric using the Apache Flink stream processing framework. It is currently in use in the kamu-cli data management tool.

## Features

The Flink engine currently provides the richest functionality for aggregating and joining event streams.

### Windowed Aggregations

Example computing daily minimum and maximum prices per symbol:

```sql
SELECT
    TUMBLE_START(event_time, INTERVAL '1' DAY) as event_time,
    symbol as symbol,
    min(price) as `min`,
    max(price) as `max`
FROM `in`
GROUP BY TUMBLE(event_time, INTERVAL '1' DAY), symbol
```
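For `TUMBLE` to group by `event_time`, the input table must expose it as an event-time attribute with a watermark. Below is a minimal sketch of what such a declaration could look like; the column types and the connector are illustrative assumptions, not how the engine actually wires its inputs:

```sql
-- Hypothetical DDL sketch of the `in` table used above.
CREATE TABLE `in` (
    event_time TIMESTAMP(3),
    symbol     STRING,
    price      DECIMAL(18, 4),
    -- Declare event_time as an event-time attribute, tolerating records
    -- that arrive up to one second out of order.
    WATERMARK FOR event_time AS event_time - INTERVAL '1' SECOND
) WITH (
    'connector' = 'datagen'  -- placeholder connector for illustration
);
```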

### Stream-To-Stream Joins

Example joining orders to shipments that arrive within two days:

```sql
SELECT
  o.event_time as order_time,
  o.order_id,
  o.quantity as order_quantity,
  CAST(s.event_time as TIMESTAMP) as shipped_time,
  COALESCE(s.num_shipped, 0) as shipped_quantity
FROM
  orders as o
LEFT JOIN shipments as s
ON
  o.order_id = s.order_id
  AND s.event_time BETWEEN o.event_time AND o.event_time + INTERVAL '2' DAY
```
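This is an interval join: each order is matched against shipments whose `event_time` falls within two days of the order, and thanks to the `LEFT JOIN` an order that was never shipped still appears in the output with a `shipped_quantity` of `0`.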

### Temporal Table Joins

Example enriching ticker events with the portfolio state as of each event's time:

```sql
SELECT
  t.event_time,
  t.symbol,
  p.volume as volume,
  t.price as current_price,
  p.volume * t.price as current_value
FROM tickers as t
JOIN portfolio FOR SYSTEM_TIME AS OF t.event_time AS p
WHERE t.symbol = p.symbol
```

Example using the older `LATERAL TABLE` syntax:

```sql
SELECT
  t.event_time,
  t.symbol,
  p.volume as volume,
  t.price as current_price,
  p.volume * t.price as current_value
FROM
  tickers as t,
  LATERAL TABLE (portfolio(t.event_time)) AS p
WHERE t.symbol = p.symbol
```
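Both queries enrich every ticker event with the portfolio row that was current as of that event's time; the `FOR SYSTEM_TIME AS OF` form is the syntax favored by newer Flink versions, while the `LATERAL TABLE` form relies on a registered temporal table function.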

## Known Issues

- Takes a long time to start up, which hurts the user experience
- The SQL parser is very sensitive to keywords and requires a lot of quoting (note the backtick-quoted `min`, `max`, and `in` identifiers in the examples above)
- Defaults to the deprecated `int96` type, no longer supported by most libraries, for timestamps when encoding to Parquet (FLINK-25565)
  - We use a custom Parquet reader/writer for now that decodes/encodes `int64` timestamps
- `DECIMAL` values are encoded into Parquet as `byte_array` instead of `fixed_len_byte_array` as prescribed by the Parquet spec, which results in Apache Spark failing to read the values correctly
- Does not support reading/writing generic data from Parquet without a hardcoded schema
  - We implement a custom schema converter for reading
  - We have to convert data to Avro and then to Parquet when saving
- Does not save watermarks in savepoints (FLINK-5601)
  - We manually save watermarks as part of the checkpoint
- Does not support month/quarter/year tumbling windows (FLINK-9740)
- Does not support nested data in Parquet (FLINK-XXX)
- Does not have a "process available inputs and stop with savepoint" execution mode
  - We patch some internal classes in order to flush the data and trigger the savepoint when there is no more data to consume
- Does not (easily) support resuming from a savepoint programmatically
  - We have to resume from a savepoint using the CLI rather than in a program
- Does not support the Apache Arrow format (FLINK-10929)
- Does not support late data handling in the SQL API (FLINK-XXX)
- Does not support temporal table joins without a primary key, or with a compound primary key (FLINK-XXX)
- Joins using the `FOR SYSTEM_TIME AS OF` syntax require us to manually set primary keys (see the sketch below)
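As noted in the last item above, a `FOR SYSTEM_TIME AS OF` join needs an explicit primary key on the versioned side. A minimal sketch of the kind of declaration involved, with hypothetical names and an illustrative connector choice (`upsert-kafka`, which happens to require a primary key):

```sql
-- Hypothetical sketch: declaring a primary key on the versioned table so
-- the temporal join can resolve the latest row per key.
CREATE TABLE portfolio (
    event_time TIMESTAMP(3),
    symbol     STRING,
    volume     BIGINT,
    PRIMARY KEY (symbol) NOT ENFORCED,
    WATERMARK FOR event_time AS event_time
) WITH (
    'connector' = 'upsert-kafka',  -- illustrative; any upsert source works
    'topic' = 'portfolio',
    'properties.bootstrap.servers' = 'localhost:9092',
    'key.format' = 'json',
    'value.format' = 'json'
);
```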

## Past Issues

- `DECIMAL` data type is broken in Parquet (FLINK-17804)
  - Pull request flink#12768 was created and is awaiting approval
  - Using our forked version for now

## Developing

See the Developer Guide.