Skip to content

Commit

Permalink
chore(cyclotron): change columns to bytes, add blob column
Browse files Browse the repository at this point in the history
  • Loading branch information
bretthoerner committed Aug 23, 2024
1 parent 145cacf commit 02ef5d0
Show file tree
Hide file tree
Showing 27 changed files with 200 additions and 112 deletions.
2 changes: 1 addition & 1 deletion rust/.cargo/config.toml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
[env]
# Force SQLX to run in offline mode for CI. Devs can change this if they want, to live code against the DB,
# but we use it at the workspace level here to allow use of sqlx macros across all crates
SQLX_OFFLINE = "true"
SQLX_OFFLINE = "true"
2 changes: 1 addition & 1 deletion rust/.env
Original file line number Diff line number Diff line change
@@ -1 +1 @@
DATABASE_URL=postgres://posthog:posthog@localhost:15432/test_database
DATABASE_URL=postgres://posthog:posthog@localhost:15432/test_database

This file was deleted.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

This file was deleted.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
ALTER TABLE cyclotron_jobs
ALTER COLUMN vm_state TYPE bytea USING vm_state::bytea,
ALTER COLUMN metadata TYPE bytea USING metadata::bytea,
ALTER COLUMN parameters TYPE bytea USING parameters::bytea,
ADD COLUMN blob bytea;
1 change: 1 addition & 0 deletions rust/cyclotron-core/src/bin/create_test_data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ async fn main() {
vm_state: None,
parameters: None,
metadata: None,
blob: None,
};

manager.create_job(test_job).await.unwrap();
Expand Down
1 change: 1 addition & 0 deletions rust/cyclotron-core/src/bin/load_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ async fn producer_loop(manager: QueueManager, shared_context: Arc<SharedContext>
function_id: Some(Uuid::now_v7()),
vm_state: None,
parameters: None,
blob: None,
metadata: None,
};

Expand Down
38 changes: 31 additions & 7 deletions rust/cyclotron-core/src/ops/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use uuid::Uuid;

use crate::{
error::QueueError,
types::{Job, JobState, JobUpdate},
types::{Bytes, Job, JobState, JobUpdate},
};

use super::meta::throw_if_no_rows;
Expand Down Expand Up @@ -59,9 +59,10 @@ RETURNING
last_transition,
scheduled,
transition_count,
NULL as vm_state,
NULL::bytea as vm_state,
metadata,
parameters,
blob,
lock_id,
last_heartbeat,
janitor_touch_count
Expand Down Expand Up @@ -126,6 +127,7 @@ RETURNING
vm_state,
metadata,
parameters,
blob,
lock_id,
last_heartbeat,
janitor_touch_count
Expand All @@ -142,12 +144,12 @@ pub async fn get_vm_state<'c, E>(
executor: E,
job_id: Uuid,
lock_id: Uuid,
) -> Result<Option<String>, QueueError>
) -> Result<Option<Bytes>, QueueError>
where
E: sqlx::Executor<'c, Database = sqlx::Postgres>,
{
struct VMState {
vm_state: Option<String>,
vm_state: Option<Bytes>,
}

let res = sqlx::query_as!(
Expand Down Expand Up @@ -209,6 +211,10 @@ where
set_parameters(&mut *txn, job_id, parameters, lock_id).await?;
}

if let Some(blob) = updates.blob {
set_blob(&mut *txn, job_id, blob, lock_id).await?;
}

// Calling flush indicates forward progress, so we should touch the heartbeat
set_heartbeat(&mut *txn, job_id, lock_id).await?;

Expand Down Expand Up @@ -316,7 +322,7 @@ where
pub async fn set_vm_state<'c, E>(
executor: E,
job_id: Uuid,
vm_state: Option<String>,
vm_state: Option<Bytes>,
lock_id: Uuid,
) -> Result<(), QueueError>
where
Expand All @@ -334,7 +340,7 @@ where
pub async fn set_metadata<'c, E>(
executor: E,
job_id: Uuid,
metadata: Option<String>,
metadata: Option<Bytes>,
lock_id: Uuid,
) -> Result<(), QueueError>
where
Expand All @@ -352,7 +358,7 @@ where
pub async fn set_parameters<'c, E>(
executor: E,
job_id: Uuid,
parameters: Option<String>,
parameters: Option<Bytes>,
lock_id: Uuid,
) -> Result<(), QueueError>
where
Expand All @@ -367,6 +373,24 @@ where
assert_does_update(executor, job_id, lock_id, q).await
}

pub async fn set_blob<'c, E>(
executor: E,
job_id: Uuid,
blob: Option<Bytes>,
lock_id: Uuid,
) -> Result<(), QueueError>
where
E: sqlx::Executor<'c, Database = sqlx::Postgres>,
{
let q = sqlx::query!(
"UPDATE cyclotron_jobs SET blob = $1 WHERE id = $2 AND lock_id = $3",
blob,
job_id,
lock_id
);
assert_does_update(executor, job_id, lock_id, q).await
}

pub async fn set_heartbeat<'c, E>(
executor: E,
job_id: Uuid,
Expand Down
Loading

0 comments on commit 02ef5d0

Please sign in to comment.