From 36e2ecbf6c3b48eba83d4dce27c0181ed499e7ef Mon Sep 17 00:00:00 2001 From: Julio Biason Date: Mon, 19 Aug 2024 14:31:57 -0300 Subject: [PATCH] Using Condvars, Atomics and Mutexes as a semaphore --- condvartest/Cargo.lock | 197 ++++++++++++++++++++++++++++++++++++++ condvartest/Cargo.toml | 8 ++ condvartest/README.md | 3 + condvartest/src/beacon.rs | 39 ++++++++ condvartest/src/main.rs | 63 ++++++++++++ 5 files changed, 310 insertions(+) create mode 100644 condvartest/Cargo.lock create mode 100644 condvartest/Cargo.toml create mode 100644 condvartest/README.md create mode 100644 condvartest/src/beacon.rs create mode 100644 condvartest/src/main.rs diff --git a/condvartest/Cargo.lock b/condvartest/Cargo.lock new file mode 100644 index 0000000..ab0e7e4 --- /dev/null +++ b/condvartest/Cargo.lock @@ -0,0 +1,197 @@ +# This file is automatically @generated by Cargo. +# It is not intended for manual editing. +version = 3 + +[[package]] +name = "byteorder" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" + +[[package]] +name = "cfg-if" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" + +[[package]] +name = "condvartest" +version = "0.1.0" +dependencies = [ + "crossbeam", + "rand", +] + +[[package]] +name = "crossbeam" +version = "0.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1137cd7e7fc0fb5d3c5a8678be38ec56e819125d8d7907411fe24ccb943faca8" +dependencies = [ + "crossbeam-channel", + "crossbeam-deque", + "crossbeam-epoch", + "crossbeam-queue", + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-channel" +version = "0.5.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "33480d6946193aa8033910124896ca395333cae7e2d1113d1fef6c3272217df2" +dependencies = [ + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-deque" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "613f8cc01fe9cf1a3eb3d7f488fd2fa8388403e97039e2f73692932e291a770d" +dependencies = [ + "crossbeam-epoch", + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-epoch" +version = "0.9.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b82ac4a3c2ca9c3460964f020e1402edd5753411d7737aa39c3714ad1b5420e" +dependencies = [ + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-queue" +version = "0.3.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df0346b5d5e76ac2fe4e327c5fd1118d6be7c51dfb18f9b7922923f287471e35" +dependencies = [ + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-utils" +version = "0.8.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "22ec99545bb0ed0ea7bb9b8e1e9122ea386ff8a48c0922e43f36d45ab09e0e80" + +[[package]] +name = "getrandom" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c4567c8db10ae91089c99af84c68c38da3ec2f087c3f82960bcdbf3656b6f4d7" +dependencies = [ + "cfg-if", + "libc", + "wasi", +] + +[[package]] +name = "libc" +version = "0.2.158" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d8adc4bb1803a324070e64a98ae98f38934d91957a99cfb3a43dcbc01bc56439" + +[[package]] +name = "ppv-lite86" +version = "0.2.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77957b295656769bb8ad2b6a6b09d897d94f05c41b069aede1fcdaa675eaea04" +dependencies = [ + "zerocopy", +] + +[[package]] +name = "proc-macro2" +version = "1.0.86" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5e719e8df665df0d1c8fbfd238015744736151d4445ec0836b8e628aae103b77" +dependencies = [ + "unicode-ident", +] + +[[package]] +name = "quote" +version = "1.0.36" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0fa76aaf39101c457836aec0ce2316dbdc3ab723cdda1c6bd4e6ad4208acaca7" +dependencies = [ + "proc-macro2", +] + +[[package]] +name = "rand" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" +dependencies = [ + "libc", + "rand_chacha", + "rand_core", +] + +[[package]] +name = "rand_chacha" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" +dependencies = [ + "ppv-lite86", + "rand_core", +] + +[[package]] +name = "rand_core" +version = "0.6.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c" +dependencies = [ + "getrandom", +] + +[[package]] +name = "syn" +version = "2.0.75" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f6af063034fc1935ede7be0122941bafa9bacb949334d090b77ca98b5817c7d9" +dependencies = [ + "proc-macro2", + "quote", + "unicode-ident", +] + +[[package]] +name = "unicode-ident" +version = "1.0.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3354b9ac3fae1ff6755cb6db53683adb661634f67557942dea4facebec0fee4b" + +[[package]] +name = "wasi" +version = "0.11.0+wasi-snapshot-preview1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" + +[[package]] +name = "zerocopy" +version = "0.7.35" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1b9b4fd18abc82b8136838da5d50bae7bdea537c574d8dc1a34ed098d6c166f0" +dependencies = [ + "byteorder", + "zerocopy-derive", +] + +[[package]] +name = "zerocopy-derive" +version = "0.7.35" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fa4f8080344d4671fb4e831a13ad1e68092748387dfc4f55e356242fae12ce3e" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] diff --git a/condvartest/Cargo.toml b/condvartest/Cargo.toml new file mode 100644 index 0000000..67a779b --- /dev/null +++ b/condvartest/Cargo.toml @@ -0,0 +1,8 @@ +[package] +name = "condvartest" +version = "0.1.0" +edition = "2021" + +[dependencies] +crossbeam = "0.8.4" +rand = "0.8.5" diff --git a/condvartest/README.md b/condvartest/README.md new file mode 100644 index 0000000..74f6a05 --- /dev/null +++ b/condvartest/README.md @@ -0,0 +1,3 @@ +# CondVarTest + +Playing a bit with a condition variable, which carries a value instead of a bool. diff --git a/condvartest/src/beacon.rs b/condvartest/src/beacon.rs new file mode 100644 index 0000000..a0fa9de --- /dev/null +++ b/condvartest/src/beacon.rs @@ -0,0 +1,39 @@ +use std::sync::{ + atomic::{AtomicUsize, Ordering}, + Condvar, Mutex, +}; + +pub struct Beacon { + lock: Mutex, + guard: Condvar, +} + +impl Beacon { + pub fn new(leases: usize) -> Self { + Self { + lock: Mutex::new(AtomicUsize::new(leases)), + guard: Condvar::new(), + } + } + + pub fn lease(&self, leases: usize) { + let mut control = self.lock.lock().unwrap(); + let mut current_leases = control.load(Ordering::Relaxed); + println!("Need {leases} out of {current_leases}"); + while current_leases < leases { + println!("Not enough, waiting..."); + control = self.guard.wait(control).unwrap(); + current_leases = control.load(Ordering::Relaxed); + println!("Need {leases} our of {current_leases}"); + } + + control.fetch_sub(leases, Ordering::SeqCst); + } + + pub fn release(&self, leases: usize) { + println!("Releasing {leases} leases"); + let control = self.lock.lock().unwrap(); + control.fetch_add(leases, Ordering::SeqCst); + self.guard.notify_all(); + } +} diff --git a/condvartest/src/main.rs b/condvartest/src/main.rs new file mode 100644 index 0000000..a82a7ad --- /dev/null +++ b/condvartest/src/main.rs @@ -0,0 +1,63 @@ +mod beacon; + +use std::sync::atomic::AtomicUsize; +use std::sync::atomic::Ordering; +use std::sync::Arc; +use std::sync::Condvar; +use std::sync::Mutex; +use std::time::Duration; + +use beacon::Beacon; +use crossbeam::channel::unbounded; +use rand::thread_rng; +use rand::Rng; + +fn main() { + let semaphore = Arc::new(Beacon::new(10)); + let (send, receive) = unbounded(); + + let mut waits = Vec::new(); + for i in 0..3 { + let thread_recv = receive.clone(); + let thread_semaphore = Arc::clone(&semaphore); + waits.push(std::thread::spawn(move || { + let me = i; + let mut sum = 0; + + while let Ok(value) = thread_recv.recv() { + println!("{me} Got value: {value}"); + + thread_semaphore.lease(value); + + let duration = Duration::new(value as u64, 0); + std::thread::sleep(duration); + sum += value; + + thread_semaphore.release(value); + } + println!("{me} completed with a total of {sum}"); + sum + })); + } + drop(receive); + + println!("Sending messages:"); + let mut rng = thread_rng(); + let mut produced = 0; + for i in 0..12 { + let value = rng.gen_range(1..11); + println!("{i}: produced value {value}"); + send.send(value).expect("Failed to send number"); + produced += value; + } + drop(send); // breaks the sender + println!("Produced a total of {produced}"); + + let mut total_result = 0; + for handle in waits { + let thread_result = handle.join().unwrap(); + total_result += thread_result; + } + + println!("Threads consumed {total_result}"); +}