diff options
Diffstat (limited to 'src/txbuf.rs')
-rw-r--r-- | src/txbuf.rs | 233 |
1 files changed, 233 insertions, 0 deletions
diff --git a/src/txbuf.rs b/src/txbuf.rs new file mode 100644 index 0000000..ef718d7 --- /dev/null +++ b/src/txbuf.rs @@ -0,0 +1,233 @@ +// SPDX-License-Identifier: Apache-2.0 or BSD-3-Clause + +use std::{io::Write, num::Wrapping}; + +use vm_memory::{bitmap::BitmapSlice, VolatileSlice}; + +use crate::vhu_vsock::{Error, Result}; + +#[derive(Debug)] +pub(crate) struct LocalTxBuf { + /// Buffer holding data to be forwarded to a host-side application + buf: Vec<u8>, + /// Index into buffer from which data can be consumed from the buffer + head: Wrapping<u32>, + /// Index into buffer from which data can be added to the buffer + tail: Wrapping<u32>, +} + +impl LocalTxBuf { + /// Create a new instance of LocalTxBuf. + pub fn new(buf_size: u32) -> Self { + Self { + buf: vec![0; buf_size as usize], + head: Wrapping(0), + tail: Wrapping(0), + } + } + + /// Get the buffer size + pub fn get_buf_size(&self) -> u32 { + self.buf.len() as u32 + } + + /// Check if the buf is empty. + pub fn is_empty(&self) -> bool { + self.len() == 0 + } + + /// Add new data to the tx buffer, push all or none. + /// Returns LocalTxBufFull error if space not sufficient. + pub fn push<B: BitmapSlice>(&mut self, data_buf: &VolatileSlice<B>) -> Result<()> { + if self.get_buf_size() as usize - self.len() < data_buf.len() { + // Tx buffer is full + return Err(Error::LocalTxBufFull); + } + + // Get index into buffer at which data can be inserted + let tail_idx = self.tail.0 as usize % self.get_buf_size() as usize; + + // Check if we can fit the data buffer between head and end of buffer + let len = std::cmp::min(self.get_buf_size() as usize - tail_idx, data_buf.len()); + let txbuf = &mut self.buf[tail_idx..tail_idx + len]; + data_buf.copy_to(txbuf); + + // Check if there is more data to be wrapped around + if len < data_buf.len() { + let remain_txbuf = &mut self.buf[..(data_buf.len() - len)]; + data_buf.copy_to(remain_txbuf); + } + + // Increment tail by the amount of data that has been added to the buffer + self.tail += Wrapping(data_buf.len() as u32); + + Ok(()) + } + + /// Flush buf data to stream. + pub fn flush_to<S: Write>(&mut self, stream: &mut S) -> Result<usize> { + if self.is_empty() { + // No data to be flushed + return Ok(0); + } + + // Get index into buffer from which data can be read + let head_idx = self.head.0 as usize % self.get_buf_size() as usize; + + // First write from head to end of buffer + let len = std::cmp::min(self.get_buf_size() as usize - head_idx, self.len()); + let written = stream + .write(&self.buf[head_idx..(head_idx + len)]) + .map_err(Error::LocalTxBufFlush)?; + + // Increment head by amount of data that has been flushed to the stream + self.head += Wrapping(written as u32); + + // If written length is less than the expected length we can try again in the future + if written < len { + return Ok(written); + } + + // The head index has wrapped around the end of the buffer, we call self again + Ok(written + self.flush_to(stream).unwrap_or(0)) + } + + /// Return amount of data in the buffer. + fn len(&self) -> usize { + (self.tail - self.head).0 as usize + } +} + +#[cfg(test)] +mod tests { + use super::*; + + const CONN_TX_BUF_SIZE: u32 = 64 * 1024; + + #[test] + fn test_txbuf_len() { + let mut loc_tx_buf = LocalTxBuf::new(CONN_TX_BUF_SIZE); + + // Zero length tx buf + assert_eq!(loc_tx_buf.len(), 0); + + // finite length tx buf + loc_tx_buf.head = Wrapping(0); + loc_tx_buf.tail = Wrapping(CONN_TX_BUF_SIZE); + assert_eq!(loc_tx_buf.len(), CONN_TX_BUF_SIZE as usize); + + loc_tx_buf.tail = Wrapping(CONN_TX_BUF_SIZE / 2); + assert_eq!(loc_tx_buf.len(), (CONN_TX_BUF_SIZE / 2) as usize); + + loc_tx_buf.head = Wrapping(256); + assert_eq!(loc_tx_buf.len(), 32512); + } + + #[test] + fn test_txbuf_is_empty() { + let mut loc_tx_buf = LocalTxBuf::new(CONN_TX_BUF_SIZE); + + // empty tx buffer + assert!(loc_tx_buf.is_empty()); + + // non empty tx buffer + loc_tx_buf.tail = Wrapping(CONN_TX_BUF_SIZE); + assert!(!loc_tx_buf.is_empty()); + } + + #[test] + fn test_txbuf_push() { + let mut loc_tx_buf = LocalTxBuf::new(CONN_TX_BUF_SIZE); + let mut buf = [0; CONN_TX_BUF_SIZE as usize]; + // SAFETY: Safe as the buffer is guaranteed to be valid here. + let data = unsafe { VolatileSlice::new(buf.as_mut_ptr(), buf.len()) }; + + // push data into empty tx buffer + let res_push = loc_tx_buf.push(&data); + assert!(res_push.is_ok()); + assert_eq!(loc_tx_buf.head, Wrapping(0)); + assert_eq!(loc_tx_buf.tail, Wrapping(CONN_TX_BUF_SIZE)); + + // push data into full tx buffer + let res_push = loc_tx_buf.push(&data); + assert!(res_push.is_err()); + + // head and tail wrap at full + loc_tx_buf.head = Wrapping(CONN_TX_BUF_SIZE); + let res_push = loc_tx_buf.push(&data); + assert!(res_push.is_ok()); + assert_eq!(loc_tx_buf.tail, Wrapping(CONN_TX_BUF_SIZE * 2)); + + // only tail wraps at full + let mut buf = vec![1; 4]; + // SAFETY: Safe as the buffer is guaranteed to be valid here. + let data = unsafe { VolatileSlice::new(buf.as_mut_ptr(), buf.len()) }; + let mut cmp_data = vec![1; 4]; + cmp_data.append(&mut vec![0; (CONN_TX_BUF_SIZE - 4) as usize]); + loc_tx_buf.head = Wrapping(4); + loc_tx_buf.tail = Wrapping(CONN_TX_BUF_SIZE); + let res_push = loc_tx_buf.push(&data); + assert!(res_push.is_ok()); + assert_eq!(loc_tx_buf.head, Wrapping(4)); + assert_eq!(loc_tx_buf.tail, Wrapping(CONN_TX_BUF_SIZE + 4)); + assert_eq!(loc_tx_buf.buf, cmp_data); + } + + #[test] + fn test_txbuf_flush_to() { + let mut loc_tx_buf = LocalTxBuf::new(CONN_TX_BUF_SIZE); + + // data to be flushed + let mut buf = vec![1; CONN_TX_BUF_SIZE as usize]; + // SAFETY: Safe as the buffer is guaranteed to be valid here. + let data = unsafe { VolatileSlice::new(buf.as_mut_ptr(), buf.len()) }; + + // target to which data is flushed + let mut cmp_vec = Vec::with_capacity(data.len()); + + // flush no data + let res_flush = loc_tx_buf.flush_to(&mut cmp_vec); + assert!(res_flush.is_ok()); + assert_eq!(res_flush.unwrap(), 0); + + // flush data of CONN_TX_BUF_SIZE amount + let res_push = loc_tx_buf.push(&data); + assert!(res_push.is_ok()); + let res_flush = loc_tx_buf.flush_to(&mut cmp_vec); + if let Ok(n) = res_flush { + assert_eq!(loc_tx_buf.head, Wrapping(n as u32)); + assert_eq!(loc_tx_buf.tail, Wrapping(CONN_TX_BUF_SIZE)); + assert_eq!(n, cmp_vec.len()); + assert_eq!(cmp_vec, buf[..n]); + } + + // wrapping head flush + let mut buf = vec![0; (CONN_TX_BUF_SIZE / 2) as usize]; + buf.append(&mut vec![1; (CONN_TX_BUF_SIZE / 2) as usize]); + // SAFETY: Safe as the buffer is guaranteed to be valid here. + let data = unsafe { VolatileSlice::new(buf.as_mut_ptr(), buf.len()) }; + + loc_tx_buf.head = Wrapping(0); + loc_tx_buf.tail = Wrapping(0); + let res_push = loc_tx_buf.push(&data); + assert!(res_push.is_ok()); + cmp_vec.clear(); + loc_tx_buf.head = Wrapping(CONN_TX_BUF_SIZE / 2); + loc_tx_buf.tail = Wrapping(CONN_TX_BUF_SIZE + (CONN_TX_BUF_SIZE / 2)); + let res_flush = loc_tx_buf.flush_to(&mut cmp_vec); + if let Ok(n) = res_flush { + assert_eq!( + loc_tx_buf.head, + Wrapping(CONN_TX_BUF_SIZE + (CONN_TX_BUF_SIZE / 2)) + ); + assert_eq!( + loc_tx_buf.tail, + Wrapping(CONN_TX_BUF_SIZE + (CONN_TX_BUF_SIZE / 2)) + ); + assert_eq!(n, cmp_vec.len()); + let mut data = vec![1; (CONN_TX_BUF_SIZE / 2) as usize]; + data.append(&mut vec![0; (CONN_TX_BUF_SIZE / 2) as usize]); + assert_eq!(cmp_vec, data[..n]); + } + } +} |