1
0
mirror of https://github.com/google/comprehensive-rust.git synced 2025-01-20 21:18:26 +02:00

Beginnings of an async chat exercise (#627)

* beginnings of an async chat exercise

* really basic solution

* format
This commit is contained in:
Dustin J. Mitchell 2023-05-16 11:51:01 -04:00 committed by GitHub
parent b4fb870af6
commit caeabdae3e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 168 additions and 5 deletions

44
Cargo.lock generated
View File

@ -181,6 +181,17 @@ version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
[[package]]
name = "chat-async"
version = "0.1.0"
dependencies = [
"futures-util",
"http",
"thiserror",
"tokio",
"tokio-websockets",
]
[[package]]
name = "chrono"
version = "0.4.24"
@ -1962,6 +1973,12 @@ dependencies = [
"digest",
]
[[package]]
name = "sha1_smol"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ae1a47186c03a32177042e55dbc5fd5aee900b8e0069a8d70fba96a9375cd012"
[[package]]
name = "sha2"
version = "0.10.6"
@ -2159,9 +2176,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20"
[[package]]
name = "tokio"
version = "1.27.0"
version = "1.28.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d0de47a4eecbe11f498978a9b29d792f0d2692d1dd003650c24c76510e3bc001"
checksum = "0aa32867d44e6f2ce3385e89dceb990188b8bb0fb25b0cf576647a6f98ac5105"
dependencies = [
"autocfg",
"bytes",
@ -2171,14 +2188,14 @@ dependencies = [
"pin-project-lite",
"socket2",
"tokio-macros",
"windows-sys 0.45.0",
"windows-sys 0.48.0",
]
[[package]]
name = "tokio-macros"
version = "2.0.0"
version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "61a573bdc87985e9d6ddeed1b3d864e8a302c847e40d647746df2f1de209d1ce"
checksum = "630bdcf245f78637c13ec01ffae6187cca34625e8c63150d424b59e55af2675e"
dependencies = [
"proc-macro2",
"quote",
@ -2232,6 +2249,23 @@ dependencies = [
"tracing",
]
[[package]]
name = "tokio-websockets"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d009a9a2a71bd44791363f42ba2dd7d47bf49aa61f7c43eabcd29a285e75a865"
dependencies = [
"base64 0.21.0",
"bytes",
"fastrand",
"futures-util",
"http",
"httparse",
"sha1_smol",
"tokio",
"tokio-util",
]
[[package]]
name = "toml"
version = "0.5.11"

View File

@ -4,4 +4,5 @@ members = [
"src/exercises",
"src/bare-metal/useful-crates/allocator-example",
"src/bare-metal/useful-crates/zerocopy-example",
"src/exercises/concurrency/chat-async",
]

View File

@ -0,0 +1,11 @@
[package]
name = "chat-async"
version = "0.1.0"
edition = "2021"
[dependencies]
futures-util = "0.3.28"
http = "0.2.9"
thiserror = "1.0.40"
tokio = { version = "1.28.1", features = ["net", "macros", "time", "rt", "rt-multi-thread", "io-std", "io-util"] }
tokio-websockets = "0.3.0"

View File

@ -0,0 +1,35 @@
use futures_util::SinkExt;
use http::Uri;
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio_websockets::{ClientBuilder, Message};
#[tokio::main]
async fn main() -> Result<(), tokio_websockets::Error> {
let mut ws_stream = ClientBuilder::from_uri(Uri::from_static("ws://127.0.0.1:2000"))
.connect()
.await?;
let stdin = tokio::io::stdin();
let mut stdin = BufReader::new(stdin);
loop {
let mut line = String::new();
tokio::select! {
incoming = ws_stream.next() => {
match incoming {
Some(Ok(msg)) => println!("From server: {}", msg.as_text()?),
Some(Err(err)) => return Err(err.into()),
None => return Ok(()),
}
}
res = stdin.read_line(&mut line) => {
match res {
Ok(0) => return Ok(()),
Ok(_) => ws_stream.send(Message::text(line.trim_end().to_string())).await?,
Err(err) => return Err(err.into()),
}
}
}
}
}

View File

@ -0,0 +1,82 @@
use futures_util::sink::SinkExt;
use std::net::SocketAddr;
use thiserror::Error;
use tokio::net::{TcpListener, TcpStream};
use tokio::sync::broadcast::error::{RecvError, SendError};
use tokio::sync::broadcast::{channel, Sender};
use tokio_websockets::{Message, ServerBuilder, WebsocketStream};
#[derive(Error, Debug)]
enum ServerError {
#[error("websocket error: {0}")]
Websocket(String),
#[error("io error: {0}")]
IO(#[from] std::io::Error),
#[error("broadcast channel SendError: {0}")]
SendError(#[from] SendError<String>),
#[error("broadcast channel RecvError: {0}")]
RecvError(#[from] RecvError),
}
// tokio_websockets Error types do not implement std::error::Error, so we make do by just capturing
// the debug format for the error.
impl From<tokio_websockets::Error> for ServerError {
fn from(err: tokio_websockets::Error) -> Self {
ServerError::Websocket(format!("{:?}", err))
}
}
impl From<tokio_websockets::proto::ProtocolError> for ServerError {
fn from(err: tokio_websockets::proto::ProtocolError) -> Self {
ServerError::Websocket(format!("{:?}", err))
}
}
async fn handle_connection(
addr: SocketAddr,
mut ws_stream: WebsocketStream<TcpStream>,
bcast_tx: Sender<String>,
) -> Result<(), ServerError> {
ws_stream
.send(Message::text("Welcome to chat! Type a message".into()))
.await?;
let mut bcast_rx = bcast_tx.subscribe();
loop {
tokio::select! {
incoming = ws_stream.next() => {
match incoming {
Some(Ok(msg)) => {
let msg = msg.as_text()?;
println!("From client {addr:?} {msg:?}");
bcast_tx.send(msg.into())?;
}
Some(Err(err)) => return Err(err.into()),
None => return Ok(()),
}
}
msg = bcast_rx.recv() => {
ws_stream.send(Message::text(msg?)).await?;
}
}
}
}
#[tokio::main]
async fn main() -> Result<(), ServerError> {
let (bcast_tx, _) = channel(16);
let listener = TcpListener::bind("127.0.0.1:2000").await?;
println!("listening on port 2000");
loop {
let (socket, addr) = listener.accept().await?;
println!("New connection from {addr:?}");
let bcast_tx = bcast_tx.clone();
tokio::spawn(async move {
// Wrap the raw TCP stream into a websocket.
let ws_stream = ServerBuilder::new().accept(socket).await?;
handle_connection(addr, ws_stream, bcast_tx).await
});
}
}