aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJeff Vander Stoep <jeffv@google.com>2023-02-16 08:11:46 +0000
committerAutomerger Merge Worker <android-build-automerger-merge-worker@system.gserviceaccount.com>2023-02-16 08:11:46 +0000
commitba8e3c645e64f514e55e72fadec1c9d0ca77d099 (patch)
tree891d5f9cf99250a6597cbba176ae3a55c5ca5b1c
parent2803a603612e7ea859eb320fb0f643197183cf0e (diff)
parent65c1ccd9611a8cb9f823e84b55a3e097625c90aa (diff)
downloadfutures-util-ba8e3c645e64f514e55e72fadec1c9d0ca77d099.tar.gz
Upgrade futures-util to 0.3.26 am: 65c1ccd961
Original change: https://android-review.googlesource.com/c/platform/external/rust/crates/futures-util/+/2438159 Change-Id: I9f892cc729ba21bc7866a4dec4497cf285dfaf07 Signed-off-by: Automerger Merge Worker <android-build-automerger-merge-worker@system.gserviceaccount.com>
-rw-r--r--.cargo_vcs_info.json2
-rw-r--r--Android.bp4
-rw-r--r--Cargo.toml15
-rw-r--r--Cargo.toml.orig15
-rw-r--r--METADATA10
-rw-r--r--src/future/either.rs58
-rw-r--r--src/future/future/shared.rs42
-rw-r--r--src/future/select_all.rs1
-rw-r--r--src/future/try_join_all.rs14
-rw-r--r--src/sink/unfold.rs5
-rw-r--r--src/stream/futures_ordered.rs18
-rw-r--r--src/stream/futures_unordered/mod.rs3
-rw-r--r--src/stream/stream/buffered.rs12
-rw-r--r--src/stream/stream/chain.rs3
-rw-r--r--src/stream/stream/mod.rs3
-rw-r--r--src/stream/stream/ready_chunks.rs48
16 files changed, 161 insertions, 92 deletions
diff --git a/.cargo_vcs_info.json b/.cargo_vcs_info.json
index bd04ae0..dde4c7f 100644
--- a/.cargo_vcs_info.json
+++ b/.cargo_vcs_info.json
@@ -1,6 +1,6 @@
{
"git": {
- "sha1": "77d82198c5afd04af3e760a6aa50b7e875289fc3"
+ "sha1": "5e3693a350f96244151081d2c030208cd15f9572"
},
"path_in_vcs": "futures-util"
} \ No newline at end of file
diff --git a/Android.bp b/Android.bp
index 8b06222..e3cad4a 100644
--- a/Android.bp
+++ b/Android.bp
@@ -42,7 +42,7 @@ rust_test {
host_supported: true,
crate_name: "futures_util",
cargo_env_compat: true,
- cargo_pkg_version: "0.3.25",
+ cargo_pkg_version: "0.3.26",
srcs: ["src/lib.rs"],
test_suites: ["general-tests"],
auto_gen_config: true,
@@ -86,7 +86,7 @@ rust_library {
host_supported: true,
crate_name: "futures_util",
cargo_env_compat: true,
- cargo_pkg_version: "0.3.25",
+ cargo_pkg_version: "0.3.26",
srcs: ["src/lib.rs"],
edition: "2018",
features: [
diff --git a/Cargo.toml b/Cargo.toml
index 907dbf1..47e9f55 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -13,7 +13,7 @@
edition = "2018"
rust-version = "1.45"
name = "futures-util"
-version = "0.3.25"
+version = "0.3.26"
description = """
Common utilities and extension traits for the futures-rs library.
"""
@@ -30,33 +30,33 @@ rustdoc-args = [
]
[dependencies.futures-channel]
-version = "0.3.25"
+version = "0.3.26"
features = ["std"]
optional = true
default-features = false
[dependencies.futures-core]
-version = "0.3.25"
+version = "0.3.26"
default-features = false
[dependencies.futures-io]
-version = "0.3.25"
+version = "0.3.26"
features = ["std"]
optional = true
default-features = false
[dependencies.futures-macro]
-version = "=0.3.25"
+version = "=0.3.26"
optional = true
default-features = false
[dependencies.futures-sink]
-version = "0.3.25"
+version = "0.3.26"
optional = true
default-features = false
[dependencies.futures-task]
-version = "0.3.25"
+version = "0.3.26"
default-features = false
[dependencies.futures_01]
@@ -120,6 +120,7 @@ io-compat = [
"compat",
"tokio-io",
]
+portable-atomic = ["futures-core/portable-atomic"]
sink = ["futures-sink"]
std = [
"alloc",
diff --git a/Cargo.toml.orig b/Cargo.toml.orig
index aeecf0f..95c3dee 100644
--- a/Cargo.toml.orig
+++ b/Cargo.toml.orig
@@ -1,6 +1,6 @@
[package]
name = "futures-util"
-version = "0.3.25"
+version = "0.3.26"
edition = "2018"
rust-version = "1.45"
license = "MIT OR Apache-2.0"
@@ -21,6 +21,7 @@ io-compat = ["io", "compat", "tokio-io"]
sink = ["futures-sink"]
io = ["std", "futures-io", "memchr"]
channel = ["std", "futures-channel"]
+portable-atomic = ["futures-core/portable-atomic"]
# Unstable features
# These features are outside of the normal semver guarantees and require the
@@ -34,12 +35,12 @@ write-all-vectored = ["io"]
cfg-target-has-atomic = []
[dependencies]
-futures-core = { path = "../futures-core", version = "0.3.25", default-features = false }
-futures-task = { path = "../futures-task", version = "0.3.25", default-features = false }
-futures-channel = { path = "../futures-channel", version = "0.3.25", default-features = false, features = ["std"], optional = true }
-futures-io = { path = "../futures-io", version = "0.3.25", default-features = false, features = ["std"], optional = true }
-futures-sink = { path = "../futures-sink", version = "0.3.25", default-features = false, optional = true }
-futures-macro = { path = "../futures-macro", version = "=0.3.25", default-features = false, optional = true }
+futures-core = { path = "../futures-core", version = "0.3.26", default-features = false }
+futures-task = { path = "../futures-task", version = "0.3.26", default-features = false }
+futures-channel = { path = "../futures-channel", version = "0.3.26", default-features = false, features = ["std"], optional = true }
+futures-io = { path = "../futures-io", version = "0.3.26", default-features = false, features = ["std"], optional = true }
+futures-sink = { path = "../futures-sink", version = "0.3.26", default-features = false, optional = true }
+futures-macro = { path = "../futures-macro", version = "=0.3.26", default-features = false, optional = true }
slab = { version = "0.4.2", optional = true }
memchr = { version = "2.2", optional = true }
futures_01 = { version = "0.1.25", optional = true, package = "futures" }
diff --git a/METADATA b/METADATA
index 99c8fb7..93b66be 100644
--- a/METADATA
+++ b/METADATA
@@ -11,13 +11,13 @@ third_party {
}
url {
type: ARCHIVE
- value: "https://static.crates.io/crates/futures-util/futures-util-0.3.25.crate"
+ value: "https://static.crates.io/crates/futures-util/futures-util-0.3.26.crate"
}
- version: "0.3.25"
+ version: "0.3.26"
license_type: NOTICE
last_upgrade_date {
- year: 2022
- month: 12
- day: 12
+ year: 2023
+ month: 2
+ day: 15
}
}
diff --git a/src/future/either.rs b/src/future/either.rs
index 9602de7..27e5064 100644
--- a/src/future/either.rs
+++ b/src/future/either.rs
@@ -33,11 +33,31 @@ pub enum Either<A, B> {
}
impl<A, B> Either<A, B> {
- fn project(self: Pin<&mut Self>) -> Either<Pin<&mut A>, Pin<&mut B>> {
+ /// Convert `Pin<&Either<A, B>>` to `Either<Pin<&A>, Pin<&B>>`,
+ /// pinned projections of the inner variants.
+ pub fn as_pin_ref(self: Pin<&Self>) -> Either<Pin<&A>, Pin<&B>> {
+ // SAFETY: We can use `new_unchecked` because the `inner` parts are
+ // guaranteed to be pinned, as they come from `self` which is pinned.
unsafe {
- match self.get_unchecked_mut() {
- Either::Left(a) => Either::Left(Pin::new_unchecked(a)),
- Either::Right(b) => Either::Right(Pin::new_unchecked(b)),
+ match *Pin::get_ref(self) {
+ Either::Left(ref inner) => Either::Left(Pin::new_unchecked(inner)),
+ Either::Right(ref inner) => Either::Right(Pin::new_unchecked(inner)),
+ }
+ }
+ }
+
+ /// Convert `Pin<&mut Either<A, B>>` to `Either<Pin<&mut A>, Pin<&mut B>>`,
+ /// pinned projections of the inner variants.
+ pub fn as_pin_mut(self: Pin<&mut Self>) -> Either<Pin<&mut A>, Pin<&mut B>> {
+ // SAFETY: `get_unchecked_mut` is fine because we don't move anything.
+ // We can use `new_unchecked` because the `inner` parts are guaranteed
+ // to be pinned, as they come from `self` which is pinned, and we never
+ // offer an unpinned `&mut A` or `&mut B` through `Pin<&mut Self>`. We
+ // also don't have an implementation of `Drop`, nor manual `Unpin`.
+ unsafe {
+ match *Pin::get_unchecked_mut(self) {
+ Either::Left(ref mut inner) => Either::Left(Pin::new_unchecked(inner)),
+ Either::Right(ref mut inner) => Either::Right(Pin::new_unchecked(inner)),
}
}
}
@@ -85,7 +105,7 @@ where
type Output = A::Output;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
- match self.project() {
+ match self.as_pin_mut() {
Either::Left(x) => x.poll(cx),
Either::Right(x) => x.poll(cx),
}
@@ -113,7 +133,7 @@ where
type Item = A::Item;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
- match self.project() {
+ match self.as_pin_mut() {
Either::Left(x) => x.poll_next(cx),
Either::Right(x) => x.poll_next(cx),
}
@@ -149,28 +169,28 @@ where
type Error = A::Error;
fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
- match self.project() {
+ match self.as_pin_mut() {
Either::Left(x) => x.poll_ready(cx),
Either::Right(x) => x.poll_ready(cx),
}
}
fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> {
- match self.project() {
+ match self.as_pin_mut() {
Either::Left(x) => x.start_send(item),
Either::Right(x) => x.start_send(item),
}
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
- match self.project() {
+ match self.as_pin_mut() {
Either::Left(x) => x.poll_flush(cx),
Either::Right(x) => x.poll_flush(cx),
}
}
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
- match self.project() {
+ match self.as_pin_mut() {
Either::Left(x) => x.poll_close(cx),
Either::Right(x) => x.poll_close(cx),
}
@@ -198,7 +218,7 @@ mod if_std {
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<Result<usize>> {
- match self.project() {
+ match self.as_pin_mut() {
Either::Left(x) => x.poll_read(cx, buf),
Either::Right(x) => x.poll_read(cx, buf),
}
@@ -209,7 +229,7 @@ mod if_std {
cx: &mut Context<'_>,
bufs: &mut [IoSliceMut<'_>],
) -> Poll<Result<usize>> {
- match self.project() {
+ match self.as_pin_mut() {
Either::Left(x) => x.poll_read_vectored(cx, bufs),
Either::Right(x) => x.poll_read_vectored(cx, bufs),
}
@@ -226,7 +246,7 @@ mod if_std {
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<Result<usize>> {
- match self.project() {
+ match self.as_pin_mut() {
Either::Left(x) => x.poll_write(cx, buf),
Either::Right(x) => x.poll_write(cx, buf),
}
@@ -237,21 +257,21 @@ mod if_std {
cx: &mut Context<'_>,
bufs: &[IoSlice<'_>],
) -> Poll<Result<usize>> {
- match self.project() {
+ match self.as_pin_mut() {
Either::Left(x) => x.poll_write_vectored(cx, bufs),
Either::Right(x) => x.poll_write_vectored(cx, bufs),
}
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
- match self.project() {
+ match self.as_pin_mut() {
Either::Left(x) => x.poll_flush(cx),
Either::Right(x) => x.poll_flush(cx),
}
}
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
- match self.project() {
+ match self.as_pin_mut() {
Either::Left(x) => x.poll_close(cx),
Either::Right(x) => x.poll_close(cx),
}
@@ -268,7 +288,7 @@ mod if_std {
cx: &mut Context<'_>,
pos: SeekFrom,
) -> Poll<Result<u64>> {
- match self.project() {
+ match self.as_pin_mut() {
Either::Left(x) => x.poll_seek(cx, pos),
Either::Right(x) => x.poll_seek(cx, pos),
}
@@ -281,14 +301,14 @@ mod if_std {
B: AsyncBufRead,
{
fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<&[u8]>> {
- match self.project() {
+ match self.as_pin_mut() {
Either::Left(x) => x.poll_fill_buf(cx),
Either::Right(x) => x.poll_fill_buf(cx),
}
}
fn consume(self: Pin<&mut Self>, amt: usize) {
- match self.project() {
+ match self.as_pin_mut() {
Either::Left(x) => x.consume(amt),
Either::Right(x) => x.consume(amt),
}
diff --git a/src/future/future/shared.rs b/src/future/future/shared.rs
index 9859315..ecd1b42 100644
--- a/src/future/future/shared.rs
+++ b/src/future/future/shared.rs
@@ -4,7 +4,9 @@ use futures_core::task::{Context, Poll, Waker};
use slab::Slab;
use std::cell::UnsafeCell;
use std::fmt;
+use std::hash::Hasher;
use std::pin::Pin;
+use std::ptr;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::{Acquire, SeqCst};
use std::sync::{Arc, Mutex, Weak};
@@ -103,7 +105,6 @@ impl<Fut: Future> Shared<Fut> {
impl<Fut> Shared<Fut>
where
Fut: Future,
- Fut::Output: Clone,
{
/// Returns [`Some`] containing a reference to this [`Shared`]'s output if
/// it has already been computed by a clone or [`None`] if it hasn't been
@@ -139,6 +140,7 @@ where
/// This method by itself is safe, but using it correctly requires extra care. Another thread
/// can change the strong count at any time, including potentially between calling this method
/// and acting on the result.
+ #[allow(clippy::unnecessary_safety_doc)]
pub fn strong_count(&self) -> Option<usize> {
self.inner.as_ref().map(|arc| Arc::strong_count(arc))
}
@@ -152,15 +154,44 @@ where
/// This method by itself is safe, but using it correctly requires extra care. Another thread
/// can change the weak count at any time, including potentially between calling this method
/// and acting on the result.
+ #[allow(clippy::unnecessary_safety_doc)]
pub fn weak_count(&self) -> Option<usize> {
self.inner.as_ref().map(|arc| Arc::weak_count(arc))
}
+
+ /// Hashes the internal state of this `Shared` in a way that's compatible with `ptr_eq`.
+ pub fn ptr_hash<H: Hasher>(&self, state: &mut H) {
+ match self.inner.as_ref() {
+ Some(arc) => {
+ state.write_u8(1);
+ ptr::hash(Arc::as_ptr(arc), state);
+ }
+ None => {
+ state.write_u8(0);
+ }
+ }
+ }
+
+ /// Returns `true` if the two `Shared`s point to the same future (in a vein similar to
+ /// `Arc::ptr_eq`).
+ ///
+ /// Returns `false` if either `Shared` has terminated.
+ pub fn ptr_eq(&self, rhs: &Self) -> bool {
+ let lhs = match self.inner.as_ref() {
+ Some(lhs) => lhs,
+ None => return false,
+ };
+ let rhs = match rhs.inner.as_ref() {
+ Some(rhs) => rhs,
+ None => return false,
+ };
+ Arc::ptr_eq(lhs, rhs)
+ }
}
impl<Fut> Inner<Fut>
where
Fut: Future,
- Fut::Output: Clone,
{
/// Safety: callers must first ensure that `self.inner.state`
/// is `COMPLETE`
@@ -170,6 +201,13 @@ where
FutureOrOutput::Future(_) => unreachable!(),
}
}
+}
+
+impl<Fut> Inner<Fut>
+where
+ Fut: Future,
+ Fut::Output: Clone,
+{
/// Registers the current task to receive a wakeup when we are awoken.
fn record_waker(&self, waker_key: &mut usize, cx: &mut Context<'_>) {
let mut wakers_guard = self.notifier.wakers.lock().unwrap();
diff --git a/src/future/select_all.rs b/src/future/select_all.rs
index 07d65ca..0a51d0d 100644
--- a/src/future/select_all.rs
+++ b/src/future/select_all.rs
@@ -58,6 +58,7 @@ impl<Fut: Future + Unpin> Future for SelectAll<Fut> {
});
match item {
Some((idx, res)) => {
+ #[allow(clippy::let_underscore_future)]
let _ = self.inner.swap_remove(idx);
let rest = mem::take(&mut self.inner);
Poll::Ready((res, idx, rest))
diff --git a/src/future/try_join_all.rs b/src/future/try_join_all.rs
index 25fcfcb..506f450 100644
--- a/src/future/try_join_all.rs
+++ b/src/future/try_join_all.rs
@@ -77,6 +77,20 @@ where
/// This function is only available when the `std` or `alloc` feature of this
/// library is activated, and it is activated by default.
///
+/// # See Also
+///
+/// `try_join_all` will switch to the more powerful [`FuturesOrdered`] for performance
+/// reasons if the number of futures is large. You may want to look into using it or
+/// it's counterpart [`FuturesUnordered`][crate::stream::FuturesUnordered] directly.
+///
+/// Some examples for additional functionality provided by these are:
+///
+/// * Adding new futures to the set even after it has been started.
+///
+/// * Only polling the specific futures that have been woken. In cases where
+/// you have a lot of futures this will result in much more efficient polling.
+///
+///
/// # Examples
///
/// ```
diff --git a/src/sink/unfold.rs b/src/sink/unfold.rs
index 330a068..dea1307 100644
--- a/src/sink/unfold.rs
+++ b/src/sink/unfold.rs
@@ -73,7 +73,10 @@ where
this.state.set(UnfoldState::Value { value: state });
Ok(())
}
- Err(err) => Err(err),
+ Err(err) => {
+ this.state.set(UnfoldState::Empty);
+ Err(err)
+ }
}
} else {
Ok(())
diff --git a/src/stream/futures_ordered.rs b/src/stream/futures_ordered.rs
index f1c93fd..618bf1b 100644
--- a/src/stream/futures_ordered.rs
+++ b/src/stream/futures_ordered.rs
@@ -19,7 +19,7 @@ pin_project! {
struct OrderWrapper<T> {
#[pin]
data: T, // A future or a future's output
- index: usize,
+ index: isize,
}
}
@@ -58,7 +58,7 @@ where
/// An unbounded queue of futures.
///
-/// This "combinator" is similar to `FuturesUnordered`, but it imposes an order
+/// This "combinator" is similar to [`FuturesUnordered`], but it imposes a FIFO order
/// on top of the set of futures. While futures in the set will race to
/// completion in parallel, results will only be returned in the order their
/// originating futures were added to the queue.
@@ -95,8 +95,8 @@ where
pub struct FuturesOrdered<T: Future> {
in_progress_queue: FuturesUnordered<OrderWrapper<T>>,
queued_outputs: BinaryHeap<OrderWrapper<T::Output>>,
- next_incoming_index: usize,
- next_outgoing_index: usize,
+ next_incoming_index: isize,
+ next_outgoing_index: isize,
}
impl<T: Future> Unpin for FuturesOrdered<T> {}
@@ -160,13 +160,9 @@ impl<Fut: Future> FuturesOrdered<Fut> {
/// task notifications. This future will be the next future to be returned
/// complete.
pub fn push_front(&mut self, future: Fut) {
- if self.next_outgoing_index == 0 {
- self.push_back(future)
- } else {
- let wrapped = OrderWrapper { data: future, index: self.next_outgoing_index - 1 };
- self.next_outgoing_index -= 1;
- self.in_progress_queue.push(wrapped);
- }
+ let wrapped = OrderWrapper { data: future, index: self.next_outgoing_index - 1 };
+ self.next_outgoing_index -= 1;
+ self.in_progress_queue.push(wrapped);
}
}
diff --git a/src/stream/futures_unordered/mod.rs b/src/stream/futures_unordered/mod.rs
index 5e995fd..6b5804d 100644
--- a/src/stream/futures_unordered/mod.rs
+++ b/src/stream/futures_unordered/mod.rs
@@ -33,6 +33,9 @@ use self::ready_to_run_queue::{Dequeue, ReadyToRunQueue};
/// A set of futures which may complete in any order.
///
+/// See [`FuturesOrdered`](crate::stream::FuturesOrdered) for a version of this
+/// type that preserves a FIFO order.
+///
/// This structure is optimized to manage a large number of futures.
/// Futures managed by [`FuturesUnordered`] will only be polled when they
/// generate wake-up notifications. This reduces the required amount of work
diff --git a/src/stream/stream/buffered.rs b/src/stream/stream/buffered.rs
index 8ca0391..5854eb7 100644
--- a/src/stream/stream/buffered.rs
+++ b/src/stream/stream/buffered.rs
@@ -1,4 +1,4 @@
-use crate::stream::{Fuse, FuturesOrdered, StreamExt};
+use crate::stream::{Fuse, FusedStream, FuturesOrdered, StreamExt};
use core::fmt;
use core::pin::Pin;
use futures_core::future::Future;
@@ -95,6 +95,16 @@ where
}
}
+impl<St> FusedStream for Buffered<St>
+where
+ St: Stream,
+ St::Item: Future,
+{
+ fn is_terminated(&self) -> bool {
+ self.stream.is_done() && self.in_progress_queue.is_terminated()
+ }
+}
+
// Forwarding impl of Sink from the underlying stream
#[cfg(feature = "sink")]
impl<S, Item> Sink<Item> for Buffered<S>
diff --git a/src/stream/stream/chain.rs b/src/stream/stream/chain.rs
index c5da35e..36ff1e5 100644
--- a/src/stream/stream/chain.rs
+++ b/src/stream/stream/chain.rs
@@ -50,8 +50,9 @@ where
if let Some(item) = ready!(first.poll_next(cx)) {
return Poll::Ready(Some(item));
}
+
+ this.first.set(None);
}
- this.first.set(None);
this.second.poll_next(cx)
}
diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs
index a823fab..bb5e249 100644
--- a/src/stream/stream/mod.rs
+++ b/src/stream/stream/mod.rs
@@ -1513,8 +1513,7 @@ pub trait StreamExt: Stream {
/// be immediately returned.
///
/// If the underlying stream ended and only a partial vector was created,
- /// it'll be returned. Additionally if an error happens from the underlying
- /// stream then the currently buffered items will be yielded.
+ /// it will be returned.
///
/// This method is only available when the `std` or `alloc` feature of this
/// library is activated, and it is activated by default.
diff --git a/src/stream/stream/ready_chunks.rs b/src/stream/stream/ready_chunks.rs
index 49116d4..192054c 100644
--- a/src/stream/stream/ready_chunks.rs
+++ b/src/stream/stream/ready_chunks.rs
@@ -1,6 +1,5 @@
-use crate::stream::Fuse;
+use crate::stream::{Fuse, StreamExt};
use alloc::vec::Vec;
-use core::mem;
use core::pin::Pin;
use futures_core::stream::{FusedStream, Stream};
use futures_core::task::{Context, Poll};
@@ -15,7 +14,6 @@ pin_project! {
pub struct ReadyChunks<St: Stream> {
#[pin]
stream: Fuse<St>,
- items: Vec<St::Item>,
cap: usize, // https://github.com/rust-lang/futures-rs/issues/1475
}
}
@@ -24,11 +22,7 @@ impl<St: Stream> ReadyChunks<St> {
pub(super) fn new(stream: St, capacity: usize) -> Self {
assert!(capacity > 0);
- Self {
- stream: super::Fuse::new(stream),
- items: Vec::with_capacity(capacity),
- cap: capacity,
- }
+ Self { stream: stream.fuse(), cap: capacity }
}
delegate_access_inner!(stream, St, (.));
@@ -40,40 +34,33 @@ impl<St: Stream> Stream for ReadyChunks<St> {
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self.project();
+ let mut items: Vec<St::Item> = Vec::new();
+
loop {
match this.stream.as_mut().poll_next(cx) {
// Flush all collected data if underlying stream doesn't contain
// more ready values
Poll::Pending => {
- return if this.items.is_empty() {
- Poll::Pending
- } else {
- Poll::Ready(Some(mem::replace(this.items, Vec::with_capacity(*this.cap))))
- }
+ return if items.is_empty() { Poll::Pending } else { Poll::Ready(Some(items)) }
}
// Push the ready item into the buffer and check whether it is full.
// If so, replace our buffer with a new and empty one and return
// the full one.
Poll::Ready(Some(item)) => {
- this.items.push(item);
- if this.items.len() >= *this.cap {
- return Poll::Ready(Some(mem::replace(
- this.items,
- Vec::with_capacity(*this.cap),
- )));
+ if items.is_empty() {
+ items.reserve(*this.cap);
+ }
+ items.push(item);
+ if items.len() >= *this.cap {
+ return Poll::Ready(Some(items));
}
}
// Since the underlying stream ran out of values, return what we
// have buffered, if we have anything.
Poll::Ready(None) => {
- let last = if this.items.is_empty() {
- None
- } else {
- let full_buf = mem::take(this.items);
- Some(full_buf)
- };
+ let last = if items.is_empty() { None } else { Some(items) };
return Poll::Ready(last);
}
@@ -82,20 +69,15 @@ impl<St: Stream> Stream for ReadyChunks<St> {
}
fn size_hint(&self) -> (usize, Option<usize>) {
- let chunk_len = usize::from(!self.items.is_empty());
let (lower, upper) = self.stream.size_hint();
- let lower = (lower / self.cap).saturating_add(chunk_len);
- let upper = match upper {
- Some(x) => x.checked_add(chunk_len),
- None => None,
- };
+ let lower = lower / self.cap;
(lower, upper)
}
}
-impl<St: FusedStream> FusedStream for ReadyChunks<St> {
+impl<St: Stream> FusedStream for ReadyChunks<St> {
fn is_terminated(&self) -> bool {
- self.stream.is_terminated() && self.items.is_empty()
+ self.stream.is_terminated()
}
}