diff options
author | Android Build Coastguard Worker <android-build-coastguard-worker@google.com> | 2023-07-07 01:02:36 +0000 |
---|---|---|
committer | Android Build Coastguard Worker <android-build-coastguard-worker@google.com> | 2023-07-07 01:02:36 +0000 |
commit | c62c309c99c3c8582a1101b5024680103b8459d9 (patch) | |
tree | 41a3064fbbb4b862c41a57421365ee8c362760bf | |
parent | 461a7a691bdce4cf9d794b51cd565ac35a8c4b07 (diff) | |
parent | 61a284f21e182c391151d6a63a7be5a240931d6c (diff) | |
download | crossbeam-channel-aml_cbr_341610000.tar.gz |
Snap for 10447354 from 61a284f21e182c391151d6a63a7be5a240931d6c to mainline-cellbroadcast-releaseaml_cbr_341710000aml_cbr_341610000aml_cbr_341510010aml_cbr_341410010aml_cbr_341311010aml_cbr_341110000aml_cbr_341011000aml_cbr_340914000android14-mainline-cellbroadcast-release
Change-Id: I6187dbd1ef574e03e50dfbbd8aef9fff2e45acdc
-rw-r--r-- | .cargo_vcs_info.json | 2 | ||||
-rw-r--r-- | Android.bp | 8 | ||||
-rw-r--r-- | CHANGELOG.md | 28 | ||||
-rw-r--r-- | Cargo.lock | 63 | ||||
-rw-r--r-- | Cargo.toml | 21 | ||||
-rw-r--r-- | Cargo.toml.orig | 4 | ||||
-rw-r--r-- | METADATA | 12 | ||||
-rw-r--r-- | README.md | 4 | ||||
-rw-r--r-- | TEST_MAPPING | 3 | ||||
-rw-r--r-- | src/channel.rs | 33 | ||||
-rw-r--r-- | src/err.rs | 4 | ||||
-rw-r--r-- | src/flavors/array.rs | 78 | ||||
-rw-r--r-- | src/flavors/at.rs | 8 | ||||
-rw-r--r-- | src/flavors/list.rs | 28 | ||||
-rw-r--r-- | src/flavors/never.rs | 1 | ||||
-rw-r--r-- | src/flavors/tick.rs | 5 | ||||
-rw-r--r-- | src/flavors/zero.rs | 71 | ||||
-rw-r--r-- | src/select.rs | 25 | ||||
-rw-r--r-- | src/select_macro.rs | 60 | ||||
-rw-r--r-- | src/utils.rs | 56 | ||||
-rw-r--r-- | src/waker.rs | 64 | ||||
-rw-r--r-- | tests/array.rs | 94 | ||||
-rw-r--r-- | tests/golang.rs | 628 | ||||
-rw-r--r-- | tests/list.rs | 34 | ||||
-rw-r--r-- | tests/mpsc.rs | 19 | ||||
-rw-r--r-- | tests/ready.rs | 1 | ||||
-rw-r--r-- | tests/select.rs | 17 | ||||
-rw-r--r-- | tests/select_macro.rs | 14 | ||||
-rw-r--r-- | tests/thread_locals.rs | 2 | ||||
-rw-r--r-- | tests/zero.rs | 26 |
30 files changed, 1015 insertions, 398 deletions
diff --git a/.cargo_vcs_info.json b/.cargo_vcs_info.json index f15a046..548f675 100644 --- a/.cargo_vcs_info.json +++ b/.cargo_vcs_info.json @@ -1,6 +1,6 @@ { "git": { - "sha1": "f9cec068fa94bced547a66289cd288dca58c2e83" + "sha1": "721382b00b5dadd81954ed66764d547e2f1bb7a3" }, "path_in_vcs": "crossbeam-channel" }
\ No newline at end of file @@ -47,7 +47,7 @@ rust_library { host_supported: true, crate_name: "crossbeam_channel", cargo_env_compat: true, - cargo_pkg_version: "0.5.2", + cargo_pkg_version: "0.5.7", srcs: ["src/lib.rs"], edition: "2018", features: [ @@ -59,4 +59,10 @@ rust_library { "libcfg_if", "libcrossbeam_utils", ], + apex_available: [ + "//apex_available:platform", + "//apex_available:anyapex", + ], + product_available: true, + vendor_available: true, } diff --git a/CHANGELOG.md b/CHANGELOG.md index 6bfd923..3277f15 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,30 @@ +# Version 0.5.7 + +- Improve handling of very large timeout. (#953) + +# Version 0.5.6 + +- Bump the minimum supported Rust version to 1.38. (#877) + +# Version 0.5.5 + +- Replace Spinlock with Mutex. (#835) + +# Version 0.5.4 + +- Workaround a bug in upstream related to TLS access on AArch64 Linux. (#802) + +# Version 0.5.3 + +**Note:** This release has been yanked. See [#802](https://github.com/crossbeam-rs/crossbeam/issues/802) for details. + +- Fix panic on very large timeout. (#798) + # Version 0.5.2 -- Fix stacked borrows violations. (#763, #764) +**Note:** This release has been yanked. See [#802](https://github.com/crossbeam-rs/crossbeam/issues/802) for details. + +- Fix stacked borrows violations when `-Zmiri-tag-raw-pointers` is enabled. (#763, #764) # Version 0.5.1 @@ -22,6 +46,8 @@ # Version 0.4.3 +**Note:** This release has been yanked. See [GHSA-v5m7-53cv-f3hx](https://github.com/crossbeam-rs/crossbeam/security/advisories/GHSA-v5m7-53cv-f3hx) for details. + - Change license to "MIT OR Apache-2.0". # Version 0.4.2 @@ -10,7 +10,7 @@ checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" [[package]] name = "crossbeam-channel" -version = "0.5.2" +version = "0.5.7" dependencies = [ "cfg-if", "crossbeam-utils", @@ -21,19 +21,18 @@ dependencies = [ [[package]] name = "crossbeam-utils" -version = "0.8.6" +version = "0.8.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cfcae03edb34f947e64acdb1c33ec169824e20657e9ecb61cef6c8c74dcb8120" +checksum = "3c063cd8cc95f5c377ed0d4b49a4b21f632396ff690e8470c29b3359b346984b" dependencies = [ "cfg-if", - "lazy_static", ] [[package]] name = "getrandom" -version = "0.2.3" +version = "0.2.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7fcd999463524c52659517fe2cea98493cfe485d10565e7b0fb07dbba7ad2753" +checksum = "c05aeb6a22b8f62540c194aac980f2115af067bfe15a0734d7277a768d396b31" dependencies = [ "cfg-if", "libc", @@ -42,30 +41,24 @@ dependencies = [ [[package]] name = "hermit-abi" -version = "0.1.19" +version = "0.2.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "62b467343b94ba476dcb2500d242dadbb39557df889310ac77c5d99100aaac33" +checksum = "ee512640fe35acbfb4bb779db6f0d80704c2cacfa2e39b601ef3e3f47d1ae4c7" dependencies = [ "libc", ] [[package]] -name = "lazy_static" -version = "1.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" - -[[package]] name = "libc" -version = "0.2.112" +version = "0.2.139" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1b03d17f364a3a042d5e5d46b053bbbf82c92c9430c592dd4c064dc6ee997125" +checksum = "201de327520df007757c1f0adce6e827fe8562fbc28bfd9c15571c66ca1f5f79" [[package]] name = "num_cpus" -version = "1.13.1" +version = "1.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "19e64526ebdee182341572e50e9ad03965aa510cd94427a4549448f285e957a1" +checksum = "0fac9e2da13b5eb447a6ce3d392f23a29d8694bff781bf03a16cd9ac8697593b" dependencies = [ "hermit-abi", "libc", @@ -73,20 +66,19 @@ dependencies = [ [[package]] name = "ppv-lite86" -version = "0.2.16" +version = "0.2.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eb9f9e6e233e5c4a35559a617bf40a4ec447db2e84c20b55a6f83167b7e57872" +checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" [[package]] name = "rand" -version = "0.8.4" +version = "0.8.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2e7573632e6454cf6b99d7aac4ccca54be06da05aca2ef7423d22d27d4d4bcd8" +checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" dependencies = [ "libc", "rand_chacha", "rand_core", - "rand_hc", ] [[package]] @@ -101,27 +93,18 @@ dependencies = [ [[package]] name = "rand_core" -version = "0.6.3" +version = "0.6.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d34f1408f55294453790c48b2f1ebbb1c5b4b7563eb1f418bcfcfdbb06ebb4e7" +checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c" dependencies = [ "getrandom", ] [[package]] -name = "rand_hc" -version = "0.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d51e9f596de227fda2ea6c84607f5558e196eeaf43c986b724ba4fb8fdf497e7" -dependencies = [ - "rand_core", -] - -[[package]] name = "signal-hook" -version = "0.3.13" +version = "0.3.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "647c97df271007dcea485bb74ffdb57f2e683f1306c854f468a0c244badabf2d" +checksum = "732768f1176d21d09e076c23a93123d40bba92d50c4058da34d45c8de8e682b9" dependencies = [ "libc", "signal-hook-registry", @@ -129,15 +112,15 @@ dependencies = [ [[package]] name = "signal-hook-registry" -version = "1.4.0" +version = "1.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e51e73328dc4ac0c7ccbda3a494dfa03df1de2f46018127f60c693f2648455b0" +checksum = "d8229b473baa5980ac72ef434c4415e70c4b5e71b423043adb4ba059f89c99a1" dependencies = [ "libc", ] [[package]] name = "wasi" -version = "0.10.2+wasi-snapshot-preview1" +version = "0.11.0+wasi-snapshot-preview1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fd6fbd9a79829dd1ad0cc20627bf1ed606756a7f77edff7b66b7064f9cb327c6" +checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" @@ -11,15 +11,27 @@ [package] edition = "2018" -rust-version = "1.36" +rust-version = "1.38" name = "crossbeam-channel" -version = "0.5.2" +version = "0.5.7" description = "Multi-producer multi-consumer channels for message passing" homepage = "https://github.com/crossbeam-rs/crossbeam/tree/master/crossbeam-channel" -keywords = ["channel", "mpmc", "select", "golang", "message"] -categories = ["algorithms", "concurrency", "data-structures"] +readme = "README.md" +keywords = [ + "channel", + "mpmc", + "select", + "golang", + "message", +] +categories = [ + "algorithms", + "concurrency", + "data-structures", +] license = "MIT OR Apache-2.0" repository = "https://github.com/crossbeam-rs/crossbeam" + [dependencies.cfg-if] version = "1" @@ -27,6 +39,7 @@ version = "1" version = "0.8" optional = true default-features = false + [dev-dependencies.num_cpus] version = "1.13.0" diff --git a/Cargo.toml.orig b/Cargo.toml.orig index 640a808..25c3678 100644 --- a/Cargo.toml.orig +++ b/Cargo.toml.orig @@ -4,9 +4,9 @@ name = "crossbeam-channel" # - Update CHANGELOG.md # - Update README.md # - Create "crossbeam-channel-X.Y.Z" git tag -version = "0.5.2" +version = "0.5.7" edition = "2018" -rust-version = "1.36" +rust-version = "1.38" license = "MIT OR Apache-2.0" repository = "https://github.com/crossbeam-rs/crossbeam" homepage = "https://github.com/crossbeam-rs/crossbeam/tree/master/crossbeam-channel" @@ -1,3 +1,7 @@ +# This project was upgraded with external_updater. +# Usage: tools/external_updater/updater.sh update rust/crates/crossbeam-channel +# For more info, check https://cs.android.com/android/platform/superproject/+/master:tools/external_updater/README.md + name: "crossbeam-channel" description: "Multi-producer multi-consumer channels for message passing" third_party { @@ -7,13 +11,13 @@ third_party { } url { type: ARCHIVE - value: "https://static.crates.io/crates/crossbeam-channel/crossbeam-channel-0.5.2.crate" + value: "https://static.crates.io/crates/crossbeam-channel/crossbeam-channel-0.5.7.crate" } - version: "0.5.2" + version: "0.5.7" license_type: NOTICE last_upgrade_date { - year: 2022 + year: 2023 month: 3 - day: 1 + day: 6 } } @@ -8,7 +8,7 @@ https://github.com/crossbeam-rs/crossbeam/tree/master/crossbeam-channel#license) https://crates.io/crates/crossbeam-channel) [![Documentation](https://docs.rs/crossbeam-channel/badge.svg)]( https://docs.rs/crossbeam-channel) -[![Rust 1.36+](https://img.shields.io/badge/rust-1.36+-lightgray.svg)]( +[![Rust 1.38+](https://img.shields.io/badge/rust-1.38+-lightgray.svg)]( https://www.rust-lang.org) [![chat](https://img.shields.io/discord/569610676205781012.svg?logo=discord)](https://discord.com/invite/JXYwgWZ) @@ -48,7 +48,7 @@ crossbeam-channel = "0.5" Crossbeam Channel supports stable Rust releases going back at least six months, and every time the minimum supported Rust version is increased, a new minor -version is released. Currently, the minimum supported Rust version is 1.36. +version is released. Currently, the minimum supported Rust version is 1.38. ## License diff --git a/TEST_MAPPING b/TEST_MAPPING index 3cbd48d..55384f0 100644 --- a/TEST_MAPPING +++ b/TEST_MAPPING @@ -5,6 +5,9 @@ "path": "external/rust/crates/base64" }, { + "path": "external/rust/crates/hashbrown" + }, + { "path": "external/rust/crates/tinytemplate" }, { diff --git a/src/channel.rs b/src/channel.rs index 8988235..bd24115 100644 --- a/src/channel.rs +++ b/src/channel.rs @@ -159,7 +159,7 @@ pub fn bounded<T>(cap: usize) -> (Sender<T>, Receiver<T>) { /// let ms = |ms| Duration::from_millis(ms); /// /// // Returns `true` if `a` and `b` are very close `Instant`s. -/// let eq = |a, b| a + ms(50) > b && b + ms(50) > a; +/// let eq = |a, b| a + ms(60) > b && b + ms(60) > a; /// /// let start = Instant::now(); /// let r = after(ms(100)); @@ -171,8 +171,11 @@ pub fn bounded<T>(cap: usize) -> (Sender<T>, Receiver<T>) { /// assert!(eq(Instant::now(), start + ms(500))); /// ``` pub fn after(duration: Duration) -> Receiver<Instant> { - Receiver { - flavor: ReceiverFlavor::At(Arc::new(flavors::at::Channel::new_timeout(duration))), + match Instant::now().checked_add(duration) { + Some(deadline) => Receiver { + flavor: ReceiverFlavor::At(Arc::new(flavors::at::Channel::new_deadline(deadline))), + }, + None => never(), } } @@ -232,6 +235,8 @@ pub fn at(when: Instant) -> Receiver<Instant> { /// /// Using a `never` channel to optionally add a timeout to [`select!`]: /// +/// [`select!`]: crate::select! +/// /// ``` /// use std::thread; /// use std::time::Duration; @@ -297,7 +302,7 @@ pub fn never<T>() -> Receiver<T> { /// let ms = |ms| Duration::from_millis(ms); /// /// // Returns `true` if `a` and `b` are very close `Instant`s. -/// let eq = |a, b| a + ms(50) > b && b + ms(50) > a; +/// let eq = |a, b| a + ms(65) > b && b + ms(65) > a; /// /// let start = Instant::now(); /// let r = tick(ms(100)); @@ -317,8 +322,14 @@ pub fn never<T>() -> Receiver<T> { /// assert!(eq(Instant::now(), start + ms(700))); /// ``` pub fn tick(duration: Duration) -> Receiver<Instant> { - Receiver { - flavor: ReceiverFlavor::Tick(Arc::new(flavors::tick::Channel::new(duration))), + match Instant::now().checked_add(duration) { + Some(delivery_time) => Receiver { + flavor: ReceiverFlavor::Tick(Arc::new(flavors::tick::Channel::new( + delivery_time, + duration, + ))), + }, + None => never(), } } @@ -471,7 +482,10 @@ impl<T> Sender<T> { /// ); /// ``` pub fn send_timeout(&self, msg: T, timeout: Duration) -> Result<(), SendTimeoutError<T>> { - self.send_deadline(msg, Instant::now() + timeout) + match Instant::now().checked_add(timeout) { + Some(deadline) => self.send_deadline(msg, deadline), + None => self.send(msg).map_err(SendTimeoutError::from), + } } /// Waits for a message to be sent into the channel, but only until a given deadline. @@ -861,7 +875,10 @@ impl<T> Receiver<T> { /// ); /// ``` pub fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError> { - self.recv_deadline(Instant::now() + timeout) + match Instant::now().checked_add(timeout) { + Some(deadline) => self.recv_deadline(deadline), + None => self.recv().map_err(RecvTimeoutError::from), + } } /// Waits for a message to be received from the channel, but only before a given deadline. @@ -308,7 +308,6 @@ impl From<RecvError> for TryRecvError { impl TryRecvError { /// Returns `true` if the receive operation failed because the channel is empty. - #[allow(clippy::trivially_copy_pass_by_ref)] pub fn is_empty(&self) -> bool { match self { TryRecvError::Empty => true, @@ -317,7 +316,6 @@ impl TryRecvError { } /// Returns `true` if the receive operation failed because the channel is disconnected. - #[allow(clippy::trivially_copy_pass_by_ref)] pub fn is_disconnected(&self) -> bool { match self { TryRecvError::Disconnected => true, @@ -347,7 +345,6 @@ impl From<RecvError> for RecvTimeoutError { impl RecvTimeoutError { /// Returns `true` if the receive operation timed out. - #[allow(clippy::trivially_copy_pass_by_ref)] pub fn is_timeout(&self) -> bool { match self { RecvTimeoutError::Timeout => true, @@ -356,7 +353,6 @@ impl RecvTimeoutError { } /// Returns `true` if the receive operation failed because the channel is disconnected. - #[allow(clippy::trivially_copy_pass_by_ref)] pub fn is_disconnected(&self) -> bool { match self { RecvTimeoutError::Disconnected => true, diff --git a/src/flavors/array.rs b/src/flavors/array.rs index 871768c..63b82eb 100644 --- a/src/flavors/array.rs +++ b/src/flavors/array.rs @@ -9,7 +9,6 @@ //! - <https://docs.google.com/document/d/1yIAYmbvL3JxOKOjuCyon7JhW4cSv1wy5hC0ApeGMV9s/pub> use std::cell::UnsafeCell; -use std::marker::PhantomData; use std::mem::MaybeUninit; use std::ptr; use std::sync::atomic::{self, AtomicUsize, Ordering}; @@ -33,7 +32,7 @@ struct Slot<T> { /// The token type for the array flavor. #[derive(Debug)] -pub struct ArrayToken { +pub(crate) struct ArrayToken { /// Slot to read from or write to. slot: *const u8, @@ -72,7 +71,7 @@ pub(crate) struct Channel<T> { tail: CachePadded<AtomicUsize>, /// The buffer holding slots. - buffer: *mut Slot<T>, + buffer: Box<[Slot<T>]>, /// The channel capacity. cap: usize, @@ -88,9 +87,6 @@ pub(crate) struct Channel<T> { /// Receivers waiting while the channel is empty and not disconnected. receivers: SyncWaker, - - /// Indicates that dropping a `Channel<T>` may drop values of type `T`. - _marker: PhantomData<T>, } impl<T> Channel<T> { @@ -109,18 +105,15 @@ impl<T> Channel<T> { // Allocate a buffer of `cap` slots initialized // with stamps. - let buffer = { - let boxed: Box<[Slot<T>]> = (0..cap) - .map(|i| { - // Set the stamp to `{ lap: 0, mark: 0, index: i }`. - Slot { - stamp: AtomicUsize::new(i), - msg: UnsafeCell::new(MaybeUninit::uninit()), - } - }) - .collect(); - Box::into_raw(boxed) as *mut Slot<T> - }; + let buffer: Box<[Slot<T>]> = (0..cap) + .map(|i| { + // Set the stamp to `{ lap: 0, mark: 0, index: i }`. + Slot { + stamp: AtomicUsize::new(i), + msg: UnsafeCell::new(MaybeUninit::uninit()), + } + }) + .collect(); Channel { buffer, @@ -131,7 +124,6 @@ impl<T> Channel<T> { tail: CachePadded::new(AtomicUsize::new(tail)), senders: SyncWaker::new(), receivers: SyncWaker::new(), - _marker: PhantomData, } } @@ -163,7 +155,8 @@ impl<T> Channel<T> { let lap = tail & !(self.one_lap - 1); // Inspect the corresponding slot. - let slot = unsafe { &*self.buffer.add(index) }; + debug_assert!(index < self.buffer.len()); + let slot = unsafe { self.buffer.get_unchecked(index) }; let stamp = slot.stamp.load(Ordering::Acquire); // If the tail and the stamp match, we may attempt to push. @@ -223,7 +216,7 @@ impl<T> Channel<T> { return Err(msg); } - let slot: &Slot<T> = &*(token.array.slot as *const Slot<T>); + let slot: &Slot<T> = &*token.array.slot.cast::<Slot<T>>(); // Write the message into the slot and update the stamp. slot.msg.get().write(MaybeUninit::new(msg)); @@ -245,7 +238,8 @@ impl<T> Channel<T> { let lap = head & !(self.one_lap - 1); // Inspect the corresponding slot. - let slot = unsafe { &*self.buffer.add(index) }; + debug_assert!(index < self.buffer.len()); + let slot = unsafe { self.buffer.get_unchecked(index) }; let stamp = slot.stamp.load(Ordering::Acquire); // If the the stamp is ahead of the head by 1, we may attempt to pop. @@ -313,7 +307,7 @@ impl<T> Channel<T> { return Err(()); } - let slot: &Slot<T> = &*(token.array.slot as *const Slot<T>); + let slot: &Slot<T> = &*token.array.slot.cast::<Slot<T>>(); // Read the message from the slot and update the stamp. let msg = slot.msg.get().read().assume_init(); @@ -475,7 +469,6 @@ impl<T> Channel<T> { } /// Returns the capacity of the channel. - #[allow(clippy::unnecessary_wraps)] // This is intentional. pub(crate) fn capacity(&self) -> Option<usize> { Some(self.cap) } @@ -528,10 +521,24 @@ impl<T> Channel<T> { impl<T> Drop for Channel<T> { fn drop(&mut self) { // Get the index of the head. - let hix = self.head.load(Ordering::Relaxed) & (self.mark_bit - 1); + let head = *self.head.get_mut(); + let tail = *self.tail.get_mut(); + + let hix = head & (self.mark_bit - 1); + let tix = tail & (self.mark_bit - 1); + + let len = if hix < tix { + tix - hix + } else if hix > tix { + self.cap - hix + tix + } else if (tail & !self.mark_bit) == head { + 0 + } else { + self.cap + }; // Loop over all slots that hold a message and drop them. - for i in 0..self.len() { + for i in 0..len { // Compute the index of the next slot holding a message. let index = if hix + i < self.cap { hix + i @@ -540,23 +547,12 @@ impl<T> Drop for Channel<T> { }; unsafe { - let p = { - let slot = &mut *self.buffer.add(index); - let msg = &mut *slot.msg.get(); - msg.as_mut_ptr() - }; - p.drop_in_place(); + debug_assert!(index < self.buffer.len()); + let slot = self.buffer.get_unchecked_mut(index); + let msg = &mut *slot.msg.get(); + msg.as_mut_ptr().drop_in_place(); } } - - // Finally, deallocate the buffer, but don't run any destructors. - unsafe { - // Create a slice from the buffer to make - // a fat pointer. Then, use Box::from_raw - // to deallocate it. - let ptr = std::slice::from_raw_parts_mut(self.buffer, self.cap) as *mut [Slot<T>]; - Box::from_raw(ptr); - } } } diff --git a/src/flavors/at.rs b/src/flavors/at.rs index 4581edb..515c4e3 100644 --- a/src/flavors/at.rs +++ b/src/flavors/at.rs @@ -4,7 +4,7 @@ use std::sync::atomic::{AtomicBool, Ordering}; use std::thread; -use std::time::{Duration, Instant}; +use std::time::Instant; use crate::context::Context; use crate::err::{RecvTimeoutError, TryRecvError}; @@ -32,11 +32,6 @@ impl Channel { received: AtomicBool::new(false), } } - /// Creates a channel that delivers a message after a certain duration of time. - #[inline] - pub(crate) fn new_timeout(dur: Duration) -> Self { - Self::new_deadline(Instant::now() + dur) - } /// Attempts to receive a message without blocking. #[inline] @@ -142,7 +137,6 @@ impl Channel { } /// Returns the capacity of the channel. - #[allow(clippy::unnecessary_wraps)] // This is intentional. #[inline] pub(crate) fn capacity(&self) -> Option<usize> { Some(1) diff --git a/src/flavors/list.rs b/src/flavors/list.rs index 5056aa4..6090b8d 100644 --- a/src/flavors/list.rs +++ b/src/flavors/list.rs @@ -49,6 +49,11 @@ struct Slot<T> { } impl<T> Slot<T> { + const UNINIT: Self = Self { + msg: UnsafeCell::new(MaybeUninit::uninit()), + state: AtomicUsize::new(0), + }; + /// Waits until a message is written into the slot. fn wait_write(&self) { let backoff = Backoff::new(); @@ -72,13 +77,10 @@ struct Block<T> { impl<T> Block<T> { /// Creates an empty block. fn new() -> Block<T> { - // SAFETY: This is safe because: - // [1] `Block::next` (AtomicPtr) may be safely zero initialized. - // [2] `Block::slots` (Array) may be safely zero initialized because of [3, 4]. - // [3] `Slot::msg` (UnsafeCell) may be safely zero initialized because it - // holds a MaybeUninit. - // [4] `Slot::state` (AtomicUsize) may be safely zero initialized. - unsafe { MaybeUninit::zeroed().assume_init() } + Self { + next: AtomicPtr::new(ptr::null_mut()), + slots: [Slot::UNINIT; BLOCK_CAP], + } } /// Waits until the next pointer is set. @@ -126,7 +128,7 @@ struct Position<T> { /// The token type for the list flavor. #[derive(Debug)] -pub struct ListToken { +pub(crate) struct ListToken { /// The block of slots. block: *const u8, @@ -283,7 +285,7 @@ impl<T> Channel<T> { } // Write the message into the slot. - let block = token.list.block as *mut Block<T>; + let block = token.list.block.cast::<Block<T>>(); let offset = token.list.offset; let slot = (*block).slots.get_unchecked(offset); slot.msg.get().write(MaybeUninit::new(msg)); @@ -634,9 +636,9 @@ impl<T> Channel<T> { impl<T> Drop for Channel<T> { fn drop(&mut self) { - let mut head = self.head.index.load(Ordering::Relaxed); - let mut tail = self.tail.index.load(Ordering::Relaxed); - let mut block = self.head.block.load(Ordering::Relaxed); + let mut head = *self.head.index.get_mut(); + let mut tail = *self.tail.index.get_mut(); + let mut block = *self.head.block.get_mut(); // Erase the lower bits. head &= !((1 << SHIFT) - 1); @@ -654,7 +656,7 @@ impl<T> Drop for Channel<T> { p.as_mut_ptr().drop_in_place(); } else { // Deallocate the block and move to the next one. - let next = (*block).next.load(Ordering::Relaxed); + let next = *(*block).next.get_mut(); drop(Box::from_raw(block)); block = next; } diff --git a/src/flavors/never.rs b/src/flavors/never.rs index 1951e96..277a61d 100644 --- a/src/flavors/never.rs +++ b/src/flavors/never.rs @@ -65,7 +65,6 @@ impl<T> Channel<T> { } /// Returns the capacity of the channel. - #[allow(clippy::unnecessary_wraps)] // This is intentional. #[inline] pub(crate) fn capacity(&self) -> Option<usize> { Some(0) diff --git a/src/flavors/tick.rs b/src/flavors/tick.rs index d4b1f6c..d38f6a5 100644 --- a/src/flavors/tick.rs +++ b/src/flavors/tick.rs @@ -26,9 +26,9 @@ pub(crate) struct Channel { impl Channel { /// Creates a channel that delivers messages periodically. #[inline] - pub(crate) fn new(dur: Duration) -> Self { + pub(crate) fn new(delivery_time: Instant, dur: Duration) -> Self { Channel { - delivery_time: AtomicCell::new(Instant::now() + dur), + delivery_time: AtomicCell::new(delivery_time), duration: dur, } } @@ -112,7 +112,6 @@ impl Channel { } /// Returns the capacity of the channel. - #[allow(clippy::unnecessary_wraps)] // This is intentional. #[inline] pub(crate) fn capacity(&self) -> Option<usize> { Some(1) diff --git a/src/flavors/zero.rs b/src/flavors/zero.rs index 4afbd8f..aae2ea3 100644 --- a/src/flavors/zero.rs +++ b/src/flavors/zero.rs @@ -5,6 +5,7 @@ use std::cell::UnsafeCell; use std::marker::PhantomData; use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Mutex; use std::time::Instant; use std::{fmt, ptr}; @@ -13,11 +14,10 @@ use crossbeam_utils::Backoff; use crate::context::Context; use crate::err::{RecvTimeoutError, SendTimeoutError, TryRecvError, TrySendError}; use crate::select::{Operation, SelectHandle, Selected, Token}; -use crate::utils::Spinlock; use crate::waker::Waker; /// A pointer to a packet. -pub struct ZeroToken(*mut ()); +pub(crate) struct ZeroToken(*mut ()); impl Default for ZeroToken { fn default() -> Self { @@ -95,7 +95,7 @@ struct Inner { /// Zero-capacity channel. pub(crate) struct Channel<T> { /// Inner representation of the channel. - inner: Spinlock<Inner>, + inner: Mutex<Inner>, /// Indicates that dropping a `Channel<T>` may drop values of type `T`. _marker: PhantomData<T>, @@ -105,7 +105,7 @@ impl<T> Channel<T> { /// Constructs a new zero-capacity channel. pub(crate) fn new() -> Self { Channel { - inner: Spinlock::new(Inner { + inner: Mutex::new(Inner { senders: Waker::new(), receivers: Waker::new(), is_disconnected: false, @@ -126,7 +126,7 @@ impl<T> Channel<T> { /// Attempts to reserve a slot for sending a message. fn start_send(&self, token: &mut Token) -> bool { - let mut inner = self.inner.lock(); + let mut inner = self.inner.lock().unwrap(); // If there's a waiting receiver, pair up with it. if let Some(operation) = inner.receivers.try_select() { @@ -155,7 +155,7 @@ impl<T> Channel<T> { /// Attempts to pair up with a sender. fn start_recv(&self, token: &mut Token) -> bool { - let mut inner = self.inner.lock(); + let mut inner = self.inner.lock().unwrap(); // If there's a waiting sender, pair up with it. if let Some(operation) = inner.senders.try_select() { @@ -190,7 +190,7 @@ impl<T> Channel<T> { // heap-allocated packet. packet.wait_ready(); let msg = packet.msg.get().replace(None).unwrap(); - drop(Box::from_raw(token.zero.0 as *mut Packet<T>)); + drop(Box::from_raw(token.zero.0.cast::<Packet<T>>())); Ok(msg) } } @@ -198,7 +198,7 @@ impl<T> Channel<T> { /// Attempts to send a message into the channel. pub(crate) fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> { let token = &mut Token::default(); - let mut inner = self.inner.lock(); + let mut inner = self.inner.lock().unwrap(); // If there's a waiting receiver, pair up with it. if let Some(operation) = inner.receivers.try_select() { @@ -222,7 +222,7 @@ impl<T> Channel<T> { deadline: Option<Instant>, ) -> Result<(), SendTimeoutError<T>> { let token = &mut Token::default(); - let mut inner = self.inner.lock(); + let mut inner = self.inner.lock().unwrap(); // If there's a waiting receiver, pair up with it. if let Some(operation) = inner.receivers.try_select() { @@ -254,12 +254,12 @@ impl<T> Channel<T> { match sel { Selected::Waiting => unreachable!(), Selected::Aborted => { - self.inner.lock().senders.unregister(oper).unwrap(); + self.inner.lock().unwrap().senders.unregister(oper).unwrap(); let msg = unsafe { packet.msg.get().replace(None).unwrap() }; Err(SendTimeoutError::Timeout(msg)) } Selected::Disconnected => { - self.inner.lock().senders.unregister(oper).unwrap(); + self.inner.lock().unwrap().senders.unregister(oper).unwrap(); let msg = unsafe { packet.msg.get().replace(None).unwrap() }; Err(SendTimeoutError::Disconnected(msg)) } @@ -275,7 +275,7 @@ impl<T> Channel<T> { /// Attempts to receive a message without blocking. pub(crate) fn try_recv(&self) -> Result<T, TryRecvError> { let token = &mut Token::default(); - let mut inner = self.inner.lock(); + let mut inner = self.inner.lock().unwrap(); // If there's a waiting sender, pair up with it. if let Some(operation) = inner.senders.try_select() { @@ -292,7 +292,7 @@ impl<T> Channel<T> { /// Receives a message from the channel. pub(crate) fn recv(&self, deadline: Option<Instant>) -> Result<T, RecvTimeoutError> { let token = &mut Token::default(); - let mut inner = self.inner.lock(); + let mut inner = self.inner.lock().unwrap(); // If there's a waiting sender, pair up with it. if let Some(operation) = inner.senders.try_select() { @@ -325,11 +325,21 @@ impl<T> Channel<T> { match sel { Selected::Waiting => unreachable!(), Selected::Aborted => { - self.inner.lock().receivers.unregister(oper).unwrap(); + self.inner + .lock() + .unwrap() + .receivers + .unregister(oper) + .unwrap(); Err(RecvTimeoutError::Timeout) } Selected::Disconnected => { - self.inner.lock().receivers.unregister(oper).unwrap(); + self.inner + .lock() + .unwrap() + .receivers + .unregister(oper) + .unwrap(); Err(RecvTimeoutError::Disconnected) } Selected::Operation(_) => { @@ -345,7 +355,7 @@ impl<T> Channel<T> { /// /// Returns `true` if this call disconnected the channel. pub(crate) fn disconnect(&self) -> bool { - let mut inner = self.inner.lock(); + let mut inner = self.inner.lock().unwrap(); if !inner.is_disconnected { inner.is_disconnected = true; @@ -363,7 +373,6 @@ impl<T> Channel<T> { } /// Returns the capacity of the channel. - #[allow(clippy::unnecessary_wraps)] // This is intentional. pub(crate) fn capacity(&self) -> Option<usize> { Some(0) } @@ -397,18 +406,18 @@ impl<T> SelectHandle for Receiver<'_, T> { fn register(&self, oper: Operation, cx: &Context) -> bool { let packet = Box::into_raw(Packet::<T>::empty_on_heap()); - let mut inner = self.0.inner.lock(); + let mut inner = self.0.inner.lock().unwrap(); inner .receivers - .register_with_packet(oper, packet as *mut (), cx); + .register_with_packet(oper, packet.cast::<()>(), cx); inner.senders.notify(); inner.senders.can_select() || inner.is_disconnected } fn unregister(&self, oper: Operation) { - if let Some(operation) = self.0.inner.lock().receivers.unregister(oper) { + if let Some(operation) = self.0.inner.lock().unwrap().receivers.unregister(oper) { unsafe { - drop(Box::from_raw(operation.packet as *mut Packet<T>)); + drop(Box::from_raw(operation.packet.cast::<Packet<T>>())); } } } @@ -419,18 +428,18 @@ impl<T> SelectHandle for Receiver<'_, T> { } fn is_ready(&self) -> bool { - let inner = self.0.inner.lock(); + let inner = self.0.inner.lock().unwrap(); inner.senders.can_select() || inner.is_disconnected } fn watch(&self, oper: Operation, cx: &Context) -> bool { - let mut inner = self.0.inner.lock(); + let mut inner = self.0.inner.lock().unwrap(); inner.receivers.watch(oper, cx); inner.senders.can_select() || inner.is_disconnected } fn unwatch(&self, oper: Operation) { - let mut inner = self.0.inner.lock(); + let mut inner = self.0.inner.lock().unwrap(); inner.receivers.unwatch(oper); } } @@ -447,18 +456,18 @@ impl<T> SelectHandle for Sender<'_, T> { fn register(&self, oper: Operation, cx: &Context) -> bool { let packet = Box::into_raw(Packet::<T>::empty_on_heap()); - let mut inner = self.0.inner.lock(); + let mut inner = self.0.inner.lock().unwrap(); inner .senders - .register_with_packet(oper, packet as *mut (), cx); + .register_with_packet(oper, packet.cast::<()>(), cx); inner.receivers.notify(); inner.receivers.can_select() || inner.is_disconnected } fn unregister(&self, oper: Operation) { - if let Some(operation) = self.0.inner.lock().senders.unregister(oper) { + if let Some(operation) = self.0.inner.lock().unwrap().senders.unregister(oper) { unsafe { - drop(Box::from_raw(operation.packet as *mut Packet<T>)); + drop(Box::from_raw(operation.packet.cast::<Packet<T>>())); } } } @@ -469,18 +478,18 @@ impl<T> SelectHandle for Sender<'_, T> { } fn is_ready(&self) -> bool { - let inner = self.0.inner.lock(); + let inner = self.0.inner.lock().unwrap(); inner.receivers.can_select() || inner.is_disconnected } fn watch(&self, oper: Operation, cx: &Context) -> bool { - let mut inner = self.0.inner.lock(); + let mut inner = self.0.inner.lock().unwrap(); inner.senders.watch(oper, cx); inner.receivers.can_select() || inner.is_disconnected } fn unwatch(&self, oper: Operation) { - let mut inner = self.0.inner.lock(); + let mut inner = self.0.inner.lock().unwrap(); inner.senders.unwatch(oper); } } diff --git a/src/select.rs b/src/select.rs index 6103ef4..3eb0b97 100644 --- a/src/select.rs +++ b/src/select.rs @@ -22,12 +22,13 @@ use crate::utils; // This is a private API that is used by the select macro. #[derive(Debug, Default)] pub struct Token { - pub at: flavors::at::AtToken, - pub array: flavors::array::ArrayToken, - pub list: flavors::list::ListToken, - pub never: flavors::never::NeverToken, - pub tick: flavors::tick::TickToken, - pub zero: flavors::zero::ZeroToken, + pub(crate) at: flavors::at::AtToken, + pub(crate) array: flavors::array::ArrayToken, + pub(crate) list: flavors::list::ListToken, + #[allow(dead_code)] + pub(crate) never: flavors::never::NeverToken, + pub(crate) tick: flavors::tick::TickToken, + pub(crate) zero: flavors::zero::ZeroToken, } /// Identifier associated with an operation by a specific thread on a specific channel. @@ -486,7 +487,10 @@ pub fn select_timeout<'a>( handles: &mut [(&'a dyn SelectHandle, usize, *const u8)], timeout: Duration, ) -> Result<SelectedOperation<'a>, SelectTimeoutError> { - select_deadline(handles, Instant::now() + timeout) + match Instant::now().checked_add(timeout) { + Some(deadline) => select_deadline(handles, deadline), + None => Ok(select(handles)), + } } /// Blocks until a given deadline, or until one of the operations becomes ready and selects it. @@ -518,6 +522,8 @@ pub(crate) fn select_deadline<'a>( /// The [`select!`] macro is a convenience wrapper around `Select`. However, it cannot select over a /// dynamically created list of channel operations. /// +/// [`select!`]: crate::select! +/// /// Once a list of operations has been built with `Select`, there are two different ways of /// proceeding: /// @@ -1042,7 +1048,10 @@ impl<'a> Select<'a> { /// } /// ``` pub fn ready_timeout(&mut self, timeout: Duration) -> Result<usize, ReadyTimeoutError> { - self.ready_deadline(Instant::now() + timeout) + match Instant::now().checked_add(timeout) { + Some(deadline) => self.ready_deadline(deadline), + None => Ok(self.ready()), + } } /// Blocks until a given deadline, or until one of the operations becomes ready. diff --git a/src/select_macro.rs b/src/select_macro.rs index f8b247e..efe0ae4 100644 --- a/src/select_macro.rs +++ b/src/select_macro.rs @@ -121,18 +121,7 @@ macro_rules! crossbeam_channel_internal { }; // Only one case remains. (@list - ($case:ident ($($args:tt)*) $(-> $res:pat)* => $body:expr) - ($($head:tt)*) - ) => { - $crate::crossbeam_channel_internal!( - @list - () - ($($head)* $case ($($args)*) $(-> $res)* => { $body },) - ) - }; - // Accept a trailing comma at the end of the list. - (@list - ($case:ident ($($args:tt)*) $(-> $res:pat)* => $body:expr,) + ($case:ident ($($args:tt)*) $(-> $res:pat)* => $body:expr $(,)?) ($($head:tt)*) ) => { $crate::crossbeam_channel_internal!( @@ -373,20 +362,7 @@ macro_rules! crossbeam_channel_internal { // Check the format of a recv case. (@case - (recv($r:expr) -> $res:pat => $body:tt, $($tail:tt)*) - ($($cases:tt)*) - $default:tt - ) => { - $crate::crossbeam_channel_internal!( - @case - ($($tail)*) - ($($cases)* recv($r) -> $res => $body,) - $default - ) - }; - // Allow trailing comma... - (@case - (recv($r:expr,) -> $res:pat => $body:tt, $($tail:tt)*) + (recv($r:expr $(,)?) -> $res:pat => $body:tt, $($tail:tt)*) ($($cases:tt)*) $default:tt ) => { @@ -428,20 +404,7 @@ macro_rules! crossbeam_channel_internal { // Check the format of a send case. (@case - (send($s:expr, $m:expr) -> $res:pat => $body:tt, $($tail:tt)*) - ($($cases:tt)*) - $default:tt - ) => { - $crate::crossbeam_channel_internal!( - @case - ($($tail)*) - ($($cases)* send($s, $m) -> $res => $body,) - $default - ) - }; - // Allow trailing comma... - (@case - (send($s:expr, $m:expr,) -> $res:pat => $body:tt, $($tail:tt)*) + (send($s:expr, $m:expr $(,)?) -> $res:pat => $body:tt, $($tail:tt)*) ($($cases:tt)*) $default:tt ) => { @@ -496,20 +459,7 @@ macro_rules! crossbeam_channel_internal { }; // Check the format of a default case with timeout. (@case - (default($timeout:expr) => $body:tt, $($tail:tt)*) - $cases:tt - () - ) => { - $crate::crossbeam_channel_internal!( - @case - ($($tail)*) - $cases - (default($timeout) => $body,) - ) - }; - // Allow trailing comma... - (@case - (default($timeout:expr,) => $body:tt, $($tail:tt)*) + (default($timeout:expr $(,)?) => $body:tt, $($tail:tt)*) $cases:tt () ) => { @@ -1043,7 +993,7 @@ macro_rules! crossbeam_channel_internal { /// An operation is considered to be ready if it doesn't have to block. Note that it is ready even /// when it will simply return an error because the channel is disconnected. /// -/// The `select` macro is a convenience wrapper around [`Select`]. However, it cannot select over a +/// The `select!` macro is a convenience wrapper around [`Select`]. However, it cannot select over a /// dynamically created list of channel operations. /// /// [`Select`]: super::Select diff --git a/src/utils.rs b/src/utils.rs index 557b6a0..f623f27 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -1,14 +1,10 @@ //! Miscellaneous utilities. -use std::cell::{Cell, UnsafeCell}; +use std::cell::Cell; use std::num::Wrapping; -use std::ops::{Deref, DerefMut}; -use std::sync::atomic::{AtomicBool, Ordering}; use std::thread; use std::time::{Duration, Instant}; -use crossbeam_utils::Backoff; - /// Randomly shuffles a slice. pub(crate) fn shuffle<T>(v: &mut [T]) { let len = v.len(); @@ -60,53 +56,3 @@ pub(crate) fn sleep_until(deadline: Option<Instant>) { } } } - -/// A simple spinlock. -pub(crate) struct Spinlock<T> { - flag: AtomicBool, - value: UnsafeCell<T>, -} - -impl<T> Spinlock<T> { - /// Returns a new spinlock initialized with `value`. - pub(crate) fn new(value: T) -> Spinlock<T> { - Spinlock { - flag: AtomicBool::new(false), - value: UnsafeCell::new(value), - } - } - - /// Locks the spinlock. - pub(crate) fn lock(&self) -> SpinlockGuard<'_, T> { - let backoff = Backoff::new(); - while self.flag.swap(true, Ordering::Acquire) { - backoff.snooze(); - } - SpinlockGuard { parent: self } - } -} - -/// A guard holding a spinlock locked. -pub(crate) struct SpinlockGuard<'a, T> { - parent: &'a Spinlock<T>, -} - -impl<T> Drop for SpinlockGuard<'_, T> { - fn drop(&mut self) { - self.parent.flag.store(false, Ordering::Release); - } -} - -impl<T> Deref for SpinlockGuard<'_, T> { - type Target = T; - - fn deref(&self) -> &T { - unsafe { &*self.parent.value.get() } - } -} - -impl<T> DerefMut for SpinlockGuard<'_, T> { - fn deref_mut(&mut self) -> &mut T { - unsafe { &mut *self.parent.value.get() } - } -} diff --git a/src/waker.rs b/src/waker.rs index dec73a9..7eb58ba 100644 --- a/src/waker.rs +++ b/src/waker.rs @@ -2,11 +2,11 @@ use std::ptr; use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Mutex; use std::thread::{self, ThreadId}; use crate::context::Context; use crate::select::{Operation, Selected}; -use crate::utils::Spinlock; /// Represents a thread blocked on a specific channel operation. pub(crate) struct Entry { @@ -77,26 +77,32 @@ impl Waker { /// Attempts to find another thread's entry, select the operation, and wake it up. #[inline] pub(crate) fn try_select(&mut self) -> Option<Entry> { - self.selectors - .iter() - .position(|selector| { - // Does the entry belong to a different thread? - selector.cx.thread_id() != current_thread_id() - && selector // Try selecting this operation. - .cx - .try_select(Selected::Operation(selector.oper)) - .is_ok() - && { - // Provide the packet. - selector.cx.store_packet(selector.packet); - // Wake the thread up. - selector.cx.unpark(); - true - } - }) - // Remove the entry from the queue to keep it clean and improve - // performance. - .map(|pos| self.selectors.remove(pos)) + if self.selectors.is_empty() { + None + } else { + let thread_id = current_thread_id(); + + self.selectors + .iter() + .position(|selector| { + // Does the entry belong to a different thread? + selector.cx.thread_id() != thread_id + && selector // Try selecting this operation. + .cx + .try_select(Selected::Operation(selector.oper)) + .is_ok() + && { + // Provide the packet. + selector.cx.store_packet(selector.packet); + // Wake the thread up. + selector.cx.unpark(); + true + } + }) + // Remove the entry from the queue to keep it clean and improve + // performance. + .map(|pos| self.selectors.remove(pos)) + } } /// Returns `true` if there is an entry which can be selected by the current thread. @@ -170,7 +176,7 @@ impl Drop for Waker { /// This is a simple wrapper around `Waker` that internally uses a mutex for synchronization. pub(crate) struct SyncWaker { /// The inner `Waker`. - inner: Spinlock<Waker>, + inner: Mutex<Waker>, /// `true` if the waker is empty. is_empty: AtomicBool, @@ -181,7 +187,7 @@ impl SyncWaker { #[inline] pub(crate) fn new() -> Self { SyncWaker { - inner: Spinlock::new(Waker::new()), + inner: Mutex::new(Waker::new()), is_empty: AtomicBool::new(true), } } @@ -189,7 +195,7 @@ impl SyncWaker { /// Registers the current thread with an operation. #[inline] pub(crate) fn register(&self, oper: Operation, cx: &Context) { - let mut inner = self.inner.lock(); + let mut inner = self.inner.lock().unwrap(); inner.register(oper, cx); self.is_empty.store( inner.selectors.is_empty() && inner.observers.is_empty(), @@ -200,7 +206,7 @@ impl SyncWaker { /// Unregisters an operation previously registered by the current thread. #[inline] pub(crate) fn unregister(&self, oper: Operation) -> Option<Entry> { - let mut inner = self.inner.lock(); + let mut inner = self.inner.lock().unwrap(); let entry = inner.unregister(oper); self.is_empty.store( inner.selectors.is_empty() && inner.observers.is_empty(), @@ -213,7 +219,7 @@ impl SyncWaker { #[inline] pub(crate) fn notify(&self) { if !self.is_empty.load(Ordering::SeqCst) { - let mut inner = self.inner.lock(); + let mut inner = self.inner.lock().unwrap(); if !self.is_empty.load(Ordering::SeqCst) { inner.try_select(); inner.notify(); @@ -228,7 +234,7 @@ impl SyncWaker { /// Registers an operation waiting to be ready. #[inline] pub(crate) fn watch(&self, oper: Operation, cx: &Context) { - let mut inner = self.inner.lock(); + let mut inner = self.inner.lock().unwrap(); inner.watch(oper, cx); self.is_empty.store( inner.selectors.is_empty() && inner.observers.is_empty(), @@ -239,7 +245,7 @@ impl SyncWaker { /// Unregisters an operation waiting to be ready. #[inline] pub(crate) fn unwatch(&self, oper: Operation) { - let mut inner = self.inner.lock(); + let mut inner = self.inner.lock().unwrap(); inner.unwatch(oper); self.is_empty.store( inner.selectors.is_empty() && inner.observers.is_empty(), @@ -250,7 +256,7 @@ impl SyncWaker { /// Notifies all threads that the channel is disconnected. #[inline] pub(crate) fn disconnect(&self) { - let mut inner = self.inner.lock(); + let mut inner = self.inner.lock().unwrap(); inner.disconnect(); self.is_empty.store( inner.selectors.is_empty() && inner.observers.is_empty(), diff --git a/tests/array.rs b/tests/array.rs index bb2cebe..6fd8ffc 100644 --- a/tests/array.rs +++ b/tests/array.rs @@ -1,7 +1,5 @@ //! Tests for the array channel flavor. -#![cfg(not(miri))] // TODO: many assertions failed due to Miri is slow - use std::any::Any; use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering; @@ -254,7 +252,13 @@ fn recv_after_disconnect() { #[test] fn len() { + #[cfg(miri)] + const COUNT: usize = 50; + #[cfg(not(miri))] const COUNT: usize = 25_000; + #[cfg(miri)] + const CAP: usize = 50; + #[cfg(not(miri))] const CAP: usize = 1000; let (s, r) = bounded(CAP); @@ -347,6 +351,9 @@ fn disconnect_wakes_receiver() { #[test] fn spsc() { + #[cfg(miri)] + const COUNT: usize = 100; + #[cfg(not(miri))] const COUNT: usize = 100_000; let (s, r) = bounded(3); @@ -369,6 +376,9 @@ fn spsc() { #[test] fn mpmc() { + #[cfg(miri)] + const COUNT: usize = 50; + #[cfg(not(miri))] const COUNT: usize = 25_000; const THREADS: usize = 4; @@ -401,6 +411,9 @@ fn mpmc() { #[test] fn stress_oneshot() { + #[cfg(miri)] + const COUNT: usize = 100; + #[cfg(not(miri))] const COUNT: usize = 10_000; for _ in 0..COUNT { @@ -416,6 +429,9 @@ fn stress_oneshot() { #[test] fn stress_iter() { + #[cfg(miri)] + const COUNT: usize = 100; + #[cfg(not(miri))] const COUNT: usize = 100_000; let (request_s, request_r) = bounded(1); @@ -483,7 +499,14 @@ fn stress_timeout_two_threads() { #[test] fn drops() { + #[cfg(miri)] + const RUNS: usize = 10; + #[cfg(not(miri))] const RUNS: usize = 100; + #[cfg(miri)] + const STEPS: usize = 100; + #[cfg(not(miri))] + const STEPS: usize = 10_000; static DROPS: AtomicUsize = AtomicUsize::new(0); @@ -499,7 +522,7 @@ fn drops() { let mut rng = thread_rng(); for _ in 0..RUNS { - let steps = rng.gen_range(0..10_000); + let steps = rng.gen_range(0..STEPS); let additional = rng.gen_range(0..50); DROPS.store(0, Ordering::SeqCst); @@ -533,6 +556,9 @@ fn drops() { #[test] fn linearizable() { + #[cfg(miri)] + const COUNT: usize = 50; + #[cfg(not(miri))] const COUNT: usize = 25_000; const THREADS: usize = 4; @@ -553,6 +579,9 @@ fn linearizable() { #[test] fn fairness() { + #[cfg(miri)] + const COUNT: usize = 100; + #[cfg(not(miri))] const COUNT: usize = 10_000; let (s1, r1) = bounded::<()>(COUNT); @@ -575,6 +604,9 @@ fn fairness() { #[test] fn fairness_duplicates() { + #[cfg(miri)] + const COUNT: usize = 100; + #[cfg(not(miri))] const COUNT: usize = 10_000; let (s, r) = bounded::<()>(COUNT); @@ -619,6 +651,9 @@ fn recv_in_send() { #[test] fn channel_through_channel() { + #[cfg(miri)] + const COUNT: usize = 100; + #[cfg(not(miri))] const COUNT: usize = 1000; type T = Box<dyn Any + Send>; @@ -654,3 +689,56 @@ fn channel_through_channel() { }) .unwrap(); } + +#[test] +fn panic_on_drop() { + struct Msg1<'a>(&'a mut bool); + impl Drop for Msg1<'_> { + fn drop(&mut self) { + if *self.0 && !std::thread::panicking() { + panic!("double drop"); + } else { + *self.0 = true; + } + } + } + + struct Msg2<'a>(&'a mut bool); + impl Drop for Msg2<'_> { + fn drop(&mut self) { + if *self.0 { + panic!("double drop"); + } else { + *self.0 = true; + panic!("first drop"); + } + } + } + + // normal + let (s, r) = bounded(2); + let (mut a, mut b) = (false, false); + s.send(Msg1(&mut a)).unwrap(); + s.send(Msg1(&mut b)).unwrap(); + drop(s); + drop(r); + assert!(a); + assert!(b); + + // panic on drop + let (s, r) = bounded(2); + let (mut a, mut b) = (false, false); + s.send(Msg2(&mut a)).unwrap(); + s.send(Msg2(&mut b)).unwrap(); + drop(s); + let res = std::panic::catch_unwind(move || { + drop(r); + }); + assert_eq!( + *res.unwrap_err().downcast_ref::<&str>().unwrap(), + "first drop" + ); + assert!(a); + // Elements after the panicked element will leak. + assert!(!b); +} diff --git a/tests/golang.rs b/tests/golang.rs index 05d67f6..41149f4 100644 --- a/tests/golang.rs +++ b/tests/golang.rs @@ -9,18 +9,18 @@ //! - https://golang.org/LICENSE //! - https://golang.org/PATENTS -#![allow(clippy::mutex_atomic, clippy::redundant_clone)] +#![allow(clippy::redundant_clone)] use std::alloc::{GlobalAlloc, Layout, System}; use std::any::Any; use std::cell::Cell; use std::collections::HashMap; -use std::sync::atomic::{AtomicUsize, Ordering::SeqCst}; +use std::sync::atomic::{AtomicI32, AtomicUsize, Ordering::SeqCst}; use std::sync::{Arc, Condvar, Mutex}; use std::thread; use std::time::Duration; -use crossbeam_channel::{bounded, select, tick, unbounded, Receiver, Select, Sender}; +use crossbeam_channel::{bounded, never, select, tick, unbounded, Receiver, Select, Sender}; fn ms(ms: u64) -> Duration { Duration::from_millis(ms) @@ -32,7 +32,13 @@ struct Chan<T> { struct ChanInner<T> { s: Option<Sender<T>>, - r: Receiver<T>, + r: Option<Receiver<T>>, + // Receiver to use when r is None (Go blocks on receiving from nil) + nil_r: Receiver<T>, + // Sender to use when s is None (Go blocks on sending to nil) + nil_s: Sender<T>, + // Hold this receiver to prevent nil sender channel from disconnection + _nil_sr: Receiver<T>, } impl<T> Clone for Chan<T> { @@ -57,35 +63,53 @@ impl<T> Chan<T> { } fn try_recv(&self) -> Option<T> { - let r = self.inner.lock().unwrap().r.clone(); + let r = self.inner.lock().unwrap().r.as_ref().unwrap().clone(); r.try_recv().ok() } fn recv(&self) -> Option<T> { - let r = self.inner.lock().unwrap().r.clone(); + let r = self.inner.lock().unwrap().r.as_ref().unwrap().clone(); r.recv().ok() } - fn close(&self) { + fn close_s(&self) { self.inner .lock() .unwrap() .s .take() - .expect("channel already closed"); + .expect("channel sender already closed"); + } + + fn close_r(&self) { + self.inner + .lock() + .unwrap() + .r + .take() + .expect("channel receiver already closed"); + } + + fn has_rx(&self) -> bool { + self.inner.lock().unwrap().r.is_some() + } + + fn has_tx(&self) -> bool { + self.inner.lock().unwrap().s.is_some() } fn rx(&self) -> Receiver<T> { - self.inner.lock().unwrap().r.clone() + let inner = self.inner.lock().unwrap(); + match inner.r.as_ref() { + None => inner.nil_r.clone(), + Some(r) => r.clone(), + } } fn tx(&self) -> Sender<T> { - match self.inner.lock().unwrap().s.as_ref() { - None => { - let (s, r) = bounded(0); - std::mem::forget(r); - s - } + let inner = self.inner.lock().unwrap(); + match inner.s.as_ref() { + None => inner.nil_s.clone(), Some(s) => s.clone(), } } @@ -110,17 +134,32 @@ impl<'a, T> IntoIterator for &'a Chan<T> { fn make<T>(cap: usize) -> Chan<T> { let (s, r) = bounded(cap); + let (nil_s, _nil_sr) = bounded(0); Chan { - inner: Arc::new(Mutex::new(ChanInner { s: Some(s), r })), + inner: Arc::new(Mutex::new(ChanInner { + s: Some(s), + r: Some(r), + nil_r: never(), + nil_s, + _nil_sr, + })), } } fn make_unbounded<T>() -> Chan<T> { let (s, r) = unbounded(); + let (nil_s, _nil_sr) = bounded(0); Chan { - inner: Arc::new(Mutex::new(ChanInner { s: Some(s), r })), + inner: Arc::new(Mutex::new(ChanInner { + s: Some(s), + r: Some(r), + nil_r: never(), + nil_s, + _nil_sr, + })), } } + #[derive(Clone)] struct WaitGroup(Arc<WaitGroupInner>); @@ -199,14 +238,6 @@ macro_rules! defer { } macro_rules! go { - (@parse ref $v:ident, $($tail:tt)*) => {{ - let ref $v = $v; - go!(@parse $($tail)*) - }}; - (@parse move $v:ident, $($tail:tt)*) => {{ - let $v = $v; - go!(@parse $($tail)*) - }}; (@parse $v:ident, $($tail:tt)*) => {{ let $v = $v.clone(); go!(@parse $($tail)*) @@ -240,10 +271,10 @@ mod doubleselect { const ITERATIONS: i32 = 10_000; fn sender(n: i32, c1: Chan<i32>, c2: Chan<i32>, c3: Chan<i32>, c4: Chan<i32>) { - defer! { c1.close() } - defer! { c2.close() } - defer! { c3.close() } - defer! { c4.close() } + defer! { c1.close_s() } + defer! { c2.close_s() } + defer! { c3.close_s() } + defer! { c4.close_s() } for i in 0..n { select! { @@ -292,7 +323,7 @@ mod doubleselect { done.recv(); done.recv(); done.recv(); - cmux.close(); + cmux.close_s(); }); recver(cmux); } @@ -697,7 +728,7 @@ mod select2 { use super::*; #[cfg(miri)] - const N: i32 = 1000; + const N: i32 = 200; #[cfg(not(miri))] const N: i32 = 100000; @@ -892,6 +923,9 @@ mod sieve1 { 2, 3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37, 41, 43, 47, 53, 59, 61, 67, 71, 73, 79, 83, 89, 97, ]; + #[cfg(miri)] + let a = &a[..10]; + for item in a.iter() { let x = primes.recv().unwrap(); if x != *item { @@ -925,10 +959,15 @@ mod chan_test { #[test] fn test_chan() { #[cfg(miri)] - const N: i32 = 20; + const N: i32 = 12; #[cfg(not(miri))] const N: i32 = 200; + #[cfg(miri)] + const MESSAGES_COUNT: i32 = 20; + #[cfg(not(miri))] + const MESSAGES_COUNT: i32 = 100; + for cap in 0..N { { // Ensure that receive from empty chan blocks. @@ -999,7 +1038,7 @@ mod chan_test { for i in 0..cap { c.send(i); } - c.close(); + c.close_s(); for i in 0..cap { let v = c.recv(); @@ -1027,7 +1066,7 @@ mod chan_test { }); thread::sleep(ms(1)); - c.close(); + c.close_s(); if !done.recv().unwrap() { panic!(); @@ -1035,15 +1074,15 @@ mod chan_test { } { - // Send 100 integers, + // Send many integers, // ensure that we receive them non-corrupted in FIFO order. let c = make::<i32>(cap as usize); go!(c, { - for i in 0..100 { + for i in 0..MESSAGES_COUNT { c.send(i); } }); - for i in 0..100 { + for i in 0..MESSAGES_COUNT { if c.recv() != Some(i) { panic!(); } @@ -1051,11 +1090,11 @@ mod chan_test { // Same, but using recv2. go!(c, { - for i in 0..100 { + for i in 0..MESSAGES_COUNT { c.send(i); } }); - for i in 0..100 { + for i in 0..MESSAGES_COUNT { if c.recv() != Some(i) { panic!(); } @@ -1082,7 +1121,7 @@ mod chan_test { } }); - c.close(); + c.close_s(); c.recv(); t.join().unwrap(); } @@ -1149,7 +1188,7 @@ mod chan_test { done.send(true); }); - c2.close(); + c2.close_s(); select! { recv(c1.rx()) -> _ => {} default => {} @@ -1378,7 +1417,7 @@ mod chan_test { ); } - done.close(); + done.close_s(); wg.wait(); } @@ -1450,7 +1489,7 @@ mod chan_test { fn test_multi_consumer() { const NWORK: usize = 23; #[cfg(miri)] - const NITER: usize = 100; + const NITER: usize = 50; #[cfg(not(miri))] const NITER: usize = 271828; @@ -1481,9 +1520,9 @@ mod chan_test { *expect.lock().unwrap() += v; q.send(v); } - q.close(); + q.close_s(); wg.wait(); - r.close(); + r.close_s(); }); let mut n = 0; @@ -1542,7 +1581,502 @@ mod race_chan_test { // https://github.com/golang/go/blob/master/test/ken/chan.go mod chan { - // TODO + use super::*; + + const MESSAGES_PER_CHANEL: u32 = 76; + const MESSAGES_RANGE_LEN: u32 = 100; + const END: i32 = 10000; + + struct ChanWithVals { + chan: Chan<i32>, + /// Next value to send + sv: Arc<AtomicI32>, + /// Next value to receive + rv: Arc<AtomicI32>, + } + + struct Totals { + /// Total sent messages + tots: u32, + /// Total received messages + totr: u32, + } + + struct Context { + nproc: Arc<Mutex<i32>>, + cval: Arc<Mutex<i32>>, + tot: Arc<Mutex<Totals>>, + nc: ChanWithVals, + randx: Arc<Mutex<i32>>, + } + + impl ChanWithVals { + fn with_capacity(capacity: usize) -> Self { + ChanWithVals { + chan: make(capacity), + sv: Arc::new(AtomicI32::new(0)), + rv: Arc::new(AtomicI32::new(0)), + } + } + + fn closed() -> Self { + let ch = ChanWithVals::with_capacity(0); + ch.chan.close_r(); + ch.chan.close_s(); + ch + } + + fn rv(&self) -> i32 { + self.rv.load(SeqCst) + } + + fn sv(&self) -> i32 { + self.sv.load(SeqCst) + } + + fn send(&mut self, tot: &Mutex<Totals>) -> bool { + { + let mut tot = tot.lock().unwrap(); + tot.tots += 1 + } + let esv = expect(self.sv(), self.sv()); + self.sv.store(esv, SeqCst); + if self.sv() == END { + self.chan.close_s(); + return true; + } + false + } + + fn recv(&mut self, v: i32, tot: &Mutex<Totals>) -> bool { + { + let mut tot = tot.lock().unwrap(); + tot.totr += 1 + } + let erv = expect(self.rv(), v); + self.rv.store(erv, SeqCst); + if self.rv() == END { + self.chan.close_r(); + return true; + } + false + } + } + + impl Clone for ChanWithVals { + fn clone(&self) -> Self { + ChanWithVals { + chan: self.chan.clone(), + sv: self.sv.clone(), + rv: self.rv.clone(), + } + } + } + + impl Context { + fn nproc(&self) -> &Mutex<i32> { + self.nproc.as_ref() + } + + fn cval(&self) -> &Mutex<i32> { + self.cval.as_ref() + } + + fn tot(&self) -> &Mutex<Totals> { + self.tot.as_ref() + } + + fn randx(&self) -> &Mutex<i32> { + self.randx.as_ref() + } + } + + impl Clone for Context { + fn clone(&self) -> Self { + Context { + nproc: self.nproc.clone(), + cval: self.cval.clone(), + tot: self.tot.clone(), + nc: self.nc.clone(), + randx: self.randx.clone(), + } + } + } + + fn nrand(n: i32, randx: &Mutex<i32>) -> i32 { + let mut randx = randx.lock().unwrap(); + *randx += 10007; + if *randx >= 1000000 { + *randx -= 1000000 + } + *randx % n + } + + fn change_nproc(adjust: i32, nproc: &Mutex<i32>) -> i32 { + let mut nproc = nproc.lock().unwrap(); + *nproc += adjust; + *nproc + } + + fn mkchan(c: usize, n: usize, cval: &Mutex<i32>) -> Vec<ChanWithVals> { + let mut ca = Vec::<ChanWithVals>::with_capacity(n); + let mut cval = cval.lock().unwrap(); + for _ in 0..n { + *cval += MESSAGES_RANGE_LEN as i32; + let chl = ChanWithVals::with_capacity(c); + chl.sv.store(*cval, SeqCst); + chl.rv.store(*cval, SeqCst); + ca.push(chl); + } + ca + } + + fn expect(v: i32, v0: i32) -> i32 { + if v == v0 { + return if v % MESSAGES_RANGE_LEN as i32 == MESSAGES_PER_CHANEL as i32 - 1 { + END + } else { + v + 1 + }; + } + panic!("got {}, expected {}", v, v0 + 1); + } + + fn send(mut c: ChanWithVals, ctx: Context) { + loop { + for _ in 0..=nrand(10, ctx.randx()) { + thread::yield_now(); + } + c.chan.tx().send(c.sv()).unwrap(); + if c.send(ctx.tot()) { + break; + } + } + change_nproc(-1, ctx.nproc()); + } + + fn recv(mut c: ChanWithVals, ctx: Context) { + loop { + for _ in (0..nrand(10, ctx.randx())).rev() { + thread::yield_now(); + } + let v = c.chan.rx().recv().unwrap(); + if c.recv(v, ctx.tot()) { + break; + } + } + change_nproc(-1, ctx.nproc()); + } + + #[allow(clippy::too_many_arguments)] + fn sel( + mut r0: ChanWithVals, + mut r1: ChanWithVals, + mut r2: ChanWithVals, + mut r3: ChanWithVals, + mut s0: ChanWithVals, + mut s1: ChanWithVals, + mut s2: ChanWithVals, + mut s3: ChanWithVals, + ctx: Context, + ) { + let mut a = 0; // local chans running + + if r0.chan.has_rx() { + a += 1; + } + if r1.chan.has_rx() { + a += 1; + } + if r2.chan.has_rx() { + a += 1; + } + if r3.chan.has_rx() { + a += 1; + } + if s0.chan.has_tx() { + a += 1; + } + if s1.chan.has_tx() { + a += 1; + } + if s2.chan.has_tx() { + a += 1; + } + if s3.chan.has_tx() { + a += 1; + } + + loop { + for _ in 0..=nrand(5, ctx.randx()) { + thread::yield_now(); + } + select! { + recv(r0.chan.rx()) -> v => if r0.recv(v.unwrap(), ctx.tot()) { a -= 1 }, + recv(r1.chan.rx()) -> v => if r1.recv(v.unwrap(), ctx.tot()) { a -= 1 }, + recv(r2.chan.rx()) -> v => if r2.recv(v.unwrap(), ctx.tot()) { a -= 1 }, + recv(r3.chan.rx()) -> v => if r3.recv(v.unwrap(), ctx.tot()) { a -= 1 }, + send(s0.chan.tx(), s0.sv()) -> _ => if s0.send(ctx.tot()) { a -= 1 }, + send(s1.chan.tx(), s1.sv()) -> _ => if s1.send(ctx.tot()) { a -= 1 }, + send(s2.chan.tx(), s2.sv()) -> _ => if s2.send(ctx.tot()) { a -= 1 }, + send(s3.chan.tx(), s3.sv()) -> _ => if s3.send(ctx.tot()) { a -= 1 }, + } + if a == 0 { + break; + } + } + change_nproc(-1, ctx.nproc()); + } + + fn get(vec: &[ChanWithVals], idx: usize) -> ChanWithVals { + vec.get(idx).unwrap().clone() + } + + /// Direct send to direct recv + fn test1(c: ChanWithVals, ctx: &mut Context) { + change_nproc(2, ctx.nproc()); + go!(c, ctx, send(c, ctx)); + go!(c, ctx, recv(c, ctx)); + } + + /// Direct send to select recv + fn test2(c: usize, ctx: &mut Context) { + let ca = mkchan(c, 4, ctx.cval()); + + change_nproc(4, ctx.nproc()); + go!(ca, ctx, send(get(&ca, 0), ctx)); + go!(ca, ctx, send(get(&ca, 1), ctx)); + go!(ca, ctx, send(get(&ca, 2), ctx)); + go!(ca, ctx, send(get(&ca, 3), ctx)); + + change_nproc(1, ctx.nproc()); + go!( + ca, + ctx, + sel( + get(&ca, 0), + get(&ca, 1), + get(&ca, 2), + get(&ca, 3), + ctx.nc.clone(), + ctx.nc.clone(), + ctx.nc.clone(), + ctx.nc.clone(), + ctx, + ) + ); + } + + /// Select send to direct recv + fn test3(c: usize, ctx: &mut Context) { + let ca = mkchan(c, 4, ctx.cval()); + + change_nproc(4, ctx.nproc()); + go!(ca, ctx, recv(get(&ca, 0), ctx)); + go!(ca, ctx, recv(get(&ca, 1), ctx)); + go!(ca, ctx, recv(get(&ca, 2), ctx)); + go!(ca, ctx, recv(get(&ca, 3), ctx)); + + change_nproc(1, ctx.nproc()); + go!( + ca, + ctx, + sel( + ctx.nc.clone(), + ctx.nc.clone(), + ctx.nc.clone(), + ctx.nc.clone(), + get(&ca, 0), + get(&ca, 1), + get(&ca, 2), + get(&ca, 3), + ctx, + ) + ); + } + + /// Select send to select recv, 4 channels + fn test4(c: usize, ctx: &mut Context) { + let ca = mkchan(c, 4, ctx.cval()); + + change_nproc(2, ctx.nproc()); + go!( + ca, + ctx, + sel( + ctx.nc.clone(), + ctx.nc.clone(), + ctx.nc.clone(), + ctx.nc.clone(), + get(&ca, 0), + get(&ca, 1), + get(&ca, 2), + get(&ca, 3), + ctx, + ) + ); + go!( + ca, + ctx, + sel( + get(&ca, 0), + get(&ca, 1), + get(&ca, 2), + get(&ca, 3), + ctx.nc.clone(), + ctx.nc.clone(), + ctx.nc.clone(), + ctx.nc.clone(), + ctx, + ) + ); + } + + /// Select send to select recv, 8 channels + fn test5(c: usize, ctx: &mut Context) { + let ca = mkchan(c, 8, ctx.cval()); + + change_nproc(2, ctx.nproc()); + go!( + ca, + ctx, + sel( + get(&ca, 4), + get(&ca, 5), + get(&ca, 6), + get(&ca, 7), + get(&ca, 0), + get(&ca, 1), + get(&ca, 2), + get(&ca, 3), + ctx, + ) + ); + go!( + ca, + ctx, + sel( + get(&ca, 0), + get(&ca, 1), + get(&ca, 2), + get(&ca, 3), + get(&ca, 4), + get(&ca, 5), + get(&ca, 6), + get(&ca, 7), + ctx, + ) + ); + } + + // Direct and select send to direct and select recv + fn test6(c: usize, ctx: &mut Context) { + let ca = mkchan(c, 12, ctx.cval()); + + change_nproc(4, ctx.nproc()); + go!(ca, ctx, send(get(&ca, 4), ctx)); + go!(ca, ctx, send(get(&ca, 5), ctx)); + go!(ca, ctx, send(get(&ca, 6), ctx)); + go!(ca, ctx, send(get(&ca, 7), ctx)); + + change_nproc(4, ctx.nproc()); + go!(ca, ctx, recv(get(&ca, 8), ctx)); + go!(ca, ctx, recv(get(&ca, 9), ctx)); + go!(ca, ctx, recv(get(&ca, 10), ctx)); + go!(ca, ctx, recv(get(&ca, 11), ctx)); + + change_nproc(2, ctx.nproc()); + go!( + ca, + ctx, + sel( + get(&ca, 4), + get(&ca, 5), + get(&ca, 6), + get(&ca, 7), + get(&ca, 0), + get(&ca, 1), + get(&ca, 2), + get(&ca, 3), + ctx, + ) + ); + go!( + ca, + ctx, + sel( + get(&ca, 0), + get(&ca, 1), + get(&ca, 2), + get(&ca, 3), + get(&ca, 8), + get(&ca, 9), + get(&ca, 10), + get(&ca, 11), + ctx, + ) + ); + } + + fn wait(ctx: &mut Context) { + thread::yield_now(); + while change_nproc(0, ctx.nproc()) != 0 { + thread::yield_now(); + } + } + + fn tests(c: usize, ctx: &mut Context) { + let ca = mkchan(c, 4, ctx.cval()); + test1(get(&ca, 0), ctx); + test1(get(&ca, 1), ctx); + test1(get(&ca, 2), ctx); + test1(get(&ca, 3), ctx); + wait(ctx); + + test2(c, ctx); + wait(ctx); + + test3(c, ctx); + wait(ctx); + + test4(c, ctx); + wait(ctx); + + test5(c, ctx); + wait(ctx); + + test6(c, ctx); + wait(ctx); + } + + #[test] + #[cfg_attr(miri, ignore)] // Miri is too slow + fn main() { + let mut ctx = Context { + nproc: Arc::new(Mutex::new(0)), + cval: Arc::new(Mutex::new(0)), + tot: Arc::new(Mutex::new(Totals { tots: 0, totr: 0 })), + nc: ChanWithVals::closed(), + randx: Arc::new(Mutex::new(0)), + }; + + tests(0, &mut ctx); + tests(1, &mut ctx); + tests(10, &mut ctx); + tests(100, &mut ctx); + + #[rustfmt::skip] + let t = 4 * // buffer sizes + (4*4 + // tests 1,2,3,4 channels + 8 + // test 5 channels + 12) * // test 6 channels + MESSAGES_PER_CHANEL; // sends/recvs on a channel + + let tot = ctx.tot.lock().unwrap(); + if tot.tots != t || tot.totr != t { + panic!("tots={} totr={} sb={}", tot.tots, tot.totr, t); + } + } } // https://github.com/golang/go/blob/master/test/ken/chan1.go @@ -1551,7 +2085,7 @@ mod chan1 { // sent messages #[cfg(miri)] - const N: usize = 100; + const N: usize = 20; #[cfg(not(miri))] const N: usize = 1000; // receiving "goroutines" diff --git a/tests/list.rs b/tests/list.rs index 619e1fc..ebe6f6f 100644 --- a/tests/list.rs +++ b/tests/list.rs @@ -67,6 +67,7 @@ fn len_empty_full() { } #[test] +#[cfg_attr(miri, ignore)] // this test makes timing assumptions, but Miri is so slow it violates them fn try_recv() { let (s, r) = unbounded(); @@ -132,8 +133,13 @@ fn recv_timeout() { #[test] fn try_send() { + #[cfg(miri)] + const COUNT: usize = 50; + #[cfg(not(miri))] + const COUNT: usize = 1000; + let (s, r) = unbounded(); - for i in 0..1000 { + for i in 0..COUNT { assert_eq!(s.try_send(i), Ok(())); } @@ -143,8 +149,13 @@ fn try_send() { #[test] fn send() { + #[cfg(miri)] + const COUNT: usize = 50; + #[cfg(not(miri))] + const COUNT: usize = 1000; + let (s, r) = unbounded(); - for i in 0..1000 { + for i in 0..COUNT { assert_eq!(s.send(i), Ok(())); } @@ -154,8 +165,13 @@ fn send() { #[test] fn send_timeout() { + #[cfg(miri)] + const COUNT: usize = 50; + #[cfg(not(miri))] + const COUNT: usize = 1000; + let (s, r) = unbounded(); - for i in 0..1000 { + for i in 0..COUNT { assert_eq!(s.send_timeout(i, ms(i as u64)), Ok(())); } @@ -383,10 +399,16 @@ fn stress_timeout_two_threads() { .unwrap(); } -#[cfg_attr(miri, ignore)] // Miri is too slow #[test] fn drops() { + #[cfg(miri)] + const RUNS: usize = 20; + #[cfg(not(miri))] const RUNS: usize = 100; + #[cfg(miri)] + const STEPS: usize = 100; + #[cfg(not(miri))] + const STEPS: usize = 10_000; static DROPS: AtomicUsize = AtomicUsize::new(0); @@ -402,8 +424,8 @@ fn drops() { let mut rng = thread_rng(); for _ in 0..RUNS { - let steps = rng.gen_range(0..10_000); - let additional = rng.gen_range(0..1000); + let steps = rng.gen_range(0..STEPS); + let additional = rng.gen_range(0..STEPS / 10); DROPS.store(0, Ordering::SeqCst); let (s, r) = unbounded::<DropCounter>(); diff --git a/tests/mpsc.rs b/tests/mpsc.rs index 4d6e179..d7cc8e2 100644 --- a/tests/mpsc.rs +++ b/tests/mpsc.rs @@ -321,7 +321,7 @@ mod channel_tests { #[test] fn stress() { #[cfg(miri)] - const COUNT: usize = 500; + const COUNT: usize = 100; #[cfg(not(miri))] const COUNT: usize = 10000; @@ -339,25 +339,22 @@ mod channel_tests { #[test] fn stress_shared() { - #[cfg(miri)] - const AMT: u32 = 500; - #[cfg(not(miri))] - const AMT: u32 = 10000; - const NTHREADS: u32 = 8; + let amt: u32 = if cfg!(miri) { 100 } else { 10_000 }; + let nthreads: u32 = if cfg!(miri) { 4 } else { 8 }; let (tx, rx) = channel::<i32>(); let t = thread::spawn(move || { - for _ in 0..AMT * NTHREADS { + for _ in 0..amt * nthreads { assert_eq!(rx.recv().unwrap(), 1); } assert!(rx.try_recv().is_err()); }); - let mut ts = Vec::with_capacity(NTHREADS as usize); - for _ in 0..NTHREADS { + let mut ts = Vec::with_capacity(nthreads as usize); + for _ in 0..nthreads { let tx = tx.clone(); let t = thread::spawn(move || { - for _ in 0..AMT { + for _ in 0..amt { tx.send(1).unwrap(); } }); @@ -747,7 +744,7 @@ mod channel_tests { #[test] fn recv_a_lot() { #[cfg(miri)] - const N: usize = 100; + const N: usize = 50; #[cfg(not(miri))] const N: usize = 10000; diff --git a/tests/ready.rs b/tests/ready.rs index d8dd6ce..6e3fb2b 100644 --- a/tests/ready.rs +++ b/tests/ready.rs @@ -229,6 +229,7 @@ fn default_when_disconnected() { } #[test] +#[cfg_attr(miri, ignore)] // this test makes timing assumptions, but Miri is so slow it violates them fn default_only() { let start = Instant::now(); diff --git a/tests/select.rs b/tests/select.rs index f24aed8..bc5824d 100644 --- a/tests/select.rs +++ b/tests/select.rs @@ -408,7 +408,6 @@ fn both_ready() { .unwrap(); } -#[cfg_attr(miri, ignore)] // Miri is too slow #[test] fn loop_try() { const RUNS: usize = 20; @@ -694,7 +693,7 @@ fn nesting() { #[test] fn stress_recv() { #[cfg(miri)] - const COUNT: usize = 100; + const COUNT: usize = 50; #[cfg(not(miri))] const COUNT: usize = 10_000; @@ -735,7 +734,7 @@ fn stress_recv() { #[test] fn stress_send() { #[cfg(miri)] - const COUNT: usize = 100; + const COUNT: usize = 50; #[cfg(not(miri))] const COUNT: usize = 10_000; @@ -953,7 +952,7 @@ fn matching_with_leftover() { #[test] fn channel_through_channel() { #[cfg(miri)] - const COUNT: usize = 100; + const COUNT: usize = 50; #[cfg(not(miri))] const COUNT: usize = 1000; @@ -1014,7 +1013,7 @@ fn channel_through_channel() { #[test] fn linearizable_try() { #[cfg(miri)] - const COUNT: usize = 100; + const COUNT: usize = 50; #[cfg(not(miri))] const COUNT: usize = 100_000; @@ -1069,7 +1068,7 @@ fn linearizable_try() { #[test] fn linearizable_timeout() { #[cfg(miri)] - const COUNT: usize = 100; + const COUNT: usize = 50; #[cfg(not(miri))] const COUNT: usize = 100_000; @@ -1124,7 +1123,7 @@ fn linearizable_timeout() { #[test] fn fairness1() { #[cfg(miri)] - const COUNT: usize = 100; + const COUNT: usize = 50; #[cfg(not(miri))] const COUNT: usize = 10_000; @@ -1173,7 +1172,7 @@ fn fairness1() { #[test] fn fairness2() { #[cfg(miri)] - const COUNT: usize = 100; + const COUNT: usize = 50; #[cfg(not(miri))] const COUNT: usize = 10_000; @@ -1292,7 +1291,7 @@ fn send_and_clone() { #[test] fn reuse() { #[cfg(miri)] - const COUNT: usize = 100; + const COUNT: usize = 50; #[cfg(not(miri))] const COUNT: usize = 10_000; diff --git a/tests/select_macro.rs b/tests/select_macro.rs index 0b9a21a..119454c 100644 --- a/tests/select_macro.rs +++ b/tests/select_macro.rs @@ -284,7 +284,6 @@ fn both_ready() { .unwrap(); } -#[cfg_attr(miri, ignore)] // Miri is too slow #[test] fn loop_try() { const RUNS: usize = 20; @@ -488,7 +487,7 @@ fn panic_receiver() { #[test] fn stress_recv() { #[cfg(miri)] - const COUNT: usize = 100; + const COUNT: usize = 50; #[cfg(not(miri))] const COUNT: usize = 10_000; @@ -1468,3 +1467,14 @@ fn disconnect_wakes_receiver() { }) .unwrap(); } + +#[test] +fn trailing_comma() { + let (s, r) = unbounded::<usize>(); + + select! { + send(s, 1,) -> _ => {}, + recv(r,) -> _ => {}, + default(ms(1000),) => {}, + } +} diff --git a/tests/thread_locals.rs b/tests/thread_locals.rs index effb6a1..fb4e577 100644 --- a/tests/thread_locals.rs +++ b/tests/thread_locals.rs @@ -1,6 +1,6 @@ //! Tests that make sure accessing thread-locals while exiting the thread doesn't cause panics. -#![cfg(not(miri))] // error: abnormal termination: the evaluated program aborted execution +#![cfg(not(miri))] // Miri detects that this test is buggy: the destructor of `FOO` uses `std::thread::current()`! use std::thread; use std::time::Duration; diff --git a/tests/zero.rs b/tests/zero.rs index ba41b1a..74c9a3e 100644 --- a/tests/zero.rs +++ b/tests/zero.rs @@ -188,7 +188,7 @@ fn send_timeout() { #[test] fn len() { #[cfg(miri)] - const COUNT: usize = 100; + const COUNT: usize = 50; #[cfg(not(miri))] const COUNT: usize = 25_000; @@ -253,7 +253,7 @@ fn disconnect_wakes_receiver() { #[test] fn spsc() { #[cfg(miri)] - const COUNT: usize = 100; + const COUNT: usize = 50; #[cfg(not(miri))] const COUNT: usize = 100_000; @@ -278,7 +278,7 @@ fn spsc() { #[test] fn mpmc() { #[cfg(miri)] - const COUNT: usize = 100; + const COUNT: usize = 50; #[cfg(not(miri))] const COUNT: usize = 25_000; const THREADS: usize = 4; @@ -313,7 +313,7 @@ fn mpmc() { #[test] fn stress_oneshot() { #[cfg(miri)] - const COUNT: usize = 100; + const COUNT: usize = 50; #[cfg(not(miri))] const COUNT: usize = 10_000; @@ -328,9 +328,11 @@ fn stress_oneshot() { } } -#[cfg_attr(miri, ignore)] // Miri is too slow #[test] fn stress_iter() { + #[cfg(miri)] + const COUNT: usize = 50; + #[cfg(not(miri))] const COUNT: usize = 1000; let (request_s, request_r) = bounded(0); @@ -396,10 +398,16 @@ fn stress_timeout_two_threads() { .unwrap(); } -#[cfg_attr(miri, ignore)] // Miri is too slow #[test] fn drops() { + #[cfg(miri)] + const RUNS: usize = 20; + #[cfg(not(miri))] const RUNS: usize = 100; + #[cfg(miri)] + const STEPS: usize = 100; + #[cfg(not(miri))] + const STEPS: usize = 10_000; static DROPS: AtomicUsize = AtomicUsize::new(0); @@ -415,7 +423,7 @@ fn drops() { let mut rng = thread_rng(); for _ in 0..RUNS { - let steps = rng.gen_range(0..3_000); + let steps = rng.gen_range(0..STEPS); DROPS.store(0, Ordering::SeqCst); let (s, r) = bounded::<DropCounter>(0); @@ -445,7 +453,7 @@ fn drops() { #[test] fn fairness() { #[cfg(miri)] - const COUNT: usize = 100; + const COUNT: usize = 50; #[cfg(not(miri))] const COUNT: usize = 10_000; @@ -540,7 +548,7 @@ fn recv_in_send() { #[test] fn channel_through_channel() { #[cfg(miri)] - const COUNT: usize = 100; + const COUNT: usize = 50; #[cfg(not(miri))] const COUNT: usize = 1000; |