Skip to content

Commit

Permalink
Merge pull request WasmEdge#98 from second-state/refactor/sdk_preview
Browse files Browse the repository at this point in the history
Refactor/sdk preview
  • Loading branch information
L-jasmine authored Dec 11, 2023
2 parents 6be2ef9 + 17221d0 commit 04d2e6e
Show file tree
Hide file tree
Showing 67 changed files with 4,183 additions and 12,381 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci-build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ jobs:
export WASMEDGE_BUILD_DIR="$(pwd)/WasmEdge/build"
export WASMEDGE_PLUGIN_PATH="$(pwd)/WasmEdge/build/plugins/wasmedge_process"
export LD_LIBRARY_PATH="$(pwd)/WasmEdge/build/lib/api"
cargo test --workspace --locked --features aot,async,wasmedge_process,ffi -- --nocapture --test-threads=1
cargo test --workspace --locked --features aot,async,wasmedge_process,ffi -- --nocapture --test-threads=1 --skip test_vmbuilder
build_macos:
name: MacOS
Expand Down
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ async-wasi = { workspace = true, optional = true }
[features]
aot = ["wasmedge-sys/aot"]
async = ["wasmedge-sys/async", "dep:async-wasi"]
default = []
default = ["async"]
ffi = ["wasmedge-sys/ffi"]
standalone = ["wasmedge-sys/standalone"]
static = ["wasmedge-sys/static"]
Expand Down
14 changes: 7 additions & 7 deletions crates/async-wasi/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,19 @@ version = "0.1.0"
[dependencies]
bitflags = "2.0.2"
cfg-if = "1.0.0"
futures = {version = "0.3"}
futures = { version = "0.3" }
getrandom = "0.2"
libc = "0.2"
path-absolutize = "3.0.13"
serde = {version = "1", features = ["derive"], optional = true}
socket2 = {version = "^0.4.9", features = ["all"]}
tokio = {version = "1", features = ["full"], optional = true}
serde = { version = "1", features = ["derive"] }
socket2 = { version = "^0.4.9", features = ["all"] }
tokio = { version = "1", features = ["full"], optional = true }
parking_lot.workspace = true

[dev-dependencies]
serde_json = {version = "1"}
serde_json = { version = "1" }

[features]
async_tokio = ["tokio"]
default = ["async_tokio", "serialize"]
serialize = ["serde"]
default = ["async_tokio"]
serialize = []
95 changes: 81 additions & 14 deletions crates/async-wasi/src/snapshots/common/net/async_tokio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use socket2::{SockAddr, Socket};
use std::{
ops::DerefMut,
os::unix::prelude::{AsRawFd, FromRawFd, RawFd},
sync::atomic::AtomicBool,
};
use tokio::io::{
unix::{AsyncFd, AsyncFdReadyGuard, TryIoError},
Expand Down Expand Up @@ -124,10 +125,59 @@ impl AsyncWasiSocketInner {
}
}

#[derive(Debug)]
pub(crate) struct SocketWritable(pub(crate) AtomicBool);
impl SocketWritable {
pub(crate) async fn writable(&self) {
let b = self.0.swap(false, std::sync::atomic::Ordering::Acquire);
SocketWritableFuture(b).await;
}

pub(crate) fn set_writable(&self) {
self.0.store(true, std::sync::atomic::Ordering::Release)
}
}
impl Default for SocketWritable {
fn default() -> Self {
Self(AtomicBool::new(true))
}
}

#[derive(Debug, Clone, Copy)]
pub(crate) struct SocketWritableFuture(bool);

impl Future for SocketWritableFuture {
type Output = ();

fn poll(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Self::Output> {
if self.0 {
std::task::Poll::Ready(())
} else {
std::task::Poll::Pending
}
}
}

#[derive(Debug)]
pub struct AsyncWasiSocket {
pub(crate) inner: AsyncWasiSocketInner,
pub state: WasiSocketState,
pub state: Box<WasiSocketState>,
pub(crate) writable: SocketWritable,
}

impl AsyncWasiSocket {
pub(crate) async fn readable(&self) -> std::io::Result<()> {
self.inner.readable().await.map(|x| ())
}

pub(crate) async fn writable(&self) -> std::io::Result<()> {
self.writable.writable().await;
self.inner.writable().await?;
Ok(())
}
}

#[inline]
Expand Down Expand Up @@ -171,7 +221,8 @@ impl AsyncWasiSocket {
socket.set_nonblocking(true)?;
Ok(Self {
inner: AsyncWasiSocketInner::AsyncFd(AsyncFd::new(socket)?),
state,
state: Box::new(state),
writable: Default::default(),
})
}

Expand All @@ -180,7 +231,8 @@ impl AsyncWasiSocket {
socket.set_nonblocking(true)?;
Ok(Self {
inner: AsyncWasiSocketInner::AsyncFd(AsyncFd::new(socket)?),
state,
state: Box::new(state),
writable: Default::default(),
})
}
}
Expand Down Expand Up @@ -226,7 +278,8 @@ impl AsyncWasiSocket {
}
Ok(AsyncWasiSocket {
inner: AsyncWasiSocketInner::PreOpen(inner),
state,
state: Box::new(state),
writable: Default::default(),
})
}

Expand Down Expand Up @@ -280,7 +333,8 @@ impl AsyncWasiSocket {

Ok(AsyncWasiSocket {
inner: AsyncWasiSocketInner::AsyncFd(AsyncFd::new(cs)?),
state: new_state,
state: Box::new(new_state),
writable: Default::default(),
})
} else {
loop {
Expand All @@ -293,7 +347,8 @@ impl AsyncWasiSocket {

Ok(AsyncWasiSocket {
inner: AsyncWasiSocketInner::AsyncFd(AsyncFd::new(cs)?),
state: new_state.clone(),
state: Box::new(new_state.clone()),
writable: Default::default(),
})
}) {
return r;
Expand Down Expand Up @@ -365,15 +420,27 @@ impl AsyncWasiSocket {

match (self.state.nonblocking, self.state.so_recv_timeout) {
(true, None) => {
// Safety: reference Socket::read_vectored
let bufs = unsafe {
&mut *(bufs as *mut [io::IoSliceMut<'_>] as *mut [MaybeUninitSlice<'_>])
let (n, f) = {
let r = {
// Safety: reference Socket::read_vectored
let bufs = unsafe {
&mut *(bufs as *mut [io::IoSliceMut<'_>]
as *mut [MaybeUninitSlice<'_>])
};
self.inner.get_ref()?.recv_vectored_with_flags(bufs, flags)
};
if let Err(e) = &r {
if e.kind() == std::io::ErrorKind::WouldBlock {
tokio::select! {
s=self.inner.readable()=>{
s?.clear_ready();
}
else=>{}
}
}
};
r?
};

let (n, f) = self
.inner
.get_ref()?
.recv_vectored_with_flags(bufs, flags)?;
Ok((n, f.is_truncated()))
}
(false, None) => loop {
Expand Down
Loading

0 comments on commit 04d2e6e

Please sign in to comment.