// Copyright (c) 2020 Huawei Technologies Co.,Ltd. All rights reserved. // // StratoVirt is licensed under Mulan PSL v2. // You can use this software according to the terms and conditions of the Mulan // PSL v2. // You may obtain a copy of Mulan PSL v2 at: // http://license.coscl.org.cn/MulanPSL2 // THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY // KIND, EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO // NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. // See the Mulan PSL v2 for more details. /// We use Leaky Bucket Algorithm to limit iops of block device and qmp. use std::os::unix::io::{AsRawFd, RawFd}; use std::time::Instant; use log::error; use vmm_sys_util::eventfd::EventFd; use crate::loop_context::EventLoopContext; /// Used to improve the accuracy of bucket level. const ACCURACY_SCALE: u64 = 1000; /// Nanoseconds per second. const NANOS_PER_SEC: u64 = 1_000_000_000; /// Structure used to describe a Leaky Bucket. pub struct LeakBucket { /// Indicate the capacity of bucket, which is config by user. capacity: u64, /// Current water level. level: u64, /// Internal used to calculate the delay of timer. prev_time: Instant, /// Indicate whether the timer started. timer_started: bool, /// When bucket is ready for allowing more IO operation, the internal callback will write this FD. /// This FD should be listened by IO thread. timer_wakeup: EventFd, } impl LeakBucket { /// Construct function /// /// # Arguments /// /// * `units_ps` - units per second. pub fn new(units_ps: u64) -> Self { LeakBucket { capacity: units_ps * ACCURACY_SCALE, level: 0, prev_time: Instant::now(), timer_started: false, timer_wakeup: EventFd::new(libc::EFD_NONBLOCK).unwrap(), } } /// Return true if the bucket is full, and caller must return directly instead of launching IO. /// Otherwise, caller should not be affected. /// /// # Arguments /// /// * `loop_context` - used for delay function call. pub fn throttled(&mut self, loop_context: &mut EventLoopContext, need_units: u64) -> bool { // capacity value is zero, indicating that there is no need to limit if self.capacity == 0 { return false; } if self.timer_started { return true; } // update the water level let now = Instant::now(); let nanos = (now - self.prev_time).as_nanos(); if nanos > (self.level * NANOS_PER_SEC / self.capacity) as u128 { self.level = 0; } else { self.level -= nanos as u64 * self.capacity / NANOS_PER_SEC; } self.prev_time = now; // need to be throttled if self.level > self.capacity { let wakeup_clone = self.timer_wakeup.try_clone().unwrap(); let func = Box::new(move || { wakeup_clone .write(1) .unwrap_or_else(|e| error!("LeakBucket send event to device failed {}", e)); }); loop_context.delay_call( func, (self.level - self.capacity) * NANOS_PER_SEC / self.capacity, ); self.timer_started = true; return true; } self.level += need_units * ACCURACY_SCALE; false } /// Clear the timer state. pub fn clear_timer(&mut self) { self.timer_started = false; } /// Get raw fd of wakeup event. pub fn as_raw_fd(&self) -> RawFd { self.timer_wakeup.as_raw_fd() } }