diff options
author | Android Build Coastguard Worker <android-build-coastguard-worker@google.com> | 2022-06-15 21:43:58 +0000 |
---|---|---|
committer | Android Build Coastguard Worker <android-build-coastguard-worker@google.com> | 2022-06-15 21:43:58 +0000 |
commit | 9d8eecbcef7019c85a6aeca985df23facf509e2b (patch) | |
tree | b70bd833b2f3f8450b92506a5a4f7e2a04b09e15 | |
parent | 2d7e4c7689730f30310ca78b72d3ad78991d27b9 (diff) | |
parent | 163c4fabb8b1b35c31fcd4a77f34f24a14093cef (diff) | |
download | futures-channel-aml_tz3_314012070.tar.gz |
Snap for 8730993 from 163c4fabb8b1b35c31fcd4a77f34f24a14093cef to mainline-tzdata3-releaseaml_tz3_314012070aml_tz3_314012050aml_tz3_314012010aml_tz3_313110000aml_tz3_312511020aml_tz3_312511010aml_tz3_312410020aml_tz3_312410010android12-mainline-tzdata3-releaseaml_tz3_314012010
Change-Id: Ia292ea706d9827b0c64b5a3554748294cca4a9c0
-rw-r--r-- | .cargo_vcs_info.json | 7 | ||||
-rw-r--r-- | Android.bp | 6 | ||||
-rw-r--r-- | Cargo.toml | 38 | ||||
-rw-r--r-- | Cargo.toml.orig | 18 | ||||
-rw-r--r-- | METADATA | 8 | ||||
-rw-r--r-- | README.md | 23 | ||||
-rw-r--r-- | TEST_MAPPING | 49 | ||||
-rw-r--r-- | benches/sync_mpsc.rs | 15 | ||||
-rw-r--r-- | build.rs | 41 | ||||
-rw-r--r-- | cargo2android.json | 5 | ||||
-rw-r--r-- | no_atomic_cas.rs | 13 | ||||
-rw-r--r-- | src/lib.rs | 54 | ||||
-rw-r--r-- | src/lock.rs | 7 | ||||
-rw-r--r-- | src/mpsc/mod.rs | 196 | ||||
-rw-r--r-- | src/mpsc/queue.rs | 22 | ||||
-rw-r--r-- | src/mpsc/sink_impl.rs | 58 | ||||
-rw-r--r-- | src/oneshot.rs | 39 | ||||
-rw-r--r-- | tests/channel.rs | 10 | ||||
-rw-r--r-- | tests/mpsc-close.rs | 51 | ||||
-rw-r--r-- | tests/mpsc.rs | 58 | ||||
-rw-r--r-- | tests/oneshot.rs | 18 |
21 files changed, 355 insertions, 381 deletions
diff --git a/.cargo_vcs_info.json b/.cargo_vcs_info.json index deccf0d..f3ad3ab 100644 --- a/.cargo_vcs_info.json +++ b/.cargo_vcs_info.json @@ -1,6 +1,5 @@ { "git": { - "sha1": "fc1e3250219170e31cddb8857a276cba7dd08d44" - }, - "path_in_vcs": "futures-channel" -}
\ No newline at end of file + "sha1": "c91f8691672c7401b1923ab00bf138975c99391a" + } +} @@ -45,8 +45,6 @@ rust_library { name: "libfutures_channel", host_supported: true, crate_name: "futures_channel", - cargo_env_compat: true, - cargo_pkg_version: "0.3.21", srcs: ["src/lib.rs"], edition: "2018", features: [ @@ -59,9 +57,11 @@ rust_library { ], apex_available: [ "//apex_available:platform", - "com.android.bluetooth", "com.android.resolv", "com.android.virt", ], min_sdk_version: "29", } + +// dependent_library ["feature_list"] +// futures-core-0.3.14 "alloc,std" @@ -3,37 +3,32 @@ # When uploading crates to the registry Cargo will automatically # "normalize" Cargo.toml files for maximal compatibility # with all versions of Cargo and also rewrite `path` dependencies -# to registry (e.g., crates.io) dependencies. +# to registry (e.g., crates.io) dependencies # -# If you are reading this file be aware that the original Cargo.toml -# will likely look very different (and much more reasonable). -# See Cargo.toml.orig for the original contents. +# If you believe there's an error in this file please file an +# issue against the rust-lang/cargo repository. If you're +# editing this file be aware that the upstream Cargo.toml +# will likely look very different (and much more reasonable) [package] edition = "2018" -rust-version = "1.45" name = "futures-channel" -version = "0.3.21" -description = """ -Channels for asynchronous communication using futures-rs. -""" +version = "0.3.13" +authors = ["Alex Crichton <alex@alexcrichton.com>"] +description = "Channels for asynchronous communication using futures-rs.\n" homepage = "https://rust-lang.github.io/futures-rs" +documentation = "https://docs.rs/futures-channel/0.3" license = "MIT OR Apache-2.0" repository = "https://github.com/rust-lang/futures-rs" - [package.metadata.docs.rs] all-features = true -rustdoc-args = [ - "--cfg", - "docsrs", -] - +rustdoc-args = ["--cfg", "docsrs"] [dependencies.futures-core] -version = "0.3.21" +version = "0.3.13" default-features = false [dependencies.futures-sink] -version = "0.3.21" +version = "0.3.13" optional = true default-features = false @@ -41,11 +36,8 @@ default-features = false [features] alloc = ["futures-core/alloc"] -cfg-target-has-atomic = [] +cfg-target-has-atomic = ["futures-core/cfg-target-has-atomic"] default = ["std"] sink = ["futures-sink"] -std = [ - "alloc", - "futures-core/std", -] -unstable = [] +std = ["alloc", "futures-core/std"] +unstable = ["futures-core/unstable"] diff --git a/Cargo.toml.orig b/Cargo.toml.orig index f356eab..9a33320 100644 --- a/Cargo.toml.orig +++ b/Cargo.toml.orig @@ -1,11 +1,12 @@ [package] name = "futures-channel" -version = "0.3.21" edition = "2018" -rust-version = "1.45" +version = "0.3.13" +authors = ["Alex Crichton <alex@alexcrichton.com>"] license = "MIT OR Apache-2.0" repository = "https://github.com/rust-lang/futures-rs" homepage = "https://rust-lang.github.io/futures-rs" +documentation = "https://docs.rs/futures-channel/0.3" description = """ Channels for asynchronous communication using futures-rs. """ @@ -16,14 +17,15 @@ std = ["alloc", "futures-core/std"] alloc = ["futures-core/alloc"] sink = ["futures-sink"] -# These features are no longer used. -# TODO: remove in the next major version. -unstable = [] -cfg-target-has-atomic = [] +# Unstable features +# These features are outside of the normal semver guarantees and require the +# `unstable` feature as an explicit opt-in to unstable API. +unstable = ["futures-core/unstable"] +cfg-target-has-atomic = ["futures-core/cfg-target-has-atomic"] [dependencies] -futures-core = { path = "../futures-core", version = "0.3.21", default-features = false } -futures-sink = { path = "../futures-sink", version = "0.3.21", default-features = false, optional = true } +futures-core = { path = "../futures-core", version = "0.3.13", default-features = false } +futures-sink = { path = "../futures-sink", version = "0.3.13", default-features = false, optional = true } [dev-dependencies] futures = { path = "../futures", default-features = true } @@ -7,13 +7,13 @@ third_party { } url { type: ARCHIVE - value: "https://static.crates.io/crates/futures-channel/futures-channel-0.3.21.crate" + value: "https://static.crates.io/crates/futures-channel/futures-channel-0.3.13.crate" } - version: "0.3.21" + version: "0.3.13" license_type: NOTICE last_upgrade_date { - year: 2022 - month: 3 + year: 2021 + month: 4 day: 1 } } diff --git a/README.md b/README.md deleted file mode 100644 index 3287be9..0000000 --- a/README.md +++ /dev/null @@ -1,23 +0,0 @@ -# futures-channel - -Channels for asynchronous communication using futures-rs. - -## Usage - -Add this to your `Cargo.toml`: - -```toml -[dependencies] -futures-channel = "0.3" -``` - -The current `futures-channel` requires Rust 1.45 or later. - -## License - -Licensed under either of [Apache License, Version 2.0](LICENSE-APACHE) or -[MIT license](LICENSE-MIT) at your option. - -Unless you explicitly state otherwise, any contribution intentionally submitted -for inclusion in the work by you, as defined in the Apache-2.0 license, shall -be dual licensed as above, without any additional terms or conditions. diff --git a/TEST_MAPPING b/TEST_MAPPING index 5ef61de..6798806 100644 --- a/TEST_MAPPING +++ b/TEST_MAPPING @@ -1,45 +1,56 @@ // Generated by update_crate_tests.py for tests that depend on this crate. { - "imports": [ + "presubmit": [ { - "path": "external/rust/crates/anyhow" + "name": "anyhow_device_test_tests_test_boxed" }, { - "path": "external/rust/crates/futures-util" + "name": "anyhow_device_test_tests_test_convert" }, { - "path": "external/rust/crates/tokio" + "name": "anyhow_device_test_tests_test_ffi" }, { - "path": "external/rust/crates/tokio-test" - } - ], - "presubmit": [ + "name": "anyhow_device_test_tests_test_repr" + }, { - "name": "ZipFuseTest" + "name": "tokio-test_device_test_tests_block_on" }, { - "name": "authfs_device_test_src_lib" + "name": "anyhow_device_test_tests_test_chain" }, { - "name": "doh_unit_test" + "name": "anyhow_device_test_tests_test_source" }, { - "name": "virtualizationservice_device_test" - } - ], - "presubmit-rust": [ + "name": "tokio-test_device_test_tests_io" + }, + { + "name": "anyhow_device_test_tests_test_autotrait" + }, + { + "name": "anyhow_device_test_src_lib" + }, + { + "name": "anyhow_device_test_tests_test_context" + }, + { + "name": "anyhow_device_test_tests_test_downcast" + }, + { + "name": "anyhow_device_test_tests_test_macros" + }, { - "name": "ZipFuseTest" + "name": "futures-util_device_test_src_lib" }, { - "name": "authfs_device_test_src_lib" + "name": "anyhow_device_test_tests_test_fmt" }, { - "name": "doh_unit_test" + "name": "tokio-test_device_test_tests_macros" }, { - "name": "virtualizationservice_device_test" + "name": "tokio-test_device_test_src_lib" } ] } diff --git a/benches/sync_mpsc.rs b/benches/sync_mpsc.rs index 7c3c3d3..e22fe60 100644 --- a/benches/sync_mpsc.rs +++ b/benches/sync_mpsc.rs @@ -7,8 +7,8 @@ use { futures::{ channel::mpsc::{self, Sender, UnboundedSender}, ready, - sink::Sink, stream::{Stream, StreamExt}, + sink::Sink, task::{Context, Poll}, }, futures_test::task::noop_context, @@ -25,6 +25,7 @@ fn unbounded_1_tx(b: &mut Bencher) { // 1000 iterations to avoid measuring overhead of initialization // Result should be divided by 1000 for i in 0..1000 { + // Poll, not ready, park assert_eq!(Poll::Pending, rx.poll_next_unpin(&mut cx)); @@ -72,6 +73,7 @@ fn unbounded_uncontended(b: &mut Bencher) { }) } + /// A Stream that continuously sends incrementing number of the queue struct TestSender { tx: Sender<u32>, @@ -82,7 +84,9 @@ struct TestSender { impl Stream for TestSender { type Item = u32; - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) + -> Poll<Option<Self::Item>> + { let this = &mut *self; let mut tx = Pin::new(&mut this.tx); @@ -119,7 +123,12 @@ fn bounded_100_tx(b: &mut Bencher) { // Each sender can send one item after specified capacity let (tx, mut rx) = mpsc::channel(0); - let mut tx: Vec<_> = (0..100).map(|_| TestSender { tx: tx.clone(), last: 0 }).collect(); + let mut tx: Vec<_> = (0..100).map(|_| { + TestSender { + tx: tx.clone(), + last: 0 + } + }).collect(); for i in 0..10 { for x in &mut tx { diff --git a/build.rs b/build.rs deleted file mode 100644 index 05e0496..0000000 --- a/build.rs +++ /dev/null @@ -1,41 +0,0 @@ -// The rustc-cfg listed below are considered public API, but it is *unstable* -// and outside of the normal semver guarantees: -// -// - `futures_no_atomic_cas` -// Assume the target does *not* support atomic CAS operations. -// This is usually detected automatically by the build script, but you may -// need to enable it manually when building for custom targets or using -// non-cargo build systems that don't run the build script. -// -// With the exceptions mentioned above, the rustc-cfg emitted by the build -// script are *not* public API. - -#![warn(rust_2018_idioms, single_use_lifetimes)] - -use std::env; - -include!("no_atomic_cas.rs"); - -fn main() { - let target = match env::var("TARGET") { - Ok(target) => target, - Err(e) => { - println!( - "cargo:warning={}: unable to get TARGET environment variable: {}", - env!("CARGO_PKG_NAME"), - e - ); - return; - } - }; - - // Note that this is `no_*`, not `has_*`. This allows treating - // `cfg(target_has_atomic = "ptr")` as true when the build script doesn't - // run. This is needed for compatibility with non-cargo build systems that - // don't run the build script. - if NO_ATOMIC_CAS.contains(&&*target) { - println!("cargo:rustc-cfg=futures_no_atomic_cas"); - } - - println!("cargo:rerun-if-changed=no_atomic_cas.rs"); -} diff --git a/cargo2android.json b/cargo2android.json index a7e2a4b..01465d0 100644 --- a/cargo2android.json +++ b/cargo2android.json @@ -1,12 +1,11 @@ { "apex-available": [ "//apex_available:platform", - "com.android.bluetooth", "com.android.resolv", "com.android.virt" ], + "min_sdk_version": "29", "dependencies": true, "device": true, - "min-sdk-version": "29", "run": true -} +}
\ No newline at end of file diff --git a/no_atomic_cas.rs b/no_atomic_cas.rs deleted file mode 100644 index 9b05d4b..0000000 --- a/no_atomic_cas.rs +++ /dev/null @@ -1,13 +0,0 @@ -// This file is @generated by no_atomic_cas.sh. -// It is not intended for manual editing. - -const NO_ATOMIC_CAS: &[&str] = &[ - "avr-unknown-gnu-atmega328", - "bpfeb-unknown-none", - "bpfel-unknown-none", - "msp430-none-elf", - "riscv32i-unknown-none-elf", - "riscv32imc-unknown-none-elf", - "thumbv4t-none-eabi", - "thumbv6m-none-eabi", -]; @@ -11,32 +11,34 @@ //! All items are only available when the `std` or `alloc` feature of this //! library is activated, and it is activated by default. +#![cfg_attr(feature = "cfg-target-has-atomic", feature(cfg_target_has_atomic))] + #![cfg_attr(not(feature = "std"), no_std)] -#![warn( - missing_debug_implementations, - missing_docs, - rust_2018_idioms, - single_use_lifetimes, - unreachable_pub -)] -#![doc(test( - no_crate_inject, - attr( - deny(warnings, rust_2018_idioms, single_use_lifetimes), - allow(dead_code, unused_assignments, unused_variables) - ) -))] -#[cfg(not(futures_no_atomic_cas))] -#[cfg(feature = "alloc")] -extern crate alloc; +#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms, unreachable_pub)] +// It cannot be included in the published code because this lints have false positives in the minimum required version. +#![cfg_attr(test, warn(single_use_lifetimes))] +#![warn(clippy::all)] +#![doc(test(attr(deny(warnings), allow(dead_code, unused_assignments, unused_variables))))] + +#[cfg(all(feature = "cfg-target-has-atomic", not(feature = "unstable")))] +compile_error!("The `cfg-target-has-atomic` feature requires the `unstable` feature as an explicit opt-in to unstable features"); + +macro_rules! cfg_target_has_atomic { + ($($item:item)*) => {$( + #[cfg_attr(feature = "cfg-target-has-atomic", cfg(target_has_atomic = "ptr"))] + $item + )*}; +} + +cfg_target_has_atomic! { + #[cfg(feature = "alloc")] + extern crate alloc; -#[cfg(not(futures_no_atomic_cas))] -#[cfg(feature = "alloc")] -mod lock; -#[cfg(not(futures_no_atomic_cas))] -#[cfg(feature = "std")] -pub mod mpsc; -#[cfg(not(futures_no_atomic_cas))] -#[cfg(feature = "alloc")] -pub mod oneshot; + #[cfg(feature = "alloc")] + mod lock; + #[cfg(feature = "std")] + pub mod mpsc; + #[cfg(feature = "alloc")] + pub mod oneshot; +} diff --git a/src/lock.rs b/src/lock.rs index b328d0f..5eecdd9 100644 --- a/src/lock.rs +++ b/src/lock.rs @@ -6,8 +6,8 @@ use core::cell::UnsafeCell; use core::ops::{Deref, DerefMut}; -use core::sync::atomic::AtomicBool; use core::sync::atomic::Ordering::SeqCst; +use core::sync::atomic::AtomicBool; /// A "mutex" around a value, similar to `std::sync::Mutex<T>`. /// @@ -37,7 +37,10 @@ unsafe impl<T: Send> Sync for Lock<T> {} impl<T> Lock<T> { /// Creates a new lock around the given value. pub(crate) fn new(t: T) -> Self { - Self { locked: AtomicBool::new(false), data: UnsafeCell::new(t) } + Self { + locked: AtomicBool::new(false), + data: UnsafeCell::new(t), + } } /// Attempts to acquire this lock, returning whether the lock was acquired or diff --git a/src/mpsc/mod.rs b/src/mpsc/mod.rs index 44834b7..dd50343 100644 --- a/src/mpsc/mod.rs +++ b/src/mpsc/mod.rs @@ -79,13 +79,13 @@ // by the queue structure. use futures_core::stream::{FusedStream, Stream}; -use futures_core::task::__internal::AtomicWaker; use futures_core::task::{Context, Poll, Waker}; +use futures_core::task::__internal::AtomicWaker; use std::fmt; use std::pin::Pin; +use std::sync::{Arc, Mutex}; use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering::SeqCst; -use std::sync::{Arc, Mutex}; use std::thread; use crate::mpsc::queue::Queue; @@ -209,7 +209,9 @@ impl SendError { impl<T> fmt::Debug for TrySendError<T> { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("TrySendError").field("kind", &self.err.kind).finish() + f.debug_struct("TrySendError") + .field("kind", &self.err.kind) + .finish() } } @@ -249,7 +251,8 @@ impl<T> TrySendError<T> { impl fmt::Debug for TryRecvError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_tuple("TryRecvError").finish() + f.debug_tuple("TryRecvError") + .finish() } } @@ -332,7 +335,10 @@ struct SenderTask { impl SenderTask { fn new() -> Self { - Self { task: None, is_parked: false } + Self { + task: None, + is_parked: false, + } } fn notify(&mut self) { @@ -375,7 +381,9 @@ pub fn channel<T>(buffer: usize) -> (Sender<T>, Receiver<T>) { maybe_parked: false, }; - let rx = Receiver { inner: Some(inner) }; + let rx = Receiver { + inner: Some(inner), + }; (Sender(Some(tx)), rx) } @@ -391,6 +399,7 @@ pub fn channel<T>(buffer: usize) -> (Sender<T>, Receiver<T>) { /// the channel. Using an `unbounded` channel has the ability of causing the /// process to run out of memory. In this case, the process will be aborted. pub fn unbounded<T>() -> (UnboundedSender<T>, UnboundedReceiver<T>) { + let inner = Arc::new(UnboundedInner { state: AtomicUsize::new(INIT_STATE), message_queue: Queue::new(), @@ -398,9 +407,13 @@ pub fn unbounded<T>() -> (UnboundedSender<T>, UnboundedReceiver<T>) { recv_task: AtomicWaker::new(), }); - let tx = UnboundedSenderInner { inner: inner.clone() }; + let tx = UnboundedSenderInner { + inner: inner.clone(), + }; - let rx = UnboundedReceiver { inner: Some(inner) }; + let rx = UnboundedReceiver { + inner: Some(inner), + }; (UnboundedSender(Some(tx)), rx) } @@ -417,10 +430,13 @@ impl<T> UnboundedSenderInner<T> { if state.is_open { Poll::Ready(Ok(())) } else { - Poll::Ready(Err(SendError { kind: SendErrorKind::Disconnected })) + Poll::Ready(Err(SendError { + kind: SendErrorKind::Disconnected, + })) } } + // Push message to the queue and signal to the receiver fn queue_push_and_signal(&self, msg: T) { // Push the message onto the message queue @@ -446,17 +462,16 @@ impl<T> UnboundedSenderInner<T> { // This probably is never hit? Odds are the process will run out of // memory first. It may be worth to return something else in this // case? - assert!( - state.num_messages < MAX_CAPACITY, - "buffer space \ - exhausted; sending this messages would overflow the state" - ); + assert!(state.num_messages < MAX_CAPACITY, "buffer space \ + exhausted; sending this messages would overflow the state"); state.num_messages += 1; let next = encode_state(&state); match self.inner.state.compare_exchange(curr, next, SeqCst, SeqCst) { - Ok(_) => return Some(state.num_messages), + Ok(_) => { + return Some(state.num_messages) + } Err(actual) => curr = actual, } } @@ -501,7 +516,12 @@ impl<T> BoundedSenderInner<T> { fn try_send(&mut self, msg: T) -> Result<(), TrySendError<T>> { // If the sender is currently blocked, reject the message if !self.poll_unparked(None).is_ready() { - return Err(TrySendError { err: SendError { kind: SendErrorKind::Full }, val: msg }); + return Err(TrySendError { + err: SendError { + kind: SendErrorKind::Full, + }, + val: msg, + }); } // The channel has capacity to accept the message, so send it @@ -510,8 +530,11 @@ impl<T> BoundedSenderInner<T> { // Do the send without failing. // Can be called only by bounded sender. - fn do_send_b(&mut self, msg: T) -> Result<(), TrySendError<T>> { - // Anyone calling do_send *should* make sure there is room first, + #[allow(clippy::debug_assert_with_mut_call)] + fn do_send_b(&mut self, msg: T) + -> Result<(), TrySendError<T>> + { + // Anyone callig do_send *should* make sure there is room first, // but assert here for tests as a sanity check. debug_assert!(self.poll_unparked(None).is_ready()); @@ -528,12 +551,12 @@ impl<T> BoundedSenderInner<T> { // the configured buffer size num_messages > self.inner.buffer } - None => { - return Err(TrySendError { - err: SendError { kind: SendErrorKind::Disconnected }, - val: msg, - }) - } + None => return Err(TrySendError { + err: SendError { + kind: SendErrorKind::Disconnected, + }, + val: msg, + }), }; // If the channel has reached capacity, then the sender task needs to @@ -577,17 +600,16 @@ impl<T> BoundedSenderInner<T> { // This probably is never hit? Odds are the process will run out of // memory first. It may be worth to return something else in this // case? - assert!( - state.num_messages < MAX_CAPACITY, - "buffer space \ - exhausted; sending this messages would overflow the state" - ); + assert!(state.num_messages < MAX_CAPACITY, "buffer space \ + exhausted; sending this messages would overflow the state"); state.num_messages += 1; let next = encode_state(&state); match self.inner.state.compare_exchange(curr, next, SeqCst, SeqCst) { - Ok(_) => return Some(state.num_messages), + Ok(_) => { + return Some(state.num_messages) + } Err(actual) => curr = actual, } } @@ -622,10 +644,15 @@ impl<T> BoundedSenderInner<T> { /// capacity, in which case the current task is queued to be notified once /// capacity is available; /// - `Poll::Ready(Err(SendError))` if the receiver has been dropped. - fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), SendError>> { + fn poll_ready( + &mut self, + cx: &mut Context<'_>, + ) -> Poll<Result<(), SendError>> { let state = decode_state(self.inner.state.load(SeqCst)); if !state.is_open { - return Poll::Ready(Err(SendError { kind: SendErrorKind::Disconnected })); + return Poll::Ready(Err(SendError { + kind: SendErrorKind::Disconnected, + })); } self.poll_unparked(Some(cx)).map(Ok) @@ -672,7 +699,7 @@ impl<T> BoundedSenderInner<T> { if !task.is_parked { self.maybe_parked = false; - return Poll::Ready(()); + return Poll::Ready(()) } // At this point, an unpark request is pending, so there will be an @@ -697,7 +724,12 @@ impl<T> Sender<T> { if let Some(inner) = &mut self.0 { inner.try_send(msg) } else { - Err(TrySendError { err: SendError { kind: SendErrorKind::Disconnected }, val: msg }) + Err(TrySendError { + err: SendError { + kind: SendErrorKind::Disconnected, + }, + val: msg, + }) } } @@ -707,7 +739,8 @@ impl<T> Sender<T> { /// [`poll_ready`](Sender::poll_ready) has reported that the channel is /// ready to receive a message. pub fn start_send(&mut self, msg: T) -> Result<(), SendError> { - self.try_send(msg).map_err(|e| e.err) + self.try_send(msg) + .map_err(|e| e.err) } /// Polls the channel to determine if there is guaranteed capacity to send @@ -722,8 +755,13 @@ impl<T> Sender<T> { /// capacity, in which case the current task is queued to be notified once /// capacity is available; /// - `Poll::Ready(Err(SendError))` if the receiver has been dropped. - pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), SendError>> { - let inner = self.0.as_mut().ok_or(SendError { kind: SendErrorKind::Disconnected })?; + pub fn poll_ready( + &mut self, + cx: &mut Context<'_>, + ) -> Poll<Result<(), SendError>> { + let inner = self.0.as_mut().ok_or(SendError { + kind: SendErrorKind::Disconnected, + })?; inner.poll_ready(cx) } @@ -761,10 +799,7 @@ impl<T> Sender<T> { } /// Hashes the receiver into the provided hasher - pub fn hash_receiver<H>(&self, hasher: &mut H) - where - H: std::hash::Hasher, - { + pub fn hash_receiver<H>(&self, hasher: &mut H) where H: std::hash::Hasher { use std::hash::Hash; let ptr = self.0.as_ref().map(|inner| inner.ptr()); @@ -774,8 +809,13 @@ impl<T> Sender<T> { impl<T> UnboundedSender<T> { /// Check if the channel is ready to receive a message. - pub fn poll_ready(&self, _: &mut Context<'_>) -> Poll<Result<(), SendError>> { - let inner = self.0.as_ref().ok_or(SendError { kind: SendErrorKind::Disconnected })?; + pub fn poll_ready( + &self, + _: &mut Context<'_>, + ) -> Poll<Result<(), SendError>> { + let inner = self.0.as_ref().ok_or(SendError { + kind: SendErrorKind::Disconnected, + })?; inner.poll_ready_nb() } @@ -805,7 +845,12 @@ impl<T> UnboundedSender<T> { } } - Err(TrySendError { err: SendError { kind: SendErrorKind::Disconnected }, val: msg }) + Err(TrySendError { + err: SendError { + kind: SendErrorKind::Disconnected, + }, + val: msg, + }) } /// Send a message on the channel. @@ -813,7 +858,8 @@ impl<T> UnboundedSender<T> { /// This method should only be called after `poll_ready` has been used to /// verify that the channel is ready to receive a message. pub fn start_send(&mut self, msg: T) -> Result<(), SendError> { - self.do_send_nb(msg).map_err(|e| e.err) + self.do_send_nb(msg) + .map_err(|e| e.err) } /// Sends a message along this channel. @@ -842,10 +888,7 @@ impl<T> UnboundedSender<T> { } /// Hashes the receiver into the provided hasher - pub fn hash_receiver<H>(&self, hasher: &mut H) - where - H: std::hash::Hasher, - { + pub fn hash_receiver<H>(&self, hasher: &mut H) where H: std::hash::Hasher { use std::hash::Hash; let ptr = self.0.as_ref().map(|inner| inner.ptr()); @@ -885,7 +928,9 @@ impl<T> Clone for UnboundedSenderInner<T> { Ok(_) => { // The ABA problem doesn't matter here. We only care that the // number of senders never exceeds the maximum. - return Self { inner: self.inner.clone() }; + return Self { + inner: self.inner.clone(), + }; } Err(actual) => curr = actual, } @@ -976,22 +1021,19 @@ impl<T> Receiver<T> { /// only when you've otherwise arranged to be notified when the channel is /// no longer empty. /// - /// This function returns: - /// * `Ok(Some(t))` when message is fetched - /// * `Ok(None)` when channel is closed and no messages left in the queue - /// * `Err(e)` when there are no messages available, but channel is not yet closed + /// This function will panic if called after `try_next` or `poll_next` has + /// returned `None`. pub fn try_next(&mut self) -> Result<Option<T>, TryRecvError> { match self.next_message() { - Poll::Ready(msg) => Ok(msg), + Poll::Ready(msg) => { + Ok(msg) + }, Poll::Pending => Err(TryRecvError { _priv: () }), } } fn next_message(&mut self) -> Poll<Option<T>> { - let inner = match self.inner.as_mut() { - None => return Poll::Ready(None), - Some(inner) => inner, - }; + let inner = self.inner.as_mut().expect("Receiver::next_message called after `None`"); // Pop off a message match unsafe { inner.message_queue.pop_spin() } { Some(msg) => { @@ -1056,15 +1098,18 @@ impl<T> FusedStream for Receiver<T> { impl<T> Stream for Receiver<T> { type Item = T; - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> { - // Try to read a message off of the message queue. + fn poll_next( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll<Option<T>> { + // Try to read a message off of the message queue. match self.next_message() { Poll::Ready(msg) => { if msg.is_none() { self.inner = None; } Poll::Ready(msg) - } + }, Poll::Pending => { // There are no messages to read, in this case, park. self.inner.as_ref().unwrap().recv_task.register(cx.waker()); @@ -1124,22 +1169,19 @@ impl<T> UnboundedReceiver<T> { /// only when you've otherwise arranged to be notified when the channel is /// no longer empty. /// - /// This function returns: - /// * `Ok(Some(t))` when message is fetched - /// * `Ok(None)` when channel is closed and no messages left in the queue - /// * `Err(e)` when there are no messages available, but channel is not yet closed + /// This function will panic if called after `try_next` or `poll_next` has + /// returned `None`. pub fn try_next(&mut self) -> Result<Option<T>, TryRecvError> { match self.next_message() { - Poll::Ready(msg) => Ok(msg), + Poll::Ready(msg) => { + Ok(msg) + }, Poll::Pending => Err(TryRecvError { _priv: () }), } } fn next_message(&mut self) -> Poll<Option<T>> { - let inner = match self.inner.as_mut() { - None => return Poll::Ready(None), - Some(inner) => inner, - }; + let inner = self.inner.as_mut().expect("Receiver::next_message called after `None`"); // Pop off a message match unsafe { inner.message_queue.pop_spin() } { Some(msg) => { @@ -1188,7 +1230,10 @@ impl<T> FusedStream for UnboundedReceiver<T> { impl<T> Stream for UnboundedReceiver<T> { type Item = T; - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> { + fn poll_next( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll<Option<T>> { // Try to read a message off of the message queue. match self.next_message() { Poll::Ready(msg) => { @@ -1196,7 +1241,7 @@ impl<T> Stream for UnboundedReceiver<T> { self.inner = None; } Poll::Ready(msg) - } + }, Poll::Pending => { // There are no messages to read, in this case, park. self.inner.as_ref().unwrap().recv_task.register(cx.waker()); @@ -1294,7 +1339,10 @@ impl State { */ fn decode_state(num: usize) -> State { - State { is_open: num & OPEN_MASK == OPEN_MASK, num_messages: num & MAX_CAPACITY } + State { + is_open: num & OPEN_MASK == OPEN_MASK, + num_messages: num & MAX_CAPACITY, + } } fn encode_state(state: &State) -> usize { diff --git a/src/mpsc/queue.rs b/src/mpsc/queue.rs index 57dc7f5..b00e1b1 100644 --- a/src/mpsc/queue.rs +++ b/src/mpsc/queue.rs @@ -43,10 +43,10 @@ pub(super) use self::PopResult::*; +use std::thread; use std::cell::UnsafeCell; use std::ptr; use std::sync::atomic::{AtomicPtr, Ordering}; -use std::thread; /// A result of the `pop` function. pub(super) enum PopResult<T> { @@ -76,12 +76,15 @@ pub(super) struct Queue<T> { tail: UnsafeCell<*mut Node<T>>, } -unsafe impl<T: Send> Send for Queue<T> {} -unsafe impl<T: Send> Sync for Queue<T> {} +unsafe impl<T: Send> Send for Queue<T> { } +unsafe impl<T: Send> Sync for Queue<T> { } impl<T> Node<T> { unsafe fn new(v: Option<T>) -> *mut Self { - Box::into_raw(Box::new(Self { next: AtomicPtr::new(ptr::null_mut()), value: v })) + Box::into_raw(Box::new(Self { + next: AtomicPtr::new(ptr::null_mut()), + value: v, + })) } } @@ -90,7 +93,10 @@ impl<T> Queue<T> { /// one consumer. pub(super) fn new() -> Self { let stub = unsafe { Node::new(None) }; - Self { head: AtomicPtr::new(stub), tail: UnsafeCell::new(stub) } + Self { + head: AtomicPtr::new(stub), + tail: UnsafeCell::new(stub), + } } /// Pushes a new value onto this queue. @@ -127,11 +133,7 @@ impl<T> Queue<T> { return Data(ret); } - if self.head.load(Ordering::Acquire) == tail { - Empty - } else { - Inconsistent - } + if self.head.load(Ordering::Acquire) == tail {Empty} else {Inconsistent} } /// Pop an element similarly to `pop` function, but spin-wait on inconsistent diff --git a/src/mpsc/sink_impl.rs b/src/mpsc/sink_impl.rs index 1be2016..4ce66b4 100644 --- a/src/mpsc/sink_impl.rs +++ b/src/mpsc/sink_impl.rs @@ -6,15 +6,24 @@ use std::pin::Pin; impl<T> Sink<T> for Sender<T> { type Error = SendError; - fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + fn poll_ready( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll<Result<(), Self::Error>> { (*self).poll_ready(cx) } - fn start_send(mut self: Pin<&mut Self>, msg: T) -> Result<(), Self::Error> { + fn start_send( + mut self: Pin<&mut Self>, + msg: T, + ) -> Result<(), Self::Error> { (*self).start_send(msg) } - fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + fn poll_flush( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll<Result<(), Self::Error>> { match (*self).poll_ready(cx) { Poll::Ready(Err(ref e)) if e.is_disconnected() => { // If the receiver disconnected, we consider the sink to be flushed. @@ -24,7 +33,10 @@ impl<T> Sink<T> for Sender<T> { } } - fn poll_close(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + fn poll_close( + mut self: Pin<&mut Self>, + _: &mut Context<'_>, + ) -> Poll<Result<(), Self::Error>> { self.disconnect(); Poll::Ready(Ok(())) } @@ -33,19 +45,31 @@ impl<T> Sink<T> for Sender<T> { impl<T> Sink<T> for UnboundedSender<T> { type Error = SendError; - fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + fn poll_ready( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll<Result<(), Self::Error>> { Self::poll_ready(&*self, cx) } - fn start_send(mut self: Pin<&mut Self>, msg: T) -> Result<(), Self::Error> { + fn start_send( + mut self: Pin<&mut Self>, + msg: T, + ) -> Result<(), Self::Error> { Self::start_send(&mut *self, msg) } - fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + fn poll_flush( + self: Pin<&mut Self>, + _: &mut Context<'_>, + ) -> Poll<Result<(), Self::Error>> { Poll::Ready(Ok(())) } - fn poll_close(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + fn poll_close( + mut self: Pin<&mut Self>, + _: &mut Context<'_>, + ) -> Poll<Result<(), Self::Error>> { self.disconnect(); Poll::Ready(Ok(())) } @@ -54,19 +78,29 @@ impl<T> Sink<T> for UnboundedSender<T> { impl<T> Sink<T> for &UnboundedSender<T> { type Error = SendError; - fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + fn poll_ready( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll<Result<(), Self::Error>> { UnboundedSender::poll_ready(*self, cx) } fn start_send(self: Pin<&mut Self>, msg: T) -> Result<(), Self::Error> { - self.unbounded_send(msg).map_err(TrySendError::into_send_error) + self.unbounded_send(msg) + .map_err(TrySendError::into_send_error) } - fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + fn poll_flush( + self: Pin<&mut Self>, + _: &mut Context<'_>, + ) -> Poll<Result<(), Self::Error>> { Poll::Ready(Ok(())) } - fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + fn poll_close( + self: Pin<&mut Self>, + _: &mut Context<'_>, + ) -> Poll<Result<(), Self::Error>> { self.close_channel(); Poll::Ready(Ok(())) } diff --git a/src/oneshot.rs b/src/oneshot.rs index 5af651b..dbbce81 100644 --- a/src/oneshot.rs +++ b/src/oneshot.rs @@ -7,7 +7,7 @@ use core::fmt; use core::pin::Pin; use core::sync::atomic::AtomicBool; use core::sync::atomic::Ordering::SeqCst; -use futures_core::future::{FusedFuture, Future}; +use futures_core::future::{Future, FusedFuture}; use futures_core::task::{Context, Poll, Waker}; use crate::lock::Lock; @@ -16,6 +16,7 @@ use crate::lock::Lock; /// /// This is created by the [`channel`](channel) function. #[must_use = "futures do nothing unless you `.await` or poll them"] +#[derive(Debug)] pub struct Receiver<T> { inner: Arc<Inner<T>>, } @@ -23,6 +24,7 @@ pub struct Receiver<T> { /// A means of transmitting a single value to another task. /// /// This is created by the [`channel`](channel) function. +#[derive(Debug)] pub struct Sender<T> { inner: Arc<Inner<T>>, } @@ -33,6 +35,7 @@ impl<T> Unpin for Sender<T> {} /// Internal state of the `Receiver`/`Sender` pair above. This is all used as /// the internal synchronization between the two for send/recv operations. +#[derive(Debug)] struct Inner<T> { /// Indicates whether this oneshot is complete yet. This is filled in both /// by `Sender::drop` and by `Receiver::drop`, and both sides interpret it @@ -103,8 +106,12 @@ struct Inner<T> { /// ``` pub fn channel<T>() -> (Sender<T>, Receiver<T>) { let inner = Arc::new(Inner::new()); - let receiver = Receiver { inner: inner.clone() }; - let sender = Sender { inner }; + let receiver = Receiver { + inner: inner.clone(), + }; + let sender = Sender { + inner, + }; (sender, receiver) } @@ -120,7 +127,7 @@ impl<T> Inner<T> { fn send(&self, t: T) -> Result<(), T> { if self.complete.load(SeqCst) { - return Err(t); + return Err(t) } // Note that this lock acquisition may fail if the receiver @@ -157,7 +164,7 @@ impl<T> Inner<T> { // destructor, but our destructor hasn't run yet so if it's set then the // oneshot is gone. if self.complete.load(SeqCst) { - return Poll::Ready(()); + return Poll::Ready(()) } // If our other half is not gone then we need to park our current task @@ -266,10 +273,7 @@ impl<T> Inner<T> { } else { let task = cx.waker().clone(); match self.rx_task.try_lock() { - Some(mut slot) => { - *slot = Some(task); - false - } + Some(mut slot) => { *slot = Some(task); false }, None => true, } }; @@ -390,12 +394,6 @@ impl<T> Drop for Sender<T> { } } -impl<T: fmt::Debug> fmt::Debug for Sender<T> { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("Sender").field("complete", &self.inner.complete).finish() - } -} - /// A future that resolves when the receiving end of a channel has hung up. /// /// This is an `.await`-friendly interface around [`poll_canceled`](Sender::poll_canceled). @@ -455,7 +453,10 @@ impl<T> Receiver<T> { impl<T> Future for Receiver<T> { type Output = Result<T, Canceled>; - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<T, Canceled>> { + fn poll( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll<Result<T, Canceled>> { self.inner.recv(cx) } } @@ -480,9 +481,3 @@ impl<T> Drop for Receiver<T> { self.inner.drop_rx() } } - -impl<T: fmt::Debug> fmt::Debug for Receiver<T> { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("Receiver").field("complete", &self.inner.complete).finish() - } -} diff --git a/tests/channel.rs b/tests/channel.rs index 5f01a8e..73dac64 100644 --- a/tests/channel.rs +++ b/tests/channel.rs @@ -1,8 +1,8 @@ use futures::channel::mpsc; use futures::executor::block_on; use futures::future::poll_fn; -use futures::sink::SinkExt; use futures::stream::StreamExt; +use futures::sink::SinkExt; use std::sync::atomic::{AtomicUsize, Ordering}; use std::thread; @@ -11,7 +11,9 @@ fn sequence() { let (tx, rx) = mpsc::channel(1); let amt = 20; - let t = thread::spawn(move || block_on(send_sequence(amt, tx))); + let t = thread::spawn(move || { + block_on(send_sequence(amt, tx)) + }); let list: Vec<_> = block_on(rx.collect()); let mut list = list.into_iter(); for i in (1..=amt).rev() { @@ -32,7 +34,9 @@ async fn send_sequence(n: u32, mut sender: mpsc::Sender<u32>) { fn drop_sender() { let (tx, mut rx) = mpsc::channel::<u32>(1); drop(tx); - let f = poll_fn(|cx| rx.poll_next_unpin(cx)); + let f = poll_fn(|cx| { + rx.poll_next_unpin(cx) + }); assert_eq!(block_on(f), None) } diff --git a/tests/mpsc-close.rs b/tests/mpsc-close.rs index 1a14067..9eb5296 100644 --- a/tests/mpsc-close.rs +++ b/tests/mpsc-close.rs @@ -13,7 +13,9 @@ use std::time::{Duration, Instant}; fn smoke() { let (mut sender, receiver) = mpsc::channel(1); - let t = thread::spawn(move || while let Ok(()) = block_on(sender.send(42)) {}); + let t = thread::spawn(move || { + while let Ok(()) = block_on(sender.send(42)) {} + }); // `receiver` needs to be dropped for `sender` to stop sending and therefore before the join. block_on(receiver.take(3).for_each(|_| futures::future::ready(()))); @@ -147,7 +149,6 @@ fn single_receiver_drop_closes_channel_and_drains() { // Stress test that `try_send()`s occurring concurrently with receiver // close/drops don't appear as successful sends. -#[cfg_attr(miri, ignore)] // Miri is too slow #[test] fn stress_try_send_as_receiver_closes() { const AMT: usize = 10000; @@ -165,7 +166,7 @@ fn stress_try_send_as_receiver_closes() { struct TestRx { rx: mpsc::Receiver<Arc<()>>, // The number of times to query `rx` before dropping it. - poll_count: usize, + poll_count: usize } struct TestTask { command_rx: mpsc::Receiver<TestRx>, @@ -189,11 +190,14 @@ fn stress_try_send_as_receiver_closes() { impl Future for TestTask { type Output = (); - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + fn poll( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll<Self::Output> { // Poll the test channel, if one is present. if let Some(rx) = &mut self.test_rx { if let Poll::Ready(v) = rx.poll_next_unpin(cx) { - let _ = v.expect("test finished unexpectedly!"); + let _ = v.expect("test finished unexpectedly!"); } self.countdown -= 1; // Busy-poll until the countdown is finished. @@ -205,9 +209,9 @@ fn stress_try_send_as_receiver_closes() { self.test_rx = Some(rx); self.countdown = poll_count; cx.waker().wake_by_ref(); - } + }, Poll::Ready(None) => return Poll::Ready(()), - Poll::Pending => {} + Poll::Pending => {}, } if self.countdown == 0 { // Countdown complete -- drop the Receiver. @@ -251,14 +255,10 @@ fn stress_try_send_as_receiver_closes() { if prev_weak.upgrade().is_none() { break; } - assert!( - t0.elapsed() < Duration::from_secs(SPIN_TIMEOUT_S), + assert!(t0.elapsed() < Duration::from_secs(SPIN_TIMEOUT_S), "item not dropped on iteration {} after \ {} sends ({} successful). spin=({})", - i, - attempted_sends, - successful_sends, - spins + i, attempted_sends, successful_sends, spins ); spins += 1; thread::sleep(Duration::from_millis(SPIN_SLEEP_MS)); @@ -273,27 +273,6 @@ fn stress_try_send_as_receiver_closes() { } } drop(cmd_tx); - bg.join().expect("background thread join"); -} - -#[test] -fn unbounded_try_next_after_none() { - let (tx, mut rx) = mpsc::unbounded::<String>(); - // Drop the sender, close the channel. - drop(tx); - // Receive the end of channel. - assert_eq!(Ok(None), rx.try_next().map_err(|_| ())); - // None received, check we can call `try_next` again. - assert_eq!(Ok(None), rx.try_next().map_err(|_| ())); -} - -#[test] -fn bounded_try_next_after_none() { - let (tx, mut rx) = mpsc::channel::<String>(17); - // Drop the sender, close the channel. - drop(tx); - // Receive the end of channel. - assert_eq!(Ok(None), rx.try_next().map_err(|_| ())); - // None received, check we can call `try_next` again. - assert_eq!(Ok(None), rx.try_next().map_err(|_| ())); + bg.join() + .expect("background thread join"); } diff --git a/tests/mpsc.rs b/tests/mpsc.rs index da0899d..61c5a50 100644 --- a/tests/mpsc.rs +++ b/tests/mpsc.rs @@ -1,13 +1,13 @@ use futures::channel::{mpsc, oneshot}; use futures::executor::{block_on, block_on_stream}; -use futures::future::{poll_fn, FutureExt}; -use futures::pin_mut; -use futures::sink::{Sink, SinkExt}; +use futures::future::{FutureExt, poll_fn}; use futures::stream::{Stream, StreamExt}; +use futures::sink::{Sink, SinkExt}; use futures::task::{Context, Poll}; +use futures::pin_mut; use futures_test::task::{new_count_waker, noop_context}; -use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::{Arc, Mutex}; +use std::sync::atomic::{AtomicUsize, Ordering}; use std::thread; trait AssertSend: Send {} @@ -77,7 +77,7 @@ fn send_shared_recv() { fn send_recv_threads() { let (mut tx, rx) = mpsc::channel::<i32>(16); - let t = thread::spawn(move || { + let t = thread::spawn(move|| { block_on(tx.send(1)).unwrap(); }); @@ -200,14 +200,11 @@ fn tx_close_gets_none() { #[test] fn stress_shared_unbounded() { - #[cfg(miri)] - const AMT: u32 = 100; - #[cfg(not(miri))] const AMT: u32 = 10000; const NTHREADS: u32 = 8; let (tx, rx) = mpsc::unbounded::<i32>(); - let t = thread::spawn(move || { + let t = thread::spawn(move|| { let result: Vec<_> = block_on(rx.collect()); assert_eq!(result.len(), (AMT * NTHREADS) as usize); for item in result { @@ -218,7 +215,7 @@ fn stress_shared_unbounded() { for _ in 0..NTHREADS { let tx = tx.clone(); - thread::spawn(move || { + thread::spawn(move|| { for _ in 0..AMT { tx.unbounded_send(1).unwrap(); } @@ -232,14 +229,11 @@ fn stress_shared_unbounded() { #[test] fn stress_shared_bounded_hard() { - #[cfg(miri)] - const AMT: u32 = 100; - #[cfg(not(miri))] const AMT: u32 = 10000; const NTHREADS: u32 = 8; let (tx, rx) = mpsc::channel::<i32>(0); - let t = thread::spawn(move || { + let t = thread::spawn(move|| { let result: Vec<_> = block_on(rx.collect()); assert_eq!(result.len(), (AMT * NTHREADS) as usize); for item in result { @@ -265,9 +259,6 @@ fn stress_shared_bounded_hard() { #[allow(clippy::same_item_push)] #[test] fn stress_receiver_multi_task_bounded_hard() { - #[cfg(miri)] - const AMT: usize = 100; - #[cfg(not(miri))] const AMT: usize = 10_000; const NTHREADS: u32 = 2; @@ -306,9 +297,9 @@ fn stress_receiver_multi_task_bounded_hard() { } Poll::Ready(None) => { *rx_opt = None; - break; - } - Poll::Pending => {} + break + }, + Poll::Pending => {}, } } } else { @@ -320,6 +311,7 @@ fn stress_receiver_multi_task_bounded_hard() { th.push(t); } + for i in 0..AMT { block_on(tx.send(i)).unwrap(); } @@ -336,12 +328,7 @@ fn stress_receiver_multi_task_bounded_hard() { /// after sender dropped. #[test] fn stress_drop_sender() { - #[cfg(miri)] - const ITER: usize = 100; - #[cfg(not(miri))] - const ITER: usize = 10000; - - fn list() -> impl Stream<Item = i32> { + fn list() -> impl Stream<Item=i32> { let (tx, rx) = mpsc::channel(1); thread::spawn(move || { block_on(send_one_two_three(tx)); @@ -349,7 +336,7 @@ fn stress_drop_sender() { rx } - for _ in 0..ITER { + for _ in 0..10000 { let v: Vec<_> = block_on(list().collect()); assert_eq!(v, vec![1, 2, 3]); } @@ -394,12 +381,9 @@ fn stress_close_receiver_iter() { } } -#[cfg_attr(miri, ignore)] // Miri is too slow #[test] fn stress_close_receiver() { - const ITER: usize = 10000; - - for _ in 0..ITER { + for _ in 0..10000 { stress_close_receiver_iter(); } } @@ -414,9 +398,6 @@ async fn stress_poll_ready_sender(mut sender: mpsc::Sender<u32>, count: u32) { #[allow(clippy::same_item_push)] #[test] fn stress_poll_ready() { - #[cfg(miri)] - const AMT: u32 = 100; - #[cfg(not(miri))] const AMT: u32 = 1000; const NTHREADS: u32 = 8; @@ -426,7 +407,9 @@ fn stress_poll_ready() { let mut threads = Vec::new(); for _ in 0..NTHREADS { let sender = tx.clone(); - threads.push(thread::spawn(move || block_on(stress_poll_ready_sender(sender, AMT)))); + threads.push(thread::spawn(move || { + block_on(stress_poll_ready_sender(sender, AMT)) + })); } drop(tx); @@ -444,7 +427,6 @@ fn stress_poll_ready() { stress(16); } -#[cfg_attr(miri, ignore)] // Miri is too slow #[test] fn try_send_1() { const N: usize = 3000; @@ -454,7 +436,7 @@ fn try_send_1() { for i in 0..N { loop { if tx.try_send(i).is_ok() { - break; + break } } } @@ -560,8 +542,8 @@ fn is_connected_to() { #[test] fn hash_receiver() { - use std::collections::hash_map::DefaultHasher; use std::hash::Hasher; + use std::collections::hash_map::DefaultHasher; let mut hasher_a1 = DefaultHasher::new(); let mut hasher_a2 = DefaultHasher::new(); diff --git a/tests/oneshot.rs b/tests/oneshot.rs index c9f5508..a22d039 100644 --- a/tests/oneshot.rs +++ b/tests/oneshot.rs @@ -1,6 +1,6 @@ use futures::channel::oneshot::{self, Sender}; use futures::executor::block_on; -use futures::future::{poll_fn, FutureExt}; +use futures::future::{FutureExt, poll_fn}; use futures::task::{Context, Poll}; use futures_test::task::panic_waker_ref; use std::sync::mpsc; @@ -35,11 +35,6 @@ fn cancel_notifies() { #[test] fn cancel_lots() { - #[cfg(miri)] - const N: usize = 100; - #[cfg(not(miri))] - const N: usize = 20000; - let (tx, rx) = mpsc::channel::<(Sender<_>, mpsc::Sender<_>)>(); let t = thread::spawn(move || { for (mut tx, tx2) in rx { @@ -48,7 +43,7 @@ fn cancel_lots() { } }); - for _ in 0..N { + for _ in 0..20000 { let (otx, orx) = oneshot::channel::<u32>(); let (tx2, rx2) = mpsc::channel(); tx.send((otx, tx2)).unwrap(); @@ -75,7 +70,7 @@ fn close() { rx.close(); block_on(poll_fn(|cx| { match rx.poll_unpin(cx) { - Poll::Ready(Err(_)) => {} + Poll::Ready(Err(_)) => {}, _ => panic!(), }; assert!(tx.poll_canceled(cx).is_ready()); @@ -106,11 +101,6 @@ fn is_canceled() { #[test] fn cancel_sends() { - #[cfg(miri)] - const N: usize = 100; - #[cfg(not(miri))] - const N: usize = 20000; - let (tx, rx) = mpsc::channel::<Sender<_>>(); let t = thread::spawn(move || { for otx in rx { @@ -118,7 +108,7 @@ fn cancel_sends() { } }); - for _ in 0..N { + for _ in 0..20000 { let (otx, mut orx) = oneshot::channel::<u32>(); tx.send(otx).unwrap(); |