From 627c93f7a16034357e5df81f759d32ecf6294c72 Mon Sep 17 00:00:00 2001 From: Luca Casonato Date: Mon, 30 Oct 2023 18:16:39 +0100 Subject: [PATCH] feat: remote client --- Cargo.lock | 553 +++++++++++++++++++++++++++++++- Cargo.toml | 5 +- README.md | 2 + proto/interface.rs | 8 +- proto/kv-connect.md | 9 +- proto/schema/datapath.proto | 4 +- remote/Cargo.toml | 29 ++ remote/lib.rs | 611 ++++++++++++++++++++++++++++++++++++ 8 files changed, 1209 insertions(+), 12 deletions(-) create mode 100644 remote/Cargo.toml create mode 100644 remote/lib.rs diff --git a/Cargo.lock b/Cargo.lock index 57ba834..5af058d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -96,6 +96,12 @@ dependencies = [ "rustc-demangle", ] +[[package]] +name = "base64" +version = "0.21.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "35636a1494ede3b646cc98f74f8e62c773a38a659ebc777a2cf26b9b74171df9" + [[package]] name = "bitflags" version = "1.3.2" @@ -150,6 +156,16 @@ dependencies = [ "windows-targets", ] +[[package]] +name = "core-foundation" +version = "0.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "194a7a9e6de53fa55116934067c844d9d749312f75c6f6d0980e8c252f8c2146" +dependencies = [ + "core-foundation-sys", + "libc", +] + [[package]] name = "core-foundation-sys" version = "0.8.4" @@ -170,6 +186,28 @@ dependencies = [ "uuid", ] +[[package]] +name = "denokv_remote" +version = "0.1.0" +dependencies = [ + "anyhow", + "async-trait", + "bytes", + "chrono", + "denokv_proto", + "futures", + "log", + "num-bigint", + "prost", + "rand", + "reqwest", + "serde", + "serde_json", + "tokio", + "url", + "uuid", +] + [[package]] name = "denokv_sqlite" version = "0.1.0" @@ -194,6 +232,15 @@ version = "1.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a26ae43d7bcc3b814de94796a5e736d4029efb0ee900c12e2d54c993ad1a1e07" +[[package]] +name = "encoding_rs" +version = "0.8.33" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7268b386296a025e474d5140678f75d6de9493ae55a5d709eeb9dd08149945e1" +dependencies = [ + "cfg-if", +] + [[package]] name = "equivalent" version = "1.0.1" @@ -234,6 +281,36 @@ version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80" +[[package]] +name = "fnv" +version = "1.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" + +[[package]] +name = "foreign-types" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1" +dependencies = [ + "foreign-types-shared", +] + +[[package]] +name = "foreign-types-shared" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b" + +[[package]] +name = "form_urlencoded" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a62bc1cf6f830c2ec14a513a9fb124d0a213a629668a4186f329db21fe045652" +dependencies = [ + "percent-encoding", +] + [[package]] name = "futures" version = "0.3.28" @@ -340,6 +417,31 @@ version = "0.27.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b6c80984affa11d98d1b88b66ac8853f143217b399d3c74116778ff8fdb4ed2e" +[[package]] +name = "h2" +version = "0.3.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "91fc23aa11be92976ef4729127f1a74adf36d8436f7816b185d18df956790833" +dependencies = [ + "bytes", + "fnv", + "futures-core", + "futures-sink", + "futures-util", + "http", + "indexmap 1.9.3", + "slab", + "tokio", + "tokio-util", + "tracing", +] + +[[package]] +name = "hashbrown" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" + [[package]] name = "hashbrown" version = "0.14.1" @@ -356,7 +458,7 @@ version = "0.8.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e8094feaf31ff591f651a2664fb9cfd92bba7a60ce3197265e9482ebe753c8f7" dependencies = [ - "hashbrown", + "hashbrown 0.14.1", ] [[package]] @@ -380,6 +482,77 @@ dependencies = [ "windows-sys", ] +[[package]] +name = "http" +version = "0.2.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bd6effc99afb63425aff9b05836f029929e345a6148a14b7ecd5ab67af944482" +dependencies = [ + "bytes", + "fnv", + "itoa", +] + +[[package]] +name = "http-body" +version = "0.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d5f38f16d184e36f2408a55281cd658ecbd3ca05cce6d6510a176eca393e26d1" +dependencies = [ + "bytes", + "http", + "pin-project-lite", +] + +[[package]] +name = "httparse" +version = "1.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d897f394bad6a705d5f4104762e116a75639e470d80901eed05a860a95cb1904" + +[[package]] +name = "httpdate" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" + +[[package]] +name = "hyper" +version = "0.14.27" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ffb1cfd654a8219eaef89881fdb3bb3b1cdc5fa75ded05d6933b2b382e395468" +dependencies = [ + "bytes", + "futures-channel", + "futures-core", + "futures-util", + "h2", + "http", + "http-body", + "httparse", + "httpdate", + "itoa", + "pin-project-lite", + "socket2 0.4.10", + "tokio", + "tower-service", + "tracing", + "want", +] + +[[package]] +name = "hyper-tls" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d6183ddfa99b85da61a140bea0efc93fdf56ceaa041b37d553518030827f9905" +dependencies = [ + "bytes", + "hyper", + "native-tls", + "tokio", + "tokio-native-tls", +] + [[package]] name = "iana-time-zone" version = "0.1.58" @@ -403,6 +576,26 @@ dependencies = [ "cc", ] +[[package]] +name = "idna" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7d20d6b07bfbc108882d88ed8e37d39636dcc260e15e30c45e6ba089610b917c" +dependencies = [ + "unicode-bidi", + "unicode-normalization", +] + +[[package]] +name = "indexmap" +version = "1.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bd070e393353796e801d209ad339e89596eb4c8d430d18ede6a1cced8fafbd99" +dependencies = [ + "autocfg", + "hashbrown 0.12.3", +] + [[package]] name = "indexmap" version = "2.0.2" @@ -410,9 +603,15 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8adf3ddd720272c6ea8bf59463c04e0f93d0bbf7c5439b691bca2987e0270897" dependencies = [ "equivalent", - "hashbrown", + "hashbrown 0.14.1", ] +[[package]] +name = "ipnet" +version = "2.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f518f335dce6725a761382244631d86cf0ccb2863413590b31338feb467f9c3" + [[package]] name = "itertools" version = "0.10.5" @@ -488,6 +687,12 @@ version = "2.6.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f665ee40bc4a3c5590afb1e9677db74a508659dfd71e126420da8274909a0167" +[[package]] +name = "mime" +version = "0.3.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a" + [[package]] name = "miniz_oxide" version = "0.7.1" @@ -514,6 +719,24 @@ version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e5ce46fe64a9d73be07dcbe690a38ce1b293be448fd8ce1e6c1b8062c9f72c6a" +[[package]] +name = "native-tls" +version = "0.2.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "07226173c32f2926027b63cce4bcd8076c3552846cbe7925f3aaffeac0a3b92e" +dependencies = [ + "lazy_static", + "libc", + "log", + "openssl", + "openssl-probe", + "openssl-sys", + "schannel", + "security-framework", + "security-framework-sys", + "tempfile", +] + [[package]] name = "num-bigint" version = "0.4.4" @@ -569,6 +792,50 @@ version = "1.18.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dd8b5dd2ae5ed71462c540258bedcb51965123ad7e7ccf4b9a8cafaa4a63576d" +[[package]] +name = "openssl" +version = "0.10.57" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bac25ee399abb46215765b1cb35bc0212377e58a061560d8b29b024fd0430e7c" +dependencies = [ + "bitflags 2.4.1", + "cfg-if", + "foreign-types", + "libc", + "once_cell", + "openssl-macros", + "openssl-sys", +] + +[[package]] +name = "openssl-macros" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.38", +] + +[[package]] +name = "openssl-probe" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" + +[[package]] +name = "openssl-sys" +version = "0.9.93" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "db4d56a4c0478783083cfafcc42493dd4a981d41669da64b4572a2a089b51b1d" +dependencies = [ + "cc", + "libc", + "pkg-config", + "vcpkg", +] + [[package]] name = "parking_lot" version = "0.12.1" @@ -592,6 +859,12 @@ dependencies = [ "windows-targets", ] +[[package]] +name = "percent-encoding" +version = "2.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b2a4787296e9989611394c33f193f676704af1686e70b8f8033ab5ba9a35a94" + [[package]] name = "petgraph" version = "0.6.4" @@ -599,7 +872,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e1d3afd2628e69da2be385eb6f2fd57c8ac7977ceeff6dc166ff1657b0e386a9" dependencies = [ "fixedbitset", - "indexmap", + "indexmap 2.0.2", ] [[package]] @@ -776,6 +1049,44 @@ version = "0.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c08c74e62047bb2de4ff487b251e4a92e24f48745648451635cec7d591162d9f" +[[package]] +name = "reqwest" +version = "0.11.22" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "046cd98826c46c2ac8ddecae268eb5c2e58628688a5fc7a2643704a73faba95b" +dependencies = [ + "base64", + "bytes", + "encoding_rs", + "futures-core", + "futures-util", + "h2", + "http", + "http-body", + "hyper", + "hyper-tls", + "ipnet", + "js-sys", + "log", + "mime", + "native-tls", + "once_cell", + "percent-encoding", + "pin-project-lite", + "serde", + "serde_json", + "serde_urlencoded", + "system-configuration", + "tokio", + "tokio-native-tls", + "tower-service", + "url", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", + "winreg", +] + [[package]] name = "rusqlite" version = "0.29.0" @@ -815,12 +1126,44 @@ version = "1.0.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1ad4cc8da4ef723ed60bced201181d83791ad433213d8c24efffda1eec85d741" +[[package]] +name = "schannel" +version = "0.1.22" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c3733bf4cf7ea0880754e19cb5a462007c4a8c1914bff372ccc95b464f1df88" +dependencies = [ + "windows-sys", +] + [[package]] name = "scopeguard" version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" +[[package]] +name = "security-framework" +version = "2.9.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05b64fb303737d99b81884b2c63433e9ae28abebe5eb5045dcdd175dc2ecf4de" +dependencies = [ + "bitflags 1.3.2", + "core-foundation", + "core-foundation-sys", + "libc", + "security-framework-sys", +] + +[[package]] +name = "security-framework-sys" +version = "2.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e932934257d3b408ed8f30db49d85ea163bfe74961f017f405b025af298f0c7a" +dependencies = [ + "core-foundation-sys", + "libc", +] + [[package]] name = "serde" version = "1.0.189" @@ -852,6 +1195,18 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_urlencoded" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3491c14715ca2294c4d6a88f15e84739788c1d030eed8c110436aafdaa2f3fd" +dependencies = [ + "form_urlencoded", + "itoa", + "ryu", + "serde", +] + [[package]] name = "signal-hook-registry" version = "1.4.1" @@ -876,6 +1231,16 @@ version = "1.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "942b4a808e05215192e39f4ab80813e599068285906cc91aa64f923db842bd5a" +[[package]] +name = "socket2" +version = "0.4.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9f7916fc008ca5542385b89a3d3ce689953c143e9304a9bf8beec1de48994c0d" +dependencies = [ + "libc", + "winapi", +] + [[package]] name = "socket2" version = "0.5.4" @@ -908,6 +1273,27 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "system-configuration" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba3a3adc5c275d719af8cb4272ea1c4a6d668a777f37e115f6d11ddbc1c8e0e7" +dependencies = [ + "bitflags 1.3.2", + "core-foundation", + "system-configuration-sys", +] + +[[package]] +name = "system-configuration-sys" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a75fb188eb626b924683e3b95e3a48e63551fcfb51949de2f06a9d91dbee93c9" +dependencies = [ + "core-foundation-sys", + "libc", +] + [[package]] name = "tempfile" version = "3.8.0" @@ -921,6 +1307,21 @@ dependencies = [ "windows-sys", ] +[[package]] +name = "tinyvec" +version = "1.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87cc5ceb3875bb20c2890005a4e226a4651264a5c75edb2421b52861a0a0cb50" +dependencies = [ + "tinyvec_macros", +] + +[[package]] +name = "tinyvec_macros" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" + [[package]] name = "tokio" version = "1.33.0" @@ -935,7 +1336,7 @@ dependencies = [ "parking_lot", "pin-project-lite", "signal-hook-registry", - "socket2", + "socket2 0.5.4", "tokio-macros", "windows-sys", ] @@ -951,12 +1352,93 @@ dependencies = [ "syn 2.0.38", ] +[[package]] +name = "tokio-native-tls" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbae76ab933c85776efabc971569dd6119c580d8f5d448769dec1764bf796ef2" +dependencies = [ + "native-tls", + "tokio", +] + +[[package]] +name = "tokio-util" +version = "0.7.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5419f34732d9eb6ee4c3578b7989078579b7f039cbbb9ca2c4da015749371e15" +dependencies = [ + "bytes", + "futures-core", + "futures-sink", + "pin-project-lite", + "tokio", + "tracing", +] + +[[package]] +name = "tower-service" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6bc1c9ce2b5135ac7f93c72918fc37feb872bdc6a5533a8b85eb4b86bfdae52" + +[[package]] +name = "tracing" +version = "0.1.40" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c3523ab5a71916ccf420eebdf5521fcef02141234bbc0b8a49f2fdc4544364ef" +dependencies = [ + "pin-project-lite", + "tracing-core", +] + +[[package]] +name = "tracing-core" +version = "0.1.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c06d3da6113f116aaee68e4d601191614c9053067f9ab7f6edbcb161237daa54" +dependencies = [ + "once_cell", +] + +[[package]] +name = "try-lock" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3528ecfd12c466c6f163363caf2d02a71161dd5e1cc6ae7b34207ea2d42d81ed" + +[[package]] +name = "unicode-bidi" +version = "0.3.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "92888ba5573ff080736b3648696b70cafad7d250551175acbaa4e0385b3e1460" + [[package]] name = "unicode-ident" version = "1.0.12" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3354b9ac3fae1ff6755cb6db53683adb661634f67557942dea4facebec0fee4b" +[[package]] +name = "unicode-normalization" +version = "0.1.22" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c5713f0fc4b5db668a2ac63cdb7bb4469d8c9fed047b1d0292cc7b0ce2ba921" +dependencies = [ + "tinyvec", +] + +[[package]] +name = "url" +version = "2.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "143b538f18257fac9cad154828a57c6bf5157e1aa604d4816b5995bf6de87ae5" +dependencies = [ + "form_urlencoded", + "idna", + "percent-encoding", +] + [[package]] name = "uuid" version = "1.4.1" @@ -979,6 +1461,15 @@ version = "0.9.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f" +[[package]] +name = "want" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bfa7760aed19e106de2c7c0b581b509f2f25d3dacaf737cb82ac61bc6d760b0e" +dependencies = [ + "try-lock", +] + [[package]] name = "wasi" version = "0.11.0+wasi-snapshot-preview1" @@ -1010,6 +1501,18 @@ dependencies = [ "wasm-bindgen-shared", ] +[[package]] +name = "wasm-bindgen-futures" +version = "0.4.37" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c02dbc21516f9f1f04f187958890d7e6026df8d16540b7ad9492bc34a67cea03" +dependencies = [ + "cfg-if", + "js-sys", + "wasm-bindgen", + "web-sys", +] + [[package]] name = "wasm-bindgen-macro" version = "0.2.87" @@ -1039,6 +1542,16 @@ version = "0.2.87" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ca6ad05a4870b2bf5fe995117d3728437bd27d7cd5f06f13c17443ef369775a1" +[[package]] +name = "web-sys" +version = "0.3.64" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b85cbef8c220a6abc02aefd892dfc0fc23afb1c6a426316ec33253a3877249b" +dependencies = [ + "js-sys", + "wasm-bindgen", +] + [[package]] name = "which" version = "4.4.2" @@ -1051,6 +1564,28 @@ dependencies = [ "rustix", ] +[[package]] +name = "winapi" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c839a674fcd7a98952e593242ea400abe93992746761e38641405d28b00f419" +dependencies = [ + "winapi-i686-pc-windows-gnu", + "winapi-x86_64-pc-windows-gnu", +] + +[[package]] +name = "winapi-i686-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" + +[[package]] +name = "winapi-x86_64-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" + [[package]] name = "windows-core" version = "0.51.1" @@ -1125,3 +1660,13 @@ name = "windows_x86_64_msvc" version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ed94fce61571a4006852b7389a063ab983c02eb1bb37b47f8272ce92d06d9538" + +[[package]] +name = "winreg" +version = "0.50.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "524e57b2c537c0f9b1e69f1965311ec12182b4122e45035b1508cd24d2adadb1" +dependencies = [ + "cfg-if", + "windows-sys", +] diff --git a/Cargo.toml b/Cargo.toml index 0d4e485..bf65c1f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,5 +1,5 @@ [workspace] -members = ["proto", "sqlite"] +members = ["proto", "remote", "sqlite"] resolver = "2" [workspace.package] @@ -25,3 +25,6 @@ serde = { version = "1", features = ["derive"] } serde_json = "1.0.107" tokio = { version = "1.33.0", features = ["full"] } uuid = { version = "1.4.1", features = ["v4", "serde"] } +url = "2" +reqwest = { version = "0.11", features = ["json"] } +bytes = "1" diff --git a/README.md b/README.md index 83179bd..1b5bbff 100644 --- a/README.md +++ b/README.md @@ -5,6 +5,8 @@ This repository contains two crates: - `denokv_proto` (`/proto`): Shared interfaces backing KV, like definitions of `Key`, `Database`, and `Value`. - `denokv_sqlite` (`/sqlite`): An implementation of `Database` backed by SQLite. +- `denokv_remote` (`/remote`): An implementation of `Database` backed by a + remote KV database, acessible via the KV Connect protocol. These crates are used by the `deno_kv` crate in the Deno repository to provide a JavaScript API for interacting with Deno KV. diff --git a/proto/interface.rs b/proto/interface.rs index 5259488..6ca570f 100644 --- a/proto/interface.rs +++ b/proto/interface.rs @@ -330,10 +330,16 @@ pub struct CommitResult { pub versionstamp: Versionstamp, } +#[derive(Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct MetadataExchangeRequest { + #[serde(default)] + pub supported_versions: Vec, +} + /// The database metadata that is returned by the KV Connect metadata endpoint. #[derive(Serialize, Deserialize)] #[serde(rename_all = "camelCase")] -#[allow(dead_code)] pub struct DatabaseMetadata { pub version: u64, pub database_id: Uuid, diff --git a/proto/kv-connect.md b/proto/kv-connect.md index 3458f1f..d5e4906 100644 --- a/proto/kv-connect.md +++ b/proto/kv-connect.md @@ -103,10 +103,11 @@ exponential backoff strategy with unlimited retries. The client MUST verify that the response is a 200 OK, 3xx class redirect response, or 4xx class HTTP status response. If the response is a 3xx class redirect response, the client MUST follow the redirect and perform the metadata -exchange with the new URL as the base URL. If the response is a 4xx class HTTP -status response, the client MUST fatally abort the metadata exchange and display -the error message to the user. If the response is a 200 OK response, the client -MUST verify that the response body adheres to the JSON schema defined in the +exchange with the new URL as the base URL (the client MAY impose a maximum +redirect depth). If the response is a 4xx class HTTP status response, the client +MUST fatally abort the metadata exchange and display the error message to the +user. If the response is a 200 OK response, the client MUST verify that the +response body adheres to the JSON schema defined in the `schema/kv-metadata-exchange-response.v.json` file. If the response body is invalid, the client MUST fatally abort the metadata exchange and display the error message to the user. diff --git a/proto/schema/datapath.proto b/proto/schema/datapath.proto index 2b02edb..38ac361 100644 --- a/proto/schema/datapath.proto +++ b/proto/schema/datapath.proto @@ -113,8 +113,8 @@ enum MutationType { M_UNSPECIFIED = 0; // Set the value. M_SET = 1; - // Clear the value. - M_CLEAR = 2; + // Delete the value. + M_DELETE = 2; // Sum the stored value with the new value. Both values must be LE64 encoded. M_SUM = 3; // Min the stored value with the new value. Both values must be LE64 encoded. diff --git a/remote/Cargo.toml b/remote/Cargo.toml new file mode 100644 index 0000000..48dfb51 --- /dev/null +++ b/remote/Cargo.toml @@ -0,0 +1,29 @@ +[package] +name = "denokv_remote" +description = "Remote (KV Connect) backend for Deno KV" +version = "0.1.0" +edition.workspace = true +license.workspace = true +repository.workspace = true +authors.workspace = true + +[lib] +path = "lib.rs" + +[dependencies] +anyhow.workspace = true +async-trait.workspace = true +chrono.workspace = true +denokv_proto.workspace = true +futures.workspace = true +log.workspace = true +num-bigint.workspace = true +rand.workspace = true +serde_json.workspace = true +tokio.workspace = true +uuid.workspace = true +url.workspace = true +reqwest.workspace = true +serde.workspace = true +prost.workspace = true +bytes.workspace = true diff --git a/remote/lib.rs b/remote/lib.rs new file mode 100644 index 0000000..0989298 --- /dev/null +++ b/remote/lib.rs @@ -0,0 +1,611 @@ +use std::ops::Sub; +use std::sync::Arc; +use std::time::Duration; + +use anyhow::Context; +use async_trait::async_trait; +use bytes::Bytes; +use chrono::DateTime; +use chrono::Utc; +use denokv_proto::decode_value; +use denokv_proto::AtomicWrite; +use denokv_proto::CommitResult; +use denokv_proto::Consistency; +use denokv_proto::Database; +use denokv_proto::DatabaseMetadata; +use denokv_proto::KvEntry; +use denokv_proto::KvValue; +use denokv_proto::MetadataExchangeRequest; +use denokv_proto::QueueMessageHandle; +use denokv_proto::ReadRange; +use denokv_proto::ReadRangeOutput; +use denokv_proto::SnapshotReadOptions; +use log::error; +use rand::Rng; +use reqwest::StatusCode; +use serde::Deserialize; +use tokio::sync::watch; +use tokio::task::JoinHandle; +use url::Url; +use uuid::Uuid; + +use denokv_proto::datapath as pb; + +pub struct MetadataEndpoint { + pub url: Url, + pub access_token: String, +} + +enum ProtocolVersion { + V1, + V2, +} + +#[derive(PartialEq, Eq)] +enum DataPathConsistency { + Strong, + Eventual, +} + +struct DataPathEndpoint { + url: Url, + consistency: DataPathConsistency, +} + +struct Metadata { + version: ProtocolVersion, + database_id: Uuid, + endpoints: Vec, + token: String, + expires_at: DateTime, +} + +#[derive(Clone)] +enum MetadataState { + Pending, + Ok(Arc), + Error(Arc), +} + +pub trait RemotePermissions { + fn check_net_url(&self, url: &Url) -> Result<(), anyhow::Error>; +} + +enum RetryableResult { + Ok(T), + Retry, + Err(E), +} + +pub struct Remote { + permissions: P, + client: reqwest::Client, + metadata_refresher: JoinHandle<()>, + metadata: watch::Receiver, +} + +impl Remote

{ + pub fn new( + client: reqwest::Client, + permissions: P, + metadata_endpoint: MetadataEndpoint, + ) -> Self { + let (tx, rx) = watch::channel(MetadataState::Pending); + let metadata_refresher = tokio::spawn(metadata_refresh_task( + client.clone(), + metadata_endpoint, + tx, + )); + Self { + client, + permissions, + metadata_refresher, + metadata: rx, + } + } + + pub async fn call_data< + Req: prost::Message, + Resp: prost::Message + Default, + >( + &self, + method: &'static str, + req: Req, + ) -> Result { + let attempt = 0; + let req_body = Bytes::from(req.encode_to_vec()); + loop { + let metadata = loop { + let mut metadata_rx = self.metadata.clone(); + match &*metadata_rx.borrow() { + MetadataState::Pending => {} + MetadataState::Ok(metadata) => break metadata.clone(), + MetadataState::Error(e) => { + return Err(anyhow::anyhow!("{}", e)); + } + }; + if metadata_rx.changed().await.is_err() { + return Err(anyhow::anyhow!("Database is closed.")); + } + }; + + let endpoint = match metadata + .endpoints + .iter() + .find(|endpoint| endpoint.consistency == DataPathConsistency::Strong) + { + Some(endpoint) => endpoint, + None => { + return Err(anyhow::anyhow!( + "No strong consistency endpoints available." + )); + } + }; + + let url = Url::parse(&format!("{}/{}", endpoint.url, method))?; + self.permissions.check_net_url(&url)?; + + let req = self + .client + .post(url.clone()) + .body(req_body.clone()) + .bearer_auth(&metadata.token); + + let req = match metadata.version { + ProtocolVersion::V1 => req + .header("x-transaction-domain-id", metadata.database_id.to_string()), + ProtocolVersion::V2 => req + .header("x-denokv-database-id", metadata.database_id.to_string()) + .header("x-denokv-version", "2"), + }; + + let resp = match req.send().await { + Ok(resp) if resp.status() == StatusCode::OK => resp, + Ok(resp) if resp.status().is_server_error() => { + let status = resp.status(); + let body = resp.text().await.unwrap_or_else(|_| String::new()); + error!( + "KV Connect failed to call '{}' (status={}): {}", + url, status, body + ); + randomized_exponential_backoff(Duration::from_secs(5), attempt).await; + continue; + } + Ok(resp) => { + let status = resp.status(); + let body = resp.text().await.unwrap_or_else(|_| String::new()); + return Err(anyhow::anyhow!( + "KV Connect failed to call '{}' (status={}): {}", + url, + status, + body + )); + } + Err(err) => { + error!("KV Connect failed to call '{}': {}", url, err); + randomized_exponential_backoff(Duration::from_secs(5), attempt).await; + continue; + } + }; + + let resp_body = match resp.bytes().await { + Ok(resp_body) => resp_body, + Err(err) => { + return Err(anyhow::anyhow!( + "KV Connect failed to read response body: {}", + err + )); + } + }; + + let resp = Resp::decode(resp_body) + .context("KV Connect failed to decode response")?; + + return Ok(resp); + } + } +} + +impl Drop for Remote

{ + fn drop(&mut self) { + self.metadata_refresher.abort(); + } +} + +async fn randomized_exponential_backoff(base: Duration, attempt: u64) { + let attempt = attempt.min(12); + let delay = base.as_millis() as u64 + (2 << attempt); + let delay = delay + rand::thread_rng().gen_range(0..(delay / 2) + 1); + tokio::time::sleep(std::time::Duration::from_millis(delay)).await; +} + +async fn metadata_refresh_task( + client: reqwest::Client, + metadata_endpoint: MetadataEndpoint, + tx: watch::Sender, +) { + let mut attempts = 0; + loop { + let res = fetch_metadata(&client, &metadata_endpoint).await; + match res { + RetryableResult::Ok(metadata) => { + attempts = 0; + let expires_in = metadata.expires_at.signed_duration_since(Utc::now()); + + if tx.send(MetadataState::Ok(Arc::new(metadata))).is_err() { + // The receiver has been dropped, so we can stop now. + return; + } + + // Sleep until the token expires, minus a 10 minute buffer, but at minimum + // one minute. + let sleep_time = expires_in + .sub(chrono::Duration::seconds(10)) + .to_std() + .unwrap_or_default() + .min(Duration::from_secs(60)); + + tokio::time::sleep(sleep_time).await; + } + RetryableResult::Retry => { + attempts += 1; + if tx.is_closed() { + // The receiver has been dropped, so we can stop now. + return; + } + randomized_exponential_backoff(Duration::from_secs(5), attempts).await; + } + RetryableResult::Err(err) => { + attempts += 1; + if tx.send(MetadataState::Error(Arc::new(err))).is_err() { + // The receiver has been dropped, so we can stop now. + return; + } + randomized_exponential_backoff(Duration::from_secs(5), attempts).await; + } + } + } +} + +async fn fetch_metadata( + client: &reqwest::Client, + metadata_endpoint: &MetadataEndpoint, +) -> RetryableResult { + let res = match client + .post(metadata_endpoint.url.clone()) + .header( + "authorization", + format!("Bearer {}", metadata_endpoint.access_token), + ) + .json(&MetadataExchangeRequest { + supported_versions: vec![1, 2], + }) + .send() + .await + { + Ok(res) => res, + Err(err) => { + error!( + "KV Connect to '{}' failed to fetch metadata: {}", + metadata_endpoint.url, err + ); + return RetryableResult::Retry; + } + }; + + let res = match res.status() { + StatusCode::OK => res, + status if status.is_client_error() => { + let body = res.text().await.unwrap_or_else(|_| String::new()); + return RetryableResult::Err(format!( + "Failed to fetch metadata: {}", + body + )); + } + status if status.is_server_error() => { + let body = res.text().await.unwrap_or_else(|_| String::new()); + error!( + "KV Connect to '{}' failed to fetch metadata (status={}): {}", + metadata_endpoint.url, status, body + ); + return RetryableResult::Retry; + } + status => { + return RetryableResult::Err(format!( + "Failed to fetch metadata (status={})", + status + )); + } + }; + + let base_url = res.url().clone(); + + let body = match res.text().await { + Ok(body) => body, + Err(err) => { + return RetryableResult::Err(format!( + "Metadata response body invalid: {}", + err + )); + } + }; + + let metadata = match parse_metadata(&base_url, &body) { + Ok(metadata) => metadata, + Err(err) => { + return RetryableResult::Err(format!( + "Failed to parse metadata: {}", + err + )); + } + }; + + RetryableResult::Ok(metadata) +} + +fn parse_metadata(base_url: &Url, body: &str) -> Result { + #[derive(Deserialize)] + struct Version { + version: u64, + } + + let version: Version = match serde_json::from_str(body) { + Ok(metadata) => metadata, + Err(err) => { + return Err(format!("Could not parse out 'version' field: {}", err)); + } + }; + + let version = match version.version { + 1 => ProtocolVersion::V1, + 2 => ProtocolVersion::V2, + version => { + return Err(format!("unsupported metadata version: {}", version)); + } + }; + + // V1 and V2 have the same shape + let metadata: DatabaseMetadata = match serde_json::from_str(body) { + Ok(metadata) => metadata, + Err(err) => { + return Err(format!("could not parse metadata: {}", err)); + } + }; + + let mut endpoints = Vec::new(); + for endpoint in metadata.endpoints { + let url = match version { + ProtocolVersion::V1 => Url::parse(&endpoint.url), + ProtocolVersion::V2 => { + Url::options().base_url(Some(base_url)).parse(&endpoint.url) + } + } + .map_err(|err| format!("invalid endpoint URL: {}", err))?; + + if endpoint.url.ends_with('/') { + return Err(format!("endpoint URL must not end with '/': {}", url)); + } + + let consistency = match &*endpoint.consistency { + "strong" => DataPathConsistency::Strong, + "eventual" => DataPathConsistency::Eventual, + consistency => { + return Err(format!("unsupported consistency level: {}", consistency)); + } + }; + + endpoints.push(DataPathEndpoint { url, consistency }); + } + + Ok(Metadata { + version, + endpoints, + database_id: metadata.database_id, + token: metadata.token.into_owned(), + expires_at: metadata.expires_at, + }) +} + +#[async_trait(?Send)] +impl Database for Remote

{ + type QMH = DummyQueueMessageHandle; + + async fn snapshot_read( + &self, + requests: Vec, + options: SnapshotReadOptions, + ) -> Result, anyhow::Error> { + let ranges = requests + .into_iter() + .map(|r| pb::ReadRange { + start: r.start, + end: r.end, + limit: r.limit.get() as _, + reverse: r.reverse, + }) + .collect(); + let req = pb::SnapshotRead { ranges }; + + let res: pb::SnapshotReadOutput = + self.call_data("snapshot_read", req).await?; + + if res.read_disabled { + // TODO: this should result in a retry after a forced metadata refresh. + return Err(anyhow::anyhow!("Reads are disabled for this database.")); + } + + if !res.read_is_strongly_consistent + && options.consistency == Consistency::Strong + { + // TODO: this should result in a retry after a forced metadata refresh. + return Err(anyhow::anyhow!( + "Strong consistency reads are not available for this database." + )); + } + + let ranges = res + .ranges + .into_iter() + .map(|r| { + Ok(ReadRangeOutput { + entries: r + .values + .into_iter() + .map(|e| { + Ok(KvEntry { + key: e.key, + value: decode_value(e.value, e.encoding as i64).ok_or_else( + || anyhow::anyhow!("Unknown encoding {}", e.encoding), + )?, + versionstamp: <[u8; 10]>::try_from(&e.versionstamp[..])?, + }) + }) + .collect::>()?, + }) + }) + .collect::>()?; + + Ok(ranges) + } + + async fn atomic_write( + &self, + write: AtomicWrite, + ) -> Result, anyhow::Error> { + if !write.enqueues.is_empty() { + return Err(anyhow::anyhow!( + "Enqueue operations are not supported in KV Connect.", + )); + } + + let mut checks = Vec::new(); + for check in write.checks { + checks.push(pb::Check { + key: check.key, + versionstamp: check + .versionstamp + .map(|v| v.to_vec()) + .unwrap_or_default(), + }); + } + + let mut mutations = Vec::new(); + for mutation in write.mutations { + let expire_at_ms = mutation + .expire_at + .and_then(|t| { + let ts = t.timestamp_millis(); + if ts <= 0 { + Some(1) + } else { + Some(ts) + } + }) + .unwrap_or(0); + match mutation.kind { + denokv_proto::MutationKind::Set(value) => { + mutations.push(pb::Mutation { + key: mutation.key, + value: Some(encode_value_to_pb(value)), + mutation_type: pb::MutationType::MSet as _, + expire_at_ms, + }); + } + denokv_proto::MutationKind::Delete => { + mutations.push(pb::Mutation { + key: mutation.key, + value: Some(encode_value_to_pb(KvValue::Bytes(vec![]))), + mutation_type: pb::MutationType::MDelete as _, + expire_at_ms, + }); + } + denokv_proto::MutationKind::Sum(value) => { + mutations.push(pb::Mutation { + key: mutation.key, + value: Some(encode_value_to_pb(value)), + mutation_type: pb::MutationType::MSum as _, + expire_at_ms, + }); + } + denokv_proto::MutationKind::Max(value) => { + mutations.push(pb::Mutation { + key: mutation.key, + value: Some(encode_value_to_pb(value)), + mutation_type: pb::MutationType::MMax as _, + expire_at_ms, + }); + } + denokv_proto::MutationKind::Min(value) => { + mutations.push(pb::Mutation { + key: mutation.key, + value: Some(encode_value_to_pb(value)), + mutation_type: pb::MutationType::MMin as _, + expire_at_ms, + }); + } + } + } + + assert!(write.enqueues.is_empty()); + + let req = pb::AtomicWrite { + checks, + mutations, + enqueues: Vec::new(), + }; + + let res: pb::AtomicWriteOutput = + self.call_data("atomic_write", req).await?; + + match res.status() { + pb::AtomicWriteStatus::AwSuccess => Ok(Some(CommitResult { + versionstamp: <[u8; 10]>::try_from(&res.versionstamp[..])?, + })), + pb::AtomicWriteStatus::AwCheckFailure => Ok(None), + pb::AtomicWriteStatus::AwWriteDisabled => { + Err(anyhow::anyhow!("Writes are disabled for this database.")) + } + pb::AtomicWriteStatus::AwUnspecified => { + Err(anyhow::anyhow!("Unspecified write error.")) + } + } + } + + async fn dequeue_next_message( + &self, + ) -> Result, anyhow::Error> { + unimplemented!() + } + + fn close(&self) {} +} + +pub struct DummyQueueMessageHandle {} + +#[async_trait(?Send)] +impl QueueMessageHandle for DummyQueueMessageHandle { + async fn take_payload(&mut self) -> Result, anyhow::Error> { + unimplemented!() + } + + async fn finish(&self, _success: bool) -> Result<(), anyhow::Error> { + unimplemented!() + } +} + +fn encode_value_to_pb(value: KvValue) -> pb::KvValue { + match value { + KvValue::V8(data) => pb::KvValue { + encoding: pb::ValueEncoding::VeV8 as _, + data, + }, + KvValue::Bytes(data) => pb::KvValue { + encoding: pb::ValueEncoding::VeBytes as _, + data, + }, + KvValue::U64(x) => pb::KvValue { + data: x.to_le_bytes().to_vec(), + encoding: pb::ValueEncoding::VeLe64 as _, + }, + } +}