-
Notifications
You must be signed in to change notification settings - Fork 335
/
main.rs
301 lines (267 loc) · 11.6 KB
/
main.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
//! Demonstrates how to implement a (very) basic asynchronous Rust executor and
//! timer. The goal of this file is to provide some context into how the various
//! building blocks fit together.
use futures::future::BoxFuture;
use std::cell::RefCell;
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Waker};
use std::thread;
use std::time::{Duration, Instant};
// A utility that allows us to implement a `std::task::Waker` without having to
// use `unsafe` code.
use futures::task::{self, ArcWake};
// Used as a channel to queue scheduled tasks.
use crossbeam::channel;
/// Program entry point.
///
/// Builds a mini-tokio executor, queues one root task on it and then drives
/// the executor loop. The only features this runtime offers are spawning tasks
/// and waiting on delays.
fn main() {
    let runtime = MiniTokio::new();

    // Queue the root task; every other task is spawned from inside it.
    // Nothing is polled until `run` is called below.
    runtime.spawn(async {
        // First child: sleeps briefly so that "world" shows up after "hello".
        let world = async {
            delay(Duration::from_millis(100)).await;
            println!("world");
        };
        spawn(world);

        // Second child: prints right away.
        let hello = async {
            println!("hello");
        };
        spawn(hello);

        // The executor has no shutdown path, so wait long enough for both
        // children to finish and then terminate the whole process.
        delay(Duration::from_millis(200)).await;
        std::process::exit(0);
    });

    // Enter the executor loop: scheduled tasks are received and polled.
    runtime.run();
}
/// A minimal futures executor built on top of a channel.
///
/// Waking a task means pushing it into the send half of the channel; the
/// executor blocks on the receive half and polls whatever arrives. Each task
/// carries its own clone of the send half, which is how its waker is able to
/// reschedule it.
struct MiniTokio {
    /// Receive half of the scheduling channel. A task showing up here means
    /// its future is ready to make progress, typically because a resource it
    /// depends on became ready (for example, a socket received data and a
    /// `read` call will now succeed).
    scheduled: channel::Receiver<Arc<Task>>,

    /// Send half of the scheduling channel.
    sender: channel::Sender<Arc<Task>>,
}

impl MiniTokio {
    /// Creates a new mini-tokio instance backed by an unbounded channel.
    fn new() -> MiniTokio {
        let (tx, rx) = channel::unbounded();
        Self {
            scheduled: rx,
            sender: tx,
        }
    }

    /// Spawns a future onto this mini-tokio instance.
    ///
    /// The future is wrapped in the `Task` harness and pushed onto the
    /// scheduling queue; it is first polled once `run` is called.
    fn spawn<F>(&self, future: F)
    where
        F: Future<Output = ()> + Send + 'static,
    {
        Task::spawn(future, &self.sender);
    }

    /// Runs the executor loop.
    ///
    /// The loop runs until receiving from the channel fails; no shutdown
    /// mechanism has been implemented. A task is received from `scheduled`
    /// when it is first created and every time its waker is used, and each
    /// received task is polled once.
    fn run(&self) {
        // Publish this executor's send half through the `CURRENT`
        // thread-local. Tokio uses the same trick to implement `tokio::spawn`:
        // on entering the runtime, the context needed for spawning is stored
        // in a thread-local.
        let handle = self.sender.clone();
        CURRENT.with(|current| {
            current.replace(Some(handle));
        });

        // Block until a task is scheduled, then poll it until it either
        // completes or returns `Poll::Pending`.
        loop {
            match self.scheduled.recv() {
                Ok(task) => task.poll(),
                Err(_) => break,
            }
        }
    }
}
/// An equivalent to `tokio::spawn`.
///
/// When the mini-tokio executor is entered, the `CURRENT` thread-local is set
/// to the send half of that executor's channel. Spawning then only requires
/// wrapping `future` in the `Task` harness and pushing it onto that queue.
///
/// # Panics
///
/// Panics if `CURRENT` has not been set on the calling thread, i.e. the thread
/// has not entered `MiniTokio::run`.
pub fn spawn<F>(future: F)
where
    F: Future<Output = ()> + Send + 'static,
{
    CURRENT.with(|cell| {
        let current = cell.borrow();
        // State the violated invariant instead of the generic
        // "called `Option::unwrap()` on a `None` value" message.
        let sender = current
            .as_ref()
            .expect("`spawn` must be called from within a running mini-tokio executor");
        Task::spawn(future, sender);
    });
}
/// Asynchronous equivalent to `thread::sleep`. Awaiting this function pauses
/// the calling task for at least `dur`.
///
/// mini-tokio implements delays by spawning a timer thread that sleeps for the
/// requested duration and notifies the caller once the delay completes. A
/// thread is spawned **per** pending call to `delay`. This is obviously a
/// terrible implementation strategy and nobody should use this in production.
/// Tokio does not use this strategy. However, it can be implemented with few
/// lines of code, so here we are.
///
/// If the deadline has already passed when the future is polled (for example
/// with a zero duration), it completes immediately without spawning a thread.
async fn delay(dur: Duration) {
    // `delay` is a leaf future. Sometimes, this is referred to as a
    // "resource". Other resources include sockets and channels. Resources may
    // not be implemented in terms of `async/await` as they must integrate with
    // some operating system detail. Because of this, we must manually
    // implement the `Future`.
    //
    // However, it is nice to expose the API as an `async fn`. A useful idiom
    // is to manually define a private future and then use it from a public
    // `async fn` API.
    struct Delay {
        // When to complete the delay.
        when: Instant,
        // The waker to notify once the delay has completed. The waker must be
        // accessible by both the timer thread and the future so it is wrapped
        // with `Arc<Mutex<_>>`. `None` until the timer thread is started.
        waker: Option<Arc<Mutex<Waker>>>,
    }

    impl Future for Delay {
        type Output = ();

        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
            // Check the deadline before doing anything else. If it has already
            // elapsed there is nothing to wait for, so no timer thread is
            // started and the waker is not cloned. Starting a thread here
            // would make it wake a task whose future has already returned
            // `Ready`.
            if Instant::now() >= self.when {
                return Poll::Ready(());
            }

            if let Some(waker) = &self.waker {
                // The timer thread is already running. Make sure the stored
                // waker matches the current task's waker: the `Delay` instance
                // may move to a different task between calls to `poll`, in
                // which case the waker in the given `Context` differs and the
                // stored one must be replaced.
                let mut waker = waker.lock().unwrap();
                if !waker.will_wake(cx.waker()) {
                    *waker = cx.waker().clone();
                }
            } else {
                // First poll with a deadline in the future: remember the waker
                // and spawn the timer thread.
                let when = self.when;
                let waker = Arc::new(Mutex::new(cx.waker().clone()));
                self.waker = Some(waker.clone());

                thread::spawn(move || {
                    let now = Instant::now();
                    if now < when {
                        thread::sleep(when - now);
                    }

                    // The duration has elapsed. Notify the caller by invoking
                    // whichever waker is currently stored.
                    let waker = waker.lock().unwrap();
                    waker.wake_by_ref();
                });
            }

            // The `Future` trait contract requires that when `Pending` is
            // returned, the given waker is signaled once the future should be
            // polled again. The timer thread guarantees this: even if the
            // deadline passed between the check above and this point, the
            // thread skips sleeping and wakes the task right away.
            //
            // If we forget to invoke the waker, the task will hang
            // indefinitely.
            Poll::Pending
        }
    }

    // Create an instance of our `Delay` future.
    let future = Delay {
        when: Instant::now() + dur,
        waker: None,
    };

    // Wait for the duration to complete.
    future.await;
}
// Used to track the current mini-tokio instance so that the free `spawn`
// function is able to schedule spawned tasks.
//
// Holds the send half of the executor's scheduling channel. It starts out as
// `None` and is set by `MiniTokio::run` on the thread that runs the executor;
// `spawn` reads it and panics if it is still `None`.
thread_local! {
static CURRENT: RefCell<Option<channel::Sender<Arc<Task>>>> =
RefCell::new(None);
}
/// Task harness. Contains the future as well as the necessary data to
/// schedule the future once it is woken.
struct Task {
    // The future is wrapped with a `Mutex` to make the `Task` structure
    // `Sync`. There will only ever be a single thread that attempts to use
    // `future`. The Tokio runtime avoids the mutex by using `unsafe` code. The
    // box is also avoided.
    //
    // BoxFuture<T> is a type alias for:
    // Pin<Box<dyn Future<Output = T> + Send + 'static>>
    //
    // The slot becomes `None` once the future has returned `Poll::Ready`, so a
    // wake-up that arrives after completion never polls it again.
    future: Mutex<Option<BoxFuture<'static, ()>>>,

    // When a task is notified, it is queued into this channel. The executor
    // pops notified tasks and executes them.
    executor: channel::Sender<Arc<Task>>,
}

impl Task {
    /// Spawns a new task with the given future.
    ///
    /// Initializes a new `Task` harness containing the given future and pushes
    /// it onto `sender`. The receiver half of the channel will get the task
    /// and execute it. A failed send is ignored.
    fn spawn<F>(future: F, sender: &channel::Sender<Arc<Task>>)
    where
        F: Future<Output = ()> + Send + 'static,
    {
        let future: BoxFuture<'static, ()> = Box::pin(future);
        let task = Arc::new(Task {
            future: Mutex::new(Some(future)),
            executor: sender.clone(),
        });

        let _ = sender.send(task);
    }

    /// Executes a scheduled task.
    ///
    /// Creates the `task::Context` containing a waker for this task; the waker
    /// pushes the task back onto the mini-tokio scheduled channel. The future
    /// is then polled once with that waker.
    ///
    /// Wake-ups may arrive after the future has completed (wakers can be
    /// cloned and invoked at any time), but polling a future again after it
    /// returned `Ready` is not allowed by the `Future` contract. The future is
    /// therefore dropped on completion and later calls are no-ops.
    ///
    /// # Panics
    ///
    /// Panics if the future's mutex cannot be acquired without blocking.
    fn poll(self: Arc<Self>) {
        // Get a waker referencing the task.
        let waker = task::waker(self.clone());
        // Initialize the task context with the waker.
        let mut cx = Context::from_waker(&waker);

        // This will never block as only a single thread ever locks the future.
        let mut slot = self
            .future
            .try_lock()
            .expect("task future is only ever locked by the executor thread");

        if let Some(future) = slot.as_mut() {
            if future.as_mut().poll(&mut cx).is_ready() {
                // Completed: release the future and make later wake-ups
                // harmless.
                *slot = None;
            }
        }
    }
}
// The standard library provides low-level, unsafe APIs for defining wakers.
// Instead of writing unsafe code, we will use the helpers provided by the
// `futures` crate to define a waker that is able to schedule our `Task`
// structure.
impl ArcWake for Task {
fn wake_by_ref(arc_self: &Arc<Self>) {
// Schedule the task for execution. The executor receives from the
// channel and polls tasks.
let _ = arc_self.executor.send(arc_self.clone());
}
}