diff options
author | Android Build Coastguard Worker <android-build-coastguard-worker@google.com> | 2023-07-07 04:54:54 +0000 |
---|---|---|
committer | Android Build Coastguard Worker <android-build-coastguard-worker@google.com> | 2023-07-07 04:54:54 +0000 |
commit | 942ea2a730368dfb6c7f8b39e04b846c25657af1 (patch) | |
tree | a455806bb80ea81083a9686ce41a5c8dc7164ae6 | |
parent | 3d20b75eda8856d568271db5b2868a6b4012fa2b (diff) | |
parent | fa6d4166c29101129730d31dd1f7fa973ec5cba0 (diff) | |
download | crossbeam-deque-android14-mainline-media-swcodec-release.tar.gz |
Snap for 10453563 from fa6d4166c29101129730d31dd1f7fa973ec5cba0 to mainline-media-swcodec-releaseaml_swc_341711000aml_swc_341619000aml_swc_341513600aml_swc_341312300aml_swc_341312020aml_swc_341111000aml_swc_341011020aml_swc_340922010android14-mainline-media-swcodec-release
Change-Id: I7b77d1f9ac7829154123d05ddbf5a48a65df473d
-rw-r--r-- | .cargo_vcs_info.json | 7 | ||||
-rw-r--r-- | Android.bp | 10 | ||||
-rw-r--r-- | CHANGELOG.md | 23 | ||||
-rw-r--r-- | Cargo.toml | 26 | ||||
-rw-r--r-- | Cargo.toml.orig | 5 | ||||
-rw-r--r-- | METADATA | 14 | ||||
-rw-r--r-- | README.md | 4 | ||||
-rw-r--r-- | TEST_MAPPING | 3 | ||||
-rw-r--r-- | src/deque.rs | 248 | ||||
-rw-r--r-- | tests/fifo.rs | 21 | ||||
-rw-r--r-- | tests/injector.rs | 28 | ||||
-rw-r--r-- | tests/lifo.rs | 23 |
12 files changed, 341 insertions, 71 deletions
diff --git a/.cargo_vcs_info.json b/.cargo_vcs_info.json index 9716123..ee1b175 100644 --- a/.cargo_vcs_info.json +++ b/.cargo_vcs_info.json @@ -1,5 +1,6 @@ { "git": { - "sha1": "0e2a930eac3586ab52498413310c45af6c67d830" - } -} + "sha1": "721382b00b5dadd81954ed66764d547e2f1bb7a3" + }, + "path_in_vcs": "crossbeam-deque" +}
\ No newline at end of file @@ -43,7 +43,7 @@ rust_defaults { name: "crossbeam-deque_test_defaults", crate_name: "crossbeam_deque", cargo_env_compat: true, - cargo_pkg_version: "0.8.1", + cargo_pkg_version: "0.8.3", test_suites: ["general-tests"], auto_gen_config: true, edition: "2018", @@ -107,7 +107,7 @@ rust_library { host_supported: true, crate_name: "crossbeam_deque", cargo_env_compat: true, - cargo_pkg_version: "0.8.1", + cargo_pkg_version: "0.8.3", srcs: ["src/lib.rs"], edition: "2018", features: [ @@ -121,4 +121,10 @@ rust_library { "libcrossbeam_epoch", "libcrossbeam_utils", ], + apex_available: [ + "//apex_available:platform", + "//apex_available:anyapex", + ], + product_available: true, + vendor_available: true, } diff --git a/CHANGELOG.md b/CHANGELOG.md index 14dcc20..0937d19 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,12 @@ +# Version 0.8.3 + +- Add `Stealer::{steal_batch_with_limit, steal_batch_with_limit_and_pop}` methods. (#903) +- Add `Injector::{steal_batch_with_limit, steal_batch_with_limit_and_pop}` methods. (#903) + +# Version 0.8.2 + +- Bump the minimum supported Rust version to 1.38. (#877) + # Version 0.8.1 - Fix deque steal race condition. (#726) @@ -5,26 +14,40 @@ # Version 0.8.0 +**Note:** This release has been yanked. See [GHSA-pqqp-xmhj-wgcw](https://github.com/crossbeam-rs/crossbeam/security/advisories/GHSA-pqqp-xmhj-wgcw) for details. + - Bump the minimum supported Rust version to 1.36. - Add `Worker::len()` and `Injector::len()` methods. - Add `std` (enabled by default) feature for forward compatibility. +# Version 0.7.4 + +- Fix deque steal race condition. + # Version 0.7.3 +**Note:** This release has been yanked. See [GHSA-pqqp-xmhj-wgcw](https://github.com/crossbeam-rs/crossbeam/security/advisories/GHSA-pqqp-xmhj-wgcw) for details. + - Stop stealing from the same deque. (#448) - Fix unsoundness issues by adopting `MaybeUninit`. (#458) # Version 0.7.2 +**Note:** This release has been yanked. See [GHSA-pqqp-xmhj-wgcw](https://github.com/crossbeam-rs/crossbeam/security/advisories/GHSA-pqqp-xmhj-wgcw) for details. + - Bump `crossbeam-epoch` to `0.8`. - Bump `crossbeam-utils` to `0.7`. # Version 0.7.1 +**Note:** This release has been yanked. See [GHSA-pqqp-xmhj-wgcw](https://github.com/crossbeam-rs/crossbeam/security/advisories/GHSA-pqqp-xmhj-wgcw) for details. + - Bump the minimum required version of `crossbeam-utils`. # Version 0.7.0 +**Note:** This release has been yanked. See [GHSA-pqqp-xmhj-wgcw](https://github.com/crossbeam-rs/crossbeam/security/advisories/GHSA-pqqp-xmhj-wgcw) for details. + - Make `Worker::pop()` faster in the FIFO case. - Replace `fifo()` nad `lifo()` with `Worker::new_fifo()` and `Worker::new_lifo()`. - Add more batched steal methods. @@ -11,16 +11,26 @@ [package] edition = "2018" +rust-version = "1.38" name = "crossbeam-deque" -version = "0.8.1" -authors = ["The Crossbeam Project Developers"] +version = "0.8.3" description = "Concurrent work-stealing deque" homepage = "https://github.com/crossbeam-rs/crossbeam/tree/master/crossbeam-deque" -documentation = "https://docs.rs/crossbeam-deque" -keywords = ["chase-lev", "lock-free", "scheduler", "scheduling"] -categories = ["algorithms", "concurrency", "data-structures"] +readme = "README.md" +keywords = [ + "chase-lev", + "lock-free", + "scheduler", + "scheduling", +] +categories = [ + "algorithms", + "concurrency", + "data-structures", +] license = "MIT OR Apache-2.0" repository = "https://github.com/crossbeam-rs/crossbeam" + [dependencies.cfg-if] version = "1" @@ -33,9 +43,13 @@ default-features = false version = "0.8" optional = true default-features = false + [dev-dependencies.rand] version = "0.8" [features] default = ["std"] -std = ["crossbeam-epoch/std", "crossbeam-utils/std"] +std = [ + "crossbeam-epoch/std", + "crossbeam-utils/std", +] diff --git a/Cargo.toml.orig b/Cargo.toml.orig index 572ddfd..805a7e0 100644 --- a/Cargo.toml.orig +++ b/Cargo.toml.orig @@ -4,13 +4,12 @@ name = "crossbeam-deque" # - Update CHANGELOG.md # - Update README.md # - Create "crossbeam-deque-X.Y.Z" git tag -version = "0.8.1" -authors = ["The Crossbeam Project Developers"] +version = "0.8.3" edition = "2018" +rust-version = "1.38" license = "MIT OR Apache-2.0" repository = "https://github.com/crossbeam-rs/crossbeam" homepage = "https://github.com/crossbeam-rs/crossbeam/tree/master/crossbeam-deque" -documentation = "https://docs.rs/crossbeam-deque" description = "Concurrent work-stealing deque" keywords = ["chase-lev", "lock-free", "scheduler", "scheduling"] categories = ["algorithms", "concurrency", "data-structures"] @@ -1,3 +1,7 @@ +# This project was upgraded with external_updater. +# Usage: tools/external_updater/updater.sh update rust/crates/crossbeam-deque +# For more info, check https://cs.android.com/android/platform/superproject/+/master:tools/external_updater/README.md + name: "crossbeam-deque" description: "Concurrent work-stealing deque" third_party { @@ -7,13 +11,13 @@ third_party { } url { type: ARCHIVE - value: "https://static.crates.io/crates/crossbeam-deque/crossbeam-deque-0.8.1.crate" + value: "https://static.crates.io/crates/crossbeam-deque/crossbeam-deque-0.8.3.crate" } - version: "0.8.1" + version: "0.8.3" license_type: NOTICE last_upgrade_date { - year: 2021 - month: 8 - day: 9 + year: 2023 + month: 3 + day: 6 } } @@ -8,7 +8,7 @@ https://github.com/crossbeam-rs/crossbeam/tree/master/crossbeam-deque#license) https://crates.io/crates/crossbeam-deque) [![Documentation](https://docs.rs/crossbeam-deque/badge.svg)]( https://docs.rs/crossbeam-deque) -[![Rust 1.36+](https://img.shields.io/badge/rust-1.36+-lightgray.svg)]( +[![Rust 1.38+](https://img.shields.io/badge/rust-1.38+-lightgray.svg)]( https://www.rust-lang.org) [![chat](https://img.shields.io/discord/569610676205781012.svg?logo=discord)](https://discord.com/invite/JXYwgWZ) @@ -28,7 +28,7 @@ crossbeam-deque = "0.8" Crossbeam Deque supports stable Rust releases going back at least six months, and every time the minimum supported Rust version is increased, a new minor -version is released. Currently, the minimum supported Rust version is 1.36. +version is released. Currently, the minimum supported Rust version is 1.38. ## License diff --git a/TEST_MAPPING b/TEST_MAPPING index 3601da1..4e15c8b 100644 --- a/TEST_MAPPING +++ b/TEST_MAPPING @@ -5,6 +5,9 @@ "path": "external/rust/crates/base64" }, { + "path": "external/rust/crates/hashbrown" + }, + { "path": "external/rust/crates/tinytemplate" }, { diff --git a/src/deque.rs b/src/deque.rs index 802a2fe..8afe15f 100644 --- a/src/deque.rs +++ b/src/deque.rs @@ -3,7 +3,7 @@ use std::cmp; use std::fmt; use std::iter::FromIterator; use std::marker::PhantomData; -use std::mem::{self, MaybeUninit}; +use std::mem::{self, ManuallyDrop, MaybeUninit}; use std::ptr; use std::sync::atomic::{self, AtomicIsize, AtomicPtr, AtomicUsize, Ordering}; use std::sync::Arc; @@ -38,9 +38,8 @@ impl<T> Buffer<T> { fn alloc(cap: usize) -> Buffer<T> { debug_assert_eq!(cap, cap.next_power_of_two()); - let mut v = Vec::with_capacity(cap); + let mut v = ManuallyDrop::new(Vec::with_capacity(cap)); let ptr = v.as_mut_ptr(); - mem::forget(v); Buffer { ptr, cap } } @@ -53,6 +52,8 @@ impl<T> Buffer<T> { /// Returns a pointer to the task at the specified `index`. unsafe fn at(&self, index: isize) -> *mut T { // `self.cap` is always a power of two. + // We do all the loads at `MaybeUninit` because we might realize, after loading, that we + // don't actually have the right to access this memory. self.ptr.offset(index & (self.cap - 1) as isize) } @@ -62,8 +63,8 @@ impl<T> Buffer<T> { /// technically speaking a data race and therefore UB. We should use an atomic store here, but /// that would be more expensive and difficult to implement generically for all types `T`. /// Hence, as a hack, we use a volatile write instead. - unsafe fn write(&self, index: isize, task: T) { - ptr::write_volatile(self.at(index), task) + unsafe fn write(&self, index: isize, task: MaybeUninit<T>) { + ptr::write_volatile(self.at(index).cast::<MaybeUninit<T>>(), task) } /// Reads a task from the specified `index`. @@ -71,9 +72,9 @@ impl<T> Buffer<T> { /// This method might be concurrently called with another `write` at the same index, which is /// technically speaking a data race and therefore UB. We should use an atomic load here, but /// that would be more expensive and difficult to implement generically for all types `T`. - /// Hence, as a hack, we use a volatile write instead. - unsafe fn read(&self, index: isize) -> T { - ptr::read_volatile(self.at(index)) + /// Hence, as a hack, we use a volatile load instead. + unsafe fn read(&self, index: isize) -> MaybeUninit<T> { + ptr::read_volatile(self.at(index).cast::<MaybeUninit<T>>()) } } @@ -115,8 +116,8 @@ struct Inner<T> { impl<T> Drop for Inner<T> { fn drop(&mut self) { // Load the back index, front index, and buffer. - let b = self.back.load(Ordering::Relaxed); - let f = self.front.load(Ordering::Relaxed); + let b = *self.back.get_mut(); + let f = *self.front.get_mut(); unsafe { let buffer = self.buffer.load(Ordering::Relaxed, epoch::unprotected()); @@ -406,7 +407,7 @@ impl<T> Worker<T> { // Write `task` into the slot. unsafe { - buffer.write(b, task); + buffer.write(b, MaybeUninit::new(task)); } atomic::fence(Ordering::Release); @@ -461,7 +462,7 @@ impl<T> Worker<T> { unsafe { // Read the popped task. let buffer = self.buffer.get(); - let task = buffer.read(f); + let task = buffer.read(f).assume_init(); // Shrink the buffer if `len - 1` is less than one fourth of the capacity. if buffer.cap > MIN_CAP && len <= buffer.cap as isize / 4 { @@ -509,8 +510,8 @@ impl<T> Worker<T> { ) .is_err() { - // Failed. We didn't pop anything. - mem::forget(task.take()); + // Failed. We didn't pop anything. Reset to `None`. + task.take(); } // Restore the back index to the original task. @@ -524,7 +525,7 @@ impl<T> Worker<T> { } } - task + task.map(|t| unsafe { t.assume_init() }) } } } @@ -661,12 +662,11 @@ impl<T> Stealer<T> { .is_err() { // We didn't steal this task, forget it. - mem::forget(task); return Steal::Retry; } // Return the stolen task. - Steal::Success(task) + Steal::Success(unsafe { task.assume_init() }) } /// Steals a batch of tasks and pushes them into another worker. @@ -693,6 +693,45 @@ impl<T> Stealer<T> { /// assert_eq!(w2.pop(), Some(2)); /// ``` pub fn steal_batch(&self, dest: &Worker<T>) -> Steal<()> { + self.steal_batch_with_limit(dest, MAX_BATCH) + } + + /// Steals no more than `limit` of tasks and pushes them into another worker. + /// + /// How many tasks exactly will be stolen is not specified. That said, this method will try to + /// steal around half of the tasks in the queue, but also not more than the given limit. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_deque::Worker; + /// + /// let w1 = Worker::new_fifo(); + /// w1.push(1); + /// w1.push(2); + /// w1.push(3); + /// w1.push(4); + /// w1.push(5); + /// w1.push(6); + /// + /// let s = w1.stealer(); + /// let w2 = Worker::new_fifo(); + /// + /// let _ = s.steal_batch_with_limit(&w2, 2); + /// assert_eq!(w2.pop(), Some(1)); + /// assert_eq!(w2.pop(), Some(2)); + /// assert_eq!(w2.pop(), None); + /// + /// w1.push(7); + /// w1.push(8); + /// // Setting a large limit does not guarantee that all elements will be popped. In this case, + /// // half of the elements are currently popped, but the number of popped elements is considered + /// // an implementation detail that may be changed in the future. + /// let _ = s.steal_batch_with_limit(&w2, std::usize::MAX); + /// assert_eq!(w2.len(), 3); + /// ``` + pub fn steal_batch_with_limit(&self, dest: &Worker<T>, limit: usize) -> Steal<()> { + assert!(limit > 0); if Arc::ptr_eq(&self.inner, &dest.inner) { if dest.is_empty() { return Steal::Empty; @@ -725,7 +764,7 @@ impl<T> Stealer<T> { } // Reserve capacity for the stolen batch. - let batch_size = cmp::min((len as usize + 1) / 2, MAX_BATCH); + let batch_size = cmp::min((len as usize + 1) / 2, limit); dest.reserve(batch_size); let mut batch_size = batch_size as isize; @@ -821,7 +860,6 @@ impl<T> Stealer<T> { .is_err() { // We didn't steal this task, forget it and break from the loop. - mem::forget(task); batch_size = i; break; } @@ -892,6 +930,47 @@ impl<T> Stealer<T> { /// assert_eq!(w2.pop(), Some(2)); /// ``` pub fn steal_batch_and_pop(&self, dest: &Worker<T>) -> Steal<T> { + self.steal_batch_with_limit_and_pop(dest, MAX_BATCH) + } + + /// Steals no more than `limit` of tasks, pushes them into another worker, and pops a task from + /// that worker. + /// + /// How many tasks exactly will be stolen is not specified. That said, this method will try to + /// steal around half of the tasks in the queue, but also not more than the given limit. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_deque::{Steal, Worker}; + /// + /// let w1 = Worker::new_fifo(); + /// w1.push(1); + /// w1.push(2); + /// w1.push(3); + /// w1.push(4); + /// w1.push(5); + /// w1.push(6); + /// + /// let s = w1.stealer(); + /// let w2 = Worker::new_fifo(); + /// + /// assert_eq!(s.steal_batch_with_limit_and_pop(&w2, 2), Steal::Success(1)); + /// assert_eq!(w2.pop(), Some(2)); + /// assert_eq!(w2.pop(), None); + /// + /// w1.push(7); + /// w1.push(8); + /// // Setting a large limit does not guarantee that all elements will be popped. In this case, + /// // half of the elements are currently popped, but the number of popped elements is considered + /// // an implementation detail that may be changed in the future. + /// assert_eq!(s.steal_batch_with_limit_and_pop(&w2, std::usize::MAX), Steal::Success(3)); + /// assert_eq!(w2.pop(), Some(4)); + /// assert_eq!(w2.pop(), Some(5)); + /// assert_eq!(w2.pop(), None); + /// ``` + pub fn steal_batch_with_limit_and_pop(&self, dest: &Worker<T>, limit: usize) -> Steal<T> { + assert!(limit > 0); if Arc::ptr_eq(&self.inner, &dest.inner) { match dest.pop() { None => return Steal::Empty, @@ -923,7 +1002,7 @@ impl<T> Stealer<T> { } // Reserve capacity for the stolen batch. - let batch_size = cmp::min((len as usize - 1) / 2, MAX_BATCH - 1); + let batch_size = cmp::min((len as usize - 1) / 2, limit - 1); dest.reserve(batch_size); let mut batch_size = batch_size as isize; @@ -975,7 +1054,6 @@ impl<T> Stealer<T> { .is_err() { // We didn't steal this task, forget it. - mem::forget(task); return Steal::Retry; } @@ -992,7 +1070,6 @@ impl<T> Stealer<T> { .is_err() { // We didn't steal this task, forget it. - mem::forget(task); return Steal::Retry; } @@ -1037,7 +1114,6 @@ impl<T> Stealer<T> { .is_err() { // We didn't steal this task, forget it and break from the loop. - mem::forget(tmp); batch_size = i; break; } @@ -1077,7 +1153,7 @@ impl<T> Stealer<T> { dest.inner.back.store(dest_b, Ordering::Release); // Return with success. - Steal::Success(task) + Steal::Success(unsafe { task.assume_init() }) } } @@ -1123,6 +1199,11 @@ struct Slot<T> { } impl<T> Slot<T> { + const UNINIT: Self = Self { + task: UnsafeCell::new(MaybeUninit::uninit()), + state: AtomicUsize::new(0), + }; + /// Waits until a task is written into the slot. fn wait_write(&self) { let backoff = Backoff::new(); @@ -1146,13 +1227,10 @@ struct Block<T> { impl<T> Block<T> { /// Creates an empty block that starts at `start_index`. fn new() -> Block<T> { - // SAFETY: This is safe because: - // [1] `Block::next` (AtomicPtr) may be safely zero initialized. - // [2] `Block::slots` (Array) may be safely zero initialized because of [3, 4]. - // [3] `Slot::task` (UnsafeCell) may be safely zero initialized because it - // holds a MaybeUninit. - // [4] `Slot::state` (AtomicUsize) may be safely zero initialized. - unsafe { MaybeUninit::zeroed().assume_init() } + Self { + next: AtomicPtr::new(ptr::null_mut()), + slots: [Slot::UNINIT; BLOCK_CAP], + } } /// Waits until the next pointer is set. @@ -1446,6 +1524,43 @@ impl<T> Injector<T> { /// assert_eq!(w.pop(), Some(2)); /// ``` pub fn steal_batch(&self, dest: &Worker<T>) -> Steal<()> { + self.steal_batch_with_limit(dest, MAX_BATCH) + } + + /// Steals no more than of tasks and pushes them into a worker. + /// + /// How many tasks exactly will be stolen is not specified. That said, this method will try to + /// steal around half of the tasks in the queue, but also not more than some constant limit. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_deque::{Injector, Worker}; + /// + /// let q = Injector::new(); + /// q.push(1); + /// q.push(2); + /// q.push(3); + /// q.push(4); + /// q.push(5); + /// q.push(6); + /// + /// let w = Worker::new_fifo(); + /// let _ = q.steal_batch_with_limit(&w, 2); + /// assert_eq!(w.pop(), Some(1)); + /// assert_eq!(w.pop(), Some(2)); + /// assert_eq!(w.pop(), None); + /// + /// q.push(7); + /// q.push(8); + /// // Setting a large limit does not guarantee that all elements will be popped. In this case, + /// // half of the elements are currently popped, but the number of popped elements is considered + /// // an implementation detail that may be changed in the future. + /// let _ = q.steal_batch_with_limit(&w, std::usize::MAX); + /// assert_eq!(w.len(), 3); + /// ``` + pub fn steal_batch_with_limit(&self, dest: &Worker<T>, limit: usize) -> Steal<()> { + assert!(limit > 0); let mut head; let mut block; let mut offset; @@ -1483,15 +1598,15 @@ impl<T> Injector<T> { if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP { new_head |= HAS_NEXT; // We can steal all tasks till the end of the block. - advance = (BLOCK_CAP - offset).min(MAX_BATCH); + advance = (BLOCK_CAP - offset).min(limit); } else { let len = (tail - head) >> SHIFT; // Steal half of the available tasks. - advance = ((len + 1) / 2).min(MAX_BATCH); + advance = ((len + 1) / 2).min(limit); } } else { // We can steal all tasks till the end of the block. - advance = (BLOCK_CAP - offset).min(MAX_BATCH); + advance = (BLOCK_CAP - offset).min(limit); } new_head += advance << SHIFT; @@ -1535,7 +1650,7 @@ impl<T> Injector<T> { // Read the task. let slot = (*block).slots.get_unchecked(offset + i); slot.wait_write(); - let task = slot.task.get().read().assume_init(); + let task = slot.task.get().read(); // Write it into the destination queue. dest_buffer.write(dest_b.wrapping_add(i as isize), task); @@ -1547,7 +1662,7 @@ impl<T> Injector<T> { // Read the task. let slot = (*block).slots.get_unchecked(offset + i); slot.wait_write(); - let task = slot.task.get().read().assume_init(); + let task = slot.task.get().read(); // Write it into the destination queue. dest_buffer.write(dest_b.wrapping_add((batch_size - 1 - i) as isize), task); @@ -1605,6 +1720,45 @@ impl<T> Injector<T> { /// assert_eq!(w.pop(), Some(2)); /// ``` pub fn steal_batch_and_pop(&self, dest: &Worker<T>) -> Steal<T> { + // TODO: we use `MAX_BATCH + 1` as the hard limit for Injecter as the performance is slightly + // better, but we may change it in the future to be compatible with the same method in Stealer. + self.steal_batch_with_limit_and_pop(dest, MAX_BATCH + 1) + } + + /// Steals no more than `limit` of tasks, pushes them into a worker, and pops a task from that worker. + /// + /// How many tasks exactly will be stolen is not specified. That said, this method will try to + /// steal around half of the tasks in the queue, but also not more than the given limit. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_deque::{Injector, Steal, Worker}; + /// + /// let q = Injector::new(); + /// q.push(1); + /// q.push(2); + /// q.push(3); + /// q.push(4); + /// q.push(5); + /// q.push(6); + /// + /// let w = Worker::new_fifo(); + /// assert_eq!(q.steal_batch_with_limit_and_pop(&w, 2), Steal::Success(1)); + /// assert_eq!(w.pop(), Some(2)); + /// assert_eq!(w.pop(), None); + /// + /// q.push(7); + /// // Setting a large limit does not guarantee that all elements will be popped. In this case, + /// // half of the elements are currently popped, but the number of popped elements is considered + /// // an implementation detail that may be changed in the future. + /// assert_eq!(q.steal_batch_with_limit_and_pop(&w, std::usize::MAX), Steal::Success(3)); + /// assert_eq!(w.pop(), Some(4)); + /// assert_eq!(w.pop(), Some(5)); + /// assert_eq!(w.pop(), None); + /// ``` + pub fn steal_batch_with_limit_and_pop(&self, dest: &Worker<T>, limit: usize) -> Steal<T> { + assert!(limit > 0); let mut head; let mut block; let mut offset; @@ -1641,15 +1795,15 @@ impl<T> Injector<T> { if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP { new_head |= HAS_NEXT; // We can steal all tasks till the end of the block. - advance = (BLOCK_CAP - offset).min(MAX_BATCH + 1); + advance = (BLOCK_CAP - offset).min(limit); } else { let len = (tail - head) >> SHIFT; // Steal half of the available tasks. - advance = ((len + 1) / 2).min(MAX_BATCH + 1); + advance = ((len + 1) / 2).min(limit); } } else { // We can steal all tasks till the end of the block. - advance = (BLOCK_CAP - offset).min(MAX_BATCH + 1); + advance = (BLOCK_CAP - offset).min(limit); } new_head += advance << SHIFT; @@ -1689,7 +1843,7 @@ impl<T> Injector<T> { // Read the task. let slot = (*block).slots.get_unchecked(offset); slot.wait_write(); - let task = slot.task.get().read().assume_init(); + let task = slot.task.get().read(); match dest.flavor { Flavor::Fifo => { @@ -1698,7 +1852,7 @@ impl<T> Injector<T> { // Read the task. let slot = (*block).slots.get_unchecked(offset + i + 1); slot.wait_write(); - let task = slot.task.get().read().assume_init(); + let task = slot.task.get().read(); // Write it into the destination queue. dest_buffer.write(dest_b.wrapping_add(i as isize), task); @@ -1711,7 +1865,7 @@ impl<T> Injector<T> { // Read the task. let slot = (*block).slots.get_unchecked(offset + i + 1); slot.wait_write(); - let task = slot.task.get().read().assume_init(); + let task = slot.task.get().read(); // Write it into the destination queue. dest_buffer.write(dest_b.wrapping_add((batch_size - 1 - i) as isize), task); @@ -1744,7 +1898,7 @@ impl<T> Injector<T> { } } - Steal::Success(task) + Steal::Success(task.assume_init()) } } @@ -1820,9 +1974,9 @@ impl<T> Injector<T> { impl<T> Drop for Injector<T> { fn drop(&mut self) { - let mut head = self.head.index.load(Ordering::Relaxed); - let mut tail = self.tail.index.load(Ordering::Relaxed); - let mut block = self.head.block.load(Ordering::Relaxed); + let mut head = *self.head.index.get_mut(); + let mut tail = *self.tail.index.get_mut(); + let mut block = *self.head.block.get_mut(); // Erase the lower bits. head &= !((1 << SHIFT) - 1); @@ -1840,7 +1994,7 @@ impl<T> Drop for Injector<T> { p.as_mut_ptr().drop_in_place(); } else { // Deallocate the block and move to the next one. - let next = (*block).next.load(Ordering::Relaxed); + let next = *(*block).next.get_mut(); drop(Box::from_raw(block)); block = next; } diff --git a/tests/fifo.rs b/tests/fifo.rs index e2365fb..f98737b 100644 --- a/tests/fifo.rs +++ b/tests/fifo.rs @@ -71,6 +71,9 @@ fn is_empty() { #[test] fn spsc() { + #[cfg(miri)] + const STEPS: usize = 500; + #[cfg(not(miri))] const STEPS: usize = 50_000; let w = Worker::new_fifo(); @@ -100,6 +103,9 @@ fn spsc() { #[test] fn stampede() { const THREADS: usize = 8; + #[cfg(miri)] + const COUNT: usize = 500; + #[cfg(not(miri))] const COUNT: usize = 50_000; let w = Worker::new_fifo(); @@ -141,6 +147,9 @@ fn stampede() { #[test] fn stress() { const THREADS: usize = 8; + #[cfg(miri)] + const COUNT: usize = 500; + #[cfg(not(miri))] const COUNT: usize = 50_000; let w = Worker::new_fifo(); @@ -197,6 +206,7 @@ fn stress() { .unwrap(); } +#[cfg_attr(miri, ignore)] // Miri is too slow #[test] fn no_starvation() { const THREADS: usize = 8; @@ -258,8 +268,17 @@ fn no_starvation() { #[test] fn destructors() { + #[cfg(miri)] + const THREADS: usize = 2; + #[cfg(not(miri))] const THREADS: usize = 8; + #[cfg(miri)] + const COUNT: usize = 500; + #[cfg(not(miri))] const COUNT: usize = 50_000; + #[cfg(miri)] + const STEPS: usize = 100; + #[cfg(not(miri))] const STEPS: usize = 1000; struct Elem(usize, Arc<Mutex<Vec<usize>>>); @@ -330,7 +349,7 @@ fn destructors() { { let mut v = dropped.lock().unwrap(); assert_eq!(v.len(), rem); - v.sort(); + v.sort_unstable(); for pair in v.windows(2) { assert_eq!(pair[0] + 1, pair[1]); } diff --git a/tests/injector.rs b/tests/injector.rs index 3f74d1b..f706a8d 100644 --- a/tests/injector.rs +++ b/tests/injector.rs @@ -46,6 +46,9 @@ fn is_empty() { #[test] fn spsc() { + #[cfg(miri)] + const COUNT: usize = 500; + #[cfg(not(miri))] const COUNT: usize = 100_000; let q = Injector::new(); @@ -58,6 +61,8 @@ fn spsc() { assert_eq!(i, v); break; } + #[cfg(miri)] + std::hint::spin_loop(); } } @@ -73,6 +78,9 @@ fn spsc() { #[test] fn mpmc() { + #[cfg(miri)] + const COUNT: usize = 500; + #[cfg(not(miri))] const COUNT: usize = 25_000; const THREADS: usize = 4; @@ -96,6 +104,8 @@ fn mpmc() { v[n].fetch_add(1, SeqCst); break; } + #[cfg(miri)] + std::hint::spin_loop(); } } }); @@ -111,6 +121,9 @@ fn mpmc() { #[test] fn stampede() { const THREADS: usize = 8; + #[cfg(miri)] + const COUNT: usize = 500; + #[cfg(not(miri))] const COUNT: usize = 50_000; let q = Injector::new(); @@ -152,6 +165,9 @@ fn stampede() { #[test] fn stress() { const THREADS: usize = 8; + #[cfg(miri)] + const COUNT: usize = 500; + #[cfg(not(miri))] const COUNT: usize = 50_000; let q = Injector::new(); @@ -208,6 +224,7 @@ fn stress() { .unwrap(); } +#[cfg_attr(miri, ignore)] // Miri is too slow #[test] fn no_starvation() { const THREADS: usize = 8; @@ -269,8 +286,17 @@ fn no_starvation() { #[test] fn destructors() { + #[cfg(miri)] + const THREADS: usize = 2; + #[cfg(not(miri))] const THREADS: usize = 8; + #[cfg(miri)] + const COUNT: usize = 500; + #[cfg(not(miri))] const COUNT: usize = 50_000; + #[cfg(miri)] + const STEPS: usize = 100; + #[cfg(not(miri))] const STEPS: usize = 1000; struct Elem(usize, Arc<Mutex<Vec<usize>>>); @@ -341,7 +367,7 @@ fn destructors() { { let mut v = dropped.lock().unwrap(); assert_eq!(v.len(), rem); - v.sort(); + v.sort_unstable(); for pair in v.windows(2) { assert_eq!(pair[0] + 1, pair[1]); } diff --git a/tests/lifo.rs b/tests/lifo.rs index 3e99e95..c1a65cd 100644 --- a/tests/lifo.rs +++ b/tests/lifo.rs @@ -71,6 +71,9 @@ fn is_empty() { #[test] fn spsc() { + #[cfg(miri)] + const STEPS: usize = 500; + #[cfg(not(miri))] const STEPS: usize = 50_000; let w = Worker::new_lifo(); @@ -84,6 +87,8 @@ fn spsc() { assert_eq!(i, v); break; } + #[cfg(miri)] + std::hint::spin_loop(); } } @@ -100,6 +105,9 @@ fn spsc() { #[test] fn stampede() { const THREADS: usize = 8; + #[cfg(miri)] + const COUNT: usize = 500; + #[cfg(not(miri))] const COUNT: usize = 50_000; let w = Worker::new_lifo(); @@ -141,6 +149,9 @@ fn stampede() { #[test] fn stress() { const THREADS: usize = 8; + #[cfg(miri)] + const COUNT: usize = 500; + #[cfg(not(miri))] const COUNT: usize = 50_000; let w = Worker::new_lifo(); @@ -197,6 +208,7 @@ fn stress() { .unwrap(); } +#[cfg_attr(miri, ignore)] // Miri is too slow #[test] fn no_starvation() { const THREADS: usize = 8; @@ -258,8 +270,17 @@ fn no_starvation() { #[test] fn destructors() { + #[cfg(miri)] + const THREADS: usize = 2; + #[cfg(not(miri))] const THREADS: usize = 8; + #[cfg(miri)] + const COUNT: usize = 500; + #[cfg(not(miri))] const COUNT: usize = 50_000; + #[cfg(miri)] + const STEPS: usize = 100; + #[cfg(not(miri))] const STEPS: usize = 1000; struct Elem(usize, Arc<Mutex<Vec<usize>>>); @@ -330,7 +351,7 @@ fn destructors() { { let mut v = dropped.lock().unwrap(); assert_eq!(v.len(), rem); - v.sort(); + v.sort_unstable(); for pair in v.windows(2) { assert_eq!(pair[0] + 1, pair[1]); } |