Skip to content

Commit

Permalink
[BUG]: keep track of current span if task must be queued (#3548)
Browse files Browse the repository at this point in the history
  • Loading branch information
codetheweb authored Jan 23, 2025
1 parent 7c11224 commit bf29ead
Showing 1 changed file with 4 additions and 11 deletions.
15 changes: 4 additions & 11 deletions rust/system/src/execution/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ use tracing::{trace_span, Instrument, Span};
*/
#[derive(Debug)]
pub struct Dispatcher {
task_queue: Vec<TaskMessage>,
task_queue: Vec<(TaskMessage, Span)>,
waiters: Vec<TaskRequestMessage>,
n_worker_threads: usize,
queue_size: usize,
Expand Down Expand Up @@ -105,18 +105,15 @@ impl Dispatcher {
// If a worker is waiting for a task, send it to the worker in FIFO order
// Otherwise, add it to the task queue
match self.waiters.pop() {
Some(channel) => match channel
.reply_to
.send(task, Some(Span::current().clone()))
.await
Some(channel) => match channel.reply_to.send(task, Some(Span::current())).await
{
Ok(_) => {}
Err(e) => {
tracing::error!("Error sending task to worker: {:?}", e);
}
},
None => {
self.task_queue.push(task);
self.task_queue.push((task, Span::current()));
}
}
}
Expand All @@ -130,11 +127,7 @@ impl Dispatcher {
/// it when one is available
async fn handle_work_request(&mut self, request: TaskRequestMessage) {
match self.task_queue.pop() {
Some(task) => match request
.reply_to
.send(task, Some(Span::current().clone()))
.await
{
Some((task, span)) => match request.reply_to.send(task, Some(span)).await {
Ok(_) => {}
Err(e) => {
println!("Error sending task to worker: {:?}", e);
Expand Down

0 comments on commit bf29ead

Please sign in to comment.