From a574fb1a75a04cade71d63d3b68a3a0151283a88 Mon Sep 17 00:00:00 2001
From: max-lt
Date: Mon, 25 Mar 2024 10:30:41 +0100
Subject: [PATCH] Fix healthcheck and pgconn

---
 Dockerfile             |  5 ++++-
 bin/main.rs            | 33 ++++++++++++++++++++++++++-------
 src/event_scheduled.rs | 13 +++++++++++--
 src/store.rs           | 10 ++++------
 4 files changed, 45 insertions(+), 16 deletions(-)

diff --git a/Dockerfile b/Dockerfile
index cf1e626..1f1267a 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -11,14 +11,17 @@ COPY . /build/openworkers-runner
 
 RUN touch $RUNTIME_SNAPSHOT_PATH
 
-RUN --mount=type=cache,target=~/.cargo \
+RUN --mount=type=cache,target=$CARGO_HOME/git \
+    --mount=type=cache,target=$CARGO_HOME/registry \
     --mount=type=cache,target=/build/openworkers-runner/target \
     cargo run --release --bin snapshot && \
+    # Build the runner and copy the executable out of the cache so it can be used in the next stage
     cargo build --release && cp /build/openworkers-runner/target/release/openworkers-runner /build/output
 
 FROM debian:bookworm-slim
 
 RUN apt-get update \
+    # Install ca-certificates and wget (used for healthcheck)
     && apt-get install -y ca-certificates wget \
     && rm -rf /var/lib/apt/lists/*
 
diff --git a/bin/main.rs b/bin/main.rs
index 2effc97..4d44e97 100644
--- a/bin/main.rs
+++ b/bin/main.rs
@@ -16,10 +16,8 @@ use sqlx::postgres::PgPoolOptions;
 
 use openworkers_runner::store::WorkerIdentifier;
 
-type Database = sqlx::Pool<sqlx::Postgres>;
-
 struct AppState {
-    db: Database,
+    db: sqlx::Pool<sqlx::Postgres>,
 }
 
 async fn handle_request(data: Data<AppState>, req: HttpRequest, body: Bytes) -> HttpResponse {
@@ -30,6 +28,16 @@ async fn handle_request(data: Data<AppState>, req: HttpRequest, body: Bytes) ->
         std::thread::current().id()
     );
 
+    // Acquire a database connection from the pool.
+    let mut conn: sqlx::pool::PoolConnection<sqlx::Postgres> = match data.db.acquire().await {
+        Ok(db) => db,
+        Err(err) => {
+            error!("Failed to acquire a database connection: {}", err);
+            return HttpResponse::InternalServerError()
+                .body("Failed to acquire a database connection");
+        }
+    };
+
     // Expect x-request-id header
     let request_id = match req.headers().get("x-request-id") {
         Some(value) => value.to_str().unwrap(),
@@ -70,7 +78,7 @@ async fn handle_request(data: Data<AppState>, req: HttpRequest, body: Bytes) ->
                 worker_name = Some(host.split('.').next().unwrap().to_string());
             } else {
                 worker_id =
-                    openworkers_runner::store::get_worker_id_from_domain(&data.db, host).await;
+                    openworkers_runner::store::get_worker_id_from_domain(&mut conn, host).await;
             }
         }
     }
@@ -91,7 +99,7 @@ async fn handle_request(data: Data<AppState>, req: HttpRequest, body: Bytes) ->
         }
     };
 
-    let worker = openworkers_runner::store::get_worker(&data.db, worker_identifier).await;
+    let worker = openworkers_runner::store::get_worker(&mut conn, worker_identifier).await;
 
     debug!("worker found: {:?}", worker.is_some());
 
@@ -172,7 +180,13 @@ async fn handle_request(data: Data<AppState>, req: HttpRequest, body: Bytes) ->
     response
 }
 
-async fn health_check() -> HttpResponse {
+async fn health_check(req: HttpRequest) -> HttpResponse {
+    debug!(
+        "health_check of: {:?} {}",
+        req.connection_info(),
+        req.method()
+    );
+
     HttpResponse::Ok().body("ok")
 }
 
@@ -230,7 +244,12 @@ async fn main() -> std::io::Result<()> {
         App::new()
             .app_data(Data::new(AppState { db: pool.clone() }))
-            .route("/health", web::get().to(health_check))
+            .service(
+                web::resource("/health")
+                    .guard(actix_web::guard::Header("host", "127.0.0.1:8080"))
+                    .route(web::head().to(health_check))
+                    .route(web::get().to(health_check)),
+            )
             .default_service(web::to(handle_request))
     })
     .bind(("0.0.0.0", 8080))?
diff --git a/src/event_scheduled.rs b/src/event_scheduled.rs
index f0f9cac..9e06c6e 100644
--- a/src/event_scheduled.rs
+++ b/src/event_scheduled.rs
@@ -54,7 +54,7 @@ fn run_scheduled(data: ScheduledData, script: Script) {
     });
 }
 
-pub fn handle_scheduled(db: store::Database) {
+pub fn handle_scheduled(db: sqlx::Pool<sqlx::Postgres>) {
     std::thread::spawn(move || {
         let local = tokio::task::LocalSet::new();
 
@@ -64,6 +64,15 @@ pub fn handle_scheduled(db: sqlx::Pool<sqlx::Postgres>) {
             .unwrap();
 
         let handle = local.spawn_local(async move {
+            // Acquire a database connection from the pool.
+            let mut conn: sqlx::pool::PoolConnection<sqlx::Postgres> = match db.acquire().await {
+                Ok(db) => db,
+                Err(err) => {
+                    log::error!("Failed to acquire a database connection: {}", err);
+                    return;
+                }
+            };
+
             let nc = crate::nats::nats_connect();
             let sub = nc
                 .queue_subscribe("scheduled", "runner")
@@ -85,7 +94,7 @@ pub fn handle_scheduled(db: sqlx::Pool<sqlx::Postgres>) {
                 log::debug!("scheduled task parsed: {:?}", data);
 
                 let worker_id = store::WorkerIdentifier::Id(data.worker_id.clone());
-                let worker = match store::get_worker(&db, worker_id).await {
+                let worker = match store::get_worker(&mut conn, worker_id).await {
                     Some(worker) => worker,
                     None => {
                         log::error!("worker not found: {:?}", data.worker_id);
diff --git a/src/store.rs b/src/store.rs
index 001ddea..e1341da 100644
--- a/src/store.rs
+++ b/src/store.rs
@@ -1,7 +1,5 @@
 use sqlx::prelude::FromRow;
 
-pub type Database = sqlx::Pool<sqlx::Postgres>;
-
 #[derive(Debug)]
 pub enum WorkerIdentifier {
     Id(String),
@@ -25,7 +23,7 @@ pub struct WorkerData {
     pub language: WorkerLanguage,
 }
 
-pub async fn get_worker(db: &Database, identifier: WorkerIdentifier) -> Option<WorkerData> {
+pub async fn get_worker(conn: &mut sqlx::PgConnection, identifier: WorkerIdentifier) -> Option<WorkerData> {
     log::debug!("get_worker: {:?}", identifier);
 
     let query = format!(
@@ -56,7 +54,7 @@ pub async fn get_worker(db: &Database, identifier: WorkerIdentifier) -> Option
 
     match sqlx::query_as::<_, WorkerData>(query.as_str())
         .bind(identifier)
-        .fetch_one(db)
+        .fetch_one(conn)
         .await
     {
         Ok(worker) => {
@@ -70,13 +68,13 @@ pub async fn get_worker(db: &Database, identifier: WorkerIdentifier) -> Option
     }
 }
 
-pub async fn get_worker_id_from_domain(db: &Database, domain: String) -> Option<String> {
+pub async fn get_worker_id_from_domain(conn: &mut sqlx::PgConnection, domain: String) -> Option<String> {
    let query = sqlx::query_scalar!(
        "SELECT worker_id::text FROM domains WHERE name = $1 LIMIT 1",
        domain
    );
 
-    match query.fetch_one(db).await {
+    match query.fetch_one(conn).await {
         Ok(worker_id) => worker_id,
         Err(err) => {
             log::warn!("failed to get worker id from domain: {:?}", err);
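
Note on the "pgconn" half of this patch: the handlers now check one PoolConnection out of the pool per request (or per scheduled task) and pass `&mut PgConnection` to the store helpers, instead of borrowing the Pool for each individual query. The sketch below is not part of the patch; it is a minimal standalone illustration of that sqlx pattern, and the connection URL and pool size in it are assumed placeholders.

// Minimal sketch of the per-request connection pattern the patch adopts.
// The connection string and pool size are assumptions, not values from the repo.
use sqlx::postgres::PgPoolOptions;

#[tokio::main]
async fn main() -> Result<(), sqlx::Error> {
    let pool = PgPoolOptions::new()
        .max_connections(5)
        .connect("postgres://localhost/openworkers") // hypothetical URL
        .await?;

    // One connection checked out of the pool for the whole request/task.
    let mut conn = pool.acquire().await?;

    // `&mut *conn` dereferences the PoolConnection to `&mut PgConnection`,
    // which implements `Executor`, so every query reuses the same connection.
    let one: i32 = sqlx::query_scalar("SELECT 1").fetch_one(&mut *conn).await?;
    assert_eq!(one, 1);

    // The connection goes back to the pool when `conn` is dropped.
    Ok(())
}

On the healthcheck half: the new guard only matches requests whose Host header is 127.0.0.1:8080, presumably the in-container wget probe that the Dockerfile change installs wget for, while external traffic arriving under a worker domain keeps falling through to default_service and handle_request.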