aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorA. Cody Schuffelen <schuffelen@google.com>2024-01-06 19:56:11 -0800
committercrosvm LUCI <crosvm-scoped@luci-project-accounts.iam.gserviceaccount.com>2024-04-12 02:10:07 +0000
commitf59edf616b8ff4e433415dc54e43c38cce2d4d77 (patch)
treee382f31cf8ef87538bdf9cc44a8652dc7b5f70c6
parent8407e842919e6de47b14580cbaceb3e9a2f6c8b5 (diff)
downloadcrosvm-f59edf616b8ff4e433415dc54e43c38cce2d4d77.tar.gz
cros_async: Add AsyncFd to Tokio linux implementation
This uses tokio's utilities to re-schedule IO that would fail with a "would block" error. The `IoSource` is chosen by the Executor based on whether the file descriptor can be successfully added to the internal epoll scheduler. Specifically, AsyncFd::async_io is used for system calls that can fail with EWOULDBLOCK, and spawn_blocking is used for system calls that will always block. Bug: b/320603688 Test: tools/presubmit Test: cargo build --features=cros_async/tokio --target=x86_64-unknown-linux-gnu Change-Id: Ibafbbe65d67cb88b5fcd0f18b00f504430c785c6 Reviewed-on: https://chromium-review.googlesource.com/c/crosvm/crosvm/+/5174967 Reviewed-by: Frederick Mayle <fmayle@google.com> Commit-Queue: Cody Schuffelen <schuffelen@google.com> Reviewed-by: Noah Gold <nkgold@google.com>
-rw-r--r--cros_async/src/sys/linux/tokio_source.rs205
1 files changed, 161 insertions, 44 deletions
diff --git a/cros_async/src/sys/linux/tokio_source.rs b/cros_async/src/sys/linux/tokio_source.rs
index e6f4d330c..4a8471296 100644
--- a/cros_async/src/sys/linux/tokio_source.rs
+++ b/cros_async/src/sys/linux/tokio_source.rs
@@ -5,13 +5,16 @@
use std::io;
use std::os::fd::AsRawFd;
use std::os::fd::OwnedFd;
+use std::os::fd::RawFd;
use std::sync::Arc;
+use base::add_fd_flags;
use base::clone_descriptor;
use base::linux::fallocate;
use base::linux::FallocateMode;
use base::AsRawDescriptor;
use base::VolatileSlice;
+use tokio::io::unix::AsyncFd;
use crate::mem::MemRegion;
use crate::AsyncError;
@@ -30,8 +33,14 @@ pub enum Error {
Fsync(io::Error),
#[error("Failed to join task: '{0}'")]
Join(tokio::task::JoinError),
+ #[error("Cannot wait on file descriptor")]
+ NonWaitable,
#[error("Failed to read: '{0}'")]
Read(io::Error),
+ #[error("Failed to set nonblocking: '{0}'")]
+ SettingNonBlocking(base::Error),
+ #[error("Tokio Async FD error: '{0}'")]
+ TokioAsyncFd(io::Error),
#[error("Failed to write: '{0}'")]
Write(io::Error),
}
@@ -45,12 +54,29 @@ impl From<Error> for io::Error {
Fdatasync(e) => e,
Fsync(e) => e,
Join(e) => io::Error::new(io::ErrorKind::Other, e),
+ NonWaitable => io::Error::new(io::ErrorKind::Other, e),
Read(e) => e,
+ SettingNonBlocking(e) => e.into(),
+ TokioAsyncFd(e) => e,
Write(e) => e,
}
}
}
+enum FdType {
+ Async(AsyncFd<Arc<OwnedFd>>),
+ Blocking(Arc<OwnedFd>),
+}
+
+impl AsRawFd for FdType {
+ fn as_raw_fd(&self) -> RawFd {
+ match self {
+ FdType::Async(async_fd) => async_fd.as_raw_fd(),
+ FdType::Blocking(blocking) => blocking.as_raw_fd(),
+ }
+ }
+}
+
impl From<Error> for AsyncError {
fn from(e: Error) -> AsyncError {
AsyncError::SysVariants(e.into())
@@ -153,24 +179,44 @@ fn do_write_from_mem(
}
pub struct TokioSource<T> {
- fd: Arc<OwnedFd>,
+ fd: FdType,
inner: T,
runtime: tokio::runtime::Handle,
}
impl<T: AsRawDescriptor> TokioSource<T> {
pub fn new(inner: T, runtime: tokio::runtime::Handle) -> Result<TokioSource<T>, Error> {
+ let _guard = runtime.enter(); // Required for AsyncFd
let safe_fd = clone_descriptor(&inner).map_err(Error::DuplicatingFd)?;
- let fd = Arc::new(safe_fd.into());
+ let fd_arc: Arc<OwnedFd> = Arc::new(safe_fd.into());
+ let fd = match AsyncFd::new(fd_arc.clone()) {
+ Ok(async_fd) => {
+ add_fd_flags(async_fd.get_ref().as_raw_descriptor(), libc::O_NONBLOCK)
+ .map_err(Error::SettingNonBlocking)?;
+ FdType::Async(async_fd)
+ }
+ Err(e) if e.kind() == io::ErrorKind::PermissionDenied => FdType::Blocking(fd_arc),
+ Err(e) => return Err(Error::TokioAsyncFd(e)),
+ };
Ok(TokioSource { fd, inner, runtime })
}
+
pub fn as_source(&self) -> &T {
&self.inner
}
+
pub fn as_source_mut(&mut self) -> &mut T {
&mut self.inner
}
+
+ fn clone_fd(&self) -> Arc<OwnedFd> {
+ match &self.fd {
+ FdType::Async(async_fd) => async_fd.get_ref().clone(),
+ FdType::Blocking(blocking) => blocking.clone(),
+ }
+ }
+
pub async fn fdatasync(&self) -> AsyncResult<()> {
- let fd = self.fd.clone();
+ let fd = self.clone_fd();
Ok(self
.runtime
.spawn_blocking(move || do_fdatasync(fd))
@@ -178,8 +224,9 @@ impl<T: AsRawDescriptor> TokioSource<T> {
.map_err(Error::Join)?
.map_err(Error::Fdatasync)?)
}
+
pub async fn fsync(&self) -> AsyncResult<()> {
- let fd = self.fd.clone();
+ let fd = self.clone_fd();
Ok(self
.runtime
.spawn_blocking(move || do_fsync(fd))
@@ -187,25 +234,40 @@ impl<T: AsRawDescriptor> TokioSource<T> {
.map_err(Error::Join)?
.map_err(Error::Fsync)?)
}
+
pub fn into_source(self) -> T {
self.inner
}
+
pub async fn read_to_vec(
&self,
file_offset: Option<u64>,
mut vec: Vec<u8>,
) -> AsyncResult<(usize, Vec<u8>)> {
- let fd = self.fd.clone();
- Ok(self
- .runtime
- .spawn_blocking(move || {
- let size = do_read_to_vec(fd, file_offset, &mut vec)?;
- Ok((size, vec))
- })
- .await
- .map_err(Error::Join)?
- .map_err(Error::Read)?)
+ Ok(match &self.fd {
+ FdType::Async(async_fd) => {
+ let res = async_fd
+ .async_io(tokio::io::Interest::READABLE, |fd| {
+ do_read_to_vec(fd.clone(), file_offset, &mut vec)
+ })
+ .await
+ .map_err(AsyncError::Io)?;
+ (res, vec)
+ }
+ FdType::Blocking(blocking) => {
+ let fd = blocking.clone();
+ self.runtime
+ .spawn_blocking(move || {
+ let size = do_read_to_vec(fd, file_offset, &mut vec)?;
+ Ok((size, vec))
+ })
+ .await
+ .map_err(Error::Join)?
+ .map_err(Error::Read)?
+ }
+ })
}
+
pub async fn read_to_mem(
&self,
file_offset: Option<u64>,
@@ -213,22 +275,38 @@ impl<T: AsRawDescriptor> TokioSource<T> {
mem_offsets: impl IntoIterator<Item = MemRegion>,
) -> AsyncResult<usize> {
let mem_offsets_vec: Vec<MemRegion> = mem_offsets.into_iter().collect();
- let fd = self.fd.clone();
- Ok(self
- .runtime
- .spawn_blocking(move || {
+ Ok(match &self.fd {
+ FdType::Async(async_fd) => {
let iovecs = mem_offsets_vec
.into_iter()
.filter_map(|mem_range| mem.get_volatile_slice(mem_range).ok())
.collect::<Vec<VolatileSlice>>();
- do_read_to_mem(fd, file_offset, &iovecs)
- })
- .await
- .map_err(Error::Join)?
- .map_err(Error::Read)?)
+ async_fd
+ .async_io(tokio::io::Interest::READABLE, |fd| {
+ do_read_to_mem(fd.clone(), file_offset, &iovecs)
+ })
+ .await
+ .map_err(AsyncError::Io)?
+ }
+ FdType::Blocking(blocking) => {
+ let fd = blocking.clone();
+ self.runtime
+ .spawn_blocking(move || {
+ let iovecs = mem_offsets_vec
+ .into_iter()
+ .filter_map(|mem_range| mem.get_volatile_slice(mem_range).ok())
+ .collect::<Vec<VolatileSlice>>();
+ do_read_to_mem(fd, file_offset, &iovecs)
+ })
+ .await
+ .map_err(Error::Join)?
+ .map_err(Error::Read)?
+ }
+ })
}
+
pub async fn punch_hole(&self, file_offset: u64, len: u64) -> AsyncResult<()> {
- let fd = self.fd.clone();
+ let fd = self.clone_fd();
Ok(self
.runtime
.spawn_blocking(move || fallocate(&*fd, FallocateMode::PunchHole, file_offset, len))
@@ -236,9 +314,19 @@ impl<T: AsRawDescriptor> TokioSource<T> {
.map_err(Error::Join)?
.map_err(Error::Fallocate)?)
}
+
pub async fn wait_readable(&self) -> AsyncResult<()> {
+ match &self.fd {
+ FdType::Async(async_fd) => async_fd
+ .readable()
+ .await
+ .map_err(crate::AsyncError::Io)?
+ .retain_ready(),
+ FdType::Blocking(_) => return Err(Error::NonWaitable.into()),
+ }
Ok(())
}
+
pub async fn write_from_mem(
&self,
file_offset: Option<u64>,
@@ -246,38 +334,67 @@ impl<T: AsRawDescriptor> TokioSource<T> {
mem_offsets: impl IntoIterator<Item = MemRegion>,
) -> AsyncResult<usize> {
let mem_offsets_vec: Vec<MemRegion> = mem_offsets.into_iter().collect();
- let fd = self.fd.clone();
- Ok(self
- .runtime
- .spawn_blocking(move || {
+ Ok(match &self.fd {
+ FdType::Async(async_fd) => {
let iovecs = mem_offsets_vec
.into_iter()
.filter_map(|mem_range| mem.get_volatile_slice(mem_range).ok())
.collect::<Vec<VolatileSlice>>();
- do_write_from_mem(fd, file_offset, &iovecs)
- })
- .await
- .map_err(Error::Join)?
- .map_err(Error::Write)?)
+ async_fd
+ .async_io(tokio::io::Interest::WRITABLE, |fd| {
+ do_write_from_mem(fd.clone(), file_offset, &iovecs)
+ })
+ .await
+ .map_err(AsyncError::Io)?
+ }
+ FdType::Blocking(blocking) => {
+ let fd = blocking.clone();
+ self.runtime
+ .spawn_blocking(move || {
+ let iovecs = mem_offsets_vec
+ .into_iter()
+ .filter_map(|mem_range| mem.get_volatile_slice(mem_range).ok())
+ .collect::<Vec<VolatileSlice>>();
+ do_write_from_mem(fd, file_offset, &iovecs.clone())
+ })
+ .await
+ .map_err(Error::Join)?
+ .map_err(Error::Read)?
+ }
+ })
}
+
pub async fn write_from_vec(
&self,
file_offset: Option<u64>,
vec: Vec<u8>,
) -> AsyncResult<(usize, Vec<u8>)> {
- let fd = self.fd.clone();
- Ok(self
- .runtime
- .spawn_blocking(move || {
- let size = do_write_from_vec(fd, file_offset, &vec)?;
- Ok((size, vec))
- })
- .await
- .map_err(Error::Join)?
- .map_err(Error::Write)?)
+ Ok(match &self.fd {
+ FdType::Async(async_fd) => {
+ let res = async_fd
+ .async_io(tokio::io::Interest::WRITABLE, |fd| {
+ do_write_from_vec(fd.clone(), file_offset, &vec)
+ })
+ .await
+ .map_err(AsyncError::Io)?;
+ (res, vec)
+ }
+ FdType::Blocking(blocking) => {
+ let fd = blocking.clone();
+ self.runtime
+ .spawn_blocking(move || {
+ let size = do_write_from_vec(fd.clone(), file_offset, &vec)?;
+ Ok((size, vec))
+ })
+ .await
+ .map_err(Error::Join)?
+ .map_err(Error::Read)?
+ }
+ })
}
+
pub async fn write_zeroes_at(&self, file_offset: u64, len: u64) -> AsyncResult<()> {
- let fd = self.fd.clone();
+ let fd = self.clone_fd();
Ok(self
.runtime
.spawn_blocking(move || fallocate(&*fd, FallocateMode::ZeroRange, file_offset, len))