1
0
Fork 0
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

364 lines
12 KiB

// Copyright 2021 The Chromium OS Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use std::cell::RefCell;
use std::cmp::{max, min};
use std::fs::OpenOptions;
use std::rc::Rc;
use std::sync::{atomic::AtomicU64, atomic::Ordering, Arc};
use anyhow::{anyhow, bail, Context};
use futures::future::{AbortHandle, Abortable};
use getopts::Options;
use once_cell::sync::OnceCell;
use sync::Mutex;
use vmm_vhost::vhost_user::message::*;
use base::{error, iov_max, warn, Event, Timer};
use cros_async::{sync::Mutex as AsyncMutex, EventAsync, Executor, TimerAsync};
use data_model::DataInit;
use devices::virtio;
use devices::virtio::block::asynchronous::{flush_disk, process_one_chain};
use devices::virtio::block::*;
use devices::virtio::{base_features, copy_config, Queue};
use devices::ProtectionType;
use disk::{create_async_disk_file, ToAsyncDisk};
use vhost_user_devices::{CallEvent, DeviceRequestHandler, VhostUserBackend};
use vm_memory::GuestMemory;
static BLOCK_EXECUTOR: OnceCell<Executor> = OnceCell::new();
const QUEUE_SIZE: u16 = 256;
const NUM_QUEUES: u16 = 16;
struct BlockBackend {
disk_state: Rc<AsyncMutex<DiskState>>,
disk_size: Arc<AtomicU64>,
block_size: u32,
seg_max: u32,
avail_features: u64,
acked_features: u64,
acked_protocol_features: VhostUserProtocolFeatures,
flush_timer: Rc<RefCell<TimerAsync>>,
flush_timer_armed: Rc<RefCell<bool>>,
workers: [Option<AbortHandle>; Self::MAX_QUEUE_NUM],
}
impl BlockBackend {
pub fn new(
disk_image: Box<dyn ToAsyncDisk>,
base_features: u64,
read_only: bool,
sparse: bool,
block_size: u32,
id: Option<BlockId>,
) -> anyhow::Result<BlockBackend> {
if block_size % SECTOR_SIZE as u32 != 0 {
bail!(
"Block size {} is not a multiple of {}.",
block_size,
SECTOR_SIZE,
);
}
let disk_size = disk_image.get_len()?;
if disk_size % block_size as u64 != 0 {
warn!(
"Disk size {} is not a multiple of block size {}; \
the remainder will not be visible to the guest.",
disk_size, block_size,
);
}
let avail_features = build_avail_features(base_features, read_only, sparse, true)
| VhostUserVirtioFeatures::PROTOCOL_FEATURES.bits();
let seg_max = min(max(iov_max(), 1), u32::max_value() as usize) as u32;
// Since we do not currently support indirect descriptors, the maximum
// number of segments must be smaller than the queue size.
// In addition, the request header and status each consume a descriptor.
let seg_max = min(seg_max, u32::from(QUEUE_SIZE) - 2);
// Safe because the executor is initialized in main() below.
let ex = BLOCK_EXECUTOR.get().expect("Executor not initialized");
let async_image = disk_image.to_async_disk(&ex)?;
let disk_size = Arc::new(AtomicU64::new(disk_size));
let disk_state = Rc::new(AsyncMutex::new(DiskState::new(
async_image,
Arc::clone(&disk_size),
read_only,
sparse,
id,
)));
let timer = Timer::new().context("Failed to create a timer")?;
let flush_timer_write = Rc::new(RefCell::new(
TimerAsync::new(
// Call try_clone() to share the same underlying FD with the `flush_disk` task.
timer.0.try_clone().context("Failed to clone flush_timer")?,
&ex,
)
.context("Failed to create an async timer")?,
));
// Create a separate TimerAsync with the same backing kernel timer. This allows the
// `flush_disk` task to borrow its copy waiting for events while the queue handlers can
// still borrow their copy momentarily to set timeouts.
// Call try_clone() to share the same underlying FD with the `flush_disk` task.
let flush_timer_read = timer
.0
.try_clone()
.context("Failed to clone flush_timer")
.and_then(|t| TimerAsync::new(t, &ex).context("Failed to create an async timer"))?;
let flush_timer_armed = Rc::new(RefCell::new(false));
ex.spawn_local(flush_disk(
Rc::clone(&disk_state),
flush_timer_read,
Rc::clone(&flush_timer_armed),
))
.detach();
Ok(BlockBackend {
disk_state,
disk_size,
block_size,
seg_max,
avail_features,
acked_features: 0,
acked_protocol_features: VhostUserProtocolFeatures::empty(),
flush_timer: flush_timer_write,
flush_timer_armed,
workers: Default::default(),
})
}
}
impl VhostUserBackend for BlockBackend {
const MAX_QUEUE_NUM: usize = NUM_QUEUES as usize;
const MAX_VRING_LEN: u16 = QUEUE_SIZE;
type Error = anyhow::Error;
fn features(&self) -> u64 {
self.avail_features
}
fn ack_features(&mut self, value: u64) -> anyhow::Result<()> {
let unrequested_features = value & !self.avail_features;
if unrequested_features != 0 {
bail!("invalid features are given: {:#x}", unrequested_features);
}
self.acked_features |= value;
Ok(())
}
fn acked_features(&self) -> u64 {
self.acked_features
}
fn protocol_features(&self) -> VhostUserProtocolFeatures {
VhostUserProtocolFeatures::CONFIG | VhostUserProtocolFeatures::MQ
}
fn ack_protocol_features(&mut self, features: u64) -> anyhow::Result<()> {
let features = VhostUserProtocolFeatures::from_bits(features)
.ok_or_else(|| anyhow!("invalid protocol features are given: {:#x}", features))?;
let supported = self.protocol_features();
self.acked_protocol_features = features & supported;
Ok(())
}
fn acked_protocol_features(&self) -> u64 {
self.acked_protocol_features.bits()
}
fn read_config(&self, offset: u64, data: &mut [u8]) {
let config_space = {
let disk_size = self.disk_size.load(Ordering::Relaxed);
build_config_space(disk_size, self.seg_max, self.block_size, NUM_QUEUES)
};
copy_config(data, 0, config_space.as_slice(), offset);
}
fn reset(&mut self) {
panic!("Unsupported call to reset");
}
fn start_queue(
&mut self,
idx: usize,
mut queue: virtio::Queue,
mem: GuestMemory,
call_evt: Arc<Mutex<CallEvent>>,
kick_evt: Event,
) -> anyhow::Result<()> {
if let Some(handle) = self.workers.get_mut(idx).and_then(Option::take) {
warn!("Starting new queue handler without stopping old handler");
handle.abort();
}
// Enable any virtqueue features that were negotiated (like VIRTIO_RING_F_EVENT_IDX).
queue.ack_features(self.acked_features);
// Safe because the executor is initialized in main() below.
let ex = BLOCK_EXECUTOR.get().expect("Executor not initialized");
let kick_evt =
EventAsync::new(kick_evt.0, ex).context("failed to create EventAsync for kick_evt")?;
let (handle, registration) = AbortHandle::new_pair();
let disk_state = Rc::clone(&self.disk_state);
let timer = Rc::clone(&self.flush_timer);
let timer_armed = Rc::clone(&self.flush_timer_armed);
ex.spawn_local(Abortable::new(
handle_queue(
mem,
disk_state,
Rc::new(RefCell::new(queue)),
kick_evt,
call_evt,
timer,
timer_armed,
),
registration,
))
.detach();
self.workers[idx] = Some(handle);
Ok(())
}
fn stop_queue(&mut self, idx: usize) {
if let Some(handle) = self.workers.get_mut(idx).and_then(Option::take) {
handle.abort();
}
}
}
// There is one async task running `handle_queue` per virtio queue in use.
// Receives messages from the guest and queues a task to complete the operations with the async
// executor.
async fn handle_queue(
mem: GuestMemory,
disk_state: Rc<AsyncMutex<DiskState>>,
queue: Rc<RefCell<Queue>>,
evt: EventAsync,
interrupt: Arc<Mutex<CallEvent>>,
flush_timer: Rc<RefCell<TimerAsync>>,
flush_timer_armed: Rc<RefCell<bool>>,
) {
loop {
if let Err(e) = evt.next_val().await {
error!("Failed to read the next queue event: {}", e);
continue;
}
// Safe because the executor is initialized in main() below.
let ex = BLOCK_EXECUTOR.get().expect("Executor not initialized");
while let Some(descriptor_chain) = queue.borrow_mut().pop(&mem) {
let queue = Rc::clone(&queue);
let disk_state = Rc::clone(&disk_state);
let mem = mem.clone();
let interrupt = Arc::clone(&interrupt);
let flush_timer = Rc::clone(&flush_timer);
let flush_timer_armed = Rc::clone(&flush_timer_armed);
ex.spawn_local(async move {
process_one_chain(
queue,
descriptor_chain,
disk_state,
mem,
&interrupt,
flush_timer,
flush_timer_armed,
)
.await
})
.detach();
}
}
}
fn main() -> anyhow::Result<()> {
let mut opts = Options::new();
opts.optopt(
"",
"file",
"path and options of the disk file",
"PATH<:read-only>",
);
opts.optflag("h", "help", "print this help menu");
opts.optopt("", "socket", "path to a socket", "PATH");
let mut args = std::env::args();
let program_name = args.next().expect("empty args");
let matches = match opts.parse(args) {
Ok(m) => m,
Err(e) => {
println!("{}", e);
println!("{}", opts.short_usage(&program_name));
return Ok(());
}
};
if matches.opt_present("h") {
println!("{}", opts.usage(&program_name));
return Ok(());
}
if !matches.opt_present("file") {
println!("Must specify the file for the block device.");
println!("{}", opts.usage(&program_name));
return Ok(());
}
if !matches.opt_present("socket") {
println!("Must specify the socket for the vhost user device.");
println!("{}", opts.usage(&program_name));
return Ok(());
}
base::syslog::init().context("failed to initialize syslog")?;
let ex = Executor::new().context("failed to create executor")?;
// We can unwrap after `opt_str()` safely because they are required options.
let socket = matches.opt_str("socket").unwrap();
let filearg = matches.opt_str("file").unwrap();
let fileopts = filearg.split(':').collect::<Vec<&str>>();
let filename = fileopts.get(0).context("Must specify the filename")?;
let read_only = fileopts.contains(&"read-only");
let sparse = false;
let block_size = 512;
let f = OpenOptions::new()
.read(true)
.write(!read_only)
.create(false)
.open(filename)
.context("Failed to open disk file")?;
let async_file = create_async_disk_file(f).context("Failed to create async file")?;
let _ = BLOCK_EXECUTOR.set(ex.clone());
let base_features = base_features(ProtectionType::Unprotected);
let block = BlockBackend::new(
async_file,
base_features,
read_only,
sparse,
block_size,
None, // id: Option<BlockId>,
)?;
let handler = DeviceRequestHandler::new(block);
if let Err(e) = ex.run_until(handler.run(socket, &ex)) {
error!("error occurred: {}", e);
}
Ok(())
}