diff options
author | A. Cody Schuffelen <schuffelen@google.com> | 2024-01-06 19:56:11 -0800 |
---|---|---|
committer | crosvm LUCI <crosvm-scoped@luci-project-accounts.iam.gserviceaccount.com> | 2024-04-12 02:10:07 +0000 |
commit | f59edf616b8ff4e433415dc54e43c38cce2d4d77 (patch) | |
tree | e382f31cf8ef87538bdf9cc44a8652dc7b5f70c6 | |
parent | 8407e842919e6de47b14580cbaceb3e9a2f6c8b5 (diff) | |
download | crosvm-f59edf616b8ff4e433415dc54e43c38cce2d4d77.tar.gz |
cros_async: Add AsyncFd to Tokio linux implementation
This uses tokio's utilities to re-schedule IO that would fail with a
"would block" error. The `IoSource` is chosen by the Executor based
on whether the file descriptor can be successfully added to the internal
epoll scheduler.
Specifically, AsyncFd::async_io is used for system calls that can fail
with EWOULDBLOCK, and spawn_blocking is used for system calls that will
always block.
Bug: b/320603688
Test: tools/presubmit
Test: cargo build --features=cros_async/tokio --target=x86_64-unknown-linux-gnu
Change-Id: Ibafbbe65d67cb88b5fcd0f18b00f504430c785c6
Reviewed-on: https://chromium-review.googlesource.com/c/crosvm/crosvm/+/5174967
Reviewed-by: Frederick Mayle <fmayle@google.com>
Commit-Queue: Cody Schuffelen <schuffelen@google.com>
Reviewed-by: Noah Gold <nkgold@google.com>
-rw-r--r-- | cros_async/src/sys/linux/tokio_source.rs | 205 |
1 files changed, 161 insertions, 44 deletions
diff --git a/cros_async/src/sys/linux/tokio_source.rs b/cros_async/src/sys/linux/tokio_source.rs index e6f4d330c..4a8471296 100644 --- a/cros_async/src/sys/linux/tokio_source.rs +++ b/cros_async/src/sys/linux/tokio_source.rs @@ -5,13 +5,16 @@ use std::io; use std::os::fd::AsRawFd; use std::os::fd::OwnedFd; +use std::os::fd::RawFd; use std::sync::Arc; +use base::add_fd_flags; use base::clone_descriptor; use base::linux::fallocate; use base::linux::FallocateMode; use base::AsRawDescriptor; use base::VolatileSlice; +use tokio::io::unix::AsyncFd; use crate::mem::MemRegion; use crate::AsyncError; @@ -30,8 +33,14 @@ pub enum Error { Fsync(io::Error), #[error("Failed to join task: '{0}'")] Join(tokio::task::JoinError), + #[error("Cannot wait on file descriptor")] + NonWaitable, #[error("Failed to read: '{0}'")] Read(io::Error), + #[error("Failed to set nonblocking: '{0}'")] + SettingNonBlocking(base::Error), + #[error("Tokio Async FD error: '{0}'")] + TokioAsyncFd(io::Error), #[error("Failed to write: '{0}'")] Write(io::Error), } @@ -45,12 +54,29 @@ impl From<Error> for io::Error { Fdatasync(e) => e, Fsync(e) => e, Join(e) => io::Error::new(io::ErrorKind::Other, e), + NonWaitable => io::Error::new(io::ErrorKind::Other, e), Read(e) => e, + SettingNonBlocking(e) => e.into(), + TokioAsyncFd(e) => e, Write(e) => e, } } } +enum FdType { + Async(AsyncFd<Arc<OwnedFd>>), + Blocking(Arc<OwnedFd>), +} + +impl AsRawFd for FdType { + fn as_raw_fd(&self) -> RawFd { + match self { + FdType::Async(async_fd) => async_fd.as_raw_fd(), + FdType::Blocking(blocking) => blocking.as_raw_fd(), + } + } +} + impl From<Error> for AsyncError { fn from(e: Error) -> AsyncError { AsyncError::SysVariants(e.into()) @@ -153,24 +179,44 @@ fn do_write_from_mem( } pub struct TokioSource<T> { - fd: Arc<OwnedFd>, + fd: FdType, inner: T, runtime: tokio::runtime::Handle, } impl<T: AsRawDescriptor> TokioSource<T> { pub fn new(inner: T, runtime: tokio::runtime::Handle) -> Result<TokioSource<T>, Error> { + let _guard = runtime.enter(); // Required for AsyncFd let safe_fd = clone_descriptor(&inner).map_err(Error::DuplicatingFd)?; - let fd = Arc::new(safe_fd.into()); + let fd_arc: Arc<OwnedFd> = Arc::new(safe_fd.into()); + let fd = match AsyncFd::new(fd_arc.clone()) { + Ok(async_fd) => { + add_fd_flags(async_fd.get_ref().as_raw_descriptor(), libc::O_NONBLOCK) + .map_err(Error::SettingNonBlocking)?; + FdType::Async(async_fd) + } + Err(e) if e.kind() == io::ErrorKind::PermissionDenied => FdType::Blocking(fd_arc), + Err(e) => return Err(Error::TokioAsyncFd(e)), + }; Ok(TokioSource { fd, inner, runtime }) } + pub fn as_source(&self) -> &T { &self.inner } + pub fn as_source_mut(&mut self) -> &mut T { &mut self.inner } + + fn clone_fd(&self) -> Arc<OwnedFd> { + match &self.fd { + FdType::Async(async_fd) => async_fd.get_ref().clone(), + FdType::Blocking(blocking) => blocking.clone(), + } + } + pub async fn fdatasync(&self) -> AsyncResult<()> { - let fd = self.fd.clone(); + let fd = self.clone_fd(); Ok(self .runtime .spawn_blocking(move || do_fdatasync(fd)) @@ -178,8 +224,9 @@ impl<T: AsRawDescriptor> TokioSource<T> { .map_err(Error::Join)? .map_err(Error::Fdatasync)?) } + pub async fn fsync(&self) -> AsyncResult<()> { - let fd = self.fd.clone(); + let fd = self.clone_fd(); Ok(self .runtime .spawn_blocking(move || do_fsync(fd)) @@ -187,25 +234,40 @@ impl<T: AsRawDescriptor> TokioSource<T> { .map_err(Error::Join)? .map_err(Error::Fsync)?) } + pub fn into_source(self) -> T { self.inner } + pub async fn read_to_vec( &self, file_offset: Option<u64>, mut vec: Vec<u8>, ) -> AsyncResult<(usize, Vec<u8>)> { - let fd = self.fd.clone(); - Ok(self - .runtime - .spawn_blocking(move || { - let size = do_read_to_vec(fd, file_offset, &mut vec)?; - Ok((size, vec)) - }) - .await - .map_err(Error::Join)? - .map_err(Error::Read)?) + Ok(match &self.fd { + FdType::Async(async_fd) => { + let res = async_fd + .async_io(tokio::io::Interest::READABLE, |fd| { + do_read_to_vec(fd.clone(), file_offset, &mut vec) + }) + .await + .map_err(AsyncError::Io)?; + (res, vec) + } + FdType::Blocking(blocking) => { + let fd = blocking.clone(); + self.runtime + .spawn_blocking(move || { + let size = do_read_to_vec(fd, file_offset, &mut vec)?; + Ok((size, vec)) + }) + .await + .map_err(Error::Join)? + .map_err(Error::Read)? + } + }) } + pub async fn read_to_mem( &self, file_offset: Option<u64>, @@ -213,22 +275,38 @@ impl<T: AsRawDescriptor> TokioSource<T> { mem_offsets: impl IntoIterator<Item = MemRegion>, ) -> AsyncResult<usize> { let mem_offsets_vec: Vec<MemRegion> = mem_offsets.into_iter().collect(); - let fd = self.fd.clone(); - Ok(self - .runtime - .spawn_blocking(move || { + Ok(match &self.fd { + FdType::Async(async_fd) => { let iovecs = mem_offsets_vec .into_iter() .filter_map(|mem_range| mem.get_volatile_slice(mem_range).ok()) .collect::<Vec<VolatileSlice>>(); - do_read_to_mem(fd, file_offset, &iovecs) - }) - .await - .map_err(Error::Join)? - .map_err(Error::Read)?) + async_fd + .async_io(tokio::io::Interest::READABLE, |fd| { + do_read_to_mem(fd.clone(), file_offset, &iovecs) + }) + .await + .map_err(AsyncError::Io)? + } + FdType::Blocking(blocking) => { + let fd = blocking.clone(); + self.runtime + .spawn_blocking(move || { + let iovecs = mem_offsets_vec + .into_iter() + .filter_map(|mem_range| mem.get_volatile_slice(mem_range).ok()) + .collect::<Vec<VolatileSlice>>(); + do_read_to_mem(fd, file_offset, &iovecs) + }) + .await + .map_err(Error::Join)? + .map_err(Error::Read)? + } + }) } + pub async fn punch_hole(&self, file_offset: u64, len: u64) -> AsyncResult<()> { - let fd = self.fd.clone(); + let fd = self.clone_fd(); Ok(self .runtime .spawn_blocking(move || fallocate(&*fd, FallocateMode::PunchHole, file_offset, len)) @@ -236,9 +314,19 @@ impl<T: AsRawDescriptor> TokioSource<T> { .map_err(Error::Join)? .map_err(Error::Fallocate)?) } + pub async fn wait_readable(&self) -> AsyncResult<()> { + match &self.fd { + FdType::Async(async_fd) => async_fd + .readable() + .await + .map_err(crate::AsyncError::Io)? + .retain_ready(), + FdType::Blocking(_) => return Err(Error::NonWaitable.into()), + } Ok(()) } + pub async fn write_from_mem( &self, file_offset: Option<u64>, @@ -246,38 +334,67 @@ impl<T: AsRawDescriptor> TokioSource<T> { mem_offsets: impl IntoIterator<Item = MemRegion>, ) -> AsyncResult<usize> { let mem_offsets_vec: Vec<MemRegion> = mem_offsets.into_iter().collect(); - let fd = self.fd.clone(); - Ok(self - .runtime - .spawn_blocking(move || { + Ok(match &self.fd { + FdType::Async(async_fd) => { let iovecs = mem_offsets_vec .into_iter() .filter_map(|mem_range| mem.get_volatile_slice(mem_range).ok()) .collect::<Vec<VolatileSlice>>(); - do_write_from_mem(fd, file_offset, &iovecs) - }) - .await - .map_err(Error::Join)? - .map_err(Error::Write)?) + async_fd + .async_io(tokio::io::Interest::WRITABLE, |fd| { + do_write_from_mem(fd.clone(), file_offset, &iovecs) + }) + .await + .map_err(AsyncError::Io)? + } + FdType::Blocking(blocking) => { + let fd = blocking.clone(); + self.runtime + .spawn_blocking(move || { + let iovecs = mem_offsets_vec + .into_iter() + .filter_map(|mem_range| mem.get_volatile_slice(mem_range).ok()) + .collect::<Vec<VolatileSlice>>(); + do_write_from_mem(fd, file_offset, &iovecs.clone()) + }) + .await + .map_err(Error::Join)? + .map_err(Error::Read)? + } + }) } + pub async fn write_from_vec( &self, file_offset: Option<u64>, vec: Vec<u8>, ) -> AsyncResult<(usize, Vec<u8>)> { - let fd = self.fd.clone(); - Ok(self - .runtime - .spawn_blocking(move || { - let size = do_write_from_vec(fd, file_offset, &vec)?; - Ok((size, vec)) - }) - .await - .map_err(Error::Join)? - .map_err(Error::Write)?) + Ok(match &self.fd { + FdType::Async(async_fd) => { + let res = async_fd + .async_io(tokio::io::Interest::WRITABLE, |fd| { + do_write_from_vec(fd.clone(), file_offset, &vec) + }) + .await + .map_err(AsyncError::Io)?; + (res, vec) + } + FdType::Blocking(blocking) => { + let fd = blocking.clone(); + self.runtime + .spawn_blocking(move || { + let size = do_write_from_vec(fd.clone(), file_offset, &vec)?; + Ok((size, vec)) + }) + .await + .map_err(Error::Join)? + .map_err(Error::Read)? + } + }) } + pub async fn write_zeroes_at(&self, file_offset: u64, len: u64) -> AsyncResult<()> { - let fd = self.fd.clone(); + let fd = self.clone_fd(); Ok(self .runtime .spawn_blocking(move || fallocate(&*fd, FallocateMode::ZeroRange, file_offset, len)) |