diff --git a/src/gleam/otp/task.gleam b/src/gleam/otp/task.gleam index fff859a..3a0554f 100644 --- a/src/gleam/otp/task.gleam +++ b/src/gleam/otp/task.gleam @@ -28,17 +28,13 @@ //// [1]: https://hexdocs.pm/elixir/master/Task.html //// -// TODO: await_many import gleam/dynamic.{type Dynamic} -import gleam/erlang/process.{type Pid, type ProcessMonitor, type Selector} +import gleam/erlang/process.{type Pid, type Selector, type Subject} +import gleam/function +import gleam/option.{type Option, Some} pub opaque type Task(value) { - Task( - owner: Pid, - pid: Pid, - monitor: ProcessMonitor, - selector: Selector(Message(value)), - ) + Task(owner: Pid, pid: Pid, subject: Subject(value)) } // TODO: test @@ -53,12 +49,7 @@ pub fn async(work: fn() -> value) -> Task(value) { let subject = process.new_subject() let pid = process.start(linked: True, running: fn() { process.send(subject, work()) }) - let monitor = process.monitor_process(pid) - let selector = - process.new_selector() - |> process.selecting_process_down(monitor, FromMonitor) - |> process.selecting(subject, FromSubject) - Task(owner: owner, pid: pid, monitor: monitor, selector: selector) + Task(owner: owner, pid: pid, subject: subject) } pub type AwaitError { @@ -80,29 +71,20 @@ fn assert_owner(task: Task(a)) -> Nil { } } -type Message(value) { - FromMonitor(process.ProcessDown) - FromSubject(value) -} - // TODO: test /// Wait for the value computed by a task. /// -/// If the a value is not received before the timeout has elapsed or if the -/// task process crashes then an error is returned. +/// If the a value is not received before the timeout has elapsed then an error +/// is returned. /// pub fn try_await(task: Task(value), timeout: Int) -> Result(value, AwaitError) { assert_owner(task) - case process.select(task.selector, timeout) { + let selector = + process.new_selector() + |> process.selecting(task.subject, function.identity) + case process.select(selector, timeout) { // The task process has sent back a value - Ok(FromSubject(x)) -> { - process.demonitor_process(task.monitor) - Ok(x) - } - - // The task process crashed without sending a value - Ok(FromMonitor(process.ProcessDown(reason: reason, ..))) -> - Error(Exit(reason)) + Ok(x) -> Ok(x) // The task process is alive but has not sent a value yet Error(Nil) -> Error(Timeout) @@ -120,34 +102,92 @@ pub fn await(task: Task(value), timeout: Int) -> value { value } -/// Wait endlessly for the value computed by a task. -/// -/// Be Careful! This function does not return until there is a value to -/// receive. If a value is not received then the process will be stuck waiting -/// forever. -/// +@deprecated("Use await_forever") pub fn try_await_forever(task: Task(value)) -> Result(value, AwaitError) { assert_owner(task) - case process.select_forever(task.selector) { + let selector = + process.new_selector() + |> process.selecting(task.subject, function.identity) + case process.select_forever(selector) { // The task process has sent back a value - FromSubject(x) -> { - process.demonitor_process(task.monitor) - Ok(x) - } - - // The task process crashed without sending a value - FromMonitor(process.ProcessDown(reason: reason, ..)) -> Error(Exit(reason)) + x -> Ok(x) } } /// Wait endlessly for the value computed by a task. /// -/// Be Careful! Like `try_await_forever`, this function does not return until there is a value to -/// receive. +/// Be Careful! Like `try_await_forever`, this function does not return until +/// there is a value to receive. /// /// If the task process crashes then this function crashes. /// pub fn await_forever(task: Task(value)) -> value { - let assert Ok(value) = try_await_forever(task) - value + assert_owner(task) + let selector = + process.new_selector() + |> process.selecting(task.subject, function.identity) + process.select_forever(selector) +} + +type Message2(t1, t2) { + M2FromSubject1(t1) + M2FromSubject2(t2) + M2Timeout +} + +/// Wait for the values computed by multiple tasks. +/// +/// For each task, if the a value is not received before the timeout has +/// elapsed then an error is returned. +/// +pub fn try_await2( + task1: Task(t1), + task2: Task(t2), + timeout: Int, +) -> #(Result(t1, AwaitError), Result(t2, AwaitError)) { + assert_owner(task1) + assert_owner(task2) + + let timeout_subject = process.new_subject() + let timer = process.send_after(timeout_subject, timeout, M2Timeout) + + process.new_selector() + |> process.selecting(task1.subject, M2FromSubject1) + |> process.selecting(task2.subject, M2FromSubject2) + |> process.selecting(timeout_subject, function.identity) + |> try_await2_loop(option.None, option.None, task1, task2, timer) +} + +fn try_await2_loop( + selector: Selector(Message2(t1, t2)), + t1: Option(Result(t1, AwaitError)), + t2: Option(Result(t2, AwaitError)), + task1: Task(t1), + task2: Task(t2), + timeout: process.Timer, +) -> #(Result(t1, AwaitError), Result(t2, AwaitError)) { + case t1, t2 { + Some(t1), Some(t2) -> #(t1, t2) + + _, _ -> { + case process.select_forever(selector) { + // The task process has sent back a value + M2FromSubject1(x) -> { + let t1 = Some(Ok(x)) + try_await2_loop(selector, t1, t2, task1, task2, timeout) + } + M2FromSubject2(x) -> { + let t2 = Some(Ok(x)) + try_await2_loop(selector, t1, t2, task1, task2, timeout) + } + + M2Timeout -> { + #( + option.unwrap(t1, Error(Timeout)), + option.unwrap(t2, Error(Timeout)), + ) + } + } + } + } } diff --git a/test/gleam/otp/task_test.gleam b/test/gleam/otp/task_test.gleam index c041f76..d294398 100644 --- a/test/gleam/otp/task_test.gleam +++ b/test/gleam/otp/task_test.gleam @@ -100,3 +100,59 @@ pub fn async_await_forever_unmonitor_test() { |> get_message_queue_length |> should.equal(0) } + +pub fn try_await2_test() { + // Start with an empty mailbox + flush() + + let work = fn(x) { + fn() { + sleep(5) + x + } + } + + let task1 = task.async(work(1)) + let task2 = task.async(work(2)) + + task.try_await2(task1, task2, 8) + |> should.equal(#(Ok(1), Ok(2))) +} + +pub fn try_await2_timeout_test() { + // Start with an empty mailbox + flush() + + let work = fn(x, y) { + fn() { + sleep(y) + x + } + } + + // 2 will not finish in time + let task1 = task.async(work(1, 0)) + let task2 = task.async(work(2, 10)) + + task.try_await2(task1, task2, 5) + |> should.equal(#(Ok(1), Error(Timeout))) +} + +pub fn try_await3_timeout_test() { + // Start with an empty mailbox + flush() + + let work = fn(x, y) { + fn() { + sleep(y) + x + } + } + + // 1 will not finish in time + let task1 = task.async(work(1, 100)) + let task2 = task.async(work(2, 1)) + + task.try_await2(task1, task2, 20) + |> should.equal(#(Error(Timeout), Ok(2))) +}