diff --git a/tokiounixsocket/Cargo.lock b/tokiounixsocket/Cargo.lock index 4df5c93..f81efe0 100644 --- a/tokiounixsocket/Cargo.lock +++ b/tokiounixsocket/Cargo.lock @@ -11,6 +11,12 @@ dependencies = [ "serde", ] +[[package]] +name = "bytes" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c4872d67bab6358e59559027aa3b9157c53d9358c51423c17554809a8858e0f8" + [[package]] name = "cfg-if" version = "1.0.0" @@ -32,6 +38,12 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "memchr" +version = "2.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "308cc39be01b73d0d18f82a0e7b2a3df85245f84af96fdddc5d202d27e47b86a" + [[package]] name = "mio" version = "0.8.0" @@ -134,7 +146,9 @@ version = "1.17.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2af73ac49756f3f7c01172e34a23e5d0216f6c32333757c2c61feb2bbff5a5ee" dependencies = [ + "bytes", "libc", + "memchr", "mio", "pin-project-lite", "socket2", diff --git a/tokiounixsocket/Cargo.toml b/tokiounixsocket/Cargo.toml index 287a8e7..9df71b0 100644 --- a/tokiounixsocket/Cargo.toml +++ b/tokiounixsocket/Cargo.toml @@ -8,4 +8,4 @@ edition = "2021" [dependencies] bincode = "1.3.3" serde = { version = "1.0.136", features = ["derive"] } -tokio = { version = "1.17.0", features = ["rt", "sync", "net", "macros"] } +tokio = { version = "1.17.0", features = ["rt", "sync", "net", "macros", "io-util"] } diff --git a/tokiounixsocket/src/main.rs b/tokiounixsocket/src/main.rs index 0662d31..03afa63 100644 --- a/tokiounixsocket/src/main.rs +++ b/tokiounixsocket/src/main.rs @@ -1,6 +1,8 @@ use std::io; use std::path::Path; +use tokio::io::AsyncReadExt; +use tokio::io::AsyncWriteExt; use tokio::io::Interest; use tokio::net::UnixListener; use tokio::net::UnixStream; @@ -14,6 +16,7 @@ enum Message { #[tokio::main(flavor = "current_thread")] async fn main() { let path = Path::new("./comm.socket"); + let _ = std::fs::remove_file(&path); // producer let path_prod = path.to_path_buf(); @@ -21,9 +24,9 @@ async fn main() { let listener = UnixListener::bind(&path_prod).unwrap(); loop { match listener.accept().await { - Ok((stream, _addr)) => { + Ok((mut stream, _addr)) => { println!("Client here"); - tokio::spawn(async move { send_to_consumer(&stream).await }) + tokio::spawn(async move { send_to_consumer(&mut stream).await }) } Err(e) => { println!("Erro! {:?}", e); @@ -36,26 +39,37 @@ async fn main() { // consumer let path_cons = path.to_path_buf(); let consumer = tokio::spawn(async move { - let socket = UnixStream::connect(&path_cons).await.unwrap(); + let mut socket = UnixStream::connect(&path_cons).await.unwrap(); loop { let ready = socket.ready(Interest::READABLE).await.unwrap(); if ready.is_readable() { - let mut data = [0; 1024]; - match socket.try_read(&mut data) { - Ok(n) => { - println!("Read {} bytes", n); - let message: Result> = - bincode::deserialize(&data[..n]); - match message { - Ok(the_message) => println!("Message: {:?}", the_message), - Err(e) => println!("Deserialized message: {:?}", e), + let size = socket.read_u64().await.unwrap(); + println!("Content length: {}", size as usize); + + let mut record = vec![0u8; size as usize]; + loop { + match socket.read_exact(&mut record).await { + Ok(0) => { + tokio::task::yield_now().await; + continue; + } + Ok(n) => { + println!("Read {} bytes: {:?}", n, record); + let message: Result> = + bincode::deserialize(&record); + match message { + Ok(the_message) => println!("Message: {:?}", the_message), + Err(e) => println!("Deserialized message: {:?}", e), + } + break; + } + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + break; + } + Err(e) => { + println!("Read error: {:?}", e); + break; } - } - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - continue; - } - Err(e) => { - println!("Error reading: {:?}", e); } } } @@ -66,7 +80,7 @@ async fn main() { let _ = tokio::join!(producer, consumer); } -async fn send_to_consumer(stream: &UnixStream) { +async fn send_to_consumer(stream: &mut UnixStream) { let mut should_yield = false; let mut seq = 0; loop { @@ -78,12 +92,17 @@ async fn send_to_consumer(stream: &UnixStream) { seq += 1; let encoded = bincode::serialize(&message).unwrap(); + let len = encoded.len(); let ready = stream.ready(Interest::WRITABLE).await.unwrap(); if ready.is_writable() { + // Header, with the size + stream.write_u64(len as u64).await.unwrap(); + + // The content match stream.try_write(&encoded) { Ok(n) => println!("Wrote {} bytes: {:?}", n, encoded), - Err(e) => println!("Error: {:?}", e), + Err(e) => println!("Write error: {:?}", e), } should_yield = !should_yield;