// Copyright (c) 2020 Huawei Technologies Co.,Ltd. All rights reserved.
//
// StratoVirt is licensed under Mulan PSL v2.
// You can use this software according to the terms and conditions of the Mulan
// PSL v2.
// You may obtain a copy of Mulan PSL v2 at:
//         http://license.coscl.org.cn/MulanPSL2
// THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY
// KIND, EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO
// NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
// See the Mulan PSL v2 for more details.

/// We use Leaky Bucket Algorithm to limit iops of block device and qmp.
use std::os::unix::io::{AsRawFd, RawFd};
use std::sync::Arc;
use std::time::{Duration, Instant};

use anyhow::Result;
use log::error;
use vmm_sys_util::eventfd::EventFd;

use crate::clock::get_current_time;
use crate::loop_context::EventLoopContext;
use crate::time::NANOSECONDS_PER_SECOND;

/// Used to improve the accuracy of bucket level.
const ACCURACY_SCALE: u64 = 1000;

/// Structure used to describe a Leaky Bucket.
pub struct LeakBucket {
    /// Indicate the capacity of bucket, which is config by user.
    capacity: u64,
    /// Current water level.
    level: u64,
    /// Internal used to calculate the delay of timer.
    prev_time: Instant,
    /// Indicate whether the timer started.
    timer_started: bool,
    /// When bucket is ready for allowing more IO operation, the internal callback will write this
    /// FD. This FD should be listened by IO thread.
    timer_wakeup: Arc<EventFd>,
}

impl LeakBucket {
    /// Construct function
    ///
    /// # Arguments
    ///
    /// * `units_ps` - units per second.
    pub fn new(units_ps: u64) -> Result<Self> {
        Ok(LeakBucket {
            capacity: units_ps * ACCURACY_SCALE,
            level: 0,
            prev_time: get_current_time(),
            timer_started: false,
            timer_wakeup: Arc::new(EventFd::new(libc::EFD_NONBLOCK)?),
        })
    }

    /// Return true if the bucket is full, and caller must return directly instead of launching IO.
    /// Otherwise, caller should not be affected.
    ///
    /// # Arguments
    ///
    /// * `loop_context` - used for delay function call.
    pub fn throttled(&mut self, loop_context: &mut EventLoopContext, need_units: u64) -> bool {
        // capacity value is zero, indicating that there is no need to limit
        if self.capacity == 0 {
            return false;
        }
        if self.timer_started {
            return true;
        }

        // update the water level
        let now = get_current_time();
        let nanos = (now - self.prev_time).as_nanos();
        if nanos > (self.level * NANOSECONDS_PER_SECOND / self.capacity) as u128 {
            self.level = 0;
        } else {
            self.level -= nanos as u64 * self.capacity / NANOSECONDS_PER_SECOND;
        }

        self.prev_time = now;

        // need to be throttled
        if self.level > self.capacity {
            let wakeup_clone = self.timer_wakeup.clone();
            let func = Box::new(move || {
                wakeup_clone
                    .write(1)
                    .unwrap_or_else(|e| error!("LeakBucket send event to device failed {:?}", e));
            });

            loop_context.timer_add(
                func,
                Duration::from_nanos(
                    (self.level - self.capacity) * NANOSECONDS_PER_SECOND / self.capacity,
                ),
            );

            self.timer_started = true;

            return true;
        }

        self.level += need_units * ACCURACY_SCALE;

        false
    }

    /// Clear the timer state.
    pub fn clear_timer(&mut self) {
        self.timer_started = false;
    }

    /// Get raw fd of wakeup event.
    pub fn as_raw_fd(&self) -> RawFd {
        self.timer_wakeup.as_raw_fd()
    }
}