Skip to content

Commit

Permalink
task.try_await2
Browse files Browse the repository at this point in the history
  • Loading branch information
lpil committed Oct 24, 2024
1 parent a618564 commit 573a6b5
Show file tree
Hide file tree
Showing 2 changed files with 144 additions and 48 deletions.
136 changes: 88 additions & 48 deletions src/gleam/otp/task.gleam
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,13 @@
//// [1]: https://hexdocs.pm/elixir/master/Task.html
////

// TODO: await_many
import gleam/dynamic.{type Dynamic}
import gleam/erlang/process.{type Pid, type ProcessMonitor, type Selector}
import gleam/erlang/process.{type Pid, type Selector, type Subject}
import gleam/function
import gleam/option.{type Option, Some}

pub opaque type Task(value) {
Task(
owner: Pid,
pid: Pid,
monitor: ProcessMonitor,
selector: Selector(Message(value)),
)
Task(owner: Pid, pid: Pid, subject: Subject(value))
}

// TODO: test
Expand All @@ -53,12 +49,7 @@ pub fn async(work: fn() -> value) -> Task(value) {
let subject = process.new_subject()
let pid =
process.start(linked: True, running: fn() { process.send(subject, work()) })
let monitor = process.monitor_process(pid)
let selector =
process.new_selector()
|> process.selecting_process_down(monitor, FromMonitor)
|> process.selecting(subject, FromSubject)
Task(owner: owner, pid: pid, monitor: monitor, selector: selector)
Task(owner: owner, pid: pid, subject: subject)
}

pub type AwaitError {
Expand All @@ -80,29 +71,20 @@ fn assert_owner(task: Task(a)) -> Nil {
}
}

type Message(value) {
FromMonitor(process.ProcessDown)
FromSubject(value)
}

// TODO: test
/// Wait for the value computed by a task.
///
/// If the a value is not received before the timeout has elapsed or if the
/// task process crashes then an error is returned.
/// If the a value is not received before the timeout has elapsed then an error
/// is returned.
///
pub fn try_await(task: Task(value), timeout: Int) -> Result(value, AwaitError) {
assert_owner(task)
case process.select(task.selector, timeout) {
let selector =
process.new_selector()
|> process.selecting(task.subject, function.identity)
case process.select(selector, timeout) {
// The task process has sent back a value
Ok(FromSubject(x)) -> {
process.demonitor_process(task.monitor)
Ok(x)
}

// The task process crashed without sending a value
Ok(FromMonitor(process.ProcessDown(reason: reason, ..))) ->
Error(Exit(reason))
Ok(x) -> Ok(x)

// The task process is alive but has not sent a value yet
Error(Nil) -> Error(Timeout)
Expand All @@ -120,34 +102,92 @@ pub fn await(task: Task(value), timeout: Int) -> value {
value
}

/// Wait endlessly for the value computed by a task.
///
/// Be Careful! This function does not return until there is a value to
/// receive. If a value is not received then the process will be stuck waiting
/// forever.
///
@deprecated("Use await_forever")
pub fn try_await_forever(task: Task(value)) -> Result(value, AwaitError) {
assert_owner(task)
case process.select_forever(task.selector) {
let selector =
process.new_selector()
|> process.selecting(task.subject, function.identity)
case process.select_forever(selector) {
// The task process has sent back a value
FromSubject(x) -> {
process.demonitor_process(task.monitor)
Ok(x)
}

// The task process crashed without sending a value
FromMonitor(process.ProcessDown(reason: reason, ..)) -> Error(Exit(reason))
x -> Ok(x)
}
}

/// Wait endlessly for the value computed by a task.
///
/// Be Careful! Like `try_await_forever`, this function does not return until there is a value to
/// receive.
/// Be Careful! Like `try_await_forever`, this function does not return until
/// there is a value to receive.
///
/// If the task process crashes then this function crashes.
///
pub fn await_forever(task: Task(value)) -> value {
let assert Ok(value) = try_await_forever(task)
value
assert_owner(task)
let selector =
process.new_selector()
|> process.selecting(task.subject, function.identity)
process.select_forever(selector)
}

type Message2(t1, t2) {
M2FromSubject1(t1)
M2FromSubject2(t2)
M2Timeout
}

/// Wait for the values computed by multiple tasks.
///
/// For each task, if the a value is not received before the timeout has
/// elapsed then an error is returned.
///
pub fn try_await2(
task1: Task(t1),
task2: Task(t2),
timeout: Int,
) -> #(Result(t1, AwaitError), Result(t2, AwaitError)) {
assert_owner(task1)
assert_owner(task2)

let timeout_subject = process.new_subject()
let timer = process.send_after(timeout_subject, timeout, M2Timeout)

process.new_selector()
|> process.selecting(task1.subject, M2FromSubject1)
|> process.selecting(task2.subject, M2FromSubject2)
|> process.selecting(timeout_subject, function.identity)
|> try_await2_loop(option.None, option.None, task1, task2, timer)
}

fn try_await2_loop(
selector: Selector(Message2(t1, t2)),
t1: Option(Result(t1, AwaitError)),
t2: Option(Result(t2, AwaitError)),
task1: Task(t1),
task2: Task(t2),
timeout: process.Timer,
) -> #(Result(t1, AwaitError), Result(t2, AwaitError)) {
case t1, t2 {
Some(t1), Some(t2) -> #(t1, t2)

_, _ -> {
case process.select_forever(selector) {
// The task process has sent back a value
M2FromSubject1(x) -> {
let t1 = Some(Ok(x))
try_await2_loop(selector, t1, t2, task1, task2, timeout)
}
M2FromSubject2(x) -> {
let t2 = Some(Ok(x))
try_await2_loop(selector, t1, t2, task1, task2, timeout)
}

M2Timeout -> {
#(
option.unwrap(t1, Error(Timeout)),
option.unwrap(t2, Error(Timeout)),
)
}
}
}
}
}
56 changes: 56 additions & 0 deletions test/gleam/otp/task_test.gleam
Original file line number Diff line number Diff line change
Expand Up @@ -100,3 +100,59 @@ pub fn async_await_forever_unmonitor_test() {
|> get_message_queue_length
|> should.equal(0)
}

pub fn try_await2_test() {
// Start with an empty mailbox
flush()

let work = fn(x) {
fn() {
sleep(5)
x
}
}

let task1 = task.async(work(1))
let task2 = task.async(work(2))

task.try_await2(task1, task2, 8)
|> should.equal(#(Ok(1), Ok(2)))
}

pub fn try_await2_timeout_test() {
// Start with an empty mailbox
flush()

let work = fn(x, y) {
fn() {
sleep(y)
x
}
}

// 2 will not finish in time
let task1 = task.async(work(1, 0))
let task2 = task.async(work(2, 10))

task.try_await2(task1, task2, 5)
|> should.equal(#(Ok(1), Error(Timeout)))
}

pub fn try_await3_timeout_test() {
// Start with an empty mailbox
flush()

let work = fn(x, y) {
fn() {
sleep(y)
x
}
}

// 1 will not finish in time
let task1 = task.async(work(1, 100))
let task2 = task.async(work(2, 1))

task.try_await2(task1, task2, 20)
|> should.equal(#(Error(Timeout), Ok(2)))
}

0 comments on commit 573a6b5

Please sign in to comment.