diff options
author | Android Build Coastguard Worker <android-build-coastguard-worker@google.com> | 2022-05-10 07:02:58 +0000 |
---|---|---|
committer | Android Build Coastguard Worker <android-build-coastguard-worker@google.com> | 2022-05-10 07:02:58 +0000 |
commit | 4b052a2173c6a5ce9eb9ed48e8f9b75719b65c81 (patch) | |
tree | c438d5e508594ff96dcc6763baf825c2d1c8ed86 | |
parent | 8a698f122a7264b15fc46930b2653709d48c6e53 (diff) | |
parent | 465e78621904030014cc9c69e3195946a8523038 (diff) | |
download | crossbeam-deque-android13-mainline-tethering-release.tar.gz |
Snap for 8564071 from 465e78621904030014cc9c69e3195946a8523038 to mainline-tethering-releaseaml_tet_331910040aml_tet_331820050aml_tet_331711040aml_tet_331511160aml_tet_331511000aml_tet_331412030aml_tet_331312080aml_tet_331117000aml_tet_331012080aml_tet_330911010aml_tet_330812150android13-mainline-tethering-release
Change-Id: Ife9028e7b1a6009eacfac869ec2d2ed20c10bb98
-rw-r--r-- | .cargo_vcs_info.json | 2 | ||||
-rw-r--r-- | Android.bp | 76 | ||||
-rw-r--r-- | CHANGELOG.md | 5 | ||||
-rw-r--r-- | Cargo.toml | 14 | ||||
-rw-r--r-- | Cargo.toml.orig | 7 | ||||
-rw-r--r-- | METADATA | 10 | ||||
-rw-r--r-- | README.md | 6 | ||||
-rw-r--r-- | TEST_MAPPING | 45 | ||||
-rw-r--r-- | cargo2android.json | 5 | ||||
-rw-r--r-- | src/deque.rs | 135 | ||||
-rw-r--r-- | src/lib.rs | 9 | ||||
-rw-r--r-- | tests/fifo.rs | 20 | ||||
-rw-r--r-- | tests/injector.rs | 12 | ||||
-rw-r--r-- | tests/lifo.rs | 20 |
14 files changed, 264 insertions, 102 deletions
diff --git a/.cargo_vcs_info.json b/.cargo_vcs_info.json index 08632f0..9716123 100644 --- a/.cargo_vcs_info.json +++ b/.cargo_vcs_info.json @@ -1,5 +1,5 @@ { "git": { - "sha1": "d9dfc9e1ffabcb3c01addad14878f16c2795c371" + "sha1": "0e2a930eac3586ab52498413310c45af6c67d830" } } @@ -1,4 +1,4 @@ -// This file is generated by cargo2android.py --run --device --dependencies. +// This file is generated by cargo2android.py --config cargo2android.json. // Do not modify this file as changes will be overridden on upgrade. package { @@ -39,10 +39,75 @@ license { ], } +rust_defaults { + name: "crossbeam-deque_test_defaults", + crate_name: "crossbeam_deque", + cargo_env_compat: true, + cargo_pkg_version: "0.8.1", + test_suites: ["general-tests"], + auto_gen_config: true, + edition: "2018", + features: [ + "crossbeam-epoch", + "crossbeam-utils", + "default", + "std", + ], + rustlibs: [ + "libcfg_if", + "libcrossbeam_deque", + "libcrossbeam_epoch", + "libcrossbeam_utils", + "librand", + ], +} + +rust_test { + name: "crossbeam-deque_test_tests_fifo", + defaults: ["crossbeam-deque_test_defaults"], + host_supported: true, + srcs: ["tests/fifo.rs"], + test_options: { + unit_test: true, + }, +} + +rust_test { + name: "crossbeam-deque_test_tests_injector", + defaults: ["crossbeam-deque_test_defaults"], + host_supported: true, + srcs: ["tests/injector.rs"], + test_options: { + unit_test: true, + }, +} + +rust_test { + name: "crossbeam-deque_test_tests_lifo", + defaults: ["crossbeam-deque_test_defaults"], + host_supported: true, + srcs: ["tests/lifo.rs"], + test_options: { + unit_test: true, + }, +} + +rust_test { + name: "crossbeam-deque_test_tests_steal", + defaults: ["crossbeam-deque_test_defaults"], + host_supported: true, + srcs: ["tests/steal.rs"], + test_options: { + unit_test: true, + }, +} + rust_library { name: "libcrossbeam_deque", host_supported: true, crate_name: "crossbeam_deque", + cargo_env_compat: true, + cargo_pkg_version: "0.8.1", srcs: ["src/lib.rs"], edition: "2018", features: [ @@ -57,12 +122,3 @@ rust_library { "libcrossbeam_utils", ], } - -// dependent_library ["feature_list"] -// autocfg-1.0.1 -// cfg-if-1.0.0 -// crossbeam-epoch-0.9.3 "alloc,lazy_static,std" -// crossbeam-utils-0.8.3 "lazy_static,std" -// lazy_static-1.4.0 -// memoffset-0.6.1 "default" -// scopeguard-1.1.0 diff --git a/CHANGELOG.md b/CHANGELOG.md index da37edc..14dcc20 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,8 @@ +# Version 0.8.1 + +- Fix deque steal race condition. (#726) +- Add `Stealer::len` method. (#708) + # Version 0.8.0 - Bump the minimum supported Rust version to 1.36. @@ -3,22 +3,20 @@ # When uploading crates to the registry Cargo will automatically # "normalize" Cargo.toml files for maximal compatibility # with all versions of Cargo and also rewrite `path` dependencies -# to registry (e.g., crates.io) dependencies +# to registry (e.g., crates.io) dependencies. # -# If you believe there's an error in this file please file an -# issue against the rust-lang/cargo repository. If you're -# editing this file be aware that the upstream Cargo.toml -# will likely look very different (and much more reasonable) +# If you are reading this file be aware that the original Cargo.toml +# will likely look very different (and much more reasonable). +# See Cargo.toml.orig for the original contents. [package] edition = "2018" name = "crossbeam-deque" -version = "0.8.0" +version = "0.8.1" authors = ["The Crossbeam Project Developers"] description = "Concurrent work-stealing deque" homepage = "https://github.com/crossbeam-rs/crossbeam/tree/master/crossbeam-deque" documentation = "https://docs.rs/crossbeam-deque" -readme = "README.md" keywords = ["chase-lev", "lock-free", "scheduler", "scheduling"] categories = ["algorithms", "concurrency", "data-structures"] license = "MIT OR Apache-2.0" @@ -36,7 +34,7 @@ version = "0.8" optional = true default-features = false [dev-dependencies.rand] -version = "0.7.3" +version = "0.8" [features] default = ["std"] diff --git a/Cargo.toml.orig b/Cargo.toml.orig index 8d38e22..572ddfd 100644 --- a/Cargo.toml.orig +++ b/Cargo.toml.orig @@ -4,11 +4,10 @@ name = "crossbeam-deque" # - Update CHANGELOG.md # - Update README.md # - Create "crossbeam-deque-X.Y.Z" git tag -version = "0.8.0" +version = "0.8.1" authors = ["The Crossbeam Project Developers"] edition = "2018" license = "MIT OR Apache-2.0" -readme = "README.md" repository = "https://github.com/crossbeam-rs/crossbeam" homepage = "https://github.com/crossbeam-rs/crossbeam/tree/master/crossbeam-deque" documentation = "https://docs.rs/crossbeam-deque" @@ -21,6 +20,8 @@ default = ["std"] # Enable to use APIs that require `std`. # This is enabled by default. +# +# NOTE: Disabling `std` feature is not supported yet. std = ["crossbeam-epoch/std", "crossbeam-utils/std"] [dependencies] @@ -39,4 +40,4 @@ default-features = false optional = true [dev-dependencies] -rand = "0.7.3" +rand = "0.8" @@ -7,13 +7,13 @@ third_party { } url { type: ARCHIVE - value: "https://static.crates.io/crates/crossbeam-deque/crossbeam-deque-0.8.0.crate" + value: "https://static.crates.io/crates/crossbeam-deque/crossbeam-deque-0.8.1.crate" } - version: "0.8.0" + version: "0.8.1" license_type: NOTICE last_upgrade_date { - year: 2020 - month: 12 - day: 21 + year: 2021 + month: 8 + day: 9 } } @@ -2,7 +2,7 @@ [![Build Status](https://github.com/crossbeam-rs/crossbeam/workflows/CI/badge.svg)]( https://github.com/crossbeam-rs/crossbeam/actions) -[![License](https://img.shields.io/badge/license-MIT%20OR%20Apache--2.0-blue.svg)]( +[![License](https://img.shields.io/badge/license-MIT_OR_Apache--2.0-blue.svg)]( https://github.com/crossbeam-rs/crossbeam/tree/master/crossbeam-deque#license) [![Cargo](https://img.shields.io/crates/v/crossbeam-deque.svg)]( https://crates.io/crates/crossbeam-deque) @@ -10,7 +10,7 @@ https://crates.io/crates/crossbeam-deque) https://docs.rs/crossbeam-deque) [![Rust 1.36+](https://img.shields.io/badge/rust-1.36+-lightgray.svg)]( https://www.rust-lang.org) -[![chat](https://img.shields.io/discord/569610676205781012.svg?logo=discord)](https://discord.gg/BBYwKq) +[![chat](https://img.shields.io/discord/569610676205781012.svg?logo=discord)](https://discord.com/invite/JXYwgWZ) This crate provides work-stealing deques, which are primarily intended for building task schedulers. @@ -21,7 +21,7 @@ Add this to your `Cargo.toml`: ```toml [dependencies] -crossbeam-deque = "0.7" +crossbeam-deque = "0.8" ``` ## Compatibility diff --git a/TEST_MAPPING b/TEST_MAPPING new file mode 100644 index 0000000..3601da1 --- /dev/null +++ b/TEST_MAPPING @@ -0,0 +1,45 @@ +// Generated by update_crate_tests.py for tests that depend on this crate. +{ + "imports": [ + { + "path": "external/rust/crates/base64" + }, + { + "path": "external/rust/crates/tinytemplate" + }, + { + "path": "external/rust/crates/tinyvec" + }, + { + "path": "external/rust/crates/unicode-xid" + } + ], + "presubmit": [ + { + "name": "crossbeam-deque_test_tests_fifo" + }, + { + "name": "crossbeam-deque_test_tests_injector" + }, + { + "name": "crossbeam-deque_test_tests_lifo" + }, + { + "name": "crossbeam-deque_test_tests_steal" + } + ], + "presubmit-rust": [ + { + "name": "crossbeam-deque_test_tests_fifo" + }, + { + "name": "crossbeam-deque_test_tests_injector" + }, + { + "name": "crossbeam-deque_test_tests_lifo" + }, + { + "name": "crossbeam-deque_test_tests_steal" + } + ] +} diff --git a/cargo2android.json b/cargo2android.json new file mode 100644 index 0000000..d36fb44 --- /dev/null +++ b/cargo2android.json @@ -0,0 +1,5 @@ +{ + "device": true, + "run": true, + "tests": true +}
\ No newline at end of file diff --git a/src/deque.rs b/src/deque.rs index fcd6f9f..802a2fe 100644 --- a/src/deque.rs +++ b/src/deque.rs @@ -1,7 +1,3 @@ -// TODO(@jeehoonkang): we mutates `batch_size` inside `for i in 0..batch_size {}`. It is difficult -// to read because we're mutating the range bound. -#![allow(clippy::mut_range_bound)] - use std::cell::{Cell, UnsafeCell}; use std::cmp; use std::fmt; @@ -592,6 +588,27 @@ impl<T> Stealer<T> { b.wrapping_sub(f) <= 0 } + /// Returns the number of tasks in the deque. + /// + /// ``` + /// use crossbeam_deque::Worker; + /// + /// let w = Worker::new_lifo(); + /// let s = w.stealer(); + /// + /// assert_eq!(s.len(), 0); + /// w.push(1); + /// assert_eq!(s.len(), 1); + /// w.push(2); + /// assert_eq!(s.len(), 2); + /// ``` + pub fn len(&self) -> usize { + let f = self.inner.front.load(Ordering::Acquire); + atomic::fence(Ordering::SeqCst); + let b = self.inner.back.load(Ordering::Acquire); + b.wrapping_sub(f).max(0) as usize + } + /// Steals a task from the queue. /// /// # Examples @@ -635,11 +652,13 @@ impl<T> Stealer<T> { let task = unsafe { buffer.deref().read(f) }; // Try incrementing the front index to steal the task. - if self - .inner - .front - .compare_exchange(f, f.wrapping_add(1), Ordering::SeqCst, Ordering::Relaxed) - .is_err() + // If the buffer has been swapped or the increment fails, we retry. + if self.inner.buffer.load(Ordering::Acquire, guard) != buffer + || self + .inner + .front + .compare_exchange(f, f.wrapping_add(1), Ordering::SeqCst, Ordering::Relaxed) + .is_err() { // We didn't steal this task, forget it. mem::forget(task); @@ -741,16 +760,18 @@ impl<T> Stealer<T> { } // Try incrementing the front index to steal the batch. - if self - .inner - .front - .compare_exchange( - f, - f.wrapping_add(batch_size), - Ordering::SeqCst, - Ordering::Relaxed, - ) - .is_err() + // If the buffer has been swapped or the increment fails, we retry. + if self.inner.buffer.load(Ordering::Acquire, guard) != buffer + || self + .inner + .front + .compare_exchange( + f, + f.wrapping_add(batch_size), + Ordering::SeqCst, + Ordering::Relaxed, + ) + .is_err() { return Steal::Retry; } @@ -760,7 +781,12 @@ impl<T> Stealer<T> { // Steal a batch of tasks from the front one by one. Flavor::Lifo => { - for i in 0..batch_size { + // This loop may modify the batch_size, which triggers a clippy lint warning. + // Use a new variable to avoid the warning, and to make it clear we aren't + // modifying the loop exit condition during iteration. + let original_batch_size = batch_size; + + for i in 0..original_batch_size { // If this is not the first steal, check whether the queue is empty. if i > 0 { // We've already got the current front index. Now execute the fence to @@ -781,11 +807,18 @@ impl<T> Stealer<T> { let task = unsafe { buffer.deref().read(f) }; // Try incrementing the front index to steal the task. - if self - .inner - .front - .compare_exchange(f, f.wrapping_add(1), Ordering::SeqCst, Ordering::Relaxed) - .is_err() + // If the buffer has been swapped or the increment fails, we retry. + if self.inner.buffer.load(Ordering::Acquire, guard) != buffer + || self + .inner + .front + .compare_exchange( + f, + f.wrapping_add(1), + Ordering::SeqCst, + Ordering::Relaxed, + ) + .is_err() { // We didn't steal this task, forget it and break from the loop. mem::forget(task); @@ -927,17 +960,19 @@ impl<T> Stealer<T> { } } - // Try incrementing the front index to steal the batch. - if self - .inner - .front - .compare_exchange( - f, - f.wrapping_add(batch_size + 1), - Ordering::SeqCst, - Ordering::Relaxed, - ) - .is_err() + // Try incrementing the front index to steal the task. + // If the buffer has been swapped or the increment fails, we retry. + if self.inner.buffer.load(Ordering::Acquire, guard) != buffer + || self + .inner + .front + .compare_exchange( + f, + f.wrapping_add(batch_size + 1), + Ordering::SeqCst, + Ordering::Relaxed, + ) + .is_err() { // We didn't steal this task, forget it. mem::forget(task); @@ -965,7 +1000,12 @@ impl<T> Stealer<T> { f = f.wrapping_add(1); // Repeat the same procedure for the batch steals. - for i in 0..batch_size { + // + // This loop may modify the batch_size, which triggers a clippy lint warning. + // Use a new variable to avoid the warning, and to make it clear we aren't + // modifying the loop exit condition during iteration. + let original_batch_size = batch_size; + for i in 0..original_batch_size { // We've already got the current front index. Now execute the fence to // synchronize with other threads. atomic::fence(Ordering::SeqCst); @@ -983,11 +1023,18 @@ impl<T> Stealer<T> { let tmp = unsafe { buffer.deref().read(f) }; // Try incrementing the front index to steal the task. - if self - .inner - .front - .compare_exchange(f, f.wrapping_add(1), Ordering::SeqCst, Ordering::Relaxed) - .is_err() + // If the buffer has been swapped or the increment fails, we retry. + if self.inner.buffer.load(Ordering::Acquire, guard) != buffer + || self + .inner + .front + .compare_exchange( + f, + f.wrapping_add(1), + Ordering::SeqCst, + Ordering::Relaxed, + ) + .is_err() { // We didn't steal this task, forget it and break from the loop. mem::forget(tmp); @@ -1367,7 +1414,9 @@ impl<T> Injector<T> { // Destroy the block if we've reached the end, or if another thread wanted to destroy // but couldn't because we were busy reading from the slot. - if (offset + 1 == BLOCK_CAP) || (slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0) { + if (offset + 1 == BLOCK_CAP) + || (slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0) + { Block::destroy(block, offset); } @@ -89,10 +89,13 @@ allow(dead_code, unused_assignments, unused_variables) ) ))] -#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)] +#![warn( + missing_docs, + missing_debug_implementations, + rust_2018_idioms, + unreachable_pub +)] #![cfg_attr(not(feature = "std"), no_std)] -// matches! requires Rust 1.42 -#![allow(clippy::match_like_matches_macro)] use cfg_if::cfg_if; diff --git a/tests/fifo.rs b/tests/fifo.rs index 19a1f58..e2365fb 100644 --- a/tests/fifo.rs +++ b/tests/fifo.rs @@ -167,7 +167,7 @@ fn stress() { hits.fetch_add(1, SeqCst); } - while let Some(_) = w2.pop() { + while w2.pop().is_some() { hits.fetch_add(1, SeqCst); } } @@ -177,8 +177,8 @@ fn stress() { let mut rng = rand::thread_rng(); let mut expected = 0; while expected < COUNT { - if rng.gen_range(0, 3) == 0 { - while let Some(_) = w.pop() { + if rng.gen_range(0..3) == 0 { + while w.pop().is_some() { hits.fetch_add(1, SeqCst); } } else { @@ -188,7 +188,7 @@ fn stress() { } while hits.load(SeqCst) < COUNT { - while let Some(_) = w.pop() { + while w.pop().is_some() { hits.fetch_add(1, SeqCst); } } @@ -227,7 +227,7 @@ fn no_starvation() { hits.fetch_add(1, SeqCst); } - while let Some(_) = w2.pop() { + while w2.pop().is_some() { hits.fetch_add(1, SeqCst); } } @@ -237,9 +237,9 @@ fn no_starvation() { let mut rng = rand::thread_rng(); let mut my_hits = 0; loop { - for i in 0..rng.gen_range(0, COUNT) { - if rng.gen_range(0, 3) == 0 && my_hits == 0 { - while let Some(_) = w.pop() { + for i in 0..rng.gen_range(0..COUNT) { + if rng.gen_range(0..3) == 0 && my_hits == 0 { + while w.pop().is_some() { my_hits += 1; } } else { @@ -300,7 +300,7 @@ fn destructors() { remaining.fetch_sub(1, SeqCst); } - while let Some(_) = w2.pop() { + while w2.pop().is_some() { cnt += 1; remaining.fetch_sub(1, SeqCst); } @@ -309,7 +309,7 @@ fn destructors() { } for _ in 0..STEPS { - if let Some(_) = w.pop() { + if w.pop().is_some() { remaining.fetch_sub(1, SeqCst); } } diff --git a/tests/injector.rs b/tests/injector.rs index 0165e1a..3f74d1b 100644 --- a/tests/injector.rs +++ b/tests/injector.rs @@ -178,7 +178,7 @@ fn stress() { hits.fetch_add(1, SeqCst); } - while let Some(_) = w2.pop() { + while w2.pop().is_some() { hits.fetch_add(1, SeqCst); } } @@ -188,7 +188,7 @@ fn stress() { let mut rng = rand::thread_rng(); let mut expected = 0; while expected < COUNT { - if rng.gen_range(0, 3) == 0 { + if rng.gen_range(0..3) == 0 { while let Success(_) = q.steal() { hits.fetch_add(1, SeqCst); } @@ -238,7 +238,7 @@ fn no_starvation() { hits.fetch_add(1, SeqCst); } - while let Some(_) = w2.pop() { + while w2.pop().is_some() { hits.fetch_add(1, SeqCst); } } @@ -248,8 +248,8 @@ fn no_starvation() { let mut rng = rand::thread_rng(); let mut my_hits = 0; loop { - for i in 0..rng.gen_range(0, COUNT) { - if rng.gen_range(0, 3) == 0 && my_hits == 0 { + for i in 0..rng.gen_range(0..COUNT) { + if rng.gen_range(0..3) == 0 && my_hits == 0 { while let Success(_) = q.steal() { my_hits += 1; } @@ -311,7 +311,7 @@ fn destructors() { remaining.fetch_sub(1, SeqCst); } - while let Some(_) = w2.pop() { + while w2.pop().is_some() { cnt += 1; remaining.fetch_sub(1, SeqCst); } diff --git a/tests/lifo.rs b/tests/lifo.rs index d7e498a..3e99e95 100644 --- a/tests/lifo.rs +++ b/tests/lifo.rs @@ -167,7 +167,7 @@ fn stress() { hits.fetch_add(1, SeqCst); } - while let Some(_) = w2.pop() { + while w2.pop().is_some() { hits.fetch_add(1, SeqCst); } } @@ -177,8 +177,8 @@ fn stress() { let mut rng = rand::thread_rng(); let mut expected = 0; while expected < COUNT { - if rng.gen_range(0, 3) == 0 { - while let Some(_) = w.pop() { + if rng.gen_range(0..3) == 0 { + while w.pop().is_some() { hits.fetch_add(1, SeqCst); } } else { @@ -188,7 +188,7 @@ fn stress() { } while hits.load(SeqCst) < COUNT { - while let Some(_) = w.pop() { + while w.pop().is_some() { hits.fetch_add(1, SeqCst); } } @@ -227,7 +227,7 @@ fn no_starvation() { hits.fetch_add(1, SeqCst); } - while let Some(_) = w2.pop() { + while w2.pop().is_some() { hits.fetch_add(1, SeqCst); } } @@ -237,9 +237,9 @@ fn no_starvation() { let mut rng = rand::thread_rng(); let mut my_hits = 0; loop { - for i in 0..rng.gen_range(0, COUNT) { - if rng.gen_range(0, 3) == 0 && my_hits == 0 { - while let Some(_) = w.pop() { + for i in 0..rng.gen_range(0..COUNT) { + if rng.gen_range(0..3) == 0 && my_hits == 0 { + while w.pop().is_some() { my_hits += 1; } } else { @@ -300,7 +300,7 @@ fn destructors() { remaining.fetch_sub(1, SeqCst); } - while let Some(_) = w2.pop() { + while w2.pop().is_some() { cnt += 1; remaining.fetch_sub(1, SeqCst); } @@ -309,7 +309,7 @@ fn destructors() { } for _ in 0..STEPS { - if let Some(_) = w.pop() { + if w.pop().is_some() { remaining.fetch_sub(1, SeqCst); } } |