From 687771f95f4ecaccfbf9317b15860d0c1852a617 Mon Sep 17 00:00:00 2001 From: Amey Chaugule Date: Mon, 21 Oct 2024 03:04:12 -0700 Subject: [PATCH] checkpointing final (#48) * Adding slate db support * Working checkpointing of Windows to SlateDB --- Cargo.lock | 1520 ++++++++++++++--- Cargo.toml | 1 + README.md | 12 + crates/common/Cargo.toml | 1 + crates/common/src/error/mod.rs | 2 + crates/core/Cargo.toml | 4 + crates/core/src/context.rs | 14 +- .../src/datasource/kafka/kafka_stream_read.rs | 169 +- crates/core/src/datastream.rs | 142 +- .../continuous/grouped_window_agg_stream.rs | 201 ++- .../continuous/streaming_window.rs | 25 + crates/core/src/state_backend/mod.rs | 1 + crates/core/src/state_backend/slatedb.rs | 92 + crates/core/src/utils/serialization.rs | 31 +- crates/orchestrator/src/orchestrator.rs | 74 +- examples/Cargo.toml | 10 +- examples/examples/emit_measurements.rs | 13 +- examples/examples/simple_aggregation.rs | 13 +- 18 files changed, 1900 insertions(+), 425 deletions(-) create mode 100644 crates/core/src/state_backend/slatedb.rs diff --git a/Cargo.lock b/Cargo.lock index 88bb717..184f3b2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4,9 +4,9 @@ version = 3 [[package]] name = "addr2line" -version = "0.24.1" +version = "0.24.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f5fb1d8e4442bd405fdfd1dacb42792696b0cf9cb15882e5d097b742a676d375" +checksum = "dfbe277e56a376000877090da837660b4427aad530e3028d44e0bffe4f89a1c1" dependencies = [ "gimli", ] @@ -165,6 +165,15 @@ dependencies = [ "zstd 0.12.4", ] +[[package]] +name = "array-util" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7e509844de8f09b90a2c3444684a2b6695f4071360e13d2fda0af9f749cc2ed6" +dependencies = [ + "arrayvec", +] + [[package]] name = "arrayref" version = "0.3.9" @@ -179,9 +188,9 @@ checksum = "7c02d123df017efcdfbd739ef81735b36c5ba83ec3c59c80a9d7ecc718f92e50" [[package]] name = "arrow" -version = "53.0.0" +version = "53.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "45aef0d9cf9a039bf6cd1acc451b137aca819977b0928dece52bd92811b640ba" +checksum = "a9ba0d7248932f4e2a12fb37f0a2e3ec82b3bdedbac2a1dce186e036843b8f8c" dependencies = [ "arrow-arith", "arrow-array", @@ -201,9 +210,9 @@ dependencies = [ [[package]] name = "arrow-arith" -version = "53.0.0" +version = "53.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "03675e42d1560790f3524800e41403b40d0da1c793fe9528929fde06d8c7649a" +checksum = "d60afcdc004841a5c8d8da4f4fa22d64eb19c0c01ef4bcedd77f175a7cf6e38f" dependencies = [ "arrow-array", "arrow-buffer", @@ -216,9 +225,9 @@ dependencies = [ [[package]] name = "arrow-array" -version = "53.0.0" +version = "53.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cd2bf348cf9f02a5975c5962c7fa6dee107a2009a7b41ac5fb1a027e12dc033f" +checksum = "7f16835e8599dbbb1659fd869d865254c4cf32c6c2bb60b6942ac9fc36bfa5da" dependencies = [ "ahash", "arrow-buffer", @@ -227,15 +236,15 @@ dependencies = [ "chrono", "chrono-tz", "half", - "hashbrown", + "hashbrown 0.14.5", "num", ] [[package]] name = "arrow-buffer" -version = "53.0.0" +version = "53.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3092e37715f168976012ce52273c3989b5793b0db5f06cbaa246be25e5f0924d" +checksum = "1a1f34f0faae77da6b142db61deba2cb6d60167592b178be317b341440acba80" dependencies = [ "bytes", "half", @@ -244,9 +253,9 @@ dependencies = [ [[package]] name = "arrow-cast" -version = "53.0.0" +version = "53.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7ce1018bb710d502f9db06af026ed3561552e493e989a79d0d0f5d9cf267a785" +checksum = "450e4abb5775bca0740bec0bcf1b1a5ae07eff43bd625661c4436d8e8e4540c4" dependencies = [ "arrow-array", "arrow-buffer", @@ -254,7 +263,7 @@ dependencies = [ "arrow-schema", "arrow-select", "atoi", - "base64", + "base64 0.22.1", "chrono", "comfy-table", "half", @@ -265,9 +274,9 @@ dependencies = [ [[package]] name = "arrow-csv" -version = "53.0.0" +version = "53.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fd178575f45624d045e4ebee714e246a05d9652e41363ee3f57ec18cca97f740" +checksum = "d3a4e4d63830a341713e35d9a42452fbc6241d5f42fa5cf6a4681b8ad91370c4" dependencies = [ "arrow-array", "arrow-buffer", @@ -284,9 +293,9 @@ dependencies = [ [[package]] name = "arrow-data" -version = "53.0.0" +version = "53.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4e4ac0c4ee79150afe067dc4857154b3ee9c1cd52b5f40d59a77306d0ed18d65" +checksum = "2b1e618bbf714c7a9e8d97203c806734f012ff71ae3adc8ad1b075689f540634" dependencies = [ "arrow-buffer", "arrow-schema", @@ -296,9 +305,9 @@ dependencies = [ [[package]] name = "arrow-ipc" -version = "53.0.0" +version = "53.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bb307482348a1267f91b0912e962cd53440e5de0f7fb24c5f7b10da70b38c94a" +checksum = "f98e983549259a2b97049af7edfb8f28b8911682040e99a94e4ceb1196bd65c2" dependencies = [ "arrow-array", "arrow-buffer", @@ -311,9 +320,9 @@ dependencies = [ [[package]] name = "arrow-json" -version = "53.0.0" +version = "53.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d24805ba326758effdd6f2cbdd482fcfab749544f21b134701add25b33f474e6" +checksum = "b198b9c6fcf086501730efbbcb483317b39330a116125af7bb06467d04b352a3" dependencies = [ "arrow-array", "arrow-buffer", @@ -322,7 +331,7 @@ dependencies = [ "arrow-schema", "chrono", "half", - "indexmap", + "indexmap 2.6.0", "lexical-core", "num", "serde", @@ -331,9 +340,9 @@ dependencies = [ [[package]] name = "arrow-ord" -version = "53.0.0" +version = "53.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "644046c479d80ae8ed02a7f1e1399072ea344ca6a7b0e293ab2d5d9ed924aa3b" +checksum = "2427f37b4459a4b9e533045abe87a5183a5e0995a3fc2c2fd45027ae2cc4ef3f" dependencies = [ "arrow-array", "arrow-buffer", @@ -346,9 +355,9 @@ dependencies = [ [[package]] name = "arrow-row" -version = "53.0.0" +version = "53.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a29791f8eb13b340ce35525b723f5f0df17ecb955599e11f65c2a94ab34e2efb" +checksum = "15959657d92e2261a7a323517640af87f5afd9fd8a6492e424ebee2203c567f6" dependencies = [ "ahash", "arrow-array", @@ -360,18 +369,18 @@ dependencies = [ [[package]] name = "arrow-schema" -version = "53.0.0" +version = "53.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c85320a3a2facf2b2822b57aa9d6d9d55edb8aee0b6b5d3b8df158e503d10858" +checksum = "fbf0388a18fd7f7f3fe3de01852d30f54ed5182f9004db700fbe3ba843ed2794" dependencies = [ "bitflags 2.6.0", ] [[package]] name = "arrow-select" -version = "53.0.0" +version = "53.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9cc7e6b582e23855fd1625ce46e51647aa440c20ea2e71b1d748e0839dd73cba" +checksum = "b83e5723d307a38bf00ecd2972cd078d1339c7fd3eb044f609958a9a24463f3a" dependencies = [ "ahash", "arrow-array", @@ -383,9 +392,9 @@ dependencies = [ [[package]] name = "arrow-string" -version = "53.0.0" +version = "53.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0775b6567c66e56ded19b87a954b6b1beffbdd784ef95a3a2b03f59570c1d230" +checksum = "7ab3db7c09dd826e74079661d84ed01ed06547cf75d52c2818ef776d0d852305" dependencies = [ "arrow-array", "arrow-buffer", @@ -395,14 +404,26 @@ dependencies = [ "memchr", "num", "regex", - "regex-syntax", + "regex-syntax 0.8.5", +] + +[[package]] +name = "async-channel" +version = "2.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "89b47800b0be77592da0afd425cc03468052844aff33b84e33cc696f64e77b6a" +dependencies = [ + "concurrent-queue", + "event-listener-strategy", + "futures-core", + "pin-project-lite", ] [[package]] name = "async-compression" -version = "0.4.12" +version = "0.4.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fec134f64e2bc57411226dfc4e52dec859ddfc7e711fc5e07b612584f000e4aa" +checksum = "e26a9844c659a2a293d239c7910b752f8487fe122c6c8bd1659bf85a6507c302" dependencies = [ "bzip2", "flate2", @@ -416,15 +437,54 @@ dependencies = [ "zstd-safe 7.2.1", ] +[[package]] +name = "async-lock" +version = "3.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ff6e472cdea888a4bd64f342f09b3f50e1886d32afe8df3d663c01140b811b18" +dependencies = [ + "event-listener", + "event-listener-strategy", + "pin-project-lite", +] + +[[package]] +name = "async-stream" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b5a71a6f37880a80d1d7f19efd781e4b5de42c88f0722cc13bcb6cc2cfe8476" +dependencies = [ + "async-stream-impl", + "futures-core", + "pin-project-lite", +] + +[[package]] +name = "async-stream-impl" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c7c24de15d275a1ecfd47a380fb4d5ec9bfe0933f309ed5e705b775596a3574d" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.79", +] + +[[package]] +name = "async-task" +version = "4.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b75356056920673b02621b35afd0f7dda9306d03c79a30f5c56c44cf256e3de" + [[package]] name = "async-trait" -version = "0.1.82" +version = "0.1.83" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a27b8a3a6e1a44fa4c8baf1f653e4172e81486d4941f2237e20dc2d0cf4ddff1" +checksum = "721cae7de5c34fbb2acd27e21e6d2cf7b886dce0c27388d46c4e6c47ea4318dd" dependencies = [ "proc-macro2", "quote", - "syn 2.0.77", + "syn 2.0.79", ] [[package]] @@ -436,17 +496,85 @@ dependencies = [ "num-traits", ] +[[package]] +name = "atomic" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8d818003e740b63afc82337e3160717f4f63078720a810b7b903e70a5d1d2994" +dependencies = [ + "bytemuck", +] + [[package]] name = "atomic-waker" version = "1.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0" +[[package]] +name = "auto_enums" +version = "0.8.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "459b77b7e855f875fd15f101064825cd79eb83185a961d66e6298560126facfb" +dependencies = [ + "derive_utils", + "proc-macro2", + "quote", + "syn 2.0.79", +] + [[package]] name = "autocfg" -version = "1.3.0" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ace50bade8e6234aa140d9a2f552bbee1db4d353f69b8217bc503490fc1a9f26" + +[[package]] +name = "axum" +version = "0.7.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "504e3947307ac8326a5437504c517c4b56716c9d98fac0028c2acc7ca47d70ae" +dependencies = [ + "async-trait", + "axum-core", + "bytes", + "futures-util", + "http", + "http-body", + "http-body-util", + "itoa", + "matchit", + "memchr", + "mime", + "percent-encoding", + "pin-project-lite", + "rustversion", + "serde", + "sync_wrapper 1.0.1", + "tower 0.5.1", + "tower-layer", + "tower-service", +] + +[[package]] +name = "axum-core" +version = "0.4.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0c4b4d0bd25bd0b74681c0ad21497610ce1b7c91b1022cd21c80c6fbdd9476b0" +checksum = "09f2bd6146b97ae3359fa0cc6d6b376d9539582c7b4220f041a33ec24c226199" +dependencies = [ + "async-trait", + "bytes", + "futures-util", + "http", + "http-body", + "http-body-util", + "mime", + "pin-project-lite", + "rustversion", + "sync_wrapper 1.0.1", + "tower-layer", + "tower-service", +] [[package]] name = "backtrace" @@ -463,6 +591,12 @@ dependencies = [ "windows-targets", ] +[[package]] +name = "base64" +version = "0.21.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d297deb1925b89f2ccc13d7635fa0714f12c87adce1c75356b39ca9b7178567" + [[package]] name = "base64" version = "0.22.1" @@ -480,9 +614,9 @@ dependencies = [ [[package]] name = "bindgen" -version = "0.69.4" +version = "0.69.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a00dc851838a2120612785d195287475a3ac45514741da670b735818822129a0" +checksum = "271383c67ccabffb7381723dea0672a673f292304fcb45c01cc648c7a8d58088" dependencies = [ "bitflags 2.6.0", "cexpr", @@ -495,7 +629,7 @@ dependencies = [ "regex", "rustc-hash 1.1.0", "shlex", - "syn 2.0.77", + "syn 2.0.79", ] [[package]] @@ -568,6 +702,12 @@ version = "3.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "79296716171880943b8470b5f8d03aa55eb2e645a4874bdbb28adb49162e012c" +[[package]] +name = "bytemuck" +version = "1.19.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8334215b81e418a0a7bdb8ef0849474f40bb10c8b71f1c4ed315cff49f32494d" + [[package]] name = "byteorder" version = "1.5.0" @@ -603,9 +743,9 @@ dependencies = [ [[package]] name = "cc" -version = "1.1.21" +version = "1.1.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "07b1695e2c7e8fc85310cde85aeaab7e3097f593c91d209d3f9df76c928100f0" +checksum = "b16803a61b81d9eabb7eae2588776c4c1e584b738ede45fdbb4c972cec1e9945" dependencies = [ "jobserver", "libc", @@ -644,9 +784,9 @@ dependencies = [ [[package]] name = "chrono-tz" -version = "0.9.0" +version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "93698b29de5e97ad0ae26447b344c482a7284c737d9ddc5f9e52b74a336671bb" +checksum = "cd6dd8046d00723a59a2f8c5f295c515b9bb9a331ee4f8f3d4dd49e428acd3b6" dependencies = [ "chrono", "chrono-tz-build", @@ -655,12 +795,11 @@ dependencies = [ [[package]] name = "chrono-tz-build" -version = "0.3.0" +version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0c088aee841df9c3041febbb73934cfc39708749bf96dc827e3359cd39ef11b1" +checksum = "e94fea34d77a245229e7746bd2beb786cd2a896f306ff491fb8cecb3074b10a7" dependencies = [ "parse-zoneinfo", - "phf", "phf_codegen", ] @@ -675,6 +814,55 @@ dependencies = [ "libloading", ] +[[package]] +name = "clap" +version = "4.5.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b97f376d85a664d5837dbae44bf546e6477a679ff6610010f17276f686d867e8" +dependencies = [ + "clap_builder", + "clap_derive", +] + +[[package]] +name = "clap_builder" +version = "4.5.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "19bc80abd44e4bed93ca373a0704ccbd1b710dc5749406201bb018272808dc54" +dependencies = [ + "anstream", + "anstyle", + "clap_lex", + "strsim 0.11.1", +] + +[[package]] +name = "clap_derive" +version = "4.5.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ac6a0c7b1a9e9a5186361f67dfa1b88213572f427fb9ab038efb2bd8c582dab" +dependencies = [ + "heck 0.5.0", + "proc-macro2", + "quote", + "syn 2.0.79", +] + +[[package]] +name = "clap_lex" +version = "0.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1462739cb27611015575c0c11df5df7601141071f07518d56fcc1be504cbec97" + +[[package]] +name = "cmsketch" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aeccf706e341a5fcdc7f309af21f75eb4dd68fd7474e171bfe1a5570ea48307a" +dependencies = [ + "paste", +] + [[package]] name = "colorchoice" version = "1.0.2" @@ -692,6 +880,54 @@ dependencies = [ "unicode-width", ] +[[package]] +name = "concurrent-queue" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ca0197aee26d1ae37445ee532fefce43251d24cc7c166799f4d46817f1d3973" +dependencies = [ + "crossbeam-utils", +] + +[[package]] +name = "console-api" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "86ed14aa9c9f927213c6e4f3ef75faaad3406134efe84ba2cb7983431d5f0931" +dependencies = [ + "futures-core", + "prost", + "prost-types", + "tonic", + "tracing-core", +] + +[[package]] +name = "console-subscriber" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e2e3a111a37f3333946ebf9da370ba5c5577b18eb342ec683eb488dd21980302" +dependencies = [ + "console-api", + "crossbeam-channel", + "crossbeam-utils", + "futures-task", + "hdrhistogram", + "humantime", + "hyper-util", + "prost", + "prost-types", + "serde", + "serde_json", + "thread_local", + "tokio", + "tokio-stream", + "tonic", + "tracing", + "tracing-core", + "tracing-subscriber", +] + [[package]] name = "const-random" version = "0.1.18" @@ -811,6 +1047,16 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "crossbeam-skiplist" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df29de440c58ca2cc6e587ec3d22347551a32435fbde9d2bff64e78a9ffa151b" +dependencies = [ + "crossbeam-epoch", + "crossbeam-utils", +] + [[package]] name = "crossbeam-utils" version = "0.8.20" @@ -854,6 +1100,51 @@ dependencies = [ "memchr", ] +[[package]] +name = "ctor" +version = "0.1.26" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6d2301688392eb071b0bf1a37be05c469d3cc4dbbd95df672fe28ab021e6a096" +dependencies = [ + "quote", + "syn 1.0.109", +] + +[[package]] +name = "darling" +version = "0.14.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b750cb3417fd1b327431a470f388520309479ab0bf5e323505daf0290cd3850" +dependencies = [ + "darling_core", + "darling_macro", +] + +[[package]] +name = "darling_core" +version = "0.14.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "109c1ca6e6b7f82cc233a97004ea8ed7ca123a9af07a8230878fcfda9b158bf0" +dependencies = [ + "fnv", + "ident_case", + "proc-macro2", + "quote", + "strsim 0.10.0", + "syn 1.0.109", +] + +[[package]] +name = "darling_macro" +version = "0.14.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a4aab4dbc9f7611d8b55048a3a16d2d010c2c8334e46304b40ac1cc14bf3b48e" +dependencies = [ + "darling_core", + "quote", + "syn 1.0.109", +] + [[package]] name = "dary_heap" version = "0.3.6" @@ -868,7 +1159,7 @@ checksum = "5041cc499144891f3790297212f32a74fb938e5136a14943f338ef9e0ae276cf" dependencies = [ "cfg-if", "crossbeam-utils", - "hashbrown", + "hashbrown 0.14.5", "lock_api", "once_cell", "parking_lot_core", @@ -910,8 +1201,8 @@ dependencies = [ "futures", "glob", "half", - "hashbrown", - "indexmap", + "hashbrown 0.14.5", + "indexmap 2.6.0", "itertools 0.13.0", "log", "num-traits", @@ -959,7 +1250,7 @@ dependencies = [ "arrow-schema", "chrono", "half", - "hashbrown", + "hashbrown 0.14.5", "instant", "libc", "num_cpus", @@ -991,7 +1282,7 @@ dependencies = [ "datafusion-common", "datafusion-expr", "futures", - "hashbrown", + "hashbrown 0.14.5", "log", "object_store", "parking_lot", @@ -1038,14 +1329,14 @@ source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?rev=d812 dependencies = [ "arrow", "arrow-buffer", - "base64", + "base64 0.22.1", "blake2", "blake3", "chrono", "datafusion-common", "datafusion-execution", "datafusion-expr", - "hashbrown", + "hashbrown 0.14.5", "hex", "itertools 0.13.0", "log", @@ -1134,12 +1425,12 @@ dependencies = [ "datafusion-common", "datafusion-expr", "datafusion-physical-expr", - "hashbrown", - "indexmap", + "hashbrown 0.14.5", + "indexmap 2.6.0", "itertools 0.13.0", "log", "paste", - "regex-syntax", + "regex-syntax 0.8.5", ] [[package]] @@ -1154,7 +1445,7 @@ dependencies = [ "arrow-ord", "arrow-schema", "arrow-string", - "base64", + "base64 0.22.1", "chrono", "datafusion-common", "datafusion-execution", @@ -1163,9 +1454,9 @@ dependencies = [ "datafusion-functions-aggregate-common", "datafusion-physical-expr-common", "half", - "hashbrown", + "hashbrown 0.14.5", "hex", - "indexmap", + "indexmap 2.6.0", "itertools 0.13.0", "log", "paste", @@ -1182,7 +1473,7 @@ dependencies = [ "arrow", "datafusion-common", "datafusion-expr-common", - "hashbrown", + "hashbrown 0.14.5", "rand", ] @@ -1222,8 +1513,8 @@ dependencies = [ "datafusion-physical-expr-common", "futures", "half", - "hashbrown", - "indexmap", + "hashbrown 0.14.5", + "indexmap 2.6.0", "itertools 0.13.0", "log", "once_cell", @@ -1250,8 +1541,8 @@ dependencies = [ "pyo3", "pyo3-build-config", "rand", - "regex-syntax", - "syn 2.0.77", + "regex-syntax 0.8.5", + "syn 2.0.79", "tokio", "url", "uuid", @@ -1281,7 +1572,7 @@ checksum = "4e018fccbeeb50ff26562ece792ed06659b9c2dae79ece77c4456bb10d9bf79b" dependencies = [ "proc-macro2", "quote", - "syn 2.0.77", + "syn 2.0.79", ] [[package]] @@ -1296,9 +1587,11 @@ dependencies = [ "arrow-ord", "arrow-schema", "async-trait", - "base64", + "base64 0.22.1", "bincode", + "bytes", "chrono", + "crossbeam", "datafusion", "delegate", "denormalized-common", @@ -1306,13 +1599,15 @@ dependencies = [ "flatbuffers", "futures", "half", - "hashbrown", + "hashbrown 0.14.5", "itertools 0.13.0", "log", + "object_store", "rdkafka", "rocksdb", "serde", "serde_json", + "slatedb", "tokio", "tracing", ] @@ -1327,6 +1622,7 @@ dependencies = [ "datafusion", "pyo3", "serde_json", + "slatedb", "thiserror", ] @@ -1337,6 +1633,7 @@ dependencies = [ "arrow", "arrow-array", "arrow-schema", + "console-subscriber", "datafusion", "denormalized", "env_logger", @@ -1378,6 +1675,17 @@ dependencies = [ "tokio", ] +[[package]] +name = "derive_utils" +version = "0.14.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "65f152f4b8559c4da5d574bafc7af85454d706b4c5fe8b530d508cacbb6807ea" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.79", +] + [[package]] name = "digest" version = "0.10.7" @@ -1389,6 +1697,12 @@ dependencies = [ "subtle", ] +[[package]] +name = "downcast-rs" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "75b325c5dbd37f80359721ad39aca5a29fb04c89279657cffdda8736d0c0b9d2" + [[package]] name = "either" version = "1.13.0" @@ -1435,42 +1749,113 @@ dependencies = [ ] [[package]] -name = "fastrand" -version = "2.1.1" +name = "event-listener" +version = "5.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e8c02a5121d4ea3eb16a80748c74f5549a5665e4c21333c6098f283870fbdea6" +checksum = "6032be9bd27023a771701cc49f9f053c751055f71efb2e0ae5c15809093675ba" +dependencies = [ + "concurrent-queue", + "parking", + "pin-project-lite", +] [[package]] -name = "fixedbitset" -version = "0.4.2" +name = "event-listener-strategy" +version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80" +checksum = "0f214dc438f977e6d4e3500aaa277f5ad94ca83fbbd9b1a15713ce2344ccc5a1" +dependencies = [ + "event-listener", + "pin-project-lite", +] [[package]] -name = "flatbuffers" -version = "24.3.25" +name = "fail-parallel" +version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8add37afff2d4ffa83bc748a70b4b1370984f6980768554182424ef71447c35f" +checksum = "5666e8ca4ec174d896fb742789c29b1bea9319dcfd623c41bececc0a60c4939d" dependencies = [ - "bitflags 1.3.2", - "rustc_version", + "log", + "once_cell", + "rand", ] [[package]] -name = "flate2" -version = "1.0.33" +name = "fastrace" +version = "0.7.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "324a1be68054ef05ad64b861cc9eaf1d623d2d8cb25b4bf2cb9cdd902b4bf253" +checksum = "fe845ecd1e3dba36bd7a20ea3b46c81ec610d5a2ffe288160a7cc6a2051496a5" dependencies = [ - "crc32fast", - "miniz_oxide", + "fastrace-macro", + "minstant", + "once_cell", + "parking_lot", + "pin-project", + "rand", + "rtrb", ] [[package]] -name = "fnv" -version = "1.0.7" +name = "fastrace-macro" +version = "0.7.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" +checksum = "a09bf248c7ec91a448701fa2c31750f78be6cbc3d5269dbb82a9f3945776d1f4" +dependencies = [ + "proc-macro-error2", + "proc-macro2", + "quote", + "syn 2.0.79", +] + +[[package]] +name = "fastrand" +version = "2.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e8c02a5121d4ea3eb16a80748c74f5549a5665e4c21333c6098f283870fbdea6" + +[[package]] +name = "fixedbitset" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80" + +[[package]] +name = "flatbuffers" +version = "24.3.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8add37afff2d4ffa83bc748a70b4b1370984f6980768554182424ef71447c35f" +dependencies = [ + "bitflags 1.3.2", + "rustc_version", +] + +[[package]] +name = "flate2" +version = "1.0.34" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a1b589b4dc103969ad3cf85c950899926ec64300a1a46d76c03a6072957036f0" +dependencies = [ + "crc32fast", + "miniz_oxide", +] + +[[package]] +name = "flume" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "da0e4dd2a88388a1f4ccc7c9ce104604dab68d9f408dc34cd45823d5a9069095" +dependencies = [ + "futures-core", + "futures-sink", + "nanorand", + "spin", +] + +[[package]] +name = "fnv" +version = "1.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" [[package]] name = "form_urlencoded" @@ -1481,11 +1866,128 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "foyer" +version = "0.11.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2de727a58f28ad9c5ff9952979d2392851b5917785ca657e07277890109cd3a0" +dependencies = [ + "ahash", + "anyhow", + "fastrace", + "foyer-common", + "foyer-memory", + "foyer-storage", + "futures", + "madsim-tokio", + "pin-project", + "tracing", +] + +[[package]] +name = "foyer-common" +version = "0.9.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "be6577040f1b773e4a606180638ec48f5528d3d843d73092c06b5a8876bc4576" +dependencies = [ + "bytes", + "cfg-if", + "crossbeam", + "fastrace", + "futures", + "hashbrown 0.14.5", + "itertools 0.13.0", + "madsim-tokio", + "metrics", + "parking_lot", + "pin-project", + "serde", +] + +[[package]] +name = "foyer-intrusive" +version = "0.9.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "01dbfd6763227809019a1dc01c98b6a78949c63d3a204d0a6f8e0325f077ab5c" +dependencies = [ + "foyer-common", + "itertools 0.13.0", +] + +[[package]] +name = "foyer-memory" +version = "0.7.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "02c59933e075daa88363e228475ce4fa9098689b9e13a26442666a3a3142da8d" +dependencies = [ + "ahash", + "bitflags 2.6.0", + "cmsketch", + "fastrace", + "foyer-common", + "foyer-intrusive", + "futures", + "hashbrown 0.14.5", + "itertools 0.13.0", + "madsim-tokio", + "parking_lot", + "pin-project", + "serde", + "tracing", +] + +[[package]] +name = "foyer-storage" +version = "0.10.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6976958440e7fee57bd25f6c04df0759484c18a233f00740f6c2c5a47b939d90" +dependencies = [ + "ahash", + "allocator-api2", + "anyhow", + "array-util", + "async-channel", + "auto_enums", + "bincode", + "bitflags 2.6.0", + "bytes", + "clap", + "either", + "fastrace", + "flume", + "foyer-common", + "foyer-memory", + "fs4", + "futures", + "itertools 0.13.0", + "libc", + "lz4", + "madsim-tokio", + "parking_lot", + "pin-project", + "rand", + "serde", + "thiserror", + "tracing", + "twox-hash", + "zstd 0.13.2", +] + +[[package]] +name = "fs4" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e8c6b3bd49c37d2aa3f3f2220233b29a7cd23f79d1fe70e5337d25fb390793de" +dependencies = [ + "rustix", + "windows-sys 0.52.0", +] + [[package]] name = "futures" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "645c6916888f6cb6350d2550b80fb63e734897a8498abe35cfb732b6487804b0" +checksum = "65bc07b1a8bc7c85c5f2e110c476c7389b4554ba72af57d8445ea63a576b0876" dependencies = [ "futures-channel", "futures-core", @@ -1498,9 +2000,9 @@ dependencies = [ [[package]] name = "futures-channel" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eac8f7d7865dcb88bd4373ab671c8cf4508703796caa2b1985a9ca867b3fcb78" +checksum = "2dff15bf788c671c1934e366d07e30c1814a8ef514e1af724a602e8a2fbe1b10" dependencies = [ "futures-core", "futures-sink", @@ -1508,15 +2010,15 @@ dependencies = [ [[package]] name = "futures-core" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d" +checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e" [[package]] name = "futures-executor" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a576fc72ae164fca6b9db127eaa9a9dda0d61316034f33a0a0d4eda41f02b01d" +checksum = "1e28d1d997f585e54aebc3f97d39e72338912123a67330d723fdbb564d646c9f" dependencies = [ "futures-core", "futures-task", @@ -1525,38 +2027,38 @@ dependencies = [ [[package]] name = "futures-io" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a44623e20b9681a318efdd71c299b6b222ed6f231972bfe2f224ebad6311f0c1" +checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6" [[package]] name = "futures-macro" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" +checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650" dependencies = [ "proc-macro2", "quote", - "syn 2.0.77", + "syn 2.0.79", ] [[package]] name = "futures-sink" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9fb8e00e87438d937621c1c6269e53f536c14d3fbd6a042bb24879e57d474fb5" +checksum = "e575fab7d1e0dcb8d0c7bcf9a63ee213816ab51902e6d244a95819acacf1d4f7" [[package]] name = "futures-task" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "38d84fa142264698cdce1a9f9172cf383a0c82de1bddcf3092901442c4097004" +checksum = "f90f7dce0722e95104fcb095585910c0977252f286e354b5e3bd38902cd99988" [[package]] name = "futures-util" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48" +checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81" dependencies = [ "futures-channel", "futures-core", @@ -1587,15 +2089,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c4567c8db10ae91089c99af84c68c38da3ec2f087c3f82960bcdbf3656b6f4d7" dependencies = [ "cfg-if", + "js-sys", "libc", "wasi", + "wasm-bindgen", ] [[package]] name = "gimli" -version = "0.31.0" +version = "0.31.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "32085ea23f3234fc7846555e85283ba4de91e21016dc0455a16286d87a292d64" +checksum = "07e28edb80900c19c28f1072f2e8aeca7fa06b23cd4169cefe1af5aa3260783f" [[package]] name = "glob" @@ -1615,7 +2119,7 @@ dependencies = [ "futures-core", "futures-sink", "http", - "indexmap", + "indexmap 2.6.0", "slab", "tokio", "tokio-util", @@ -1633,6 +2137,12 @@ dependencies = [ "num-traits", ] +[[package]] +name = "hashbrown" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" + [[package]] name = "hashbrown" version = "0.14.5" @@ -1643,6 +2153,25 @@ dependencies = [ "allocator-api2", ] +[[package]] +name = "hashbrown" +version = "0.15.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e087f84d4f86bf4b218b927129862374b72199ae7d8657835f1e89000eea4fb" + +[[package]] +name = "hdrhistogram" +version = "7.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "765c9198f173dd59ce26ff9f95ef0aafd0a0fe01fb9d72841bc5066a4c06511d" +dependencies = [ + "base64 0.21.7", + "byteorder", + "flate2", + "nom", + "num-traits", +] + [[package]] name = "heck" version = "0.4.1" @@ -1703,9 +2232,15 @@ dependencies = [ [[package]] name = "httparse" -version = "1.9.4" +version = "1.9.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7d71d3574edd2771538b901e6549113b4006ece66150fb69c0fb6d9a2adae946" + +[[package]] +name = "httpdate" +version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0fcc0b4a115bf80b728eb8ea024ad5bd707b615bfed49e0665b6e0f86fd082d9" +checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" [[package]] name = "humantime" @@ -1726,6 +2261,7 @@ dependencies = [ "http", "http-body", "httparse", + "httpdate", "itoa", "pin-project-lite", "smallvec", @@ -1744,18 +2280,31 @@ dependencies = [ "hyper", "hyper-util", "rustls", - "rustls-native-certs 0.8.0", + "rustls-native-certs", "rustls-pki-types", "tokio", "tokio-rustls", "tower-service", ] +[[package]] +name = "hyper-timeout" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3203a961e5c83b6f5498933e78b6b263e208c197b63e9c6c53cc82ffd3f63793" +dependencies = [ + "hyper", + "hyper-util", + "pin-project-lite", + "tokio", + "tower-service", +] + [[package]] name = "hyper-util" -version = "0.1.8" +version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "da62f120a8a37763efb0cf8fdf264b884c7b8b9ac8660b900c8661030c00e6ba" +checksum = "41296eb09f183ac68eec06e03cdbea2e759633d4067b2f6552fc2e009bcad08b" dependencies = [ "bytes", "futures-channel", @@ -1766,7 +2315,6 @@ dependencies = [ "pin-project-lite", "socket2", "tokio", - "tower", "tower-service", "tracing", ] @@ -1794,6 +2342,12 @@ dependencies = [ "cc", ] +[[package]] +name = "ident_case" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b9e0384b61958566e926dc50660321d12159025e767c18e043daf26b70104c39" + [[package]] name = "idna" version = "0.5.0" @@ -1806,12 +2360,22 @@ dependencies = [ [[package]] name = "indexmap" -version = "2.5.0" +version = "1.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bd070e393353796e801d209ad339e89596eb4c8d430d18ede6a1cced8fafbd99" +dependencies = [ + "autocfg", + "hashbrown 0.12.3", +] + +[[package]] +name = "indexmap" +version = "2.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "68b900aa2f7301e21c36462b170ee99994de34dff39a4a6a528e80e7376d07e5" +checksum = "707907fe3c25f5424cce2cb7e1cbcafee6bdbe735ca90ef77c29e84591e5b9da" dependencies = [ "equivalent", - "hashbrown", + "hashbrown 0.15.0", ] [[package]] @@ -1840,9 +2404,9 @@ checksum = "8bb03732005da905c88227371639bf1ad885cc712789c011c31c5fb3ab3ccf02" [[package]] name = "ipnet" -version = "2.10.0" +version = "2.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "187674a687eed5fe42285b40c6291f9a01517d415fad1c3cbc6a9f778af7fcd4" +checksum = "ddc24109865250148c2e0f3d25d4f0f479571723792d3802153c60922a4fb708" [[package]] name = "is_terminal_polyfill" @@ -1885,9 +2449,9 @@ dependencies = [ [[package]] name = "js-sys" -version = "0.3.70" +version = "0.3.72" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1868808506b929d7b0cfa8f75951347aa71bb21144b7791bae35d9bccfcfe37a" +checksum = "6a88f1bda2bd75b0452a14784937d796722fdebfe50df998aeb3f0b7603019a9" dependencies = [ "wasm-bindgen", ] @@ -1906,9 +2470,9 @@ checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55" [[package]] name = "lexical-core" -version = "0.8.5" +version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2cde5de06e8d4c2faabc400238f9ae1c74d5412d03a7bd067645ccbc47070e46" +checksum = "0431c65b318a590c1de6b8fd6e72798c92291d27762d94c9e6c37ed7a73d8458" dependencies = [ "lexical-parse-float", "lexical-parse-integer", @@ -1919,9 +2483,9 @@ dependencies = [ [[package]] name = "lexical-parse-float" -version = "0.8.5" +version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "683b3a5ebd0130b8fb52ba0bdc718cc56815b6a097e28ae5a6997d0ad17dc05f" +checksum = "eb17a4bdb9b418051aa59d41d65b1c9be5affab314a872e5ad7f06231fb3b4e0" dependencies = [ "lexical-parse-integer", "lexical-util", @@ -1930,9 +2494,9 @@ dependencies = [ [[package]] name = "lexical-parse-integer" -version = "0.8.6" +version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6d0994485ed0c312f6d965766754ea177d07f9c00c9b82a5ee62ed5b47945ee9" +checksum = "5df98f4a4ab53bf8b175b363a34c7af608fe31f93cc1fb1bf07130622ca4ef61" dependencies = [ "lexical-util", "static_assertions", @@ -1940,18 +2504,18 @@ dependencies = [ [[package]] name = "lexical-util" -version = "0.8.5" +version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5255b9ff16ff898710eb9eb63cb39248ea8a5bb036bea8085b1a767ff6c4e3fc" +checksum = "85314db53332e5c192b6bca611fb10c114a80d1b831ddac0af1e9be1b9232ca0" dependencies = [ "static_assertions", ] [[package]] name = "lexical-write-float" -version = "0.8.5" +version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "accabaa1c4581f05a3923d1b4cfd124c329352288b7b9da09e766b0668116862" +checksum = "6e7c3ad4e37db81c1cbe7cf34610340adc09c322871972f74877a712abc6c809" dependencies = [ "lexical-util", "lexical-write-integer", @@ -1960,9 +2524,9 @@ dependencies = [ [[package]] name = "lexical-write-integer" -version = "0.8.5" +version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e1b6f3d1f4422866b68192d62f77bc5c700bee84f3069f2469d7bc8c77852446" +checksum = "eb89e9f6958b83258afa3deed90b5de9ef68eef090ad5086c791cd2345610162" dependencies = [ "lexical-util", "static_assertions", @@ -1970,9 +2534,9 @@ dependencies = [ [[package]] name = "libc" -version = "0.2.158" +version = "0.2.159" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d8adc4bb1803a324070e64a98ae98f38934d91957a99cfb3a43dcbc01bc56439" +checksum = "561d97a539a36e26a9a5fad1ea11a3039a67714694aaa379433e580854bc3dc5" [[package]] name = "libflate" @@ -1994,7 +2558,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e6e0d73b369f386f1c44abd9c570d5318f55ccde816ff4b562fa452e5182863d" dependencies = [ "core2", - "hashbrown", + "hashbrown 0.14.5", "rle-decode-fast", ] @@ -2074,11 +2638,20 @@ version = "0.4.22" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a7a70ba024b9dc04c27ea2f0c0548feb474ec5c54bba33a7f72f873a39d07b24" +[[package]] +name = "lz4" +version = "1.28.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4d1febb2b4a79ddd1980eede06a8f7902197960aa0383ffcfdd62fe723036725" +dependencies = [ + "lz4-sys", +] + [[package]] name = "lz4-sys" -version = "1.11.0" +version = "1.11.1+lz4-1.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fcb44a01837a858d47e5a630d2ccf304c8efcc4b83b8f9f75b7a9ee4fcc6e57d" +checksum = "6bd8c0d6c6ed0cd30b3652886bb8711dc4bb01d637a68105a3d5158039b418e6" dependencies = [ "cc", "libc", @@ -2104,6 +2677,75 @@ dependencies = [ "pkg-config", ] +[[package]] +name = "madsim" +version = "0.2.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b3c97f34bb19cf6a435a4da2187e90acc6bc59faa730e493b28b6d33e1bb9ccb" +dependencies = [ + "ahash", + "async-channel", + "async-stream", + "async-task", + "bincode", + "bytes", + "downcast-rs", + "futures-util", + "lazy_static", + "libc", + "madsim-macros", + "naive-timer", + "panic-message", + "rand", + "rand_xoshiro", + "rustversion", + "serde", + "spin", + "tokio", + "tokio-util", + "toml", + "tracing", + "tracing-subscriber", +] + +[[package]] +name = "madsim-macros" +version = "0.2.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f3d248e97b1a48826a12c3828d921e8548e714394bf17274dd0a93910dc946e1" +dependencies = [ + "darling", + "proc-macro2", + "quote", + "syn 1.0.109", +] + +[[package]] +name = "madsim-tokio" +version = "0.2.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7d3eb2acc57c82d21d699119b859e2df70a91dbdb84734885a1e72be83bdecb5" +dependencies = [ + "madsim", + "spin", + "tokio", +] + +[[package]] +name = "matchers" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8263075bb86c5a1b1427b5ae862e8889656f126e9f77c484496e8b47cf5c5558" +dependencies = [ + "regex-automata 0.1.10", +] + +[[package]] +name = "matchit" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94" + [[package]] name = "md-5" version = "0.10.6" @@ -2129,6 +2771,16 @@ dependencies = [ "autocfg", ] +[[package]] +name = "metrics" +version = "0.23.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "884adb57038347dfbaf2d5065887b6cf4312330dc8e94bc30a1a839bd79d3261" +dependencies = [ + "ahash", + "portable-atomic", +] + [[package]] name = "mimalloc" version = "0.1.43" @@ -2151,24 +2803,73 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a" [[package]] -name = "miniz_oxide" -version = "0.8.0" +name = "miniz_oxide" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e2d80299ef12ff69b16a84bb182e3b9df68b5a91574d3d4fa6e41b65deec4df1" +dependencies = [ + "adler2", +] + +[[package]] +name = "minstant" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1fb9b5c752f145ac5046bccc3c4f62892e3c950c1d1eab80c5949cd68a2078db" +dependencies = [ + "ctor", + "web-time", +] + +[[package]] +name = "mio" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "80e04d1dcff3aae0704555fe5fee3bcfaf3d1fdf8a7e521d5b9d2b42acb52cec" +dependencies = [ + "hermit-abi", + "libc", + "wasi", + "windows-sys 0.52.0", +] + +[[package]] +name = "moka" +version = "0.12.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32cf62eb4dd975d2dde76432fb1075c49e3ee2331cf36f1f8fd4b66550d32b6f" +dependencies = [ + "async-lock", + "async-trait", + "crossbeam-channel", + "crossbeam-epoch", + "crossbeam-utils", + "event-listener", + "futures-util", + "once_cell", + "parking_lot", + "quanta", + "rustc_version", + "smallvec", + "tagptr", + "thiserror", + "triomphe", + "uuid", +] + +[[package]] +name = "naive-timer" +version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e2d80299ef12ff69b16a84bb182e3b9df68b5a91574d3d4fa6e41b65deec4df1" -dependencies = [ - "adler2", -] +checksum = "034a0ad7deebf0c2abcf2435950a6666c3c15ea9d8fad0c0f48efa8a7f843fed" [[package]] -name = "mio" -version = "1.0.2" +name = "nanorand" +version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "80e04d1dcff3aae0704555fe5fee3bcfaf3d1fdf8a7e521d5b9d2b42acb52cec" +checksum = "6a51313c5820b0b02bd422f4b44776fbf47961755c74ce64afc73bfad10226c3" dependencies = [ - "hermit-abi", - "libc", - "wasi", - "windows-sys 0.52.0", + "getrandom", ] [[package]] @@ -2298,9 +2999,9 @@ dependencies = [ [[package]] name = "object" -version = "0.36.4" +version = "0.36.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "084f1a5821ac4c651660a94a7153d27ac9d8a53736203f58b31945ded098070a" +checksum = "aedf0a2d09c573ed1d8d85b30c119153926a2b36dce0ab28322c09a117a4683e" dependencies = [ "memchr", ] @@ -2312,7 +3013,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "25a0c4b3a0e31f8b66f71ad8064521efa773910196e2cde791436f13409f3b45" dependencies = [ "async-trait", - "base64", + "base64 0.22.1", "bytes", "chrono", "futures", @@ -2338,9 +3039,9 @@ dependencies = [ [[package]] name = "once_cell" -version = "1.19.0" +version = "1.20.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92" +checksum = "1261fe7e33c73b354eab43b1273a57c8f967d0391e80353e51f764ac02cf6775" [[package]] name = "openssl-probe" @@ -2363,6 +3064,18 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" +[[package]] +name = "panic-message" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "384e52fd8fbd4cbe3c317e8216260c21a0f9134de108cea8a4dd4e7e152c472d" + +[[package]] +name = "parking" +version = "2.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f38d5652c16fde515bb1ecef450ab0f6a219d619a7274976324d5e377f7dceba" + [[package]] name = "parking_lot" version = "0.12.3" @@ -2388,9 +3101,9 @@ dependencies = [ [[package]] name = "parquet" -version = "53.0.0" +version = "53.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f0fbf928021131daaa57d334ca8e3904fe9ae22f73c56244fc7db9b04eedc3d8" +checksum = "310c46a70a3ba90d98fec39fa2da6d9d731e544191da6fb56c9d199484d0dd3e" dependencies = [ "ahash", "arrow-array", @@ -2400,14 +3113,14 @@ dependencies = [ "arrow-ipc", "arrow-schema", "arrow-select", - "base64", + "base64 0.22.1", "brotli", "bytes", "chrono", "flate2", "futures", "half", - "hashbrown", + "hashbrown 0.14.5", "lz4_flex", "num", "num-bigint", @@ -2450,7 +3163,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b4c5cc86750666a3ed20bdaf5ca2a0344f9c67674cae0515bec2da16fbaa47db" dependencies = [ "fixedbitset", - "indexmap", + "indexmap 2.6.0", ] [[package]] @@ -2488,27 +3201,27 @@ version = "0.11.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "90fcb95eef784c2ac79119d1dd819e162b5da872ce6f3c3abe1e8ca1c082f72b" dependencies = [ - "siphasher", + "siphasher 0.3.11", ] [[package]] name = "pin-project" -version = "1.1.5" +version = "1.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b6bf43b791c5b9e34c3d182969b4abb522f9343702850a2e57f460d00d09b4b3" +checksum = "baf123a161dde1e524adf36f90bc5d8d3462824a9c43553ad07a8183161189ec" dependencies = [ "pin-project-internal", ] [[package]] name = "pin-project-internal" -version = "1.1.5" +version = "1.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2f38a4412a78282e09a2cf38d195ea5420d15ba0602cb375210efbc877243965" +checksum = "a4502d8515ca9f32f1fb543d987f63d95a14934883db45bdb48060b6b69257f8" dependencies = [ "proc-macro2", "quote", - "syn 2.0.77", + "syn 2.0.79", ] [[package]] @@ -2531,9 +3244,9 @@ checksum = "953ec861398dccce10c670dfeaf3ec4911ca479e9c02154b3a215178c5f566f2" [[package]] name = "portable-atomic" -version = "1.8.0" +version = "1.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d30538d42559de6b034bc76fd6dd4c38961b1ee5c6c56e3808c50128fdbc22ce" +checksum = "cc9c68a3f6da06753e9335d63e27f6b9754dd1920d941135b7ea8224f141adb2" [[package]] name = "ppv-lite86" @@ -2551,14 +3264,36 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7f4c021e1093a56626774e81216a4ce732a735e5bad4868a03f3ed65ca0c3919" dependencies = [ "once_cell", - "toml_edit", + "toml_edit 0.19.15", +] + +[[package]] +name = "proc-macro-error-attr2" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "96de42df36bb9bba5542fe9f1a054b8cc87e172759a1868aa05c1f3acc89dfc5" +dependencies = [ + "proc-macro2", + "quote", +] + +[[package]] +name = "proc-macro-error2" +version = "2.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "11ec05c52be0a07b08061f7dd003e7d7092e0472bc731b4af7bb1ef876109802" +dependencies = [ + "proc-macro-error-attr2", + "proc-macro2", + "quote", + "syn 2.0.79", ] [[package]] name = "proc-macro2" -version = "1.0.86" +version = "1.0.87" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5e719e8df665df0d1c8fbfd238015744736151d4445ec0836b8e628aae103b77" +checksum = "b3e4daa0dcf6feba26f985457cdf104d4b4256fc5a09547140f3631bb076b19a" dependencies = [ "unicode-ident", ] @@ -2583,7 +3318,7 @@ dependencies = [ "itertools 0.13.0", "proc-macro2", "quote", - "syn 2.0.77", + "syn 2.0.79", ] [[package]] @@ -2597,9 +3332,9 @@ dependencies = [ [[package]] name = "pyo3" -version = "0.22.3" +version = "0.22.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "15ee168e30649f7f234c3d49ef5a7a6cbf5134289bc46c29ff3155fa3221c225" +checksum = "00e89ce2565d6044ca31a3eb79a334c3a79a841120a98f64eea9f579564cb691" dependencies = [ "cfg-if", "indoc", @@ -2615,9 +3350,9 @@ dependencies = [ [[package]] name = "pyo3-build-config" -version = "0.22.3" +version = "0.22.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e61cef80755fe9e46bb8a0b8f20752ca7676dcc07a5277d8b7768c6172e529b3" +checksum = "d8afbaf3abd7325e08f35ffb8deb5892046fcb2608b703db6a583a5ba4cea01e" dependencies = [ "once_cell", "target-lexicon", @@ -2625,9 +3360,9 @@ dependencies = [ [[package]] name = "pyo3-ffi" -version = "0.22.3" +version = "0.22.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "67ce096073ec5405f5ee2b8b31f03a68e02aa10d5d4f565eca04acc41931fa1c" +checksum = "ec15a5ba277339d04763f4c23d85987a5b08cbb494860be141e6a10a8eb88022" dependencies = [ "libc", "pyo3-build-config", @@ -2635,27 +3370,27 @@ dependencies = [ [[package]] name = "pyo3-macros" -version = "0.22.3" +version = "0.22.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2440c6d12bc8f3ae39f1e775266fa5122fd0c8891ce7520fa6048e683ad3de28" +checksum = "15e0f01b5364bcfbb686a52fc4181d412b708a68ed20c330db9fc8d2c2bf5a43" dependencies = [ "proc-macro2", "pyo3-macros-backend", "quote", - "syn 2.0.77", + "syn 2.0.79", ] [[package]] name = "pyo3-macros-backend" -version = "0.22.3" +version = "0.22.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1be962f0e06da8f8465729ea2cb71a416d2257dff56cbe40a70d3e62a93ae5d1" +checksum = "a09b550200e1e5ed9176976d0060cbc2ea82dc8515da07885e7b8153a85caacb" dependencies = [ "heck 0.5.0", "proc-macro2", "pyo3-build-config", "quote", - "syn 2.0.77", + "syn 2.0.79", ] [[package]] @@ -2664,6 +3399,21 @@ version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b76f1009795ca44bb5aaae8fd3f18953e209259c33d9b059b1f53d58ab7511db" +[[package]] +name = "quanta" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e5167a477619228a0b284fac2674e3c388cba90631d7b7de620e6f1fcd08da5" +dependencies = [ + "crossbeam-utils", + "libc", + "once_cell", + "raw-cpuid", + "wasi", + "web-sys", + "winapi", +] + [[package]] name = "quick-xml" version = "0.36.2" @@ -2761,6 +3511,33 @@ dependencies = [ "getrandom", ] +[[package]] +name = "rand_xorshift" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d25bf25ec5ae4a3f1b92f929810509a2f53d7dca2f50b794ff57e3face536c8f" +dependencies = [ + "rand_core", +] + +[[package]] +name = "rand_xoshiro" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6f97cdb2a36ed4183de61b2f824cc45c9f1037f28afe0a322e9fff4c108b5aaa" +dependencies = [ + "rand_core", +] + +[[package]] +name = "raw-cpuid" +version = "11.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ab240315c661615f2ee9f0f2cd32d5a7343a84d5ebcccb99d46e6637565e7b0" +dependencies = [ + "bitflags 2.6.0", +] + [[package]] name = "rdkafka" version = "0.36.2" @@ -2793,34 +3570,43 @@ dependencies = [ [[package]] name = "redox_syscall" -version = "0.5.4" +version = "0.5.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0884ad60e090bf1345b93da0a5de8923c93884cd03f40dfcfddd3b4bee661853" +checksum = "9b6dfecf2c74bce2466cabf93f6664d6998a69eb21e39f4207930065b27b771f" dependencies = [ "bitflags 2.6.0", ] [[package]] name = "regex" -version = "1.10.6" +version = "1.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4219d74c6b67a3654a9fbebc4b419e22126d13d2f3c4a07ee0cb61ff79a79619" +checksum = "38200e5ee88914975b69f657f0801b6f6dccafd44fd9326302a4aaeecfacb1d8" dependencies = [ "aho-corasick", "memchr", - "regex-automata", - "regex-syntax", + "regex-automata 0.4.8", + "regex-syntax 0.8.5", +] + +[[package]] +name = "regex-automata" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132" +dependencies = [ + "regex-syntax 0.6.29", ] [[package]] name = "regex-automata" -version = "0.4.7" +version = "0.4.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "38caf58cc5ef2fed281f89292ef23f6365465ed9a41b7a7754eb4e26496c92df" +checksum = "368758f23274712b504848e9d5a6f010445cc8b87a7cdb4d7cbee666c1288da3" dependencies = [ "aho-corasick", "memchr", - "regex-syntax", + "regex-syntax 0.8.5", ] [[package]] @@ -2831,17 +3617,23 @@ checksum = "53a49587ad06b26609c52e423de037e7f57f20d53535d66e08c695f347df952a" [[package]] name = "regex-syntax" -version = "0.8.4" +version = "0.6.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7a66a03ae7c801facd77a29370b4faec201768915ac14a721ba36f20bc9c209b" +checksum = "f162c6dd7b008981e4d40210aca20b4bd0f9b60ca9271061b07f78537722f2e1" + +[[package]] +name = "regex-syntax" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b15c43186be67a4fd63bee50d0303afffcef381492ebe2c5d87f324e1b8815c" [[package]] name = "reqwest" -version = "0.12.7" +version = "0.12.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f8f4955649ef5c38cc7f9e8aa41761d48fb9677197daea9984dc54f56aad5e63" +checksum = "f713147fbe92361e52392c73b8c9e48c04c6625bce969ef54dc901e58e042a7b" dependencies = [ - "base64", + "base64 0.22.1", "bytes", "futures-core", "futures-util", @@ -2861,13 +3653,13 @@ dependencies = [ "pin-project-lite", "quinn", "rustls", - "rustls-native-certs 0.7.3", + "rustls-native-certs", "rustls-pemfile", "rustls-pki-types", "serde", "serde_json", "serde_urlencoded", - "sync_wrapper", + "sync_wrapper 1.0.1", "tokio", "tokio-rustls", "tokio-util", @@ -2911,6 +3703,12 @@ dependencies = [ "librocksdb-sys", ] +[[package]] +name = "rtrb" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f3f94e84c073f3b85d4012b44722fa8842b9986d741590d4f2636ad0a5b14143" + [[package]] name = "rustc-demangle" version = "0.1.24" @@ -2953,9 +3751,9 @@ dependencies = [ [[package]] name = "rustls" -version = "0.23.13" +version = "0.23.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f2dabaac7466917e566adb06783a81ca48944c6898a1b08b9374106dd671f4c8" +checksum = "415d9944693cb90382053259f89fbb077ea730ad7273047ec63b19bc9b160ba8" dependencies = [ "once_cell", "ring", @@ -2965,19 +3763,6 @@ dependencies = [ "zeroize", ] -[[package]] -name = "rustls-native-certs" -version = "0.7.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e5bfb394eeed242e909609f56089eecfe5fda225042e8b171791b9c95f5931e5" -dependencies = [ - "openssl-probe", - "rustls-pemfile", - "rustls-pki-types", - "schannel", - "security-framework", -] - [[package]] name = "rustls-native-certs" version = "0.8.0" @@ -2993,19 +3778,18 @@ dependencies = [ [[package]] name = "rustls-pemfile" -version = "2.1.3" +version = "2.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "196fe16b00e106300d3e45ecfcb764fa292a535d7326a29a5875c579c7417425" +checksum = "dce314e5fee3f39953d46bb63bb8a46d40c2f8fb7cc5a3b6cab2bde9721d6e50" dependencies = [ - "base64", "rustls-pki-types", ] [[package]] name = "rustls-pki-types" -version = "1.8.0" +version = "1.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fc0a2ce646f8655401bb81e7927b812614bd5d91dbc968696be50603510fcaf0" +checksum = "0e696e35370c65c9c541198af4543ccd580cf17fc25d8e05c5a242b202488c55" [[package]] name = "rustls-webpki" @@ -3041,9 +3825,9 @@ dependencies = [ [[package]] name = "schannel" -version = "0.1.24" +version = "0.1.26" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e9aaafd5a2b6e3d657ff009d82fbd630b6bd54dd4eb06f21693925cdf80f9b8b" +checksum = "01227be5826fa0690321a2ba6c5cd57a19cf3f6a09e76973b58e61de6ab9d1c1" dependencies = [ "windows-sys 0.59.0", ] @@ -3106,7 +3890,7 @@ checksum = "243902eda00fad750862fc144cea25caca5e20d615af0a81bee94ca738f1df1f" dependencies = [ "proc-macro2", "quote", - "syn 2.0.77", + "syn 2.0.79", ] [[package]] @@ -3121,6 +3905,15 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_spanned" +version = "0.6.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87607cb1398ed59d48732e575a4c28a7a8ebf2454b964fe3f224f2afc07909e1" +dependencies = [ + "serde", +] + [[package]] name = "serde_urlencoded" version = "0.7.1" @@ -3174,6 +3967,12 @@ version = "0.3.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "38b58827f4464d87d377d175e90bf58eb00fd8716ff0a62f80356b5e61555d0d" +[[package]] +name = "siphasher" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "56199f7ddabf13fe5074ce809e7d3f42b42ae711800501b5b16ea82ad029c39d" + [[package]] name = "slab" version = "0.4.9" @@ -3183,6 +3982,41 @@ dependencies = [ "autocfg", ] +[[package]] +name = "slatedb" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c9e4921d27d6b16256acc2146f90d9ac6b1978688cb2f2b5c1737e5066d6f34" +dependencies = [ + "async-channel", + "async-trait", + "atomic", + "bytemuck", + "bytes", + "chrono", + "crc32fast", + "crossbeam-channel", + "crossbeam-skiplist", + "fail-parallel", + "flatbuffers", + "foyer", + "futures", + "log", + "moka", + "object_store", + "once_cell", + "parking_lot", + "rand", + "rand_xorshift", + "serde", + "serde_json", + "siphasher 1.0.1", + "thiserror", + "tokio", + "tracing", + "ulid", +] + [[package]] name = "smallvec" version = "1.13.2" @@ -3207,7 +4041,7 @@ dependencies = [ "heck 0.5.0", "proc-macro2", "quote", - "syn 2.0.77", + "syn 2.0.79", ] [[package]] @@ -3231,6 +4065,9 @@ name = "spin" version = "0.9.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" +dependencies = [ + "lock_api", +] [[package]] name = "sqlparser" @@ -3250,7 +4087,7 @@ checksum = "01b2e185515564f15375f593fb966b5718bc624ba77fe49fa4616ad619690554" dependencies = [ "proc-macro2", "quote", - "syn 2.0.77", + "syn 2.0.79", ] [[package]] @@ -3259,6 +4096,18 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" +[[package]] +name = "strsim" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623" + +[[package]] +name = "strsim" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f" + [[package]] name = "strum" version = "0.25.0" @@ -3284,7 +4133,7 @@ dependencies = [ "proc-macro2", "quote", "rustversion", - "syn 2.0.77", + "syn 2.0.79", ] [[package]] @@ -3297,7 +4146,7 @@ dependencies = [ "proc-macro2", "quote", "rustversion", - "syn 2.0.77", + "syn 2.0.79", ] [[package]] @@ -3319,15 +4168,21 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.77" +version = "2.0.79" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9f35bcdf61fd8e7be6caf75f429fdca8beb3ed76584befb503b1569faee373ed" +checksum = "89132cd0bf050864e1d38dc3bbc07a0eb8e7530af26344d3d2bbbef83499f590" dependencies = [ "proc-macro2", "quote", "unicode-ident", ] +[[package]] +name = "sync_wrapper" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2047c6ded9c721764247e62cd3b03c09ffc529b2ba5b10ec482ae507a4a70160" + [[package]] name = "sync_wrapper" version = "1.0.1" @@ -3337,6 +4192,12 @@ dependencies = [ "futures-core", ] +[[package]] +name = "tagptr" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b2093cf4c8eb1e67749a6762251bc9cd836b6fc171623bd0a9d324d37af2417" + [[package]] name = "target-lexicon" version = "0.12.16" @@ -3345,9 +4206,9 @@ checksum = "61c41af27dd6d1e27b1b16b489db798443478cef1f06a660c96db617ba5de3b1" [[package]] name = "tempfile" -version = "3.12.0" +version = "3.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "04cbcdd0c794ebb0d4cf35e88edd2f7d2c4c3e9a5a6dab322839b321c6a87a64" +checksum = "f0f2c9fc62d0beef6951ccffd757e241266a2c833136efbe35af6cd2567dca5b" dependencies = [ "cfg-if", "fastrand", @@ -3373,7 +4234,7 @@ checksum = "08904e7672f5eb876eaaf87e0ce17857500934f4981c4a0ab2b4aa98baac7fc3" dependencies = [ "proc-macro2", "quote", - "syn 2.0.77", + "syn 2.0.79", ] [[package]] @@ -3436,6 +4297,7 @@ dependencies = [ "signal-hook-registry", "socket2", "tokio-macros", + "tracing", "windows-sys 0.52.0", ] @@ -3447,7 +4309,7 @@ checksum = "693d596312e88961bc67d7f1f97af8a70227d9f90c31bba5806eec004978d752" dependencies = [ "proc-macro2", "quote", - "syn 2.0.77", + "syn 2.0.79", ] [[package]] @@ -3461,6 +4323,17 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-stream" +version = "0.1.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4f4e6ce100d0eb49a2734f8c0812bcd324cf357d21810932c5df6b96ef2b86f1" +dependencies = [ + "futures-core", + "pin-project-lite", + "tokio", +] + [[package]] name = "tokio-util" version = "0.7.12" @@ -3474,11 +4347,26 @@ dependencies = [ "tokio", ] +[[package]] +name = "toml" +version = "0.8.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a1ed1f98e3fdc28d6d910e6737ae6ab1a93bf1985935a1193e68f93eeb68d24e" +dependencies = [ + "serde", + "serde_spanned", + "toml_datetime", + "toml_edit 0.22.22", +] + [[package]] name = "toml_datetime" version = "0.6.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0dd7358ecb8fc2f8d014bf86f6f638ce72ba252a2c3a2572f2a795f1d23efb41" +dependencies = [ + "serde", +] [[package]] name = "toml_edit" @@ -3486,9 +4374,52 @@ version = "0.19.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1b5bb770da30e5cbfde35a2d7b9b8a2c4b8ef89548a7a6aeab5c9a576e3e7421" dependencies = [ - "indexmap", + "indexmap 2.6.0", "toml_datetime", - "winnow", + "winnow 0.5.40", +] + +[[package]] +name = "toml_edit" +version = "0.22.22" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ae48d6208a266e853d946088ed816055e556cc6028c5e8e2b84d9fa5dd7c7f5" +dependencies = [ + "indexmap 2.6.0", + "serde", + "serde_spanned", + "toml_datetime", + "winnow 0.6.20", +] + +[[package]] +name = "tonic" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "877c5b330756d856ffcc4553ab34a5684481ade925ecc54bcd1bf02b1d0d4d52" +dependencies = [ + "async-stream", + "async-trait", + "axum", + "base64 0.22.1", + "bytes", + "h2", + "http", + "http-body", + "http-body-util", + "hyper", + "hyper-timeout", + "hyper-util", + "percent-encoding", + "pin-project", + "prost", + "socket2", + "tokio", + "tokio-stream", + "tower 0.4.13", + "tower-layer", + "tower-service", + "tracing", ] [[package]] @@ -3499,9 +4430,28 @@ checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c" dependencies = [ "futures-core", "futures-util", + "indexmap 1.9.3", "pin-project", "pin-project-lite", + "rand", + "slab", "tokio", + "tokio-util", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "tower" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2873938d487c3cfb9aed7546dc9f2711d867c9f90c46b889989a2cb84eba6b4f" +dependencies = [ + "futures-core", + "futures-util", + "pin-project-lite", + "sync_wrapper 0.1.2", "tower-layer", "tower-service", ] @@ -3524,6 +4474,7 @@ version = "0.1.40" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c3523ab5a71916ccf420eebdf5521fcef02141234bbc0b8a49f2fdc4544364ef" dependencies = [ + "log", "pin-project-lite", "tracing-attributes", "tracing-core", @@ -3537,7 +4488,7 @@ checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.77", + "syn 2.0.79", ] [[package]] @@ -3567,14 +4518,24 @@ version = "0.3.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ad0f048c97dbd9faa9b7df56362b8ebcaa52adb06b498c050d2f4e32f90a7a8b" dependencies = [ + "matchers", "nu-ansi-term", + "once_cell", + "regex", "sharded-slab", "smallvec", "thread_local", + "tracing", "tracing-core", "tracing-log", ] +[[package]] +name = "triomphe" +version = "0.1.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "859eb650cfee7434994602c3a68b25d77ad9e68c8a6cd491616ef86661382eb3" + [[package]] name = "try-lock" version = "0.2.5" @@ -3588,6 +4549,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "97fee6b57c6a41524a810daee9286c02d7752c4253064d0b05472833a438f675" dependencies = [ "cfg-if", + "rand", "static_assertions", ] @@ -3608,7 +4570,7 @@ checksum = "f03ca4cb38206e2bef0700092660bb74d696f808514dae47fa1467cbfe26e96e" dependencies = [ "proc-macro2", "quote", - "syn 2.0.77", + "syn 2.0.79", ] [[package]] @@ -3617,11 +4579,22 @@ version = "1.17.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "42ff0bf0c66b8238c6f3b578df37d0b7848e55df8577b3f74f92a69acceeb825" +[[package]] +name = "ulid" +version = "1.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "04f903f293d11f31c0c29e4148f6dc0d033a7f80cebc0282bea147611667d289" +dependencies = [ + "getrandom", + "rand", + "web-time", +] + [[package]] name = "unicode-bidi" -version = "0.3.15" +version = "0.3.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "08f95100a766bf4f8f28f90d77e0a5461bbdb219042e7679bebe79004fed8d75" +checksum = "5ab17db44d7388991a428b2ee655ce0c212e862eff1768a455c58f9aad6e7893" [[package]] name = "unicode-ident" @@ -3734,9 +4707,9 @@ checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" [[package]] name = "wasm-bindgen" -version = "0.2.93" +version = "0.2.95" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a82edfc16a6c469f5f44dc7b571814045d60404b55a0ee849f9bcfa2e63dd9b5" +checksum = "128d1e363af62632b8eb57219c8fd7877144af57558fb2ef0368d0087bddeb2e" dependencies = [ "cfg-if", "once_cell", @@ -3745,24 +4718,24 @@ dependencies = [ [[package]] name = "wasm-bindgen-backend" -version = "0.2.93" +version = "0.2.95" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9de396da306523044d3302746f1208fa71d7532227f15e347e2d93e4145dd77b" +checksum = "cb6dd4d3ca0ddffd1dd1c9c04f94b868c37ff5fac97c30b97cff2d74fce3a358" dependencies = [ "bumpalo", "log", "once_cell", "proc-macro2", "quote", - "syn 2.0.77", + "syn 2.0.79", "wasm-bindgen-shared", ] [[package]] name = "wasm-bindgen-futures" -version = "0.4.43" +version = "0.4.45" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "61e9300f63a621e96ed275155c108eb6f843b6a26d053f122ab69724559dc8ed" +checksum = "cc7ec4f8827a71586374db3e87abdb5a2bb3a15afed140221307c3ec06b1f63b" dependencies = [ "cfg-if", "js-sys", @@ -3772,9 +4745,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro" -version = "0.2.93" +version = "0.2.95" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "585c4c91a46b072c92e908d99cb1dcdf95c5218eeb6f3bf1efa991ee7a68cccf" +checksum = "e79384be7f8f5a9dd5d7167216f022090cf1f9ec128e6e6a482a2cb5c5422c56" dependencies = [ "quote", "wasm-bindgen-macro-support", @@ -3782,28 +4755,28 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro-support" -version = "0.2.93" +version = "0.2.95" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "afc340c74d9005395cf9dd098506f7f44e38f2b4a21c6aaacf9a105ea5e1e836" +checksum = "26c6ab57572f7a24a4985830b120de1594465e5d500f24afe89e16b4e833ef68" dependencies = [ "proc-macro2", "quote", - "syn 2.0.77", + "syn 2.0.79", "wasm-bindgen-backend", "wasm-bindgen-shared", ] [[package]] name = "wasm-bindgen-shared" -version = "0.2.93" +version = "0.2.95" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c62a0a307cb4a311d3a07867860911ca130c3494e8c2719593806c08bc5d0484" +checksum = "65fc09f10666a9f147042251e0dda9c18f166ff7de300607007e96bdebc1068d" [[package]] name = "wasm-streams" -version = "0.4.0" +version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b65dc4c90b63b118468cf747d8bf3566c1913ef60be765b5730ead9e0a3ba129" +checksum = "4e072d4e72f700fb3443d8fe94a39315df013eef1104903cdb0a2abd322bbecd" dependencies = [ "futures-util", "js-sys", @@ -3814,9 +4787,19 @@ dependencies = [ [[package]] name = "web-sys" -version = "0.3.70" +version = "0.3.72" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f6488b90108c040df0fe62fa815cbdee25124641df01814dd7282749234c6112" +dependencies = [ + "js-sys", + "wasm-bindgen", +] + +[[package]] +name = "web-time" +version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "26fdeaafd9bd129f65e7c031593c24d62186301e0c72c8978fa1678be7d532c0" +checksum = "5a6580f308b1fad9207618087a65c04e7a10bc77e02c8e84e9b00dd4b12fa0bb" dependencies = [ "js-sys", "wasm-bindgen", @@ -3983,6 +4966,15 @@ dependencies = [ "memchr", ] +[[package]] +name = "winnow" +version = "0.6.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "36c1fec1a2bb5866f07c25f68c26e565c4c200aebb96d7e55710c19d3e8ac49b" +dependencies = [ + "memchr", +] + [[package]] name = "xz2" version = "0.1.7" @@ -4010,7 +5002,7 @@ checksum = "fa4f8080344d4671fb4e831a13ad1e68092748387dfc4f55e356242fae12ce3e" dependencies = [ "proc-macro2", "quote", - "syn 2.0.77", + "syn 2.0.79", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 167e8fe..4b6da32 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -58,3 +58,4 @@ base64 = "0.22.1" chrono = { version = "0.4.38" } itertools = "0.13" pyo3 = { version = "0.22.2" } +slatedb = "0.2.0" diff --git a/README.md b/README.md index a4d5e1a..eaf2b0d 100644 --- a/README.md +++ b/README.md @@ -94,6 +94,18 @@ Details about developing the python bindings can be found in [py-denormalized/RE 2. Start emitting some sample data: `cargo run --example emit_measurements` 3. Run a [simple streaming aggregation](./examples/examples/simple_aggregation.rs) on the data using denormalized: `cargo run --example simple_aggregation` +### Checkpointing + +We use SlateDB for state backend. Initialize your Job Context to a path to local directory - + +``` + let ctx = Context::new()? + .with_slatedb_backend(String::from("/tmp/checkpoints/simple-agg-checkpoint-1")) + .await; +``` + +The job with automatically recover from state if a previous checkpoint exists. + ## More examples A more powerful example can be seen in our [Kafka ridesharing example](./docs/kafka_rideshare_example.md) diff --git a/crates/common/Cargo.toml b/crates/common/Cargo.toml index fc274f9..8d9214f 100644 --- a/crates/common/Cargo.toml +++ b/crates/common/Cargo.toml @@ -17,3 +17,4 @@ thiserror = "1.0.63" pyo3 = { workspace = true, optional = true } serde_json.workspace = true apache-avro = "0.16.0" +slatedb = { workspace = true } diff --git a/crates/common/src/error/mod.rs b/crates/common/src/error/mod.rs index 93a5eae..d99a1b8 100644 --- a/crates/common/src/error/mod.rs +++ b/crates/common/src/error/mod.rs @@ -27,6 +27,8 @@ pub enum DenormalizedError { AvroError(#[from] AvroError), #[error("Json Error")] Json(#[from] JsonError), + //#[error("SlateDB Error")] + //SlateDBError(#[from] SlateDBError), #[error(transparent)] Other(#[from] anyhow::Error), } diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml index 2a5b07a..9bdbd30 100644 --- a/crates/core/Cargo.toml +++ b/crates/core/Cargo.toml @@ -38,3 +38,7 @@ delegate = "0.12.0" ahash = "0.8.11" hashbrown = "0.14.5" flatbuffers = "24.3.25" +crossbeam = "0.8.4" +slatedb = { workspace = true } # "0.2.0" +object_store = "0.11.0" +bytes = "1.7.2" diff --git a/crates/core/src/context.rs b/crates/core/src/context.rs index 6319160..3419720 100644 --- a/crates/core/src/context.rs +++ b/crates/core/src/context.rs @@ -10,6 +10,7 @@ use crate::datasource::kafka::TopicReader; use crate::datastream::DataStream; use crate::physical_optimizer::EnsureHashPartititionOnGroupByForStreamingAggregates; use crate::query_planner::StreamingQueryPlanner; +use crate::state_backend::slatedb::initialize_global_slatedb; use crate::utils::get_default_optimizer_rules; use denormalized_common::error::{DenormalizedError, Result}; @@ -53,16 +54,10 @@ impl Context { pub async fn from_topic(&self, topic: TopicReader) -> Result { let topic_name = topic.0.topic.clone(); - self.register_table(topic_name.clone(), Arc::new(topic)) .await?; - let df = self.session_conext.table(topic_name.as_str()).await?; - - let ds = DataStream { - df: Arc::new(df), - context: Arc::new(self.clone()), - }; + let ds = DataStream::new(Arc::new(df), Arc::new(self.clone())); Ok(ds) } @@ -76,4 +71,9 @@ impl Context { Ok(()) } + + pub async fn with_slatedb_backend(self, path: String) -> Self { + let _ = initialize_global_slatedb(path.as_str()).await; + self + } } diff --git a/crates/core/src/datasource/kafka/kafka_stream_read.rs b/crates/core/src/datasource/kafka/kafka_stream_read.rs index 12267e5..26dc488 100644 --- a/crates/core/src/datasource/kafka/kafka_stream_read.rs +++ b/crates/core/src/datasource/kafka/kafka_stream_read.rs @@ -5,15 +5,17 @@ use std::time::Duration; use arrow::datatypes::TimestampMillisecondType; use arrow_array::{Array, ArrayRef, PrimitiveArray, RecordBatch, StringArray, StructArray}; use arrow_schema::{DataType, Field, SchemaRef, TimeUnit}; -use denormalized_orchestrator::channel_manager::{create_channel, get_sender}; +use crossbeam::channel; +use denormalized_orchestrator::channel_manager::{create_channel, get_sender, take_receiver}; use denormalized_orchestrator::orchestrator::{self, OrchestrationMessage}; +use futures::executor::block_on; use log::{debug, error}; use serde::{Deserialize, Serialize}; use crate::config_extensions::denormalized_config::DenormalizedConfig; use crate::physical_plan::stream_table::PartitionStreamExt; use crate::physical_plan::utils::time::array_to_timestamp_array; -use crate::state_backend::rocksdb_backend::get_global_rocksdb; +use crate::state_backend::slatedb::get_global_slatedb; use arrow::compute::{max, min}; use datafusion::execution::{SendableRecordBatchStream, TaskContext}; @@ -21,7 +23,7 @@ use datafusion::physical_plan::stream::RecordBatchReceiverStreamBuilder; use datafusion::physical_plan::streaming::PartitionStream; use rdkafka::consumer::{Consumer, StreamConsumer}; -use rdkafka::{ClientConfig, Message, TopicPartitionList}; +use rdkafka::{ClientConfig, Message, Offset, TopicPartitionList}; use super::KafkaReadConfig; @@ -44,7 +46,7 @@ impl KafkaStreamRead { #[derive(Debug, Serialize, Deserialize)] struct BatchReadMetadata { - epoch: i32, + epoch: u128, min_timestamp: Option, max_timestamp: Option, offsets_read: HashMap, @@ -81,20 +83,15 @@ impl PartitionStream for KafkaStreamRead { } fn execute(&self, ctx: Arc) -> SendableRecordBatchStream { - let mut assigned_partitions = TopicPartitionList::new(); - - let config_options = ctx + let _config_options = ctx .session_config() .options() .extensions .get::(); - let should_checkpoint = config_options.map_or(false, |c| c.checkpoint); + let mut should_checkpoint = false; //config_options.map_or(false, |c| c.checkpoint); - let topic = self.config.topic.clone(); - for partition in self.assigned_partitions.clone() { - assigned_partitions.add_partition(self.config.topic.as_str(), partition); - } + let node_id = self.exec_node_id.unwrap(); let partition_tag = self .assigned_partitions .iter() @@ -102,91 +99,95 @@ impl PartitionStream for KafkaStreamRead { .collect::>() .join("_"); - let state_backend = if should_checkpoint { - Some(get_global_rocksdb().unwrap()) - } else { - None - }; + let channel_tag = format!("{}_{}", node_id, partition_tag); + let mut serialized_state: Option> = None; + let state_backend = get_global_slatedb().unwrap(); + + let mut starting_offsets: HashMap = HashMap::new(); + if orchestrator::SHOULD_CHECKPOINT { + create_channel(channel_tag.as_str(), 10); + debug!("checking for last checkpointed offsets"); + serialized_state = block_on(state_backend.clone().get(channel_tag.as_bytes().to_vec())); + } + + if let Some(serialized_state) = serialized_state { + let last_batch_metadata = BatchReadMetadata::from_bytes(&serialized_state).unwrap(); + debug!( + "recovering from checkpointed offsets. epoch was {} max timestamp {:?}", + last_batch_metadata.epoch, last_batch_metadata.max_timestamp + ); + starting_offsets = last_batch_metadata.offsets_read.clone(); + } + + let mut assigned_partitions = TopicPartitionList::new(); + + for partition in self.assigned_partitions.clone() { + assigned_partitions.add_partition(self.config.topic.as_str(), partition); + if starting_offsets.contains_key(&partition) { + let offset = starting_offsets.get(&partition).unwrap(); + debug!("setting partition {} to offset {}", partition, offset); + let _ = assigned_partitions.set_partition_offset( + self.config.topic.as_str(), + partition, + Offset::from_raw(*offset), + ); + } + } + let consumer: StreamConsumer = create_consumer(self.config.clone()); consumer .assign(&assigned_partitions) .expect("Partition assignment failed."); - let state_namespace = format!("kafka_source_{}", topic); - - if let Some(backend) = &state_backend { - let _ = match backend.get_cf(&state_namespace) { - Ok(cf) => { - debug!("cf for this already exists"); - Ok(cf) - } - Err(..) => { - let _ = backend.create_cf(&state_namespace); - backend.get_cf(&state_namespace) - } - }; - } - let mut builder = RecordBatchReceiverStreamBuilder::new(self.config.schema.clone(), 1); let tx = builder.tx(); let canonical_schema = self.config.schema.clone(); let timestamp_column: String = self.config.timestamp_column.clone(); let timestamp_unit = self.config.timestamp_unit.clone(); - let batch_timeout = Duration::from_millis(100); - let mut channel_tag: String = String::from(""); - if orchestrator::SHOULD_CHECKPOINT { - let node_id = self.exec_node_id.unwrap(); - channel_tag = format!("{}_{}", node_id, partition_tag); - create_channel(channel_tag.as_str(), 10); - } + let batch_timeout: Duration = Duration::from_millis(100); let mut decoder = self.config.build_decoder(); builder.spawn(async move { let mut epoch = 0; + let mut receiver: Option> = None; if orchestrator::SHOULD_CHECKPOINT { let orchestrator_sender = get_sender("orchestrator"); - let msg = OrchestrationMessage::RegisterStream(channel_tag.clone()); + let msg: OrchestrationMessage = + OrchestrationMessage::RegisterStream(channel_tag.clone()); orchestrator_sender.as_ref().unwrap().send(msg).unwrap(); + receiver = take_receiver(channel_tag.as_str()); } + loop { - let mut last_offsets = HashMap::new(); - if let Some(backend) = &state_backend { - if let Some(offsets) = backend - .get_state(&state_namespace, partition_tag.clone().into_bytes()) - .unwrap() - { - let last_batch_metadata = BatchReadMetadata::from_bytes(&offsets).unwrap(); - last_offsets = last_batch_metadata.offsets_read; - debug!( - "epoch is {} and last read offsets are {:?}", - epoch, last_offsets - ); - } else { - debug!("epoch is {} and no prior offsets were found.", epoch); + //let mut checkpoint_barrier: Option = None; + let mut _checkpoint_barrier: Option = None; + + if orchestrator::SHOULD_CHECKPOINT { + let r = receiver.as_ref().unwrap(); + for message in r.try_iter() { + debug!("received checkpoint barrier for {:?}", message); + if let OrchestrationMessage::CheckpointBarrier(epoch_ts) = message { + epoch = epoch_ts; + should_checkpoint = true; + } } } - for (partition, offset) in &last_offsets { - consumer - .seek( - &topic, - *partition, - rdkafka::Offset::Offset(*offset + 1), - Duration::from_secs(10), - ) - .expect("Failed to seek to stored offset"); - } - let mut offsets_read: HashMap = HashMap::new(); - let start_time = datafusion::common::instant::Instant::now(); - while start_time.elapsed() < batch_timeout { + loop { match tokio::time::timeout(batch_timeout, consumer.recv()).await { Ok(Ok(m)) => { let payload = m.payload().expect("Message payload is empty"); decoder.push_to_buffer(payload.to_owned()); - offsets_read.insert(m.partition(), m.offset()); + offsets_read + .entry(m.partition()) + .and_modify(|existing_value| { + *existing_value = (*existing_value).max(m.offset()) + }) + .or_insert(m.offset()); + break; } Ok(Err(err)) => { error!("Error reading from Kafka {:?}", err); @@ -194,7 +195,8 @@ impl PartitionStream for KafkaStreamRead { } Err(_) => { // Timeout reached - break; + error!("timeout reached."); + //break; } } } @@ -220,7 +222,6 @@ impl PartitionStream for KafkaStreamRead { let max_timestamp: Option<_> = max::(ts_array); let min_timestamp: Option<_> = min::(ts_array); - debug!("min: {:?}, max: {:?}", min_timestamp, max_timestamp); let mut columns: Vec> = record_batch.columns().to_vec(); let metadata_column = StructArray::from(vec![ @@ -245,23 +246,21 @@ impl PartitionStream for KafkaStreamRead { match tx_result { Ok(_) => { if should_checkpoint { - let _ = state_backend.as_ref().map(|backend| { - backend.put_state( - &state_namespace, - partition_tag.clone().into_bytes(), - BatchReadMetadata { - epoch, - min_timestamp, - max_timestamp, - offsets_read, - } - .to_bytes() - .unwrap(), - ) - }); + debug!("about to checkpoint offsets"); + let off = BatchReadMetadata { + epoch, + min_timestamp, + max_timestamp, + offsets_read, + }; + let _ = state_backend + .as_ref() + .put(channel_tag.as_bytes().to_vec(), off.to_bytes().unwrap()); + debug!("checkpointed offsets {:?}", off); + should_checkpoint = false; } } - Err(err) => error!("result err {:?}", err), + Err(err) => error!("result err {:?}. shutdown signal detected.", err), } epoch += 1; } diff --git a/crates/core/src/datastream.rs b/crates/core/src/datastream.rs index 482ebd2..8b83ec4 100644 --- a/crates/core/src/datastream.rs +++ b/crates/core/src/datastream.rs @@ -1,10 +1,13 @@ use datafusion::common::runtime::SpawnedTask; use datafusion::logical_expr::LogicalPlan; -use datafusion::physical_plan::ExecutionPlan; use datafusion::physical_plan::ExecutionPlanProperties; use denormalized_orchestrator::orchestrator; use futures::StreamExt; +use log::debug; +use log::info; use std::{sync::Arc, time::Duration}; +use tokio::signal; +use tokio::sync::watch; use datafusion::common::DFSchema; use datafusion::dataframe::DataFrame; @@ -19,6 +22,7 @@ use crate::context::Context; use crate::datasource::kafka::{ConnectionOpts, KafkaTopicBuilder}; use crate::logical_plan::StreamingLogicalPlanBuilder; use crate::physical_plan::utils::time::TimestampUnit; +use crate::state_backend::slatedb::get_global_slatedb; use denormalized_orchestrator::orchestrator::Orchestrator; use denormalized_common::error::Result; @@ -31,10 +35,42 @@ use denormalized_common::error::Result; pub struct DataStream { pub df: Arc, pub(crate) context: Arc, + shutdown_tx: watch::Sender, // Sender to trigger shutdown + shutdown_rx: watch::Receiver, // Receiver to listen for shutdown signal } impl DataStream { - // Select columns in the output stream + pub fn new(df: Arc, context: Arc) -> Self { + let (shutdown_tx, shutdown_rx) = watch::channel(false); + DataStream { + df, + context, + shutdown_tx, + shutdown_rx, + } + } + + fn start_shutdown_listener(&self) { + let shutdown_tx = self.shutdown_tx.clone(); + + tokio::spawn(async move { + let mut terminate_signal = signal::unix::signal(signal::unix::SignalKind::terminate()) + .expect("Failed to listen for SIGTERM"); + + loop { + tokio::select! { + _ = signal::ctrl_c() => { + println!("Received Ctrl+C, initiating shutdown..."); + }, + _ = terminate_signal.recv() => { + println!("Received SIGTERM, initiating shutdown..."); + }, + } + shutdown_tx.send(true).unwrap(); + } + }); + } + pub fn select(self, expr_list: Vec) -> Result { let (session_state, plan) = self.df.as_ref().clone().into_parts(); @@ -49,6 +85,8 @@ impl DataStream { Ok(Self { df: Arc::new(DataFrame::new(session_state, project_plan)), context: self.context.clone(), + shutdown_tx: self.shutdown_tx.clone(), + shutdown_rx: self.shutdown_rx.clone(), }) } @@ -61,6 +99,8 @@ impl DataStream { Ok(Self { df: Arc::new(DataFrame::new(session_state, plan)), context: self.context.clone(), + shutdown_tx: self.shutdown_tx.clone(), + shutdown_rx: self.shutdown_rx.clone(), }) } @@ -68,6 +108,8 @@ impl DataStream { Ok(Self { df: Arc::new(self.df.as_ref().clone().with_column(name, expr)?), context: self.context.clone(), + shutdown_tx: self.shutdown_tx.clone(), + shutdown_rx: self.shutdown_rx.clone(), }) } @@ -88,6 +130,8 @@ impl DataStream { Ok(Self { df: Arc::new(DataFrame::new(session_state, plan)), context: self.context.clone(), + shutdown_tx: self.shutdown_tx.clone(), + shutdown_rx: self.shutdown_rx.clone(), }) } @@ -116,6 +160,8 @@ impl DataStream { Ok(Self { df: Arc::new(DataFrame::new(session_state, plan)), context: self.context.clone(), + shutdown_tx: self.shutdown_tx.clone(), + shutdown_rx: self.shutdown_rx.clone(), }) } @@ -135,6 +181,8 @@ impl DataStream { Ok(Self { df: Arc::new(DataFrame::new(session_state, plan)), context: self.context.clone(), + shutdown_tx: self.shutdown_tx.clone(), + shutdown_rx: self.shutdown_rx.clone(), }) } @@ -162,6 +210,8 @@ impl DataStream { pub async fn print_physical_plan(self) -> Result { let (session_state, plan) = self.df.as_ref().clone().into_parts(); let physical_plan = self.df.as_ref().clone().create_physical_plan().await?; + let node_id = physical_plan.node_id(); + debug!("topline node id = {:?}", node_id); let displayable_plan = DisplayableExecutionPlan::new(physical_plan.as_ref()); println!("{}", displayable_plan.indent(true)); @@ -169,40 +219,79 @@ impl DataStream { Ok(Self { df: Arc::new(DataFrame::new(session_state, plan)), context: self.context.clone(), + shutdown_tx: self.shutdown_tx.clone(), + shutdown_rx: self.shutdown_rx.clone(), }) } /// execute the stream and print the results to stdout. /// Mainly used for development and debugging - pub async fn print_stream(self) -> Result<()> { + pub async fn print_stream(mut self) -> Result<()> { + self.start_shutdown_listener(); + + let mut maybe_orchestrator_handle = None; + if orchestrator::SHOULD_CHECKPOINT { - let plan = self.df.as_ref().clone().create_physical_plan().await?; - let node_ids = extract_node_ids_and_partitions(&plan); - let max_buffer_size = node_ids.iter().map(|x| x.1).sum::(); let mut orchestrator = Orchestrator::default(); - SpawnedTask::spawn_blocking(move || orchestrator.run(max_buffer_size)); + let cloned_shutdown_rx = self.shutdown_rx.clone(); + let orchestrator_handle = + SpawnedTask::spawn_blocking(move || orchestrator.run(10, cloned_shutdown_rx)); + + maybe_orchestrator_handle = Some(orchestrator_handle) } let mut stream: SendableRecordBatchStream = self.df.as_ref().clone().execute_stream().await?; + + // Stream loop with shutdown check loop { - match stream.next().await.transpose() { - Ok(Some(batch)) => { - println!( - "{}", - datafusion::common::arrow::util::pretty::pretty_format_batches(&[batch]) - .unwrap() - ); - } - Ok(None) => { - log::warn!("No RecordBatch in stream"); + tokio::select! { + // Check if shutdown signal has changed + _ = self.shutdown_rx.changed() => { + info!("Graceful shutdown initiated, exiting stream loop..."); + + break; } - Err(err) => { - log::error!("Error reading stream: {:?}", err); - return Err(err.into()); + // Handle the next batch from the DataFusion stream + next_batch = stream.next() => { + match next_batch.transpose() { + Ok(Some(batch)) => { + println!( + "{}", + datafusion::common::arrow::util::pretty::pretty_format_batches(&[batch]) + .unwrap() + ); + } + Ok(None) => { + info!("No more RecordBatch in stream"); + break; // End of stream + } + Err(err) => { + log::error!("Error reading stream: {:?}", err); + return Err(err.into()); + } + } } } } + + log::info!("Stream processing stopped. Cleaning up..."); + + let state_backend = get_global_slatedb(); + if let Ok(db) = state_backend { + log::info!("Closing the state backend (slatedb)..."); + db.close().await.unwrap(); + } + + // Join the orchestrator handle if it exists, ensuring it is joined and awaited + if let Some(orchestrator_handle) = maybe_orchestrator_handle { + log::info!("Waiting for orchestrator task to complete..."); + match orchestrator_handle.join_unwind().await { + Ok(_) => log::info!("Orchestrator task completed successfully."), + Err(e) => log::error!("Error joining orchestrator task: {:?}", e), + } + } + Ok(()) } /// execute the stream and write the results to a give kafka topic @@ -252,16 +341,3 @@ impl Joinable for DataStream { plan } } - -fn extract_node_ids_and_partitions(plan: &Arc) -> Vec<(Option, usize)> { - let node_id = plan.node_id(); - let partitions = plan.output_partitioning().partition_count(); - let mut traversals: Vec<(Option, usize)> = vec![]; - - for child in plan.children() { - let mut traversal = extract_node_ids_and_partitions(child); - traversals.append(&mut traversal); - } - traversals.push((node_id, partitions)); - traversals -} diff --git a/crates/core/src/physical_plan/continuous/grouped_window_agg_stream.rs b/crates/core/src/physical_plan/continuous/grouped_window_agg_stream.rs index c652540..4114584 100644 --- a/crates/core/src/physical_plan/continuous/grouped_window_agg_stream.rs +++ b/crates/core/src/physical_plan/continuous/grouped_window_agg_stream.rs @@ -15,6 +15,7 @@ use arrow::{ use arrow_array::{ArrayRef, PrimitiveArray, RecordBatch, StructArray, TimestampMillisecondArray}; use arrow_ord::cmp; use arrow_schema::{Schema, SchemaRef}; +use crossbeam::channel::Receiver; use datafusion::physical_expr::aggregate::AggregateFunctionExpr; use datafusion::{ common::{utils::proxy::VecAllocExt, DataFusionError, Result}, @@ -35,9 +36,19 @@ use datafusion::{ }, }; -use futures::{Stream, StreamExt}; - -use crate::physical_plan::utils::time::RecordBatchWatermark; +use denormalized_orchestrator::{ + channel_manager::take_receiver, + orchestrator::{self, OrchestrationMessage}, +}; +use futures::{executor::block_on, Stream, StreamExt}; +use log::debug; +use serde::{Deserialize, Serialize}; + +use crate::{ + physical_plan::utils::time::RecordBatchWatermark, + state_backend::slatedb::{get_global_slatedb, SlateDBWrapper}, + utils::serialization::ArrayContainer, +}; use super::{ add_window_columns_to_record_batch, add_window_columns_to_schema, create_group_accumulator, @@ -62,6 +73,31 @@ pub struct GroupedWindowAggStream { group_by: PhysicalGroupBy, group_schema: Arc, context: Arc, + epoch: i64, + partition: usize, + channel_tag: String, + receiver: Option>, + state_backend: Arc, +} + +#[derive(Serialize, Deserialize)] +pub struct CheckpointedGroupedWindowAggStream { + partition: usize, + watermark: Option, + frames: Vec, +} + +#[derive(Clone, Serialize, Deserialize)] +pub struct SerializedAccumulator { + states: ArrayContainer, + num_groups: usize, +} + +#[derive(Clone, Serialize, Deserialize)] +pub struct CheckpointedGroupedWindowFrame { + window_start_time: SystemTime, + window_end_time: SystemTime, + accumulators: Vec, } fn group_schema(schema: &Schema, group_count: usize) -> SchemaRef { @@ -78,6 +114,7 @@ impl GroupedWindowAggStream { watermark: Arc>>, window_type: PhysicalStreamingWindowType, aggregation_mode: AggregateMode, + channel_tag: Option, ) -> Result { let agg_schema = Arc::clone(&exec_operator.schema); let agg_filter_expr = exec_operator.filter_expressions.clone(); @@ -104,7 +141,18 @@ impl GroupedWindowAggStream { let group_by = exec_operator.group_by.clone(); let group_schema = group_schema(&agg_schema, group_by.expr().len()); - Ok(Self { + + let receiver: Option> = channel_tag + .as_ref() + .and_then(|tag| take_receiver(tag.as_str())); + + let channel_tag: String = channel_tag.unwrap_or(String::from("")); + let state_backend = get_global_slatedb().unwrap(); + + let serialized_state = block_on(state_backend.get(channel_tag.as_bytes().to_vec())); + + //let window_frames: BTreeMap = BTreeMap::new(); + let mut stream = Self { schema: agg_schema, input, baseline_metrics, @@ -118,7 +166,38 @@ impl GroupedWindowAggStream { group_by, group_schema, context, - }) + epoch: 0, + partition, + channel_tag: channel_tag, + receiver, + state_backend, + }; + + if serialized_state.is_some() { + let bytes = serialized_state.unwrap(); + let state: CheckpointedGroupedWindowAggStream = + bincode::deserialize(bytes.as_ref()).unwrap(); + let ranges: Vec<(SystemTime, SystemTime)> = state + .frames + .iter() + .map(|f| (f.window_start_time, f.window_end_time)) + .collect(); + let _ = stream.ensure_window_frames_for_ranges(&ranges); + state.frames.iter().for_each(|f| { + let _ = stream.update_accumulators_for_frame(f.window_start_time, &f); + }); + let state_watermark = state.watermark.unwrap(); + stream.process_watermark(RecordBatchWatermark { + min_timestamp: state_watermark, + max_timestamp: state_watermark, + }); + debug!( + "successfully read the last checkpoint. partition was {} and watermark was at {:?}", + state.partition, state.watermark + ); + } + + Ok(stream) } pub fn output_schema_with_window(&self) -> SchemaRef { @@ -130,17 +209,23 @@ impl GroupedWindowAggStream { let watermark_lock: std::sync::MutexGuard<'_, Option> = self.latest_watermark.lock().unwrap(); + let output_schema_with_window = self.output_schema_with_window(); if let Some(watermark) = *watermark_lock { let mut window_frames_to_remove: Vec = Vec::new(); for (timestamp, frame) in self.window_frames.iter_mut() { if watermark >= frame.window_end_time { let rb = frame.evaluate()?; - let result = add_window_columns_to_record_batch( - rb, - frame.window_start_time, - frame.window_end_time, - ); + let result = if rb.num_rows() > 0 { + add_window_columns_to_record_batch( + rb, + frame.window_start_time, + frame.window_end_time, + ) + } else { + RecordBatch::new_empty(output_schema_with_window.clone()) + }; + results.push(result); window_frames_to_remove.push(*timestamp); } @@ -155,7 +240,6 @@ impl GroupedWindowAggStream { } fn process_watermark(&mut self, watermark: RecordBatchWatermark) { - // should this be within a mutex? let mut watermark_lock: std::sync::MutexGuard> = self.latest_watermark.lock().unwrap(); @@ -215,6 +299,16 @@ impl GroupedWindowAggStream { Ok(()) } + fn update_accumulators_for_frame( + &mut self, + window_start_time: SystemTime, + state: &CheckpointedGroupedWindowFrame, + ) -> Result<(), DataFusionError> { + let frame = self.window_frames.get_mut(&window_start_time).unwrap(); + let _ = frame.initialize_from_state(state); + Ok(()) + } + #[inline] fn poll_next_inner(&mut self, cx: &mut Context<'_>) -> Poll>> { let result: std::prelude::v1::Result = match self @@ -246,6 +340,71 @@ impl GroupedWindowAggStream { return Poll::Pending; } }; + self.epoch += 1; + + if orchestrator::SHOULD_CHECKPOINT { + let r = self.receiver.as_ref().unwrap(); + let mut epoch: u128 = 0; + for message in r.try_iter() { + debug!("received checkpoint barrier for {:?}", message); + if let OrchestrationMessage::CheckpointBarrier(epoch_ts) = message { + epoch = epoch_ts; + } + } + + if epoch != 0 { + // Prepare data for checkpointing + + // Clone or extract necessary data + let frames: Vec = self + .window_frames + .values_mut() + .map(|frame| { + let num_groups = frame.group_values.len(); + let accumulators: Vec = frame + .accumulators + .iter_mut() + .map(|acc| { + let states = acc.state(EmitTo::All).unwrap(); + SerializedAccumulator { + states: ArrayContainer { arrays: states }, + num_groups, + } + }) + .collect(); + let window_start_time = frame.window_start_time; + let window_end_time = frame.window_end_time; + let checkpointed_frame = CheckpointedGroupedWindowFrame { + window_start_time, + window_end_time, + accumulators, + }; + // accumulators get reset on .state(), so reseed them with state + let _ = frame.initialize_from_state(&checkpointed_frame); + checkpointed_frame + }) + .collect(); + + let watermark = { + let watermark_lock = self.latest_watermark.lock().unwrap(); + watermark_lock.clone() + }; + + let checkpointed_state = CheckpointedGroupedWindowAggStream { + partition: self.partition, + watermark, + frames, + }; + + let serialized_checkpoint = bincode::serialize(&checkpointed_state).unwrap(); + let key = self.channel_tag.as_bytes().to_vec(); + + // Clone or use `Arc` for `state_backend` + let state_backend = self.state_backend.clone(); + + state_backend.put(key, serialized_checkpoint); + } + } Poll::Ready(Some(result)) } } @@ -459,6 +618,26 @@ impl GroupedAggWindowFrame { let batch = RecordBatch::try_new(schema, output)?; Ok(batch) } + + fn initialize_from_state( + &mut self, + state: &CheckpointedGroupedWindowFrame, + ) -> Result<(), DataFusionError> { + let _ = self + .accumulators + .iter_mut() + .zip(state.accumulators.iter()) + .map(|(acc, checkpointed_acc)| { + let group_indices = (0..checkpointed_acc.num_groups).collect::>(); + acc.merge_batch( + &checkpointed_acc.states.arrays, + &group_indices, + None, + checkpointed_acc.num_groups, + ) + }); + Ok(()) + } } pub(crate) fn evaluate_group_by( diff --git a/crates/core/src/physical_plan/continuous/streaming_window.rs b/crates/core/src/physical_plan/continuous/streaming_window.rs index f69b3d1..e70261b 100644 --- a/crates/core/src/physical_plan/continuous/streaming_window.rs +++ b/crates/core/src/physical_plan/continuous/streaming_window.rs @@ -38,6 +38,10 @@ use datafusion::{ common::{internal_err, stats::Precision, DataFusionError, Statistics}, physical_plan::Distribution, }; +use denormalized_orchestrator::{ + channel_manager::{create_channel, get_sender}, + orchestrator::{self, OrchestrationMessage}, +}; use futures::{Stream, StreamExt}; use tracing::debug; @@ -418,6 +422,22 @@ impl ExecutionPlan for StreamingWindowExec { partition: usize, context: Arc, ) -> Result { + let node_id = self + .properties() + .node_id() + .expect("expected node id to be set."); + + let channel_tag = if orchestrator::SHOULD_CHECKPOINT { + let tag = format!("{}_{}", node_id, partition); + create_channel(tag.as_str(), 10); + let orchestrator_sender = get_sender("orchestrator"); + let msg: OrchestrationMessage = OrchestrationMessage::RegisterStream(tag.clone()); + orchestrator_sender.as_ref().unwrap().send(msg).unwrap(); + Some(tag) + } else { + None + }; + if self.group_by.is_empty() { debug!("GROUP BY expression is empty creating a SimpleWindowAggStream"); if self.mode == AggregateMode::Partial { @@ -428,6 +448,7 @@ impl ExecutionPlan for StreamingWindowExec { self.watermark.clone(), self.window_type, self.mode, + channel_tag, )?)) } else { Ok(Box::pin(FullWindowAggStream::try_new( @@ -446,6 +467,7 @@ impl ExecutionPlan for StreamingWindowExec { self.watermark.clone(), self.window_type, self.mode, + channel_tag, )?)) } } @@ -617,6 +639,7 @@ pub struct WindowAggStream { window_frames: BTreeMap, window_type: PhysicalStreamingWindowType, aggregation_mode: AggregateMode, + _channel_tag: Option, } #[allow(dead_code)] @@ -628,6 +651,7 @@ impl WindowAggStream { watermark: Arc>>, window_type: PhysicalStreamingWindowType, aggregation_mode: AggregateMode, + channel_tag: Option, ) -> Result { let agg_schema = Arc::clone(&exec_operator.schema); let agg_filter_expr = exec_operator.filter_expressions.clone(); @@ -659,6 +683,7 @@ impl WindowAggStream { window_frames: BTreeMap::new(), window_type, aggregation_mode, + _channel_tag: channel_tag, }) } diff --git a/crates/core/src/state_backend/mod.rs b/crates/core/src/state_backend/mod.rs index 3b23a3a..e64865d 100644 --- a/crates/core/src/state_backend/mod.rs +++ b/crates/core/src/state_backend/mod.rs @@ -1 +1,2 @@ pub mod rocksdb_backend; +pub mod slatedb; diff --git a/crates/core/src/state_backend/slatedb.rs b/crates/core/src/state_backend/slatedb.rs new file mode 100644 index 0000000..0aca1cb --- /dev/null +++ b/crates/core/src/state_backend/slatedb.rs @@ -0,0 +1,92 @@ +use denormalized_common::DenormalizedError; +use object_store::{local::LocalFileSystem, path::Path}; +use slatedb::error::SlateDBError; +use slatedb::{config::DbOptions, db::Db}; +use std::sync::{Arc, OnceLock}; + +static GLOBAL_SLATEDB: OnceLock> = OnceLock::new(); + +pub async fn initialize_global_slatedb(path: &str) -> Result<(), DenormalizedError> { + let backend = SlateDBWrapper::initialize(path).await.unwrap(); + GLOBAL_SLATEDB + .set(Arc::new(backend)) + .map_err(|_| DenormalizedError::RocksDB("Global SlateDB already initialized".to_string())) +} + +pub fn get_global_slatedb() -> Result, DenormalizedError> { + GLOBAL_SLATEDB + .get() + .cloned() + .ok_or_else(|| DenormalizedError::RocksDB("Global SlateDB not initialized".to_string())) +} + +pub struct SlateDBWrapper { + db: Arc, +} + +//TODO: This is a WIP. the comments will be cleaned up 10/19/2024 +impl SlateDBWrapper { + // Function to initialize the SlateDB instance + pub async fn initialize(path_str: &str) -> Result { + // let os: Arc = Arc::new( + // AmazonS3Builder::new() + // .with_allow_http(true) + // .with_endpoint("http://localhost:4566") + // .with_access_key_id("test") + // .with_secret_access_key("test") + // .with_bucket_name("slatedb") + // .with_region("us-east-1") + // .with_conditional_put(S3ConditionalPut::Dynamo(DynamoCommit::new( + // "slatedb".to_string(), + // ))) + // .build() + // .expect("failed to create object store"), + // ); + + let os = Arc::new(LocalFileSystem::new()); + let db_options = DbOptions::default(); + // let compactor_opts = db_options + // .clone() + // .compactor_options + // .map(|co| co.with_compactor_handle(runtime.handle().clone())) + // .unwrap(); + + let path = Path::from(path_str); + + //let db_options_with_new_compactor = db_options.with_compactor_options(compactor_opts); + // Capture the runtime handle and block on database initialization + + let db = Db::open_with_opts(path, db_options, os).await?; + + Ok(SlateDBWrapper { db: Arc::new(db) }) + } + + // Non-async method to perform a put operation + pub fn put(&self, key: Vec, value: Vec) { + let db_clone = self.db.clone(); + //let handle = self.runtime_handle.clone(); + + // Use the runtime handle to spawn a blocking task for the put operation + // self.runtime.spawn_blocking(move || { + let rt = tokio::runtime::Handle::current(); + rt.spawn(async move { + db_clone.put(&key, &value).await; + }); + //}); //.expect("Failed to spawn blocking task"); + } + + // Non-async method to perform a get operation + pub async fn get(&self, key: Vec) -> Option> { + let db_clone = self.db.clone(); + + // Block on the get operation using the runtime handle + let result = db_clone.get(&key).await.unwrap_or(None); + + result.map(|bytes| bytes.to_vec()) + } + + pub async fn close(&self) -> Result<(), SlateDBError> { + let db_clone = self.db.clone(); + db_clone.close().await + } +} diff --git a/crates/core/src/utils/serialization.rs b/crates/core/src/utils/serialization.rs index c793599..13169e7 100644 --- a/crates/core/src/utils/serialization.rs +++ b/crates/core/src/utils/serialization.rs @@ -197,17 +197,44 @@ fn deserialize_array_data( .map_err(|e| e.into()) } -#[derive(Clone)] +#[derive(Serialize, Deserialize, Clone)] pub struct ArrayContainer { + #[serde(with = "array_ref_serialization")] pub arrays: Vec, } - impl ArrayContainer { fn new(arrays: Vec) -> Self { Self { arrays } } } +mod array_ref_serialization { + use super::*; + use serde::{Deserializer, Serializer}; + + pub fn serialize(arrays: &Vec, serializer: S) -> Result + where + S: Serializer, + { + let serialized_arrays: Vec> = arrays + .iter() + .map(|arr| serialize_array(arr).expect("Failed to serialize array")) + .collect(); + serialized_arrays.serialize(serializer) + } + + pub fn deserialize<'de, D>(deserializer: D) -> Result, D::Error> + where + D: Deserializer<'de>, + { + let serialized_arrays: Vec> = Vec::deserialize(deserializer)?; + serialized_arrays + .into_iter() + .map(|bytes| deserialize_array(&bytes).map_err(serde::de::Error::custom)) + .collect() + } +} + #[derive(Serialize, Deserialize)] struct SerializedArrayContainer { serialized_arrays: Vec>, diff --git a/crates/orchestrator/src/orchestrator.rs b/crates/orchestrator/src/orchestrator.rs index 3ada4ea..8529848 100644 --- a/crates/orchestrator/src/orchestrator.rs +++ b/crates/orchestrator/src/orchestrator.rs @@ -1,13 +1,17 @@ -use std::collections::HashMap; +use std::{ + collections::HashMap, + time::{Instant, SystemTime, UNIX_EPOCH}, +}; use crate::channel_manager::{create_channel, get_sender, take_receiver}; use crossbeam::channel; -use log::{debug, info}; +use log::debug; +use tokio::sync::watch; #[derive(Clone, Debug)] pub enum OrchestrationMessage { RegisterStream(String), - CheckpointBarrier(String), + CheckpointBarrier(u128), CheckpointComplete(String), } @@ -16,23 +20,63 @@ pub struct Orchestrator { senders: HashMap>, } -pub const SHOULD_CHECKPOINT: bool = false; // THIS WILL BE MOVED INTO CONFIG +pub const SHOULD_CHECKPOINT: bool = true; // THIS WILL BE MOVED INTO CONFIG +/** + * 1. Keep track of checkpoint per source. + * 2. Tell each downstream which checkpoints it needs to know. + * 3. Send a checkpoint request every time you have a new barrier + * 4. Figure out if the checkpoint was complete. + */ impl Orchestrator { - pub fn run(&mut self, managed_tasks: usize) { - info!("Orchestrator started."); - create_channel("orchestrator", managed_tasks); + pub fn run(&mut self, managed_tasks: usize, shutdown_rx: watch::Receiver) { + debug!("Orchestrator started."); + create_channel("orchestrator", managed_tasks); // Currently we are going to use unbounded channels let receiver = take_receiver("orchestrator"); + let mut last_checkpoint = Instant::now(); loop { - let msg: OrchestrationMessage = receiver.as_ref().unwrap().recv().unwrap(); - match msg { - OrchestrationMessage::RegisterStream(stream_id) => { - debug!("registering stream {}", stream_id); - let sender = get_sender(&stream_id).unwrap(); - self.senders.insert(stream_id, sender); + // Check if shutdown signal has been received + if shutdown_rx.has_changed().unwrap_or(false) && *shutdown_rx.borrow() { + debug!("Shutdown signal received. Exiting orchestrator..."); + break; + } + + if !receiver.as_ref().unwrap().is_empty() { + let msg: OrchestrationMessage = receiver.as_ref().unwrap().recv().unwrap(); + match msg { + OrchestrationMessage::RegisterStream(stream_id) => { + debug!("registering stream {}", stream_id); + let sender = get_sender(&stream_id).unwrap(); + self.senders.insert(stream_id, sender); + } + OrchestrationMessage::CheckpointBarrier(_) => todo!(), + OrchestrationMessage::CheckpointComplete(_) => todo!(), + } + } + + let time_now = Instant::now(); + let diff = time_now - last_checkpoint; + + if diff.as_millis() >= 10_000 { + let epoch_ts = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_millis(); + + for (stream_id, sender) in self.senders.iter() { + match sender.try_send(OrchestrationMessage::CheckpointBarrier(epoch_ts)) { + Ok(_) => continue, + Err(_) => log::error!( + "Error in sending checkpoint barrier to stream {}", + stream_id + ), + } } - OrchestrationMessage::CheckpointBarrier(_) => todo!(), - OrchestrationMessage::CheckpointComplete(_) => todo!(), + last_checkpoint = time_now; + debug!( + "completed sending checkpoint barrier for {:?}", + last_checkpoint + ) } } } diff --git a/examples/Cargo.toml b/examples/Cargo.toml index 2af54c7..2c1a95d 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -9,7 +9,7 @@ denormalized = { workspace = true } datafusion = { workspace = true } -arrow = { workspace = true, features = ["prettyprint"] } +arrow = { workspace = true, features = ["prettyprint"] } arrow-schema = { workspace = true } arrow-array = { workspace = true } tracing = { workspace = true } @@ -18,9 +18,15 @@ tracing-log = { workspace = true } tracing-subscriber = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } -tokio = { workspace = true, features = ["rt-multi-thread", "parking_lot"] } +tokio = { workspace = true, features = [ + "rt-multi-thread", + "parking_lot", + "full", + "tracing", +] } tempfile = { version = "3" } rdkafka = { workspace = true } rand = "0.8.5" log = { workspace = true } env_logger = "0.11.5" +console-subscriber = "0.4.0" diff --git a/examples/examples/emit_measurements.rs b/examples/examples/emit_measurements.rs index 749a2d4..af43e43 100644 --- a/examples/examples/emit_measurements.rs +++ b/examples/examples/emit_measurements.rs @@ -27,7 +27,18 @@ async fn main() -> Result<()> { let producer = producer.clone(); tasks.spawn(async move { - let sensors = ["sensor_0", "sensor_1", "sensor_2", "sensor_3", "sensor_4"]; + let sensors = [ + "sensor_0", + "sensor_1", + "sensor_2", + "sensor_3", + "sensor_4", + "sensor_10", + "sensor_11", + "sensor_12", + "sensor_13", + "sensor_14", + ]; loop { let sensor_name = sensors.choose(&mut rand::thread_rng()).unwrap().to_string(); diff --git a/examples/examples/simple_aggregation.rs b/examples/examples/simple_aggregation.rs index 27481e8..60c3221 100644 --- a/examples/examples/simple_aggregation.rs +++ b/examples/examples/simple_aggregation.rs @@ -1,6 +1,5 @@ use std::time::Duration; -use datafusion::functions_aggregate::average::avg; use datafusion::functions_aggregate::count::count; use datafusion::functions_aggregate::expr_fn::{max, min}; use datafusion::logical_expr::{col, lit}; @@ -15,11 +14,15 @@ use denormalized_examples::get_sample_json; #[tokio::main] async fn main() -> Result<()> { env_logger::builder() - .filter_level(log::LevelFilter::Info) + .filter_level(log::LevelFilter::Debug) .init(); - let ctx = Context::new()?; - let mut topic_builder = KafkaTopicBuilder::new("localhost:9092".to_string()); + let bootstrap_servers = String::from("localhost:9092"); + + let ctx = Context::new()? + .with_slatedb_backend(String::from("/tmp/checkpoints/simple-agg-checkpoint-1")) + .await; + let mut topic_builder = KafkaTopicBuilder::new(bootstrap_servers); // Connect to source topic let source_topic = topic_builder @@ -41,7 +44,7 @@ async fn main() -> Result<()> { count(col("reading")).alias("count"), min(col("reading")).alias("min"), max(col("reading")).alias("max"), - avg(col("reading")).alias("average"), + //avg(col("reading")).alias("average"), ], Duration::from_millis(1_000), // aggregate every 1 second None, // None means tumbling window