Skip to content

Commit

Permalink
chore(cyclotron): change columns to bytes, add blob column (#24563)
Browse files Browse the repository at this point in the history
Co-authored-by: Oliver Browne <[email protected]>
  • Loading branch information
bretthoerner and oliverb123 authored Aug 27, 2024
1 parent ce43b85 commit 1fb4e7f
Show file tree
Hide file tree
Showing 30 changed files with 554 additions and 301 deletions.
33 changes: 24 additions & 9 deletions plugin-server/src/cdp/async-function-executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -119,16 +119,11 @@ export class AsyncFunctionExecutor {
url,
method,
headers,
body,
}),
metadata: JSON.stringify({
// TODO: It seems like Fetch expects metadata to have this shape, which
// I don't understand. I think `metadata` is where all the other Hog
// state is going to be stored? For now I'm just trying to make fetch
// work.
tries: 0,
trace: [],
// The body is passed in the `blob` field below.
}),
metadata: JSON.stringify({}),
// Fetch bodies are passed in the binary blob column/field.
blob: toUint8Array(body),
})
} catch (e) {
status.error(
Expand Down Expand Up @@ -193,3 +188,23 @@ export class AsyncFunctionExecutor {
return response
}
}

function toUint8Array(data: any): Uint8Array | undefined {
if (data === null || data === undefined) {
return undefined
}

if (data instanceof Uint8Array) {
return data
}

if (data instanceof ArrayBuffer) {
return new Uint8Array(data)
}

if (typeof data === 'string') {
return new TextEncoder().encode(data)
}

return new TextEncoder().encode(JSON.stringify(data))
}
2 changes: 1 addition & 1 deletion rust/.cargo/config.toml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
[env]
# Force SQLX to run in offline mode for CI. Devs can change this if they want, to live code against the DB,
# but we use it at the workspace level here to allow use of sqlx macros across all crates
SQLX_OFFLINE = "true"
SQLX_OFFLINE = "true"
2 changes: 1 addition & 1 deletion rust/.env
Original file line number Diff line number Diff line change
@@ -1 +1 @@
DATABASE_URL=postgres://posthog:posthog@localhost:15432/test_database
DATABASE_URL=postgres://posthog:posthog@localhost:15432/test_database

This file was deleted.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

This file was deleted.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
ALTER TABLE cyclotron_jobs
ALTER COLUMN vm_state TYPE bytea USING vm_state::bytea,
ALTER COLUMN metadata TYPE bytea USING metadata::bytea,
ALTER COLUMN parameters TYPE bytea USING parameters::bytea,
ADD COLUMN blob bytea;
1 change: 1 addition & 0 deletions rust/cyclotron-core/src/bin/create_test_data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ async fn main() {
vm_state: None,
parameters: None,
metadata: None,
blob: None,
};

manager.create_job(test_job).await.unwrap();
Expand Down
1 change: 1 addition & 0 deletions rust/cyclotron-core/src/bin/load_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ async fn producer_loop(manager: QueueManager, shared_context: Arc<SharedContext>
function_id: Some(Uuid::now_v7()),
vm_state: None,
parameters: None,
blob: None,
metadata: None,
};

Expand Down
1 change: 1 addition & 0 deletions rust/cyclotron-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ mod ops;
// Types
mod types;
pub use types::BulkInsertResult;
pub use types::Bytes;
pub use types::Job;
pub use types::JobInit;
pub use types::JobState;
Expand Down
17 changes: 12 additions & 5 deletions rust/cyclotron-core/src/ops/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,11 @@ INSERT INTO cyclotron_jobs
priority,
vm_state,
metadata,
parameters
parameters,
blob
)
VALUES
($1, $2, $3, NOW(), NULL, NULL, 0, 0, NOW(), $4, $5, $6, $7, $8, $9, $10)
($1, $2, $3, NOW(), NULL, NULL, 0, 0, NOW(), $4, $5, $6, $7, $8, $9, $10, $11)
"#,
id,
data.team_id,
Expand All @@ -44,7 +45,8 @@ VALUES
data.priority,
data.vm_state,
data.metadata,
data.parameters
data.parameters,
data.blob
)
.execute(executor)
.await?;
Expand Down Expand Up @@ -74,6 +76,7 @@ where
let mut vm_states = Vec::with_capacity(jobs.len());
let mut metadatas = Vec::with_capacity(jobs.len());
let mut parameters = Vec::with_capacity(jobs.len());
let mut blob = Vec::with_capacity(jobs.len());

for d in jobs {
ids.push(Uuid::now_v7());
Expand All @@ -92,6 +95,7 @@ where
vm_states.push(d.vm_state.clone());
metadatas.push(d.metadata.clone());
parameters.push(d.parameters.clone());
blob.push(d.blob.clone());
}

// Using the "unnest" function to turn an array of rows into a set of rows
Expand All @@ -114,7 +118,8 @@ INSERT INTO cyclotron_jobs
priority,
vm_state,
metadata,
parameters
parameters,
blob
)
SELECT *
FROM UNNEST(
Expand All @@ -133,7 +138,8 @@ FROM UNNEST(
$13,
$14,
$15,
$16
$16,
$17
)
"#,
)
Expand All @@ -153,6 +159,7 @@ FROM UNNEST(
.bind(vm_states)
.bind(metadatas)
.bind(parameters)
.bind(blob)
.execute(executor)
.await?;

Expand Down
Loading

0 comments on commit 1fb4e7f

Please sign in to comment.