feat: cyclotron #24228

Merged
bretthoerner merged 47 commits into master from oliver_cyclotron_lib on Aug 21, 2024

Conversation

oliverb123
Contributor

Problem

We want our delivery system (aka "the CDP" or "destinations" or probably other names too, I find it hard to keep up) to deliver (insert very large number here) of events per second. This is the start of a PG-based, sharded job queue system that's intended to let us do that (while managing QoS on a per-user or per-endpoint basis, and doing other fancy tricks kafka makes difficult). The underlying implementation is written in rust, but it's designed to be easy to expose bindings to other languages, so projects or teams favouring TS/JS (e.g. the hog folks) or python (I haven't been in the company long enough to call out anyone specific) can still interact with our "delivery engine" (queue work, manage work for a given team or function or endpoint or queue or whatever, or even ship a worker to consume jobs from the queue, if needed).
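
To make the intended shape concrete, here's a rough sketch of the producer side (struct, table and column names here are illustrative guesses, not the final cyclotron-core API; assumes sqlx with the postgres, uuid and chrono features):

// Illustrative only - names are guesses pieced together from the snippets in this PR.
use chrono::{DateTime, Utc};
use uuid::Uuid;

pub struct JobInit {
    pub team_id: i32,
    pub queue_name: String,         // e.g. "fetch", or a hog queue
    pub priority: i16,              // per-team / per-function QoS knob
    pub scheduled: DateTime<Utc>,   // earliest time a worker may dequeue this
    pub function_id: Option<Uuid>,
    pub parameters: Option<String>, // opaque JSON blob, owned by the worker type
}

// A producer (in Rust, or via a TS/python binding wrapping this) just inserts a row;
// the sharding layer would eventually decide which PG instance the row lands on.
pub async fn create_job(pool: &sqlx::PgPool, init: JobInit) -> Result<Uuid, sqlx::Error> {
    let id = Uuid::new_v4();
    sqlx::query(
        "INSERT INTO cyclotron_jobs
            (id, team_id, queue_name, priority, scheduled, function_id, parameters, state, created)
         VALUES ($1, $2, $3, $4, $5, $6, $7, 'available', NOW())",
    )
    .bind(id)
    .bind(init.team_id)
    .bind(init.queue_name)
    .bind(init.priority)
    .bind(init.scheduled)
    .bind(init.function_id)
    .bind(init.parameters)
    .execute(pool)
    .await?;
    Ok(id)
}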

Right now only the PG part is in place here, the sharding is mostly signs in the ground labelled "draw the rest of the F'ing owl", and the management commands I first ship with will be executed directly on the DB (or DBs) by the issuing manager instance, even though the intention is for them to be pushed onto a kafka queue and for each shard to individually manage its control tables. Also I need to do a bunch of query optimisation (although one of the nice things here is that, because the DB is sharded, we can afford to be a bit inefficient with our DB ops, since if we need more throughput we just spin up more shards), and so on and so on and so on. It's v0, it's going to be a little rough around the edges.

You might notice this looks a bit like rustyhook. You'd be correct, we stole a fair amount of both inspiration and literal SQL from it while sketching this out. We didn't try to simply extend rustyhook because the queue implementation there made a series of design choices that make it unsuitable for 1) being embedded in other languages 2) having more than 1 type of worker operate atop it and 3) supporting the kind of work-management features we expect the delivery system to need. These were good decisions - rustyhook was designed to only use its own queue internally for retries, needed to ship, and was /definitely/ never meant to be embedded in other languages - but our delivery solution is maturing and growing in complexity, and its needs have outgrown rustyhook. Onwards and upwards.

@bretthoerner bretthoerner left a comment

Looks like a good starting point, nice work.

I have to admit after seeing the code it's not super obvious what Rust is doing for us. 😓

But I guess being able to write workers like fetch in it will be nice.

@@ -0,0 +1 @@
Ripped from rusty-hook, since it'll be used across more or less all cyclotron stuff, as well as rustyhook
Contributor

total nit, but this seems more like a PR comment than a README

-- TODO - I go back and forth on whether this should just be an open text field,
-- rather than an enum - that makes it faster to add new kinds of workers to the
-- system (since you don't have to bump library versions for anything consuming the
-- cyclotron-core crate), but having a defined set of workers means you can spin up
Contributor

You'd only have to bump it for producers/consumers of that queue anyway, right? Seems fine.
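
For illustration, the tradeoff being discussed looks roughly like this on the Rust side (the enum name is taken from the code later in this PR, but the variants and string mapping are assumptions):

// Sketch of the "defined set of workers" option. Adding a new worker kind means adding a
// variant here and cutting a new release of cyclotron-core (plus an ALTER TYPE on the PG
// enum), whereas an open text column would only need the producer and consumer of the new
// queue to agree on a string.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WaitingOn {
    Fetch,
    Hog,
}

impl WaitingOn {
    pub fn as_str(&self) -> &'static str {
        match self {
            WaitingOn::Fetch => "fetch",
            WaitingOn::Hog => "hog",
        }
    }
}

impl std::str::FromStr for WaitingOn {
    type Err = String;
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "fetch" => Ok(WaitingOn::Fetch),
            "hog" => Ok(WaitingOn::Hog),
            other => Err(format!("unknown worker kind: {}", other)),
        }
    }
}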

created TIMESTAMPTZ NOT NULL,
-- Queue bookkeeping - invisible to the worker
lock_id UUID, -- This is set when a job is in a running state, and is required to update the job.
last_heartbeat TIMESTAMPTZ, -- This is updated by the worker to indicate that the job is making forward progress even without transitions (and should not be reaped)
Contributor

Just to note, as discussed on the call, if we think many jobs will run long enough to require a heartbeat (beyond the initial dequeue) I think we'd save a lot by heartbeating a single session (unique per worker process).
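
A sketch of that per-process approach, purely illustrative (the table and column names are taken from the schema snippet above, everything else is assumed): one background task refreshes last_heartbeat for every job the process currently holds, instead of each job heartbeating its own row.

use std::sync::{Arc, Mutex};
use std::time::Duration;
use uuid::Uuid;

// One heartbeat task per worker process: refresh last_heartbeat for every job this
// process currently holds, rather than issuing an update per job.
async fn heartbeat_loop(pool: sqlx::PgPool, held_jobs: Arc<Mutex<Vec<Uuid>>>) {
    let mut ticker = tokio::time::interval(Duration::from_secs(10));
    loop {
        ticker.tick().await;
        let ids: Vec<Uuid> = held_jobs.lock().unwrap().clone();
        if ids.is_empty() {
            continue;
        }
        let res = sqlx::query("UPDATE cyclotron_jobs SET last_heartbeat = NOW() WHERE id = ANY($1)")
            .bind(ids)
            .execute(&pool)
            .await;
        if let Err(e) = res {
            tracing::warn!("heartbeat failed: {}", e);
        }
    }
}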

// rather than doing that, it could just put the job in a "dead letter" state, and no worker or janitor process
// will touch it... maybe the table moving isn't needed? but either way, being able to debug jobs that cause workers
// to stall would be good (and, thinking about it, moving it to a new table means we don't have to clear the lock,
// so have a potential way to trace back to the last worker that died holding the job)
Contributor

Or the entire row could be written outside of the DB, even just to logs in the short term.

Anyway, this is a good idea. Poison pills really hurt us in the txn-based Rusty-Hook, and this will be a nice safety net.

let oldest_valid_heartbeat = Utc::now() - timeout;
// NOTE - we don't check the lock_id here, because it probably doesn't matter (the lock_id should be set if the
// job state is "running"), but perhaps we should only delete jobs with a set lock_id, and report an error
// if we find a job with a state of "running" and no lock_id. Also, we delete jobs whose last_heartbeat is
Contributor

Yeah, it could be nice for the janitor to check some of our invariants and report anything suspicious. Doesn't have to be now though.
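
For illustration, that janitor pass could look something like this (a sketch only; table and column names are assumed from the snippets in this PR, and the queries are not the actual implementation):

use chrono::{Duration, Utc};

// Janitor sweep: reap jobs whose heartbeat has gone stale, and separately report
// rows that violate the "running implies locked" invariant discussed above.
async fn janitor_sweep(pool: &sqlx::PgPool, timeout: Duration) -> Result<(), sqlx::Error> {
    let oldest_valid_heartbeat = Utc::now() - timeout;

    let reaped = sqlx::query(
        "DELETE FROM cyclotron_jobs WHERE state = 'running' AND last_heartbeat < $1",
    )
    .bind(oldest_valid_heartbeat)
    .execute(pool)
    .await?
    .rows_affected();

    // Invariant check: a running job should always hold a lock_id.
    let suspicious: i64 = sqlx::query_scalar(
        "SELECT COUNT(*) FROM cyclotron_jobs WHERE state = 'running' AND lock_id IS NULL",
    )
    .fetch_one(pool)
    .await?;

    if suspicious > 0 {
        tracing::error!("{} running jobs have no lock_id", suspicious);
    }
    tracing::info!("janitor reaped {} stale jobs", reaped);
    Ok(())
}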

// All dequeued job IDs that haven't been flushed yet. The idea is this lets us
// manage, on the rust side of any API boundary, the "pending" update of any given
// job, such that a user can progressively build up a full update, and then flush it,
// rather than having to track the update state on their side and submit it all at once
Contributor

Interesting, I'll have to read more code but it's surprising to me that a worker would even have lots of little updates to do for a job, rather than a single simple update at the end.

Contributor Author

Most won't, but e.g. in the fetch impl I have a pattern of queuing up the set of updates that would put a job into the dead letter queue, then doing some serde or other "this should never fail, and if it does that's a coding error in the fetch worker" work. If any of that causes an error I just flush the update (sending the job to the DLQ); otherwise I queue up the updates to send it back to an available state (or whatever) and flush.

It looks something like this (ripped from in-progress code, but you maybe get the idea):

// Complete the job, either because we got a good response, or because the job's retries
// have been exceeded.
pub async fn complete_job(
    worker: &Worker,
    job: &Job,
    return_worker: WaitingOn,
    return_queue: Option<String>,
    on_finish: OnFinish,
    result: FetchResult,
) -> Result<(), WorkerError> {
    // Pre-set the DLQ state up front: if any of the serde below fails, we can just
    // flush and bail, and the job lands in the dead letter queue.
    worker.set_state(job.id, JobState::Available)?;
    worker.set_queue(job.id, DEAD_LETTER_QUEUE)?;

    let is_completed = result.is_completed();

    let result = match serde_json::to_string(&result) {
        Ok(r) => r,
        Err(e) => {
            // Leave behind a hint for debugging
            worker.set_metadata(job.id, Some(format!("Failed to serialise result: {}", e)))?;
            worker.flush_job(job.id).await?;
            return Err(WorkerError::SerdeError(e));
        }
    };

    worker.set_queue(
        job.id,
        &return_queue.unwrap_or_else(|| job.queue_name.clone()),
    )?;
    worker.set_waiting_on(job.id, return_worker)?;

    match (is_completed, on_finish) {
        (true, _) | (false, OnFinish::Return) => {
            worker.set_state(job.id, JobState::Available)?;
        }
        (false, OnFinish::Complete) => {
            worker.set_state(job.id, JobState::Failed)?;
        }
    }

    worker.set_parameters(job.id, Some(result))?;
    worker.set_metadata(job.id, None)?; // We're finished with the job, so clear our internal state
    worker.flush_job(job.id).await?;

    Ok(())
}

@bretthoerner bretthoerner Aug 13, 2024

Just my opinion, but following what state everything is in here seems like more of a burden than just having a happy path that sets things at the end, throwing an error otherwise, and having a wrapper function that catches errors and sets the state properly in that case.

Being N lines into a function and having to remember "OK, we set it to the DLQ at the start of the fn, so if X happens then Y will happen" only gets worse over time, in my experience.
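
For illustration, the wrapper shape being suggested might look something like this (a sketch only, reusing the types and setters from the snippet above; run_happy_path is a hypothetical stand-in for the serde/response handling):

// Sketch of the suggested alternative: the happy path just returns an error on
// failure, and a single wrapper owns the "send it to the DLQ" bookkeeping.
pub async fn complete_job_wrapped(
    worker: &Worker,
    job: &Job,
    result: FetchResult,
) -> Result<(), WorkerError> {
    match run_happy_path(worker, job, &result).await {
        Ok(()) => Ok(()),
        Err(e) => {
            // All failure bookkeeping lives here, instead of being pre-set at the
            // top of the happy path.
            worker.set_state(job.id, JobState::Available)?;
            worker.set_queue(job.id, DEAD_LETTER_QUEUE)?;
            worker.set_metadata(job.id, Some(format!("job failed: {}", e)))?;
            worker.flush_job(job.id).await?;
            Err(e)
        }
    }
}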

/// job lock). We're more strict here (flushes can only happen once, you must
/// flush some non-running state) to try and enforce a good interaction
/// pattern with the queue. I might return to this and loosen this constraint in the
/// future, if there's a motivating case for needing to flush partial job updates.
Contributor

Kind of like above, I'm still confused why our whole update API isn't just a "I'm done with the job, here is its new state" type call.

// The general interface for calling our functions takes a JSON serialized stirng,
// because neon has no nice serde support for function arguments (and generally.
// rippping objects from the v8 runtime piece by piece is slower than just passing
// a since chunk of bytes). These are convenience functions for converting between
Contributor

Suggested change
// a since chunk of bytes). These are convenience functions for converting between
// a single chunk of bytes). These are convenience functions for converting between

(This kind of trails off, too.)
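
To make the pattern concrete, a binding along these lines (a sketch assuming neon 1.x; createJob, create_job and the JobInit fields are stand-ins, not the actual cyclotron-node exports) takes a single JSON string, deserializes it once on the Rust side, and returns a JSON string:

use neon::prelude::*;

// Stand-in for whatever the binding actually wraps.
#[derive(serde::Deserialize)]
struct JobInit {
    queue_name: String,
    parameters: Option<String>,
}

// JS calls this with one JSON string rather than a structured object, so we cross the
// v8 boundary once instead of pulling the object apart field by field.
fn create_job(mut cx: FunctionContext) -> JsResult<JsString> {
    let arg = cx.argument::<JsString>(0)?.value(&mut cx);
    let init: JobInit = serde_json::from_str(&arg)
        .or_else(|e| cx.throw_error(format!("invalid job init: {}", e)))?;
    // ... hand `init` to the core queue here ...
    let reply = serde_json::json!({
        "queued": true,
        "queue": init.queue_name,
        "has_parameters": init.parameters.is_some(),
    })
    .to_string();
    Ok(cx.string(reply))
}

#[neon::main]
fn main(mut cx: ModuleContext) -> NeonResult<()> {
    cx.export_function("createJob", create_job)?;
    Ok(())
}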

github-actions bot commented Aug 14, 2024

Size Change: 0 B

Total Size: 1.08 MB

frontend/dist/toolbar.js: 1.08 MB (unchanged)

@oliverb123 oliverb123 mentioned this pull request Aug 16, 2024
@bretthoerner bretthoerner merged commit 9734a40 into master Aug 21, 2024
87 checks passed
@bretthoerner bretthoerner deleted the oliver_cyclotron_lib branch August 21, 2024 18:24
bretthoerner added a commit that referenced this pull request Aug 21, 2024
bretthoerner added a commit that referenced this pull request Aug 21, 2024
This reverts commit 9734a40.