Skip to content

Commit

Permalink
Fix healthcheck and pgconn
Browse files Browse the repository at this point in the history
  • Loading branch information
max-lt committed Mar 25, 2024
1 parent 6a5029a commit a574fb1
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 16 deletions.
5 changes: 4 additions & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,17 @@ COPY . /build/openworkers-runner

RUN touch $RUNTIME_SNAPSHOT_PATH

RUN --mount=type=cache,target=~/.cargo \
RUN --mount=type=cache,target=$CARGO_HOME/git \
--mount=type=cache,target=$CARGO_HOME/registry \
--mount=type=cache,target=/build/openworkers-runner/target \
cargo run --release --bin snapshot && \
# Build the runner and copy executable out of the cache so it can be used in the next stage
cargo build --release && cp /build/openworkers-runner/target/release/openworkers-runner /build/output

FROM debian:bookworm-slim

RUN apt-get update \
# Install ca-certificates and wget (used for healthcheck)
&& apt-get install -y ca-certificates wget \
&& rm -rf /var/lib/apt/lists/*

Expand Down
33 changes: 26 additions & 7 deletions bin/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,8 @@ use sqlx::postgres::PgPoolOptions;

use openworkers_runner::store::WorkerIdentifier;

type Database = sqlx::Pool<sqlx::Postgres>;

struct AppState {
db: Database,
db: sqlx::Pool<sqlx::Postgres>,
}

async fn handle_request(data: Data<AppState>, req: HttpRequest, body: Bytes) -> HttpResponse {
Expand All @@ -30,6 +28,16 @@ async fn handle_request(data: Data<AppState>, req: HttpRequest, body: Bytes) ->
std::thread::current().id()
);

// Acquire a database connection from the pool.
let mut conn: sqlx::pool::PoolConnection<sqlx::Postgres> = match data.db.acquire().await {
Ok(db) => db,
Err(err) => {
error!("Failed to acquire a database connection: {}", err);
return HttpResponse::InternalServerError()
.body("Failed to acquire a database connection");
}
};

// Expect x-request-id header
let request_id = match req.headers().get("x-request-id") {
Some(value) => value.to_str().unwrap(),
Expand Down Expand Up @@ -70,7 +78,7 @@ async fn handle_request(data: Data<AppState>, req: HttpRequest, body: Bytes) ->
worker_name = Some(host.split('.').next().unwrap().to_string());
} else {
worker_id =
openworkers_runner::store::get_worker_id_from_domain(&data.db, host).await;
openworkers_runner::store::get_worker_id_from_domain(&mut conn, host).await;
}
}
}
Expand All @@ -91,7 +99,7 @@ async fn handle_request(data: Data<AppState>, req: HttpRequest, body: Bytes) ->
}
};

let worker = openworkers_runner::store::get_worker(&data.db, worker_identifier).await;
let worker = openworkers_runner::store::get_worker(&mut conn, worker_identifier).await;

debug!("worker found: {:?}", worker.is_some());

Expand Down Expand Up @@ -172,7 +180,13 @@ async fn handle_request(data: Data<AppState>, req: HttpRequest, body: Bytes) ->
response
}

async fn health_check() -> HttpResponse {
async fn health_check(req: HttpRequest) -> HttpResponse {
debug!(
"health_check of: {:?} {}",
req.connection_info(),
req.method()
);

HttpResponse::Ok().body("ok")
}

Expand Down Expand Up @@ -230,7 +244,12 @@ async fn main() -> std::io::Result<()> {

App::new()
.app_data(Data::new(AppState { db: pool.clone() }))
.route("/health", web::get().to(health_check))
.service(
web::resource("/health")
.guard(actix_web::guard::Header("host", "127.0.0.1:8080"))
.route(web::head().to(health_check))
.route(web::get().to(health_check)),
)
.default_service(web::to(handle_request))
})
.bind(("0.0.0.0", 8080))?
Expand Down
13 changes: 11 additions & 2 deletions src/event_scheduled.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ fn run_scheduled(data: ScheduledData, script: Script) {
});
}

pub fn handle_scheduled(db: store::Database) {
pub fn handle_scheduled(db: sqlx::Pool<sqlx::Postgres>) {
std::thread::spawn(move || {
let local = tokio::task::LocalSet::new();

Expand All @@ -64,6 +64,15 @@ pub fn handle_scheduled(db: store::Database) {
.unwrap();

let handle = local.spawn_local(async move {
// Acquire a database connection from the pool.
let mut conn: sqlx::pool::PoolConnection<sqlx::Postgres> = match db.acquire().await {
Ok(db) => db,
Err(err) => {
log::error!("Failed to acquire a database connection: {}", err);
return;
}
};

let nc = crate::nats::nats_connect();
let sub = nc
.queue_subscribe("scheduled", "runner")
Expand All @@ -85,7 +94,7 @@ pub fn handle_scheduled(db: store::Database) {
log::debug!("scheduled task parsed: {:?}", data);

let worker_id = store::WorkerIdentifier::Id(data.worker_id.clone());
let worker = match store::get_worker(&db, worker_id).await {
let worker = match store::get_worker(&mut conn, worker_id).await {
Some(worker) => worker,
None => {
log::error!("worker not found: {:?}", data.worker_id);
Expand Down
10 changes: 4 additions & 6 deletions src/store.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
use sqlx::prelude::FromRow;

pub type Database = sqlx::Pool<sqlx::Postgres>;

#[derive(Debug)]
pub enum WorkerIdentifier {
Id(String),
Expand All @@ -25,7 +23,7 @@ pub struct WorkerData {
pub language: WorkerLanguage,
}

pub async fn get_worker(db: &Database, identifier: WorkerIdentifier) -> Option<WorkerData> {
pub async fn get_worker(conn: &mut sqlx::PgConnection, identifier: WorkerIdentifier) -> Option<WorkerData> {
log::debug!("get_worker: {:?}", identifier);

let query = format!(
Expand Down Expand Up @@ -56,7 +54,7 @@ pub async fn get_worker(db: &Database, identifier: WorkerIdentifier) -> Option<W

match sqlx::query_as::<_, WorkerData>(query.as_str())
.bind(identifier)
.fetch_one(db)
.fetch_one(conn)
.await
{
Ok(worker) => {
Expand All @@ -70,13 +68,13 @@ pub async fn get_worker(db: &Database, identifier: WorkerIdentifier) -> Option<W
}
}

pub async fn get_worker_id_from_domain(db: &Database, domain: String) -> Option<String> {
pub async fn get_worker_id_from_domain(conn: &mut sqlx::PgConnection, domain: String) -> Option<String> {
let query = sqlx::query_scalar!(
"SELECT worker_id::text FROM domains WHERE name = $1 LIMIT 1",
domain
);

match query.fetch_one(db).await {
match query.fetch_one(conn).await {
Ok(worker_id) => worker_id,
Err(err) => {
log::warn!("failed to get worker id from domain: {:?}", err);
Expand Down

0 comments on commit a574fb1

Please sign in to comment.