diff --git a/src/SUMMARY.md b/src/SUMMARY.md index 9a12b02e..8d1339dc 100644 --- a/src/SUMMARY.md +++ b/src/SUMMARY.md @@ -290,6 +290,7 @@ - [Blocking the Executor](async/pitfalls/blocking-executor.md) - [Pin](async/pitfalls/pin.md) - [Async Traits](async/pitfalls/async-traits.md) + - [Cancellation](async/pitfalls/cancellation.md) - [Exercises](exercises/concurrency/afternoon.md) - [Dining Philosophers](exercises/concurrency/dining-philosophers-async.md) - [Broadcast Chat Application](exercises/concurrency/chat-app.md) diff --git a/src/async/control-flow/select.md b/src/async/control-flow/select.md index 9963d898..ba42aa2d 100644 --- a/src/async/control-flow/select.md +++ b/src/async/control-flow/select.md @@ -70,9 +70,10 @@ async fn main() { * Try adding a deadline to the race, demonstrating selecting different sorts of futures. -* Note that `select!` moves the values it is given. It is easiest to use - when every execution of `select!` creates new futures. An alternative is to - pass `&mut future` instead of the future itself, but this can lead to - issues, further discussed in the pinning slide. +* Note that `select!` drops unmatched branches, which cancels their futures. + It is easiest to use when every execution of `select!` creates new futures. + + * An alternative is to pass `&mut future` instead of the future itself, but + this can lead to issues, further discussed in the pinning slide. diff --git a/src/async/pitfalls.md b/src/async/pitfalls.md index a8ae89e2..bcebafe0 100644 --- a/src/async/pitfalls.md +++ b/src/async/pitfalls.md @@ -5,3 +5,4 @@ Async / await provides convenient and efficient abstraction for concurrent async - [Blocking the Executor](pitfalls/blocking-executor.md) - [Pin](pitfalls/pin.md) - [Async Traits](pitfall/async-traits.md) +- [Cancellation](pitfalls/cancellation.md) diff --git a/src/async/pitfalls/cancellation.md b/src/async/pitfalls/cancellation.md new file mode 100644 index 00000000..8969b7f4 --- /dev/null +++ b/src/async/pitfalls/cancellation.md @@ -0,0 +1,114 @@ +# Cancellation + +Dropping a future implies it can never be polled again. This is called *cancellation* +and it can occur at any `await` point. Care is needed to ensure the system works +correctly even when futures are cancelled. For example, it shouldn't deadlock or +lose data. + +```rust,editable,compile_fail +use std::io::{self, ErrorKind}; +use std::time::Duration; +use tokio::io::{AsyncReadExt, AsyncWriteExt, DuplexStream}; + +struct LinesReader { + stream: DuplexStream, +} + +impl LinesReader { + fn new(stream: DuplexStream) -> Self { + Self { stream } + } + + async fn next(&mut self) -> io::Result> { + let mut bytes = Vec::new(); + let mut buf = [0]; + while self.stream.read(&mut buf[..]).await? != 0 { + bytes.push(buf[0]); + if buf[0] == b'\n' { + break; + } + } + if bytes.is_empty() { + return Ok(None) + } + let s = String::from_utf8(bytes) + .map_err(|_| io::Error::new(ErrorKind::InvalidData, "not UTF-8"))?; + Ok(Some(s)) + } +} + +async fn slow_copy(source: String, mut dest: DuplexStream) -> std::io::Result<()> { + for b in source.bytes() { + dest.write_u8(b).await?; + tokio::time::sleep(Duration::from_millis(10)).await + } + Ok(()) +} + +#[tokio::main] +async fn main() -> std::io::Result<()> { + let (client, server) = tokio::io::duplex(5); + let handle = tokio::spawn(slow_copy("hi\nthere\n".to_owned(), client)); + + let mut lines = LinesReader::new(server); + let mut interval = tokio::time::interval(Duration::from_millis(60)); + loop { + tokio::select! { + _ = interval.tick() => println!("tick!"), + line = lines.next() => if let Some(l) = line? { + print!("{}", l) + } else { + break + }, + } + } + handle.await.unwrap()?; + Ok(()) +} +``` + +
+ +* The compiler doesn't help with cancellation-safety. You need to read API + documentation and consider what state your `async fn` holds. + +* Unlike `panic` and `?`, cancellation is part of normal control flow + (vs error-handling). + +* The example loses parts of the string. + + * Whenever the `tick()` branch finishes first, `next()` and its `buf` are dropped. + + * `LinesReader` can be made cancellation-safe by makeing `buf` part of the struct: + ```rust,compile_fail + struct LinesReader { + stream: DuplexStream, + bytes: Vec, + buf: [u8; 1], + } + + impl LinesReader { + fn new(stream: DuplexStream) -> Self { + Self { stream, bytes: Vec::new(), buf: [0] } + } + async fn next(&mut self) -> io::Result> { + // prefix buf and bytes with self. + // ... + let raw = std::mem::take(&mut self.bytes); + let s = String::from_utf8(raw) + // ... + } + } + ``` + +* [`Interval::tick`](https://docs.rs/tokio/latest/tokio/time/struct.Interval.html#method.tick) + is cancellation-safe because it keeps track of whether a tick has been 'delivered'. + +* [`AsyncReadExt::read`](https://docs.rs/tokio/latest/tokio/io/trait.AsyncReadExt.html#method.read) + is cancellation-safe because it either returns or doesn't read data. + +* [`AsyncBufReadExt::read_line`](https://docs.rs/tokio/latest/tokio/io/trait.AsyncBufReadExt.html#method.read_line) + is similar to the example and *isn't* cancellation-safe. See its documentation + for details and alternatives. + +