diff options
author | Jeff Vander Stoep <jeffv@google.com> | 2023-02-17 09:14:08 +0100 |
---|---|---|
committer | Jeff Vander Stoep <jeffv@google.com> | 2023-02-17 09:14:08 +0100 |
commit | e27ffc4ea6455b714614a8bdab40aaafb02253ac (patch) | |
tree | 726fe77ff8ed6a97c897ec126a4a93427f002ff6 | |
parent | ecec3053e344f9a90a9b055bf28dad7c56719407 (diff) | |
download | rayon-core-e27ffc4ea6455b714614a8bdab40aaafb02253ac.tar.gz |
Upgrade rayon-core to 1.10.2
This project was upgraded with external_updater.
Usage: tools/external_updater/updater.sh update rust/crates/rayon-core
For more info, check https://cs.android.com/android/platform/superproject/+/master:tools/external_updater/README.md
Test: TreeHugger
Change-Id: I81858fa6a9a8c9ff1fa5b373374dd8138be77542
-rw-r--r-- | .cargo_vcs_info.json | 2 | ||||
-rw-r--r-- | Android.bp | 2 | ||||
-rw-r--r-- | Cargo.toml | 3 | ||||
-rw-r--r-- | Cargo.toml.orig | 3 | ||||
-rw-r--r-- | METADATA | 10 | ||||
-rw-r--r-- | src/broadcast/mod.rs | 5 | ||||
-rw-r--r--[-rwxr-xr-x] | src/broadcast/test.rs | 0 | ||||
-rw-r--r-- | src/job.rs | 2 | ||||
-rw-r--r-- | src/latch.rs | 90 | ||||
-rw-r--r-- | src/registry.rs | 10 | ||||
-rw-r--r-- | src/scope/mod.rs | 70 | ||||
-rw-r--r-- | tests/stack_overflow_crash.rs | 93 |
12 files changed, 172 insertions, 118 deletions
diff --git a/.cargo_vcs_info.json b/.cargo_vcs_info.json index 975ebdb..213b63f 100644 --- a/.cargo_vcs_info.json +++ b/.cargo_vcs_info.json @@ -1,6 +1,6 @@ { "git": { - "sha1": "8e48eae53e969f328ef41f8d6977bb8cb7084bec" + "sha1": "b6cdc9da7adc7fe42b28758b2033f0bf8f8dc4b8" }, "path_in_vcs": "rayon-core" }
\ No newline at end of file @@ -42,7 +42,7 @@ rust_library { host_supported: true, crate_name: "rayon_core", cargo_env_compat: true, - cargo_pkg_version: "1.10.1", + cargo_pkg_version: "1.10.2", srcs: ["src/lib.rs"], edition: "2021", rustlibs: [ @@ -13,7 +13,7 @@ edition = "2021" rust-version = "1.56" name = "rayon-core" -version = "1.10.1" +version = "1.10.2" authors = [ "Niko Matsakis <niko@alum.mit.edu>", "Josh Stone <cuviper@gmail.com>", @@ -37,7 +37,6 @@ repository = "https://github.com/rayon-rs/rayon" [[test]] name = "stack_overflow_crash" path = "tests/stack_overflow_crash.rs" -harness = false [[test]] name = "double_init_fail" diff --git a/Cargo.toml.orig b/Cargo.toml.orig index 9e45992..878dac7 100644 --- a/Cargo.toml.orig +++ b/Cargo.toml.orig @@ -1,6 +1,6 @@ [package] name = "rayon-core" -version = "1.10.1" +version = "1.10.2" authors = ["Niko Matsakis <niko@alum.mit.edu>", "Josh Stone <cuviper@gmail.com>"] description = "Core APIs for Rayon" @@ -33,7 +33,6 @@ libc = "0.2" [[test]] name = "stack_overflow_crash" path = "tests/stack_overflow_crash.rs" -harness = false # NB: having one [[test]] manually defined means we need to declare them all @@ -11,13 +11,13 @@ third_party { } url { type: ARCHIVE - value: "https://static.crates.io/crates/rayon-core/rayon-core-1.10.1.crate" + value: "https://static.crates.io/crates/rayon-core/rayon-core-1.10.2.crate" } - version: "1.10.1" + version: "1.10.2" license_type: NOTICE last_upgrade_date { - year: 2022 - month: 12 - day: 13 + year: 2023 + month: 2 + day: 17 } } diff --git a/src/broadcast/mod.rs b/src/broadcast/mod.rs index 452aa71..d991c54 100644 --- a/src/broadcast/mod.rs +++ b/src/broadcast/mod.rs @@ -1,4 +1,5 @@ use crate::job::{ArcJob, StackJob}; +use crate::latch::LatchRef; use crate::registry::{Registry, WorkerThread}; use crate::scope::ScopeLatch; use std::fmt; @@ -107,7 +108,9 @@ where let n_threads = registry.num_threads(); let current_thread = WorkerThread::current().as_ref(); let latch = ScopeLatch::with_count(n_threads, current_thread); - let jobs: Vec<_> = (0..n_threads).map(|_| StackJob::new(&f, &latch)).collect(); + let jobs: Vec<_> = (0..n_threads) + .map(|_| StackJob::new(&f, LatchRef::new(&latch))) + .collect(); let job_refs = jobs.iter().map(|job| job.as_job_ref()); registry.inject_broadcast(job_refs); diff --git a/src/broadcast/test.rs b/src/broadcast/test.rs index a765cb0..a765cb0 100755..100644 --- a/src/broadcast/test.rs +++ b/src/broadcast/test.rs @@ -112,7 +112,7 @@ where let abort = unwind::AbortIfPanic; let func = (*this.func.get()).take().unwrap(); (*this.result.get()) = JobResult::call(func); - this.latch.set(); + Latch::set(&this.latch); mem::forget(abort); } } diff --git a/src/latch.rs b/src/latch.rs index 0909293..de43272 100644 --- a/src/latch.rs +++ b/src/latch.rs @@ -1,3 +1,5 @@ +use std::marker::PhantomData; +use std::ops::Deref; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::{Arc, Condvar, Mutex}; use std::usize; @@ -37,10 +39,15 @@ pub(super) trait Latch { /// /// Setting a latch triggers other threads to wake up and (in some /// cases) complete. This may, in turn, cause memory to be - /// allocated and so forth. One must be very careful about this, + /// deallocated and so forth. One must be very careful about this, /// and it's typically better to read all the fields you will need /// to access *before* a latch is set! - fn set(&self); + /// + /// This function operates on `*const Self` instead of `&self` to allow it + /// to become dangling during this call. The caller must ensure that the + /// pointer is valid upon entry, and not invalidated during the call by any + /// actions other than `set` itself. + unsafe fn set(this: *const Self); } pub(super) trait AsCoreLatch { @@ -123,8 +130,8 @@ impl CoreLatch { /// doing some wakeups; those are encapsulated in the surrounding /// latch code. #[inline] - fn set(&self) -> bool { - let old_state = self.state.swap(SET, Ordering::AcqRel); + unsafe fn set(this: *const Self) -> bool { + let old_state = (*this).state.swap(SET, Ordering::AcqRel); old_state == SLEEPING } @@ -186,16 +193,16 @@ impl<'r> AsCoreLatch for SpinLatch<'r> { impl<'r> Latch for SpinLatch<'r> { #[inline] - fn set(&self) { + unsafe fn set(this: *const Self) { let cross_registry; - let registry: &Registry = if self.cross { + let registry: &Registry = if (*this).cross { // Ensure the registry stays alive while we notify it. // Otherwise, it would be possible that we set the spin // latch and the other thread sees it and exits, causing // the registry to be deallocated, all before we get a // chance to invoke `registry.notify_worker_latch_is_set`. - cross_registry = Arc::clone(self.registry); + cross_registry = Arc::clone((*this).registry); &cross_registry } else { // If this is not a "cross-registry" spin-latch, then the @@ -203,12 +210,12 @@ impl<'r> Latch for SpinLatch<'r> { // that the registry stays alive. However, that doesn't // include this *particular* `Arc` handle if the waiting // thread then exits, so we must completely dereference it. - self.registry + (*this).registry }; - let target_worker_index = self.target_worker_index; + let target_worker_index = (*this).target_worker_index; - // NOTE: Once we `set`, the target may proceed and invalidate `&self`! - if self.core_latch.set() { + // NOTE: Once we `set`, the target may proceed and invalidate `this`! + if CoreLatch::set(&(*this).core_latch) { // Subtle: at this point, we can no longer read from // `self`, because the thread owning this spin latch may // have awoken and deallocated the latch. Therefore, we @@ -255,10 +262,10 @@ impl LockLatch { impl Latch for LockLatch { #[inline] - fn set(&self) { - let mut guard = self.m.lock().unwrap(); + unsafe fn set(this: *const Self) { + let mut guard = (*this).m.lock().unwrap(); *guard = true; - self.v.notify_all(); + (*this).v.notify_all(); } } @@ -307,9 +314,9 @@ impl CountLatch { /// count, then the latch is **set**, and calls to `probe()` will /// return true. Returns whether the latch was set. #[inline] - pub(super) fn set(&self) -> bool { - if self.counter.fetch_sub(1, Ordering::SeqCst) == 1 { - self.core_latch.set(); + pub(super) unsafe fn set(this: *const Self) -> bool { + if (*this).counter.fetch_sub(1, Ordering::SeqCst) == 1 { + CoreLatch::set(&(*this).core_latch); true } else { false @@ -320,8 +327,12 @@ impl CountLatch { /// the latch is set, then the specific worker thread is tickled, /// which should be the one that owns this latch. #[inline] - pub(super) fn set_and_tickle_one(&self, registry: &Registry, target_worker_index: usize) { - if self.set() { + pub(super) unsafe fn set_and_tickle_one( + this: *const Self, + registry: &Registry, + target_worker_index: usize, + ) { + if Self::set(this) { registry.notify_worker_latch_is_set(target_worker_index); } } @@ -362,19 +373,42 @@ impl CountLockLatch { impl Latch for CountLockLatch { #[inline] - fn set(&self) { - if self.counter.fetch_sub(1, Ordering::SeqCst) == 1 { - self.lock_latch.set(); + unsafe fn set(this: *const Self) { + if (*this).counter.fetch_sub(1, Ordering::SeqCst) == 1 { + LockLatch::set(&(*this).lock_latch); } } } -impl<'a, L> Latch for &'a L -where - L: Latch, -{ +/// `&L` without any implication of `dereferenceable` for `Latch::set` +pub(super) struct LatchRef<'a, L> { + inner: *const L, + marker: PhantomData<&'a L>, +} + +impl<L> LatchRef<'_, L> { + pub(super) fn new(inner: &L) -> LatchRef<'_, L> { + LatchRef { + inner, + marker: PhantomData, + } + } +} + +unsafe impl<L: Sync> Sync for LatchRef<'_, L> {} + +impl<L> Deref for LatchRef<'_, L> { + type Target = L; + + fn deref(&self) -> &L { + // SAFETY: if we have &self, the inner latch is still alive + unsafe { &*self.inner } + } +} + +impl<L: Latch> Latch for LatchRef<'_, L> { #[inline] - fn set(&self) { - L::set(self); + unsafe fn set(this: *const Self) { + L::set((*this).inner); } } diff --git a/src/registry.rs b/src/registry.rs index 279e298..24c0855 100644 --- a/src/registry.rs +++ b/src/registry.rs @@ -1,5 +1,5 @@ use crate::job::{JobFifo, JobRef, StackJob}; -use crate::latch::{AsCoreLatch, CoreLatch, CountLatch, Latch, LockLatch, SpinLatch}; +use crate::latch::{AsCoreLatch, CoreLatch, CountLatch, Latch, LatchRef, LockLatch, SpinLatch}; use crate::log::Event::*; use crate::log::Logger; use crate::sleep::Sleep; @@ -505,7 +505,7 @@ impl Registry { assert!(injected && !worker_thread.is_null()); op(&*worker_thread, true) }, - l, + LatchRef::new(l), ); self.inject(&[job.as_job_ref()]); job.latch.wait_and_reset(); // Make sure we can use the same latch again next time. @@ -575,7 +575,7 @@ impl Registry { pub(super) fn terminate(&self) { if self.terminate_count.fetch_sub(1, Ordering::AcqRel) == 1 { for (i, thread_info) in self.thread_infos.iter().enumerate() { - thread_info.terminate.set_and_tickle_one(self, i); + unsafe { CountLatch::set_and_tickle_one(&thread_info.terminate, self, i) }; } } } @@ -869,7 +869,7 @@ unsafe fn main_loop( let registry = &*worker_thread.registry; // let registry know we are ready to do work - registry.thread_infos[index].primed.set(); + Latch::set(®istry.thread_infos[index].primed); // Worker threads should not panic. If they do, just abort, as the // internal state of the threadpool is corrupted. Note that if @@ -892,7 +892,7 @@ unsafe fn main_loop( debug_assert!(worker_thread.take_local_job().is_none()); // let registry know we are done - registry.thread_infos[index].stopped.set(); + Latch::set(®istry.thread_infos[index].stopped); // Normal termination, do not abort. mem::forget(abort_guard); diff --git a/src/scope/mod.rs b/src/scope/mod.rs index 25cda83..b014cf0 100644 --- a/src/scope/mod.rs +++ b/src/scope/mod.rs @@ -13,7 +13,7 @@ use crate::unwind; use std::any::Any; use std::fmt; use std::marker::PhantomData; -use std::mem; +use std::mem::ManuallyDrop; use std::ptr; use std::sync::atomic::{AtomicPtr, Ordering}; use std::sync::Arc; @@ -540,10 +540,10 @@ impl<'scope> Scope<'scope> { BODY: FnOnce(&Scope<'scope>) + Send + 'scope, { let scope_ptr = ScopePtr(self); - let job = HeapJob::new(move || { + let job = HeapJob::new(move || unsafe { // SAFETY: this job will execute before the scope ends. - let scope = unsafe { scope_ptr.as_ref() }; - scope.base.execute_job(move || body(scope)) + let scope = scope_ptr.as_ref(); + ScopeBase::execute_job(&scope.base, move || body(scope)) }); let job_ref = self.base.heap_job_ref(job); @@ -562,12 +562,12 @@ impl<'scope> Scope<'scope> { BODY: Fn(&Scope<'scope>, BroadcastContext<'_>) + Send + Sync + 'scope, { let scope_ptr = ScopePtr(self); - let job = ArcJob::new(move || { + let job = ArcJob::new(move || unsafe { // SAFETY: this job will execute before the scope ends. - let scope = unsafe { scope_ptr.as_ref() }; + let scope = scope_ptr.as_ref(); let body = &body; let func = move || BroadcastContext::with(move |ctx| body(scope, ctx)); - scope.base.execute_job(func); + ScopeBase::execute_job(&scope.base, func) }); self.base.inject_broadcast(job) } @@ -600,10 +600,10 @@ impl<'scope> ScopeFifo<'scope> { BODY: FnOnce(&ScopeFifo<'scope>) + Send + 'scope, { let scope_ptr = ScopePtr(self); - let job = HeapJob::new(move || { + let job = HeapJob::new(move || unsafe { // SAFETY: this job will execute before the scope ends. - let scope = unsafe { scope_ptr.as_ref() }; - scope.base.execute_job(move || body(scope)) + let scope = scope_ptr.as_ref(); + ScopeBase::execute_job(&scope.base, move || body(scope)) }); let job_ref = self.base.heap_job_ref(job); @@ -628,12 +628,12 @@ impl<'scope> ScopeFifo<'scope> { BODY: Fn(&ScopeFifo<'scope>, BroadcastContext<'_>) + Send + Sync + 'scope, { let scope_ptr = ScopePtr(self); - let job = ArcJob::new(move || { + let job = ArcJob::new(move || unsafe { // SAFETY: this job will execute before the scope ends. - let scope = unsafe { scope_ptr.as_ref() }; + let scope = scope_ptr.as_ref(); let body = &body; let func = move || BroadcastContext::with(move |ctx| body(scope, ctx)); - scope.base.execute_job(func); + ScopeBase::execute_job(&scope.base, func) }); self.base.inject_broadcast(job) } @@ -688,7 +688,7 @@ impl<'scope> ScopeBase<'scope> { where FUNC: FnOnce() -> R, { - let result = self.execute_job_closure(func); + let result = unsafe { Self::execute_job_closure(self, func) }; self.job_completed_latch.wait(owner); self.maybe_propagate_panic(); result.unwrap() // only None if `op` panicked, and that would have been propagated @@ -696,28 +696,28 @@ impl<'scope> ScopeBase<'scope> { /// Executes `func` as a job, either aborting or executing as /// appropriate. - fn execute_job<FUNC>(&self, func: FUNC) + unsafe fn execute_job<FUNC>(this: *const Self, func: FUNC) where FUNC: FnOnce(), { - let _: Option<()> = self.execute_job_closure(func); + let _: Option<()> = Self::execute_job_closure(this, func); } /// Executes `func` as a job in scope. Adjusts the "job completed" /// counters and also catches any panic and stores it into /// `scope`. - fn execute_job_closure<FUNC, R>(&self, func: FUNC) -> Option<R> + unsafe fn execute_job_closure<FUNC, R>(this: *const Self, func: FUNC) -> Option<R> where FUNC: FnOnce() -> R, { match unwind::halt_unwinding(func) { Ok(r) => { - self.job_completed_latch.set(); + Latch::set(&(*this).job_completed_latch); Some(r) } Err(err) => { - self.job_panicked(err); - self.job_completed_latch.set(); + (*this).job_panicked(err); + Latch::set(&(*this).job_completed_latch); None } } @@ -725,14 +725,20 @@ impl<'scope> ScopeBase<'scope> { fn job_panicked(&self, err: Box<dyn Any + Send + 'static>) { // capture the first error we see, free the rest - let nil = ptr::null_mut(); - let mut err = Box::new(err); // box up the fat ptr - if self - .panic - .compare_exchange(nil, &mut *err, Ordering::Release, Ordering::Relaxed) - .is_ok() - { - mem::forget(err); // ownership now transferred into self.panic + if self.panic.load(Ordering::Relaxed).is_null() { + let nil = ptr::null_mut(); + let mut err = ManuallyDrop::new(Box::new(err)); // box up the fat ptr + let err_ptr: *mut Box<dyn Any + Send + 'static> = &mut **err; + if self + .panic + .compare_exchange(nil, err_ptr, Ordering::Release, Ordering::Relaxed) + .is_ok() + { + // ownership now transferred into self.panic + } else { + // another panic raced in ahead of us, so drop ours + let _: Box<Box<_>> = ManuallyDrop::into_inner(err); + } } } @@ -791,14 +797,14 @@ impl ScopeLatch { } impl Latch for ScopeLatch { - fn set(&self) { - match self { + unsafe fn set(this: *const Self) { + match &*this { ScopeLatch::Stealing { latch, registry, worker_index, - } => latch.set_and_tickle_one(registry, *worker_index), - ScopeLatch::Blocking { latch } => latch.set(), + } => CountLatch::set_and_tickle_one(latch, registry, *worker_index), + ScopeLatch::Blocking { latch } => Latch::set(latch), } } } diff --git a/tests/stack_overflow_crash.rs b/tests/stack_overflow_crash.rs index 6128898..e87e151 100644 --- a/tests/stack_overflow_crash.rs +++ b/tests/stack_overflow_crash.rs @@ -1,13 +1,14 @@ use rayon_core::ThreadPoolBuilder; use std::env; -use std::process::Command; +use std::process::{Command, ExitStatus, Stdio}; #[cfg(target_os = "linux")] use std::os::unix::process::ExitStatusExt; fn force_stack_overflow(depth: u32) { - let _buffer = [0u8; 1024 * 1024]; + let mut buffer = [0u8; 1024 * 1024]; + std::hint::black_box(&mut buffer); if depth > 0 { force_stack_overflow(depth - 1); } @@ -34,49 +35,61 @@ fn overflow_code() -> Option<i32> { #[cfg(windows)] fn overflow_code() -> Option<i32> { use std::os::windows::process::ExitStatusExt; - use std::process::ExitStatus; ExitStatus::from_raw(0xc00000fd /*STATUS_STACK_OVERFLOW*/).code() } -fn main() { - if env::args().len() == 1 { - // first check that the recursivecall actually causes a stack overflow, and does not get optimized away - { - let status = Command::new(env::current_exe().unwrap()) - .arg("8") - .status() - .unwrap(); +#[test] +fn stack_overflow_crash() { + // First check that the recursive call actually causes a stack overflow, + // and does not get optimized away. + let status = run_ignored("run_with_small_stack"); + #[cfg(any(unix, windows))] + assert_eq!(status.code(), overflow_code()); + #[cfg(target_os = "linux")] + assert!(matches!( + status.signal(), + Some(libc::SIGABRT | libc::SIGSEGV) + )); - #[cfg(any(unix, windows))] - assert_eq!(status.code(), overflow_code()); + // Now run with a larger stack and verify correct operation. + let status = run_ignored("run_with_large_stack"); + assert_eq!(status.code(), Some(0)); + #[cfg(target_os = "linux")] + assert_eq!(status.signal(), None); +} - #[cfg(target_os = "linux")] - assert!( - status.signal() == Some(11 /*SIGABRT*/) || status.signal() == Some(6 /*SIGSEGV*/) - ); - } +fn run_ignored(test: &str) -> ExitStatus { + Command::new(env::current_exe().unwrap()) + .arg("--ignored") + .arg("--exact") + .arg(test) + .stdout(Stdio::null()) + .stderr(Stdio::null()) + .status() + .unwrap() +} - // now run with a larger stack and verify correct operation - { - let status = Command::new(env::current_exe().unwrap()) - .arg("48") - .status() - .unwrap(); - assert_eq!(status.code(), Some(0)); - #[cfg(target_os = "linux")] - assert_eq!(status.signal(), None); - } - } else { - let stack_size_in_mb: usize = env::args().nth(1).unwrap().parse().unwrap(); - let pool = ThreadPoolBuilder::new() - .stack_size(stack_size_in_mb * 1024 * 1024) - .build() - .unwrap(); - pool.install(|| { - #[cfg(unix)] - disable_core(); - force_stack_overflow(32); - }); - } +#[test] +#[ignore] +fn run_with_small_stack() { + run_with_stack(8); +} + +#[test] +#[ignore] +fn run_with_large_stack() { + run_with_stack(48); +} + +fn run_with_stack(stack_size_in_mb: usize) { + let pool = ThreadPoolBuilder::new() + .stack_size(stack_size_in_mb * 1024 * 1024) + .build() + .unwrap(); + pool.install(|| { + #[cfg(unix)] + disable_core(); + force_stack_overflow(32); + }); } |