diff --git a/src/concurrency/async-pitfalls/pin.rs b/src/concurrency/async-pitfalls/pin.rs index 69efec6f..6e03174a 100644 --- a/src/concurrency/async-pitfalls/pin.rs +++ b/src/concurrency/async-pitfalls/pin.rs @@ -1,25 +1,20 @@ +use anyhow::Result; use tokio::sync::{mpsc, oneshot}; -use tokio::task::spawn; use tokio::time::{sleep, Duration}; -// A work item. In this case, just sleep for the given time and respond -// with a message on the `respond_on` channel. #[derive(Debug)] struct Work { input: u32, respond_on: oneshot::Sender, } -// A worker which listens for work on a queue and performs it. async fn worker(mut work_queue: mpsc::Receiver) { let mut _iterations = 0; loop { tokio::select! { Some(work) = work_queue.recv() => { sleep(Duration::from_millis(10)).await; // Pretend to work. - work.respond_on - .send(work.input * 1000) - .expect("failed to send response"); + work.respond_on.send(work.input * 1000).unwrap(); _iterations += 1; } // TODO: report number of iterations every 100ms @@ -27,22 +22,19 @@ async fn worker(mut work_queue: mpsc::Receiver) { } } -// A requester which requests work and waits for it to complete. -async fn do_work(work_queue: &mpsc::Sender, input: u32) -> u32 { +async fn do_work(work_queue: &mpsc::Sender, input: u32) -> Result { let (tx, rx) = oneshot::channel(); - work_queue - .send(Work { input, respond_on: tx }) - .await - .expect("failed to send on work queue"); - rx.await.expect("failed waiting for response") + work_queue.send(Work { input, respond_on: tx }).await?; + Ok(rx.await?) } #[tokio::main] -async fn main() { +async fn main() -> Result<()> { let (tx, rx) = mpsc::channel(10); - spawn(worker(rx)); + tokio::spawn(worker(rx)); for i in 0..100 { - let resp = do_work(&tx, i).await; + let resp = do_work(&tx, i).await?; println!("work result for iteration {i}: {resp}"); } + Ok(()) }