1 В избранное 0 Ответвления 0

OSCHINA-MIRROR/openeuler-stratovirt

Присоединиться к Gitlife
Откройте для себя и примите участие в публичных проектах с открытым исходным кодом с участием более 10 миллионов разработчиков. Приватные репозитории также полностью бесплатны :)
Присоединиться бесплатно
Это зеркальный репозиторий, синхронизируется ежедневно с исходного репозитория.
В этом репозитории не указан файл с открытой лицензией (LICENSE). При использовании обратитесь к конкретному описанию проекта и его зависимостям в коде.
Клонировать/Скачать
loop_context.rs 24 КБ
Копировать Редактировать Исходные данные Просмотреть построчно История
ace-yan Отправлено 3 лет назад 76f1d18
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750
// Copyright (c) 2020 Huawei Technologies Co.,Ltd. All rights reserved.
//
// StratoVirt is licensed under Mulan PSL v2.
// You can use this software according to the terms and conditions of the Mulan
// PSL v2.
// You may obtain a copy of Mulan PSL v2 at:
// http://license.coscl.org.cn/MulanPSL2
// THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY
// KIND, EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO
// NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
// See the Mulan PSL v2 for more details.
use std::collections::BTreeMap;
use std::os::unix::io::RawFd;
use std::sync::{Arc, Mutex, RwLock};
use std::time::{Duration, Instant};
use libc::{c_void, read};
use log::{error, warn};
use vmm_sys_util::epoll::{ControlOperation, Epoll, EpollEvent, EventSet};
use crate::errors::{ErrorKind, Result, ResultExt};
/// Capacity of the `ready_events` buffer and the maximum number of events requested
/// from a single `epoll.wait()` call.
const READY_EVENT_MAX: usize = 256;
/// Number of polling rounds `iothread_run` makes over the `io_poll` notifiers before
/// it falls back to waiting on epoll.
const AIO_PRFETCH_CYCLE_TIME: usize = 100;
/// Operation that an [`EventNotifier`] asks the event loop to perform on its fd.
#[derive(Debug)]
pub enum NotifierOperation {
    /// Register a new fd together with its handlers. The request is rejected
    /// if the fd is already present in the event table.
    AddExclusion = 1,
    /// Register a new fd, or, if the fd is already present in the event table,
    /// append the supplied handlers to the existing notifier.
    AddShared = 2,
    /// Change the settings associated with a file descriptor.
    /// `update_events` currently rejects this operation.
    Modify = 4,
    /// Remove a file descriptor from epoll and from the event table.
    Delete = 8,
    /// Take a file descriptor out of epoll while keeping it in the event table.
    Park = 16,
    /// Put a file descriptor that is kept in the event table back into epoll.
    Resume = 32,
}
/// Lifecycle state of a notifier that the event loop knows about.
enum EventStatus {
    /// The fd is registered in epoll and its handlers are dispatched.
    Alive = 0,
    /// The fd has been taken out of epoll but is still kept in the event table.
    Parked = 1,
    /// The notifier has been deleted from the event loop.
    Removed = 2,
}
/// Handler attached to a notifier. It is called with an event set and the notifier's
/// raw fd, and may return further notifier operations; `epoll_wait_manager` forwards
/// those to `update_events`.
pub type NotifierCallback = dyn Fn(EventSet, RawFd) -> Option<Vec<EventNotifier>>;
/// Epoll Event Notifier Entry.
///
/// Describes one operation on one file descriptor; a list of these is applied to the
/// event loop through `EventLoopContext::update_events`.
pub struct EventNotifier {
/// Raw file descriptor
pub raw_fd: i32,
/// Notifier operation
pub op: NotifierOperation,
/// Fd of an already registered notifier that is taken out of epoll (parked) when this
/// notifier is added, and put back into epoll when this notifier is deleted.
pub parked_fd: Option<i32>,
/// The types of events for which we use this fd
pub event: EventSet,
/// Event Handler List, one fd event may have many handlers
pub handlers: Vec<Arc<Mutex<Box<NotifierCallback>>>>,
/// Event status
status: EventStatus,
/// Whether `iothread_run` should poll this notifier (through `handlers[1]`) before
/// it waits on epoll.
pub io_poll: bool,
}
impl EventNotifier {
    /// Builds an `EventNotifier` from the caller-supplied parts.
    ///
    /// The notifier starts in the `Alive` state and with `io_poll` disabled.
    pub fn new(
        op: NotifierOperation,
        raw_fd: i32,
        parked_fd: Option<i32>,
        event: EventSet,
        handlers: Vec<Arc<Mutex<Box<NotifierCallback>>>>,
    ) -> Self {
        Self {
            status: EventStatus::Alive,
            io_poll: false,
            op,
            raw_fd,
            parked_fd,
            event,
            handlers,
        }
    }
}
/// `EventNotifier` Factory
///
/// When an object has some `EventNotifier`s it wants
/// to add to the event loop, the object needs to implement
/// the `EventNotifierHelper` trait, so that `EventLoop` can
/// easily get the notifiers and add them to the epoll context.
pub trait EventNotifierHelper {
/// Returns the notifiers that the given shared object wants registered.
fn internal_notifiers(_: Arc<Mutex<Self>>) -> Vec<EventNotifier>;
}
/// EventLoop manager, advises the loop whether to continue running or to stop.
pub trait EventLoopManager: Send + Sync {
/// Queried at the start of `run`/`iothread_run`; returning `true` makes the loop
/// call `loop_cleanup` and report that it stopped.
fn loop_should_exit(&self) -> bool;
/// Called after `loop_should_exit` returned `true`, before `run`/`iothread_run`
/// return `Ok(false)`.
fn loop_cleanup(&self) -> Result<()>;
}
/// Timer structure used for delayed function execution.
struct Timer {
    /// The function that will be called once the timer has expired.
    func: Box<dyn Fn()>,
    /// The instant from which `func` is due to be called.
    expire_time: Instant,
}

impl Timer {
    /// Constructs a timer that expires `nsec` nanoseconds from now.
    ///
    /// # Arguments
    ///
    /// * `func` - the function that will be called later.
    /// * `nsec` - delay time in nanoseconds.
    pub fn new(func: Box<dyn Fn()>, nsec: u64) -> Self {
        Timer {
            func,
            // `Duration::from_nanos` keeps the whole 64-bit delay. Casting `nsec`
            // to `u32` would silently wrap every delay of about 4.29 s or more.
            expire_time: Instant::now() + Duration::from_nanos(nsec),
        }
    }
}
/// Epoll Loop Context
///
/// Owns the epoll instance, the table of registered notifiers and the timer list.
// Notifiers are boxed because the address of each boxed notifier is handed to epoll as
// the event data (see `add_event`) and dereferenced again in `epoll_wait_manager`.
#[allow(clippy::vec_box)]
pub struct EventLoopContext {
/// Epoll file descriptor.
epoll: Epoll,
/// Control epoll loop running.
manager: Option<Arc<Mutex<dyn EventLoopManager>>>,
/// Fds registered to the `EventLoop`.
events: Arc<RwLock<BTreeMap<RawFd, Box<EventNotifier>>>>,
/// Events abandoned are stored in garbage collector.
/// It is emptied by `clear_gc` at the end of every `epoll_wait_manager` pass.
gc: Arc<RwLock<Vec<Box<EventNotifier>>>>,
/// Temp events vector, store wait returned events.
ready_events: Vec<EpollEvent>,
/// Timer list, kept sorted by ascending `expire_time` (see `delay_call`).
timers: Vec<Timer>,
}
// NOTE(review): `Sync` and `Send` are asserted by hand here. Nothing in this file shows
// what makes that sound (`Timer::func` and `NotifierCallback` carry no `Send` bound) —
// confirm the threading contract with the callers.
unsafe impl Sync for EventLoopContext {}
unsafe impl Send for EventLoopContext {}
impl EventLoopContext {
/// Constructs a new `EventLoopContext`.
pub fn new() -> Self {
EventLoopContext {
epoll: Epoll::new().unwrap(),
manager: None,
events: Arc::new(RwLock::new(BTreeMap::new())),
gc: Arc::new(RwLock::new(Vec::new())),
ready_events: vec![EpollEvent::default(); READY_EVENT_MAX],
timers: Vec::new(),
}
}
/// Installs the manager that `run`/`iothread_run` consult to decide whether the
/// loop should stop; any previously installed manager is replaced.
pub fn set_manager(&mut self, manager: Arc<Mutex<dyn EventLoopManager>>) {
    let installed = Some(manager);
    self.manager = installed;
}
/// Drops every notifier that `rm_event` moved into the garbage collector.
fn clear_gc(&mut self) {
    self.gc.write().unwrap().clear();
}
/// Registers `event` in the event table and in epoll.
///
/// * If the fd is already registered, `AddExclusion` fails with
///   `BadNotifierOperation`; any other add appends the new handlers to the existing
///   notifier (a warning is logged when that notifier is parked).
/// * If the fd is new, it is added to epoll and, when `parked_fd` is set, the
///   notifier registered under `parked_fd` is taken out of epoll and marked `Parked`.
///
/// The new notifier is inserted into the event table only after every step has
/// succeeded, so a failed add leaves neither a stale table entry nor a dangling
/// epoll registration behind.
///
/// # Errors
///
/// * `BadNotifierOperation` - `AddExclusion` on an fd that is already registered.
/// * `NoParkedFd` - `parked_fd` is set but not registered.
/// * An epoll error if one of the `epoll_ctl` calls fails.
fn add_event(&mut self, event: EventNotifier) -> Result<()> {
    let mut events_map = self.events.write().unwrap();
    if let Some(notifier) = events_map.get_mut(&event.raw_fd) {
        if let NotifierOperation::AddExclusion = event.op {
            return Err(ErrorKind::BadNotifierOperation.into());
        }
        let mut event = event;
        notifier.handlers.append(&mut event.handlers);
        if let EventStatus::Parked = notifier.status {
            warn!("Parked event updated!");
        }
        return Ok(());
    }

    // Validate the parked fd before touching epoll, so this error path has no side effects.
    let parked_fd = event.parked_fd;
    if let Some(fd) = parked_fd {
        if !events_map.contains_key(&fd) {
            return Err(ErrorKind::NoParkedFd(fd).into());
        }
    }

    // Box first: the heap address is what epoll hands back as the event data, and it
    // stays the same when the box is moved into the map below.
    let event = Box::new(event);
    self.epoll.ctl(
        ControlOperation::Add,
        event.raw_fd,
        EpollEvent::new(event.event, &*event as *const EventNotifier as u64),
    )?;

    if let Some(fd) = parked_fd {
        if let Err(e) = self
            .epoll
            .ctl(ControlOperation::Delete, fd, EpollEvent::default())
        {
            // Best-effort rollback of the registration made just above.
            let _ = self.epoll.ctl(
                ControlOperation::Delete,
                event.raw_fd,
                EpollEvent::default(),
            );
            return Err(e.into());
        }
        if let Some(parked) = events_map.get_mut(&fd) {
            parked.status = EventStatus::Parked;
        }
    }

    events_map.insert(event.raw_fd, event);
    Ok(())
}
fn rm_event(&mut self, event: &EventNotifier) -> Result<()> {
// If there is one same parked event, return Ok.
// If there is no event in the map, return Error.
// If there is one same alive event monitored, put the event in gc and reactivate the parked event.
let mut events_map = self.events.write().unwrap();
match events_map.get_mut(&event.raw_fd) {
Some(notifier) => {
if let EventStatus::Alive = notifier.status {
// No need to delete fd if status is Parked, it's done in park_event.
if let Err(error) = self.epoll.ctl(
ControlOperation::Delete,
notifier.raw_fd,
EpollEvent::default(),
) {
let error_num = error.raw_os_error().unwrap();
if error_num != libc::EBADF && error_num != libc::ENOENT {
return Err(ErrorKind::BadSyscall(error).into());
}
}
}
notifier.status = EventStatus::Removed;
if let Some(parked_fd) = notifier.parked_fd {
if let Some(parked) = events_map.get_mut(&parked_fd) {
self.epoll.ctl(
ControlOperation::Add,
parked_fd,
EpollEvent::new(parked.event, &**parked as *const _ as u64),
)?;
parked.status = EventStatus::Alive;
} else {
return Err(ErrorKind::NoParkedFd(parked_fd).into());
}
}
let event = events_map.remove(&event.raw_fd).unwrap();
self.gc.write().unwrap().push(event);
}
_ => {
return Err(ErrorKind::NoRegisterFd(event.raw_fd).into());
}
}
Ok(())
}
/// Takes the notifier registered under `event.raw_fd` out of epoll and marks it
/// `Parked`; the notifier itself stays in the event table.
///
/// # Errors
///
/// * `NoRegisterFd` - `event.raw_fd` is not registered.
/// * A chained error if deleting the fd from epoll fails.
fn park_event(&mut self, event: &EventNotifier) -> Result<()> {
    let mut events_map = self.events.write().unwrap();
    let notifier = match events_map.get_mut(&event.raw_fd) {
        Some(notifier) => notifier,
        None => return Err(ErrorKind::NoRegisterFd(event.raw_fd).into()),
    };

    let fd = notifier.raw_fd;
    self.epoll
        .ctl(ControlOperation::Delete, fd, EpollEvent::default())
        .chain_err(|| format!("Failed to park event, event fd:{}", fd))?;
    notifier.status = EventStatus::Parked;
    Ok(())
}
/// Puts the notifier registered under `event.raw_fd` back into epoll and marks it
/// `Alive`.
///
/// # Errors
///
/// * `NoRegisterFd` - `event.raw_fd` is not registered.
/// * A chained error if adding the fd to epoll fails.
fn resume_event(&mut self, event: &EventNotifier) -> Result<()> {
    let mut events_map = self.events.write().unwrap();
    let notifier = match events_map.get_mut(&event.raw_fd) {
        Some(notifier) => notifier,
        None => return Err(ErrorKind::NoRegisterFd(event.raw_fd).into()),
    };

    let fd = notifier.raw_fd;
    // The notifier's address travels with the epoll event as its data.
    let epoll_event = EpollEvent::new(notifier.event, &**notifier as *const EventNotifier as u64);
    self.epoll
        .ctl(ControlOperation::Add, fd, epoll_event)
        .chain_err(|| format!("Failed to resume event, event fd: {}", fd))?;
    notifier.status = EventStatus::Alive;
    Ok(())
}
/// Applies each notifier's operation to the `EventLoop`, in order.
///
/// Processing stops at the first failing notifier; operations applied before it
/// stay in effect.
///
/// # Arguments
///
/// * `notifiers` - event notifiers to add to, remove from, park in or resume in
///   the `EventLoop`.
///
/// # Errors
///
/// Returns `UnExpectedOperationType` for `NotifierOperation::Modify`, or the error
/// of the add/remove/park/resume step that failed.
pub fn update_events(&mut self, notifiers: Vec<EventNotifier>) -> Result<()> {
    for notifier in notifiers {
        match notifier.op {
            NotifierOperation::AddExclusion | NotifierOperation::AddShared => {
                self.add_event(notifier)?
            }
            NotifierOperation::Delete => self.rm_event(&notifier)?,
            NotifierOperation::Park => self.park_event(&notifier)?,
            NotifierOperation::Resume => self.resume_event(&notifier)?,
            NotifierOperation::Modify => {
                return Err(ErrorKind::UnExpectedOperationType.into());
            }
        }
    }
    Ok(())
}
/// Runs one iteration of the loop: waits on epoll until an event arrives or the
/// earliest timer is due, then dispatches the matching callbacks.
///
/// Returns `Ok(false)` when the manager asked the loop to exit (after running its
/// cleanup), and `Ok(true)` otherwise.
pub fn run(&mut self) -> Result<bool> {
    if let Some(manager) = self.manager.as_ref() {
        let exit_requested = manager.lock().unwrap().loop_should_exit();
        if exit_requested {
            manager.lock().unwrap().loop_cleanup()?;
            return Ok(false);
        }
    }

    let timeout = self.timers_min_timeout();
    self.epoll_wait_manager(timeout)
}
/// Runs one iteration of the loop like `run`, with an extra polling phase.
///
/// When no timer is pending (the computed timeout is `-1`), every `Alive` notifier
/// with `io_poll` set is polled for `AIO_PRFETCH_CYCLE_TIME` rounds before waiting
/// on epoll: its second handler (`handlers[1]`) is called with the event set stored
/// in `ready_events[1]`. A handler returning `Some(_)` ends the current round; the
/// returned notifiers are discarded.
///
/// Returns `Ok(false)` when the manager asked the loop to exit, `Ok(true)` otherwise.
///
/// # Panics
///
/// Panics if a polled notifier has fewer than two handlers.
pub fn iothread_run(&mut self) -> Result<bool> {
    if let Some(manager) = self.manager.as_ref() {
        let exit_requested = manager.lock().unwrap().loop_should_exit();
        if exit_requested {
            manager.lock().unwrap().loop_cleanup()?;
            return Ok(false);
        }
    }

    let timeout = self.timers_min_timeout();
    if timeout == -1 {
        for _ in 0..AIO_PRFETCH_CYCLE_TIME {
            let events_map = self.events.read().unwrap();
            for notifier in events_map.values() {
                if !notifier.io_poll {
                    continue;
                }
                if let EventStatus::Alive = notifier.status {
                    let poll_handler = notifier.handlers[1].lock().unwrap();
                    let outcome =
                        poll_handler(self.ready_events[1].event_set(), notifier.raw_fd);
                    if outcome.is_some() {
                        break;
                    }
                }
            }
        }
    }
    self.epoll_wait_manager(timeout)
}
/// Schedules `func` to be called once `nsec` nanoseconds have passed.
///
/// The timer list is kept sorted by expiry time; a timer that expires at the same
/// instant as existing ones is queued behind them.
///
/// # Arguments
///
/// * `func` - the function that will be called later.
/// * `nsec` - delay time in nanoseconds.
pub fn delay_call(&mut self, func: Box<dyn Fn()>, nsec: u64) {
    let timer = Timer::new(func, nsec);
    // First queued timer that expires strictly later than the new one, if any.
    let index = self
        .timers
        .iter()
        .position(|queued| timer.expire_time < queued.expire_time)
        .unwrap_or(self.timers.len());
    self.timers.insert(index, timer);
}
/// Translates the expiry time of the earliest timer into an epoll timeout in
/// milliseconds.
///
/// Returns `-1` when there is no timer, `0` when the earliest timer is already due,
/// and otherwise the remaining whole milliseconds, capped at `i32::MAX - 1`.
fn timers_min_timeout(&self) -> i32 {
    let earliest = match self.timers.first() {
        Some(timer) => timer.expire_time,
        None => return -1,
    };

    let now = Instant::now();
    if earliest <= now {
        return 0;
    }

    let millis = (earliest - now).as_millis();
    if millis < i32::MAX as u128 {
        millis as i32
    } else {
        i32::MAX - 1
    }
}
/// Calls the function of every timer that has expired, in expiry order, then
/// removes those timers from the list.
fn run_timers(&mut self) {
    let now = Instant::now();
    // The list is sorted, so the expired timers form a prefix.
    let expired_nr = self
        .timers
        .iter()
        .take_while(|timer| timer.expire_time <= now)
        .count();
    for timer in &self.timers[..expired_nr] {
        (timer.func)();
    }
    self.timers.drain(..expired_nr);
}
/// Waits on epoll for at most `time_out` milliseconds (`-1` blocks until an event
/// arrives), dispatches the handlers of every ready `Alive` notifier, applies the
/// notifier operations they return, then runs the expired timers and empties the
/// garbage collector.
///
/// An interrupted wait (`EINTR`) is treated as "no events".
///
/// # Errors
///
/// Returns `EpollWait` if waiting fails for any other reason, or the error of a
/// failed `update_events` call.
fn epoll_wait_manager(&mut self, time_out: i32) -> Result<bool> {
    let ev_count = match self
        .epoll
        .wait(READY_EVENT_MAX, time_out, &mut self.ready_events[..])
    {
        Ok(ev_count) => ev_count,
        Err(e) if e.raw_os_error() == Some(libc::EINTR) => 0,
        Err(e) => return Err(ErrorKind::EpollWait(e).into()),
    };

    for i in 0..ev_count {
        // Read the triggered event set once, from THIS ready event. Indexing
        // `ready_events` with the handler index would hand later handlers the
        // event set of an unrelated slot.
        let event_set = self.ready_events[i].event_set();
        // SAFETY: the event data is the address of a boxed notifier, stored when the
        // fd was added to epoll. Notifiers removed by `rm_event` are moved into `gc`
        // and are only dropped by `clear_gc` at the end of this function, so the
        // pointer is still valid here.
        let event = unsafe { &*(self.ready_events[i].data() as *const EventNotifier) };
        if let EventStatus::Alive = event.status {
            let mut notifiers = Vec::new();
            for handler in event.handlers.iter() {
                let handle = handler.lock().unwrap();
                if let Some(mut returned) = handle(event_set, event.raw_fd) {
                    notifiers.append(&mut returned);
                }
            }
            self.update_events(notifiers)?;
        }
    }

    self.run_timers();
    self.clear_gc();
    Ok(true)
}
}
impl Default for EventLoopContext {
fn default() -> Self {
Self::new()
}
}
/// Reads up to eight bytes from `fd` straight into a `u64` and returns that value.
///
/// A failed read is logged and otherwise ignored; the buffer starts as `0`, and
/// whatever it holds after the call is returned.
pub fn read_fd(fd: RawFd) -> u64 {
    let mut value = 0_u64;
    let buf = &mut value as *mut u64 as *mut c_void;
    let len = std::mem::size_of::<u64>();
    // SAFETY: `buf` points at `value`, a live and writable `u64`, and `len` is
    // exactly its size.
    let ret = unsafe { read(fd, buf, len) };
    if ret == -1 {
        error!("Failed to read fd");
    }
    value
}
#[cfg(test)]
mod test {
use super::*;
use libc::*;
use std::os::unix::io::{AsRawFd, RawFd};
use vmm_sys_util::{epoll::EventSet, eventfd::EventFd};
impl EventLoopContext {
    /// Reports the state of `fd` in the event table: `None` if it is not
    /// registered, `Some(true)` if it is `Alive`, `Some(false)` otherwise.
    fn check_existence(&self, fd: RawFd) -> Option<bool> {
        let events_map = self.events.read().unwrap();
        events_map.get(&fd).map(|notifier| match notifier.status {
            EventStatus::Alive => true,
            _ => false,
        })
    }

    /// Registers a fresh eventfd and returns its raw fd. The `EventFd` is dropped
    /// when this function returns, so the returned fd is already closed.
    fn create_event(&mut self) -> i32 {
        let fd = EventFd::new(EFD_NONBLOCK).unwrap();
        let raw_fd = fd.as_raw_fd();
        let event = EventNotifier::new(
            NotifierOperation::AddShared,
            raw_fd,
            None,
            EventSet::OUT,
            Vec::new(),
        );
        self.update_events(vec![event]).unwrap();
        raw_fd
    }
}
/// Builds a handler that, whenever it is called, asks the loop to register
/// `related_fd` for `IN` events as a shared notifier without handlers.
fn generate_handler(related_fd: i32) -> Box<NotifierCallback> {
    Box::new(move |_, _| {
        Some(vec![EventNotifier::new(
            NotifierOperation::AddShared,
            related_fd,
            None,
            EventSet::IN,
            Vec::new(),
        )])
    })
}
#[test]
fn basic_test() {
    let mut mainloop = EventLoopContext::new();
    let fd1 = EventFd::new(EFD_NONBLOCK).unwrap();
    let fd1_related = EventFd::new(EFD_NONBLOCK).unwrap();

    let handlers = vec![Arc::new(Mutex::new(generate_handler(
        fd1_related.as_raw_fd(),
    )))];
    let event1 = EventNotifier::new(
        NotifierOperation::AddShared,
        fd1.as_raw_fd(),
        None,
        EventSet::OUT,
        handlers,
    );
    mainloop.update_events(vec![event1]).unwrap();
    mainloop.run().unwrap();

    // An eventfd is immediately writable, so event1's handler runs during the first
    // `run()`. That handler registers fd1_related, hence both fds must be alive now.
    assert_eq!(mainloop.check_existence(fd1.as_raw_fd()), Some(true));
    assert_eq!(
        mainloop.check_existence(fd1_related.as_raw_fd()),
        Some(true)
    );
}
#[test]
fn parked_event_test() {
    let mut mainloop = EventLoopContext::new();
    let fd1 = EventFd::new(EFD_NONBLOCK).unwrap();
    let fd2 = EventFd::new(EFD_NONBLOCK).unwrap();
    let (raw1, raw2) = (fd1.as_raw_fd(), fd2.as_raw_fd());

    let notifiers = vec![
        EventNotifier::new(
            NotifierOperation::AddShared,
            raw1,
            None,
            EventSet::OUT,
            Vec::new(),
        ),
        EventNotifier::new(
            NotifierOperation::AddShared,
            raw2,
            Some(raw1),
            EventSet::OUT,
            Vec::new(),
        ),
    ];
    mainloop.update_events(notifiers).unwrap();
    mainloop.run().unwrap();

    // event2 names event1 as its parked fd, so adding event2 parks event1.
    assert_eq!(mainloop.check_existence(raw1), Some(false));
    assert_eq!(mainloop.check_existence(raw2), Some(true));

    let event2_remove = EventNotifier::new(
        NotifierOperation::Delete,
        raw2,
        Some(raw1),
        EventSet::OUT,
        Vec::new(),
    );
    mainloop.update_events(vec![event2_remove]).unwrap();

    // Removing event2 re-activates event1 and moves event2 out of the event table
    // (into the garbage collector).
    assert_eq!(mainloop.check_existence(raw1), Some(true));
    assert_eq!(mainloop.check_existence(raw2), None);
}
#[test]
fn event_handler_test() {
    let mut mainloop = EventLoopContext::new();
    let fd1 = EventFd::new(EFD_NONBLOCK).unwrap();
    let fd1_related = EventFd::new(EFD_NONBLOCK).unwrap();
    let fd1_related_update = EventFd::new(EFD_NONBLOCK).unwrap();

    let handler1 = generate_handler(fd1_related.as_raw_fd());
    let handler1_update = generate_handler(fd1_related_update.as_raw_fd());

    let notifiers = vec![
        EventNotifier::new(
            NotifierOperation::AddShared,
            fd1.as_raw_fd(),
            None,
            EventSet::OUT,
            vec![Arc::new(Mutex::new(handler1))],
        ),
        EventNotifier::new(
            NotifierOperation::AddShared,
            fd1.as_raw_fd(),
            None,
            EventSet::OUT,
            vec![Arc::new(Mutex::new(handler1_update))],
        ),
    ];
    mainloop.update_events(notifiers).unwrap();
    mainloop.run().unwrap();

    // The first notifier registers fd1 with handler1; the second one, for the same
    // fd, appends handler1_update. Both handlers run, so both related fds end up
    // registered in the loop.
    assert_eq!(
        mainloop.check_existence(fd1_related.as_raw_fd()),
        Some(true)
    );
    assert_eq!(
        mainloop.check_existence(fd1_related_update.as_raw_fd()),
        Some(true)
    );
}
#[test]
fn error_operation_test() {
    let mut mainloop = EventLoopContext::new();
    let fd1 = EventFd::new(EFD_NONBLOCK).unwrap();
    let leisure_fd = EventFd::new(EFD_NONBLOCK).unwrap();
    let raw1 = fd1.as_raw_fd();
    let leisure_raw = leisure_fd.as_raw_fd();

    // Deleting an event that was never registered must fail.
    let delete_unknown = EventNotifier::new(
        NotifierOperation::Delete,
        raw1,
        None,
        EventSet::OUT,
        Vec::new(),
    );
    assert!(mainloop.update_events(vec![delete_unknown]).is_err());

    // Adding an event whose parked fd is not registered must fail.
    let add_with_unknown_parked = EventNotifier::new(
        NotifierOperation::AddShared,
        raw1,
        Some(leisure_raw),
        EventSet::OUT,
        Vec::new(),
    );
    assert!(mainloop
        .update_events(vec![add_with_unknown_parked])
        .is_err());

    // Deleting an event whose parked fd is not registered must fail.
    let delete_with_unknown_parked = EventNotifier::new(
        NotifierOperation::Delete,
        raw1,
        Some(leisure_raw),
        EventSet::OUT,
        Vec::new(),
    );
    assert!(mainloop
        .update_events(vec![delete_with_unknown_parked])
        .is_err());
}
#[test]
fn error_parked_operation_test() {
    let mut mainloop = EventLoopContext::new();
    let fd1 = EventFd::new(EFD_NONBLOCK).unwrap();
    let fd2 = EventFd::new(EFD_NONBLOCK).unwrap();
    let (raw1, raw2) = (fd1.as_raw_fd(), fd2.as_raw_fd());

    let add_event1 = EventNotifier::new(
        NotifierOperation::AddShared,
        raw1,
        None,
        EventSet::OUT,
        Vec::new(),
    );
    mainloop.update_events(vec![add_event1]).unwrap();

    // Registering event2 parks event1.
    let add_event2 = EventNotifier::new(
        NotifierOperation::AddShared,
        raw2,
        Some(raw1),
        EventSet::OUT,
        Vec::new(),
    );
    mainloop.update_events(vec![add_event2]).unwrap();

    // Deleting an event while it is parked is allowed.
    let delete_event1 = EventNotifier::new(
        NotifierOperation::Delete,
        raw1,
        None,
        EventSet::OUT,
        Vec::new(),
    );
    assert!(mainloop.update_events(vec![delete_event1]).is_ok());
}
#[test]
fn fd_released_test() {
    let mut mainloop = EventLoopContext::new();
    let closed_fd = mainloop.create_event();

    // `create_event` has already closed the fd, so removing it from epoll fails;
    // the loop is written to ignore that error and the delete must still succeed.
    let delete_closed = EventNotifier::new(
        NotifierOperation::Delete,
        closed_fd,
        None,
        EventSet::OUT,
        Vec::new(),
    );
    assert!(mainloop.update_events(vec![delete_closed]).is_ok());
}
}

Комментарий ( 0 )

Вы можете оставить комментарий после Вход в систему

1
https://gitlife.ru/oschina-mirror/openeuler-stratovirt.git
git@gitlife.ru:oschina-mirror/openeuler-stratovirt.git
oschina-mirror
openeuler-stratovirt
openeuler-stratovirt
v2.2.0-rc1