From caeabdae3ed4b19b879560db66041ec25993d1fa Mon Sep 17 00:00:00 2001 From: "Dustin J. Mitchell" Date: Tue, 16 May 2023 11:51:01 -0400 Subject: [PATCH] Beginnings of an async chat exercise (#627) * beginnings of an async chat exercise * really basic solution * format --- Cargo.lock | 44 ++++++++-- Cargo.toml | 1 + .../concurrency/chat-async/Cargo.toml | 11 +++ .../concurrency/chat-async/src/bin/client.rs | 35 ++++++++ .../concurrency/chat-async/src/bin/server.rs | 82 +++++++++++++++++++ 5 files changed, 168 insertions(+), 5 deletions(-) create mode 100644 src/exercises/concurrency/chat-async/Cargo.toml create mode 100644 src/exercises/concurrency/chat-async/src/bin/client.rs create mode 100644 src/exercises/concurrency/chat-async/src/bin/server.rs diff --git a/Cargo.lock b/Cargo.lock index a54ee709..32ba8ce2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -181,6 +181,17 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" +[[package]] +name = "chat-async" +version = "0.1.0" +dependencies = [ + "futures-util", + "http", + "thiserror", + "tokio", + "tokio-websockets", +] + [[package]] name = "chrono" version = "0.4.24" @@ -1962,6 +1973,12 @@ dependencies = [ "digest", ] +[[package]] +name = "sha1_smol" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ae1a47186c03a32177042e55dbc5fd5aee900b8e0069a8d70fba96a9375cd012" + [[package]] name = "sha2" version = "0.10.6" @@ -2159,9 +2176,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.27.0" +version = "1.28.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d0de47a4eecbe11f498978a9b29d792f0d2692d1dd003650c24c76510e3bc001" +checksum = "0aa32867d44e6f2ce3385e89dceb990188b8bb0fb25b0cf576647a6f98ac5105" dependencies = [ "autocfg", "bytes", @@ -2171,14 +2188,14 @@ dependencies = [ "pin-project-lite", "socket2", "tokio-macros", - "windows-sys 0.45.0", + "windows-sys 0.48.0", ] [[package]] name = "tokio-macros" -version = "2.0.0" +version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "61a573bdc87985e9d6ddeed1b3d864e8a302c847e40d647746df2f1de209d1ce" +checksum = "630bdcf245f78637c13ec01ffae6187cca34625e8c63150d424b59e55af2675e" dependencies = [ "proc-macro2", "quote", @@ -2232,6 +2249,23 @@ dependencies = [ "tracing", ] +[[package]] +name = "tokio-websockets" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d009a9a2a71bd44791363f42ba2dd7d47bf49aa61f7c43eabcd29a285e75a865" +dependencies = [ + "base64 0.21.0", + "bytes", + "fastrand", + "futures-util", + "http", + "httparse", + "sha1_smol", + "tokio", + "tokio-util", +] + [[package]] name = "toml" version = "0.5.11" diff --git a/Cargo.toml b/Cargo.toml index cdb2d859..7f1b06da 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,4 +4,5 @@ members = [ "src/exercises", "src/bare-metal/useful-crates/allocator-example", "src/bare-metal/useful-crates/zerocopy-example", + "src/exercises/concurrency/chat-async", ] diff --git a/src/exercises/concurrency/chat-async/Cargo.toml b/src/exercises/concurrency/chat-async/Cargo.toml new file mode 100644 index 00000000..62479f23 --- /dev/null +++ b/src/exercises/concurrency/chat-async/Cargo.toml @@ -0,0 +1,11 @@ +[package] +name = "chat-async" +version = "0.1.0" +edition = "2021" + +[dependencies] +futures-util = "0.3.28" +http = "0.2.9" +thiserror = "1.0.40" +tokio = { version = "1.28.1", features = ["net", "macros", "time", "rt", "rt-multi-thread", "io-std", "io-util"] } +tokio-websockets = "0.3.0" diff --git a/src/exercises/concurrency/chat-async/src/bin/client.rs b/src/exercises/concurrency/chat-async/src/bin/client.rs new file mode 100644 index 00000000..0153fc1f --- /dev/null +++ b/src/exercises/concurrency/chat-async/src/bin/client.rs @@ -0,0 +1,35 @@ +use futures_util::SinkExt; +use http::Uri; +use tokio::io::{AsyncBufReadExt, BufReader}; +use tokio_websockets::{ClientBuilder, Message}; + +#[tokio::main] +async fn main() -> Result<(), tokio_websockets::Error> { + let mut ws_stream = ClientBuilder::from_uri(Uri::from_static("ws://127.0.0.1:2000")) + .connect() + .await?; + + let stdin = tokio::io::stdin(); + let mut stdin = BufReader::new(stdin); + + loop { + let mut line = String::new(); + tokio::select! { + incoming = ws_stream.next() => { + match incoming { + Some(Ok(msg)) => println!("From server: {}", msg.as_text()?), + Some(Err(err)) => return Err(err.into()), + None => return Ok(()), + } + } + res = stdin.read_line(&mut line) => { + match res { + Ok(0) => return Ok(()), + Ok(_) => ws_stream.send(Message::text(line.trim_end().to_string())).await?, + Err(err) => return Err(err.into()), + } + } + + } + } +} diff --git a/src/exercises/concurrency/chat-async/src/bin/server.rs b/src/exercises/concurrency/chat-async/src/bin/server.rs new file mode 100644 index 00000000..ee0eb0c4 --- /dev/null +++ b/src/exercises/concurrency/chat-async/src/bin/server.rs @@ -0,0 +1,82 @@ +use futures_util::sink::SinkExt; +use std::net::SocketAddr; +use thiserror::Error; +use tokio::net::{TcpListener, TcpStream}; +use tokio::sync::broadcast::error::{RecvError, SendError}; +use tokio::sync::broadcast::{channel, Sender}; +use tokio_websockets::{Message, ServerBuilder, WebsocketStream}; + +#[derive(Error, Debug)] +enum ServerError { + #[error("websocket error: {0}")] + Websocket(String), + #[error("io error: {0}")] + IO(#[from] std::io::Error), + #[error("broadcast channel SendError: {0}")] + SendError(#[from] SendError), + #[error("broadcast channel RecvError: {0}")] + RecvError(#[from] RecvError), +} + +// tokio_websockets Error types do not implement std::error::Error, so we make do by just capturing +// the debug format for the error. +impl From for ServerError { + fn from(err: tokio_websockets::Error) -> Self { + ServerError::Websocket(format!("{:?}", err)) + } +} + +impl From for ServerError { + fn from(err: tokio_websockets::proto::ProtocolError) -> Self { + ServerError::Websocket(format!("{:?}", err)) + } +} + +async fn handle_connection( + addr: SocketAddr, + mut ws_stream: WebsocketStream, + bcast_tx: Sender, +) -> Result<(), ServerError> { + ws_stream + .send(Message::text("Welcome to chat! Type a message".into())) + .await?; + let mut bcast_rx = bcast_tx.subscribe(); + loop { + tokio::select! { + incoming = ws_stream.next() => { + match incoming { + Some(Ok(msg)) => { + let msg = msg.as_text()?; + println!("From client {addr:?} {msg:?}"); + bcast_tx.send(msg.into())?; + } + Some(Err(err)) => return Err(err.into()), + None => return Ok(()), + } + } + msg = bcast_rx.recv() => { + ws_stream.send(Message::text(msg?)).await?; + } + } + } +} + +#[tokio::main] +async fn main() -> Result<(), ServerError> { + let (bcast_tx, _) = channel(16); + + let listener = TcpListener::bind("127.0.0.1:2000").await?; + println!("listening on port 2000"); + + loop { + let (socket, addr) = listener.accept().await?; + println!("New connection from {addr:?}"); + let bcast_tx = bcast_tx.clone(); + tokio::spawn(async move { + // Wrap the raw TCP stream into a websocket. + let ws_stream = ServerBuilder::new().accept(socket).await?; + + handle_connection(addr, ws_stream, bcast_tx).await + }); + } +}