Skip to content

Commit

Permalink
Implement a non-blocking child process interface (#211)
Browse files Browse the repository at this point in the history
  • Loading branch information
CompeyDev authored Oct 16, 2024
1 parent 93fa14d commit 309c461
Show file tree
Hide file tree
Showing 22 changed files with 414 additions and 64 deletions.
2 changes: 1 addition & 1 deletion .lune/hello_lune.luau
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ end
]]

print("Sending 4 pings to google 🌏")
local result = process.spawn("ping", {
local result = process.exec("ping", {
"google.com",
"-c 4",
})
Expand Down
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions crates/lune-std-process/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ directories = "5.0"
pin-project = "1.0"
os_str_bytes = { version = "7.0", features = ["conversions"] }

bstr = "1.9"
bytes = "1.6.0"

tokio = { version = "1", default-features = false, features = [
"io-std",
"io-util",
Expand Down
126 changes: 106 additions & 20 deletions crates/lune-std-process/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,27 +1,33 @@
#![allow(clippy::cargo_common_metadata)]

use std::{
cell::RefCell,
env::{
self,
consts::{ARCH, OS},
},
path::MAIN_SEPARATOR,
process::Stdio,
rc::Rc,
sync::Arc,
};

use mlua::prelude::*;

use lune_utils::TableBuilder;
use mlua_luau_scheduler::{Functions, LuaSpawnExt};
use options::ProcessSpawnOptionsStdio;
use os_str_bytes::RawOsString;
use tokio::io::AsyncWriteExt;
use stream::{ChildProcessReader, ChildProcessWriter};
use tokio::{io::AsyncWriteExt, process::Child, sync::RwLock};

mod options;
mod stream;
mod tee_writer;
mod wait_for_child;

use self::options::ProcessSpawnOptions;
use self::wait_for_child::{wait_for_child, WaitForChildResult};
use self::wait_for_child::wait_for_child;

use lune_utils::path::get_current_dir;

Expand Down Expand Up @@ -73,7 +79,8 @@ pub fn module(lua: &Lua) -> LuaResult<LuaTable> {
.with_value("cwd", cwd_str)?
.with_value("env", env_tab)?
.with_value("exit", process_exit)?
.with_async_function("spawn", process_spawn)?
.with_async_function("exec", process_exec)?
.with_function("create", process_create)?
.build_readonly()
}

Expand Down Expand Up @@ -141,11 +148,16 @@ fn process_env_iter<'lua>(
})
}

async fn process_spawn(
async fn process_exec(
lua: &Lua,
(program, args, options): (String, Option<Vec<String>>, ProcessSpawnOptions),
) -> LuaResult<LuaTable> {
let res = lua.spawn(spawn_command(program, args, options)).await?;
let res = lua
.spawn(async move {
let cmd = spawn_command_with_stdin(program, args, options.clone()).await?;
wait_for_child(cmd, options.stdio.stdout, options.stdio.stderr).await
})
.await?;

/*
NOTE: If an exit code was not given by the child process,
Expand All @@ -168,30 +180,104 @@ async fn process_spawn(
.build_readonly()
}

async fn spawn_command(
#[allow(clippy::await_holding_refcell_ref)]
fn process_create(
lua: &Lua,
(program, args, options): (String, Option<Vec<String>>, ProcessSpawnOptions),
) -> LuaResult<LuaTable> {
// We do not want the user to provide stdio options for process.create,
// so we reset the options, regardless of what the user provides us
let mut spawn_options = options.clone();
spawn_options.stdio = ProcessSpawnOptionsStdio::default();

let (code_tx, code_rx) = tokio::sync::broadcast::channel(4);
let code_rx_rc = Rc::new(RefCell::new(code_rx));

let child = spawn_command(program, args, spawn_options)?;

let child_arc = Arc::new(RwLock::new(child));

let child_arc_clone = Arc::clone(&child_arc);
let mut child_lock = tokio::task::block_in_place(|| child_arc_clone.blocking_write());

let stdin = child_lock.stdin.take().unwrap();
let stdout = child_lock.stdout.take().unwrap();
let stderr = child_lock.stderr.take().unwrap();

let child_arc_inner = Arc::clone(&child_arc);

// Spawn a background task to wait for the child to exit and send the exit code
let status_handle = tokio::spawn(async move {
let res = child_arc_inner.write().await.wait().await;

if let Ok(output) = res {
let code = output.code().unwrap_or_default();

code_tx
.send(code)
.expect("ExitCode receiver was unexpectedly dropped");
}
});

TableBuilder::new(lua)?
.with_value("stdout", ChildProcessReader(stdout))?
.with_value("stderr", ChildProcessReader(stderr))?
.with_value("stdin", ChildProcessWriter(stdin))?
.with_async_function("kill", move |_, ()| {
// First, stop the status task so the RwLock is dropped
status_handle.abort();
let child_arc_clone = Arc::clone(&child_arc);

// Then get another RwLock to write to the child process and kill it
async move { Ok(child_arc_clone.write().await.kill().await?) }
})?
.with_async_function("status", move |lua, ()| {
let code_rx_rc_clone = Rc::clone(&code_rx_rc);
async move {
// Exit code of 9 corresponds to SIGKILL, which should be the only case where
// the receiver gets suddenly dropped
let code = code_rx_rc_clone.borrow_mut().recv().await.unwrap_or(9);

TableBuilder::new(lua)?
.with_value("code", code)?
.with_value("ok", code == 0)?
.build_readonly()
}
})?
.build_readonly()
}

async fn spawn_command_with_stdin(
program: String,
args: Option<Vec<String>>,
mut options: ProcessSpawnOptions,
) -> LuaResult<WaitForChildResult> {
let stdout = options.stdio.stdout;
let stderr = options.stdio.stderr;
) -> LuaResult<Child> {
let stdin = options.stdio.stdin.take();

let mut child = options
.into_command(program, args)
.stdin(if stdin.is_some() {
Stdio::piped()
} else {
Stdio::null()
})
.stdout(stdout.as_stdio())
.stderr(stderr.as_stdio())
.spawn()?;
let mut child = spawn_command(program, args, options)?;

if let Some(stdin) = stdin {
let mut child_stdin = child.stdin.take().unwrap();
child_stdin.write_all(&stdin).await.into_lua_err()?;
}

wait_for_child(child, stdout, stderr).await
Ok(child)
}

fn spawn_command(
program: String,
args: Option<Vec<String>>,
options: ProcessSpawnOptions,
) -> LuaResult<Child> {
let stdout = options.stdio.stdout;
let stderr = options.stdio.stderr;

let child = options
.into_command(program, args)
.stdin(Stdio::piped())
.stdout(stdout.as_stdio())
.stderr(stderr.as_stdio())
.spawn()?;

Ok(child)
}
58 changes: 58 additions & 0 deletions crates/lune-std-process/src/stream.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
use bstr::BString;
use bytes::BytesMut;
use mlua::prelude::*;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};

const CHUNK_SIZE: usize = 8;

#[derive(Debug, Clone)]
pub struct ChildProcessReader<R: AsyncRead>(pub R);
#[derive(Debug, Clone)]
pub struct ChildProcessWriter<W: AsyncWrite>(pub W);

impl<R: AsyncRead + Unpin> ChildProcessReader<R> {
pub async fn read(&mut self, chunk_size: Option<usize>) -> LuaResult<Vec<u8>> {
let mut buf = BytesMut::with_capacity(chunk_size.unwrap_or(CHUNK_SIZE));
self.0.read_buf(&mut buf).await?;

Ok(buf.to_vec())
}

pub async fn read_to_end(&mut self) -> LuaResult<Vec<u8>> {
let mut buf = vec![];
self.0.read_to_end(&mut buf).await?;

Ok(buf)
}
}

impl<R: AsyncRead + Unpin + 'static> LuaUserData for ChildProcessReader<R> {
fn add_methods<'lua, M: LuaUserDataMethods<'lua, Self>>(methods: &mut M) {
methods.add_async_method_mut("read", |lua, this, chunk_size: Option<usize>| async move {
let buf = this.read(chunk_size).await?;

if buf.is_empty() {
return Ok(LuaValue::Nil);
}

Ok(LuaValue::String(lua.create_string(buf)?))
});

methods.add_async_method_mut("readToEnd", |lua, this, ()| async {
Ok(lua.create_string(this.read_to_end().await?))
});
}
}

impl<W: AsyncWrite + Unpin> ChildProcessWriter<W> {
pub async fn write(&mut self, data: BString) -> LuaResult<()> {
self.0.write_all(data.as_ref()).await?;
Ok(())
}
}

impl<W: AsyncWrite + Unpin + 'static> LuaUserData for ChildProcessWriter<W> {
fn add_methods<'lua, M: LuaUserDataMethods<'lua, Self>>(methods: &mut M) {
methods.add_async_method_mut("write", |_, this, data| async { this.write(data).await });
}
}
17 changes: 10 additions & 7 deletions crates/lune/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,13 +138,16 @@ create_tests! {
process_cwd: "process/cwd",
process_env: "process/env",
process_exit: "process/exit",
process_spawn_async: "process/spawn/async",
process_spawn_basic: "process/spawn/basic",
process_spawn_cwd: "process/spawn/cwd",
process_spawn_no_panic: "process/spawn/no_panic",
process_spawn_shell: "process/spawn/shell",
process_spawn_stdin: "process/spawn/stdin",
process_spawn_stdio: "process/spawn/stdio",
process_exec_async: "process/exec/async",
process_exec_basic: "process/exec/basic",
process_exec_cwd: "process/exec/cwd",
process_exec_no_panic: "process/exec/no_panic",
process_exec_shell: "process/exec/shell",
process_exec_stdin: "process/exec/stdin",
process_exec_stdio: "process/exec/stdio",
process_spawn_non_blocking: "process/create/non_blocking",
process_spawn_status: "process/create/status",
process_spawn_stream: "process/create/stream",
}

#[cfg(feature = "std-regex")]
Expand Down
4 changes: 2 additions & 2 deletions scripts/generate_compression_test_files.luau
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ local BIN_ZLIB = if process.os == "macos" then "/opt/homebrew/bin/pigz" else "pi

local function checkInstalled(program: string, args: { string }?)
print("Checking if", program, "is installed")
local result = process.spawn(program, args)
local result = process.exec(program, args)
if not result.ok then
stdio.ewrite(string.format("Program '%s' is not installed\n", program))
process.exit(1)
Expand All @@ -123,7 +123,7 @@ checkInstalled(BIN_ZLIB, { "--version" })
-- Run them to generate files

local function run(program: string, args: { string }): string
local result = process.spawn(program, args)
local result = process.exec(program, args)
if not result.ok then
stdio.ewrite(string.format("Command '%s' failed\n", program))
if #result.stdout > 0 then
Expand Down
2 changes: 1 addition & 1 deletion tests/datetime/formatLocalTime.luau
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ if not runLocaleTests then
return
end

local dateCmd = process.spawn("bash", { "-c", "date +\"%A, %d %B %Y\" --date='@1693068988'" }, {
local dateCmd = process.exec("bash", { "-c", "date +\"%A, %d %B %Y\" --date='@1693068988'" }, {
env = {
LC_ALL = "fr_FR.UTF-8 ",
},
Expand Down
21 changes: 21 additions & 0 deletions tests/process/create/kill.luau
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
local process = require("@lune/process")

-- Killing a child process should work as expected

local message = "Hello, world!"
local child = process.create("cat")

child.stdin:write(message)
child.kill()

assert(child.status().code == 9, "Child process should have an exit code of 9 (SIGKILL)")

assert(
child.stdout:readToEnd() == message,
"Reading from stdout of child process should work even after kill"
)

local stdinWriteOk = pcall(function()
child.stdin:write(message)
end)
assert(not stdinWriteOk, "Writing to stdin of child process should not work after kill")
13 changes: 13 additions & 0 deletions tests/process/create/non_blocking.luau
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
local process = require("@lune/process")

-- Spawning a child process should not block the thread

local childThread = coroutine.create(process.create)

local ok, err = coroutine.resume(childThread, "echo", { "hello, world" })
assert(ok, err)

assert(
coroutine.status(childThread) == "dead",
"Child process should not block the thread it is running on"
)
15 changes: 15 additions & 0 deletions tests/process/create/status.luau
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
local process = require("@lune/process")

-- The exit code of an child process should be correct

local randomExitCode = math.random(0, 255)
local isOk = randomExitCode == 0
local child = process.create("exit", { tostring(randomExitCode) }, { shell = true })
local status = child.status()

assert(
status.code == randomExitCode,
`Child process exited with wrong exit code, expected {randomExitCode}`
)

assert(status.ok == isOk, `Child status should be {if status.ok then "ok" else "not ok"}`)
18 changes: 18 additions & 0 deletions tests/process/create/stream.luau
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
local process = require("@lune/process")

-- Should be able to write and read from child process streams

local msg = "hello, world"

local catChild = process.create("cat")
catChild.stdin:write(msg)
assert(
msg == catChild.stdout:read(#msg),
"Failed to write to stdin or read from stdout of child process"
)

local echoChild = if process.os == "windows"
then process.create("/c", { "echo", msg, "1>&2" }, { shell = "cmd" })
else process.create("echo", { msg, ">>/dev/stderr" }, { shell = true })

assert(msg == echoChild.stderr:read(#msg), "Failed to read from stderr of child process")
Loading

0 comments on commit 309c461

Please sign in to comment.