diff --git a/src/SUMMARY.md b/src/SUMMARY.md index 72346bf8..9908ebb0 100644 --- a/src/SUMMARY.md +++ b/src/SUMMARY.md @@ -226,8 +226,24 @@ - [With Java](android/interoperability/java.md) - [Exercises](exercises/day-4/android.md) -# Async (temp until issue181 is closed) +# Day 4: Afternoon (Async) +---- + +- [Async Basics](async.md) + - [async/await](async/async-await.md) + - [Futures](async/futures.md) + - [Runtimes](async/runtimes.md) + - [Tokio](async/runtimes/tokio.md) + - [Tasks](async/tasks.md) + - [Async Channels](async/channels.md) +- [Control Flow](async/control-flow.md) + - [Join](async/control-flow/join.md) + - [Select](async/control-flow/select.md) +- [Pitfalls](async/pitfalls.md) + - [Blocking the Executor](async/pitfalls/blocking-executor.md) + - [Pin](async/pitfalls/pin.md) + - [Async Traits](async/pitfalls/async-traits.md) - [Exercises](exercises/day-4/elevator.md) # Final Words diff --git a/src/async.md b/src/async.md new file mode 100644 index 00000000..28d39cfd --- /dev/null +++ b/src/async.md @@ -0,0 +1,25 @@ +# Async Rust + +"Async" is a concurrency model where multiple tasks are executed concurrently by +executing each task until it would block, then switching to another task that is +ready to make progress. The model allows running a larger number of tasks on a +limited number of threads. This is because the per-task overhead is typically +very low and operating systems provide primitives for efficiently identifying +I/O that is able to proceed. + +Rust's asynchronous operation is based on "futures", which represent work that +may be completed in the future. Futures are "polled" until they signal that +they are complete. + +Futures are polled by an async runtime, and several different runtimes are +available. + +## Comparisons + + * Python has a similar model in its `asyncio`. However, its `Future` type is + callback-based, and not polled. Async Python programs require a "loop", + similar to a runtime in Rust. + + * JavaScript's `Promise` is similar, but again callback-based. The language + runtime implements the event loop, so many of the details of Promise + resolution are hidden. diff --git a/src/async/async-await.md b/src/async/async-await.md new file mode 100644 index 00000000..79060338 --- /dev/null +++ b/src/async/async-await.md @@ -0,0 +1,48 @@ +# `async`/`await` + +At a high level, async Rust code looks very much like "normal" sequential code: + +```rust,editable,compile_fail +use futures::executor::block_on; + +async fn count_to(count: i32) { + for i in 1..=count { + println!("Count is: {i}!"); + } +} + +async fn async_main(count: i32) { + count_to(count).await; +} + +fn main() { + block_on(async_main(10)); +} +``` + +
+ +Key points: + +* Note that this is a simplified example to show the syntax. There is no long + running operation or any real concurrency in it! + +* What is the return type of an async call? + * Use `let future: () = async_main(10);` in `main` to see the type. + +* The "async" keyword is syntactic sugar. The compiler replaces the return type + with a future. + +* You cannot make `main` async, without additional instructions to the compiler + on how to use the returned future. + +* You need an executor to run async code. `block_on` blocks the current thread + until the provided future has run to completion. + +* `.await` asynchronously waits for the completion of another operation. Unlike + `block_on`, `.await` doesn't block the current thread. + +* `.await` can only be used inside an `async` function (or block; these are + introduced later). + +
diff --git a/src/async/channels.md b/src/async/channels.md new file mode 100644 index 00000000..9e0a5bf8 --- /dev/null +++ b/src/async/channels.md @@ -0,0 +1,49 @@ +# Async Channels + +Several crates have support for `async`/`await`. For instance `tokio` channels: + +```rust,editable,compile_fail +use tokio::sync::mpsc::{self, Receiver}; + +async fn ping_handler(mut input: Receiver<()>) { + let mut count: usize = 0; + + while let Some(_) = input.recv().await { + count += 1; + println!("Received {count} pings so far."); + } + + println!("ping_handler complete"); +} + +#[tokio::main] +async fn main() { + let (sender, receiver) = mpsc::channel(32); + let ping_handler_task = tokio::spawn(ping_handler(receiver)); + for i in 0..10 { + sender.send(()).await.expect("Failed to send ping."); + println!("Sent {} pings so far.", i + 1); + } + + std::mem::drop(sender); + ping_handler_task.await.expect("Something went wrong in ping handler task."); +} +``` + +
+ +* Change the channel size to `3` and see how it affects the execution. + +* Overall, the interface is similar to the `sync` channels as seen in the + [morning class](concurrency/channels.md). + +* Try removing the `std::mem::drop` call. What happens? Why? + +* The [Flume](https://docs.rs/flume/latest/flume/) crate has channels that + implement both `sync` and `async` `send` and `recv`. This can be convenient + for complex applications with both IO and heavy CPU processing tasks. + +* What makes working with `async` channels preferable is the ability to combine + them with other `future`s to combine them and create complex control flow. + +
diff --git a/src/async/control-flow.md b/src/async/control-flow.md new file mode 100644 index 00000000..69b1d03e --- /dev/null +++ b/src/async/control-flow.md @@ -0,0 +1,7 @@ +# Futures Control Flow + +Futures can be combined together to produce concurrent compute flow graphs. We +have already seen tasks, that function as independent threads of execution. + +- [Join](control-flow/join.md) +- [Select](control-flow/select.md) diff --git a/src/async/control-flow/join.md b/src/async/control-flow/join.md new file mode 100644 index 00000000..71db9a0e --- /dev/null +++ b/src/async/control-flow/join.md @@ -0,0 +1,51 @@ +# Join + +A join operation waits until all of a set of futures are ready, and +returns a collection of their results. This is similar to `Promise.all` in +JavaScript or `asyncio.gather` in Python. + +```rust,editable,compile_fail +use anyhow::Result; +use futures::future; +use reqwest; +use std::collections::HashMap; + +async fn size_of_page(url: &str) -> Result { + let resp = reqwest::get(url).await?; + Ok(resp.text().await?.len()) +} + +#[tokio::main] +async fn main() { + let urls: [&str; 4] = [ + "https://google.com", + "https://httpbin.org/ip", + "https://play.rust-lang.org/", + "BAD_URL", + ]; + let futures_iter = urls.into_iter().map(size_of_page); + let results = future::join_all(futures_iter).await; + let page_sizes_dict: HashMap<&str, Result> = + urls.into_iter().zip(results.into_iter()).collect(); + println!("{:?}", page_sizes_dict); +} +``` + +
+ +Copy this example into your prepared `src/main.rs` and run it from there. + +* For multiple futures of disjoint types, you can use `std::future::join!` but + you must know how many futures you will have at compile time. This is + currently in the `futures` crate, soon to be stabilised in `std::future`. + +* The risk of `join` is that one of the futures may never resolve, this would + cause your program to stall. + +* You can also combine `join_all` with `join!` for instance to join all requests + to an http service as well as a database query. + +* Try adding a timeout to the future, using `futures::join!`. + +
+ diff --git a/src/async/control-flow/select.md b/src/async/control-flow/select.md new file mode 100644 index 00000000..7e416ea1 --- /dev/null +++ b/src/async/control-flow/select.md @@ -0,0 +1,75 @@ +# Select + +A select operation waits until any of a set of futures is ready, and responds to +that future's result. In JavaScript, this is similar to `Promise.race`. In +Python, it compares to `asyncio.wait(task_set, +return_when=asyncio.FIRST_COMPLETED)`. + +This is usually a macro, similar to match, with each arm of the form `pattern = +future => statement`. When the future is ready, the statement is executed with the +variable bound to the future's result. + +```rust,editable,compile_fail +use tokio::sync::mpsc::{self, Receiver}; +use tokio::time::{sleep, Duration}; + +#[derive(Debug, PartialEq)] +enum Animal { + Cat { name: String }, + Dog { name: String }, +} + +async fn first_animal_to_finish_race( + mut cat_rcv: Receiver, + mut dog_rcv: Receiver, +) -> Option { + tokio::select! { + cat_name = cat_rcv.recv() => Some(Animal::Cat { name: cat_name? }), + dog_name = dog_rcv.recv() => Some(Animal::Dog { name: dog_name? }) + } +} + +#[tokio::main] +async fn main() { + let (cat_sender, cat_receiver) = mpsc::channel(32); + let (dog_sender, dog_receiver) = mpsc::channel(32); + tokio::spawn(async move { + sleep(Duration::from_millis(500)).await; + cat_sender + .send(String::from("Felix")) + .await + .expect("Failed to send cat."); + }); + tokio::spawn(async move { + sleep(Duration::from_millis(50)).await; + dog_sender + .send(String::from("Rex")) + .await + .expect("Failed to send dog."); + }); + + let winner = first_animal_to_finish_race(cat_receiver, dog_receiver) + .await + .expect("Failed to receive winner"); + + println!("Winner is {winner:?}"); +} +``` + +
+ +* In this example, we have a race between a cat and a dog. + `first_animal_to_finish_race` listens to both channels and will pick whichever + arrives first. Since the dog takes 50ms, it wins against the cat that + take 500ms seconds. + +* You can use `oneshot` channels in this example as the channels are supposed to + receive only one `send`. + +* Try adding a deadline to the race, demonstrating selecting different sorts of + futures. + +* Note that `select!` consumes the futures it is given, and is easiest to use + when every execution of `select!` creates new futures. + +
diff --git a/src/async/futures.md b/src/async/futures.md new file mode 100644 index 00000000..d9d4347c --- /dev/null +++ b/src/async/futures.md @@ -0,0 +1,45 @@ +# Futures + +[`Future`](https://doc.rust-lang.org/std/future/trait.Future.html) +is a trait, implemented by objects that represent an operation that may not be +complete yet. A future can be polled, and `poll` returns a +[`Poll`](https://doc.rust-lang.org/std/task/enum.Poll.html). + +```rust +use std::pin::Pin; +use std::task::Context; + +pub trait Future { + type Output; + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll; +} + +pub enum Poll { + Ready(T), + Pending, +} +``` + +An async function returns an `impl Future`. It's also possible (but uncommon) to +implement `Future` for your own types. For example, the `JoinHandle` returned +from `tokio::spawn` implements `Future` to allow joining to it. + +The `.await` keyword, applied to a Future, causes the current async function to +pause until that Future is ready, and then evaluates to its output. + +
+ +* The `Future` and `Poll` types are implemented exactly as shown; click the + links to show the implementations in the docs. + +* We will not get to `Pin` and `Context`, as we will focus on writing async + code, rather than building new async primitives. Briefly: + + * `Context` allows a Future to schedule itself to be polled again when an + event occurs. + + * `Pin` ensures that the Future isn't moved in memory, so that pointers into + that future remain valid. This is required to allow references to remain + valid after an `.await`. + +
diff --git a/src/async/pitfalls.md b/src/async/pitfalls.md new file mode 100644 index 00000000..a8ae89e2 --- /dev/null +++ b/src/async/pitfalls.md @@ -0,0 +1,7 @@ +# Pitfalls of async/await + +Async / await provides convenient and efficient abstraction for concurrent asynchronous programming. However, the async/await model in Rust also comes with its share of pitfalls and footguns. We illustrate some of them in this chapter: + +- [Blocking the Executor](pitfalls/blocking-executor.md) +- [Pin](pitfalls/pin.md) +- [Async Traits](pitfall/async-traits.md) diff --git a/src/async/pitfalls/async-traits.md b/src/async/pitfalls/async-traits.md new file mode 100644 index 00000000..93e9ef8a --- /dev/null +++ b/src/async/pitfalls/async-traits.md @@ -0,0 +1,64 @@ +# Async Traits + +Async methods in traits are not yet supported in the stable channel ([An experimental feature exists in nightly and should be stabilized in the mid term.](https://blog.rust-lang.org/inside-rust/2022/11/17/async-fn-in-trait-nightly.html)) + +The crate [async_trait](https://docs.rs/async-trait/latest/async_trait/) provides a workaround through a macro: + +```rust,editable,compile_fail +use async_trait::async_trait; +use std::time::Instant; +use tokio::time::{sleep, Duration}; + +#[async_trait] +trait Sleeper { + async fn sleep(&self); +} + +struct FixedSleeper { + sleep_ms: u64, +} + +#[async_trait] +impl Sleeper for FixedSleeper { + async fn sleep(&self) { + sleep(Duration::from_millis(self.sleep_ms)).await; + } +} + +async fn run_all_sleepers_multiple_times(sleepers: Vec>, n_times: usize) { + for _ in 0..n_times { + println!("running all sleepers.."); + for sleeper in &sleepers { + let start = Instant::now(); + sleeper.sleep().await; + println!("slept for {}ms", start.elapsed().as_millis()); + } + } +} + +#[tokio::main] +async fn main() { + let sleepers: Vec> = vec![ + Box::new(FixedSleeper { sleep_ms: 50 }), + Box::new(FixedSleeper { sleep_ms: 100 }), + ]; + run_all_sleepers_multiple_times(sleepers, 5).await; +} +``` + +
+ +* The difficulty with `async trait` is in that the resulting `Future` does not + have a size known at compile time, because the size of the `Future` depends + on the implementation. + +* `async_trait` is easy to use, but note that it's using heap allocations to + achieve this, and solve the unknow size problem above. This heap allocation + has performance overhead. + +* Try creating a new sleeper struct that will sleep for a random amount of time + and adding it to the Vec. + +* Try making the `sleep` call take `&mut self`. + +
diff --git a/src/async/pitfalls/blocking-executor.md b/src/async/pitfalls/blocking-executor.md new file mode 100644 index 00000000..cfbfb467 --- /dev/null +++ b/src/async/pitfalls/blocking-executor.md @@ -0,0 +1,50 @@ +# Blocking the executor + +Most async runtimes only allow IO tasks to run concurrently. +This means that CPU blocking tasks will block the executor and prevent other tasks from being executed. +An easy workaround is to use async equivalent methods where possible. + +```rust,editable,compile_fail +use futures::future::join_all; +use std::time::Instant; + +async fn sleep_ms(start: &Instant, id: u64, duration_ms: u64) { + std::thread::sleep(std::time::Duration::from_millis(duration_ms)); + println!( + "future {id} slept for {duration_ms}ms, finished after {}ms", + start.elapsed().as_millis() + ); +} + +#[tokio::main(flavor = "current_thread")] +async fn main() { + let start = Instant::now(); + let sleep_futures = (1..=10).map(|t| sleep_ms(&start, t, t * 10)); + join_all(sleep_futures).await; +} +``` + +
+ +* Run the code and see that the sleeps happen consecutively rather than + concurrently. + +* The `"current_thread"` flavor puts all tasks on a single thread. This makes the + effect more obvious, but the bug is still present in the multi-threaded + flavor. + +* Switch the `std::thread::sleep` to `tokio::time::sleep` and await its result. + +* Another fix would be to `tokio::task::spawn_blocking` which spawns an actual + thread and transforms its handle into a future without blocking the executor. + +* You should not think of tasks as OS threads. They do not map 1 to 1 and most + executors will allow many tasks to run on a single OS thread. This is + particularly problematic when interacting with other libraries via FFI, where + that library might depend on thread-local storage or map to specific OS + threads (e.g., CUDA). Prefer `tokio::task::spawn_blocking` in such situations. + +* Use sync mutexes with care. Holding a mutex over an `.await` may cause another + task to block, and that task may be running on the same thread. + +
diff --git a/src/async/pitfalls/pin.md b/src/async/pitfalls/pin.md new file mode 100644 index 00000000..455e19ee --- /dev/null +++ b/src/async/pitfalls/pin.md @@ -0,0 +1,110 @@ +# Pin + +When you await a future, all local variables (that would ordinarily be stored on +a stack frame) are instead stored in the Future for the current async block. If your +future has pointers to data on the stack, those pointers might get invalidated. +This is unsafe. + +Therefore, you must guarantee that the addresses your future points to don't +change. That is why we need to `pin` futures. Using the same future repeatedly +in a `select!` often leads to issues with pinned values. + +```rust,editable,compile_fail +use tokio::sync::{mpsc, oneshot}; +use tokio::task::spawn; +use tokio::time::{sleep, Duration}; + +// A work item. In this case, just sleep for the given time and respond +// with a message on the `respond_on` channel. +#[derive(Debug)] +struct Work { + input: u32, + respond_on: oneshot::Sender, +} + +// A worker which listens for work on a queue and performs it. +async fn worker(mut work_queue: mpsc::Receiver) { + let mut iterations = 0; + loop { + tokio::select! { + Some(work) = work_queue.recv() => { + sleep(Duration::from_millis(10)).await; // Pretend to work. + work.respond_on + .send(work.input * 1000) + .expect("failed to send response"); + iterations += 1; + } + // TODO: report number of iterations every 100ms + } + } +} + +// A requester which requests work and waits for it to complete. +async fn do_work(work_queue: &mpsc::Sender, input: u32) -> u32 { + let (tx, rx) = oneshot::channel(); + work_queue + .send(Work { + input, + respond_on: tx, + }) + .await + .expect("failed to send on work queue"); + rx.await.expect("failed waiting for response") +} + +#[tokio::main] +async fn main() { + let (tx, rx) = mpsc::channel(10); + spawn(worker(rx)); + for i in 0..100 { + let resp = do_work(&tx, i).await; + println!("work result for iteration {i}: {resp}"); + } +} +``` + +
+ +* You may recognize this as an example of the actor pattern. Actors + typically call `select!` in a loop. + +* This serves as a summation of a few of the previous lessons, so take your time + with it. + + * Naively add a `_ = sleep(Duration::from_millis(100)) => { println!(..) }` + to the `select!`. This will never execute. Why? + + * Instead, add a `timeout_fut` containing that future outside of the `loop`: + + ```rust,compile_fail + let mut timeout_fut = sleep(Duration::from_millis(100)); + loop { + select! { + .., + _ = timeout_fut => { println!(..); }, + } + } + ``` + * This still doesn't work. Follow the compiler errors, adding `&mut` to the + `timeout_fut` in the `select!` to work around the move, then using + `Box::pin`: + + ```rust,compile_fail + let mut timeout_fut = Box::pin(sleep(Duration::from_millis(100))); + loop { + select! { + .., + _ = &mut timeout_fut => { println!(..); }, + } + } + ``` + + * This compiles, but once the timeout expires it is `Poll::Ready` on every + iteration (a fused future would help with this). Update to reset + `timeout_fut` every time it expires. + +* Box allocates on the heap. In some cases, `tokio::pin!` is also an option, but + that is difficult to use for a future that is reassigned. +* Another alternative is to not use `pin` at all but spawn another task that will send to a `oneshot` channel every 100ms. + +
diff --git a/src/async/runtimes.md b/src/async/runtimes.md new file mode 100644 index 00000000..64a309e0 --- /dev/null +++ b/src/async/runtimes.md @@ -0,0 +1,29 @@ +# Runtimes and Tasks + +A *runtime* provides support for performing operations asynchronously (a +*reactor*) and is responsible for executing futures (an *executor*). Rust does not have a +"built-in" runtime, but several options are available: + + * [Tokio](https://tokio.rs/) - performant, with a well-developed ecosystem of + functionality like [Hyper](https://hyper.rs/) for HTTP or + [Tonic](https://github.com/hyperium/tonic) for gRPC. + * [async-std](https://async.rs/) - aims to be a "std for async", and includes a + basic runtime in `async::task`. + * [smol](https://docs.rs/smol/latest/smol/) - simple and lightweight + +Several larger applications have their own runtimes. For example, +[Fuchsia](https://fuchsia.googlesource.com/fuchsia/+/refs/heads/main/src/lib/fuchsia-async/src/lib.rs) +already has one. + +
+ +* Note that of the listed runtimes, only Tokio is supported in the Rust + playground. The playground also does not permit any I/O, so most interesting + async things can't run in the playground. + +* Futures are "inert" in that they do not do anything (not even start an I/O + operation) unless there is an executor polling them. This differs from JS + Promises, for example, which will run to completion even if they are never + used. + +
diff --git a/src/async/runtimes/tokio.md b/src/async/runtimes/tokio.md new file mode 100644 index 00000000..c9dfbd48 --- /dev/null +++ b/src/async/runtimes/tokio.md @@ -0,0 +1,49 @@ +# Tokio + + +Tokio provides: + +* A multi-threaded runtime for executing asynchronous code. +* An asynchronous version of the standard library. +* A large ecosystem of libraries. + +```rust,editable,compile_fail +use tokio::time; + +async fn count_to(count: i32) { + for i in 1..=count { + println!("Count in task: {i}!"); + time::sleep(time::Duration::from_millis(5)).await; + } +} + +#[tokio::main] +async fn main() { + tokio::spawn(count_to(10)); + + for i in 1..5 { + println!("Main task: {i}"); + time::sleep(time::Duration::from_millis(5)).await; + } +} +``` + +
+ +* With the `tokio::main` macro we can now make `main` async. + +* The `spawn` function creates a new, concurrent "task". + +* Note: `spawn` takes a `Future`, you don't call `.await` on `count_to`. + +**Further exploration:** + +* Why does `count_to` not (usually) get to 10? This is an example of async + cancellation. `tokio::spawn` returns a handle which can be awaited to wait + until it finishes. + +* Try `count_to(10).await` instead of spawning. + +* Try awaiting the task returned from `tokio::spawn`. + +
diff --git a/src/async/tasks.md b/src/async/tasks.md new file mode 100644 index 00000000..e00499f8 --- /dev/null +++ b/src/async/tasks.md @@ -0,0 +1,64 @@ +# Tasks + +Runtimes have the concept of a "Task", similar to a thread but much +less resource-intensive. + +A Task has a single top-level Future which the executor polls to make progress. +That future may have one or more nested futures that its `poll` method polls, +corresponding loosely to a call stack. Concurrency within a task is possible by +polling multiple child futures, such as racing a timer and an I/O operation. + +```rust,compile_fail +use tokio::io::{self, AsyncReadExt, AsyncWriteExt}; +use tokio::net::TcpListener; + +#[tokio::main] +async fn main() -> io::Result<()> { + let listener = TcpListener::bind("127.0.0.1:6142").await?; + println!("listening on port 6142"); + + loop { + let (mut socket, addr) = listener.accept().await?; + + println!("connection from {addr:?}"); + + tokio::spawn(async move { + if let Err(e) = socket.write_all(b"Who are you?\n").await { + println!("socket error: {e:?}"); + return; + } + + let mut buf = vec![0; 1024]; + let reply = match socket.read(&mut buf).await { + Ok(n) => { + let name = std::str::from_utf8(&buf[..n]).unwrap().trim(); + format!("Thanks for dialing in, {name}!\n") + } + Err(e) => { + println!("socket error: {e:?}"); + return; + } + }; + + if let Err(e) = socket.write_all(reply.as_bytes()).await { + println!("socket error: {e:?}"); + } + }); + } +} +``` + +
+ +Copy this example into your prepared `src/main.rs` and run it from there. + +* Ask students to visualize what the state of the example server would be with a + few connected clients. What tasks exist? What are their Futures? + +* This is the first time we've seen an `async` block. This is similar to a + closure, but does not take any arguments. Its return value is a Future, + similar to an `async fn`. + +* Refactor the async block into a function, and improve the error handling using `?`. + +
diff --git a/src/exercises/day-4/async.md b/src/exercises/day-4/async.md new file mode 100644 index 00000000..14047188 --- /dev/null +++ b/src/exercises/day-4/async.md @@ -0,0 +1,3 @@ +# Exercises + +TBD diff --git a/src/running-the-course/day-4.md b/src/running-the-course/day-4.md index 0eb98dc5..ab21933c 100644 --- a/src/running-the-course/day-4.md +++ b/src/running-the-course/day-4.md @@ -23,5 +23,18 @@ Ensure that `adb sync` works with your emulator or real device and pre-build all Android examples using `src/android/build_all.sh`. Read the script to see the commands it runs and make sure they work when you run them by hand. +## Async + +If you chose Async for Day 4 afternoon, you will need a fresh crate set up and +the dependencies downloaded and ready to go. You can then copy/paste the +examples into `src/main.rs` to experiment with them. + +```shell +cargo init day4 +cd day4 +cargo add tokio --features full +cargo run +``` + [1]: https://source.android.com/docs/setup/download/downloading [2]: https://github.com/google/comprehensive-rust