From eb78ff8badb435aa7af4f5f3b0eab929ac70148e Mon Sep 17 00:00:00 2001 From: Oleh Martsokha Date: Mon, 9 Dec 2024 06:38:08 +0100 Subject: [PATCH] feat(all): server 2/n --- .github/dependabot.yaml | 28 ++--- .github/workflows/build.yaml | 47 ++++++++ .gitignore | 2 +- Cargo.toml | 33 +++--- crates/cli/Cargo.toml | 34 +++--- crates/cli/config/load_yaml.rs | 7 -- crates/cli/config/mod.rs | 9 +- crates/cli/main.rs | 13 ++- crates/cli/middleware/mod.rs | 41 +++++++ crates/cli/server/mod.rs | 2 +- crates/cli/server/run_default.rs | 2 +- crates/cli/server/run_secure.rs | 2 +- crates/cli/server/serv_config.rs | 11 +- crates/database/Cargo.toml | 19 ++-- crates/database/Makefile | 31 ------ crates/database/README.md | 27 +++-- crates/database/build.rs | 16 --- crates/database/config/custom_hooks.rs | 55 +++++++++ crates/database/config/mod.rs | 105 ++++++++++++++++++ crates/database/config/pool_configs.rs | 76 +++++++++++++ crates/database/connect/constraints.rs | 46 -------- crates/database/connect/mod.rs | 81 -------------- crates/database/connect/pool_configs.rs | 60 ---------- crates/database/lib.rs | 65 +++++++---- crates/database/migrate/mod.rs | 66 ----------- .../account_actions.rs} | 0 .../account_permissions.rs} | 0 .../query/account_sessions.rs} | 0 crates/database/query/accounts.rs | 1 + crates/database/query/mod.rs | 15 +++ crates/database/query/project_invites.rs | 1 + crates/database/query/project_members.rs | 1 + crates/database/query/project_webhooks.rs | 1 + crates/database/query/projects.rs | 1 + crates/database/query/workflow_executions.rs | 1 + crates/database/query/workflow_schedules.rs | 1 + crates/database/query/workflow_webhooks.rs | 1 + crates/database/query/workflows.rs | 1 + crates/graph/Cargo.toml | 26 +++-- crates/graph/lib.rs | 2 + crates/graph/outputs/task_registry.rs | 8 +- crates/graph/worker/mod.rs | 1 + crates/runtime/Cargo.toml | 18 +-- crates/runtime/_/client/client_builder.rs | 32 ++++++ crates/runtime/_/client/mod.rs | 48 ++++++++ .../runtime/{ => _}/client/task_registry.rs | 0 .../runtime/_/runtime/connection_instance.rs | 65 +++++++++++ .../{ => _}/runtime/connection_metadata.rs | 0 crates/runtime/_/runtime/cycle_condition.rs | 1 + crates/runtime/{ => _}/runtime/mod.rs | 0 .../{ => _}/runtime/runtime_manager.rs | 14 ++- crates/runtime/client/client_builder.rs | 23 ++-- crates/runtime/client/mod.rs | 76 +++++++------ crates/runtime/client/recycle_method.rs | 1 + crates/runtime/config/custom_hooks.rs | 36 ++++++ crates/runtime/config/mod.rs | 50 +++++++++ crates/runtime/config/pool_manager.rs | 65 +++++++++++ crates/runtime/config/runtime_config.rs | 58 ++++++++++ crates/runtime/lib.rs | 43 +++++-- crates/runtime/protobuf/runtime.proto | 10 -- crates/runtime/runtime/connection_instance.rs | 51 --------- crates/server/Cargo.toml | 15 ++- crates/server/extract/auth_state.rs | 6 +- crates/server/extract/auth_token.rs | 30 ++++- .../server/{service => extract}/conn_info.rs | 4 +- crates/server/extract/mod.rs | 2 + crates/server/handler/accounts/accounts.rs | 8 +- crates/server/handler/accounts/auth.rs | 14 +-- crates/server/handler/projects/projects.rs | 12 +- crates/server/middleware/error_handling.rs | 20 +++- crates/server/middleware/mod.rs | 13 +-- crates/server/middleware/observability.rs | 62 +++-------- crates/server/service/app_config.rs | 14 ++- crates/server/service/app_hashing.rs | 52 --------- crates/server/service/argon2_hasher.rs | 66 +++++++++++ crates/server/service/database_err.rs | 11 -- crates/server/service/graph_queue.rs | 51 +++++++++ crates/server/service/mod.rs | 80 +++++++------ crates/server/service/scheduler.rs | 56 ++++++++++ docker-compose.yaml | 62 +++++------ 80 files changed, 1316 insertions(+), 792 deletions(-) create mode 100644 .github/workflows/build.yaml delete mode 100644 crates/cli/config/load_yaml.rs delete mode 100644 crates/database/Makefile delete mode 100644 crates/database/build.rs create mode 100644 crates/database/config/custom_hooks.rs create mode 100644 crates/database/config/mod.rs create mode 100644 crates/database/config/pool_configs.rs delete mode 100644 crates/database/connect/constraints.rs delete mode 100644 crates/database/connect/mod.rs delete mode 100644 crates/database/connect/pool_configs.rs delete mode 100644 crates/database/migrate/mod.rs rename crates/database/{connect/custom_hooks.rs => query/account_actions.rs} (100%) rename crates/database/{connect/with_tracing.rs => query/account_permissions.rs} (100%) rename crates/{runtime/runtime/cycle_condition.rs => database/query/account_sessions.rs} (100%) create mode 100644 crates/database/query/accounts.rs create mode 100644 crates/database/query/mod.rs create mode 100644 crates/database/query/project_invites.rs create mode 100644 crates/database/query/project_members.rs create mode 100644 crates/database/query/project_webhooks.rs create mode 100644 crates/database/query/projects.rs create mode 100644 crates/database/query/workflow_executions.rs create mode 100644 crates/database/query/workflow_schedules.rs create mode 100644 crates/database/query/workflow_webhooks.rs create mode 100644 crates/database/query/workflows.rs create mode 100644 crates/runtime/_/client/client_builder.rs create mode 100644 crates/runtime/_/client/mod.rs rename crates/runtime/{ => _}/client/task_registry.rs (100%) create mode 100644 crates/runtime/_/runtime/connection_instance.rs rename crates/runtime/{ => _}/runtime/connection_metadata.rs (100%) create mode 100644 crates/runtime/_/runtime/cycle_condition.rs rename crates/runtime/{ => _}/runtime/mod.rs (100%) rename crates/runtime/{ => _}/runtime/runtime_manager.rs (83%) create mode 100644 crates/runtime/client/recycle_method.rs create mode 100644 crates/runtime/config/custom_hooks.rs create mode 100644 crates/runtime/config/mod.rs create mode 100644 crates/runtime/config/pool_manager.rs create mode 100644 crates/runtime/config/runtime_config.rs delete mode 100644 crates/runtime/protobuf/runtime.proto delete mode 100644 crates/runtime/runtime/connection_instance.rs rename crates/server/{service => extract}/conn_info.rs (93%) delete mode 100644 crates/server/service/app_hashing.rs create mode 100644 crates/server/service/argon2_hasher.rs delete mode 100644 crates/server/service/database_err.rs create mode 100644 crates/server/service/graph_queue.rs create mode 100644 crates/server/service/scheduler.rs diff --git a/.github/dependabot.yaml b/.github/dependabot.yaml index f6ece42..171971d 100644 --- a/.github/dependabot.yaml +++ b/.github/dependabot.yaml @@ -1,18 +1,18 @@ version: 2 updates: - - package-ecosystem: "cargo" - directory: "/" - schedule: - interval: "weekly" - timezone: "Europe/Warsaw" - day: "friday" - time: "18:00" + - package-ecosystem: "cargo" + directory: "/" + schedule: + interval: "weekly" + timezone: "Europe/Warsaw" + day: "friday" + time: "18:00" - - package-ecosystem: "npm" - directory: "/" - schedule: - interval: "weekly" - timezone: "Europe/Warsaw" - day: "friday" - time: "18:00" + - package-ecosystem: "npm" + directory: "/" + schedule: + interval: "weekly" + timezone: "Europe/Warsaw" + day: "friday" + time: "18:00" diff --git a/.github/workflows/build.yaml b/.github/workflows/build.yaml new file mode 100644 index 0000000..8cf358c --- /dev/null +++ b/.github/workflows/build.yaml @@ -0,0 +1,47 @@ +name: ci & cd + +on: + push: + branches: + - "main" # Trigger on main branch. + tags: + - "v*.*.*" # Trigger on semantic version tags. + pull_request: # Validation only (without pushing). + +jobs: + build: + runs-on: ubuntu-22.04 + steps: + - name: Checkout repository + uses: actions/checkout@v4 + + - name: Set up Rust + uses: dtolnay/rust-toolchain@master + with: + toolchain: stable + + - name: Run Cargo:fmt + run: cargo +nightly fmt --all -- --check + + - name: Run Cargo:clippy + run: cargo clippy --all-features -- -D warnings + + - name: Run Cargo:test + run: cargo test --verbose --all-features + + publish: + runs-on: ubuntu-22.04 + if: github.event_name == 'push' + steps: + - name: Check out + uses: actions/checkout@v3 + + - name: Set up Rust + uses: dtolnay/rust-toolchain@master + with: + toolchain: stable + + - name: Publish + run: cargo publish --token ${CRATES_TOKEN} + env: + CRATES_TOKEN: ${{ secrets.CRATES_TOKEN }} diff --git a/.gitignore b/.gitignore index badadd1..99dae4f 100644 --- a/.gitignore +++ b/.gitignore @@ -33,7 +33,7 @@ output/ build/ # Generated -/crates/database/entities/ +crates/runtime/protobuf/ # Binaries *.exe diff --git a/Cargo.toml b/Cargo.toml index 9dc9c8e..8ef368b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,10 +21,23 @@ repository = "https://github.com/axiston/axiston" homepage = "https://github.com/axiston/axiston" documentation = "https://docs.rs/axiston" +# TODO: Import axiston-database-migrate from crates.io. +# TODO: Import axiston-database-schema from crates.io. + [workspace.dependencies] +axiston-db-generate = { path = "../database/crates/generate" } +axiston-db-migrate = { path = "../database/crates/migrate" } +axiston-db-connect = { path = "./crates/database", version = "0.1.0" } + +axiston-graph = { path = "./crates/graph", version = "0.1.0" } +axiston-runtime = { path = "./crates/runtime", version = "0.1.0" } +axiston-server = { path = "./crates/server", version = "0.1.0" } + tokio = { version = "1.41", features = ["rt-multi-thread", "macros"] } +deadpool = { version = "0.12", features = ["rt_tokio_1"] } futures = { version = "0.3", features = [] } -async-trait = { version = "0.1.83", features = [] } +thiserror = { version = "2.0", features = [] } +anyhow = { version = "1.0", features = ["backtrace"] } tracing = { version = "0.1", features = [] } tracing-subscriber = { version = "0.3", features = ["env-filter", "time"] } @@ -35,26 +48,20 @@ axum = { version = "0.7", features = ["http2", "macros", "ws"] } axum-server = { version = "0.7.1", features = ["tls-rustls"] } axum-extra = { version = "0.9", features = ["typed-header"] } axum-test = { version = "16.4", features = [] } - -sea-orm = { version = "1.1", features = ["runtime-tokio-rustls", "sqlx-postgres", "macros"] } -sea-orm-migration = { version = "1.1", features = ["runtime-tokio-rustls", "sqlx-postgres"] } -sea-orm-cli = { version = "1.1", features = ["runtime-tokio-rustls"] } - -thiserror = { version = "2.0", features = [] } -anyhow = { version = "1.0", features = ["backtrace"] } +tower = { version = "0.4", features = ["full"] } +tower-http = { version = "0.5", features = ["full"] } derive_more = { version = "1.0", features = ["full"] } serde = { version = "1.0", features = ["derive"] } serde_with = { version = "3.11", features = [] } +serde_toml = { package = "toml", version = "0.8", features = [] } serde_json = { version = "1.0", features = [] } +ts-rs = { version = "10.1", features = ["uuid-impl"] } bytes = { version = "1.9", features = ["serde"] } uuid = { version = "1.11", features = ["v4", "serde"] } time = { version = "0.3", features = ["serde"] } base64 = { version = "0.22", features = [] } ecow = { version = "0.2", features = ["serde"] } - -axiston-database = { path = "./crates/database", version = "0.1.0" } -axiston-graph = { path = "./crates/graph", version = "0.1.0" } -axiston-runtime = { path = "./crates/runtime", version = "0.1.0" } -axiston-server = { path = "./crates/server", version = "0.1.0" } +cron = { version = "0.13", features = [] } +ipnet = { version = "2.10", features = ["serde"] } diff --git a/crates/cli/Cargo.toml b/crates/cli/Cargo.toml index 5c4af78..924df4b 100644 --- a/crates/cli/Cargo.toml +++ b/crates/cli/Cargo.toml @@ -23,6 +23,7 @@ default = [] # - Enables HTTPS support with the axum-server crate. # - Spawns a server that redirects incoming HTTP requests. support-https = ["dep:axum-server"] + # - Builds the graph editor app into the app directory. # - Enables service that serves files from app directory. support-files = [] @@ -31,26 +32,23 @@ support-files = [] axiston-server = { workspace = true } clap = { version = "4.5", features = ["derive"] } -tokio = { version = "1.41", features = ["macros", "rt-multi-thread", "signal"] } -anyhow = { version = "1.0", features = ["backtrace"] } - -serde = { version = "1.0", features = ["derive"] } -serde_with = { version = "3.11", features = [] } -serde_toml = { package = "toml", version = "0.8", features = [] } -serde_json = { version = "1.0", features = [] } -countio = { version = "0.2", features = [] } +tokio = { workspace = true } +anyhow = { workspace = true } -tracing = { version = "0.1", features = [] } -tracing-subscriber = { version = "0.3", features = ["env-filter", "time"] } -tracing-opentelemetry = { version = "0.27.0", features = [] } -opentelemetry = { version = "0.26.0", features = [] } +tracing = { workspace = true } +tracing-subscriber = { workspace = true } +tracing-opentelemetry = { workspace = true } +opentelemetry = { workspace = true } -axum = { version = "0.7", features = ["http2", "macros", "ws"] } -axum-server = { version = "0.7", optional = true, features = ["tls-rustls"] } -axum-extra = { version = "0.9", features = ["typed-header"] } +axum = { workspace = true } +axum-server = { workspace = true, optional = true } +axum-extra = { workspace = true } +tower = { workspace = true } +tower-http = { workspace = true } -tower = { version = "0.4", features = ["full"] } -tower-http = { version = "0.5", features = ["full"] } +serde = { workspace = true } +serde_toml = { workspace = true } +serde_json = { workspace = true } [dev-dependencies] -axum-test = { version = "16.3.0", features = [] } +axum-test = { workspace = true } diff --git a/crates/cli/config/load_yaml.rs b/crates/cli/config/load_yaml.rs deleted file mode 100644 index 2c47375..0000000 --- a/crates/cli/config/load_yaml.rs +++ /dev/null @@ -1,7 +0,0 @@ -use std::path::Path; - -use crate::config::Args; - -pub fn load_yaml>(path: P) -> anyhow::Result { - todo!() -} diff --git a/crates/cli/config/mod.rs b/crates/cli/config/mod.rs index a501797..5ccdabc 100644 --- a/crates/cli/config/mod.rs +++ b/crates/cli/config/mod.rs @@ -1,9 +1,8 @@ -//! TODO. +//! Loads and parses configuration files. //! mod load_json; mod load_toml; -mod load_yaml; use std::ffi::OsStr; use std::path::PathBuf; @@ -14,7 +13,6 @@ use serde::Deserialize; use crate::config::load_json::load_json; use crate::config::load_toml::load_toml; -use crate::config::load_yaml::load_yaml; use crate::server::ServerConfig; /// Command-line arguments. @@ -68,7 +66,6 @@ impl Args { Cli::File { path } => match path.extension() { Some(ext) if OsStr::new("toml") == ext => load_toml(path), Some(ext) if OsStr::new("json") == ext => load_json(path), - Some(ext) if OsStr::new("yaml") == ext => load_yaml(path), _ => Err(anyhow::anyhow!("should specify a supported file extension")), }, } @@ -77,8 +74,8 @@ impl Args { /// Returns a new [`AppConfig`]. pub fn build_app_config(&self) -> AppConfig { AppConfig { - database_conn: self.database.clone(), - multiple_gateways: self.multiple, + database: self.database.clone(), + multiple: self.multiple, } } diff --git a/crates/cli/main.rs b/crates/cli/main.rs index 99d1df4..976e94f 100644 --- a/crates/cli/main.rs +++ b/crates/cli/main.rs @@ -7,12 +7,13 @@ mod server; use std::time::Duration; use axiston_server::handler::routes; -use axiston_server::middleware::{initialize_tracing, RouterExt}; -use axiston_server::service::AppState; +use axiston_server::middleware::RouterTracingExt; +use axiston_server::service::{AppState, SchedulerRuntime}; use axum::Router; use clap::Parser; use crate::config::Args; +use crate::middleware::initialize_tracing; use crate::server::run_supported_server; #[tokio::main] @@ -25,15 +26,19 @@ async fn main() -> anyhow::Result<()> { let app_config = args.build_app_config(); let state = AppState::connect(app_config).await?; + let scheduler = SchedulerRuntime::new(state.clone()); + let scheduler_handler = scheduler.run_trigger_loop(); + let app = Router::new() .merge(routes()) - .with_error_handling_layer(Duration::from_secs(60)) - .with_observability_layer() + .with_inner_error_handling_layer(Duration::from_secs(60)) + .with_inner_observability_layer() .with_state(state); // Listen. let server_config = args.build_server_config(); run_supported_server(server_config, app).await?; + let _ = scheduler_handler.await?; Ok(()) } diff --git a/crates/cli/middleware/mod.rs b/crates/cli/middleware/mod.rs index dafbde4..a8fa395 100644 --- a/crates/cli/middleware/mod.rs +++ b/crates/cli/middleware/mod.rs @@ -1 +1,42 @@ // TODO: Move initialize_tracing here + +#[must_use] +fn build_env_filter() -> tracing_subscriber::EnvFilter { + let current = std::env::var("RUST_LOG") + .or_else(|_| std::env::var("OTEL_LOG_LEVEL")) + .unwrap_or_else(|_| "info".to_string()); + + let env = format!("{},server=trace,otel=debug,tower_http=debug", current); + std::env::set_var("RUST_LOG", env); + tracing_subscriber::EnvFilter::from_default_env() +} + +pub async fn initialize_tracing() -> anyhow::Result<()> { + use tracing_subscriber::fmt::layer; + use tracing_subscriber::layer::SubscriberExt; + use tracing_subscriber::util::SubscriberInitExt; + + // Setups a temporary subscriber to log output during setup. + let env_filter = build_env_filter(); + let fmt_layer = layer().pretty(); + let subscriber = tracing_subscriber::registry() + .with(env_filter) + .with(fmt_layer); + + let _guard = tracing::subscriber::set_default(subscriber); + tracing::trace!(target: "server:otel", "initialized temporary subscriber"); + + // TODO: Enable OpenTelemetry. + // https://github.com/davidB/tracing-opentelemetry-instrumentation-sdk + + // Setups an actual subscriber. + let env_filter = build_env_filter(); + let fmt_layer = layer().pretty(); + tracing_subscriber::registry() + .with(env_filter) + .with(fmt_layer) + .init(); + + tracing::trace!(target: "server:otel", "initialized subscriber"); + Ok(()) +} diff --git a/crates/cli/server/mod.rs b/crates/cli/server/mod.rs index aee5c68..5ccb4a3 100644 --- a/crates/cli/server/mod.rs +++ b/crates/cli/server/mod.rs @@ -31,7 +31,7 @@ pub async fn run_supported_server( server_config: ServerConfig, app_router: Router, ) -> anyhow::Result<()> { - let timeout = Duration::from_secs(60); + let timeout = Duration::from_secs(server_config.shutdown); let fut = shutdown_signal(timeout); #[cfg(not(feature = "support-https"))] diff --git a/crates/cli/server/run_default.rs b/crates/cli/server/run_default.rs index 0c861de..0c49c61 100644 --- a/crates/cli/server/run_default.rs +++ b/crates/cli/server/run_default.rs @@ -1,7 +1,7 @@ use std::future::Future; use std::net::{Ipv4Addr, SocketAddr}; -use axiston_server::service::AppConnectInfo; +use axiston_server::extract::AppConnectInfo; use axum::Router; use tokio::net::TcpListener; diff --git a/crates/cli/server/run_secure.rs b/crates/cli/server/run_secure.rs index 3ce50f7..5ae6916 100644 --- a/crates/cli/server/run_secure.rs +++ b/crates/cli/server/run_secure.rs @@ -1,7 +1,7 @@ use std::future::Future; use std::net::{Ipv4Addr, SocketAddr}; -use axiston_server::service::AppConnectInfo; +use axiston_server::extract::AppConnectInfo; use axum::Router; use axum_server::tls_rustls::RustlsConfig; use axum_server::Handle; diff --git a/crates/cli/server/serv_config.rs b/crates/cli/server/serv_config.rs index 1656373..dba2e67 100644 --- a/crates/cli/server/serv_config.rs +++ b/crates/cli/server/serv_config.rs @@ -7,11 +7,12 @@ use std::path::PathBuf; #[must_use = "configs do nothing unless you use them"] #[derive(Debug, Clone)] pub struct ServerConfig { - /// Used by the primary server. + /// Port exposed by the primary server. pub port: u16, + // Shutdown duration in seconds. + pub shutdown: u64, - // TODO: Shutdown duration. - /// Used by the secondary (`http` to `https` redirection) server. + /// Port exposed by the secondary (redirection) server. #[cfg(feature = "support-https")] #[cfg_attr(docsrs, doc(cfg(feature = "support-https")))] pub redirect: u16, @@ -53,6 +54,7 @@ impl Default for ServerConfig { #[derive(Debug, Default, Clone)] pub struct ServerBuilder { pub port: Option, + pub shutdown: Option, #[cfg(feature = "support-https")] #[cfg_attr(docsrs, doc(cfg(feature = "support-https")))] @@ -78,8 +80,9 @@ impl ServerBuilder { pub fn build(self) -> ServerConfig { ServerConfig { port: self.port.unwrap_or(3000), + shutdown: self.shutdown.unwrap_or(10), #[cfg(feature = "support-https")] - redirect: self.port.unwrap_or(3001), + redirect: self.redirect.unwrap_or(3001), #[cfg(feature = "support-https")] cert: self.cert.unwrap_or(PathBuf::from("./cert.pem")), #[cfg(feature = "support-https")] diff --git a/crates/database/Cargo.toml b/crates/database/Cargo.toml index 668dc3e..243a7d5 100644 --- a/crates/database/Cargo.toml +++ b/crates/database/Cargo.toml @@ -1,7 +1,7 @@ # https://doc.rust-lang.org/cargo/reference/manifest.html [package] -name = "axiston-database" +name = "axiston-db-connect" version = { workspace = true } edition = { workspace = true } license = { workspace = true } @@ -21,26 +21,21 @@ rustdoc-args = ["--cfg", "docsrs"] path = "lib.rs" [dependencies] -sea-orm = { workspace = true } -sea-orm-migration = { workspace = true } +axiston-db-generate = { workspace = true } +diesel = { version = "2.2", features = ["time", "uuid", "ipnet-address"] } +diesel-async = { version = "0.5", features = ["postgres", "pool", "deadpool"] } +deadpool = { workspace = true } futures = { workspace = true } tracing = { workspace = true } -async-trait = { workspace = true } -thiserror = { workspace = true } derive_more = { workspace = true } +thiserror = { workspace = true } serde = { workspace = true } -serde_with = { workspace = true } serde_json = { workspace = true } uuid = { workspace = true } +ipnet = { workspace = true } time = { workspace = true } [dev-dependencies] tokio = { workspace = true } - -[build-dependencies] -sea-orm-cli = { workspace = true } -tokio = { workspace = true } -tracing = { workspace = true } -anyhow = { workspace = true } diff --git a/crates/database/Makefile b/crates/database/Makefile deleted file mode 100644 index 61cd37a..0000000 --- a/crates/database/Makefile +++ /dev/null @@ -1,31 +0,0 @@ -# Makefile for sea-orm-cli entity generation. - -# Variables -SEA_ORM_CLI = sea-orm-cli -DATABASE_URL = postgresql://usr:pwd@localhost:5432/db - -# Default target -.PHONY: generate -generate: - $(SEA_ORM_CLI) generate entity \ - --database-url $(DATABASE_URL) \ - --database-schema public \ - --output-dir ./entity \ - --date-time-crate time \ - --with-serde both \ - --verbose - -# Helper to clean for a next re-run (if necessary). -.PHONY: clean -clean: - rm -rf ./entity - @echo "Clean-up target (if needed)." - -# Optionally, include a run target for demonstration -.PHONY: run -run: generate - @echo "Entity generation complete!" - -.PHONY: install -install: - @echo "Installation complete!" diff --git a/crates/database/README.md b/crates/database/README.md index e4075cd..71ec4f3 100644 --- a/crates/database/README.md +++ b/crates/database/README.md @@ -1,21 +1,20 @@ -### axiston/database +### database/connect -Persistence layer / data-access module for the Axiston gateway. Lorem Ipsum. -Lorem Ipsum. Lorem Ipsum. +[![Build Status][action-badge]][action-url] +[![Crate Docs][docs-badge]][docs-url] +[![Crate Version][crates-badge]][crates-url] + +[action-badge]: https://img.shields.io/github/actions/workflow/status/axiston/database/build.yaml?branch=main&label=build&logo=github&style=flat-square +[action-url]: https://github.com/axiston/database/actions/workflows/build.yaml +[crates-badge]: https://img.shields.io/crates/v/axiston-database-connect.svg?logo=rust&style=flat-square +[crates-url]: https://crates.io/crates/axiston-database-connect +[docs-badge]: https://img.shields.io/docsrs/axiston-database-connect?logo=Docs.rs&style=flat-square +[docs-url]: http://docs.rs/axiston-database-connect + +Lorem Ipsum. Lorem Ipsum. Lorem Ipsum. #### Notes - Lorem Ipsum. - Lorem Ipsum. - Lorem Ipsum. - -#### Guidelines - -- Migrations are append-only. Once a migration is merged into the `main` branch, - do not modify it. -- Migrations in `migration/` must be idempotent, ensuring they can be run - multiple times without causing issues. -- Self-hosted Axiston users should update role passwords separately after - running all migrations. -- Production releases are done by publishing a new GitHub release from the - `main` branch. diff --git a/crates/database/build.rs b/crates/database/build.rs deleted file mode 100644 index 27f96f6..0000000 --- a/crates/database/build.rs +++ /dev/null @@ -1,16 +0,0 @@ -#[tokio::main] -async fn main() -> anyhow::Result<()> { - // make sure flag is enabled, - // make sure make is installed, - // make sure seaorm is installed, - // run generate script with make, - - Ok(()) -} - -// sea-orm-cli generate entity -// --database-url postgresql://usr:pwd@localhost:5432/db -// --database-url public -// --date-time-crate time -// --with-serde both -// --verbose diff --git a/crates/database/config/custom_hooks.rs b/crates/database/config/custom_hooks.rs new file mode 100644 index 0000000..6f85ebf --- /dev/null +++ b/crates/database/config/custom_hooks.rs @@ -0,0 +1,55 @@ +//! Includes all callbacks and hooks for [`diesel`] and [`deadpool`]. + +use deadpool::managed::{HookResult, Metrics}; +use diesel::ConnectionResult; +use diesel_async::pooled_connection::PoolError; +use diesel_async::{AsyncConnection, AsyncPgConnection}; +use futures::future::BoxFuture; +use futures::FutureExt; + +/// Custom setup procedure used to establish a new connection. +/// +/// See [`ManagerConfig`] and [`SetupCallback`] for more details. +/// +/// [`ManagerConfig`]: diesel_async::pooled_connection::ManagerConfig +/// [`SetupCallback`]: diesel_async::pooled_connection::SetupCallback +pub fn setup_callback(addr: &str) -> BoxFuture> +where + C: AsyncConnection + 'static, +{ + tracing::trace!(target: "database", "setup_callback"); + C::establish(addr).boxed() +} + +/// Custom hook called after a new connection has been established. +/// +/// See [`PoolBuilder`] for more details. +/// +/// [`PoolBuilder`]: deadpool::managed::PoolBuilder +pub fn post_create(_conn: &mut AsyncPgConnection, _metrics: &Metrics) -> HookResult { + // Note: should never return an error. + tracing::trace!(target: "database", "post_create"); + Ok(()) +} + +/// Custom hook called before a connection has been recycled. +/// +/// See [`PoolBuilder`] for more details. +/// +/// [`PoolBuilder`]: deadpool::managed::PoolBuilder +pub fn pre_recycle(_conn: &mut AsyncPgConnection, _metrics: &Metrics) -> HookResult { + // Note: should never return an error. + tracing::trace!(target: "database", "pre_recycle"); + Ok(()) +} + +/// Custom hook called after a connection has been recycled. +/// +/// See [`PoolBuilder`] for more details. +/// +/// [`PoolBuilder`]: deadpool::managed::PoolBuilder +pub fn post_recycle(_conn: &mut AsyncPgConnection, _metrics: &Metrics) -> HookResult { + // Note: should never return an error. + tracing::trace!(target: "database", "post_recycle"); + Ok(()) +} diff --git a/crates/database/config/mod.rs b/crates/database/config/mod.rs new file mode 100644 index 0000000..53e571a --- /dev/null +++ b/crates/database/config/mod.rs @@ -0,0 +1,105 @@ +//! Asynchronous `postgres` connection pool and its configuration. + +mod custom_hooks; +mod pool_configs; + +use std::fmt; + +use deadpool::Runtime; +use diesel_async::pooled_connection::deadpool::{Hook, Object, Pool}; +use diesel_async::pooled_connection::{AsyncDieselConnectionManager, ManagerConfig}; +use diesel_async::AsyncPgConnection; + +use crate::config::custom_hooks::{post_create, post_recycle, pre_recycle, setup_callback}; +pub use crate::config::pool_configs::DatabaseConfig; +use crate::DatabaseResult; + +/// Asynchronous `postgres` connection pool. +/// +/// - Implemented with [`diesel`] and [`deadpool`]. +/// - Includes predefined create/recycle hooks. +/// - Emits traces on lifecycle events. +/// - Uses [`DatabaseConfig`] for configuration. +#[derive(Clone)] +pub struct Database { + conn: Pool, +} + +impl Database { + /// Returns a new [`Database`] connection pool. + pub fn new(addr: A, pool_config: DatabaseConfig) -> Self + where + A: Into, + { + let mut manager_config = ManagerConfig::default(); + manager_config.custom_setup = Box::new(setup_callback); + + let conn = AsyncDieselConnectionManager::new_with_config(addr, manager_config); + let pool = Pool::builder(conn) + .max_size(pool_config.max_conn.unwrap_or(8)) + .create_timeout(pool_config.create_timeout) + .wait_timeout(pool_config.wait_timeout) + .recycle_timeout(pool_config.recycle_timeout) + .post_create(Hook::sync_fn(post_create)) + .pre_recycle(Hook::sync_fn(pre_recycle)) + .post_recycle(Hook::sync_fn(post_recycle)) + .runtime(Runtime::Tokio1); + + let pool = pool.build().expect("should not require runtime"); + Self { conn: pool } + } + + /// Returns a new [`Database`] connection pool for a single gateway. + pub fn new_single_gateway(addr: A) -> Self + where + A: Into, + { + Self::new(addr, DatabaseConfig::new_single_gateway()) + } + + /// Returns a new [`Database`] connection pool for multiple gateways. + pub fn new_multiple_gateways(addr: A) -> Self + where + A: Into, + { + Self::new(addr, DatabaseConfig::new_multiple_gateways()) + } + + /// Retrieves a connection from this pool or waits for one to become available. + pub async fn get_connection(&self) -> DatabaseResult> { + self.conn.get().await.map_err(Into::into) + } +} + +impl fmt::Debug for Database { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let status = self.conn.status(); + let is_closed = self.conn.is_closed(); + f.debug_struct("Database") + .field("size", &status.size) + .field("max_size", &status.max_size) + .field("available", &status.available) + .field("waiting", &status.waiting) + .field("is_closed", &is_closed) + .finish() + } +} + +#[cfg(test)] +mod test { + use crate::{Database, DatabaseResult}; + + #[tokio::test] + fn single_gateway() -> DatabaseResult<()> { + let addr = "postgresql://usr:pwd@localhost:5432/db"; + let _ = Database::new_single_gateway(addr); + Ok(()) + } + + #[tokio::test] + fn multiple_gateways() -> DatabaseResult<()> { + let addr = "postgresql://usr:pwd@localhost:5432/db"; + let _ = Database::new_multiple_gateways(addr); + Ok(()) + } +} diff --git a/crates/database/config/pool_configs.rs b/crates/database/config/pool_configs.rs new file mode 100644 index 0000000..c41858e --- /dev/null +++ b/crates/database/config/pool_configs.rs @@ -0,0 +1,76 @@ +//! Custom database connection pool configurations. + +use std::time::Duration; + +use serde::{Deserialize, Serialize}; + +/// Configures [`Database`] for one or more gateways. +/// +/// [`Database`]: crate::Database +#[derive(Debug, Default, Clone, Serialize, Deserialize)] +#[must_use = "configs do nothing unless you use them"] +pub struct DatabaseConfig { + pub max_conn: Option, + pub create_timeout: Option, + pub wait_timeout: Option, + pub recycle_timeout: Option, +} + +impl DatabaseConfig { + /// Creates a new [`DatabaseConfig`]. + #[inline] + pub fn new() -> Self { + Self::default() + } + + /// Overwrites the default value of [`DatabaseConfig`]`::max_conn`. + pub fn with_max_conn(mut self, max_conn: usize) -> Self { + self.max_conn = Some(max_conn); + self + } + + /// Overwrites the default value of [`DatabaseConfig`]`::create_timeout`. + pub fn with_create_timeout(mut self, create_timeout: Duration) -> Self { + self.create_timeout = Some(create_timeout); + self + } + + /// Overwrites the default value of [`DatabaseConfig`]`::wait_timeout`. + pub fn with_wait_timeout(mut self, wait_timeout: Duration) -> Self { + self.wait_timeout = Some(wait_timeout); + self + } + + /// Overwrites the default value of [`DatabaseConfig`]`::recycle_timeout`. + pub fn with_recycle_timeout(mut self, recycle_timeout: Duration) -> Self { + self.recycle_timeout = Some(recycle_timeout); + self + } + + /// Creates a new [`DatabaseConfig`] for a single gateway. + pub fn new_single_gateway() -> Self { + Self::default().with_max_conn(64) + } + + /// Creates a new [`DatabaseConfig`] for multiple gateways. + pub fn new_multiple_gateways() -> Self { + Self::default().with_max_conn(8) + } +} + +#[cfg(test)] +mod test { + use crate::{DatabaseConfig, DatabaseResult}; + + #[test] + fn single_gateway() -> DatabaseResult<()> { + let _ = DatabaseConfig::new_single_gateway(); + Ok(()) + } + + #[test] + fn multiple_gateways() -> DatabaseResult<()> { + let _ = DatabaseConfig::new_multiple_gateways(); + Ok(()) + } +} diff --git a/crates/database/connect/constraints.rs b/crates/database/connect/constraints.rs deleted file mode 100644 index be98a3f..0000000 --- a/crates/database/connect/constraints.rs +++ /dev/null @@ -1,46 +0,0 @@ -use sea_orm::SqlErr; - -/// TODO. -#[derive(Debug, Default, Copy, Clone, PartialEq, Eq)] -pub enum ConstraintViolation { - #[default] - Unknown, -} - -impl ConstraintViolation { - /// Parses an error from unsuccessful SQL query. - pub fn new(error: SqlErr) -> Self { - match error { - SqlErr::UniqueConstraintViolation(constraint) => { - Self::from_unique_constraint_violation(constraint.as_str()) - } - SqlErr::ForeignKeyConstraintViolation(constraint) => { - Self::from_foreign_constraint_violation(constraint.as_str()) - } - _ => Self::Unknown, - } - } - - /// Parses a [`ConstraintViolation`] from a [`SqlErr::UniqueConstraintViolation`]. - fn from_unique_constraint_violation(constraint: &str) -> Self { - match constraint { - "" => Self::Unknown, - _ => Self::Unknown, - } - } - - /// Parses a [`ConstraintViolation`] from a [`SqlErr::ForeignKeyConstraintViolation`]. - fn from_foreign_constraint_violation(constraint: &str) -> Self { - match constraint { - "" => Self::Unknown, - _ => Self::Unknown, - } - } -} - -impl From for ConstraintViolation { - #[inline] - fn from(value: SqlErr) -> Self { - Self::new(value) - } -} diff --git a/crates/database/connect/mod.rs b/crates/database/connect/mod.rs deleted file mode 100644 index 5182a48..0000000 --- a/crates/database/connect/mod.rs +++ /dev/null @@ -1,81 +0,0 @@ -//! TODO. -//! - -mod constraints; -mod custom_hooks; -mod pool_configs; -mod with_tracing; - -use std::fmt; - -use derive_more::From; -use sea_orm::{ConnectOptions, Database, DatabaseConnection}; - -pub use crate::connect::constraints::ConstraintViolation; -pub use crate::connect::pool_configs::ConnectOptionsExt; -use crate::Result; - -/// Contains a preconfigured `Postgres` database connection pool. -#[derive(Clone, From)] -pub struct AppDatabase { - inner: DatabaseConnection, -} - -impl AppDatabase { - /// Returns a new [`AppDatabase`] connection. - #[inline] - pub fn new(inner: DatabaseConnection) -> Self { - Self { inner } - } - - /// Connects to the database and returns a new [`AppDatabase`]. - pub async fn connect>(connect_options: C) -> Result { - let conn = Database::connect(connect_options).await; - conn.map(Into::into).map_err(Into::into) - } - - /// Connects to the database configured for a single gateway. - pub async fn connect_single_instance>(addr: C) -> Result { - Self::connect(ConnectOptions::new_single_instance(addr.as_ref())).await - } - - /// Connects to the database configured for multiple gateways. - pub async fn connect_multiple_instances>(addr: C) -> Result { - Self::connect(ConnectOptions::new_multiple_instances(addr.as_ref())).await - } - - /// Returns the underlying database connection. - #[inline] - pub fn database_connection(&self) -> DatabaseConnection { - self.inner.clone() - } - - /// Returns a reference to the underlying database connection. - #[inline] - pub fn as_database_connection(&self) -> &DatabaseConnection { - &self.inner - } -} - -impl fmt::Debug for AppDatabase { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("Database").finish_non_exhaustive() - } -} - -#[cfg(test)] -mod test { - use crate::Result; - - #[tokio::test] - async fn single_instance() -> Result<()> { - // TODO: run tests on AppDatabase::connect_single_instance - Ok(()) - } - - #[test] - async fn multiple_instances() -> Result<()> { - // TODO: run tests on AppDatabase::connect_multiple_instances - Ok(()) - } -} diff --git a/crates/database/connect/pool_configs.rs b/crates/database/connect/pool_configs.rs deleted file mode 100644 index 2d53764..0000000 --- a/crates/database/connect/pool_configs.rs +++ /dev/null @@ -1,60 +0,0 @@ -//! Custom configurations for [`ConnectOptions`]. -//! - -use std::time::Duration; - -use sea_orm::ConnectOptions; - -/// Extends [`ConnectOptions`] with preconfigured constructors. -pub trait ConnectOptionsExt { - /// Returns [`ConnectOptions`] suitable for a setup with a single gateway. - fn new_single_instance>(addr: C) -> ConnectOptions; - - /// Returns [`ConnectOptions`] suitable for a setup with a single gateway. - fn new_multiple_instances>(addr: C) -> ConnectOptions; -} - -impl ConnectOptionsExt for ConnectOptions { - fn new_single_instance>(addr: C) -> ConnectOptions { - let mut connection_options = ConnectOptions::new(addr); - connection_options - .idle_timeout(Duration::from_secs(8 * 60)) - .acquire_timeout(Duration::from_secs(40)) - .max_lifetime(Duration::from_secs(40 * 60)) - .min_connections(2) - .max_connections(64); - - connection_options - } - - fn new_multiple_instances>(addr: C) -> ConnectOptions { - let mut connection_options = ConnectOptions::new(addr); - connection_options - .idle_timeout(Duration::from_secs(2 * 60)) - .acquire_timeout(Duration::from_secs(40)) - .max_lifetime(Duration::from_secs(20 * 60)) - .min_connections(0) - .max_connections(32); - - connection_options - } -} - -#[cfg(test)] -mod test { - use sea_orm::ConnectOptions; - - use crate::connect::ConnectOptionsExt; - - #[test] - fn single_instance() { - let addr = "postgresql://usr:pwd@localhost:5432/db"; - let _ = ConnectOptions::new_single_instance(addr); - } - - #[test] - fn multiple_instances() { - let addr = "postgresql://usr:pwd@localhost:5432/db"; - let _ = ConnectOptions::new_multiple_instances(addr); - } -} diff --git a/crates/database/lib.rs b/crates/database/lib.rs index fac9411..22edf21 100644 --- a/crates/database/lib.rs +++ b/crates/database/lib.rs @@ -1,50 +1,69 @@ #![forbid(unsafe_code)] +#![allow(async_fn_in_trait)] #![cfg_attr(docsrs, feature(doc_cfg))] #![doc = include_str!("./README.md")] //! ### Examples //! //! ```rust,no_run -//! use axiston_database::{Result, AppDatabase}; +//! use axiston_db_connect::{DatabaseResult, Database}; //! //! #[tokio::main] -//! async fn main() -> Result<()> { +//! async fn main() -> DatabaseResult<()> { //! let addr = "postgresql://usr:pwd@localhost:5432/db"; -//! let conn = AppDatabase::connect_single_instance(addr).await; +//! let _ = Database::new_single_gateway(addr); //! Ok(()) //! } //! ``` -mod connect; -mod migrate; +use deadpool::managed::TimeoutType; +use diesel::result::{ConnectionError, Error}; +use diesel_async::pooled_connection::deadpool::PoolError; +use diesel_async::pooled_connection::PoolError as PoolError2; -use derive_more::From; -use sea_orm::DbErr as SeaError; +pub use crate::config::{Database, DatabaseConfig}; -pub use crate::connect::{AppDatabase, ConnectOptionsExt, ConstraintViolation}; -pub use crate::migrate::AppDatabaseExt; +mod config; +mod query; /// Unrecoverable failure of the [`Database`]. /// /// Includes all error types that may occur. -/// -/// [`Database`]: database::Database -#[derive(Debug, From, thiserror::Error)] -#[error("underlying sql driver failure: {inner}")] +#[derive(Debug, thiserror::Error)] #[must_use = "errors do nothing unless you use them"] -pub struct Error { - inner: SeaError, +pub enum DatabaseError { + /// [`deadpool::managed::PoolError::Timeout`]. + #[error("timeout error")] + Timeout(TimeoutType), + /// [`diesel_async::pooled_connection::PoolError::ConnectionError`] + #[error("connection error: {0}")] + Connection(ConnectionError), + /// [`diesel_async::pooled_connection::PoolError::QueryError`] + #[error("query error: {0}")] + Query(Error), } -impl Error { - /// Returns a new [`Error`]. +impl From for DatabaseError { + fn from(value: PoolError) -> Self { + match value { + PoolError::Timeout(timeout_type) => Self::Timeout(timeout_type), + PoolError::Backend(PoolError2::ConnectionError(connection_error)) => { + Self::Connection(connection_error) + } + PoolError::Backend(PoolError2::QueryError(query_error)) => Self::Query(query_error), + PoolError::PostCreateHook(_) => unreachable!(), + PoolError::NoRuntimeSpecified => unreachable!(), + PoolError::Closed => unreachable!(), + } + } +} + +impl From for DatabaseError { #[inline] - pub fn new(inner: SeaError) -> Self { - Self { inner } + fn from(value: Error) -> Self { + Self::Query(value) } } -/// Specialized [`Result`] alias for the [`Error`] type. -/// -/// [`Result`]: std::result::Result -pub type Result = std::result::Result; +/// Specialized [`Result`] alias for the [`DatabaseError`] type. +pub type DatabaseResult = Result; diff --git a/crates/database/migrate/mod.rs b/crates/database/migrate/mod.rs deleted file mode 100644 index 02f4955..0000000 --- a/crates/database/migrate/mod.rs +++ /dev/null @@ -1,66 +0,0 @@ -//! TODO. - -use sea_orm::sea_query::{Alias, IntoIden}; -use sea_orm::DynIden; -use sea_orm_migration::{MigrationTrait, MigratorTrait}; - -use crate::{AppDatabase, Result}; - -/// TODO. -#[derive(Debug, Clone, Default)] -pub struct AppDatabaseMigrator; - -impl AppDatabaseMigrator { - /// Returns a new [`AppDatabaseMigrator`]. - #[inline] - pub fn new() -> Self { - Self::default() - } -} - -#[async_trait::async_trait] -impl MigratorTrait for AppDatabaseMigrator { - fn migrations() -> Vec> { - vec![] - } - - fn migration_table_name() -> DynIden { - Alias::new("migrations").into_iden() - } -} - -/// Extends [`AppDatabase`] with migration methods. -pub trait AppDatabaseExt { - /// Applies `steps` pending migrations. - async fn apply_migrations(&self, steps: Option) -> Result<()>; - - /// Rolls back `steps` pending migrations. - async fn rollback_migrations(&self, steps: Option) -> Result<()>; -} - -impl AppDatabaseExt for AppDatabase { - async fn apply_migrations(&self, steps: Option) -> Result<()> { - let conn = self.as_database_connection(); - AppDatabaseMigrator::up(conn, steps).await?; - Ok(()) - } - - async fn rollback_migrations(&self, steps: Option) -> Result<()> { - let conn = self.as_database_connection(); - AppDatabaseMigrator::down(conn, steps).await?; - Ok(()) - } -} - -#[cfg(test)] -mod test { - use crate::{AppDatabase, AppDatabaseExt, Result}; - - #[tokio::test] - async fn migrator() -> Result<()> { - let addr = "postgresql://usr:pwd@localhost:5432/db"; - let conn = AppDatabase::connect_single_instance(addr).await?; - let _ = conn.apply_migrations(None).await?; - Ok(()) - } -} diff --git a/crates/database/connect/custom_hooks.rs b/crates/database/query/account_actions.rs similarity index 100% rename from crates/database/connect/custom_hooks.rs rename to crates/database/query/account_actions.rs diff --git a/crates/database/connect/with_tracing.rs b/crates/database/query/account_permissions.rs similarity index 100% rename from crates/database/connect/with_tracing.rs rename to crates/database/query/account_permissions.rs diff --git a/crates/runtime/runtime/cycle_condition.rs b/crates/database/query/account_sessions.rs similarity index 100% rename from crates/runtime/runtime/cycle_condition.rs rename to crates/database/query/account_sessions.rs diff --git a/crates/database/query/accounts.rs b/crates/database/query/accounts.rs new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/crates/database/query/accounts.rs @@ -0,0 +1 @@ + diff --git a/crates/database/query/mod.rs b/crates/database/query/mod.rs new file mode 100644 index 0000000..0dc8e83 --- /dev/null +++ b/crates/database/query/mod.rs @@ -0,0 +1,15 @@ +//! TODO. +//! + +mod account_actions; +mod account_permissions; +mod account_sessions; +mod accounts; +mod project_invites; +mod project_members; +mod project_webhooks; +mod projects; +mod workflow_executions; +mod workflow_schedules; +mod workflow_webhooks; +mod workflows; diff --git a/crates/database/query/project_invites.rs b/crates/database/query/project_invites.rs new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/crates/database/query/project_invites.rs @@ -0,0 +1 @@ + diff --git a/crates/database/query/project_members.rs b/crates/database/query/project_members.rs new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/crates/database/query/project_members.rs @@ -0,0 +1 @@ + diff --git a/crates/database/query/project_webhooks.rs b/crates/database/query/project_webhooks.rs new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/crates/database/query/project_webhooks.rs @@ -0,0 +1 @@ + diff --git a/crates/database/query/projects.rs b/crates/database/query/projects.rs new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/crates/database/query/projects.rs @@ -0,0 +1 @@ + diff --git a/crates/database/query/workflow_executions.rs b/crates/database/query/workflow_executions.rs new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/crates/database/query/workflow_executions.rs @@ -0,0 +1 @@ + diff --git a/crates/database/query/workflow_schedules.rs b/crates/database/query/workflow_schedules.rs new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/crates/database/query/workflow_schedules.rs @@ -0,0 +1 @@ + diff --git a/crates/database/query/workflow_webhooks.rs b/crates/database/query/workflow_webhooks.rs new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/crates/database/query/workflow_webhooks.rs @@ -0,0 +1 @@ + diff --git a/crates/database/query/workflows.rs b/crates/database/query/workflows.rs new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/crates/database/query/workflows.rs @@ -0,0 +1 @@ + diff --git a/crates/graph/Cargo.toml b/crates/graph/Cargo.toml index 593fd97..7470576 100644 --- a/crates/graph/Cargo.toml +++ b/crates/graph/Cargo.toml @@ -24,18 +24,20 @@ path = "lib.rs" default = [] [dependencies] -serde = { version = "1.0", features = ["derive"] } -serde_with = { version = "3.11", features = [] } -thiserror = { version = "1.0", features = [] } -derive_more = { version = "1.0", features = ["full"] } -ts-rs = { version = "10.0", features = ["uuid-impl"] } - petgraph = { version = "0.6", features = [] } -uuid = { version = "1.11", features = ["serde", "v4"] } -cron = { version = "0.12", features = [] } -tracing = { version = "0.1", features = [] } -time = { version = "0.3", features = [] } + +tracing = { workspace = true } +thiserror = { workspace = true } + +derive_more = { workspace = true } +serde = { workspace = true } +serde_with = { workspace = true } +ts-rs = { workspace = true } + +uuid = { workspace = true } +cron = { workspace = true } +time = { workspace = true } [dev-dependencies] -tokio = { version = "1.41", features = ["rt-multi-thread", "macros"] } -serde_json = { version = "1.0", features = [] } +tokio = { workspace = true } +serde_json = { workspace = true } diff --git a/crates/graph/lib.rs b/crates/graph/lib.rs index b3cc4b2..c18ed9b 100644 --- a/crates/graph/lib.rs +++ b/crates/graph/lib.rs @@ -23,3 +23,5 @@ pub mod worker; /// [`ReportBundle`]: crate::outputs::ReportBundle /// [`Result`]: std::result::Result pub type Result = std::result::Result; + +// TODO: check https://github.com/rbatis/fast_pool diff --git a/crates/graph/outputs/task_registry.rs b/crates/graph/outputs/task_registry.rs index cd9911f..fbcabad 100644 --- a/crates/graph/outputs/task_registry.rs +++ b/crates/graph/outputs/task_registry.rs @@ -25,14 +25,14 @@ impl TaskRegistry { TaskRegistryChunk::new([].into_iter()) } - fn find_by_tags(&self, tags: Vec<&str>) -> TaskRegistryChunk { + fn find_by_tags(&self, tags: &str) -> TaskRegistryChunk { TaskRegistryChunk::new([].into_iter()) } - pub fn find(&self) -> TaskRegistryChunk { + pub fn find(&self, query: &str) -> TaskRegistryChunk { let mut chunk = TaskRegistryChunk::default(); - chunk.merge(self.find_by_name()); - chunk.merge(self.find_by_tags()); + chunk.merge(self.find_by_name(query)); + chunk.merge(self.find_by_tags(query)); chunk } } diff --git a/crates/graph/worker/mod.rs b/crates/graph/worker/mod.rs index 5bd13c6..8564e74 100644 --- a/crates/graph/worker/mod.rs +++ b/crates/graph/worker/mod.rs @@ -10,6 +10,7 @@ use crate::worker::graph_compiler::{CompileGraph, DefaultGraphCompiler}; use crate::worker::graph_executor::{DefaultGraphExecutor, ExecuteGraph}; use crate::worker::internal_graph::{ProcessEdge, ProcessGraph, ProcessNode}; +/// TODO. #[derive(Debug)] pub struct GraphWorker { graph_compiler: C, diff --git a/crates/runtime/Cargo.toml b/crates/runtime/Cargo.toml index 635b43d..d4c56ff 100644 --- a/crates/runtime/Cargo.toml +++ b/crates/runtime/Cargo.toml @@ -21,20 +21,24 @@ rustdoc-args = ["--cfg", "docsrs"] path = "lib.rs" [dependencies] -# tokio = { version = "1.39", features = [] } -thiserror = { version = "1.0", features = [] } -tracing = { version = "0.1", features = [] } -deadpool = { version = "0.12", features = ["managed"] } +thiserror = { workspace = true } +derive_more = { workspace = true } +tracing = { workspace = true } +serde = { workspace = true } tonic = { version = "0.12", features = [] } prost = { version = "0.13", features = [] } tonic-types = { version = "0.12", features = [] } prost-types = { version = "0.13", features = [] } +deadpool = { version = "0.12", features = ["managed", "rt_tokio_1"] } -tower = { version = "0.4", features = ["full"] } -tower-http = { version = "0.5", features = ["full"] } +tower = { workspace = true } +tower-http = { workspace = true } + +[dev-dependencies] +tokio = { workspace = true } [build-dependencies] -anyhow = { version = "1.0", features = ["backtrace"] } +anyhow = { workspace = true } tonic-build = { version = "0.12", features = [] } prost-build = { version = "0.13", features = [] } diff --git a/crates/runtime/_/client/client_builder.rs b/crates/runtime/_/client/client_builder.rs new file mode 100644 index 0000000..0e9f3fc --- /dev/null +++ b/crates/runtime/_/client/client_builder.rs @@ -0,0 +1,32 @@ +use crate::runtime::RuntimeManager; +use crate::{RuntimeResult, RuntimeClient}; + +/// [`RuntimeClient`] builder. +#[derive(Debug, Default)] +pub struct RuntimeClientBuilder { + inner: RuntimeManager, +} + +impl RuntimeClientBuilder { + /// Returns a new [`RuntimeClientBuilder`]. + #[inline] + pub fn new() -> Self { + Self::default() + } + + /// Builds a new [`RuntimeClient`]. + pub fn build(self) -> RuntimeResult { + let pool = self.inner.pool(); + Ok(RuntimeClient::new(pool)) + } +} + +#[cfg(test)] +mod test { + use crate::{RuntimeResult, RuntimeClientBuilder}; + + fn instance() -> RuntimeResult<()> { + let builder = RuntimeClientBuilder::new(); + let _client = builder.build()?; + } +} diff --git a/crates/runtime/_/client/mod.rs b/crates/runtime/_/client/mod.rs new file mode 100644 index 0000000..2ce7c26 --- /dev/null +++ b/crates/runtime/_/client/mod.rs @@ -0,0 +1,48 @@ +//! TODO. +//! + +mod client_builder; +mod task_registry; + +use deadpool::managed::Pool; + +pub use crate::client::client_builder::RuntimeClientBuilder; +use crate::runtime::RuntimeManager; +use crate::RuntimeResult; + +/// TODO. +#[derive(Debug, Clone)] +pub struct RuntimeClient { + inner: Pool, +} + +impl RuntimeClient { + /// Returns a new [`RuntimeClient`]. + #[inline] + fn new(inner: Pool) -> Self { + Self { inner } + } + + /// Dynamically registers TODO. + pub fn register(&self) -> RuntimeResult<()> { + // self.inner.manager() + Ok(()) + } + + /// Returns a new [`RuntimeClientBuilder`]. + #[inline] + pub fn builder() -> RuntimeClientBuilder { + RuntimeClientBuilder::new() + } +} + +#[cfg(test)] +mod test { + use crate::{RuntimeResult, RuntimeClient}; + + #[test] + fn build_from_default() -> RuntimeResult<()> { + let _ = RuntimeClient::builder().build(); + Ok(()) + } +} diff --git a/crates/runtime/client/task_registry.rs b/crates/runtime/_/client/task_registry.rs similarity index 100% rename from crates/runtime/client/task_registry.rs rename to crates/runtime/_/client/task_registry.rs diff --git a/crates/runtime/_/runtime/connection_instance.rs b/crates/runtime/_/runtime/connection_instance.rs new file mode 100644 index 0000000..5550d0e --- /dev/null +++ b/crates/runtime/_/runtime/connection_instance.rs @@ -0,0 +1,65 @@ +use std::fmt; +use std::sync::LazyLock; +use tonic::transport::{Channel, Endpoint}; + +use crate::runtime::connection_instance::runtime_proto::runtime_client::RuntimeClient; +use crate::runtime::connection_instance::runtime_proto::HelloRequest; +use crate::RuntimeResult; + +pub mod runtime_proto { + tonic::include_proto!("runtime"); +} + +/// TODO. +#[must_use] +pub struct ConnectionInstance { + client: RuntimeClient, +} + +// TODO: Replace with `static USER_AGENT: String` once const `format!` is stable. +static USER_AGENT: LazyLock String> = LazyLock::new(format_user_agent); +fn format_user_agent() -> String { + format!( + "Axiston/{} (Rust; Ver {})", + env!("CARGO_PKG_VERSION"), + env!("CARGO_PKG_RUST_VERSION") + ) +} + +impl ConnectionInstance { + /// Returns a new [`ConnectionInstance`]. + pub async fn connect(conn: &str) -> RuntimeResult { + let endpoint = Endpoint::from_shared(conn.to_owned())?; + let endpoint = endpoint.user_agent(USER_AGENT.as_str())?; + let client = RuntimeClient::connect(endpoint).await?; + Ok(Self { client }) + } + + /// Returns the underlying (generated) runtime client. + #[inline] + pub fn as_inner(&self) -> RuntimeClient { + self.client.clone() + } + + /// TODO. + pub async fn check(&mut self) -> RuntimeResult<()> { + let x = self.client.hello(HelloRequest::default()).await; + Ok(()) + } +} + +impl fmt::Debug for ConnectionInstance { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Connection").finish_non_exhaustive() + } +} + +#[cfg(test)] +mod test { + use crate::RuntimeResult; + + #[test] + fn build_from_address() -> RuntimeResult<()> { + Ok(()) + } +} diff --git a/crates/runtime/runtime/connection_metadata.rs b/crates/runtime/_/runtime/connection_metadata.rs similarity index 100% rename from crates/runtime/runtime/connection_metadata.rs rename to crates/runtime/_/runtime/connection_metadata.rs diff --git a/crates/runtime/_/runtime/cycle_condition.rs b/crates/runtime/_/runtime/cycle_condition.rs new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/crates/runtime/_/runtime/cycle_condition.rs @@ -0,0 +1 @@ + diff --git a/crates/runtime/runtime/mod.rs b/crates/runtime/_/runtime/mod.rs similarity index 100% rename from crates/runtime/runtime/mod.rs rename to crates/runtime/_/runtime/mod.rs diff --git a/crates/runtime/runtime/runtime_manager.rs b/crates/runtime/_/runtime/runtime_manager.rs similarity index 83% rename from crates/runtime/runtime/runtime_manager.rs rename to crates/runtime/_/runtime/runtime_manager.rs index f380ed1..9660ca8 100644 --- a/crates/runtime/runtime/runtime_manager.rs +++ b/crates/runtime/_/runtime/runtime_manager.rs @@ -3,10 +3,13 @@ use std::fmt; use std::sync::{Arc, Mutex}; use deadpool::managed::{Manager, Metrics, Pool, RecycleResult}; - +use deadpool::Runtime; use crate::runtime::connection_metadata::RuntimeMetadata; use crate::runtime::ConnectionInstance; -use crate::Error; + +#[derive(Debug)] +pub struct Error {} + /// [`ConnectionInstance`] connection manager. #[derive(Default)] @@ -29,7 +32,8 @@ impl RuntimeManager { /// Returns a pool using the connection manager. pub fn pool(self) -> Pool { - Pool::builder(self).build().unwrap() + let pool = Pool::builder(self).runtime(Runtime::Tokio1).build(); + pool.expect("should not require runtime") } } @@ -55,10 +59,10 @@ impl fmt::Debug for RuntimeManager { #[cfg(test)] mod test { use crate::runtime::RuntimeManager; - use crate::Result; + use crate::RuntimeResult; #[test] - fn build_from_address() -> Result<()> { + fn build_from_address() -> RuntimeResult<()> { let _ = RuntimeManager::new(); Ok(()) } diff --git a/crates/runtime/client/client_builder.rs b/crates/runtime/client/client_builder.rs index 6df6b30..dbf3c5e 100644 --- a/crates/runtime/client/client_builder.rs +++ b/crates/runtime/client/client_builder.rs @@ -1,21 +1,20 @@ -use crate::runtime::RuntimeManager; -use crate::RuntimeClient; +use crate::client::RuntimeConn; -/// [`RuntimeClient`] builder. -#[derive(Debug, Default)] -pub struct ClientBuilder { - inner: RuntimeManager, -} +/// [`RuntimeConn`] builder. +#[derive(Debug, Default, Clone)] +#[must_use = "builders do nothing unless you use them"] +pub struct RuntimeConnBuilder {} -impl ClientBuilder { - /// Returns a new [`ClientBuilder`]. +impl RuntimeConnBuilder { + /// Returns a new [`RuntimeConnBuilder`]. #[inline] pub fn new() -> Self { Self::default() } - /// Builds a new [`RuntimeClient`]. - pub fn build(self) -> RuntimeClient { - RuntimeClient::new(self.inner.pool()) + /// Builds a new [`RuntimeConn`]. + pub fn build(self) -> RuntimeConn { + // TODO: Build endpoint here. + todo!() } } diff --git a/crates/runtime/client/mod.rs b/crates/runtime/client/mod.rs index b9bf2f0..c8f5e2f 100644 --- a/crates/runtime/client/mod.rs +++ b/crates/runtime/client/mod.rs @@ -1,48 +1,62 @@ -//! TODO. -//! +use std::sync::LazyLock; + +use derive_more::From; +use tonic::transport::{Channel, Endpoint}; + +pub use crate::client::client_builder::RuntimeConnBuilder; +use crate::client::runtime_proto::runtime_client::RuntimeClient; +use crate::RuntimeResult; mod client_builder; -mod task_registry; +mod recycle_method; -use deadpool::managed::Pool; +pub mod runtime_proto { + tonic::include_proto!("runtime"); +} -pub use crate::client::client_builder::ClientBuilder; -use crate::runtime::RuntimeManager; -use crate::Result; +// TODO: Replace with `static USER_AGENT: String` once const `format!` is stable. +static USER_AGENT: LazyLock String> = LazyLock::new(format_user_agent); +fn format_user_agent() -> String { + format!( + "Axiston/{} (Rust; Ver {})", + env!("CARGO_PKG_VERSION"), + env!("CARGO_PKG_RUST_VERSION") + ) +} /// TODO. +#[derive(Debug, From, thiserror::Error)] +#[must_use = "errors do nothing unless you use them"] +pub enum RuntimeConnError { + /// Transport failure (from the client or server). + #[error("transport failure: {0}")] + Transport(tonic::transport::Error), +} + #[derive(Debug, Clone)] -pub struct RuntimeClient { - inner: Pool, +#[must_use = "clients do nothing unless you use them"] +pub struct RuntimeConn { + client: RuntimeClient, } -impl RuntimeClient { - /// Returns a new [`RuntimeClient`]. +impl RuntimeConn { + /// Returns a new [`RuntimeConn`]. #[inline] - fn new(inner: Pool) -> Self { - Self { inner } + pub fn new(client: RuntimeClient) -> Self { + Self { client } } - /// Dynamically registers TODO. - pub fn register(&self) -> Result<()> { - // self.inner.manager() - Ok(()) + /// Returns a new [`RuntimeConn`]. + pub async fn connect(conn: &str) -> RuntimeResult { + let endpoint = Endpoint::from_shared(conn.to_owned())?; + let endpoint = endpoint.user_agent(USER_AGENT.as_str())?; + let client = RuntimeClient::connect(endpoint).await?; + Ok(Self { client }) } - /// Returns a new [`ClientBuilder`]. + /// Returns a new [`RuntimeConnBuilder`]. #[inline] - pub fn builder() -> ClientBuilder { - ClientBuilder::new() - } -} - -#[cfg(test)] -mod test { - use crate::{Result, RuntimeClient}; - - #[test] - fn build_from_default() -> Result<()> { - let _ = RuntimeClient::builder().build(); - Ok(()) + pub fn builder() -> RuntimeConnBuilder { + RuntimeConnBuilder::new() } } diff --git a/crates/runtime/client/recycle_method.rs b/crates/runtime/client/recycle_method.rs new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/crates/runtime/client/recycle_method.rs @@ -0,0 +1 @@ + diff --git a/crates/runtime/config/custom_hooks.rs b/crates/runtime/config/custom_hooks.rs new file mode 100644 index 0000000..2f49db6 --- /dev/null +++ b/crates/runtime/config/custom_hooks.rs @@ -0,0 +1,36 @@ +use deadpool::managed::{HookResult, Metrics}; + +use crate::client::{RuntimeConn, RuntimeConnError}; + +/// Custom hook called after a new connection has been established. +/// +/// See [`PoolBuilder`] for more details. +/// +/// [`PoolBuilder`]: deadpool::managed::PoolBuilder +pub fn post_create(_conn: &mut RuntimeConn, _metrics: &Metrics) -> HookResult { + // Note: should never return an error. + tracing::trace!(target: "runtime", "post_create"); + Ok(()) +} + +/// Custom hook called before a connection has been recycled. +/// +/// See [`PoolBuilder`] for more details. +/// +/// [`PoolBuilder`]: deadpool::managed::PoolBuilder +pub fn pre_recycle(_conn: &mut RuntimeConn, _metrics: &Metrics) -> HookResult { + // Note: should never return an error. + tracing::trace!(target: "runtime", "pre_recycle"); + Ok(()) +} + +/// Custom hook called after a connection has been recycled. +/// +/// See [`PoolBuilder`] for more details. +/// +/// [`PoolBuilder`]: deadpool::managed::PoolBuilder +pub fn post_recycle(_conn: &mut RuntimeConn, _metrics: &Metrics) -> HookResult { + // Note: should never return an error. + tracing::trace!(target: "runtime", "post_recycle"); + Ok(()) +} diff --git a/crates/runtime/config/mod.rs b/crates/runtime/config/mod.rs new file mode 100644 index 0000000..fff95f3 --- /dev/null +++ b/crates/runtime/config/mod.rs @@ -0,0 +1,50 @@ +mod custom_hooks; +mod pool_manager; +mod runtime_config; + +use deadpool::managed::{Hook, Object, Pool}; +use derive_more::{Deref, DerefMut, From}; + +use crate::config::custom_hooks::{post_create, post_recycle, pre_recycle}; +pub use crate::config::pool_manager::{RuntimeManager, RuntimeManagerConfig}; +pub use crate::config::runtime_config::RuntimeConfig; +use crate::RuntimeResult; + +/// Asynchronous `runtime` connection pool. +pub struct Runtime { + inner: Pool, +} + +/// [`RuntimeConnection`] wrapper. +/// +/// Hides connection pool manager types. +/// +/// [`RuntimeConnection`]: crate::RuntimeConn +#[derive(Debug, From, Deref, DerefMut)] +pub struct RuntimeObject { + inner: Object, +} + +impl Runtime { + /// Returns a new [`Runtime`]. + pub fn new(addr: (), pool_config: RuntimeConfig) -> Self { + let manager_config = RuntimeManagerConfig::new(); + let manager = RuntimeManager::new_with_config(addr, manager_config); + let pool = Pool::builder(manager) + .max_size(pool_config.max_conn.unwrap_or(8)) + .create_timeout(pool_config.create_timeout) + .wait_timeout(pool_config.wait_timeout) + .recycle_timeout(pool_config.recycle_timeout) + .post_create(Hook::sync_fn(post_create)) + .pre_recycle(Hook::sync_fn(pre_recycle)) + .post_recycle(Hook::sync_fn(post_recycle)) + .runtime(deadpool::Runtime::Tokio1); + + let pool = pool.build().expect("should not require runtime"); + Self { inner: pool } + } + + pub async fn get_connection(&self) -> RuntimeResult { + self.inner.get().await.map(Into::into).map_err(Into::into) + } +} diff --git a/crates/runtime/config/pool_manager.rs b/crates/runtime/config/pool_manager.rs new file mode 100644 index 0000000..9e56cf2 --- /dev/null +++ b/crates/runtime/config/pool_manager.rs @@ -0,0 +1,65 @@ +use deadpool::managed::{Manager, Metrics, RecycleResult}; +use serde::{Deserialize, Serialize}; + +use crate::{RuntimeConn, RuntimeConnBuilder, RuntimeConnError}; + +#[derive(Debug)] +pub struct RuntimeManager { + addr: (), +} + +impl RuntimeManager { + /// Returns a new [`RuntimeManager`]. + #[inline] + pub fn new(addr: ()) -> Self { + Self::new_with_config(addr, RuntimeManagerConfig::new()) + } + + /// Returns a new [`RuntimeManager`] with a custom configuration. + pub fn new_with_config(addr: (), config: RuntimeManagerConfig) -> Self { + todo!() + } +} + +impl Manager for RuntimeManager { + type Type = RuntimeConn; + type Error = RuntimeConnError; + + async fn create(&self) -> Result { + let builder = RuntimeConnBuilder::new(); + todo!() + } + + async fn recycle( + &self, + conn: &mut Self::Type, + metrics: &Metrics, + ) -> RecycleResult { + todo!() + } +} + +/// Configures `RuntimeManager` for one or more runtimes. +#[derive(Debug, Default, Clone, Serialize, Deserialize)] +#[must_use = "configs do nothing unless you use them"] +pub struct RuntimeManagerConfig {} + +impl RuntimeManagerConfig { + /// Returns a new [`RuntimeManagerConfig`]. + #[inline] + pub fn new() -> Self { + Self::default() + } +} + +#[cfg(test)] +mod test { + use crate::config::RuntimeManagerConfig; + use crate::RuntimeResult; + + #[test] + fn default_settings() -> RuntimeResult<()> { + let _ = RuntimeManagerConfig::new(); + Ok(()) + } +} diff --git a/crates/runtime/config/runtime_config.rs b/crates/runtime/config/runtime_config.rs new file mode 100644 index 0000000..091fb55 --- /dev/null +++ b/crates/runtime/config/runtime_config.rs @@ -0,0 +1,58 @@ +use std::time::Duration; + +use serde::{Deserialize, Serialize}; + +/// Configures [`Runtime`] for one or more runtimes. +/// +/// [`Runtime`]: crate::Runtime +#[derive(Debug, Default, Clone, Serialize, Deserialize)] +#[must_use = "configs do nothing unless you use them"] +pub struct RuntimeConfig { + pub max_conn: Option, + pub create_timeout: Option, + pub wait_timeout: Option, + pub recycle_timeout: Option, +} + +impl RuntimeConfig { + /// Creates a new [`RuntimeConfig`]. + #[inline] + pub fn new() -> Self { + Self::default() + } + + /// Overwrites the default value of [`RuntimeConfig`]`::max_conn`. + pub fn with_max_conn(mut self, max_conn: usize) -> Self { + self.max_conn = Some(max_conn); + self + } + + /// Overwrites the default value of [`RuntimeConfig`]`::create_timeout`. + pub fn with_create_timeout(mut self, create_timeout: Duration) -> Self { + self.create_timeout = Some(create_timeout); + self + } + + /// Overwrites the default value of [`RuntimeConfig`]`::wait_timeout`. + pub fn with_wait_timeout(mut self, wait_timeout: Duration) -> Self { + self.wait_timeout = Some(wait_timeout); + self + } + + /// Overwrites the default value of [`RuntimeConfig`]`::recycle_timeout`. + pub fn with_recycle_timeout(mut self, recycle_timeout: Duration) -> Self { + self.recycle_timeout = Some(recycle_timeout); + self + } +} + +#[cfg(test)] +mod test { + use crate::{RuntimeConfig, RuntimeResult}; + + #[test] + fn default_settings() -> RuntimeResult<()> { + let _ = RuntimeConfig::new(); + Ok(()) + } +} diff --git a/crates/runtime/lib.rs b/crates/runtime/lib.rs index 733e00c..55a4e35 100644 --- a/crates/runtime/lib.rs +++ b/crates/runtime/lib.rs @@ -5,31 +5,50 @@ //! ### Examples //! //! ```rust -//! use axiston_runtime::{RuntimeClient, Result}; +//! use axiston_runtime::{Runtime, RuntimeConfig, RuntimeResult}; //! -//! fn main() -> Result<()> { -//! let client = RuntimeClient::builder().build(); +//! #[tokio::main] +//! async fn main() -> RuntimeResult<()> { +//! let config = RuntimeConfig::new(); +//! let runtime = Runtime::new((), config); +//! let _conn = runtime.get_connection().await?; //! Ok(()) //! } //! ``` -pub use crate::client::{ClientBuilder, RuntimeClient}; +use deadpool::managed::PoolError; +use derive_more::From; + +use crate::client::{RuntimeConn, RuntimeConnBuilder, RuntimeConnError}; +pub use crate::config::{Runtime, RuntimeConfig, RuntimeObject}; mod client; -mod runtime; +mod config; -/// Unrecoverable failure of the [`RuntimeClient`]. +/// Unrecoverable failure of the [`RuntimeConn`]. /// /// Includes all error types that may occur. +#[derive(Debug, From, thiserror::Error)] #[must_use = "errors do nothing unless you use them"] -#[derive(Debug, thiserror::Error)] -pub enum Error {} +pub enum RuntimeError { + /// Transport failure (from the client or server). + #[error("transport failure: {0}")] + Transport(tonic::transport::Error), +} -/// Specialized [`Result`] alias for the [`Error`] type. -/// -/// [`Result`]: std::result::Result -pub type Result = std::result::Result; +impl From> for RuntimeError { + fn from(value: PoolError) -> Self { + todo!() + } +} + +/// Specialized [`Result`] alias for the [`RuntimeError`] type. +pub type RuntimeResult = Result; // TODO: Trait to implement adding new runtimes and picking the right one for the user: // Use the first available one on local. // Use the dedicated one on remote. + +// TODO: best practices or whatever +// https://github.com/weiznich/diesel_async/blob/main/src/pooled_connection/deadpool.rs +// https://github.com/bikeshedder/deadpool/blob/master/diesel/src/manager.rs diff --git a/crates/runtime/protobuf/runtime.proto b/crates/runtime/protobuf/runtime.proto deleted file mode 100644 index ea48f26..0000000 --- a/crates/runtime/protobuf/runtime.proto +++ /dev/null @@ -1,10 +0,0 @@ -syntax = "proto3"; - -package runtime; - -message HelloRequest {} -message HelloResponse {} - -service Runtime { - rpc Hello(HelloRequest) returns (HelloResponse); -} diff --git a/crates/runtime/runtime/connection_instance.rs b/crates/runtime/runtime/connection_instance.rs deleted file mode 100644 index a756989..0000000 --- a/crates/runtime/runtime/connection_instance.rs +++ /dev/null @@ -1,51 +0,0 @@ -use std::fmt; - -use tonic::transport::{Channel, Endpoint}; - -use crate::runtime::connection_instance::runtime_proto::runtime_client::RuntimeClient; -use crate::runtime::connection_instance::runtime_proto::HelloRequest; -use crate::Result; - -pub mod runtime_proto { - tonic::include_proto!("runtime"); -} - -/// TODO. -#[must_use] -pub struct ConnectionInstance { - client: RuntimeClient, -} - -impl ConnectionInstance { - /// Returns a new [`ConnectionInstance`]. - pub async fn connect(conn: &str) -> Result { - let endpoint = Endpoint::from_shared(conn.to_owned()).unwrap(); - let endpoint = endpoint.user_agent("runtime").unwrap(); - - let client = RuntimeClient::connect(endpoint).await.unwrap(); - - Ok(Self { client }) - } - - /// TODO. - pub async fn check(&mut self) -> Result<()> { - let x = self.client.hello(HelloRequest::default()).await; - Ok(()) - } -} - -impl fmt::Debug for ConnectionInstance { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - todo!() - } -} - -#[cfg(test)] -mod test { - use crate::Result; - - #[test] - fn build_from_address() -> Result<()> { - Ok(()) - } -} diff --git a/crates/server/Cargo.toml b/crates/server/Cargo.toml index 1085235..825a77d 100644 --- a/crates/server/Cargo.toml +++ b/crates/server/Cargo.toml @@ -33,23 +33,22 @@ support-oauth2 = [] support-invite = [] [dependencies] -axiston-database = { workspace = true } +axiston-db-connect = { workspace = true } +axiston-db-migrate = { workspace = true } + axiston-graph = { workspace = true } axiston-runtime = { workspace = true } tokio = { version = "1.41", features = ["macros", "rt-multi-thread", "signal"] } futures = { version = "0.3", features = [] } anyhow = { version = "1.0", features = ["backtrace"] } +tracing = { version = "0.1", features = [] } + serde = { version = "1.0", features = ["derive"] } serde_json = { version = "1.0", features = [] } derive_more = { version = "1.0", features = ["full"] } ts-rs = { version = "10.0", features = [] } -tracing = { version = "0.1", features = [] } -tracing-subscriber = { version = "0.3", features = ["env-filter", "time"] } -tracing-opentelemetry = { version = "0.27.0", features = [] } -opentelemetry = { version = "0.26.0", features = [] } - axum = { version = "0.7", features = ["http2", "macros", "ws"] } axum-server = { version = "0.7", optional = true, features = ["tls-rustls"] } axum-extra = { version = "0.9", features = ["typed-header"] } @@ -62,11 +61,11 @@ validator = { version = "0.18", features = [] } rand = { version = "0.8.5", features = [] } argon2 = { version = "0.5.3", features = ["std"] } -# bytes = { version = "1.8", features = ["serde"] } +bytes = { version = "1.8", features = ["serde"] } time = { version = "0.3", features = ["serde"] } uuid = { version = "1.11", features = ["serde", "v4"] } base64 = { version = "0.22", features = [] } -# ecow = { version = "0.2.3", features = ["serde"] } +ecow = { version = "0.2.3", features = ["serde"] } [dev-dependencies] axum-test = { version = "16.3", features = [] } diff --git a/crates/server/extract/auth_state.rs b/crates/server/extract/auth_state.rs index 1135c8c..2ce8364 100644 --- a/crates/server/extract/auth_state.rs +++ b/crates/server/extract/auth_state.rs @@ -62,8 +62,8 @@ where return Ok(auth_state.clone()); }; - let app_database = AppDatabase::from_ref(state); let auth_token = AuthToken::from_request_parts(parts, state).await?; + let app_database = AppDatabase::from_ref(state); #[derive(Debug, Clone, Copy)] pub enum AuthReason { @@ -85,13 +85,13 @@ where } } -/// +/// TODO. #[must_use] #[derive(Debug, Clone, Copy)] pub enum AuthRole { /// The [`AuthToken`] belongs to a regular user. Unprivileged, - /// The [`AuthToken`] belongs to a gateway admin. + /// The [`AuthToken`] belongs to an admin. Privileged, } diff --git a/crates/server/extract/auth_token.rs b/crates/server/extract/auth_token.rs index ba3bed0..7fefc6a 100644 --- a/crates/server/extract/auth_token.rs +++ b/crates/server/extract/auth_token.rs @@ -5,6 +5,7 @@ use axum_extra::headers::authorization::{Authorization, Bearer}; use axum_extra::typed_header::{TypedHeader, TypedHeaderRejectionReason}; use base64::Engine; use serde::{Deserialize, Serialize}; +use time::ext::NumericalDuration; use time::{Duration, OffsetDateTime}; use uuid::Uuid; @@ -34,7 +35,9 @@ use crate::handler::{Error, ErrorKind, Result}; /// use axiston_server::extract::AuthToken; /// /// async fn write_auth_token() -> AuthToken { -/// AuthToken::new(Uuid::new_v4(), Uuid::new_v4(), 7.days()) +/// AuthToken::new(Uuid::new_v4(), Uuid::new_v4()) +/// .with_region_id("0A") +/// .with_expires_in(7.days()) /// } /// ``` /// @@ -42,29 +45,46 @@ use crate::handler::{Error, ErrorKind, Result}; #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub struct AuthToken { - // TODO: Attach region identifier. - // TODO: Encode iat/eat as timestamps. #[serde(rename = "pid")] pub account_id: Uuid, #[serde(rename = "seq")] pub token_seq: Uuid, + #[serde(rename = "rid")] + pub region_id: Option, #[serde(rename = "iat")] + #[serde(with = "time::serde::rfc3339")] pub issued_at: OffsetDateTime, #[serde(rename = "eat")] + #[serde(with = "time::serde::rfc3339")] pub expired_at: OffsetDateTime, } impl AuthToken { /// Returns a new [`AuthToken`]. - pub fn new(account_id: Uuid, token_seq: Uuid, expires_in: Duration) -> Self { + pub fn new(account_id: Uuid, token_seq: Uuid) -> Self { Self { account_id, token_seq, + region_id: None, issued_at: OffsetDateTime::now_utc(), - expired_at: OffsetDateTime::now_utc() + expires_in.abs(), + expired_at: OffsetDateTime::now_utc() + 7.days(), } } + /// Overwrites the region identifier. + #[inline] + pub fn with_region_id(mut self, region_id: &str) -> Self { + self.region_id = Some(region_id.to_owned()); + self + } + + /// Overwrites the auth token valid duration. + #[inline] + pub fn with_expires_in(mut self, expires_in: Duration) -> Self { + self.expired_at = self.issued_at + expires_in.abs(); + self + } + /// Returns the duration the token is valid for. #[inline] pub fn expires_in(&self) -> Duration { diff --git a/crates/server/service/conn_info.rs b/crates/server/extract/conn_info.rs similarity index 93% rename from crates/server/service/conn_info.rs rename to crates/server/extract/conn_info.rs index 33bc8d7..8ecf2cd 100644 --- a/crates/server/service/conn_info.rs +++ b/crates/server/extract/conn_info.rs @@ -3,8 +3,6 @@ use std::net::SocketAddr; use axum::extract::connect_info::Connected; use axum::serve::IncomingStream; -// TODO: Move into ./server folder - /// Produces information about the connection. #[derive(Debug, Clone)] #[must_use] @@ -32,8 +30,8 @@ mod test { use axum::routing::{any, Router}; use axum_test::TestServer; + use crate::extract::AppConnectInfo; use crate::handler::Result; - use crate::service::AppConnectInfo; async fn handler(ConnectInfo(_): ConnectInfo) -> Result<()> { Ok(()) diff --git a/crates/server/extract/mod.rs b/crates/server/extract/mod.rs index 855660d..45a24c6 100644 --- a/crates/server/extract/mod.rs +++ b/crates/server/extract/mod.rs @@ -4,6 +4,7 @@ pub use crate::extract::auth_state::{AuthRole, AuthState}; pub use crate::extract::auth_token::AuthToken; +pub use crate::extract::conn_info::AppConnectInfo; pub use crate::extract::reject_json::Json; pub use crate::extract::reject_path::Path; pub use crate::extract::validate::Validated; @@ -17,6 +18,7 @@ pub type ValidatedPath = Validated>; mod auth_state; mod auth_token; +mod conn_info; mod reject_json; mod reject_path; mod validate; diff --git a/crates/server/handler/accounts/accounts.rs b/crates/server/handler/accounts/accounts.rs index 70b8383..4970a9a 100644 --- a/crates/server/handler/accounts/accounts.rs +++ b/crates/server/handler/accounts/accounts.rs @@ -1,4 +1,4 @@ -use axiston_database::AppDatabase; +use axiston_database_connect::Database; use axum::extract::State; use axum::http::StatusCode; use axum::middleware::from_fn_with_state; @@ -41,7 +41,7 @@ struct RetrieveAccountResponse { async fn retrieve_account( authentication: AuthState, params: Option>, - State(database): State, + State(database): State, ) -> Result<(StatusCode, Json)> { let account_id = params .map(|params| params.account) @@ -103,7 +103,7 @@ struct UpdateAccountResponse { async fn update_account( authentication: AuthState, params: Option>, - State(database): State, + State(database): State, Json(request): Json, ) -> Result<(StatusCode, Json)> { let account_id = params @@ -150,7 +150,7 @@ struct DeleteAccountResponse { async fn delete_account( authentication: AuthState, params: Option>, - State(database): State, + State(database): State, ) -> Result<(StatusCode, Json)> { let account_id = params .map(|params| params.account) diff --git a/crates/server/handler/accounts/auth.rs b/crates/server/handler/accounts/auth.rs index d76d23e..0ceba57 100644 --- a/crates/server/handler/accounts/auth.rs +++ b/crates/server/handler/accounts/auth.rs @@ -1,4 +1,4 @@ -use axiston_database::AppDatabase; +use axiston_database_connect::Database; use axum::extract::State; use axum::http::StatusCode; use axum::middleware::from_fn_with_state; @@ -9,7 +9,7 @@ use ts_rs::TS; use crate::extract::{AuthToken, Json}; use crate::handler::Result; use crate::middleware::authentication_guard; -use crate::service::{AppHashing, AppState}; +use crate::service::{AppState, Argon2Hasher}; /// See [`sign_up`]. #[must_use] @@ -33,8 +33,8 @@ struct SignUpResponse { #[tracing::instrument] async fn sign_up( - State(database): State, - State(hashing): State, + State(database): State, + State(hashing): State, Json(request): Json, ) -> Result<(StatusCode, Json)> { todo!() @@ -53,8 +53,8 @@ struct SignInRequest { /// Can be used as a sign-in method. #[tracing::instrument] async fn sign_in( - State(database): State, - State(hashing): State, + State(database): State, + State(hashing): State, Json(request): Json, ) -> Result<(StatusCode, AuthToken)> { todo!() @@ -82,7 +82,7 @@ struct SignOutResponse { /// Can be used as a sign-out. #[tracing::instrument] async fn sign_out( - State(database): State, + State(database): State, Json(request): Json, ) -> Result<(StatusCode, Json)> { todo!() diff --git a/crates/server/handler/projects/projects.rs b/crates/server/handler/projects/projects.rs index a8eb506..6d34d7d 100644 --- a/crates/server/handler/projects/projects.rs +++ b/crates/server/handler/projects/projects.rs @@ -1,4 +1,4 @@ -use axiston_database::AppDatabase; +use axiston_database_connect::Database; use axum::extract::State; use axum::http::StatusCode; use axum::routing::{delete, get, patch, post, Router}; @@ -39,7 +39,7 @@ struct CreateProjectResponse { #[tracing::instrument] async fn create_new_project( authentication: AuthState, - State(database): State, + State(database): State, Json(request): Json, ) -> Result<(StatusCode, Json)> { let response = CreateProjectResponse { @@ -106,7 +106,7 @@ struct ListProjectsResponse { #[tracing::instrument] async fn list_all_projects( authentication: AuthState, - State(database): State, + State(database): State, Path(params): Path, request: Option>, ) -> Result<(StatusCode, Json)> { @@ -134,7 +134,7 @@ struct RetrieveProjectResponse { #[tracing::instrument] async fn retrieve_project_details( authentication: AuthState, - State(database): State, + State(database): State, Path(params): Path, ) -> Result<(StatusCode, Json)> { let response = RetrieveProjectResponse { @@ -174,7 +174,7 @@ struct ModifyProjectResponse { #[tracing::instrument] async fn modify_project( authentication: AuthState, - State(database): State, + State(database): State, Path(params): Path, Json(request): Json, ) -> Result<(StatusCode, Json)> { @@ -198,7 +198,7 @@ struct DeleteProjectResponse { #[tracing::instrument] async fn delete_project( authentication: AuthState, - State(database): State, + State(database): State, Path(params): Path, ) -> Result<(StatusCode, Json)> { let response = DeleteProjectResponse { diff --git a/crates/server/middleware/error_handling.rs b/crates/server/middleware/error_handling.rs index 620271f..881b105 100644 --- a/crates/server/middleware/error_handling.rs +++ b/crates/server/middleware/error_handling.rs @@ -40,14 +40,22 @@ pub fn catch_panic(err: Panic) -> Response { ErrorKind::InternalServerError.into_response() } -pub fn setup_error_handling(router: Router, timeout: Duration) -> Router +/// Extension trait for `axum::`[`Router`] for error handling. +pub trait RouterHandlingExt { + /// Stacks [`HandleError`], [`CatchPanic`] and [`Timeout`] layers. + fn with_inner_error_handling_layer(self, timeout: Duration) -> Self; +} + +impl RouterHandlingExt for Router where S: Clone + Send + Sync + 'static, { - let middlewares = ServiceBuilder::new() - .layer(HandleErrorLayer::new(handle_error)) - .layer(CatchPanicLayer::custom(catch_panic)) - .layer(TimeoutLayer::new(timeout)); + fn with_inner_error_handling_layer(self, timeout: Duration) -> Self { + let middlewares = ServiceBuilder::new() + .layer(HandleErrorLayer::new(handle_error)) + .layer(CatchPanicLayer::custom(catch_panic)) + .layer(TimeoutLayer::new(timeout)); - router.layer(middlewares) + self.layer(middlewares) + } } diff --git a/crates/server/middleware/mod.rs b/crates/server/middleware/mod.rs index d7ec2a5..35d1023 100644 --- a/crates/server/middleware/mod.rs +++ b/crates/server/middleware/mod.rs @@ -7,9 +7,8 @@ use axum::Router; use tower::ServiceBuilder; pub use crate::middleware::auth_guards::{authentication_guard, authorization_guard}; -use crate::middleware::error_handling::setup_error_handling; -pub use crate::middleware::observability::initialize_tracing; -use crate::middleware::observability::setup_observability; +use crate::middleware::error_handling::RouterHandlingExt; +use crate::middleware::observability::RouterTracingExt; mod auth_guards; mod error_handling; @@ -23,14 +22,14 @@ impl ServiceBuilderExt for ServiceBuilder {} /// Extension trait for `axum::`[`Router`] for layering middleware. pub trait RouterExt { - /// Stacks [`HandleError`], [`CatchPanic`] and [`Timeout`] layers. + /// Layers [`HandleError`], [`CatchPanic`] and [`Timeout`] middlewares. /// /// [`HandleError`]: axum::error_handling::HandleErrorLayer /// [`CatchPanic`]: tower_http::catch_panic::CatchPanicLayer /// [`Timeout`]: tower::timeout::TimeoutLayer fn with_error_handling_layer(self, timeout: Duration) -> Self; - /// Stacks [`SetRequestId`], [`Trace`] and [`PropagateRequestId`] layers. + /// Layers [`SetRequestId`], [`Trace`] and [`PropagateRequestId`] middlewares. /// /// [`SetRequestId`]: tower_http::request_id::SetRequestIdLayer /// [`Trace`]: tower_http::trace::TraceLayer @@ -44,11 +43,11 @@ where { #[inline] fn with_error_handling_layer(self, timeout: Duration) -> Self { - setup_error_handling(self, timeout) + self.with_inner_error_handling_layer(timeout) } #[inline] fn with_observability_layer(self) -> Self { - setup_observability(self) + self.with_inner_observability_layer() } } diff --git a/crates/server/middleware/observability.rs b/crates/server/middleware/observability.rs index 6687bd9..ef297ba 100644 --- a/crates/server/middleware/observability.rs +++ b/crates/server/middleware/observability.rs @@ -4,55 +4,25 @@ use tower_http::request_id::MakeRequestUuid; use tower_http::trace::TraceLayer; use tower_http::ServiceBuilderExt; -#[must_use] -fn build_env_filter() -> tracing_subscriber::EnvFilter { - let current = std::env::var("RUST_LOG") - .or_else(|_| std::env::var("OTEL_LOG_LEVEL")) - .unwrap_or_else(|_| "info".to_string()); - - let env = format!("{},server=trace,otel=debug,tower_http=debug", current); - std::env::set_var("RUST_LOG", env); - tracing_subscriber::EnvFilter::from_default_env() +/// Extension trait for `axum::`[`Router`] for improved observability. +pub trait RouterTracingExt { + /// Stacks [`SetRequestId`], [`Trace`] and [`PropagateRequestId`] layers. + /// + /// [`SetRequestId`]: tower_http::request_id::SetRequestIdLayer + /// [`PropagateRequestId`]: tower_http::request_id::PropagateRequestIdLayer + fn with_inner_observability_layer(self) -> Self; } -pub async fn initialize_tracing() -> anyhow::Result<()> { - use tracing_subscriber::fmt::layer; - use tracing_subscriber::layer::SubscriberExt; - use tracing_subscriber::util::SubscriberInitExt; - - // Setups a temporary subscriber to log output during setup. - let env_filter = build_env_filter(); - let fmt_layer = layer().pretty(); - let subscriber = tracing_subscriber::registry() - .with(env_filter) - .with(fmt_layer); - - let _guard = tracing::subscriber::set_default(subscriber); - tracing::trace!(target: "server:otel", "initialized temporary subscriber"); - - // TODO: Enable OpenTelemetry. - // https://github.com/davidB/tracing-opentelemetry-instrumentation-sdk - - // Setups an actual subscriber. - let env_filter = build_env_filter(); - let fmt_layer = layer().pretty(); - tracing_subscriber::registry() - .with(env_filter) - .with(fmt_layer) - .init(); - - tracing::trace!(target: "server:otel", "initialized subscriber"); - Ok(()) -} - -pub fn setup_observability(router: Router) -> Router +impl RouterTracingExt for Router where S: Clone + Send + Sync + 'static, { - let middlewares = ServiceBuilder::new() - .set_x_request_id(MakeRequestUuid) - .layer(TraceLayer::new_for_http()) - .propagate_x_request_id(); - - router.layer(middlewares) + fn with_inner_observability_layer(self) -> Self { + let middlewares = ServiceBuilder::new() + .set_x_request_id(MakeRequestUuid) + .layer(TraceLayer::new_for_http()) + .propagate_x_request_id(); + + self.layer(middlewares) + } } diff --git a/crates/server/service/app_config.rs b/crates/server/service/app_config.rs index fe807d9..9f99102 100644 --- a/crates/server/service/app_config.rs +++ b/crates/server/service/app_config.rs @@ -5,8 +5,9 @@ #[must_use = "configs do nothing unless you use them"] pub struct AppConfig { /// TODO: Store database conn string as a URL. - pub database_conn: String, - pub multiple_gateways: bool, + pub database: String, + /// Enables multiple gateway instances. + pub multiple: bool, } impl AppConfig { @@ -34,7 +35,8 @@ impl Default for AppConfig { #[derive(Debug, Default, Clone)] #[must_use = "configs do nothing unless you use them"] pub struct AppBuilder { - pub database_conn: Option, + pub database: Option, + pub multiple: Option, } impl AppBuilder { @@ -46,10 +48,10 @@ impl AppBuilder { /// Returns a new [`AppConfig`]. pub fn build(self) -> AppConfig { - let default_database = "postgresql://usr:pwd@localhost:5432/db".to_owned(); + let default_database = "postgresql://usr:pwd@localhost:5432/db"; AppConfig { - database_conn: self.database_conn.unwrap_or(default_database), - multiple_gateways: false, + database: self.database.unwrap_or(default_database.to_owned()), + multiple: self.multiple.unwrap_or_default(), } } } diff --git a/crates/server/service/app_hashing.rs b/crates/server/service/app_hashing.rs deleted file mode 100644 index 99a6c0c..0000000 --- a/crates/server/service/app_hashing.rs +++ /dev/null @@ -1,52 +0,0 @@ -use argon2::password_hash::SaltString; -use argon2::{Argon2, PasswordHash, PasswordHasher, PasswordVerifier}; -use rand::rngs::OsRng; - -use crate::handler::{ErrorKind, Result}; - -/// TODO. -#[derive(Debug, Clone)] -pub struct AppHashing { - argon2_instance: Argon2<'static>, -} - -impl AppHashing { - /// Returns a new [`AppHashing`]. - #[inline] - pub fn new() -> Self { - Self { - argon2_instance: Argon2::default(), - } - } - - /// TODO. - pub fn hash_password(&self, password: &str) -> Result { - let salt = SaltString::generate(&mut OsRng); - match self - .argon2_instance - .hash_password(password.as_bytes(), &salt) - { - Ok(password_hash) => Ok(password_hash.to_string()), - Err(_) => Err(ErrorKind::InternalServerError.into()), - } - } - - /// TODO. - pub fn verify_password(&self, password: &str, password_hash: &str) -> Result<()> { - let password_buf = password.as_bytes(); - let Ok(parsed_hash) = PasswordHash::new(&password_hash) else { - return Err(ErrorKind::InternalServerError.into()); - }; - - if self - .argon2_instance - .verify_password(password_buf, &parsed_hash) - .is_err() - { - // TODO: BadPassword error kind. - return Err(ErrorKind::Unauthorized.into()); - }; - - Ok(()) - } -} diff --git a/crates/server/service/argon2_hasher.rs b/crates/server/service/argon2_hasher.rs new file mode 100644 index 0000000..a261379 --- /dev/null +++ b/crates/server/service/argon2_hasher.rs @@ -0,0 +1,66 @@ +use argon2::password_hash::{Error, SaltString}; +use argon2::{Argon2, PasswordHash, PasswordHasher, PasswordVerifier}; +use derive_more::From; +use rand::rngs::OsRng; + +use crate::handler::{ErrorKind, Result}; + +/// Implements an `argon2` password hashing function. +#[derive(Debug, Default, Clone, From)] +pub struct Argon2Hasher { + inner: Argon2<'static>, +} + +impl Argon2Hasher { + /// Returns a new [`Argon2Hasher`]. + #[inline] + pub fn new() -> Self { + Self::default() + } + + /// Returns the password hash of the provided password. + pub fn hash_password(&self, password: &str) -> Result { + let salt = SaltString::generate(&mut OsRng); + match self.inner.hash_password(password.as_bytes(), &salt) { + Ok(password_hash) => Ok(password_hash.to_string()), + Err(_) => Err(ErrorKind::InternalServerError.into()), + } + } + + /// Returns `Ok()` if the password was successfully verified. + pub fn verify_password(&self, password: &str, password_hash: &str) -> Result<()> { + let Ok(parsed_hash) = PasswordHash::new(&password_hash) else { + return Err(ErrorKind::InternalServerError.into()); + }; + + let password_buf = password.as_bytes(); + match self.inner.verify_password(password_buf, &parsed_hash) { + Ok(_) => Ok(()), + Err(Error::Password) => Err(ErrorKind::Unauthorized.into()), + Err(_) => Err(ErrorKind::InternalServerError.into()), + } + } +} + +#[cfg(test)] +mod test { + use crate::handler::Result; + use crate::service::Argon2Hasher; + + const PASSWORD: &str = "qwerty12345"; + + #[test] + fn hash_password() -> Result<()> { + let hasher = Argon2Hasher::new(); + let _ = hasher.hash_password(PASSWORD)?; + Ok(()) + } + + #[test] + fn verify_password() -> Result<()> { + let hasher = Argon2Hasher::new(); + let hash = hasher.hash_password(PASSWORD)?; + hasher.verify_password(PASSWORD, &hash)?; + Ok(()) + } +} diff --git a/crates/server/service/database_err.rs b/crates/server/service/database_err.rs deleted file mode 100644 index dbe5bae..0000000 --- a/crates/server/service/database_err.rs +++ /dev/null @@ -1,11 +0,0 @@ -use axiston_database::Error as DbError; - -use crate::handler::Error; - -// TODO: Split into handlers. -impl From for Error { - fn from(value: DbError) -> Self { - // let constraint = value.constraint(); - todo!() - } -} diff --git a/crates/server/service/graph_queue.rs b/crates/server/service/graph_queue.rs new file mode 100644 index 0000000..14ff536 --- /dev/null +++ b/crates/server/service/graph_queue.rs @@ -0,0 +1,51 @@ +use std::collections::{HashMap, VecDeque}; + +use axiston_graph::worker::GraphWorker; +use uuid::Uuid; + +/// TODO. +#[derive(Debug)] +struct UserGraphData { + graph_id: Uuid, + graph_data: (), +} + +/// TODO. +#[derive(Debug)] +struct UserGraphWorker { + user_graph: UserGraphData, + graph_worker: GraphWorker, +} + +/// TODO. +#[derive(Debug, Default)] +pub struct GraphQueue { + waiting: VecDeque, + working: HashMap, +} + +impl GraphQueue { + /// Creates a new [`GraphQueue`]. + #[inline] + pub fn new() -> Self { + Self::default() + } + + // TODO. + // pub fn is_active(&self, graph_id: Uuid) -> bool { + // self.working.contains_key(&graph_id) + // || self.waiting.iter().any(|data| data.graph_id == graph_id) + // } + + /// Returns `true` if the [`GraphQueue`] is empty. + #[inline] + pub fn is_empty(&self) -> bool { + self.waiting.is_empty() + } + + /// Returns `true` if the [`GraphQueue`] is full. + #[inline] + pub fn is_full(&self) -> bool { + self.waiting.capacity() == self.waiting.len() + } +} diff --git a/crates/server/service/mod.rs b/crates/server/service/mod.rs index 96d56b1..d3d9682 100644 --- a/crates/server/service/mod.rs +++ b/crates/server/service/mod.rs @@ -1,20 +1,20 @@ //! TODO. //! -pub use axiston_database::AppDatabase; -use axiston_database::AppDatabaseExt; +pub use axiston_db_connect::Database; +use axiston_db_migrate::DatabaseMigrator; pub use axiston_graph::worker::GraphWorker; -pub use axiston_runtime::RuntimeClient; +pub use axiston_runtime::RuntimeConn; pub use crate::service::app_config::{AppBuilder, AppConfig}; -pub use crate::service::app_hashing::AppHashing; -pub use crate::service::conn_info::AppConnectInfo; +pub use crate::service::argon2_hasher::Argon2Hasher; +pub use crate::service::scheduler::{SchedulerError, SchedulerRuntime}; pub use crate::service::socket_room::{WebsocketRoom, WebsocketServer}; mod app_config; -mod app_hashing; -mod conn_info; -mod database_err; +mod argon2_hasher; +mod graph_queue; +mod scheduler; mod socket_room; /// Application state. @@ -25,9 +25,9 @@ mod socket_room; #[must_use = "state does nothing unless you use it"] #[derive(Debug, Clone)] pub struct AppState { - app_database: AppDatabase, - app_hashing: AppHashing, - client_runtime: RuntimeClient, + database: Database, + hasher: Argon2Hasher, + client_runtime: RuntimeConn, websocket_room: WebsocketServer, } @@ -35,27 +35,45 @@ impl AppState { /// Returns a new [`AppState`]. #[inline] pub async fn connect(app_config: AppConfig) -> anyhow::Result { - // TODO: Load all tasks with checked triggers. - let app_database = if app_config.multiple_gateways { - AppDatabase::connect_multiple_instances(&app_config.database_conn).await? + let database = Self::connect_database(&app_config).await?; + let runtime = Self::connect_runtime(&app_config).await?; + + Ok(Self { + database, + hasher: Argon2Hasher::new(), + client_runtime: runtime, + websocket_room: WebsocketServer::new(), + }) + } + + async fn connect_database(app_config: &AppConfig) -> anyhow::Result { + let database = if app_config.multiple { + Database::new_multiple_gateways(&app_config.database) } else { - AppDatabase::connect_single_instance(&app_config.database_conn).await? + Database::new_single_gateway(&app_config.database) }; - if let Err(migration_err) = app_database.apply_migrations(None).await { - let _ = app_database.rollback_migrations(None).await; - return Err(migration_err.into()); + let _ = { + let connection = database.get_connection().await?; + let mut migrator = DatabaseMigrator::new(connection); + let migrations = migrator.apply_migrations().await?; + tracing::info!(target: "database", migrations); }; + Ok(database) + } + + async fn connect_runtime(app_config: &AppConfig) -> anyhow::Result { // TODO: Load startups clients. - let runtime = RuntimeClient::builder().build(); + let runtime = RuntimeConn::builder().build(); + Ok(runtime) + } - Ok(Self { - app_database, - app_hashing: AppHashing::new(), - client_runtime: runtime, - websocket_room: WebsocketServer::new(), - }) + async fn run_trigger_daemon(database: Database) -> anyhow::Result<()> { + // TODO: Load all tasks with checked triggers. + let connection = database.get_connection().await?; + + Ok(()) } } @@ -69,9 +87,9 @@ macro_rules! impl_di { )+}; } -impl_di!(app_database: AppDatabase); -impl_di!(app_hashing: AppHashing); -impl_di!(client_runtime: RuntimeClient); +impl_di!(database: Database); +impl_di!(hasher: Argon2Hasher); +impl_di!(client_runtime: RuntimeConn); impl_di!(websocket_room: WebsocketServer); #[cfg(test)] @@ -83,10 +101,4 @@ mod test { let config = AppConfig::builder(); let _ = AppState::connect(config.build()); } - - #[test] - fn configure_app_state() { - let config = AppConfig::builder(); - let _ = AppState::connect(config.build()); - } } diff --git a/crates/server/service/scheduler.rs b/crates/server/service/scheduler.rs new file mode 100644 index 0000000..8bdd73b --- /dev/null +++ b/crates/server/service/scheduler.rs @@ -0,0 +1,56 @@ +use std::time::Duration; + +use axiston_database::AppDatabase; +use tokio::task::JoinHandle; + +use crate::service::AppState; + +/// TODO. +#[derive(Debug)] +pub struct SchedulerError {} + +/// Runs all active triggers sequentially in the priority order. +#[derive(Debug)] +pub struct SchedulerRuntime { + query_every: Option, + app_database: AppDatabase, +} + +impl SchedulerRuntime { + /// Returns a new [`SchedulerRuntime`]. + #[inline] + pub fn new(app_state: AppState) -> Self { + Self { + query_every: None, + app_database: app_state.app_database, + } + } + + pub fn run_trigger_loop(self) -> JoinHandle> { + tracing::info!(target: "scheduler", "waiting"); + let handle = tokio::spawn(async move { + tracing::info!(target: "scheduler", "running"); + self.execute_loop().await + }); + + handle + } + + async fn execute_loop(&self) -> Result<(), SchedulerError> { + loop { + self.execute_once().await?; + if let Some(query_every) = self.query_every { + tokio::time::sleep(query_every).await; + } + } + } + + async fn execute_once(&self) -> Result<(), SchedulerError> { + todo!() + } +} + +#[cfg(test)] +mod test {} + +// TODO GraphWorker diff --git a/docker-compose.yaml b/docker-compose.yaml index 006a751..d43fc57 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -1,39 +1,39 @@ version: '3.8' services: -# axiston-gateway: -# container_name: gateway -# build: -# context: ./ -# dockerfile: Dockerfile -# environment: -# - DATABASE=postgresql://usr:pwd@localhost:5432/db -# depends_on: -# - axiston-database -# ports: -# - "8000:8000" -# networks: -# - axiston + # axiston-gateway: + # container_name: gateway + # build: + # context: ./ + # dockerfile: Dockerfile + # environment: + # - DATABASE=postgresql://usr:pwd@localhost:5432/db + # depends_on: + # - axiston-database + # ports: + # - "8000:8000" + # networks: + # - axiston - # https://hub.docker.com/u/axiston - axiston-database: - container_name: database - build: - context: ./postgres/ - dockerfile: ./Dockerfile - environment: - POSTGRES_USER: usr - POSTGRES_PASSWORD: pwd - POSTGRES_DB: db - ports: - - "5432:5432" - volumes: - - pgdata:/var/lib/postgresql/data - networks: - - axiston + # https://hub.docker.com/u/axiston + axiston-database: + container_name: database + build: + context: ./postgres/ + dockerfile: ./Dockerfile + environment: + POSTGRES_USER: usr + POSTGRES_PASSWORD: pwd + POSTGRES_DB: db + ports: + - "5432:5432" + volumes: + - pgdata:/var/lib/postgresql/data + networks: + - axiston volumes: - pgdata: + pgdata: networks: - axiston: + axiston: