You've already forked comprehensive-rust
mirror of
https://github.com/google/comprehensive-rust.git
synced 2025-06-21 08:19:32 +02:00
Add an "async" session (#496)
* beginning of an Async section * address review comments * Add futures page (#497) NOTE: `mdbook test` does not allow code samples to reference other crates, so they must be marked as `compile_fail`; see #175. * Add Runtimes & Tasks (#522) These concepts are closely related, and there's not much else to know about runtimes other than "they exist". This removes the bit about futures being "inert" because it doesn't really lead anywhere. * Async chapter (#524) * Add async channels chapter * Async control flow * Async pitfalls * Separate in multiple chapters + add daemon section * Merge reentering threads in blocking-executor * async_trait * Async fixes (#546) * Async: some ideas for simplifying the content (#550) * Simplify the async-await slide * Shorten futures and move it up * Add a page on Tokio * Modifications to the async section (#556) * Modifications to the async section * Remove the "Daemon" slide, as it largely duplicates the "Tasks" slide. The introduction to the "Control Flow" section mentions tasks as a kind of control flow. * Reorganize the structure in SUMMARY.md to correspond to the directory structure. * Simplify the "Pin" and "Blocking the Executor" slides with steps in the speaker notes to demonstrate / fix the issues. * Rename "join_all" to "Join". * Simplify some code samples to shorten them, and to print output rather than asserting. * Clarify speaker notes and include more "Try.." suggestions. * Be consistent about where `async` blocks are introduced (in the "Tasks" slide). * Explain `join` and `select` in prose. * Fix formatting of section-header slides. * Add a note on async trait (#558) --------- Co-authored-by: sakex <alexandre@senges.ch> Co-authored-by: rbehjati <razieh@google.com>
This commit is contained in:
committed by
GitHub
parent
d6e09c8130
commit
0d30da7f23
64
src/async/pitfalls/async-traits.md
Normal file
64
src/async/pitfalls/async-traits.md
Normal file
@ -0,0 +1,64 @@
|
||||
# Async Traits
|
||||
|
||||
Async methods in traits are not yet supported in the stable channel ([An experimental feature exists in nightly and should be stabilized in the mid term.](https://blog.rust-lang.org/inside-rust/2022/11/17/async-fn-in-trait-nightly.html))
|
||||
|
||||
The crate [async_trait](https://docs.rs/async-trait/latest/async_trait/) provides a workaround through a macro:
|
||||
|
||||
```rust,editable,compile_fail
|
||||
use async_trait::async_trait;
|
||||
use std::time::Instant;
|
||||
use tokio::time::{sleep, Duration};
|
||||
|
||||
#[async_trait]
|
||||
trait Sleeper {
|
||||
async fn sleep(&self);
|
||||
}
|
||||
|
||||
struct FixedSleeper {
|
||||
sleep_ms: u64,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl Sleeper for FixedSleeper {
|
||||
async fn sleep(&self) {
|
||||
sleep(Duration::from_millis(self.sleep_ms)).await;
|
||||
}
|
||||
}
|
||||
|
||||
async fn run_all_sleepers_multiple_times(sleepers: Vec<Box<dyn Sleeper>>, n_times: usize) {
|
||||
for _ in 0..n_times {
|
||||
println!("running all sleepers..");
|
||||
for sleeper in &sleepers {
|
||||
let start = Instant::now();
|
||||
sleeper.sleep().await;
|
||||
println!("slept for {}ms", start.elapsed().as_millis());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() {
|
||||
let sleepers: Vec<Box<dyn Sleeper>> = vec![
|
||||
Box::new(FixedSleeper { sleep_ms: 50 }),
|
||||
Box::new(FixedSleeper { sleep_ms: 100 }),
|
||||
];
|
||||
run_all_sleepers_multiple_times(sleepers, 5).await;
|
||||
}
|
||||
```
|
||||
|
||||
<details>
|
||||
|
||||
* The difficulty with `async trait` is in that the resulting `Future` does not
|
||||
have a size known at compile time, because the size of the `Future` depends
|
||||
on the implementation.
|
||||
|
||||
* `async_trait` is easy to use, but note that it's using heap allocations to
|
||||
achieve this, and solve the unknow size problem above. This heap allocation
|
||||
has performance overhead.
|
||||
|
||||
* Try creating a new sleeper struct that will sleep for a random amount of time
|
||||
and adding it to the Vec.
|
||||
|
||||
* Try making the `sleep` call take `&mut self`.
|
||||
|
||||
</details>
|
50
src/async/pitfalls/blocking-executor.md
Normal file
50
src/async/pitfalls/blocking-executor.md
Normal file
@ -0,0 +1,50 @@
|
||||
# Blocking the executor
|
||||
|
||||
Most async runtimes only allow IO tasks to run concurrently.
|
||||
This means that CPU blocking tasks will block the executor and prevent other tasks from being executed.
|
||||
An easy workaround is to use async equivalent methods where possible.
|
||||
|
||||
```rust,editable,compile_fail
|
||||
use futures::future::join_all;
|
||||
use std::time::Instant;
|
||||
|
||||
async fn sleep_ms(start: &Instant, id: u64, duration_ms: u64) {
|
||||
std::thread::sleep(std::time::Duration::from_millis(duration_ms));
|
||||
println!(
|
||||
"future {id} slept for {duration_ms}ms, finished after {}ms",
|
||||
start.elapsed().as_millis()
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::main(flavor = "current_thread")]
|
||||
async fn main() {
|
||||
let start = Instant::now();
|
||||
let sleep_futures = (1..=10).map(|t| sleep_ms(&start, t, t * 10));
|
||||
join_all(sleep_futures).await;
|
||||
}
|
||||
```
|
||||
|
||||
<details>
|
||||
|
||||
* Run the code and see that the sleeps happen consecutively rather than
|
||||
concurrently.
|
||||
|
||||
* The `"current_thread"` flavor puts all tasks on a single thread. This makes the
|
||||
effect more obvious, but the bug is still present in the multi-threaded
|
||||
flavor.
|
||||
|
||||
* Switch the `std::thread::sleep` to `tokio::time::sleep` and await its result.
|
||||
|
||||
* Another fix would be to `tokio::task::spawn_blocking` which spawns an actual
|
||||
thread and transforms its handle into a future without blocking the executor.
|
||||
|
||||
* You should not think of tasks as OS threads. They do not map 1 to 1 and most
|
||||
executors will allow many tasks to run on a single OS thread. This is
|
||||
particularly problematic when interacting with other libraries via FFI, where
|
||||
that library might depend on thread-local storage or map to specific OS
|
||||
threads (e.g., CUDA). Prefer `tokio::task::spawn_blocking` in such situations.
|
||||
|
||||
* Use sync mutexes with care. Holding a mutex over an `.await` may cause another
|
||||
task to block, and that task may be running on the same thread.
|
||||
|
||||
</details>
|
110
src/async/pitfalls/pin.md
Normal file
110
src/async/pitfalls/pin.md
Normal file
@ -0,0 +1,110 @@
|
||||
# Pin
|
||||
|
||||
When you await a future, all local variables (that would ordinarily be stored on
|
||||
a stack frame) are instead stored in the Future for the current async block. If your
|
||||
future has pointers to data on the stack, those pointers might get invalidated.
|
||||
This is unsafe.
|
||||
|
||||
Therefore, you must guarantee that the addresses your future points to don't
|
||||
change. That is why we need to `pin` futures. Using the same future repeatedly
|
||||
in a `select!` often leads to issues with pinned values.
|
||||
|
||||
```rust,editable,compile_fail
|
||||
use tokio::sync::{mpsc, oneshot};
|
||||
use tokio::task::spawn;
|
||||
use tokio::time::{sleep, Duration};
|
||||
|
||||
// A work item. In this case, just sleep for the given time and respond
|
||||
// with a message on the `respond_on` channel.
|
||||
#[derive(Debug)]
|
||||
struct Work {
|
||||
input: u32,
|
||||
respond_on: oneshot::Sender<u32>,
|
||||
}
|
||||
|
||||
// A worker which listens for work on a queue and performs it.
|
||||
async fn worker(mut work_queue: mpsc::Receiver<Work>) {
|
||||
let mut iterations = 0;
|
||||
loop {
|
||||
tokio::select! {
|
||||
Some(work) = work_queue.recv() => {
|
||||
sleep(Duration::from_millis(10)).await; // Pretend to work.
|
||||
work.respond_on
|
||||
.send(work.input * 1000)
|
||||
.expect("failed to send response");
|
||||
iterations += 1;
|
||||
}
|
||||
// TODO: report number of iterations every 100ms
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// A requester which requests work and waits for it to complete.
|
||||
async fn do_work(work_queue: &mpsc::Sender<Work>, input: u32) -> u32 {
|
||||
let (tx, rx) = oneshot::channel();
|
||||
work_queue
|
||||
.send(Work {
|
||||
input,
|
||||
respond_on: tx,
|
||||
})
|
||||
.await
|
||||
.expect("failed to send on work queue");
|
||||
rx.await.expect("failed waiting for response")
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() {
|
||||
let (tx, rx) = mpsc::channel(10);
|
||||
spawn(worker(rx));
|
||||
for i in 0..100 {
|
||||
let resp = do_work(&tx, i).await;
|
||||
println!("work result for iteration {i}: {resp}");
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
<details>
|
||||
|
||||
* You may recognize this as an example of the actor pattern. Actors
|
||||
typically call `select!` in a loop.
|
||||
|
||||
* This serves as a summation of a few of the previous lessons, so take your time
|
||||
with it.
|
||||
|
||||
* Naively add a `_ = sleep(Duration::from_millis(100)) => { println!(..) }`
|
||||
to the `select!`. This will never execute. Why?
|
||||
|
||||
* Instead, add a `timeout_fut` containing that future outside of the `loop`:
|
||||
|
||||
```rust,compile_fail
|
||||
let mut timeout_fut = sleep(Duration::from_millis(100));
|
||||
loop {
|
||||
select! {
|
||||
..,
|
||||
_ = timeout_fut => { println!(..); },
|
||||
}
|
||||
}
|
||||
```
|
||||
* This still doesn't work. Follow the compiler errors, adding `&mut` to the
|
||||
`timeout_fut` in the `select!` to work around the move, then using
|
||||
`Box::pin`:
|
||||
|
||||
```rust,compile_fail
|
||||
let mut timeout_fut = Box::pin(sleep(Duration::from_millis(100)));
|
||||
loop {
|
||||
select! {
|
||||
..,
|
||||
_ = &mut timeout_fut => { println!(..); },
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
* This compiles, but once the timeout expires it is `Poll::Ready` on every
|
||||
iteration (a fused future would help with this). Update to reset
|
||||
`timeout_fut` every time it expires.
|
||||
|
||||
* Box allocates on the heap. In some cases, `tokio::pin!` is also an option, but
|
||||
that is difficult to use for a future that is reassigned.
|
||||
* Another alternative is to not use `pin` at all but spawn another task that will send to a `oneshot` channel every 100ms.
|
||||
|
||||
</details>
|
Reference in New Issue
Block a user