From ba7821b2c26cafe68170d93d1556dad1064ddb4b Mon Sep 17 00:00:00 2001 From: Martin Geisler Date: Sun, 24 Mar 2024 20:46:03 +0000 Subject: [PATCH] Make async code testable This moves all non-trivial examples in the async part of the course to self-contained Rust files. This ensures that we can test them in CI. --- Cargo.lock | 64 ++++++++++++++++++- Cargo.toml | 3 + src/concurrency/async-control-flow/Cargo.toml | 24 +++++++ .../async-control-flow/channels.md | 26 +------- .../async-control-flow/channels.rs | 25 ++++++++ src/concurrency/async-control-flow/join.md | 25 +------- src/concurrency/async-control-flow/join.rs | 24 +++++++ src/concurrency/async-control-flow/select.md | 39 +---------- src/concurrency/async-control-flow/select.rs | 38 +++++++++++ src/concurrency/async-pitfalls/Cargo.toml | 24 +++++++ .../async-pitfalls/async-traits.md | 43 +------------ .../async-pitfalls/async-traits.rs | 42 ++++++++++++ .../async-pitfalls/cancellation.md | 60 +---------------- .../async-pitfalls/cancellation.rs | 59 +++++++++++++++++ src/concurrency/async-pitfalls/pin.md | 49 +------------- src/concurrency/async-pitfalls/pin.rs | 48 ++++++++++++++ src/concurrency/async/Cargo.toml | 16 +++++ src/concurrency/async/tasks.md | 25 +------- src/concurrency/async/tasks.rs | 24 +++++++ 19 files changed, 395 insertions(+), 263 deletions(-) create mode 100644 src/concurrency/async-control-flow/Cargo.toml create mode 100644 src/concurrency/async-control-flow/channels.rs create mode 100644 src/concurrency/async-control-flow/join.rs create mode 100644 src/concurrency/async-control-flow/select.rs create mode 100644 src/concurrency/async-pitfalls/Cargo.toml create mode 100644 src/concurrency/async-pitfalls/async-traits.rs create mode 100644 src/concurrency/async-pitfalls/cancellation.rs create mode 100644 src/concurrency/async-pitfalls/pin.rs create mode 100644 src/concurrency/async/Cargo.toml create mode 100644 src/concurrency/async/tasks.rs diff --git a/Cargo.lock b/Cargo.lock index e4848d43..3154677f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -136,6 +136,50 @@ version = "1.0.86" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b3d1d046238990b9cf5bcde22a3fb3584ee5cf65fb2765f454ed428c7a0063da" +[[package]] +name = "async" +version = "0.1.0" +dependencies = [ + "anyhow", + "async-trait", + "futures", + "reqwest", + "tokio", +] + +[[package]] +name = "async-control-flow" +version = "0.1.0" +dependencies = [ + "anyhow", + "async-trait", + "futures", + "reqwest", + "tokio", +] + +[[package]] +name = "async-pitfalls" +version = "0.1.0" +dependencies = [ + "anyhow", + "async-trait", + "futures", + "reqwest", + "tokio", +] + +[[package]] +name = "async-trait" +version = "0.1.80" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c6fa2087f2753a7da8cc1c0dbfcf89579dd57458e36769de5ac750b4671737ca" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.48", +] + [[package]] name = "autocfg" version = "1.1.0" @@ -704,10 +748,24 @@ dependencies = [ ] [[package]] -name = "futures-channel" -version = "0.3.29" +name = "futures" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ff4dd66668b557604244583e3e1e1eada8c5c2e96a6d0d6653ede395b78bbacb" +checksum = "645c6916888f6cb6350d2550b80fb63e734897a8498abe35cfb732b6487804b0" +dependencies = [ + "futures-channel", + "futures-core", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-channel" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eac8f7d7865dcb88bd4373ab671c8cf4508703796caa2b1985a9ca867b3fcb78" dependencies = [ "futures-core", "futures-sink", diff --git a/Cargo.toml b/Cargo.toml index 3a12574a..b33c0d92 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,6 +23,9 @@ members = [ "src/std-traits", "src/std-types", "src/testing", + "src/concurrency/async", + "src/concurrency/async-control-flow", + "src/concurrency/async-pitfalls", "src/tuples-and-arrays", "src/types-and-values", "src/unsafe-rust", diff --git a/src/concurrency/async-control-flow/Cargo.toml b/src/concurrency/async-control-flow/Cargo.toml new file mode 100644 index 00000000..a0b13413 --- /dev/null +++ b/src/concurrency/async-control-flow/Cargo.toml @@ -0,0 +1,24 @@ +[package] +name = "async-control-flow" +version = "0.1.0" +edition = "2021" +publish = false + +[[bin]] +name = "channels" +path = "channels.rs" + +[[bin]] +name = "join" +path = "join.rs" + +[[bin]] +name = "select" +path = "select.rs" + +[dependencies] +anyhow = "1.0.81" +async-trait = "0.1.79" +futures = { version = "0.3.30", default-features = false } +reqwest = { version = "0.12.1", default-features = false } +tokio = { version = "1.36.0", features = ["full"] } diff --git a/src/concurrency/async-control-flow/channels.md b/src/concurrency/async-control-flow/channels.md index 72678464..497b69f5 100644 --- a/src/concurrency/async-control-flow/channels.md +++ b/src/concurrency/async-control-flow/channels.md @@ -7,31 +7,7 @@ minutes: 8 Several crates have support for asynchronous channels. For instance `tokio`: ```rust,editable,compile_fail -use tokio::sync::mpsc::{self, Receiver}; - -async fn ping_handler(mut input: Receiver<()>) { - let mut count: usize = 0; - - while let Some(_) = input.recv().await { - count += 1; - println!("Received {count} pings so far."); - } - - println!("ping_handler complete"); -} - -#[tokio::main] -async fn main() { - let (sender, receiver) = mpsc::channel(32); - let ping_handler_task = tokio::spawn(ping_handler(receiver)); - for i in 0..10 { - sender.send(()).await.expect("Failed to send ping."); - println!("Sent {} pings so far.", i + 1); - } - - drop(sender); - ping_handler_task.await.expect("Something went wrong in ping handler task."); -} +{{#include channels.rs}} ```
diff --git a/src/concurrency/async-control-flow/channels.rs b/src/concurrency/async-control-flow/channels.rs new file mode 100644 index 00000000..3aefa055 --- /dev/null +++ b/src/concurrency/async-control-flow/channels.rs @@ -0,0 +1,25 @@ +use tokio::sync::mpsc::{self, Receiver}; + +async fn ping_handler(mut input: Receiver<()>) { + let mut count: usize = 0; + + while let Some(_) = input.recv().await { + count += 1; + println!("Received {count} pings so far."); + } + + println!("ping_handler complete"); +} + +#[tokio::main] +async fn main() { + let (sender, receiver) = mpsc::channel(32); + let ping_handler_task = tokio::spawn(ping_handler(receiver)); + for i in 0..10 { + sender.send(()).await.expect("Failed to send ping."); + println!("Sent {} pings so far.", i + 1); + } + + drop(sender); + ping_handler_task.await.expect("Something went wrong in ping handler task."); +} diff --git a/src/concurrency/async-control-flow/join.md b/src/concurrency/async-control-flow/join.md index 6aa06e6d..5962120a 100644 --- a/src/concurrency/async-control-flow/join.md +++ b/src/concurrency/async-control-flow/join.md @@ -9,30 +9,7 @@ collection of their results. This is similar to `Promise.all` in JavaScript or `asyncio.gather` in Python. ```rust,editable,compile_fail -use anyhow::Result; -use futures::future; -use reqwest; -use std::collections::HashMap; - -async fn size_of_page(url: &str) -> Result { - let resp = reqwest::get(url).await?; - Ok(resp.text().await?.len()) -} - -#[tokio::main] -async fn main() { - let urls: [&str; 4] = [ - "https://google.com", - "https://httpbin.org/ip", - "https://play.rust-lang.org/", - "BAD_URL", - ]; - let futures_iter = urls.into_iter().map(size_of_page); - let results = future::join_all(futures_iter).await; - let page_sizes_dict: HashMap<&str, Result> = - urls.into_iter().zip(results.into_iter()).collect(); - println!("{:?}", page_sizes_dict); -} +{{#include join.rs}} ```
diff --git a/src/concurrency/async-control-flow/join.rs b/src/concurrency/async-control-flow/join.rs new file mode 100644 index 00000000..2fae1c87 --- /dev/null +++ b/src/concurrency/async-control-flow/join.rs @@ -0,0 +1,24 @@ +use anyhow::Result; +use futures::future; +use reqwest; +use std::collections::HashMap; + +async fn size_of_page(url: &str) -> Result { + let resp = reqwest::get(url).await?; + Ok(resp.text().await?.len()) +} + +#[tokio::main] +async fn main() { + let urls: [&str; 4] = [ + "https://google.com", + "https://httpbin.org/ip", + "https://play.rust-lang.org/", + "BAD_URL", + ]; + let futures_iter = urls.into_iter().map(size_of_page); + let results = future::join_all(futures_iter).await; + let page_sizes_dict: HashMap<&str, Result> = + urls.into_iter().zip(results.into_iter()).collect(); + println!("{:?}", page_sizes_dict); +} diff --git a/src/concurrency/async-control-flow/select.md b/src/concurrency/async-control-flow/select.md index b3df5c2f..257d37be 100644 --- a/src/concurrency/async-control-flow/select.md +++ b/src/concurrency/async-control-flow/select.md @@ -16,44 +16,7 @@ the resulting variables. The `statement` result becomes the result of the `select!` macro. ```rust,editable,compile_fail -use tokio::sync::mpsc::{self, Receiver}; -use tokio::time::{sleep, Duration}; - -#[derive(Debug, PartialEq)] -enum Animal { - Cat { name: String }, - Dog { name: String }, -} - -async fn first_animal_to_finish_race( - mut cat_rcv: Receiver, - mut dog_rcv: Receiver, -) -> Option { - tokio::select! { - cat_name = cat_rcv.recv() => Some(Animal::Cat { name: cat_name? }), - dog_name = dog_rcv.recv() => Some(Animal::Dog { name: dog_name? }) - } -} - -#[tokio::main] -async fn main() { - let (cat_sender, cat_receiver) = mpsc::channel(32); - let (dog_sender, dog_receiver) = mpsc::channel(32); - tokio::spawn(async move { - sleep(Duration::from_millis(500)).await; - cat_sender.send(String::from("Felix")).await.expect("Failed to send cat."); - }); - tokio::spawn(async move { - sleep(Duration::from_millis(50)).await; - dog_sender.send(String::from("Rex")).await.expect("Failed to send dog."); - }); - - let winner = first_animal_to_finish_race(cat_receiver, dog_receiver) - .await - .expect("Failed to receive winner"); - - println!("Winner is {winner:?}"); -} +{{#include select.rs}} ```
diff --git a/src/concurrency/async-control-flow/select.rs b/src/concurrency/async-control-flow/select.rs new file mode 100644 index 00000000..69f2e8d3 --- /dev/null +++ b/src/concurrency/async-control-flow/select.rs @@ -0,0 +1,38 @@ +use tokio::sync::mpsc::{self, Receiver}; +use tokio::time::{sleep, Duration}; + +#[derive(Debug, PartialEq)] +enum Animal { + Cat { name: String }, + Dog { name: String }, +} + +async fn first_animal_to_finish_race( + mut cat_rcv: Receiver, + mut dog_rcv: Receiver, +) -> Option { + tokio::select! { + cat_name = cat_rcv.recv() => Some(Animal::Cat { name: cat_name? }), + dog_name = dog_rcv.recv() => Some(Animal::Dog { name: dog_name? }) + } +} + +#[tokio::main] +async fn main() { + let (cat_sender, cat_receiver) = mpsc::channel(32); + let (dog_sender, dog_receiver) = mpsc::channel(32); + tokio::spawn(async move { + sleep(Duration::from_millis(500)).await; + cat_sender.send(String::from("Felix")).await.expect("Failed to send cat."); + }); + tokio::spawn(async move { + sleep(Duration::from_millis(50)).await; + dog_sender.send(String::from("Rex")).await.expect("Failed to send dog."); + }); + + let winner = first_animal_to_finish_race(cat_receiver, dog_receiver) + .await + .expect("Failed to receive winner"); + + println!("Winner is {winner:?}"); +} diff --git a/src/concurrency/async-pitfalls/Cargo.toml b/src/concurrency/async-pitfalls/Cargo.toml new file mode 100644 index 00000000..a14bb6d5 --- /dev/null +++ b/src/concurrency/async-pitfalls/Cargo.toml @@ -0,0 +1,24 @@ +[package] +name = "async-pitfalls" +version = "0.1.0" +edition = "2021" +publish = false + +[[bin]] +name = "async-traits" +path = "async-traits.rs" + +[[bin]] +name = "cancellation" +path = "cancellation.rs" + +[[bin]] +name = "pin" +path = "pin.rs" + +[dependencies] +anyhow = "1.0.81" +async-trait = "0.1.79" +futures = { version = "0.3.30", default-features = false } +reqwest = { version = "0.12.1", default-features = false } +tokio = { version = "1.36.0", features = ["full"] } diff --git a/src/concurrency/async-pitfalls/async-traits.md b/src/concurrency/async-pitfalls/async-traits.md index 8d552052..00140aff 100644 --- a/src/concurrency/async-pitfalls/async-traits.md +++ b/src/concurrency/async-pitfalls/async-traits.md @@ -20,48 +20,7 @@ The [async_trait] crate provides a workaround for `dyn` support through a macro, with some caveats: ```rust,editable,compile_fail -use async_trait::async_trait; -use std::time::Instant; -use tokio::time::{sleep, Duration}; - -#[async_trait] -trait Sleeper { - async fn sleep(&self); -} - -struct FixedSleeper { - sleep_ms: u64, -} - -#[async_trait] -impl Sleeper for FixedSleeper { - async fn sleep(&self) { - sleep(Duration::from_millis(self.sleep_ms)).await; - } -} - -async fn run_all_sleepers_multiple_times( - sleepers: Vec>, - n_times: usize, -) { - for _ in 0..n_times { - println!("Running all sleepers..."); - for sleeper in &sleepers { - let start = Instant::now(); - sleeper.sleep().await; - println!("Slept for {} ms", start.elapsed().as_millis()); - } - } -} - -#[tokio::main] -async fn main() { - let sleepers: Vec> = vec![ - Box::new(FixedSleeper { sleep_ms: 50 }), - Box::new(FixedSleeper { sleep_ms: 100 }), - ]; - run_all_sleepers_multiple_times(sleepers, 5).await; -} +{{#include async-traits.rs}} ```
diff --git a/src/concurrency/async-pitfalls/async-traits.rs b/src/concurrency/async-pitfalls/async-traits.rs new file mode 100644 index 00000000..af50c89b --- /dev/null +++ b/src/concurrency/async-pitfalls/async-traits.rs @@ -0,0 +1,42 @@ +use async_trait::async_trait; +use std::time::Instant; +use tokio::time::{sleep, Duration}; + +#[async_trait] +trait Sleeper { + async fn sleep(&self); +} + +struct FixedSleeper { + sleep_ms: u64, +} + +#[async_trait] +impl Sleeper for FixedSleeper { + async fn sleep(&self) { + sleep(Duration::from_millis(self.sleep_ms)).await; + } +} + +async fn run_all_sleepers_multiple_times( + sleepers: Vec>, + n_times: usize, +) { + for _ in 0..n_times { + println!("Running all sleepers..."); + for sleeper in &sleepers { + let start = Instant::now(); + sleeper.sleep().await; + println!("Slept for {} ms", start.elapsed().as_millis()); + } + } +} + +#[tokio::main] +async fn main() { + let sleepers: Vec> = vec![ + Box::new(FixedSleeper { sleep_ms: 50 }), + Box::new(FixedSleeper { sleep_ms: 100 }), + ]; + run_all_sleepers_multiple_times(sleepers, 5).await; +} diff --git a/src/concurrency/async-pitfalls/cancellation.md b/src/concurrency/async-pitfalls/cancellation.md index 66fae4e6..adb258ab 100644 --- a/src/concurrency/async-pitfalls/cancellation.md +++ b/src/concurrency/async-pitfalls/cancellation.md @@ -10,65 +10,7 @@ the system works correctly even when futures are cancelled. For example, it shouldn't deadlock or lose data. ```rust,editable,compile_fail -use std::io::{self, ErrorKind}; -use std::time::Duration; -use tokio::io::{AsyncReadExt, AsyncWriteExt, DuplexStream}; - -struct LinesReader { - stream: DuplexStream, -} - -impl LinesReader { - fn new(stream: DuplexStream) -> Self { - Self { stream } - } - - async fn next(&mut self) -> io::Result> { - let mut bytes = Vec::new(); - let mut buf = [0]; - while self.stream.read(&mut buf[..]).await? != 0 { - bytes.push(buf[0]); - if buf[0] == b'\n' { - break; - } - } - if bytes.is_empty() { - return Ok(None); - } - let s = String::from_utf8(bytes) - .map_err(|_| io::Error::new(ErrorKind::InvalidData, "not UTF-8"))?; - Ok(Some(s)) - } -} - -async fn slow_copy(source: String, mut dest: DuplexStream) -> std::io::Result<()> { - for b in source.bytes() { - dest.write_u8(b).await?; - tokio::time::sleep(Duration::from_millis(10)).await - } - Ok(()) -} - -#[tokio::main] -async fn main() -> std::io::Result<()> { - let (client, server) = tokio::io::duplex(5); - let handle = tokio::spawn(slow_copy("hi\nthere\n".to_owned(), client)); - - let mut lines = LinesReader::new(server); - let mut interval = tokio::time::interval(Duration::from_millis(60)); - loop { - tokio::select! { - _ = interval.tick() => println!("tick!"), - line = lines.next() => if let Some(l) = line? { - print!("{}", l) - } else { - break - }, - } - } - handle.await.unwrap()?; - Ok(()) -} +{{#include cancellation.rs}} ```
diff --git a/src/concurrency/async-pitfalls/cancellation.rs b/src/concurrency/async-pitfalls/cancellation.rs new file mode 100644 index 00000000..d771e6ee --- /dev/null +++ b/src/concurrency/async-pitfalls/cancellation.rs @@ -0,0 +1,59 @@ +use std::io::{self, ErrorKind}; +use std::time::Duration; +use tokio::io::{AsyncReadExt, AsyncWriteExt, DuplexStream}; + +struct LinesReader { + stream: DuplexStream, +} + +impl LinesReader { + fn new(stream: DuplexStream) -> Self { + Self { stream } + } + + async fn next(&mut self) -> io::Result> { + let mut bytes = Vec::new(); + let mut buf = [0]; + while self.stream.read(&mut buf[..]).await? != 0 { + bytes.push(buf[0]); + if buf[0] == b'\n' { + break; + } + } + if bytes.is_empty() { + return Ok(None); + } + let s = String::from_utf8(bytes) + .map_err(|_| io::Error::new(ErrorKind::InvalidData, "not UTF-8"))?; + Ok(Some(s)) + } +} + +async fn slow_copy(source: String, mut dest: DuplexStream) -> std::io::Result<()> { + for b in source.bytes() { + dest.write_u8(b).await?; + tokio::time::sleep(Duration::from_millis(10)).await + } + Ok(()) +} + +#[tokio::main] +async fn main() -> std::io::Result<()> { + let (client, server) = tokio::io::duplex(5); + let handle = tokio::spawn(slow_copy("hi\nthere\n".to_owned(), client)); + + let mut lines = LinesReader::new(server); + let mut interval = tokio::time::interval(Duration::from_millis(60)); + loop { + tokio::select! { + _ = interval.tick() => println!("tick!"), + line = lines.next() => if let Some(l) = line? { + print!("{}", l) + } else { + break + }, + } + } + handle.await.unwrap()?; + Ok(()) +} diff --git a/src/concurrency/async-pitfalls/pin.md b/src/concurrency/async-pitfalls/pin.md index fc764a8a..85be24ac 100644 --- a/src/concurrency/async-pitfalls/pin.md +++ b/src/concurrency/async-pitfalls/pin.md @@ -18,54 +18,7 @@ operations that would move the instance it points to into a different memory location. ```rust,editable,compile_fail -use tokio::sync::{mpsc, oneshot}; -use tokio::task::spawn; -use tokio::time::{sleep, Duration}; - -// A work item. In this case, just sleep for the given time and respond -// with a message on the `respond_on` channel. -#[derive(Debug)] -struct Work { - input: u32, - respond_on: oneshot::Sender, -} - -// A worker which listens for work on a queue and performs it. -async fn worker(mut work_queue: mpsc::Receiver) { - let mut iterations = 0; - loop { - tokio::select! { - Some(work) = work_queue.recv() => { - sleep(Duration::from_millis(10)).await; // Pretend to work. - work.respond_on - .send(work.input * 1000) - .expect("failed to send response"); - iterations += 1; - } - // TODO: report number of iterations every 100ms - } - } -} - -// A requester which requests work and waits for it to complete. -async fn do_work(work_queue: &mpsc::Sender, input: u32) -> u32 { - let (tx, rx) = oneshot::channel(); - work_queue - .send(Work { input, respond_on: tx }) - .await - .expect("failed to send on work queue"); - rx.await.expect("failed waiting for response") -} - -#[tokio::main] -async fn main() { - let (tx, rx) = mpsc::channel(10); - spawn(worker(rx)); - for i in 0..100 { - let resp = do_work(&tx, i).await; - println!("work result for iteration {i}: {resp}"); - } -} +{{#include pin.rs}} ```
diff --git a/src/concurrency/async-pitfalls/pin.rs b/src/concurrency/async-pitfalls/pin.rs new file mode 100644 index 00000000..69efec6f --- /dev/null +++ b/src/concurrency/async-pitfalls/pin.rs @@ -0,0 +1,48 @@ +use tokio::sync::{mpsc, oneshot}; +use tokio::task::spawn; +use tokio::time::{sleep, Duration}; + +// A work item. In this case, just sleep for the given time and respond +// with a message on the `respond_on` channel. +#[derive(Debug)] +struct Work { + input: u32, + respond_on: oneshot::Sender, +} + +// A worker which listens for work on a queue and performs it. +async fn worker(mut work_queue: mpsc::Receiver) { + let mut _iterations = 0; + loop { + tokio::select! { + Some(work) = work_queue.recv() => { + sleep(Duration::from_millis(10)).await; // Pretend to work. + work.respond_on + .send(work.input * 1000) + .expect("failed to send response"); + _iterations += 1; + } + // TODO: report number of iterations every 100ms + } + } +} + +// A requester which requests work and waits for it to complete. +async fn do_work(work_queue: &mpsc::Sender, input: u32) -> u32 { + let (tx, rx) = oneshot::channel(); + work_queue + .send(Work { input, respond_on: tx }) + .await + .expect("failed to send on work queue"); + rx.await.expect("failed waiting for response") +} + +#[tokio::main] +async fn main() { + let (tx, rx) = mpsc::channel(10); + spawn(worker(rx)); + for i in 0..100 { + let resp = do_work(&tx, i).await; + println!("work result for iteration {i}: {resp}"); + } +} diff --git a/src/concurrency/async/Cargo.toml b/src/concurrency/async/Cargo.toml new file mode 100644 index 00000000..f60f74e2 --- /dev/null +++ b/src/concurrency/async/Cargo.toml @@ -0,0 +1,16 @@ +[package] +name = "async" +version = "0.1.0" +edition = "2021" +publish = false + +[[bin]] +name = "tasks" +path = "tasks.rs" + +[dependencies] +anyhow = "1.0.81" +async-trait = "0.1.79" +futures = { version = "0.3.30", default-features = false } +reqwest = { version = "0.12.1", default-features = false } +tokio = { version = "1.36.0", features = ["full"] } diff --git a/src/concurrency/async/tasks.md b/src/concurrency/async/tasks.md index 2c23a006..2941cd4a 100644 --- a/src/concurrency/async/tasks.md +++ b/src/concurrency/async/tasks.md @@ -12,30 +12,7 @@ corresponding loosely to a call stack. Concurrency within a task is possible by polling multiple child futures, such as racing a timer and an I/O operation. ```rust,compile_fail -use tokio::io::{self, AsyncReadExt, AsyncWriteExt}; -use tokio::net::TcpListener; - -#[tokio::main] -async fn main() -> io::Result<()> { - let listener = TcpListener::bind("127.0.0.1:0").await?; - println!("listening on port {}", listener.local_addr()?.port()); - - loop { - let (mut socket, addr) = listener.accept().await?; - - println!("connection from {addr:?}"); - - tokio::spawn(async move { - socket.write_all(b"Who are you?\n").await.expect("socket error"); - - let mut buf = vec![0; 1024]; - let name_size = socket.read(&mut buf).await.expect("socket error"); - let name = std::str::from_utf8(&buf[..name_size]).unwrap().trim(); - let reply = format!("Thanks for dialing in, {name}!\n"); - socket.write_all(reply.as_bytes()).await.expect("socket error"); - }); - } -} +{{#include tasks.rs}} ```
diff --git a/src/concurrency/async/tasks.rs b/src/concurrency/async/tasks.rs new file mode 100644 index 00000000..14b1f863 --- /dev/null +++ b/src/concurrency/async/tasks.rs @@ -0,0 +1,24 @@ +use tokio::io::{self, AsyncReadExt, AsyncWriteExt}; +use tokio::net::TcpListener; + +#[tokio::main] +async fn main() -> io::Result<()> { + let listener = TcpListener::bind("127.0.0.1:0").await?; + println!("listening on port {}", listener.local_addr()?.port()); + + loop { + let (mut socket, addr) = listener.accept().await?; + + println!("connection from {addr:?}"); + + tokio::spawn(async move { + socket.write_all(b"Who are you?\n").await.expect("socket error"); + + let mut buf = vec![0; 1024]; + let name_size = socket.read(&mut buf).await.expect("socket error"); + let name = std::str::from_utf8(&buf[..name_size]).unwrap().trim(); + let reply = format!("Thanks for dialing in, {name}!\n"); + socket.write_all(reply.as_bytes()).await.expect("socket error"); + }); + } +}