Browse Source

Made it work

master
Julio Biason 3 years ago
parent
commit
c11f977cfa
  1. 14
      tokiounixsocket/Cargo.lock
  2. 2
      tokiounixsocket/Cargo.toml
  3. 41
      tokiounixsocket/src/main.rs

14
tokiounixsocket/Cargo.lock generated

@ -11,6 +11,12 @@ dependencies = [
"serde", "serde",
] ]
[[package]]
name = "bytes"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c4872d67bab6358e59559027aa3b9157c53d9358c51423c17554809a8858e0f8"
[[package]] [[package]]
name = "cfg-if" name = "cfg-if"
version = "1.0.0" version = "1.0.0"
@ -32,6 +38,12 @@ dependencies = [
"cfg-if", "cfg-if",
] ]
[[package]]
name = "memchr"
version = "2.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "308cc39be01b73d0d18f82a0e7b2a3df85245f84af96fdddc5d202d27e47b86a"
[[package]] [[package]]
name = "mio" name = "mio"
version = "0.8.0" version = "0.8.0"
@ -134,7 +146,9 @@ version = "1.17.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2af73ac49756f3f7c01172e34a23e5d0216f6c32333757c2c61feb2bbff5a5ee" checksum = "2af73ac49756f3f7c01172e34a23e5d0216f6c32333757c2c61feb2bbff5a5ee"
dependencies = [ dependencies = [
"bytes",
"libc", "libc",
"memchr",
"mio", "mio",
"pin-project-lite", "pin-project-lite",
"socket2", "socket2",

2
tokiounixsocket/Cargo.toml

@ -8,4 +8,4 @@ edition = "2021"
[dependencies] [dependencies]
bincode = "1.3.3" bincode = "1.3.3"
serde = { version = "1.0.136", features = ["derive"] } serde = { version = "1.0.136", features = ["derive"] }
tokio = { version = "1.17.0", features = ["rt", "sync", "net", "macros"] } tokio = { version = "1.17.0", features = ["rt", "sync", "net", "macros", "io-util"] }

41
tokiounixsocket/src/main.rs

@ -1,6 +1,8 @@
use std::io; use std::io;
use std::path::Path; use std::path::Path;
use tokio::io::AsyncReadExt;
use tokio::io::AsyncWriteExt;
use tokio::io::Interest; use tokio::io::Interest;
use tokio::net::UnixListener; use tokio::net::UnixListener;
use tokio::net::UnixStream; use tokio::net::UnixStream;
@ -14,6 +16,7 @@ enum Message {
#[tokio::main(flavor = "current_thread")] #[tokio::main(flavor = "current_thread")]
async fn main() { async fn main() {
let path = Path::new("./comm.socket"); let path = Path::new("./comm.socket");
let _ = std::fs::remove_file(&path);
// producer // producer
let path_prod = path.to_path_buf(); let path_prod = path.to_path_buf();
@ -21,9 +24,9 @@ async fn main() {
let listener = UnixListener::bind(&path_prod).unwrap(); let listener = UnixListener::bind(&path_prod).unwrap();
loop { loop {
match listener.accept().await { match listener.accept().await {
Ok((stream, _addr)) => { Ok((mut stream, _addr)) => {
println!("Client here"); println!("Client here");
tokio::spawn(async move { send_to_consumer(&stream).await }) tokio::spawn(async move { send_to_consumer(&mut stream).await })
} }
Err(e) => { Err(e) => {
println!("Erro! {:?}", e); println!("Erro! {:?}", e);
@ -36,26 +39,37 @@ async fn main() {
// consumer // consumer
let path_cons = path.to_path_buf(); let path_cons = path.to_path_buf();
let consumer = tokio::spawn(async move { let consumer = tokio::spawn(async move {
let socket = UnixStream::connect(&path_cons).await.unwrap(); let mut socket = UnixStream::connect(&path_cons).await.unwrap();
loop { loop {
let ready = socket.ready(Interest::READABLE).await.unwrap(); let ready = socket.ready(Interest::READABLE).await.unwrap();
if ready.is_readable() { if ready.is_readable() {
let mut data = [0; 1024]; let size = socket.read_u64().await.unwrap();
match socket.try_read(&mut data) { println!("Content length: {}", size as usize);
let mut record = vec![0u8; size as usize];
loop {
match socket.read_exact(&mut record).await {
Ok(0) => {
tokio::task::yield_now().await;
continue;
}
Ok(n) => { Ok(n) => {
println!("Read {} bytes", n); println!("Read {} bytes: {:?}", n, record);
let message: Result<Message, Box<bincode::ErrorKind>> = let message: Result<Message, Box<bincode::ErrorKind>> =
bincode::deserialize(&data[..n]); bincode::deserialize(&record);
match message { match message {
Ok(the_message) => println!("Message: {:?}", the_message), Ok(the_message) => println!("Message: {:?}", the_message),
Err(e) => println!("Deserialized message: {:?}", e), Err(e) => println!("Deserialized message: {:?}", e),
} }
break;
} }
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
continue; break;
} }
Err(e) => { Err(e) => {
println!("Error reading: {:?}", e); println!("Read error: {:?}", e);
break;
}
} }
} }
} }
@ -66,7 +80,7 @@ async fn main() {
let _ = tokio::join!(producer, consumer); let _ = tokio::join!(producer, consumer);
} }
async fn send_to_consumer(stream: &UnixStream) { async fn send_to_consumer(stream: &mut UnixStream) {
let mut should_yield = false; let mut should_yield = false;
let mut seq = 0; let mut seq = 0;
loop { loop {
@ -78,12 +92,17 @@ async fn send_to_consumer(stream: &UnixStream) {
seq += 1; seq += 1;
let encoded = bincode::serialize(&message).unwrap(); let encoded = bincode::serialize(&message).unwrap();
let len = encoded.len();
let ready = stream.ready(Interest::WRITABLE).await.unwrap(); let ready = stream.ready(Interest::WRITABLE).await.unwrap();
if ready.is_writable() { if ready.is_writable() {
// Header, with the size
stream.write_u64(len as u64).await.unwrap();
// The content
match stream.try_write(&encoded) { match stream.try_write(&encoded) {
Ok(n) => println!("Wrote {} bytes: {:?}", n, encoded), Ok(n) => println!("Wrote {} bytes: {:?}", n, encoded),
Err(e) => println!("Error: {:?}", e), Err(e) => println!("Write error: {:?}", e),
} }
should_yield = !should_yield; should_yield = !should_yield;

Loading…
Cancel
Save