author    Jeff Vander Stoep <jeffv@google.com>  2023-02-17 09:14:08 +0100
committer Jeff Vander Stoep <jeffv@google.com>  2023-02-17 09:14:08 +0100
commit    e27ffc4ea6455b714614a8bdab40aaafb02253ac (patch)
tree      726fe77ff8ed6a97c897ec126a4a93427f002ff6
parent    ecec3053e344f9a90a9b055bf28dad7c56719407 (diff)
download  rayon-core-e27ffc4ea6455b714614a8bdab40aaafb02253ac.tar.gz
Upgrade rayon-core to 1.10.2
This project was upgraded with external_updater.
Usage: tools/external_updater/updater.sh update rust/crates/rayon-core
For more info, check https://cs.android.com/android/platform/superproject/+/master:tools/external_updater/README.md

Test: TreeHugger
Change-Id: I81858fa6a9a8c9ff1fa5b373374dd8138be77542
-rw-r--r--               .cargo_vcs_info.json            2
-rw-r--r--               Android.bp                      2
-rw-r--r--               Cargo.toml                      3
-rw-r--r--               Cargo.toml.orig                 3
-rw-r--r--               METADATA                       10
-rw-r--r--               src/broadcast/mod.rs            5
-rw-r--r-- [-rwxr-xr-x]  src/broadcast/test.rs           0
-rw-r--r--               src/job.rs                      2
-rw-r--r--               src/latch.rs                   90
-rw-r--r--               src/registry.rs                10
-rw-r--r--               src/scope/mod.rs               70
-rw-r--r--               tests/stack_overflow_crash.rs  93
12 files changed, 172 insertions, 118 deletions
diff --git a/.cargo_vcs_info.json b/.cargo_vcs_info.json
index 975ebdb..213b63f 100644
--- a/.cargo_vcs_info.json
+++ b/.cargo_vcs_info.json
@@ -1,6 +1,6 @@
{
"git": {
- "sha1": "8e48eae53e969f328ef41f8d6977bb8cb7084bec"
+ "sha1": "b6cdc9da7adc7fe42b28758b2033f0bf8f8dc4b8"
},
"path_in_vcs": "rayon-core"
}
\ No newline at end of file
diff --git a/Android.bp b/Android.bp
index 54c4506..681659a 100644
--- a/Android.bp
+++ b/Android.bp
@@ -42,7 +42,7 @@ rust_library {
host_supported: true,
crate_name: "rayon_core",
cargo_env_compat: true,
- cargo_pkg_version: "1.10.1",
+ cargo_pkg_version: "1.10.2",
srcs: ["src/lib.rs"],
edition: "2021",
rustlibs: [
diff --git a/Cargo.toml b/Cargo.toml
index da520f7..d7e36a4 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -13,7 +13,7 @@
edition = "2021"
rust-version = "1.56"
name = "rayon-core"
-version = "1.10.1"
+version = "1.10.2"
authors = [
"Niko Matsakis <niko@alum.mit.edu>",
"Josh Stone <cuviper@gmail.com>",
@@ -37,7 +37,6 @@ repository = "https://github.com/rayon-rs/rayon"
[[test]]
name = "stack_overflow_crash"
path = "tests/stack_overflow_crash.rs"
-harness = false
[[test]]
name = "double_init_fail"
diff --git a/Cargo.toml.orig b/Cargo.toml.orig
index 9e45992..878dac7 100644
--- a/Cargo.toml.orig
+++ b/Cargo.toml.orig
@@ -1,6 +1,6 @@
[package]
name = "rayon-core"
-version = "1.10.1"
+version = "1.10.2"
authors = ["Niko Matsakis <niko@alum.mit.edu>",
"Josh Stone <cuviper@gmail.com>"]
description = "Core APIs for Rayon"
@@ -33,7 +33,6 @@ libc = "0.2"
[[test]]
name = "stack_overflow_crash"
path = "tests/stack_overflow_crash.rs"
-harness = false
# NB: having one [[test]] manually defined means we need to declare them all
diff --git a/METADATA b/METADATA
index 58639ba..ae2b730 100644
--- a/METADATA
+++ b/METADATA
@@ -11,13 +11,13 @@ third_party {
}
url {
type: ARCHIVE
- value: "https://static.crates.io/crates/rayon-core/rayon-core-1.10.1.crate"
+ value: "https://static.crates.io/crates/rayon-core/rayon-core-1.10.2.crate"
}
- version: "1.10.1"
+ version: "1.10.2"
license_type: NOTICE
last_upgrade_date {
- year: 2022
- month: 12
- day: 13
+ year: 2023
+ month: 2
+ day: 17
}
}
diff --git a/src/broadcast/mod.rs b/src/broadcast/mod.rs
index 452aa71..d991c54 100644
--- a/src/broadcast/mod.rs
+++ b/src/broadcast/mod.rs
@@ -1,4 +1,5 @@
use crate::job::{ArcJob, StackJob};
+use crate::latch::LatchRef;
use crate::registry::{Registry, WorkerThread};
use crate::scope::ScopeLatch;
use std::fmt;
@@ -107,7 +108,9 @@ where
let n_threads = registry.num_threads();
let current_thread = WorkerThread::current().as_ref();
let latch = ScopeLatch::with_count(n_threads, current_thread);
- let jobs: Vec<_> = (0..n_threads).map(|_| StackJob::new(&f, &latch)).collect();
+ let jobs: Vec<_> = (0..n_threads)
+ .map(|_| StackJob::new(&f, LatchRef::new(&latch)))
+ .collect();
let job_refs = jobs.iter().map(|job| job.as_job_ref());
registry.inject_broadcast(job_refs);
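
This hunk is a consequence of the latch rework later in this commit: the
blanket `impl Latch for &L` is removed, so each per-thread `StackJob` now
carries a `LatchRef` to the shared `ScopeLatch` rather than a bare `&latch`.
The overall shape is one counting latch, decremented once per job, with the
caller waiting for it to reach zero. A rough, std-only sketch of that shape
(illustrative names, not rayon's internals):

    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::{Condvar, Mutex};
    use std::thread;

    // One latch shared by all jobs; each job decrements it exactly once.
    struct CountdownLatch {
        count: AtomicUsize,
        lock: Mutex<()>,
        cond: Condvar,
    }

    impl CountdownLatch {
        fn with_count(n: usize) -> Self {
            CountdownLatch {
                count: AtomicUsize::new(n),
                lock: Mutex::new(()),
                cond: Condvar::new(),
            }
        }

        fn set(&self) {
            if self.count.fetch_sub(1, Ordering::SeqCst) == 1 {
                // Take the lock before notifying so the waiter cannot miss us.
                let _guard = self.lock.lock().unwrap();
                self.cond.notify_all();
            }
        }

        fn wait(&self) {
            let mut guard = self.lock.lock().unwrap();
            while self.count.load(Ordering::SeqCst) != 0 {
                guard = self.cond.wait(guard).unwrap();
            }
        }
    }

    fn main() {
        let n_threads = 4;
        let latch = CountdownLatch::with_count(n_threads);
        thread::scope(|s| {
            for _ in 0..n_threads {
                s.spawn(|| {
                    // the per-thread body of the broadcast would run here
                    latch.set();
                });
            }
            latch.wait();
        });
        assert_eq!(latch.count.load(Ordering::SeqCst), 0);
    }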
diff --git a/src/broadcast/test.rs b/src/broadcast/test.rs
index a765cb0..a765cb0 100755..100644
--- a/src/broadcast/test.rs
+++ b/src/broadcast/test.rs
diff --git a/src/job.rs b/src/job.rs
index b7a3dae..deccebc 100644
--- a/src/job.rs
+++ b/src/job.rs
@@ -112,7 +112,7 @@ where
let abort = unwind::AbortIfPanic;
let func = (*this.func.get()).take().unwrap();
(*this.result.get()) = JobResult::call(func);
- this.latch.set();
+ Latch::set(&this.latch);
mem::forget(abort);
}
}
diff --git a/src/latch.rs b/src/latch.rs
index 0909293..de43272 100644
--- a/src/latch.rs
+++ b/src/latch.rs
@@ -1,3 +1,5 @@
+use std::marker::PhantomData;
+use std::ops::Deref;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Condvar, Mutex};
use std::usize;
@@ -37,10 +39,15 @@ pub(super) trait Latch {
///
/// Setting a latch triggers other threads to wake up and (in some
/// cases) complete. This may, in turn, cause memory to be
- /// allocated and so forth. One must be very careful about this,
+ /// deallocated and so forth. One must be very careful about this,
/// and it's typically better to read all the fields you will need
/// to access *before* a latch is set!
- fn set(&self);
+ ///
+ /// This function operates on `*const Self` instead of `&self` to allow it
+ /// to become dangling during this call. The caller must ensure that the
+ /// pointer is valid upon entry, and not invalidated during the call by any
+ /// actions other than `set` itself.
+ unsafe fn set(this: *const Self);
}
pub(super) trait AsCoreLatch {
@@ -123,8 +130,8 @@ impl CoreLatch {
/// doing some wakeups; those are encapsulated in the surrounding
/// latch code.
#[inline]
- fn set(&self) -> bool {
- let old_state = self.state.swap(SET, Ordering::AcqRel);
+ unsafe fn set(this: *const Self) -> bool {
+ let old_state = (*this).state.swap(SET, Ordering::AcqRel);
old_state == SLEEPING
}
@@ -186,16 +193,16 @@ impl<'r> AsCoreLatch for SpinLatch<'r> {
impl<'r> Latch for SpinLatch<'r> {
#[inline]
- fn set(&self) {
+ unsafe fn set(this: *const Self) {
let cross_registry;
- let registry: &Registry = if self.cross {
+ let registry: &Registry = if (*this).cross {
// Ensure the registry stays alive while we notify it.
// Otherwise, it would be possible that we set the spin
// latch and the other thread sees it and exits, causing
// the registry to be deallocated, all before we get a
// chance to invoke `registry.notify_worker_latch_is_set`.
- cross_registry = Arc::clone(self.registry);
+ cross_registry = Arc::clone((*this).registry);
&cross_registry
} else {
// If this is not a "cross-registry" spin-latch, then the
@@ -203,12 +210,12 @@ impl<'r> Latch for SpinLatch<'r> {
// that the registry stays alive. However, that doesn't
// include this *particular* `Arc` handle if the waiting
// thread then exits, so we must completely dereference it.
- self.registry
+ (*this).registry
};
- let target_worker_index = self.target_worker_index;
+ let target_worker_index = (*this).target_worker_index;
- // NOTE: Once we `set`, the target may proceed and invalidate `&self`!
- if self.core_latch.set() {
+ // NOTE: Once we `set`, the target may proceed and invalidate `this`!
+ if CoreLatch::set(&(*this).core_latch) {
// Subtle: at this point, we can no longer read from
// `self`, because the thread owning this spin latch may
// have awoken and deallocated the latch. Therefore, we
@@ -255,10 +262,10 @@ impl LockLatch {
impl Latch for LockLatch {
#[inline]
- fn set(&self) {
- let mut guard = self.m.lock().unwrap();
+ unsafe fn set(this: *const Self) {
+ let mut guard = (*this).m.lock().unwrap();
*guard = true;
- self.v.notify_all();
+ (*this).v.notify_all();
}
}
@@ -307,9 +314,9 @@ impl CountLatch {
/// count, then the latch is **set**, and calls to `probe()` will
/// return true. Returns whether the latch was set.
#[inline]
- pub(super) fn set(&self) -> bool {
- if self.counter.fetch_sub(1, Ordering::SeqCst) == 1 {
- self.core_latch.set();
+ pub(super) unsafe fn set(this: *const Self) -> bool {
+ if (*this).counter.fetch_sub(1, Ordering::SeqCst) == 1 {
+ CoreLatch::set(&(*this).core_latch);
true
} else {
false
@@ -320,8 +327,12 @@ impl CountLatch {
/// the latch is set, then the specific worker thread is tickled,
/// which should be the one that owns this latch.
#[inline]
- pub(super) fn set_and_tickle_one(&self, registry: &Registry, target_worker_index: usize) {
- if self.set() {
+ pub(super) unsafe fn set_and_tickle_one(
+ this: *const Self,
+ registry: &Registry,
+ target_worker_index: usize,
+ ) {
+ if Self::set(this) {
registry.notify_worker_latch_is_set(target_worker_index);
}
}
@@ -362,19 +373,42 @@ impl CountLockLatch {
impl Latch for CountLockLatch {
#[inline]
- fn set(&self) {
- if self.counter.fetch_sub(1, Ordering::SeqCst) == 1 {
- self.lock_latch.set();
+ unsafe fn set(this: *const Self) {
+ if (*this).counter.fetch_sub(1, Ordering::SeqCst) == 1 {
+ LockLatch::set(&(*this).lock_latch);
}
}
}
-impl<'a, L> Latch for &'a L
-where
- L: Latch,
-{
+/// `&L` without any implication of `dereferenceable` for `Latch::set`
+pub(super) struct LatchRef<'a, L> {
+ inner: *const L,
+ marker: PhantomData<&'a L>,
+}
+
+impl<L> LatchRef<'_, L> {
+ pub(super) fn new(inner: &L) -> LatchRef<'_, L> {
+ LatchRef {
+ inner,
+ marker: PhantomData,
+ }
+ }
+}
+
+unsafe impl<L: Sync> Sync for LatchRef<'_, L> {}
+
+impl<L> Deref for LatchRef<'_, L> {
+ type Target = L;
+
+ fn deref(&self) -> &L {
+ // SAFETY: if we have &self, the inner latch is still alive
+ unsafe { &*self.inner }
+ }
+}
+
+impl<L: Latch> Latch for LatchRef<'_, L> {
#[inline]
- fn set(&self) {
- L::set(self);
+ unsafe fn set(this: *const Self) {
+ L::set((*this).inner);
}
}
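
The heart of this hunk is the new calling convention: `Latch::set` takes
`*const Self` instead of `&self`, so no reference asserted as dereferenceable
for the whole call exists even though the woken thread may free the latch
while `set` is still running; and plain `&L` no longer implements `Latch`,
so borrowed latches are wrapped in `LatchRef` instead. A minimal,
self-contained sketch of both pieces (`FlagLatch` and `BorrowedLatch` are
illustrative stand-ins, not rayon types):

    use std::marker::PhantomData;
    use std::sync::atomic::{AtomicBool, Ordering};

    trait Latch {
        /// The caller guarantees the pointer is valid on entry; `set` itself
        /// may be what allows it to become dangling.
        unsafe fn set(this: *const Self);
    }

    struct FlagLatch {
        flag: AtomicBool,
    }

    impl Latch for FlagLatch {
        unsafe fn set(this: *const Self) {
            // After this store the owner may free the latch, so nothing
            // touches `this` again.
            (*this).flag.store(true, Ordering::Release);
        }
    }

    // With the blanket `impl Latch for &L` gone, a borrowed latch travels in
    // a raw-pointer newtype, like `LatchRef` in the hunk above.
    struct BorrowedLatch<'a, L> {
        inner: *const L,
        marker: PhantomData<&'a L>,
    }

    impl<'a, L> BorrowedLatch<'a, L> {
        fn new(inner: &'a L) -> Self {
            BorrowedLatch { inner, marker: PhantomData }
        }
    }

    impl<L: Latch> Latch for BorrowedLatch<'_, L> {
        unsafe fn set(this: *const Self) {
            L::set((*this).inner);
        }
    }

    fn main() {
        let latch = FlagLatch { flag: AtomicBool::new(false) };
        let by_ref = BorrowedLatch::new(&latch);
        // SAFETY: `latch` is still alive at this point.
        unsafe { Latch::set(&by_ref) };
        assert!(latch.flag.load(Ordering::Acquire));
    }

Call sites accordingly switch from `latch.set()` to the fully qualified
`Latch::set(&latch)` inside `unsafe`, which is what the registry.rs and
scope/mod.rs hunks below do.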
diff --git a/src/registry.rs b/src/registry.rs
index 279e298..24c0855 100644
--- a/src/registry.rs
+++ b/src/registry.rs
@@ -1,5 +1,5 @@
use crate::job::{JobFifo, JobRef, StackJob};
-use crate::latch::{AsCoreLatch, CoreLatch, CountLatch, Latch, LockLatch, SpinLatch};
+use crate::latch::{AsCoreLatch, CoreLatch, CountLatch, Latch, LatchRef, LockLatch, SpinLatch};
use crate::log::Event::*;
use crate::log::Logger;
use crate::sleep::Sleep;
@@ -505,7 +505,7 @@ impl Registry {
assert!(injected && !worker_thread.is_null());
op(&*worker_thread, true)
},
- l,
+ LatchRef::new(l),
);
self.inject(&[job.as_job_ref()]);
job.latch.wait_and_reset(); // Make sure we can use the same latch again next time.
@@ -575,7 +575,7 @@ impl Registry {
pub(super) fn terminate(&self) {
if self.terminate_count.fetch_sub(1, Ordering::AcqRel) == 1 {
for (i, thread_info) in self.thread_infos.iter().enumerate() {
- thread_info.terminate.set_and_tickle_one(self, i);
+ unsafe { CountLatch::set_and_tickle_one(&thread_info.terminate, self, i) };
}
}
}
@@ -869,7 +869,7 @@ unsafe fn main_loop(
let registry = &*worker_thread.registry;
// let registry know we are ready to do work
- registry.thread_infos[index].primed.set();
+ Latch::set(&registry.thread_infos[index].primed);
// Worker threads should not panic. If they do, just abort, as the
// internal state of the threadpool is corrupted. Note that if
@@ -892,7 +892,7 @@ unsafe fn main_loop(
debug_assert!(worker_thread.take_local_job().is_none());
// let registry know we are done
- registry.thread_infos[index].stopped.set();
+ Latch::set(&registry.thread_infos[index].stopped);
// Normal termination, do not abort.
mem::forget(abort_guard);
diff --git a/src/scope/mod.rs b/src/scope/mod.rs
index 25cda83..b014cf0 100644
--- a/src/scope/mod.rs
+++ b/src/scope/mod.rs
@@ -13,7 +13,7 @@ use crate::unwind;
use std::any::Any;
use std::fmt;
use std::marker::PhantomData;
-use std::mem;
+use std::mem::ManuallyDrop;
use std::ptr;
use std::sync::atomic::{AtomicPtr, Ordering};
use std::sync::Arc;
@@ -540,10 +540,10 @@ impl<'scope> Scope<'scope> {
BODY: FnOnce(&Scope<'scope>) + Send + 'scope,
{
let scope_ptr = ScopePtr(self);
- let job = HeapJob::new(move || {
+ let job = HeapJob::new(move || unsafe {
// SAFETY: this job will execute before the scope ends.
- let scope = unsafe { scope_ptr.as_ref() };
- scope.base.execute_job(move || body(scope))
+ let scope = scope_ptr.as_ref();
+ ScopeBase::execute_job(&scope.base, move || body(scope))
});
let job_ref = self.base.heap_job_ref(job);
@@ -562,12 +562,12 @@ impl<'scope> Scope<'scope> {
BODY: Fn(&Scope<'scope>, BroadcastContext<'_>) + Send + Sync + 'scope,
{
let scope_ptr = ScopePtr(self);
- let job = ArcJob::new(move || {
+ let job = ArcJob::new(move || unsafe {
// SAFETY: this job will execute before the scope ends.
- let scope = unsafe { scope_ptr.as_ref() };
+ let scope = scope_ptr.as_ref();
let body = &body;
let func = move || BroadcastContext::with(move |ctx| body(scope, ctx));
- scope.base.execute_job(func);
+ ScopeBase::execute_job(&scope.base, func)
});
self.base.inject_broadcast(job)
}
@@ -600,10 +600,10 @@ impl<'scope> ScopeFifo<'scope> {
BODY: FnOnce(&ScopeFifo<'scope>) + Send + 'scope,
{
let scope_ptr = ScopePtr(self);
- let job = HeapJob::new(move || {
+ let job = HeapJob::new(move || unsafe {
// SAFETY: this job will execute before the scope ends.
- let scope = unsafe { scope_ptr.as_ref() };
- scope.base.execute_job(move || body(scope))
+ let scope = scope_ptr.as_ref();
+ ScopeBase::execute_job(&scope.base, move || body(scope))
});
let job_ref = self.base.heap_job_ref(job);
@@ -628,12 +628,12 @@ impl<'scope> ScopeFifo<'scope> {
BODY: Fn(&ScopeFifo<'scope>, BroadcastContext<'_>) + Send + Sync + 'scope,
{
let scope_ptr = ScopePtr(self);
- let job = ArcJob::new(move || {
+ let job = ArcJob::new(move || unsafe {
// SAFETY: this job will execute before the scope ends.
- let scope = unsafe { scope_ptr.as_ref() };
+ let scope = scope_ptr.as_ref();
let body = &body;
let func = move || BroadcastContext::with(move |ctx| body(scope, ctx));
- scope.base.execute_job(func);
+ ScopeBase::execute_job(&scope.base, func)
});
self.base.inject_broadcast(job)
}
@@ -688,7 +688,7 @@ impl<'scope> ScopeBase<'scope> {
where
FUNC: FnOnce() -> R,
{
- let result = self.execute_job_closure(func);
+ let result = unsafe { Self::execute_job_closure(self, func) };
self.job_completed_latch.wait(owner);
self.maybe_propagate_panic();
result.unwrap() // only None if `op` panicked, and that would have been propagated
@@ -696,28 +696,28 @@ impl<'scope> ScopeBase<'scope> {
/// Executes `func` as a job, either aborting or executing as
/// appropriate.
- fn execute_job<FUNC>(&self, func: FUNC)
+ unsafe fn execute_job<FUNC>(this: *const Self, func: FUNC)
where
FUNC: FnOnce(),
{
- let _: Option<()> = self.execute_job_closure(func);
+ let _: Option<()> = Self::execute_job_closure(this, func);
}
/// Executes `func` as a job in scope. Adjusts the "job completed"
/// counters and also catches any panic and stores it into
/// `scope`.
- fn execute_job_closure<FUNC, R>(&self, func: FUNC) -> Option<R>
+ unsafe fn execute_job_closure<FUNC, R>(this: *const Self, func: FUNC) -> Option<R>
where
FUNC: FnOnce() -> R,
{
match unwind::halt_unwinding(func) {
Ok(r) => {
- self.job_completed_latch.set();
+ Latch::set(&(*this).job_completed_latch);
Some(r)
}
Err(err) => {
- self.job_panicked(err);
- self.job_completed_latch.set();
+ (*this).job_panicked(err);
+ Latch::set(&(*this).job_completed_latch);
None
}
}
@@ -725,14 +725,20 @@ impl<'scope> ScopeBase<'scope> {
fn job_panicked(&self, err: Box<dyn Any + Send + 'static>) {
// capture the first error we see, free the rest
- let nil = ptr::null_mut();
- let mut err = Box::new(err); // box up the fat ptr
- if self
- .panic
- .compare_exchange(nil, &mut *err, Ordering::Release, Ordering::Relaxed)
- .is_ok()
- {
- mem::forget(err); // ownership now transferred into self.panic
+ if self.panic.load(Ordering::Relaxed).is_null() {
+ let nil = ptr::null_mut();
+ let mut err = ManuallyDrop::new(Box::new(err)); // box up the fat ptr
+ let err_ptr: *mut Box<dyn Any + Send + 'static> = &mut **err;
+ if self
+ .panic
+ .compare_exchange(nil, err_ptr, Ordering::Release, Ordering::Relaxed)
+ .is_ok()
+ {
+ // ownership now transferred into self.panic
+ } else {
+ // another panic raced in ahead of us, so drop ours
+ let _: Box<Box<_>> = ManuallyDrop::into_inner(err);
+ }
}
}
@@ -791,14 +797,14 @@ impl ScopeLatch {
}
impl Latch for ScopeLatch {
- fn set(&self) {
- match self {
+ unsafe fn set(this: *const Self) {
+ match &*this {
ScopeLatch::Stealing {
latch,
registry,
worker_index,
- } => latch.set_and_tickle_one(registry, *worker_index),
- ScopeLatch::Blocking { latch } => latch.set(),
+ } => CountLatch::set_and_tickle_one(latch, registry, *worker_index),
+ ScopeLatch::Blocking { latch } => Latch::set(latch),
}
}
}
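
The `job_panicked` change above keeps only the first panic payload: the
payload is boxed a second time so a thin pointer can be stored in the
`AtomicPtr`, the winner is published with `compare_exchange`, and a thread
that loses the race drops its own box again via `ManuallyDrop::into_inner`.
A standalone sketch of that pattern with simplified names (`PanicSlot` is
illustrative, not a rayon type):

    use std::any::Any;
    use std::mem::ManuallyDrop;
    use std::ptr;
    use std::sync::atomic::{AtomicPtr, Ordering};

    struct PanicSlot {
        panic: AtomicPtr<Box<dyn Any + Send + 'static>>,
    }

    impl PanicSlot {
        fn new() -> Self {
            PanicSlot {
                panic: AtomicPtr::new(ptr::null_mut()),
            }
        }

        // Capture the first error seen; later ones are simply dropped.
        fn record(&self, err: Box<dyn Any + Send + 'static>) {
            if self.panic.load(Ordering::Relaxed).is_null() {
                let mut err = ManuallyDrop::new(Box::new(err)); // box up the fat ptr
                let err_ptr: *mut Box<dyn Any + Send + 'static> = &mut **err;
                if self
                    .panic
                    .compare_exchange(ptr::null_mut(), err_ptr, Ordering::Release, Ordering::Relaxed)
                    .is_ok()
                {
                    // ownership now lives in `self.panic`
                } else {
                    // another panic raced in ahead of us, so drop ours
                    let _: Box<Box<_>> = ManuallyDrop::into_inner(err);
                }
            }
        }
    }

    fn main() {
        let slot = PanicSlot::new();
        slot.record(Box::new("first"));
        slot.record(Box::new("second")); // ignored: a payload is already stored
        // Reclaim the stored payload so the example itself does not leak.
        let stored = slot.panic.load(Ordering::Acquire);
        let first = unsafe { Box::from_raw(stored) };
        assert_eq!(first.downcast_ref::<&str>(), Some(&"first"));
    }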
diff --git a/tests/stack_overflow_crash.rs b/tests/stack_overflow_crash.rs
index 6128898..e87e151 100644
--- a/tests/stack_overflow_crash.rs
+++ b/tests/stack_overflow_crash.rs
@@ -1,13 +1,14 @@
use rayon_core::ThreadPoolBuilder;
use std::env;
-use std::process::Command;
+use std::process::{Command, ExitStatus, Stdio};
#[cfg(target_os = "linux")]
use std::os::unix::process::ExitStatusExt;
fn force_stack_overflow(depth: u32) {
- let _buffer = [0u8; 1024 * 1024];
+ let mut buffer = [0u8; 1024 * 1024];
+ std::hint::black_box(&mut buffer);
if depth > 0 {
force_stack_overflow(depth - 1);
}
@@ -34,49 +35,61 @@ fn overflow_code() -> Option<i32> {
#[cfg(windows)]
fn overflow_code() -> Option<i32> {
use std::os::windows::process::ExitStatusExt;
- use std::process::ExitStatus;
ExitStatus::from_raw(0xc00000fd /*STATUS_STACK_OVERFLOW*/).code()
}
-fn main() {
- if env::args().len() == 1 {
- // first check that the recursivecall actually causes a stack overflow, and does not get optimized away
- {
- let status = Command::new(env::current_exe().unwrap())
- .arg("8")
- .status()
- .unwrap();
+#[test]
+fn stack_overflow_crash() {
+ // First check that the recursive call actually causes a stack overflow,
+ // and does not get optimized away.
+ let status = run_ignored("run_with_small_stack");
+ #[cfg(any(unix, windows))]
+ assert_eq!(status.code(), overflow_code());
+ #[cfg(target_os = "linux")]
+ assert!(matches!(
+ status.signal(),
+ Some(libc::SIGABRT | libc::SIGSEGV)
+ ));
- #[cfg(any(unix, windows))]
- assert_eq!(status.code(), overflow_code());
+ // Now run with a larger stack and verify correct operation.
+ let status = run_ignored("run_with_large_stack");
+ assert_eq!(status.code(), Some(0));
+ #[cfg(target_os = "linux")]
+ assert_eq!(status.signal(), None);
+}
- #[cfg(target_os = "linux")]
- assert!(
- status.signal() == Some(11 /*SIGABRT*/) || status.signal() == Some(6 /*SIGSEGV*/)
- );
- }
+fn run_ignored(test: &str) -> ExitStatus {
+ Command::new(env::current_exe().unwrap())
+ .arg("--ignored")
+ .arg("--exact")
+ .arg(test)
+ .stdout(Stdio::null())
+ .stderr(Stdio::null())
+ .status()
+ .unwrap()
+}
- // now run with a larger stack and verify correct operation
- {
- let status = Command::new(env::current_exe().unwrap())
- .arg("48")
- .status()
- .unwrap();
- assert_eq!(status.code(), Some(0));
- #[cfg(target_os = "linux")]
- assert_eq!(status.signal(), None);
- }
- } else {
- let stack_size_in_mb: usize = env::args().nth(1).unwrap().parse().unwrap();
- let pool = ThreadPoolBuilder::new()
- .stack_size(stack_size_in_mb * 1024 * 1024)
- .build()
- .unwrap();
- pool.install(|| {
- #[cfg(unix)]
- disable_core();
- force_stack_overflow(32);
- });
- }
+#[test]
+#[ignore]
+fn run_with_small_stack() {
+ run_with_stack(8);
+}
+
+#[test]
+#[ignore]
+fn run_with_large_stack() {
+ run_with_stack(48);
+}
+
+fn run_with_stack(stack_size_in_mb: usize) {
+ let pool = ThreadPoolBuilder::new()
+ .stack_size(stack_size_in_mb * 1024 * 1024)
+ .build()
+ .unwrap();
+ pool.install(|| {
+ #[cfg(unix)]
+ disable_core();
+ force_stack_overflow(32);
+ });
}
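
With `harness = false` dropped from Cargo.toml, this file now builds under the
normal libtest harness: the crash scenarios become `#[ignore]`d tests that only
run when the parent test re-invokes the current test binary with
`--ignored --exact <name>`, so the stack overflow happens in a child process
whose exit status can be asserted on. A minimal sketch of that re-invocation
pattern for a tests/*.rs file (test names here are illustrative):

    use std::env;
    use std::process::{Command, ExitStatus, Stdio};

    // Re-run the current test binary, asking libtest to execute exactly one
    // `#[ignore]`d test, and hand back the child's exit status.
    fn run_ignored(test: &str) -> ExitStatus {
        Command::new(env::current_exe().unwrap())
            .arg("--ignored")
            .arg("--exact")
            .arg(test)
            .stdout(Stdio::null())
            .stderr(Stdio::null())
            .status()
            .unwrap()
    }

    #[test]
    fn parent_checks_child_exit() {
        let status = run_ignored("child_scenario");
        assert_eq!(status.code(), Some(0));
    }

    #[test]
    #[ignore] // runs only when re-invoked by the parent test above
    fn child_scenario() {
        // the work that must happen in its own process goes here
    }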