mirror of https://github.com/google/comprehensive-rust.git synced 2025-12-24 07:19:47 +02:00

Update Concurrency course with times (#2007)

As I mentioned in #1536:

* Break into segments at approximately the places @fw-immunant put
breaks
 * Move all of the files into `src/concurrency`
 * Add timings and segment/session metadata so course outlines appear

There's room for more work here, including some additional feedback from
@fw-immunant after the session I observed, but let's do one step at a
time :)
This commit is contained in:
Dustin J. Mitchell
2024-04-23 09:26:41 -04:00
committed by GitHub
parent a03b7e68e5
commit face5af783
58 changed files with 385 additions and 246 deletions

View File

@@ -0,0 +1,3 @@
# Channels and Control Flow
{{%segment outline}}

View File

@@ -0,0 +1,53 @@
---
minutes: 8
---
# Async Channels
Several crates have support for asynchronous channels. For instance `tokio`:
```rust,editable,compile_fail
use tokio::sync::mpsc::{self, Receiver};

async fn ping_handler(mut input: Receiver<()>) {
    let mut count: usize = 0;

    while let Some(_) = input.recv().await {
        count += 1;
        println!("Received {count} pings so far.");
    }

    println!("ping_handler complete");
}

#[tokio::main]
async fn main() {
    let (sender, receiver) = mpsc::channel(32);
    let ping_handler_task = tokio::spawn(ping_handler(receiver));
    for i in 0..10 {
        sender.send(()).await.expect("Failed to send ping.");
        println!("Sent {} pings so far.", i + 1);
    }

    drop(sender);
    ping_handler_task.await.expect("Something went wrong in ping handler task.");
}
```
<details>
- Change the channel size to `3` and see how it affects the execution.
- Overall, the interface is similar to the `sync` channels as seen in the
[morning class](concurrency/channels.md).
- Try removing the `std::mem::drop` call. What happens? Why?
- The [Flume](https://docs.rs/flume/latest/flume/) crate has channels that
implement both `sync` and `async` `send` and `recv`. This can be convenient
for complex applications with both IO and heavy CPU processing tasks.
- What makes working with `async` channels preferable is the ability to combine
them with other `future`s to create complex control flow.
</details>
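The role of the `drop(sender)` call can also be explored without an async runtime. The following is a minimal sketch that uses `std::sync::mpsc::sync_channel` as a stand-in for the bounded `tokio` channel; `count_pings` is a hypothetical helper introduced only for this illustration, and the blocking behaviour of the standard-library channel only approximates what `send(..).await` does in `tokio`.

```rust
use std::sync::mpsc::sync_channel;
use std::thread;

// Hypothetical helper: sends `n` pings through a bounded channel with the
// given capacity and returns how many the receiver saw before the channel
// was closed.
fn count_pings(n: usize, capacity: usize) -> usize {
    let (sender, receiver) = sync_channel::<()>(capacity);

    // The receiving thread loops until every sender has been dropped.
    let handler = thread::spawn(move || {
        let mut count = 0;
        while receiver.recv().is_ok() {
            count += 1;
        }
        count
    });

    for _ in 0..n {
        // Waits while the buffer is full, which is where the capacity matters.
        sender.send(()).expect("receiver hung up");
    }

    // Without this drop the receiver would never observe a closed channel,
    // and the `join` below would wait forever.
    drop(sender);
    handler.join().expect("handler thread panicked")
}

fn main() {
    println!("Received {} pings", count_pings(10, 3));
}
```

The receive loop ends only when the last sender is gone, which is the same reason the async example drops `sender` before awaiting the handler task.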

View File

@@ -0,0 +1,55 @@
---
minutes: 4
---
# Join
A join operation waits until all of a set of futures are ready, and returns a
collection of their results. This is similar to `Promise.all` in JavaScript or
`asyncio.gather` in Python.
```rust,editable,compile_fail
use anyhow::Result;
use futures::future;
use reqwest;
use std::collections::HashMap;

async fn size_of_page(url: &str) -> Result<usize> {
    let resp = reqwest::get(url).await?;
    Ok(resp.text().await?.len())
}

#[tokio::main]
async fn main() {
    let urls: [&str; 4] = [
        "https://google.com",
        "https://httpbin.org/ip",
        "https://play.rust-lang.org/",
        "BAD_URL",
    ];
    let futures_iter = urls.into_iter().map(size_of_page);
    let results = future::join_all(futures_iter).await;
    let page_sizes_dict: HashMap<&str, Result<usize>> =
        urls.into_iter().zip(results.into_iter()).collect();
    println!("{:?}", page_sizes_dict);
}
```
<details>
Copy this example into your prepared `src/main.rs` and run it from there.
- For multiple futures of disjoint types, you can use a `join!` macro, but you
must know how many futures you will have at compile time. The macro is
currently provided by the `futures` crate; the `std::future::join!` version is
not yet stable.
- The risk of `join` is that one of the futures may never resolve, which would
cause your program to stall.
- You can also combine `join_all` with `join!` for instance to join all requests
to an http service as well as a database query. Try adding a
`tokio::time::sleep` to the future, using `futures::join!`. This is not a
timeout (that requires `select!`, explained in the next chapter), but
demonstrates `join!`.
</details>
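To make the idea of joining futures of different types concrete, here is a minimal sketch written against the standard library only. `join2`, `block_on` and `NoopWaker` are hypothetical names introduced for this illustration; they are not the implementation used by `futures::join!`, and the toy executor simply polls in a loop.

```rust
use std::future::{poll_fn, Future};
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Waker that does nothing; adequate because `block_on` below polls in a loop.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Toy executor: polls the future repeatedly until it is ready (busy-waits).
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(value) = fut.as_mut().poll(&mut cx) {
            return value;
        }
    }
}

// Hypothetical two-way join: ready only once both inputs are ready.
async fn join2<A: Future, B: Future>(a: A, b: B) -> (A::Output, B::Output) {
    let mut a = pin!(a);
    let mut b = pin!(b);
    let (mut a_out, mut b_out) = (None, None);
    poll_fn(move |cx| {
        // Poll each future only until it has produced its output.
        if a_out.is_none() {
            if let Poll::Ready(value) = a.as_mut().poll(cx) {
                a_out = Some(value);
            }
        }
        if b_out.is_none() {
            if let Poll::Ready(value) = b.as_mut().poll(cx) {
                b_out = Some(value);
            }
        }
        if a_out.is_some() && b_out.is_some() {
            Poll::Ready((a_out.take().unwrap(), b_out.take().unwrap()))
        } else {
            Poll::Pending
        }
    })
    .await
}

fn main() {
    let (number, text) =
        block_on(join2(async { 6 * 7 }, async { String::from("done") }));
    println!("{number} {text}");
}
```

Because the number of inputs is fixed in the signature, the two outputs can have unrelated types, which is the trade-off relative to `join_all`.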

View File

@@ -0,0 +1,78 @@
---
minutes: 5
---
# Select
A select operation waits until any of a set of futures is ready, and responds to
that future's result. In JavaScript, this is similar to `Promise.race`. In
Python, it compares to
`asyncio.wait(task_set, return_when=asyncio.FIRST_COMPLETED)`.
Similar to a match statement, the body of `select!` has a number of arms, each
of the form `pattern = future => statement`. When a `future` is ready, its
return value is destructured by the `pattern`. The `statement` is then run with
the resulting variables. The `statement` result becomes the result of the
`select!` macro.
```rust,editable,compile_fail
use tokio::sync::mpsc::{self, Receiver};
use tokio::time::{sleep, Duration};

#[derive(Debug, PartialEq)]
enum Animal {
    Cat { name: String },
    Dog { name: String },
}

async fn first_animal_to_finish_race(
    mut cat_rcv: Receiver<String>,
    mut dog_rcv: Receiver<String>,
) -> Option<Animal> {
    tokio::select! {
        cat_name = cat_rcv.recv() => Some(Animal::Cat { name: cat_name? }),
        dog_name = dog_rcv.recv() => Some(Animal::Dog { name: dog_name? })
    }
}

#[tokio::main]
async fn main() {
    let (cat_sender, cat_receiver) = mpsc::channel(32);
    let (dog_sender, dog_receiver) = mpsc::channel(32);
    tokio::spawn(async move {
        sleep(Duration::from_millis(500)).await;
        cat_sender.send(String::from("Felix")).await.expect("Failed to send cat.");
    });
    tokio::spawn(async move {
        sleep(Duration::from_millis(50)).await;
        dog_sender.send(String::from("Rex")).await.expect("Failed to send dog.");
    });

    let winner = first_animal_to_finish_race(cat_receiver, dog_receiver)
        .await
        .expect("Failed to receive winner");

    println!("Winner is {winner:?}");
}
```
<details>
- In this example, we have a race between a cat and a dog.
`first_animal_to_finish_race` listens to both channels and will pick whichever
arrives first. Since the dog takes 50ms, it wins against the cat, which takes
500ms.
- You can use `oneshot` channels in this example as the channels are supposed to
receive only one `send`.
- Try adding a deadline to the race, demonstrating selecting different sorts of
futures.
- Note that `select!` drops unmatched branches, which cancels their futures. It
is easiest to use when every execution of `select!` creates new futures.
- An alternative is to pass `&mut future` instead of the future itself, but
this can lead to issues, further discussed in the pinning slide.
</details>
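The core of a select operation can be sketched with the standard library alone. In the sketch below, `select2`, `Countdown`, `racer` and `winner` are hypothetical names made up for this illustration. Unlike `tokio::select!`, this version always polls its first argument first and supports only two futures with the same output type.

```rust
use std::future::{poll_fn, Future};
use std::pin::{pin, Pin};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Waker that does nothing; adequate because `block_on` below polls in a loop.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Toy executor: polls the future repeatedly until it is ready (busy-waits).
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(value) = fut.as_mut().poll(&mut cx) {
            return value;
        }
    }
}

// Stand-in for a delay: pending for the given number of polls, then ready.
struct Countdown(u32);

impl Future for Countdown {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.0 == 0 {
            Poll::Ready(())
        } else {
            self.0 -= 1;
            // Ask to be polled again, as a well-behaved future should.
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

async fn racer(name: &'static str, polls: u32) -> &'static str {
    Countdown(polls).await;
    name
}

// Hypothetical two-way select: resolves to whichever future is ready first.
async fn select2<T>(a: impl Future<Output = T>, b: impl Future<Output = T>) -> T {
    let mut a = pin!(a);
    let mut b = pin!(b);
    poll_fn(move |cx| {
        if let Poll::Ready(value) = a.as_mut().poll(cx) {
            return Poll::Ready(value);
        }
        b.as_mut().poll(cx)
    })
    .await
    // Returning drops the future that lost, which cancels it.
}

fn winner(cat_polls: u32, dog_polls: u32) -> &'static str {
    block_on(select2(racer("cat", cat_polls), racer("dog", dog_polls)))
}

fn main() {
    println!("Winner is {}", winner(5, 1));
}
```

The losing future is never polled again once `select2` returns, which is the cancellation behaviour described in the notes above.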

View File

@@ -0,0 +1,3 @@
# Exercises
{{%segment outline}}

View File

@@ -0,0 +1 @@
# Exercises

View File

@@ -0,0 +1,113 @@
---
minutes: 30
---
# Broadcast Chat Application
In this exercise, we want to use our new knowledge to implement a broadcast chat
application. We have a chat server that the clients connect to and publish their
messages. The client reads user messages from the standard input, and sends them
to the server. The chat server broadcasts each message that it receives to all
the clients.
For this, we use [a broadcast channel][1] on the server, and
[`tokio_websockets`][2] for the communication between the client and the server.
Create a new Cargo project and add the following dependencies:
_Cargo.toml_:
<!-- File Cargo.toml -->
```toml
{{#include chat-async/Cargo.toml}}
```
## The required APIs
You are going to need the following functions from `tokio` and
[`tokio_websockets`][2]. Spend a few minutes to familiarize yourself with the
API.
- [StreamExt::next()][3] implemented by `WebSocketStream`: for asynchronously
reading messages from a Websocket Stream.
- [SinkExt::send()][4] implemented by `WebSocketStream`: for asynchronously
sending messages on a Websocket Stream.
- [Lines::next_line()][5]: for asynchronously reading user messages from the
standard input.
- [Sender::subscribe()][6]: for subscribing to a broadcast channel.
## Two binaries
Normally in a Cargo project, you can have only one binary, and one `src/main.rs`
file. In this project, we need two binaries. One for the client, and one for the
server. You could potentially make them two separate Cargo projects, but we are
going to put them in a single Cargo project with two binaries. For this to work,
the client and the server code should go under `src/bin` (see the
[documentation][7]).
Copy the following server and client code into `src/bin/server.rs` and
`src/bin/client.rs`, respectively. Your task is to complete these files as
described below.
_src/bin/server.rs_:
<!-- File src/bin/server.rs -->
```rust,compile_fail
{{#include chat-async/src/bin/server.rs:setup}}
{{#include chat-async/src/bin/server.rs:handle_connection}}
// TODO: For a hint, see the description of the task below.
{{#include chat-async/src/bin/server.rs:main}}
```
_src/bin/client.rs_:
<!-- File src/bin/client.rs -->
```rust,compile_fail
{{#include chat-async/src/bin/client.rs:setup}}
// TODO: For a hint, see the description of the task below.
}
```
## Running the binaries
Run the server with:
```shell
cargo run --bin server
```
and the client with:
```shell
cargo run --bin client
```
## Tasks
- Implement the `handle_connection` function in `src/bin/server.rs`.
  - Hint: Use `tokio::select!` for concurrently performing two tasks in a
    continuous loop. One task receives messages from the client and broadcasts
    them. The other sends messages received by the server to the client.
- Complete the main function in `src/bin/client.rs`.
  - Hint: As before, use `tokio::select!` in a continuous loop for concurrently
    performing two tasks: (1) reading user messages from standard input and
    sending them to the server, and (2) receiving messages from the server, and
    displaying them for the user.
- Optional: Once you are done, change the code to broadcast messages to all
  clients except the sender of the message.
[1]: https://docs.rs/tokio/latest/tokio/sync/broadcast/fn.channel.html
[2]: https://docs.rs/tokio-websockets/
[3]: https://docs.rs/futures-util/0.3.28/futures_util/stream/trait.StreamExt.html#method.next
[4]: https://docs.rs/futures-util/0.3.28/futures_util/sink/trait.SinkExt.html#method.send
[5]: https://docs.rs/tokio/latest/tokio/io/struct.Lines.html#method.next_line
[6]: https://docs.rs/tokio/latest/tokio/sync/broadcast/struct.Sender.html#method.subscribe
[7]: https://doc.rust-lang.org/cargo/reference/cargo-targets.html#binaries

View File

@@ -0,0 +1,10 @@
[package]
name = "chat-async"
version = "0.1.0"
edition = "2021"
[dependencies]
futures-util = { version = "0.3.30", features = ["sink"] }
http = "1.1.0"
tokio = { version = "1.37.0", features = ["full"] }
tokio-websockets = { version = "0.8.2", features = ["client", "fastrand", "server", "sha1_smol"] }

View File

@@ -0,0 +1,58 @@
// Copyright 2023 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ANCHOR: solution
// ANCHOR: setup
use futures_util::stream::StreamExt;
use futures_util::SinkExt;
use http::Uri;
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio_websockets::{ClientBuilder, Message};
#[tokio::main]
async fn main() -> Result<(), tokio_websockets::Error> {
let (mut ws_stream, _) =
ClientBuilder::from_uri(Uri::from_static("ws://127.0.0.1:2000"))
.connect()
.await?;
let stdin = tokio::io::stdin();
let mut stdin = BufReader::new(stdin).lines();
// ANCHOR_END: setup
// Continuous loop for concurrently sending and receiving messages.
loop {
tokio::select! {
incoming = ws_stream.next() => {
match incoming {
Some(Ok(msg)) => {
if let Some(text) = msg.as_text() {
println!("From server: {}", text);
}
},
Some(Err(err)) => return Err(err.into()),
None => return Ok(()),
}
}
res = stdin.next_line() => {
match res {
Ok(None) => return Ok(()),
Ok(Some(line)) => ws_stream.send(Message::text(line.to_string())).await?,
Err(err) => return Err(err.into()),
}
}
}
}
}

View File

@@ -0,0 +1,83 @@
// Copyright 2023 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ANCHOR: solution
// ANCHOR: setup
use futures_util::sink::SinkExt;
use futures_util::stream::StreamExt;
use std::error::Error;
use std::net::SocketAddr;
use tokio::net::{TcpListener, TcpStream};
use tokio::sync::broadcast::{channel, Sender};
use tokio_websockets::{Message, ServerBuilder, WebSocketStream};
// ANCHOR_END: setup
// ANCHOR: handle_connection
async fn handle_connection(
addr: SocketAddr,
mut ws_stream: WebSocketStream<TcpStream>,
bcast_tx: Sender<String>,
) -> Result<(), Box<dyn Error + Send + Sync>> {
// ANCHOR_END: handle_connection
ws_stream
.send(Message::text("Welcome to chat! Type a message".to_string()))
.await?;
let mut bcast_rx = bcast_tx.subscribe();
// A continuous loop for concurrently performing two tasks: (1) receiving
// messages from `ws_stream` and broadcasting them, and (2) receiving
// messages on `bcast_rx` and sending them to the client.
loop {
tokio::select! {
incoming = ws_stream.next() => {
match incoming {
Some(Ok(msg)) => {
if let Some(text) = msg.as_text() {
println!("From client {addr:?} {text:?}");
bcast_tx.send(text.into())?;
}
}
Some(Err(err)) => return Err(err.into()),
None => return Ok(()),
}
}
msg = bcast_rx.recv() => {
ws_stream.send(Message::text(msg?)).await?;
}
}
}
// ANCHOR: main
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
let (bcast_tx, _) = channel(16);
let listener = TcpListener::bind("127.0.0.1:2000").await?;
println!("listening on port 2000");
loop {
let (socket, addr) = listener.accept().await?;
println!("New connection from {addr:?}");
let bcast_tx = bcast_tx.clone();
tokio::spawn(async move {
// Wrap the raw TCP stream into a websocket.
let ws_stream = ServerBuilder::new().accept(socket).await?;
handle_connection(addr, ws_stream, bcast_tx).await
});
}
}
// ANCHOR_END: main

View File

@@ -0,0 +1,61 @@
---
minutes: 20
---
# Dining Philosophers --- Async
See [dining philosophers](dining-philosophers.md) for a description of the
problem.
As before, you will need a local
[Cargo installation](../../cargo/running-locally.md) for this exercise. Copy the
code below to a file called `src/main.rs`, fill out the blanks, and test that
`cargo run` does not deadlock:
<!-- File src/main.rs -->
```rust,compile_fail
{{#include dining-philosophers.rs:Philosopher}}
// left_fork: ...
// right_fork: ...
// thoughts: ...
}
{{#include dining-philosophers.rs:Philosopher-think}}
{{#include dining-philosophers.rs:Philosopher-eat}}
{{#include dining-philosophers.rs:Philosopher-eat-body}}
{{#include dining-philosophers.rs:Philosopher-eat-end}}
// Create forks
// Create philosophers
// Make them think and eat
// Output their thoughts
}
```
Since this time you are using Async Rust, you'll need a `tokio` dependency. You
can use the following `Cargo.toml`:
<!-- File Cargo.toml -->
```toml
[package]
name = "dining-philosophers-async-dine"
version = "0.1.0"
edition = "2021"
[dependencies]
tokio = { version = "1.26.0", features = ["sync", "time", "macros", "rt-multi-thread"] }
```
Also note that this time you have to use the `Mutex` and the `mpsc` module from
the `tokio` crate.
<details>
- Can you make your implementation single-threaded?
</details>

View File

@@ -0,0 +1,119 @@
// Copyright 2023 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ANCHOR: solution
// ANCHOR: Philosopher
use std::sync::Arc;
use tokio::sync::mpsc::{self, Sender};
use tokio::sync::Mutex;
use tokio::time;
struct Fork;
struct Philosopher {
name: String,
// ANCHOR_END: Philosopher
left_fork: Arc<Mutex<Fork>>,
right_fork: Arc<Mutex<Fork>>,
thoughts: Sender<String>,
}
// ANCHOR: Philosopher-think
impl Philosopher {
async fn think(&self) {
self.thoughts
.send(format!("Eureka! {} has a new idea!", &self.name))
.await
.unwrap();
}
// ANCHOR_END: Philosopher-think
// ANCHOR: Philosopher-eat
async fn eat(&self) {
// Keep trying until we have both forks
// ANCHOR_END: Philosopher-eat
let (_left_fork, _right_fork) = loop {
// Pick up forks...
let left_fork = self.left_fork.try_lock();
let right_fork = self.right_fork.try_lock();
let Ok(left_fork) = left_fork else {
// If we didn't get the left fork, drop the right fork if we
// have it and let other tasks make progress.
drop(right_fork);
time::sleep(time::Duration::from_millis(1)).await;
continue;
};
let Ok(right_fork) = right_fork else {
// If we didn't get the right fork, drop the left fork and let
// other tasks make progress.
drop(left_fork);
time::sleep(time::Duration::from_millis(1)).await;
continue;
};
break (left_fork, right_fork);
};
// ANCHOR: Philosopher-eat-body
println!("{} is eating...", &self.name);
time::sleep(time::Duration::from_millis(5)).await;
// ANCHOR_END: Philosopher-eat-body
// The locks are dropped here
// ANCHOR: Philosopher-eat-end
}
}
static PHILOSOPHERS: &[&str] =
&["Socrates", "Hypatia", "Plato", "Aristotle", "Pythagoras"];
#[tokio::main]
async fn main() {
// ANCHOR_END: Philosopher-eat-end
// Create forks
let mut forks = vec![];
(0..PHILOSOPHERS.len()).for_each(|_| forks.push(Arc::new(Mutex::new(Fork))));
// Create philosophers
let (philosophers, mut rx) = {
let mut philosophers = vec![];
let (tx, rx) = mpsc::channel(10);
for (i, name) in PHILOSOPHERS.iter().enumerate() {
let left_fork = Arc::clone(&forks[i]);
let right_fork = Arc::clone(&forks[(i + 1) % PHILOSOPHERS.len()]);
philosophers.push(Philosopher {
name: name.to_string(),
left_fork,
right_fork,
thoughts: tx.clone(),
});
}
(philosophers, rx)
// tx is dropped here, so we don't need to explicitly drop it later
};
// Make them think and eat
for phil in philosophers {
tokio::spawn(async move {
for _ in 0..100 {
phil.think().await;
phil.eat().await;
}
});
}
// Output their thoughts
while let Some(thought) = rx.recv().await {
println!("Here is a thought: {thought}");
}
}

View File

@@ -0,0 +1,25 @@
---
minutes: 20
---
# Solutions
## Dining Philosophers --- Async
```rust,compile_fail
{{#include dining-philosophers.rs:solution}}
```
## Broadcast Chat Application
_src/bin/server.rs_:
```rust,compile_fail
{{#include chat-async/src/bin/server.rs:solution}}
```
_src/bin/client.rs_:
```rust,compile_fail
{{#include chat-async/src/bin/client.rs:solution}}
```

View File

@@ -0,0 +1,7 @@
# Pitfalls
Async/await provides a convenient and efficient abstraction for concurrent
asynchronous programming. However, the async/await model in Rust also comes with
its share of pitfalls and footguns. We illustrate some of them in this chapter.
{{%segment outline}}

View File

@@ -0,0 +1,83 @@
---
minutes: 5
---
# Async Traits
Async methods in traits were stabilized only recently, in the 1.75 release.
This required support for using return-position `impl Trait` (RPIT) in traits,
as the desugaring for `async fn` includes `-> impl Future<Output = ...>`.
However, even with the native support today there are some pitfalls around
`async fn` and RPIT in traits:
- Return-position impl Trait captures all in-scope lifetimes (so some patterns
of borrowing cannot be expressed)
- Traits whose methods use return-position `impl Trait` or `async` are not `dyn`
compatible.
If we do need `dyn` support, the crate
[async_trait](https://docs.rs/async-trait/latest/async_trait/) provides a
workaround through a macro, with some caveats:
```rust,editable,compile_fail
use async_trait::async_trait;
use std::time::Instant;
use tokio::time::{sleep, Duration};

#[async_trait]
trait Sleeper {
    async fn sleep(&self);
}

struct FixedSleeper {
    sleep_ms: u64,
}

#[async_trait]
impl Sleeper for FixedSleeper {
    async fn sleep(&self) {
        sleep(Duration::from_millis(self.sleep_ms)).await;
    }
}

async fn run_all_sleepers_multiple_times(
    sleepers: Vec<Box<dyn Sleeper>>,
    n_times: usize,
) {
    for _ in 0..n_times {
        println!("running all sleepers..");
        for sleeper in &sleepers {
            let start = Instant::now();
            sleeper.sleep().await;
            println!("slept for {}ms", start.elapsed().as_millis());
        }
    }
}

#[tokio::main]
async fn main() {
    let sleepers: Vec<Box<dyn Sleeper>> = vec![
        Box::new(FixedSleeper { sleep_ms: 50 }),
        Box::new(FixedSleeper { sleep_ms: 100 }),
    ];
    run_all_sleepers_multiple_times(sleepers, 5).await;
}
```
<details>
- `async_trait` is easy to use, but note that it's using heap allocations to
achieve this. This heap allocation has performance overhead.
- The challenges in language support for `async fn` in traits run deep and are
probably not worth describing in depth. Niko Matsakis did a good job of
explaining them in
[this post](https://smallcultfollowing.com/babysteps/blog/2019/10/26/async-fn-in-traits-are-hard/)
if you are interested in digging deeper.
- Try creating a new sleeper struct that will sleep for a random amount of time
and adding it to the Vec.
</details>
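To see where the heap allocation comes from, it can help to write the boxed form by hand. The sketch below uses only the standard library and shows roughly the kind of signature the macro produces; the exact expansion of `#[async_trait]` differs in its lifetime bounds, and `Greeter`, `BoxFuture`, `English`, `Named`, `demo` and the toy `block_on` are hypothetical names for this illustration.

```rust
use std::future::Future;
use std::pin::{pin, Pin};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Waker that does nothing; adequate because `block_on` below polls in a loop.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Toy executor: polls the future repeatedly until it is ready (busy-waits).
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(value) = fut.as_mut().poll(&mut cx) {
            return value;
        }
    }
}

// Shorthand for the boxed, type-erased future that each method returns.
type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;

trait Greeter {
    // Roughly the shape generated for `async fn greet(&self) -> String`.
    fn greet(&self) -> BoxFuture<'_, String>;
}

struct English;

struct Named {
    name: String,
}

impl Greeter for English {
    fn greet(&self) -> BoxFuture<'_, String> {
        // One heap allocation per call: this is the overhead of the approach.
        Box::pin(async { String::from("hello") })
    }
}

impl Greeter for Named {
    fn greet(&self) -> BoxFuture<'_, String> {
        // The future borrows `self`, hence the lifetime on `BoxFuture`.
        Box::pin(async move { format!("bonjour, {}", self.name) })
    }
}

// Because every method returns the same concrete type, `dyn Greeter` works.
async fn greet_all(greeters: &[Box<dyn Greeter>]) -> Vec<String> {
    let mut greetings = Vec::new();
    for greeter in greeters {
        greetings.push(greeter.greet().await);
    }
    greetings
}

fn demo() -> Vec<String> {
    let greeters: Vec<Box<dyn Greeter>> =
        vec![Box::new(English), Box::new(Named { name: String::from("Ferris") })];
    block_on(greet_all(&greeters))
}

fn main() {
    println!("{:?}", demo());
}
```

Returning a boxed trait object gives every implementation the same return type, which is what makes the trait usable behind `dyn`.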

View File

@@ -0,0 +1,54 @@
---
minutes: 10
---
# Blocking the executor
Most async runtimes only allow IO tasks to run concurrently. This means that CPU
blocking tasks will block the executor and prevent other tasks from being
executed. An easy workaround is to use async equivalent methods where possible.
```rust,editable,compile_fail
use futures::future::join_all;
use std::time::Instant;

async fn sleep_ms(start: &Instant, id: u64, duration_ms: u64) {
    std::thread::sleep(std::time::Duration::from_millis(duration_ms));
    println!(
        "future {id} slept for {duration_ms}ms, finished after {}ms",
        start.elapsed().as_millis()
    );
}

#[tokio::main(flavor = "current_thread")]
async fn main() {
    let start = Instant::now();
    let sleep_futures = (1..=10).map(|t| sleep_ms(&start, t, t * 10));
    join_all(sleep_futures).await;
}
```
<details>
- Run the code and see that the sleeps happen consecutively rather than
concurrently.
- The `"current_thread"` flavor puts all tasks on a single thread. This makes
the effect more obvious, but the bug is still present in the multi-threaded
flavor.
- Switch the `std::thread::sleep` to `tokio::time::sleep` and await its result.
- Another fix would be to use `tokio::task::spawn_blocking`, which spawns an
actual thread and transforms its handle into a future without blocking the
executor.
- You should not think of tasks as OS threads. They do not map 1 to 1 and most
executors will allow many tasks to run on a single OS thread. This is
particularly problematic when interacting with other libraries via FFI, where
that library might depend on thread-local storage or map to specific OS
threads (e.g., CUDA). Prefer `tokio::task::spawn_blocking` in such situations.
- Use sync mutexes with care. Holding a mutex over an `.await` may cause another
task to block, and that task may be running on the same thread.
</details>

View File

@@ -0,0 +1,121 @@
---
minutes: 18
---
# Cancellation
Dropping a future implies it can never be polled again. This is called
_cancellation_ and it can occur at any `await` point. Care is needed to ensure
the system works correctly even when futures are cancelled. For example, it
shouldn't deadlock or lose data.
```rust,editable,compile_fail
use std::io::{self, ErrorKind};
use std::time::Duration;
use tokio::io::{AsyncReadExt, AsyncWriteExt, DuplexStream};

struct LinesReader {
    stream: DuplexStream,
}

impl LinesReader {
    fn new(stream: DuplexStream) -> Self {
        Self { stream }
    }

    async fn next(&mut self) -> io::Result<Option<String>> {
        let mut bytes = Vec::new();
        let mut buf = [0];
        while self.stream.read(&mut buf[..]).await? != 0 {
            bytes.push(buf[0]);
            if buf[0] == b'\n' {
                break;
            }
        }
        if bytes.is_empty() {
            return Ok(None);
        }
        let s = String::from_utf8(bytes)
            .map_err(|_| io::Error::new(ErrorKind::InvalidData, "not UTF-8"))?;
        Ok(Some(s))
    }
}

async fn slow_copy(source: String, mut dest: DuplexStream) -> std::io::Result<()> {
    for b in source.bytes() {
        dest.write_u8(b).await?;
        tokio::time::sleep(Duration::from_millis(10)).await
    }
    Ok(())
}

#[tokio::main]
async fn main() -> std::io::Result<()> {
    let (client, server) = tokio::io::duplex(5);
    let handle = tokio::spawn(slow_copy("hi\nthere\n".to_owned(), client));

    let mut lines = LinesReader::new(server);
    let mut interval = tokio::time::interval(Duration::from_millis(60));
    loop {
        tokio::select! {
            _ = interval.tick() => println!("tick!"),
            line = lines.next() => if let Some(l) = line? {
                print!("{}", l)
            } else {
                break
            },
        }
    }
    handle.await.unwrap()?;
    Ok(())
}
```
<details>
- The compiler doesn't help with cancellation-safety. You need to read API
documentation and consider what state your `async fn` holds.
- Unlike `panic` and `?`, cancellation is part of normal control flow (vs
error-handling).
- The example loses parts of the string.
  - Whenever the `tick()` branch finishes first, `next()` and its `buf` are
    dropped.
  - `LinesReader` can be made cancellation-safe by making `buf` and `bytes`
    part of the struct:
    ```rust,compile_fail
    struct LinesReader {
        stream: DuplexStream,
        bytes: Vec<u8>,
        buf: [u8; 1],
    }

    impl LinesReader {
        fn new(stream: DuplexStream) -> Self {
            Self { stream, bytes: Vec::new(), buf: [0] }
        }
        async fn next(&mut self) -> io::Result<Option<String>> {
            // prefix buf and bytes with self.
            // ...
            let raw = std::mem::take(&mut self.bytes);
            let s = String::from_utf8(raw)
            // ...
        }
    }
    ```
- [`Interval::tick`](https://docs.rs/tokio/latest/tokio/time/struct.Interval.html#method.tick)
is cancellation-safe because it keeps track of whether a tick has been
'delivered'.
- [`AsyncReadExt::read`](https://docs.rs/tokio/latest/tokio/io/trait.AsyncReadExt.html#method.read)
is cancellation-safe because it either returns or doesn't read data.
- [`AsyncBufReadExt::read_line`](https://docs.rs/tokio/latest/tokio/io/trait.AsyncBufReadExt.html#method.read_line)
is similar to the example and _isn't_ cancellation-safe. See its documentation
for details and alternatives.
</details>
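The effect of cancellation on local state can be reproduced in isolation. The following standard-library sketch polls a future once and then drops it, which is what `select!` does to a branch that loses. `YieldOnce`, `poll_once`, `take_two_local` and `take_two_saved` are hypothetical names for this illustration, and a `Vec<u8>` stands in for the stream.

```rust
use std::future::Future;
use std::pin::{pin, Pin};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Waker that does nothing; the futures here are polled by hand.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Await point that is pending exactly once.
struct YieldOnce(bool);

impl Future for YieldOnce {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
        if self.0 {
            Poll::Ready(())
        } else {
            self.0 = true;
            Poll::Pending
        }
    }
}

// Polls the future a single time, then drops it. If it was still pending,
// dropping it cancels it.
fn poll_once<F: Future>(fut: F) -> Option<F::Output> {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    match fut.as_mut().poll(&mut cx) {
        Poll::Ready(value) => Some(value),
        Poll::Pending => None,
    }
}

// Not cancellation-safe: progress is kept in a local variable.
async fn take_two_local(source: &mut Vec<u8>) -> Vec<u8> {
    let mut taken = Vec::new();
    while taken.len() < 2 {
        taken.push(source.remove(0));
        YieldOnce(false).await;
    }
    taken
}

// Cancellation-safe: progress is kept in state owned by the caller.
async fn take_two_saved(source: &mut Vec<u8>, taken: &mut Vec<u8>) -> Vec<u8> {
    while taken.len() < 2 {
        taken.push(source.remove(0));
        YieldOnce(false).await;
    }
    std::mem::take(taken)
}

fn main() {
    let mut source = vec![1, 2, 3];
    let result = poll_once(take_two_local(&mut source));
    // Byte 1 left `source` but was held only inside the dropped future.
    println!("local: result = {result:?}, source = {source:?}");

    let (mut source, mut taken) = (vec![1, 2, 3], Vec::new());
    let result = poll_once(take_two_saved(&mut source, &mut taken));
    // Byte 1 is still reachable through `taken`.
    println!("saved: result = {result:?}, source = {source:?}, taken = {taken:?}");
}
```

Calling `take_two_saved` again with the same `taken` buffer resumes where the cancelled call stopped, which mirrors moving `bytes` into `LinesReader`.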

View File

@@ -0,0 +1,132 @@
---
minutes: 20
---
# `Pin`
Async blocks and functions return types implementing the `Future` trait. The
type returned is the result of a compiler transformation which turns local
variables into data stored inside the future.
Some of those variables can hold pointers to other local variables. Because of
that, the future should never be moved to a different memory location, as it
would invalidate those pointers.
To prevent moving the future type in memory, it can only be polled through a
pinned pointer. `Pin` is a wrapper around a reference that disallows all
operations that would move the instance it points to into a different memory
location.
```rust,editable,compile_fail
use tokio::sync::{mpsc, oneshot};
use tokio::task::spawn;
use tokio::time::{sleep, Duration};

// A work item. In this case, just sleep for the given time and respond
// with a message on the `respond_on` channel.
#[derive(Debug)]
struct Work {
    input: u32,
    respond_on: oneshot::Sender<u32>,
}

// A worker which listens for work on a queue and performs it.
async fn worker(mut work_queue: mpsc::Receiver<Work>) {
    let mut iterations = 0;
    loop {
        tokio::select! {
            Some(work) = work_queue.recv() => {
                sleep(Duration::from_millis(10)).await; // Pretend to work.
                work.respond_on
                    .send(work.input * 1000)
                    .expect("failed to send response");
                iterations += 1;
            }
            // TODO: report number of iterations every 100ms
        }
    }
}

// A requester which requests work and waits for it to complete.
async fn do_work(work_queue: &mpsc::Sender<Work>, input: u32) -> u32 {
    let (tx, rx) = oneshot::channel();
    work_queue
        .send(Work { input, respond_on: tx })
        .await
        .expect("failed to send on work queue");
    rx.await.expect("failed waiting for response")
}

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::channel(10);
    spawn(worker(rx));
    for i in 0..100 {
        let resp = do_work(&tx, i).await;
        println!("work result for iteration {i}: {resp}");
    }
}
```
<details>
- You may recognize this as an example of the actor pattern. Actors typically
call `select!` in a loop.
- This serves as a summation of a few of the previous lessons, so take your time
with it.
- Naively add a `_ = sleep(Duration::from_millis(100)) => { println!(..) }` to
the `select!`. This will never execute. Why?
- Instead, add a `timeout_fut` containing that future outside of the `loop`:
  ```rust,compile_fail
  let mut timeout_fut = sleep(Duration::from_millis(100));
  loop {
      select! {
          ..,
          _ = timeout_fut => { println!(..); },
      }
  }
  ```
- This still doesn't work. Follow the compiler errors, adding `&mut` to the
`timeout_fut` in the `select!` to work around the move, then using
`Box::pin`:
  ```rust,compile_fail
  let mut timeout_fut = Box::pin(sleep(Duration::from_millis(100)));
  loop {
      select! {
          ..,
          _ = &mut timeout_fut => { println!(..); },
      }
  }
  ```
- This compiles, but once the timeout expires it is `Poll::Ready` on every
iteration (a fused future would help with this). Update to reset
`timeout_fut` every time it expires.
- Box allocates on the heap. In some cases, `std::pin::pin!` (only recently
stabilized, with older code often using `tokio::pin!`) is also an option, but
that is difficult to use for a future that is reassigned.
- Another alternative is to not use `pin` at all but spawn another task that
will send to a `oneshot` channel every 100ms.
- Data that contains pointers to itself is called self-referential. Normally,
the Rust borrow checker would prevent self-referential data from being moved,
as the references cannot outlive the data they point to. However, the code
transformation for async blocks and functions is not verified by the borrow
checker.
- `Pin` is a wrapper around a reference. An object cannot be moved from its
place using a pinned pointer. However, it can still be moved through an
unpinned pointer.
- The `poll` method of the `Future` trait uses `Pin<&mut Self>` instead of
`&mut Self` to refer to the instance. That's why it can only be called on a
pinned pointer.
</details>
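The mechanics of polling through a pinned pointer and re-arming a finished future can be tried without `tokio`. The sketch below relies only on the standard library; `after_polls`, `YieldOnce` and `drive_and_rearm` are hypothetical names for this illustration, and the poll-counting future merely stands in for a timer.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Waker that does nothing; the future below is polled by hand in a loop.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Await point that is pending exactly once.
struct YieldOnce(bool);

impl Future for YieldOnce {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
        if self.0 {
            Poll::Ready(())
        } else {
            self.0 = true;
            Poll::Pending
        }
    }
}

// Stand-in for a timer: pending for `polls` polls, then yields `value`.
async fn after_polls(polls: u32, value: u32) -> u32 {
    for _ in 0..polls {
        YieldOnce(false).await;
    }
    value
}

fn drive_and_rearm() -> Vec<u32> {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);

    // `Box::pin` gives the future a fixed heap address. `poll` takes
    // `Pin<&mut Self>`, so each poll borrows the pinned box with `as_mut()`.
    let mut fut = Box::pin(after_polls(2, 10));
    let mut outputs = Vec::new();
    while outputs.len() < 2 {
        if let Poll::Ready(value) = fut.as_mut().poll(&mut cx) {
            outputs.push(value);
            // A completed `async fn` future must not be polled again, so
            // replace it in place with a fresh one.
            fut.set(after_polls(1, 20));
        }
    }
    outputs
}

fn main() {
    println!("{:?}", drive_and_rearm());
}
```

`Pin::set` overwrites the pinned value without moving the box, so the same `fut` binding can be reused across loop iterations.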

3
src/concurrency/async.md Normal file
View File

@@ -0,0 +1,3 @@
# Async Basics
{{%segment outline}}

View File

@@ -0,0 +1,52 @@
---
minutes: 6
---
# `async`/`await`
At a high level, async Rust code looks very much like "normal" sequential code:
```rust,editable,compile_fail
use futures::executor::block_on;

async fn count_to(count: i32) {
    for i in 1..=count {
        println!("Count is: {i}!");
    }
}

async fn async_main(count: i32) {
    count_to(count).await;
}

fn main() {
    block_on(async_main(10));
}
```
<details>
Key points:
- Note that this is a simplified example to show the syntax. There is no long
running operation or any real concurrency in it!
- What is the return type of an async call?
  - Use `let future: () = async_main(10);` in `main` to see the type.
- The "async" keyword is syntactic sugar. The compiler replaces the return type
with a future.
- You cannot make `main` async, without additional instructions to the compiler
on how to use the returned future.
- You need an executor to run async code. `block_on` blocks the current thread
until the provided future has run to completion.
- `.await` asynchronously waits for the completion of another operation. Unlike
`block_on`, `.await` doesn't block the current thread.
- `.await` can only be used inside an `async` function (or block; these are
introduced later).
</details>
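One consequence of these points is that calling an `async fn` does not run its body; the body runs only when an executor polls the returned future. The sketch below demonstrates this with the standard library alone. The toy `block_on` here is a hypothetical busy-polling stand-in for `futures::executor::block_on`, and `mark_started` and `observe_laziness` are names made up for the illustration.

```rust
use std::cell::Cell;
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Waker that does nothing; adequate because `block_on` below polls in a loop.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Toy executor: polls the future repeatedly until it is ready (busy-waits).
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(value) = fut.as_mut().poll(&mut cx) {
            return value;
        }
    }
}

async fn mark_started(started: &Cell<bool>) -> u32 {
    started.set(true);
    42
}

// Returns whether the body had run before and after executing the future,
// together with the future's output.
fn observe_laziness() -> (bool, bool, u32) {
    let started = Cell::new(false);
    // Creating the future runs none of the function body.
    let future = mark_started(&started);
    let before = started.get();
    // Only the executor's polling makes the body run.
    let value = block_on(future);
    (before, started.get(), value)
}

fn main() {
    println!("{:?}", observe_laziness());
}
```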

View File

@@ -0,0 +1,49 @@
---
minutes: 4
---
# Futures
[`Future`](https://doc.rust-lang.org/std/future/trait.Future.html) is a trait,
implemented by objects that represent an operation that may not be complete yet.
A future can be polled, and `poll` returns a
[`Poll`](https://doc.rust-lang.org/std/task/enum.Poll.html).
```rust
use std::pin::Pin;
use std::task::Context;

pub trait Future {
    type Output;
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}

pub enum Poll<T> {
    Ready(T),
    Pending,
}
```
An async function returns an `impl Future`. It's also possible (but uncommon) to
implement `Future` for your own types. For example, the `JoinHandle` returned
from `tokio::spawn` implements `Future` so that the spawned task can be joined
by awaiting the handle.
The `.await` keyword, applied to a Future, causes the current async function to
pause until that Future is ready, and then evaluates to its output.
<details>
- The `Future` and `Poll` types are implemented exactly as shown; click the
links to show the implementations in the docs.
- We will not get to `Pin` and `Context`, as we will focus on writing async
code, rather than building new async primitives. Briefly:
- `Context` allows a Future to schedule itself to be polled again when an
event occurs.
- `Pin` ensures that the Future isn't moved in memory, so that pointers into
that future remain valid. This is required to allow references to remain
valid after an `.await`.
</details>
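To make the trait concrete, here is a sketch of a hand-written future and a
deliberately naive loop that polls it. `CountDown`, `NoopWake`,
`poll_until_ready`, and `count_polls` are made up for this example, and the
busy-polling loop is a teaching aid rather than how a real executor works.

```rust
use std::future::Future;
use std::pin::{pin, Pin};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical future: reports `Pending` `remaining` times, then `Ready`.
struct CountDown {
    remaining: u32,
    polls: u32,
}

impl Future for CountDown {
    /// The number of times `poll` was called.
    type Output = u32;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        self.polls += 1;
        if self.remaining == 0 {
            Poll::Ready(self.polls)
        } else {
            self.remaining -= 1;
            // Ask to be polled again. A real executor relies on this signal;
            // the naive loop below polls again regardless.
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

/// Waker that ignores wake-ups (illustrative helper).
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Polls `future` in a busy loop until it is ready.
fn poll_until_ready<F: Future>(future: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut future = pin!(future);
    loop {
        if let Poll::Ready(value) = future.as_mut().poll(&mut cx) {
            return value;
        }
    }
}

/// `.await` pauses this function until `CountDown` is ready, then evaluates
/// to its output.
async fn count_polls() -> u32 {
    CountDown { remaining: 3, polls: 0 }.await
}
```

`poll_until_ready(count_polls())` polls the outer future, which in turn polls
`CountDown`: three `Pending` results followed by one `Ready`.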

View File

@@ -0,0 +1,33 @@
---
minutes: 10
---
# Runtimes
A _runtime_ provides support for performing operations asynchronously (a
_reactor_) and is responsible for executing futures (an _executor_). Rust does
not have a "built-in" runtime, but several options are available:
- [Tokio](https://tokio.rs/): performant, with a well-developed ecosystem of
functionality like [Hyper](https://hyper.rs/) for HTTP or
[Tonic](https://github.com/hyperium/tonic) for gRPC.
- [async-std](https://async.rs/): aims to be a "std for async", and includes a
basic runtime in `async_std::task`.
- [smol](https://docs.rs/smol/latest/smol/): simple and lightweight
Several larger applications have their own runtimes. For example,
[Fuchsia](https://fuchsia.googlesource.com/fuchsia/+/refs/heads/main/src/lib/fuchsia-async/src/lib.rs)
already has one.
<details>
- Note that of the listed runtimes, only Tokio is supported in the Rust
playground. The playground also does not permit any I/O, so most interesting
async things can't run in the playground.
- Futures are "inert" in that they do not do anything (not even start an I/O
operation) unless there is an executor polling them. This differs from JS
Promises, for example, which will run to completion even if they are never
used.
</details>
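The split between executor and reactor can be sketched with the standard
library alone. Everything named here (`Sleep`, `Shared`, `ThreadWaker`,
`block_on`) is a simplified stand-in written for this illustration; real
runtimes such as Tokio are far more elaborate. A helper thread plays the role
of the reactor, and `block_on` is a single-future executor.

```rust
use std::future::Future;
use std::pin::{pin, Pin};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};
use std::time::Duration;

/// State shared between the future and the "reactor" thread.
#[derive(Default)]
struct Shared {
    done: bool,
    waker: Option<Waker>,
}

/// Hypothetical timer future. It is inert: no thread starts until first poll.
struct Sleep {
    duration: Duration,
    started: bool,
    shared: Arc<Mutex<Shared>>,
}

impl Sleep {
    fn new(duration: Duration) -> Self {
        Sleep { duration, started: false, shared: Arc::default() }
    }
}

impl Future for Sleep {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        let this = self.get_mut(); // allowed because `Sleep` is `Unpin`
        if !this.started {
            this.started = true;
            let shared = Arc::clone(&this.shared);
            let duration = this.duration;
            // Stand-in reactor: wait for the event, then wake the task.
            thread::spawn(move || {
                thread::sleep(duration);
                let mut state = shared.lock().unwrap();
                state.done = true;
                if let Some(waker) = state.waker.take() {
                    waker.wake();
                }
            });
        }
        let mut state = this.shared.lock().unwrap();
        if state.done {
            Poll::Ready(())
        } else {
            // Remember how to notify the executor once the event occurs.
            state.waker = Some(cx.waker().clone());
            Poll::Pending
        }
    }
}

/// Waker that unparks the thread running `block_on`.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Minimal executor: polls one future, parking the thread while it is pending.
fn block_on<F: Future>(future: F) -> F::Output {
    let mut future = pin!(future);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match future.as_mut().poll(&mut cx) {
            Poll::Ready(value) => return value,
            Poll::Pending => thread::park(),
        }
    }
}
```

For example, `block_on(async { Sleep::new(Duration::from_millis(20)).await })`
returns only after the helper thread has signalled completion. Creating a
`Sleep` and dropping it without polling starts no thread at all.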

View File

@@ -0,0 +1,48 @@
# Tokio
Tokio provides:
- A multi-threaded runtime for executing asynchronous code.
- An asynchronous version of the standard library.
- A large ecosystem of libraries.
```rust,editable,compile_fail
use tokio::time;

async fn count_to(count: i32) {
    for i in 1..=count {
        println!("Count in task: {i}!");
        time::sleep(time::Duration::from_millis(5)).await;
    }
}

#[tokio::main]
async fn main() {
    tokio::spawn(count_to(10));

    for i in 1..5 {
        println!("Main task: {i}");
        time::sleep(time::Duration::from_millis(5)).await;
    }
}
```
<details>
- With the `tokio::main` macro we can now make `main` async.
- The `spawn` function creates a new, concurrent "task".
- Note: `spawn` takes a `Future`; you don't call `.await` on `count_to`.
**Further exploration:**
- Why does `count_to` not (usually) get to 10? This is an example of async
cancellation. `tokio::spawn` returns a handle which can be awaited to wait
until it finishes.
- Try `count_to(10).await` instead of spawning.
- Try awaiting the task returned from `tokio::spawn`.
</details>

View File

@@ -0,0 +1,59 @@
---
minutes: 6
---
# Tasks
Rust has a task system, which is a form of lightweight threading.
A task has a single top-level future which the executor polls to make progress.
That future may have one or more nested futures that its `poll` method polls,
corresponding loosely to a call stack. Concurrency within a task is possible by
polling multiple child futures, such as racing a timer and an I/O operation.
```rust,compile_fail
use tokio::io::{self, AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpListener;

#[tokio::main]
async fn main() -> io::Result<()> {
    let listener = TcpListener::bind("127.0.0.1:0").await?;
    println!("listening on port {}", listener.local_addr()?.port());

    loop {
        let (mut socket, addr) = listener.accept().await?;

        println!("connection from {addr:?}");

        tokio::spawn(async move {
            socket.write_all(b"Who are you?\n").await.expect("socket error");

            let mut buf = vec![0; 1024];
            let name_size = socket.read(&mut buf).await.expect("socket error");
            let name = std::str::from_utf8(&buf[..name_size]).unwrap().trim();
            let reply = format!("Thanks for dialing in, {name}!\n");
            socket.write_all(reply.as_bytes()).await.expect("socket error");
        });
    }
}
```
<details>
Copy this example into your prepared `src/main.rs` and run it from there.
Try connecting to it with a TCP connection tool like
[nc](https://www.unix.com/man-page/linux/1/nc/) or
[telnet](https://www.unix.com/man-page/linux/1/telnet/).
- Ask students to visualize what the state of the example server would be with a
few connected clients. What tasks exist? What are their Futures?
- This is the first time we've seen an `async` block. This is similar to a
closure, but does not take any arguments. Its return value is a Future,
similar to an `async fn`.
- Refactor the async block into a function, and improve the error handling using
`?`.
</details>

View File

@@ -1,32 +1,3 @@
# Channels
Rust channels have two parts: a `Sender<T>` and a `Receiver<T>`. The two parts
are connected via the channel, but you only see the end-points.
```rust,editable
use std::sync::mpsc;

fn main() {
    let (tx, rx) = mpsc::channel();

    tx.send(10).unwrap();
    tx.send(20).unwrap();

    println!("Received: {:?}", rx.recv());
    println!("Received: {:?}", rx.recv());

    let tx2 = tx.clone();
    tx2.send(30).unwrap();
    println!("Received: {:?}", rx.recv());
}
```
<details>
- `mpsc` stands for Multi-Producer, Single-Consumer. `Sender` and `SyncSender`
implement `Clone` (so you can make multiple producers) but `Receiver` does
not.
- `send()` and `recv()` return `Result`. If they return `Err`, it means the
counterpart `Sender` or `Receiver` is dropped and the channel is closed.
</details>
{{%segment outline}}

View File

@@ -1,3 +1,7 @@
---
minutes: 8
---
# Bounded Channels
With bounded (synchronous) channels, `send` can block the current thread:

View File

@@ -0,0 +1,36 @@
---
minutes: 9
---
# Senders and Receivers
Rust channels have two parts: a `Sender<T>` and a `Receiver<T>`. The two parts
are connected via the channel, but you only see the end-points.
```rust,editable
use std::sync::mpsc;

fn main() {
    let (tx, rx) = mpsc::channel();

    tx.send(10).unwrap();
    tx.send(20).unwrap();

    println!("Received: {:?}", rx.recv());
    println!("Received: {:?}", rx.recv());

    let tx2 = tx.clone();
    tx2.send(30).unwrap();
    println!("Received: {:?}", rx.recv());
}
```
<details>
- `mpsc` stands for Multi-Producer, Single-Consumer. `Sender` and `SyncSender`
implement `Clone` (so you can make multiple producers) but `Receiver` does
not.
- `send()` and `recv()` return `Result`. If they return `Err`, it means the
counterpart `Sender` or `Receiver` is dropped and the channel is closed.
</details>
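The two notes above can be sketched as follows: several producers obtained by
cloning the `Sender`, and the `Err` results that signal a closed channel. The
function names and the `id * 100 + n` numbering are arbitrary choices for the
illustration.

```rust
use std::sync::mpsc;
use std::thread;

/// Spawns `producers` threads that each send `per_producer` numbers, then
/// returns everything the receiver saw, sorted.
fn collect_from_producers(producers: u32, per_producer: u32) -> Vec<u32> {
    let (tx, rx) = mpsc::channel();

    for id in 0..producers {
        // Each producer thread gets its own clone of the sender.
        let tx = tx.clone();
        thread::spawn(move || {
            for n in 0..per_producer {
                tx.send(id * 100 + n).unwrap();
            }
            // This clone of `tx` is dropped when the closure ends.
        });
    }

    // Drop the original sender too. Otherwise the channel never closes and
    // the loop below would wait forever.
    drop(tx);

    // The iterator ends when `recv()` would return `Err`, which happens once
    // every sender is gone and the buffered messages have been read.
    let mut received: Vec<u32> = rx.iter().collect();
    received.sort();
    received
}

/// `send` returns `Err` once the receiver has been dropped.
fn send_after_receiver_dropped() -> bool {
    let (tx, rx) = mpsc::channel::<i32>();
    drop(rx);
    tx.send(1).is_err()
}
```

The result is sorted because the order in which messages from different
producers arrive is not deterministic.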

View File

@@ -1,3 +1,7 @@
---
minutes: 2
---
# Unbounded Channels
You get an unbounded and asynchronous channel with `mpsc::channel()`:

View File

@@ -1,25 +1,3 @@
# `Send` and `Sync`
How does Rust know to forbid shared access across threads? The answer is in two
traits:
- [`Send`][1]: a type `T` is `Send` if it is safe to move a `T` across a thread
boundary.
- [`Sync`][2]: a type `T` is `Sync` if it is safe to move a `&T` across a thread
boundary.
`Send` and `Sync` are [unsafe traits][3]. The compiler will automatically derive
them for your types as long as they only contain `Send` and `Sync` types. You
can also implement them manually when you know it is valid.
[1]: https://doc.rust-lang.org/std/marker/trait.Send.html
[2]: https://doc.rust-lang.org/std/marker/trait.Sync.html
[3]: ../unsafe/unsafe-traits.md
<details>
- One can think of these traits as markers that the type has certain
thread-safety properties.
- They can be used in the generic constraints as normal traits.
</details>
{{%segment outline}}

View File

@@ -1,3 +1,7 @@
---
minutes: 6
---
# Examples
## `Send + Sync`

View File

@@ -0,0 +1,29 @@
---
minutes: 2
---
# Marker Traits
How does Rust know to forbid shared access across threads? The answer is in two
traits:
- [`Send`][1]: a type `T` is `Send` if it is safe to move a `T` across a thread
boundary.
- [`Sync`][2]: a type `T` is `Sync` if it is safe to move a `&T` across a thread
boundary.
`Send` and `Sync` are [unsafe traits][3]. The compiler will automatically derive
them for your types as long as they only contain `Send` and `Sync` types. You
can also implement them manually when you know it is valid.
[1]: https://doc.rust-lang.org/std/marker/trait.Send.html
[2]: https://doc.rust-lang.org/std/marker/trait.Sync.html
[3]: ../unsafe/unsafe-traits.md
<details>
- One can think of these traits as markers that the type has certain
thread-safety properties.
- They can be used in the generic constraints as normal traits.
</details>
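A short sketch of using the marker traits as ordinary generic bounds. The
helpers `assert_send`, `assert_sync`, `check_marker_traits`, and
`run_on_thread` are hypothetical; the commented-out lines are ones the compiler
would reject.

```rust
use std::cell::Cell;
use std::sync::{Arc, Mutex};
use std::thread;

/// These compile only when the type argument satisfies the bound.
fn assert_send<T: Send>() {}
fn assert_sync<T: Sync>() {}

fn check_marker_traits() {
    assert_send::<i32>();
    assert_sync::<i32>();

    // `Arc<Mutex<T>>` is both `Send` and `Sync` when `T` is `Send`.
    assert_send::<Arc<Mutex<Vec<String>>>>();
    assert_sync::<Arc<Mutex<Vec<String>>>>();

    // `Cell<i32>` can be moved to another thread but not shared by reference.
    assert_send::<Cell<i32>>();
    // assert_sync::<Cell<i32>>();        // error: `Cell<i32>` is not `Sync`
    // assert_send::<std::rc::Rc<i32>>(); // error: `Rc<i32>` is not `Send`
}

/// Runs `job` on a new thread and returns its result. The bounds mirror the
/// ones on `thread::spawn`.
fn run_on_thread<T, F>(job: F) -> T
where
    F: FnOnce() -> T + Send + 'static,
    T: Send + 'static,
{
    thread::spawn(job).join().unwrap()
}
```

Uncommenting either rejected line turns a potential data race into a compile
error, which is the point of the two traits.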

View File

@@ -1,3 +1,7 @@
---
minutes: 2
---
# `Send`
> A type `T` is [`Send`][1] if it is safe to move a `T` value to another thread.

View File

@@ -1,3 +1,7 @@
---
minutes: 2
---
# `Sync`
> A type `T` is [`Sync`][1] if it is safe to access a `T` value from multiple

View File

@@ -0,0 +1,3 @@
# Shared State
{{%segment outline}}

View File

@@ -1,3 +1,7 @@
---
minutes: 5
---
# `Arc`
[`Arc<T>`][1] allows shared read-only access via `Arc::clone`:

View File

@@ -1,3 +1,7 @@
---
minutes: 8
---
# Example
Let us see `Arc` and `Mutex` in action:

View File

@@ -1,3 +1,7 @@
---
minutes: 14
---
# `Mutex`
[`Mutex<T>`][1] ensures mutual exclusion _and_ allows mutable access to `T`

View File

@@ -1,11 +0,0 @@
# Shared State
Rust uses the type system to enforce synchronization of shared data. This is
primarily done via two types:
- [`Arc<T>`][1], atomic reference counted `T`: handles sharing between threads
and takes care to deallocate `T` when the last reference is dropped,
- [`Mutex<T>`][2]: ensures mutually exclusive access to the `T` value.
[1]: https://doc.rust-lang.org/std/sync/struct.Arc.html
[2]: https://doc.rust-lang.org/std/sync/struct.Mutex.html

View File

@@ -0,0 +1,3 @@
# Exercises
{{%segment outline}}

View File

@@ -0,0 +1,21 @@
[package]
name = "sync-exercises"
version = "0.1.0"
edition = "2021"
publish = false
[[bin]]
name = "dining-philosophers"
path = "dining-philosophers.rs"
[[bin]]
name = "link-checker"
path = "link-checker.rs"
[dependencies]
reqwest = { version = "0.12.4", features = ["blocking"] }
scraper = "0.19.0"
thiserror = "1.0.59"
[dev-dependencies]
tempfile = "3.10.1"

View File

@@ -0,0 +1,54 @@
---
minutes: 20
---
# Dining Philosophers
The dining philosophers problem is a classic problem in concurrency:
> Five philosophers dine together at the same table. Each philosopher has their
> own place at the table. There is a fork between each plate. The dish served is
> a kind of spaghetti which has to be eaten with two forks. Each philosopher can
> only alternately think and eat. Moreover, a philosopher can only eat their
> spaghetti when they have both a left and right fork. Thus two forks will only
> be available when their two nearest neighbors are thinking, not eating. After
> an individual philosopher finishes eating, they will put down both forks.
You will need a local [Cargo installation](../../cargo/running-locally.md) for
this exercise. Copy the code below to a file called `src/main.rs`, fill in the
blanks, and test that `cargo run` does not deadlock:
<!-- File src/main.rs -->
```rust,compile_fail
{{#include dining-philosophers.rs:Philosopher}}
    // left_fork: ...
    // right_fork: ...
    // thoughts: ...
}

{{#include dining-philosophers.rs:Philosopher-think}}

{{#include dining-philosophers.rs:Philosopher-eat}}
        // Pick up forks...
{{#include dining-philosophers.rs:Philosopher-eat-end}}
    // Create forks

    // Create philosophers

    // Make each of them think and eat 100 times

    // Output their thoughts
}
```
You can use the following `Cargo.toml`:
<!-- File Cargo.toml -->
```toml
[package]
name = "dining-philosophers"
version = "0.1.0"
edition = "2021"
```

View File

@@ -0,0 +1,95 @@
// Copyright 2022 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ANCHOR: solution
// ANCHOR: Philosopher
use std::sync::{mpsc, Arc, Mutex};
use std::thread;
use std::time::Duration;

struct Fork;

struct Philosopher {
    name: String,
    // ANCHOR_END: Philosopher
    left_fork: Arc<Mutex<Fork>>,
    right_fork: Arc<Mutex<Fork>>,
    thoughts: mpsc::SyncSender<String>,
}

// ANCHOR: Philosopher-think
impl Philosopher {
    fn think(&self) {
        self.thoughts
            .send(format!("Eureka! {} has a new idea!", &self.name))
            .unwrap();
    }
    // ANCHOR_END: Philosopher-think

    // ANCHOR: Philosopher-eat
    fn eat(&self) {
        // ANCHOR_END: Philosopher-eat
        println!("{} is trying to eat", &self.name);
        let _left = self.left_fork.lock().unwrap();
        let _right = self.right_fork.lock().unwrap();

        // ANCHOR: Philosopher-eat-end
        println!("{} is eating...", &self.name);
        thread::sleep(Duration::from_millis(10));
    }
}

static PHILOSOPHERS: &[&str] =
    &["Socrates", "Hypatia", "Plato", "Aristotle", "Pythagoras"];

fn main() {
    // ANCHOR_END: Philosopher-eat-end
    let (tx, rx) = mpsc::sync_channel(10);

    let forks = (0..PHILOSOPHERS.len())
        .map(|_| Arc::new(Mutex::new(Fork)))
        .collect::<Vec<_>>();

    for i in 0..forks.len() {
        let tx = tx.clone();
        let mut left_fork = Arc::clone(&forks[i]);
        let mut right_fork = Arc::clone(&forks[(i + 1) % forks.len()]);

        // To avoid a deadlock, we have to break the symmetry
        // somewhere. This will swap the forks without deinitializing
        // either of them.
        if i == forks.len() - 1 {
            std::mem::swap(&mut left_fork, &mut right_fork);
        }

        let philosopher = Philosopher {
            name: PHILOSOPHERS[i].to_string(),
            thoughts: tx,
            left_fork,
            right_fork,
        };

        thread::spawn(move || {
            for _ in 0..100 {
                philosopher.eat();
                philosopher.think();
            }
        });
    }

    drop(tx);
    for thought in rx {
        println!("{thought}");
    }
}

View File

@@ -0,0 +1,85 @@
---
minutes: 20
---
# Multi-threaded Link Checker
Let us use our new knowledge to create a multi-threaded link checker. It should
start at a webpage and check that links on the page are valid. It should
recursively check other pages on the same domain and keep doing this until all
pages have been validated.
For this, you will need an HTTP client such as [`reqwest`][1]. You will also
need a way to find links; we can use [`scraper`][2]. Finally, we'll need some
way of handling errors; we will use [`thiserror`][3].
Create a new Cargo project and `reqwest` it as a dependency with:
```shell
cargo new link-checker
cd link-checker
cargo add --features blocking,rustls-tls reqwest
cargo add scraper
cargo add thiserror
```
> If `cargo add` fails with `error: no such subcommand`, then please edit the
> `Cargo.toml` file by hand. Add the dependencies listed below.
The `cargo add` calls will update the `Cargo.toml` file to look like this:
<!-- File Cargo.toml -->
```toml
[package]
name = "link-checker"
version = "0.1.0"
edition = "2021"
publish = false
[dependencies]
reqwest = { version = "0.11.12", features = ["blocking", "rustls-tls"] }
scraper = "0.13.0"
thiserror = "1.0.37"
```
You can now download the start page. Try with a small site such as
`https://www.google.org/`.
Your `src/main.rs` file should look something like this:
<!-- File src/main.rs -->
```rust,compile_fail
{{#include link-checker.rs:setup}}

{{#include link-checker.rs:visit_page}}

fn main() {
    let client = Client::new();
    let start_url = Url::parse("https://www.google.org").unwrap();
    let crawl_command = CrawlCommand { url: start_url, extract_links: true };
    match visit_page(&client, &crawl_command) {
        Ok(links) => println!("Links: {links:#?}"),
        Err(err) => println!("Could not extract links: {err:#}"),
    }
}
```
Run the code in `src/main.rs` with
```shell
cargo run
```
## Tasks
- Use threads to check the links in parallel: send the URLs to be checked to a
channel and let a few threads check the URLs in parallel.
- Extend this to recursively extract links from all pages on the
`www.google.org` domain. Put an upper limit of about 100 pages on the crawl so
that you don't end up being blocked by the site.
[1]: https://docs.rs/reqwest/
[2]: https://docs.rs/scraper/
[3]: https://docs.rs/thiserror/

View File

@@ -0,0 +1,181 @@
// Copyright 2022 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ANCHOR: solution
use std::sync::{mpsc, Arc, Mutex};
use std::thread;

// ANCHOR: setup
use reqwest::blocking::Client;
use reqwest::Url;
use scraper::{Html, Selector};
use thiserror::Error;

#[derive(Error, Debug)]
enum Error {
    #[error("request error: {0}")]
    ReqwestError(#[from] reqwest::Error),
    #[error("bad http response: {0}")]
    BadResponse(String),
}
// ANCHOR_END: setup

// ANCHOR: visit_page
#[derive(Debug)]
struct CrawlCommand {
    url: Url,
    extract_links: bool,
}

fn visit_page(client: &Client, command: &CrawlCommand) -> Result<Vec<Url>, Error> {
    println!("Checking {:#}", command.url);
    let response = client.get(command.url.clone()).send()?;
    if !response.status().is_success() {
        return Err(Error::BadResponse(response.status().to_string()));
    }

    let mut link_urls = Vec::new();
    if !command.extract_links {
        return Ok(link_urls);
    }

    let base_url = response.url().to_owned();
    let body_text = response.text()?;
    let document = Html::parse_document(&body_text);

    let selector = Selector::parse("a").unwrap();
    let href_values = document
        .select(&selector)
        .filter_map(|element| element.value().attr("href"));
    for href in href_values {
        match base_url.join(href) {
            Ok(link_url) => {
                link_urls.push(link_url);
            }
            Err(err) => {
                println!("On {base_url:#}: ignored unparsable {href:?}: {err}");
            }
        }
    }
    Ok(link_urls)
}
// ANCHOR_END: visit_page

struct CrawlState {
    domain: String,
    visited_pages: std::collections::HashSet<String>,
}

impl CrawlState {
    fn new(start_url: &Url) -> CrawlState {
        let mut visited_pages = std::collections::HashSet::new();
        visited_pages.insert(start_url.as_str().to_string());
        CrawlState { domain: start_url.domain().unwrap().to_string(), visited_pages }
    }

    /// Determine whether links within the given page should be extracted.
    fn should_extract_links(&self, url: &Url) -> bool {
        let Some(url_domain) = url.domain() else {
            return false;
        };
        url_domain == self.domain
    }

    /// Mark the given page as visited, returning false if it had already
    /// been visited.
    fn mark_visited(&mut self, url: &Url) -> bool {
        self.visited_pages.insert(url.as_str().to_string())
    }
}

type CrawlResult = Result<Vec<Url>, (Url, Error)>;

fn spawn_crawler_threads(
    command_receiver: mpsc::Receiver<CrawlCommand>,
    result_sender: mpsc::Sender<CrawlResult>,
    thread_count: u32,
) {
    let command_receiver = Arc::new(Mutex::new(command_receiver));

    for _ in 0..thread_count {
        let result_sender = result_sender.clone();
        let command_receiver = command_receiver.clone();
        thread::spawn(move || {
            let client = Client::new();
            loop {
                let command_result = {
                    let receiver_guard = command_receiver.lock().unwrap();
                    receiver_guard.recv()
                };
                let Ok(crawl_command) = command_result else {
                    // The sender got dropped. No more commands coming in.
                    break;
                };
                let crawl_result = match visit_page(&client, &crawl_command) {
                    Ok(link_urls) => Ok(link_urls),
                    Err(error) => Err((crawl_command.url, error)),
                };
                result_sender.send(crawl_result).unwrap();
            }
        });
    }
}

fn control_crawl(
    start_url: Url,
    command_sender: mpsc::Sender<CrawlCommand>,
    result_receiver: mpsc::Receiver<CrawlResult>,
) -> Vec<Url> {
    let mut crawl_state = CrawlState::new(&start_url);
    let start_command = CrawlCommand { url: start_url, extract_links: true };
    command_sender.send(start_command).unwrap();
    let mut pending_urls = 1;

    let mut bad_urls = Vec::new();
    while pending_urls > 0 {
        let crawl_result = result_receiver.recv().unwrap();
        pending_urls -= 1;

        match crawl_result {
            Ok(link_urls) => {
                for url in link_urls {
                    if crawl_state.mark_visited(&url) {
                        let extract_links = crawl_state.should_extract_links(&url);
                        let crawl_command = CrawlCommand { url, extract_links };
                        command_sender.send(crawl_command).unwrap();
                        pending_urls += 1;
                    }
                }
            }
            Err((url, error)) => {
                bad_urls.push(url);
                println!("Got crawling error: {:#}", error);
                continue;
            }
        }
    }
    bad_urls
}

fn check_links(start_url: Url) -> Vec<Url> {
    let (result_sender, result_receiver) = mpsc::channel::<CrawlResult>();
    let (command_sender, command_receiver) = mpsc::channel::<CrawlCommand>();
    spawn_crawler_threads(command_receiver, result_sender, 16);
    control_crawl(start_url, command_sender, result_receiver)
}

fn main() {
    let start_url = reqwest::Url::parse("https://www.google.org").unwrap();
    let bad_urls = check_links(start_url);
    println!("Bad URLs: {:#?}", bad_urls);
}

View File

@@ -0,0 +1,17 @@
---
minutes: 30
---
# Solutions
## Dining Philosophers
```rust
{{#include dining-philosophers.rs:solution}}
```
## Link Checker
```rust,compile_fail
{{#include link-checker.rs:solution}}
```

View File

@@ -1,74 +1,3 @@
# Threads
Rust threads work similarly to threads in other languages:
```rust,editable
use std::thread;
use std::time::Duration;

fn main() {
    thread::spawn(|| {
        for i in 1..=10 {
            println!("Count in thread: {i}!");
            thread::sleep(Duration::from_millis(5));
        }
    });

    for i in 1..5 {
        println!("Main thread: {i}");
        thread::sleep(Duration::from_millis(5));
    }
}
```
- Threads are all daemon threads; the main thread does not wait for them.
- Thread panics are independent of each other.
- Panics can carry a payload, which can be unpacked with `downcast_ref`.
<details>
- Rust thread APIs look not too different from e.g. C++ ones.
- Run the example.
- 5ms timing is loose enough that main and spawned threads stay mostly in
lockstep.
- Notice that the program ends before the spawned thread reaches 10!
- This is because main ends the program and spawned threads do not make it
persist.
- Compare to pthreads/C++ std::thread/boost::thread if desired.
- How do we wait around for the spawned thread to complete?
- [`thread::spawn`] returns a `JoinHandle`. Look at the docs.
- `JoinHandle` has a [`.join()`] method that blocks.
- Use `let handle = thread::spawn(...)` and later `handle.join()` to wait for
the thread to finish and have the program count all the way to 10.
- Now what if we want to return a value?
- Look at docs again:
- [`thread::spawn`]'s closure returns `T`
- `JoinHandle` [`.join()`] returns `thread::Result<T>`
- Use the `Result` return value from `handle.join()` to get access to the
returned value.
- Ok, what about the other case?
- Trigger a panic in the thread. Note that this doesn't panic `main`.
- Access the panic payload. This is a good time to talk about [`Any`].
- Now we can return values from threads! What about taking inputs?
- Capture something by reference in the thread closure.
- An error message indicates we must move it.
- Move it in, see we can compute and then return a derived value.
- If we want to borrow?
- Main kills child threads when it returns, but another function would just
return and leave them running.
- That would be stack use-after-return, which violates memory safety!
- How do we avoid this? See the next slide.
[`Any`]: https://doc.rust-lang.org/std/any/index.html
[`thread::spawn`]: https://doc.rust-lang.org/std/thread/fn.spawn.html
[`.join()`]: https://doc.rust-lang.org/std/thread/struct.JoinHandle.html#method.join
</details>
{{%segment outline}}

View File

@@ -0,0 +1,78 @@
---
minutes: 15
---
# Plain Threads
Rust threads work similarly to threads in other languages:
```rust,editable
use std::thread;
use std::time::Duration;

fn main() {
    thread::spawn(|| {
        for i in 1..=10 {
            println!("Count in thread: {i}!");
            thread::sleep(Duration::from_millis(5));
        }
    });

    for i in 1..5 {
        println!("Main thread: {i}");
        thread::sleep(Duration::from_millis(5));
    }
}
```
- Threads are all daemon threads; the main thread does not wait for them.
- Thread panics are independent of each other.
- Panics can carry a payload, which can be unpacked with `downcast_ref`.
<details>
- Rust thread APIs look not too different from e.g. C++ ones.
- Run the example.
- 5ms timing is loose enough that main and spawned threads stay mostly in
lockstep.
- Notice that the program ends before the spawned thread reaches 10!
- This is because main ends the program and spawned threads do not make it
persist.
- Compare to pthreads/C++ std::thread/boost::thread if desired.
- How do we wait around for the spawned thread to complete?
- [`thread::spawn`] returns a `JoinHandle`. Look at the docs.
- `JoinHandle` has a [`.join()`] method that blocks.
- Use `let handle = thread::spawn(...)` and later `handle.join()` to wait for
the thread to finish and have the program count all the way to 10.
- Now what if we want to return a value?
- Look at docs again:
- [`thread::spawn`]'s closure returns `T`
- `JoinHandle` [`.join()`] returns `thread::Result<T>`
- Use the `Result` return value from `handle.join()` to get access to the
returned value.
- Ok, what about the other case?
- Trigger a panic in the thread. Note that this doesn't panic `main`.
- Access the panic payload. This is a good time to talk about [`Any`].
- Now we can return values from threads! What about taking inputs?
- Capture something by reference in the thread closure.
- An error message indicates we must move it.
- Move it in, see we can compute and then return a derived value.
- If we want to borrow?
- Main kills child threads when it returns, but another function would just
return and leave them running.
- That would be stack use-after-return, which violates memory safety!
- How do we avoid this? See the next slide.
[`Any`]: https://doc.rust-lang.org/std/any/index.html
[`thread::spawn`]: https://doc.rust-lang.org/std/thread/fn.spawn.html
[`.join()`]: https://doc.rust-lang.org/std/thread/struct.JoinHandle.html#method.join
</details>
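The walkthrough in the notes (moving input in, returning a value, inspecting a
panic) might look like this. It is a sketch with invented function names; the
panic message is arbitrary.

```rust
use std::thread;

/// Moves `numbers` into a new thread and returns the sum it computes.
fn sum_in_thread(numbers: Vec<i32>) -> i32 {
    // `move` transfers ownership of `numbers` into the closure.
    let handle = thread::spawn(move || numbers.iter().sum::<i32>());

    // `join()` blocks until the thread finishes and yields `thread::Result<i32>`.
    handle.join().unwrap()
}

/// Spawns a thread that panics and extracts the message from the payload.
fn panic_message() -> Option<String> {
    let handle = thread::spawn(|| {
        panic!("something went wrong");
    });

    match handle.join() {
        Ok(()) => None,
        // The payload is a `Box<dyn Any + Send>`. A panic with a plain string
        // literal carries a `&'static str`.
        Err(payload) => payload.downcast_ref::<&str>().map(|s| s.to_string()),
    }
}
```

The panic inside the spawned thread is reported on stderr but does not unwind
the calling thread; the caller only sees the `Err` from `join()`.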

View File

@@ -1,3 +1,7 @@
---
minutes: 13
---
# Scoped Threads
Normal threads cannot borrow from their environment:

View File

@@ -0,0 +1,34 @@
---
session: Afternoon
target_minutes: 180
---
# Welcome
"Async" is a concurrency model where multiple tasks are executed concurrently by
executing each task until it would block, then switching to another task that is
ready to make progress. The model allows running a larger number of tasks on a
limited number of threads. This is because the per-task overhead is typically
very low and operating systems provide primitives for efficiently identifying
I/O that is able to proceed.
Rust's asynchronous operation is based on "futures", which represent work that
may be completed in the future. Futures are "polled" until they signal that they
are complete.
Futures are polled by an async runtime, and several different runtimes are
available.
## Comparisons
- Python has a similar model in its `asyncio`. However, its `Future` type is
callback-based, and not polled. Async Python programs require a "loop",
similar to a runtime in Rust.
- JavaScript's `Promise` is similar, but again callback-based. The language
runtime implements the event loop, so many of the details of Promise
resolution are hidden.
## Schedule
{{%session outline}}

View File

@@ -0,0 +1,28 @@
---
course: Concurrency
session: Morning
target_minutes: 180
---
# Welcome to Concurrency in Rust
Rust has full support for concurrency using OS threads with mutexes and
channels.
The Rust type system plays an important role in making many concurrency bugs
compile-time bugs. This is often referred to as _fearless concurrency_ since you
can rely on the compiler to ensure correctness at runtime.
## Schedule
{{%session outline}}
<details>
- Rust lets us access the OS concurrency toolkit: threads, synchronization
primitives, etc.
- The type system gives us safety for concurrency without any special features.
- The same tools that help with "concurrent" access in a single thread (e.g., a
called function that might mutate an argument or save references to it to read
later) save us from multi-threading issues.
</details>
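As a small illustration of the last point, ordinary borrowing rules are what
make the following safe. `parallel_sum` is a hypothetical helper; it uses
scoped threads so that the threads may borrow from the caller.

```rust
use std::thread;

/// Sums a slice using two threads that borrow disjoint halves of it.
fn parallel_sum(numbers: &[u64]) -> u64 {
    let (left, right) = numbers.split_at(numbers.len() / 2);

    // Threads spawned inside the scope are joined before `scope` returns, so
    // they are allowed to hold the shared borrows `left` and `right`.
    thread::scope(|scope| {
        let left_sum = scope.spawn(|| left.iter().sum::<u64>());
        let right_sum = scope.spawn(|| right.iter().sum::<u64>());
        left_sum.join().unwrap() + right_sum.join().unwrap()
    })
}
```

Both threads only read through shared references. An attempt to mutate the
slice from one of them while the other reads it would be rejected by the borrow
checker, exactly as it would be in single-threaded code.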