diff options
Diffstat (limited to 'src/bytes.rs')
-rw-r--r-- | src/bytes.rs | 106 |
1 files changed, 68 insertions, 38 deletions
diff --git a/src/bytes.rs b/src/bytes.rs index 79a09f3..b1b35ea 100644 --- a/src/bytes.rs +++ b/src/bytes.rs @@ -10,16 +10,23 @@ use crate::loom::sync::atomic::AtomicMut; use crate::loom::sync::atomic::{self, AtomicPtr, AtomicUsize, Ordering}; use crate::Buf; -/// A reference counted contiguous slice of memory. +/// A cheaply cloneable and sliceable chunk of contiguous memory. /// /// `Bytes` is an efficient container for storing and operating on contiguous /// slices of memory. It is intended for use primarily in networking code, but /// could have applications elsewhere as well. /// /// `Bytes` values facilitate zero-copy network programming by allowing multiple -/// `Bytes` objects to point to the same underlying memory. This is managed by -/// using a reference count to track when the memory is no longer needed and can -/// be freed. +/// `Bytes` objects to point to the same underlying memory. +/// +/// `Bytes` does not have a single implementation. It is an interface, whose +/// exact behavior is implemented through dynamic dispatch in several underlying +/// implementations of `Bytes`. +/// +/// All `Bytes` implementations must fulfill the following requirements: +/// - They are cheaply cloneable and thereby shareable between an unlimited amount +/// of components, for example by modifying a reference count. +/// - Instances can be sliced to refer to a subset of the the original buffer. /// /// ``` /// use bytes::Bytes; @@ -41,17 +48,33 @@ use crate::Buf; /// to track information about which segment of the underlying memory the /// `Bytes` handle has access to. /// -/// `Bytes` keeps both a pointer to the shared `Arc` containing the full memory +/// `Bytes` keeps both a pointer to the shared state containing the full memory /// slice and a pointer to the start of the region visible by the handle. /// `Bytes` also tracks the length of its view into the memory. /// /// # Sharing /// -/// The memory itself is reference counted, and multiple `Bytes` objects may -/// point to the same region. Each `Bytes` handle point to different sections within -/// the memory region, and `Bytes` handle may or may not have overlapping views +/// `Bytes` contains a vtable, which allows implementations of `Bytes` to define +/// how sharing/cloneing is implemented in detail. +/// When `Bytes::clone()` is called, `Bytes` will call the vtable function for +/// cloning the backing storage in order to share it behind between multiple +/// `Bytes` instances. +/// +/// For `Bytes` implementations which refer to constant memory (e.g. created +/// via `Bytes::from_static()`) the cloning implementation will be a no-op. +/// +/// For `Bytes` implementations which point to a reference counted shared storage +/// (e.g. an `Arc<[u8]>`), sharing will be implemented by increasing the +/// the reference count. +/// +/// Due to this mechanism, multiple `Bytes` instances may point to the same +/// shared memory region. +/// Each `Bytes` instance can point to different sections within that +/// memory region, and `Bytes` instances may or may not have overlapping views /// into the memory. /// +/// The following diagram visualizes a scenario where 2 `Bytes` instances make +/// use of an `Arc`-based backing storage, and provide access to different views: /// /// ```text /// @@ -175,7 +198,7 @@ impl Bytes { self.len == 0 } - ///Creates `Bytes` instance from slice, by copying it. + /// Creates `Bytes` instance from slice, by copying it. pub fn copy_from_slice(data: &[u8]) -> Self { data.to_vec().into() } @@ -214,7 +237,7 @@ impl Bytes { }; let end = match range.end_bound() { - Bound::Included(&n) => n + 1, + Bound::Included(&n) => n.checked_add(1).expect("out of range"), Bound::Excluded(&n) => n, Bound::Unbounded => len, }; @@ -507,7 +530,7 @@ impl Buf for Bytes { } #[inline] - fn bytes(&self) -> &[u8] { + fn chunk(&self) -> &[u8] { self.as_slice() } @@ -525,8 +548,14 @@ impl Buf for Bytes { } } - fn to_bytes(&mut self) -> crate::Bytes { - core::mem::replace(self, Bytes::new()) + fn copy_to_bytes(&mut self, len: usize) -> crate::Bytes { + if len == self.remaining() { + core::mem::replace(self, Bytes::new()) + } else { + let ret = self.slice(..len); + self.advance(len); + ret + } } } @@ -777,8 +806,7 @@ impl From<Vec<u8>> for Bytes { let slice = vec.into_boxed_slice(); let len = slice.len(); - let ptr = slice.as_ptr(); - drop(Box::into_raw(slice)); + let ptr = Box::into_raw(slice) as *mut u8; if ptr as usize & 0x1 == 0 { let data = ptr as usize | KIND_VEC; @@ -994,33 +1022,35 @@ unsafe fn shallow_clone_vec( // `Release` is used synchronize with other threads that // will load the `arc` field. // - // If the `compare_and_swap` fails, then the thread lost the + // If the `compare_exchange` fails, then the thread lost the // race to promote the buffer to shared. The `Acquire` - // ordering will synchronize with the `compare_and_swap` + // ordering will synchronize with the `compare_exchange` // that happened in the other thread and the `Shared` // pointed to by `actual` will be visible. - let actual = atom.compare_and_swap(ptr as _, shared as _, Ordering::AcqRel); - - if actual as usize == ptr as usize { - // The upgrade was successful, the new handle can be - // returned. - return Bytes { - ptr: offset, - len, - data: AtomicPtr::new(shared as _), - vtable: &SHARED_VTABLE, - }; + match atom.compare_exchange(ptr as _, shared as _, Ordering::AcqRel, Ordering::Acquire) { + Ok(actual) => { + debug_assert!(actual as usize == ptr as usize); + // The upgrade was successful, the new handle can be + // returned. + Bytes { + ptr: offset, + len, + data: AtomicPtr::new(shared as _), + vtable: &SHARED_VTABLE, + } + } + Err(actual) => { + // The upgrade failed, a concurrent clone happened. Release + // the allocation that was made in this thread, it will not + // be needed. + let shared = Box::from_raw(shared); + mem::forget(*shared); + + // Buffer already promoted to shared storage, so increment ref + // count. + shallow_clone_arc(actual as _, offset, len) + } } - - // The upgrade failed, a concurrent clone happened. Release - // the allocation that was made in this thread, it will not - // be needed. - let shared = Box::from_raw(shared); - mem::forget(*shared); - - // Buffer already promoted to shared storage, so increment ref - // count. - shallow_clone_arc(actual as _, offset, len) } unsafe fn release_shared(ptr: *mut Shared) { |