You've already forked comprehensive-rust
mirror of
https://github.com/google/comprehensive-rust.git
synced 2025-08-09 08:48:05 +02:00
Make async code testable
This moves all non-trivial examples in the async part of the course to self-contained Rust files. This ensures that we can test them in CI.
This commit is contained in:
64
Cargo.lock
generated
64
Cargo.lock
generated
@ -136,6 +136,50 @@ version = "1.0.86"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b3d1d046238990b9cf5bcde22a3fb3584ee5cf65fb2765f454ed428c7a0063da"
|
||||
|
||||
[[package]]
|
||||
name = "async"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"async-trait",
|
||||
"futures",
|
||||
"reqwest",
|
||||
"tokio",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "async-control-flow"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"async-trait",
|
||||
"futures",
|
||||
"reqwest",
|
||||
"tokio",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "async-pitfalls"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"async-trait",
|
||||
"futures",
|
||||
"reqwest",
|
||||
"tokio",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "async-trait"
|
||||
version = "0.1.80"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "c6fa2087f2753a7da8cc1c0dbfcf89579dd57458e36769de5ac750b4671737ca"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn 2.0.48",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "autocfg"
|
||||
version = "1.1.0"
|
||||
@ -704,10 +748,24 @@ dependencies = [
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "futures-channel"
|
||||
version = "0.3.29"
|
||||
name = "futures"
|
||||
version = "0.3.30"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ff4dd66668b557604244583e3e1e1eada8c5c2e96a6d0d6653ede395b78bbacb"
|
||||
checksum = "645c6916888f6cb6350d2550b80fb63e734897a8498abe35cfb732b6487804b0"
|
||||
dependencies = [
|
||||
"futures-channel",
|
||||
"futures-core",
|
||||
"futures-io",
|
||||
"futures-sink",
|
||||
"futures-task",
|
||||
"futures-util",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "futures-channel"
|
||||
version = "0.3.30"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "eac8f7d7865dcb88bd4373ab671c8cf4508703796caa2b1985a9ca867b3fcb78"
|
||||
dependencies = [
|
||||
"futures-core",
|
||||
"futures-sink",
|
||||
|
@ -23,6 +23,9 @@ members = [
|
||||
"src/std-traits",
|
||||
"src/std-types",
|
||||
"src/testing",
|
||||
"src/concurrency/async",
|
||||
"src/concurrency/async-control-flow",
|
||||
"src/concurrency/async-pitfalls",
|
||||
"src/tuples-and-arrays",
|
||||
"src/types-and-values",
|
||||
"src/unsafe-rust",
|
||||
|
24
src/concurrency/async-control-flow/Cargo.toml
Normal file
24
src/concurrency/async-control-flow/Cargo.toml
Normal file
@ -0,0 +1,24 @@
|
||||
[package]
|
||||
name = "async-control-flow"
|
||||
version = "0.1.0"
|
||||
edition = "2021"
|
||||
publish = false
|
||||
|
||||
[[bin]]
|
||||
name = "channels"
|
||||
path = "channels.rs"
|
||||
|
||||
[[bin]]
|
||||
name = "join"
|
||||
path = "join.rs"
|
||||
|
||||
[[bin]]
|
||||
name = "select"
|
||||
path = "select.rs"
|
||||
|
||||
[dependencies]
|
||||
anyhow = "1.0.81"
|
||||
async-trait = "0.1.79"
|
||||
futures = { version = "0.3.30", default-features = false }
|
||||
reqwest = { version = "0.12.1", default-features = false }
|
||||
tokio = { version = "1.36.0", features = ["full"] }
|
@ -7,31 +7,7 @@ minutes: 8
|
||||
Several crates have support for asynchronous channels. For instance `tokio`:
|
||||
|
||||
```rust,editable,compile_fail
|
||||
use tokio::sync::mpsc::{self, Receiver};
|
||||
|
||||
async fn ping_handler(mut input: Receiver<()>) {
|
||||
let mut count: usize = 0;
|
||||
|
||||
while let Some(_) = input.recv().await {
|
||||
count += 1;
|
||||
println!("Received {count} pings so far.");
|
||||
}
|
||||
|
||||
println!("ping_handler complete");
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() {
|
||||
let (sender, receiver) = mpsc::channel(32);
|
||||
let ping_handler_task = tokio::spawn(ping_handler(receiver));
|
||||
for i in 0..10 {
|
||||
sender.send(()).await.expect("Failed to send ping.");
|
||||
println!("Sent {} pings so far.", i + 1);
|
||||
}
|
||||
|
||||
drop(sender);
|
||||
ping_handler_task.await.expect("Something went wrong in ping handler task.");
|
||||
}
|
||||
{{#include channels.rs}}
|
||||
```
|
||||
|
||||
<details>
|
||||
|
25
src/concurrency/async-control-flow/channels.rs
Normal file
25
src/concurrency/async-control-flow/channels.rs
Normal file
@ -0,0 +1,25 @@
|
||||
use tokio::sync::mpsc::{self, Receiver};
|
||||
|
||||
async fn ping_handler(mut input: Receiver<()>) {
|
||||
let mut count: usize = 0;
|
||||
|
||||
while let Some(_) = input.recv().await {
|
||||
count += 1;
|
||||
println!("Received {count} pings so far.");
|
||||
}
|
||||
|
||||
println!("ping_handler complete");
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() {
|
||||
let (sender, receiver) = mpsc::channel(32);
|
||||
let ping_handler_task = tokio::spawn(ping_handler(receiver));
|
||||
for i in 0..10 {
|
||||
sender.send(()).await.expect("Failed to send ping.");
|
||||
println!("Sent {} pings so far.", i + 1);
|
||||
}
|
||||
|
||||
drop(sender);
|
||||
ping_handler_task.await.expect("Something went wrong in ping handler task.");
|
||||
}
|
@ -9,30 +9,7 @@ collection of their results. This is similar to `Promise.all` in JavaScript or
|
||||
`asyncio.gather` in Python.
|
||||
|
||||
```rust,editable,compile_fail
|
||||
use anyhow::Result;
|
||||
use futures::future;
|
||||
use reqwest;
|
||||
use std::collections::HashMap;
|
||||
|
||||
async fn size_of_page(url: &str) -> Result<usize> {
|
||||
let resp = reqwest::get(url).await?;
|
||||
Ok(resp.text().await?.len())
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() {
|
||||
let urls: [&str; 4] = [
|
||||
"https://google.com",
|
||||
"https://httpbin.org/ip",
|
||||
"https://play.rust-lang.org/",
|
||||
"BAD_URL",
|
||||
];
|
||||
let futures_iter = urls.into_iter().map(size_of_page);
|
||||
let results = future::join_all(futures_iter).await;
|
||||
let page_sizes_dict: HashMap<&str, Result<usize>> =
|
||||
urls.into_iter().zip(results.into_iter()).collect();
|
||||
println!("{:?}", page_sizes_dict);
|
||||
}
|
||||
{{#include join.rs}}
|
||||
```
|
||||
|
||||
<details>
|
||||
|
24
src/concurrency/async-control-flow/join.rs
Normal file
24
src/concurrency/async-control-flow/join.rs
Normal file
@ -0,0 +1,24 @@
|
||||
use anyhow::Result;
|
||||
use futures::future;
|
||||
use reqwest;
|
||||
use std::collections::HashMap;
|
||||
|
||||
async fn size_of_page(url: &str) -> Result<usize> {
|
||||
let resp = reqwest::get(url).await?;
|
||||
Ok(resp.text().await?.len())
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() {
|
||||
let urls: [&str; 4] = [
|
||||
"https://google.com",
|
||||
"https://httpbin.org/ip",
|
||||
"https://play.rust-lang.org/",
|
||||
"BAD_URL",
|
||||
];
|
||||
let futures_iter = urls.into_iter().map(size_of_page);
|
||||
let results = future::join_all(futures_iter).await;
|
||||
let page_sizes_dict: HashMap<&str, Result<usize>> =
|
||||
urls.into_iter().zip(results.into_iter()).collect();
|
||||
println!("{:?}", page_sizes_dict);
|
||||
}
|
@ -16,44 +16,7 @@ the resulting variables. The `statement` result becomes the result of the
|
||||
`select!` macro.
|
||||
|
||||
```rust,editable,compile_fail
|
||||
use tokio::sync::mpsc::{self, Receiver};
|
||||
use tokio::time::{sleep, Duration};
|
||||
|
||||
#[derive(Debug, PartialEq)]
|
||||
enum Animal {
|
||||
Cat { name: String },
|
||||
Dog { name: String },
|
||||
}
|
||||
|
||||
async fn first_animal_to_finish_race(
|
||||
mut cat_rcv: Receiver<String>,
|
||||
mut dog_rcv: Receiver<String>,
|
||||
) -> Option<Animal> {
|
||||
tokio::select! {
|
||||
cat_name = cat_rcv.recv() => Some(Animal::Cat { name: cat_name? }),
|
||||
dog_name = dog_rcv.recv() => Some(Animal::Dog { name: dog_name? })
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() {
|
||||
let (cat_sender, cat_receiver) = mpsc::channel(32);
|
||||
let (dog_sender, dog_receiver) = mpsc::channel(32);
|
||||
tokio::spawn(async move {
|
||||
sleep(Duration::from_millis(500)).await;
|
||||
cat_sender.send(String::from("Felix")).await.expect("Failed to send cat.");
|
||||
});
|
||||
tokio::spawn(async move {
|
||||
sleep(Duration::from_millis(50)).await;
|
||||
dog_sender.send(String::from("Rex")).await.expect("Failed to send dog.");
|
||||
});
|
||||
|
||||
let winner = first_animal_to_finish_race(cat_receiver, dog_receiver)
|
||||
.await
|
||||
.expect("Failed to receive winner");
|
||||
|
||||
println!("Winner is {winner:?}");
|
||||
}
|
||||
{{#include select.rs}}
|
||||
```
|
||||
|
||||
<details>
|
||||
|
38
src/concurrency/async-control-flow/select.rs
Normal file
38
src/concurrency/async-control-flow/select.rs
Normal file
@ -0,0 +1,38 @@
|
||||
use tokio::sync::mpsc::{self, Receiver};
|
||||
use tokio::time::{sleep, Duration};
|
||||
|
||||
#[derive(Debug, PartialEq)]
|
||||
enum Animal {
|
||||
Cat { name: String },
|
||||
Dog { name: String },
|
||||
}
|
||||
|
||||
async fn first_animal_to_finish_race(
|
||||
mut cat_rcv: Receiver<String>,
|
||||
mut dog_rcv: Receiver<String>,
|
||||
) -> Option<Animal> {
|
||||
tokio::select! {
|
||||
cat_name = cat_rcv.recv() => Some(Animal::Cat { name: cat_name? }),
|
||||
dog_name = dog_rcv.recv() => Some(Animal::Dog { name: dog_name? })
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() {
|
||||
let (cat_sender, cat_receiver) = mpsc::channel(32);
|
||||
let (dog_sender, dog_receiver) = mpsc::channel(32);
|
||||
tokio::spawn(async move {
|
||||
sleep(Duration::from_millis(500)).await;
|
||||
cat_sender.send(String::from("Felix")).await.expect("Failed to send cat.");
|
||||
});
|
||||
tokio::spawn(async move {
|
||||
sleep(Duration::from_millis(50)).await;
|
||||
dog_sender.send(String::from("Rex")).await.expect("Failed to send dog.");
|
||||
});
|
||||
|
||||
let winner = first_animal_to_finish_race(cat_receiver, dog_receiver)
|
||||
.await
|
||||
.expect("Failed to receive winner");
|
||||
|
||||
println!("Winner is {winner:?}");
|
||||
}
|
24
src/concurrency/async-pitfalls/Cargo.toml
Normal file
24
src/concurrency/async-pitfalls/Cargo.toml
Normal file
@ -0,0 +1,24 @@
|
||||
[package]
|
||||
name = "async-pitfalls"
|
||||
version = "0.1.0"
|
||||
edition = "2021"
|
||||
publish = false
|
||||
|
||||
[[bin]]
|
||||
name = "async-traits"
|
||||
path = "async-traits.rs"
|
||||
|
||||
[[bin]]
|
||||
name = "cancellation"
|
||||
path = "cancellation.rs"
|
||||
|
||||
[[bin]]
|
||||
name = "pin"
|
||||
path = "pin.rs"
|
||||
|
||||
[dependencies]
|
||||
anyhow = "1.0.81"
|
||||
async-trait = "0.1.79"
|
||||
futures = { version = "0.3.30", default-features = false }
|
||||
reqwest = { version = "0.12.1", default-features = false }
|
||||
tokio = { version = "1.36.0", features = ["full"] }
|
@ -20,48 +20,7 @@ The [async_trait] crate provides a workaround for `dyn` support through a macro,
|
||||
with some caveats:
|
||||
|
||||
```rust,editable,compile_fail
|
||||
use async_trait::async_trait;
|
||||
use std::time::Instant;
|
||||
use tokio::time::{sleep, Duration};
|
||||
|
||||
#[async_trait]
|
||||
trait Sleeper {
|
||||
async fn sleep(&self);
|
||||
}
|
||||
|
||||
struct FixedSleeper {
|
||||
sleep_ms: u64,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl Sleeper for FixedSleeper {
|
||||
async fn sleep(&self) {
|
||||
sleep(Duration::from_millis(self.sleep_ms)).await;
|
||||
}
|
||||
}
|
||||
|
||||
async fn run_all_sleepers_multiple_times(
|
||||
sleepers: Vec<Box<dyn Sleeper>>,
|
||||
n_times: usize,
|
||||
) {
|
||||
for _ in 0..n_times {
|
||||
println!("Running all sleepers...");
|
||||
for sleeper in &sleepers {
|
||||
let start = Instant::now();
|
||||
sleeper.sleep().await;
|
||||
println!("Slept for {} ms", start.elapsed().as_millis());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() {
|
||||
let sleepers: Vec<Box<dyn Sleeper>> = vec![
|
||||
Box::new(FixedSleeper { sleep_ms: 50 }),
|
||||
Box::new(FixedSleeper { sleep_ms: 100 }),
|
||||
];
|
||||
run_all_sleepers_multiple_times(sleepers, 5).await;
|
||||
}
|
||||
{{#include async-traits.rs}}
|
||||
```
|
||||
|
||||
<details>
|
||||
|
42
src/concurrency/async-pitfalls/async-traits.rs
Normal file
42
src/concurrency/async-pitfalls/async-traits.rs
Normal file
@ -0,0 +1,42 @@
|
||||
use async_trait::async_trait;
|
||||
use std::time::Instant;
|
||||
use tokio::time::{sleep, Duration};
|
||||
|
||||
#[async_trait]
|
||||
trait Sleeper {
|
||||
async fn sleep(&self);
|
||||
}
|
||||
|
||||
struct FixedSleeper {
|
||||
sleep_ms: u64,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl Sleeper for FixedSleeper {
|
||||
async fn sleep(&self) {
|
||||
sleep(Duration::from_millis(self.sleep_ms)).await;
|
||||
}
|
||||
}
|
||||
|
||||
async fn run_all_sleepers_multiple_times(
|
||||
sleepers: Vec<Box<dyn Sleeper>>,
|
||||
n_times: usize,
|
||||
) {
|
||||
for _ in 0..n_times {
|
||||
println!("Running all sleepers...");
|
||||
for sleeper in &sleepers {
|
||||
let start = Instant::now();
|
||||
sleeper.sleep().await;
|
||||
println!("Slept for {} ms", start.elapsed().as_millis());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() {
|
||||
let sleepers: Vec<Box<dyn Sleeper>> = vec![
|
||||
Box::new(FixedSleeper { sleep_ms: 50 }),
|
||||
Box::new(FixedSleeper { sleep_ms: 100 }),
|
||||
];
|
||||
run_all_sleepers_multiple_times(sleepers, 5).await;
|
||||
}
|
@ -10,65 +10,7 @@ the system works correctly even when futures are cancelled. For example, it
|
||||
shouldn't deadlock or lose data.
|
||||
|
||||
```rust,editable,compile_fail
|
||||
use std::io::{self, ErrorKind};
|
||||
use std::time::Duration;
|
||||
use tokio::io::{AsyncReadExt, AsyncWriteExt, DuplexStream};
|
||||
|
||||
struct LinesReader {
|
||||
stream: DuplexStream,
|
||||
}
|
||||
|
||||
impl LinesReader {
|
||||
fn new(stream: DuplexStream) -> Self {
|
||||
Self { stream }
|
||||
}
|
||||
|
||||
async fn next(&mut self) -> io::Result<Option<String>> {
|
||||
let mut bytes = Vec::new();
|
||||
let mut buf = [0];
|
||||
while self.stream.read(&mut buf[..]).await? != 0 {
|
||||
bytes.push(buf[0]);
|
||||
if buf[0] == b'\n' {
|
||||
break;
|
||||
}
|
||||
}
|
||||
if bytes.is_empty() {
|
||||
return Ok(None);
|
||||
}
|
||||
let s = String::from_utf8(bytes)
|
||||
.map_err(|_| io::Error::new(ErrorKind::InvalidData, "not UTF-8"))?;
|
||||
Ok(Some(s))
|
||||
}
|
||||
}
|
||||
|
||||
async fn slow_copy(source: String, mut dest: DuplexStream) -> std::io::Result<()> {
|
||||
for b in source.bytes() {
|
||||
dest.write_u8(b).await?;
|
||||
tokio::time::sleep(Duration::from_millis(10)).await
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> std::io::Result<()> {
|
||||
let (client, server) = tokio::io::duplex(5);
|
||||
let handle = tokio::spawn(slow_copy("hi\nthere\n".to_owned(), client));
|
||||
|
||||
let mut lines = LinesReader::new(server);
|
||||
let mut interval = tokio::time::interval(Duration::from_millis(60));
|
||||
loop {
|
||||
tokio::select! {
|
||||
_ = interval.tick() => println!("tick!"),
|
||||
line = lines.next() => if let Some(l) = line? {
|
||||
print!("{}", l)
|
||||
} else {
|
||||
break
|
||||
},
|
||||
}
|
||||
}
|
||||
handle.await.unwrap()?;
|
||||
Ok(())
|
||||
}
|
||||
{{#include cancellation.rs}}
|
||||
```
|
||||
|
||||
<details>
|
||||
|
59
src/concurrency/async-pitfalls/cancellation.rs
Normal file
59
src/concurrency/async-pitfalls/cancellation.rs
Normal file
@ -0,0 +1,59 @@
|
||||
use std::io::{self, ErrorKind};
|
||||
use std::time::Duration;
|
||||
use tokio::io::{AsyncReadExt, AsyncWriteExt, DuplexStream};
|
||||
|
||||
struct LinesReader {
|
||||
stream: DuplexStream,
|
||||
}
|
||||
|
||||
impl LinesReader {
|
||||
fn new(stream: DuplexStream) -> Self {
|
||||
Self { stream }
|
||||
}
|
||||
|
||||
async fn next(&mut self) -> io::Result<Option<String>> {
|
||||
let mut bytes = Vec::new();
|
||||
let mut buf = [0];
|
||||
while self.stream.read(&mut buf[..]).await? != 0 {
|
||||
bytes.push(buf[0]);
|
||||
if buf[0] == b'\n' {
|
||||
break;
|
||||
}
|
||||
}
|
||||
if bytes.is_empty() {
|
||||
return Ok(None);
|
||||
}
|
||||
let s = String::from_utf8(bytes)
|
||||
.map_err(|_| io::Error::new(ErrorKind::InvalidData, "not UTF-8"))?;
|
||||
Ok(Some(s))
|
||||
}
|
||||
}
|
||||
|
||||
async fn slow_copy(source: String, mut dest: DuplexStream) -> std::io::Result<()> {
|
||||
for b in source.bytes() {
|
||||
dest.write_u8(b).await?;
|
||||
tokio::time::sleep(Duration::from_millis(10)).await
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> std::io::Result<()> {
|
||||
let (client, server) = tokio::io::duplex(5);
|
||||
let handle = tokio::spawn(slow_copy("hi\nthere\n".to_owned(), client));
|
||||
|
||||
let mut lines = LinesReader::new(server);
|
||||
let mut interval = tokio::time::interval(Duration::from_millis(60));
|
||||
loop {
|
||||
tokio::select! {
|
||||
_ = interval.tick() => println!("tick!"),
|
||||
line = lines.next() => if let Some(l) = line? {
|
||||
print!("{}", l)
|
||||
} else {
|
||||
break
|
||||
},
|
||||
}
|
||||
}
|
||||
handle.await.unwrap()?;
|
||||
Ok(())
|
||||
}
|
@ -18,54 +18,7 @@ operations that would move the instance it points to into a different memory
|
||||
location.
|
||||
|
||||
```rust,editable,compile_fail
|
||||
use tokio::sync::{mpsc, oneshot};
|
||||
use tokio::task::spawn;
|
||||
use tokio::time::{sleep, Duration};
|
||||
|
||||
// A work item. In this case, just sleep for the given time and respond
|
||||
// with a message on the `respond_on` channel.
|
||||
#[derive(Debug)]
|
||||
struct Work {
|
||||
input: u32,
|
||||
respond_on: oneshot::Sender<u32>,
|
||||
}
|
||||
|
||||
// A worker which listens for work on a queue and performs it.
|
||||
async fn worker(mut work_queue: mpsc::Receiver<Work>) {
|
||||
let mut iterations = 0;
|
||||
loop {
|
||||
tokio::select! {
|
||||
Some(work) = work_queue.recv() => {
|
||||
sleep(Duration::from_millis(10)).await; // Pretend to work.
|
||||
work.respond_on
|
||||
.send(work.input * 1000)
|
||||
.expect("failed to send response");
|
||||
iterations += 1;
|
||||
}
|
||||
// TODO: report number of iterations every 100ms
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// A requester which requests work and waits for it to complete.
|
||||
async fn do_work(work_queue: &mpsc::Sender<Work>, input: u32) -> u32 {
|
||||
let (tx, rx) = oneshot::channel();
|
||||
work_queue
|
||||
.send(Work { input, respond_on: tx })
|
||||
.await
|
||||
.expect("failed to send on work queue");
|
||||
rx.await.expect("failed waiting for response")
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() {
|
||||
let (tx, rx) = mpsc::channel(10);
|
||||
spawn(worker(rx));
|
||||
for i in 0..100 {
|
||||
let resp = do_work(&tx, i).await;
|
||||
println!("work result for iteration {i}: {resp}");
|
||||
}
|
||||
}
|
||||
{{#include pin.rs}}
|
||||
```
|
||||
|
||||
<details>
|
||||
|
48
src/concurrency/async-pitfalls/pin.rs
Normal file
48
src/concurrency/async-pitfalls/pin.rs
Normal file
@ -0,0 +1,48 @@
|
||||
use tokio::sync::{mpsc, oneshot};
|
||||
use tokio::task::spawn;
|
||||
use tokio::time::{sleep, Duration};
|
||||
|
||||
// A work item. In this case, just sleep for the given time and respond
|
||||
// with a message on the `respond_on` channel.
|
||||
#[derive(Debug)]
|
||||
struct Work {
|
||||
input: u32,
|
||||
respond_on: oneshot::Sender<u32>,
|
||||
}
|
||||
|
||||
// A worker which listens for work on a queue and performs it.
|
||||
async fn worker(mut work_queue: mpsc::Receiver<Work>) {
|
||||
let mut _iterations = 0;
|
||||
loop {
|
||||
tokio::select! {
|
||||
Some(work) = work_queue.recv() => {
|
||||
sleep(Duration::from_millis(10)).await; // Pretend to work.
|
||||
work.respond_on
|
||||
.send(work.input * 1000)
|
||||
.expect("failed to send response");
|
||||
_iterations += 1;
|
||||
}
|
||||
// TODO: report number of iterations every 100ms
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// A requester which requests work and waits for it to complete.
|
||||
async fn do_work(work_queue: &mpsc::Sender<Work>, input: u32) -> u32 {
|
||||
let (tx, rx) = oneshot::channel();
|
||||
work_queue
|
||||
.send(Work { input, respond_on: tx })
|
||||
.await
|
||||
.expect("failed to send on work queue");
|
||||
rx.await.expect("failed waiting for response")
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() {
|
||||
let (tx, rx) = mpsc::channel(10);
|
||||
spawn(worker(rx));
|
||||
for i in 0..100 {
|
||||
let resp = do_work(&tx, i).await;
|
||||
println!("work result for iteration {i}: {resp}");
|
||||
}
|
||||
}
|
16
src/concurrency/async/Cargo.toml
Normal file
16
src/concurrency/async/Cargo.toml
Normal file
@ -0,0 +1,16 @@
|
||||
[package]
|
||||
name = "async"
|
||||
version = "0.1.0"
|
||||
edition = "2021"
|
||||
publish = false
|
||||
|
||||
[[bin]]
|
||||
name = "tasks"
|
||||
path = "tasks.rs"
|
||||
|
||||
[dependencies]
|
||||
anyhow = "1.0.81"
|
||||
async-trait = "0.1.79"
|
||||
futures = { version = "0.3.30", default-features = false }
|
||||
reqwest = { version = "0.12.1", default-features = false }
|
||||
tokio = { version = "1.36.0", features = ["full"] }
|
@ -12,30 +12,7 @@ corresponding loosely to a call stack. Concurrency within a task is possible by
|
||||
polling multiple child futures, such as racing a timer and an I/O operation.
|
||||
|
||||
```rust,compile_fail
|
||||
use tokio::io::{self, AsyncReadExt, AsyncWriteExt};
|
||||
use tokio::net::TcpListener;
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> io::Result<()> {
|
||||
let listener = TcpListener::bind("127.0.0.1:0").await?;
|
||||
println!("listening on port {}", listener.local_addr()?.port());
|
||||
|
||||
loop {
|
||||
let (mut socket, addr) = listener.accept().await?;
|
||||
|
||||
println!("connection from {addr:?}");
|
||||
|
||||
tokio::spawn(async move {
|
||||
socket.write_all(b"Who are you?\n").await.expect("socket error");
|
||||
|
||||
let mut buf = vec![0; 1024];
|
||||
let name_size = socket.read(&mut buf).await.expect("socket error");
|
||||
let name = std::str::from_utf8(&buf[..name_size]).unwrap().trim();
|
||||
let reply = format!("Thanks for dialing in, {name}!\n");
|
||||
socket.write_all(reply.as_bytes()).await.expect("socket error");
|
||||
});
|
||||
}
|
||||
}
|
||||
{{#include tasks.rs}}
|
||||
```
|
||||
|
||||
<details>
|
||||
|
24
src/concurrency/async/tasks.rs
Normal file
24
src/concurrency/async/tasks.rs
Normal file
@ -0,0 +1,24 @@
|
||||
use tokio::io::{self, AsyncReadExt, AsyncWriteExt};
|
||||
use tokio::net::TcpListener;
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> io::Result<()> {
|
||||
let listener = TcpListener::bind("127.0.0.1:0").await?;
|
||||
println!("listening on port {}", listener.local_addr()?.port());
|
||||
|
||||
loop {
|
||||
let (mut socket, addr) = listener.accept().await?;
|
||||
|
||||
println!("connection from {addr:?}");
|
||||
|
||||
tokio::spawn(async move {
|
||||
socket.write_all(b"Who are you?\n").await.expect("socket error");
|
||||
|
||||
let mut buf = vec![0; 1024];
|
||||
let name_size = socket.read(&mut buf).await.expect("socket error");
|
||||
let name = std::str::from_utf8(&buf[..name_size]).unwrap().trim();
|
||||
let reply = format!("Thanks for dialing in, {name}!\n");
|
||||
socket.write_all(reply.as_bytes()).await.expect("socket error");
|
||||
});
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user