// Copyright (c) 2020 Huawei Technologies Co.,Ltd. All rights reserved.
//
// StratoVirt is licensed under Mulan PSL v2.
// You can use this software according to the terms and conditions of the Mulan
// PSL v2.
// You may obtain a copy of Mulan PSL v2 at:
//         http://license.coscl.org.cn/MulanPSL2
// THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY
// KIND, EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO
// NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
// See the Mulan PSL v2 for more details.

use std::fs::{read_link, File, OpenOptions};
use std::io::{Stdin, Stdout};
use std::os::unix::io::{AsRawFd, FromRawFd, RawFd};
use std::path::PathBuf;
use std::rc::Rc;
use std::sync::{Arc, Mutex};
use std::time::Duration;

use anyhow::{bail, Context, Result};
use log::{error, info, warn};
use nix::fcntl::{fcntl, FcntlArg, OFlag};
use nix::pty::openpty;
use nix::sys::termios::{cfmakeraw, tcgetattr, tcsetattr, SetArg, Termios};
use vmm_sys_util::epoll::EventSet;

use machine_manager::event_loop::EventLoop;
use machine_manager::machine::{PathInfo, PTY_PATH};
use machine_manager::{
    config::{ChardevConfig, ChardevType},
    temp_cleaner::TempCleaner,
};
use util::file::clear_file;
use util::loop_context::{
    gen_delete_notifiers, EventNotifier, EventNotifierHelper, NotifierCallback, NotifierOperation,
};
use util::set_termi_raw_mode;
use util::socket::{SocketListener, SocketStream};
use util::unix::limit_permission;

/// Provide the trait that helps handle the input data.
///
/// The chardev input handlers in this file call it to hand over the bytes they
/// read from the backend.
pub trait InputReceiver: Send {
    /// Handle the input data and trigger interrupt if necessary.
    fn receive(&mut self, buffer: &[u8]);
    /// Return the remaining free space of the receiver buffer, in bytes.
    /// Returns 0 if the receiver is not ready or there is no space left in its FIFO.
    fn remain_size(&mut self) -> usize;
    /// Tell the receiver that RX has been paused; the receiver
    /// must unpause it once it becomes ready again.
    fn set_paused(&mut self);
}

/// Provide the trait that notifies device the socket is opened or closed.
pub trait ChardevNotifyDevice: Send {
    /// Called with `ChardevStatus::Open` after a socket connection has been accepted
    /// and with `ChardevStatus::Close` when that connection is closed.
    fn chardev_notify(&mut self, status: ChardevStatus);
}

/// Connection state reported to a [`ChardevNotifyDevice`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ChardevStatus {
    /// The socket connection has been closed.
    Close,
    /// A socket connection has been opened.
    Open,
}

/// Character device structure.
///
/// Holds the backend handles (terminal, pty, file or socket) together with the
/// receiver and device that get notified about input data and connection changes.
pub struct Chardev {
    /// Id of chardev.
    id: String,
    /// Type of backend device.
    backend: ChardevType,
    /// Socket listener for chardev of socket type.
    /// Stays `None` for the other backend types.
    listener: Option<SocketListener>,
    /// Chardev input.
    /// Set by `realize` for stdio and pty backends, and on every accepted
    /// connection for socket backends; reset to `None` when the connection closes.
    input: Option<Arc<Mutex<dyn CommunicatInInterface>>>,
    /// Chardev output.
    /// Set by `realize` for stdio, pty and file backends, and on every accepted
    /// connection for socket backends; reset to `None` when the connection closes.
    pub output: Option<Arc<Mutex<dyn CommunicatOutInterface>>>,
    /// Fd of socket stream.
    /// `None` while no connection is established.
    stream_fd: Option<i32>,
    /// Input receiver.
    receiver: Option<Arc<Mutex<dyn InputReceiver>>>,
    /// Used to notify device the socket is opened or closed.
    dev: Option<Arc<Mutex<dyn ChardevNotifyDevice>>>,
    /// Whether event-handling of device is initialized
    /// and we wait for port to become available
    wait_port: bool,
    /// Scheduled DPC to unpause input stream.
    /// Unpause must be done inside event-loop
    /// Holds the timer id returned by the main loop while a timer is recorded.
    unpause_timer: Option<u64>,
}

impl Chardev {
    /// Create a chardev from its configuration.
    ///
    /// Only the id and the backend type are recorded here; no terminal, file or
    /// socket is opened until [`Chardev::realize`] is called.
    pub fn new(chardev_cfg: ChardevConfig) -> Self {
        Chardev {
            id: chardev_cfg.id,
            backend: chardev_cfg.backend,
            listener: None,
            input: None,
            output: None,
            stream_fd: None,
            receiver: None,
            dev: None,
            wait_port: false,
            unpause_timer: None,
        }
    }

    /// Open the resources required by the configured backend.
    ///
    /// * `Stdio`: switches the terminal to raw mode and uses stdin/stdout.
    /// * `Pty`: opens a raw-mode pty, records its path in `PTY_PATH` and uses the
    ///   master side for both input and output.
    /// * `UnixSocket` / `TcpSocket`: binds the listening socket; input and output
    ///   are only set once a connection is accepted.
    /// * `File`: opens (creating if needed) the file and uses it as output only.
    ///
    /// # Errors
    ///
    /// Returns an error if the backend cannot be set up, or if a socket backend
    /// is configured without both `server` and `nowait`.
    pub fn realize(&mut self) -> Result<()> {
        match &self.backend {
            ChardevType::Stdio => {
                set_termi_raw_mode().with_context(|| "Failed to set terminal to raw mode")?;
                self.input = Some(Arc::new(Mutex::new(std::io::stdin())));
                self.output = Some(Arc::new(Mutex::new(std::io::stdout())));
            }
            ChardevType::Pty => {
                let (master, path) =
                    set_pty_raw_mode().with_context(|| "Failed to set pty to raw mode")?;
                info!("Pty path is: {:?}", path);
                let path_info = PathInfo {
                    path: format!("pty:{:?}", &path),
                    label: self.id.clone(),
                };
                PTY_PATH.lock().unwrap().push(path_info);
                // SAFETY: master was created in the function of set_pty_raw_mode,
                // the value can be guaranteed to be legal.
                let master_arc = Arc::new(Mutex::new(unsafe { File::from_raw_fd(master) }));
                // The same file object serves both directions.
                self.input = Some(master_arc.clone());
                self.output = Some(master_arc);
            }
            ChardevType::UnixSocket {
                path,
                server,
                nowait,
            } => {
                if !*server || !*nowait {
                    bail!(
                        "Argument \'server\' and \'nowait\' are both required for chardev \'{}\'",
                        &self.id
                    );
                }

                // Remove a stale socket file before binding.
                clear_file(path.clone())?;
                let listener = SocketListener::bind_by_uds(path).with_context(|| {
                    format!(
                        "Failed to bind socket for chardev \'{}\', path: {}",
                        &self.id, path
                    )
                })?;
                self.listener = Some(listener);

                // add file to temporary pool, so it could be cleaned when vm exit.
                TempCleaner::add_path(path.clone());
                limit_permission(path).with_context(|| {
                    format!(
                        "Failed to change file permission for chardev \'{}\', path: {}",
                        &self.id, path
                    )
                })?;
            }
            ChardevType::TcpSocket {
                host,
                port,
                server,
                nowait,
            } => {
                if !*server || !*nowait {
                    bail!(
                        "Argument \'server\' and \'nowait\' are both required for chardev \'{}\'",
                        &self.id
                    );
                }

                let listener = SocketListener::bind_by_tcp(host, *port).with_context(|| {
                    format!(
                        "Failed to bind socket for chardev \'{}\', address: {}:{}",
                        &self.id, host, port
                    )
                })?;
                self.listener = Some(listener);
            }
            ChardevType::File(path) => {
                // Attach the chardev id and path to the error, like the other backends do.
                let file = OpenOptions::new()
                    .read(true)
                    .write(true)
                    .create(true)
                    .open(path)
                    .with_context(|| {
                        format!(
                            "Failed to open file for chardev \'{}\', path: {:?}",
                            &self.id, path
                        )
                    })?;
                self.output = Some(Arc::new(Mutex::new(file)));
            }
        };
        Ok(())
    }

    /// Attach the receiver that consumes input data.
    ///
    /// If input already arrived while no receiver was attached (`wait_port` is
    /// set), reading is resumed through [`Chardev::unpause_rx`].
    pub fn set_receiver<T: 'static + InputReceiver>(&mut self, dev: &Arc<Mutex<T>>) {
        self.receiver = Some(dev.clone());
        if self.wait_port {
            warn!("Serial port for chardev \'{}\' appeared.", &self.id);
            self.wait_port = false;
            self.unpause_rx();
        }
    }

    /// Remember that input is pending without a receiver and build a notifier
    /// update that keeps only hang-up events enabled on `input_fd`.
    fn wait_for_port(&mut self, input_fd: RawFd) -> EventNotifier {
        // set_receiver() will unpause rx
        warn!(
            "Serial port for chardev \'{}\' is not ready yet, waiting for port.",
            &self.id
        );

        self.wait_port = true;

        EventNotifier::new(
            NotifierOperation::Modify,
            input_fd,
            None,
            EventSet::HANG_UP,
            vec![],
        )
    }

    /// Set the device that is notified when a socket connection opens or closes.
    pub fn set_device(&mut self, dev: Arc<Mutex<dyn ChardevNotifyDevice>>) {
        // `dev` is already owned here, so it is stored directly without cloning.
        self.dev = Some(dev);
    }

    /// Re-enable input events for the chardev input fd.
    ///
    /// Receiver calls this if it returned 0 from remain_size()
    /// and now it's ready to accept rx-data again.
    ///
    /// The event update itself is done from a zero-delay timer on the main loop.
    /// Nothing happens if the chardev has no input or a timer is already recorded.
    pub fn unpause_rx(&mut self) {
        let input = match self.input.as_ref() {
            Some(input) => input,
            None => {
                error!("unpause called for non-initialized device \'{}\'", &self.id);
                return;
            }
        };
        if self.unpause_timer.is_some() {
            return; // already set
        }

        let input_fd = input.lock().unwrap().as_raw_fd();

        let unpause_fn = Box::new(move || {
            let res = EventLoop::update_event(
                vec![EventNotifier::new(
                    NotifierOperation::Modify,
                    input_fd,
                    None,
                    EventSet::IN | EventSet::HANG_UP,
                    vec![],
                )],
                None,
            );
            if let Err(e) = res {
                error!("Failed to unpause on fd {input_fd}: {e:?}");
            }
        });
        let main_loop = EventLoop::get_ctx(None).unwrap();
        let timer_id = main_loop.timer_add(unpause_fn, Duration::ZERO);
        self.unpause_timer = Some(timer_id);
    }

    /// Delete the recorded unpause timer, if any, and forget its id.
    fn cancel_unpause_timer(&mut self) {
        if let Some(timer_id) = self.unpause_timer.take() {
            let main_loop = EventLoop::get_ctx(None).unwrap();
            main_loop.timer_del(timer_id);
        }
    }
}

fn set_pty_raw_mode() -> Result<(i32, PathBuf)> {
    let (master, slave) = match openpty(None, None) {
        Ok(res) => (res.master, res.slave),
        Err(e) => bail!("Failed to open pty, error is {:?}", e),
    };

    let proc_path = PathBuf::from(format!("/proc/self/fd/{}", slave));
    let path = read_link(proc_path).with_context(|| "Failed to read slave pty link")?;

    let mut new_termios: Termios = match tcgetattr(slave) {
        Ok(tm) => tm,
        Err(e) => bail!("Failed to get mode of pty, error is {:?}", e),
    };

    cfmakeraw(&mut new_termios);

    if let Err(e) = tcsetattr(slave, SetArg::TCSAFLUSH, &new_termios) {
        bail!("Failed to set pty to raw mode, error is {:?}", e);
    }

    let fcnt_arg = FcntlArg::F_SETFL(OFlag::from_bits(libc::O_NONBLOCK).unwrap());
    if let Err(e) = fcntl(master, fcnt_arg) {
        bail!(
            "Failed to set pty master to nonblocking mode, error is {:?}",
            e
        );
    }

    Ok((master, path))
}

// Notification handling in case of stdio or pty usage.
//
// Builds the notifier that watches the chardev input fd for readable data.
// Returns `None` (after logging an error) when the chardev has no input yet.
fn get_terminal_notifier(chardev: Arc<Mutex<Chardev>>) -> Option<EventNotifier> {
    let chardev_guard = chardev.lock().unwrap();
    let input_fd = match chardev_guard.input.as_ref() {
        Some(input) => input.lock().unwrap().as_raw_fd(),
        None => {
            // `realize` has to run first: without it there is no valid file
            // descriptor to build the event notifier from.
            error!(
                "Failed to initialize input events for chardev \'{}\', chardev not initialized",
                &chardev_guard.id
            );
            return None;
        }
    };

    let handler_chardev = chardev.clone();

    let event_handler: Rc<NotifierCallback> = Rc::new(move |_, _| {
        let mut dev = handler_chardev.lock().unwrap();
        let receiver = match dev.receiver.clone() {
            Some(receiver) => receiver,
            // Nobody can take the data yet: stop polling for input until a receiver shows up.
            None => return Some(vec![dev.wait_for_port(input_fd)]),
        };

        dev.cancel_unpause_timer(); // it will be rescheduled if needed

        let input = dev.input.clone().unwrap();
        // Release the chardev lock before touching the receiver.
        drop(dev);

        let mut rx = receiver.lock().unwrap();
        let capacity = rx.remain_size();
        if capacity == 0 {
            // Receiver is full: pause input and keep only hang-up events enabled.
            rx.set_paused();

            let pause = EventNotifier::new(
                NotifierOperation::Modify,
                input_fd,
                None,
                EventSet::HANG_UP,
                vec![],
            );
            return Some(vec![pause]);
        }

        // Never read more than the receiver can accept.
        let mut buffer = vec![0_u8; capacity];
        match input.lock().unwrap().chr_read_raw(&mut buffer) {
            Ok(bytes_count) => rx.receive(&buffer[..bytes_count]),
            Err(_) => {
                let os_error = std::io::Error::last_os_error();
                let dev = handler_chardev.lock().unwrap();
                error!(
                    "Failed to read input data from chardev \'{}\', {}",
                    &dev.id, &os_error
                );
            }
        }
        None
    });

    let notifier = EventNotifier::new(
        NotifierOperation::AddShared,
        input_fd,
        None,
        EventSet::IN,
        vec![event_handler],
    );
    Some(notifier)
}

// Notification handling in case of listening (server) socket.
//
// Builds the notifier for the listener fd. Its handler accepts a connection, stores
// the stream as chardev input/output, notifies the attached device and registers a
// second notifier that reads data from the accepted stream.
// Returns `None` (after logging an error) when the chardev has no listener yet.
fn get_socket_notifier(chardev: Arc<Mutex<Chardev>>) -> Option<EventNotifier> {
    let locked_chardev = chardev.lock().unwrap();
    let listener = &locked_chardev.listener;
    if listener.is_none() {
        // Method `realize` expected to be called before we get here because to build event
        // notifier we need already valid file descriptors here.
        error!(
            "Failed to setup io-event notifications for chardev \'{}\', device not initialized",
            &locked_chardev.id
        );
        return None;
    }

    let cloned_chardev = chardev.clone();
    let event_handler: Rc<NotifierCallback> = Rc::new(move |_, _| {
        let mut locked_chardev = cloned_chardev.lock().unwrap();

        // A failed accept is an I/O condition, not a broken invariant: report it
        // and keep the event loop running instead of panicking.
        let stream = match locked_chardev.listener.as_ref().unwrap().accept() {
            Ok(stream) => stream,
            Err(e) => {
                error!(
                    "Failed to accept connection for chardev \'{}\', error is {:?}",
                    &locked_chardev.id, e
                );
                return None;
            }
        };
        let connection_info = stream.link_description();
        info!(
            "Chardev \'{}\' event, connection opened: {}",
            &locked_chardev.id, connection_info
        );
        let stream_fd = stream.as_raw_fd();
        let stream_arc = Arc::new(Mutex::new(stream));
        let listener_fd = locked_chardev.listener.as_ref().unwrap().as_raw_fd();
        let notify_dev = locked_chardev.dev.clone();

        // The same stream object serves both directions.
        locked_chardev.stream_fd = Some(stream_fd);
        locked_chardev.input = Some(stream_arc.clone());
        locked_chardev.output = Some(stream_arc.clone());
        // Release the chardev lock before calling into the device.
        drop(locked_chardev);

        if let Some(dev) = notify_dev {
            dev.lock().unwrap().chardev_notify(ChardevStatus::Open);
        }

        // Tears the connection down: clears the chardev state, notifies the device
        // and returns the notifiers that unregister the stream fd.
        let handling_chardev = cloned_chardev.clone();
        let close_connection = Rc::new(move || {
            let mut locked_chardev = handling_chardev.lock().unwrap();
            let notify_dev = locked_chardev.dev.clone();
            locked_chardev.input = None;
            locked_chardev.output = None;
            locked_chardev.stream_fd = None;
            locked_chardev.cancel_unpause_timer();
            info!(
                "Chardev \'{}\' event, connection closed: {}",
                &locked_chardev.id, connection_info
            );
            drop(locked_chardev);

            if let Some(dev) = notify_dev {
                dev.lock().unwrap().chardev_notify(ChardevStatus::Close);
            }

            // Note: stream_arc is used here on purpose so that this callback captures it
            // and extends its lifetime to the lifetime of the notifier callback. That
            // ensures the socket fd stays valid until it is unregistered from epoll.
            let stream_fd = stream_arc.lock().unwrap().as_raw_fd();
            Some(gen_delete_notifiers(&[stream_fd]))
        });

        let handling_chardev = cloned_chardev.clone();
        let input_handler: Rc<NotifierCallback> = Rc::new(move |event, _| {
            let mut locked_chardev = handling_chardev.lock().unwrap();

            // Without a receiver nobody can drain pending data, so a hang-up
            // closes the connection right away.
            let peer_disconnected = event & EventSet::HANG_UP == EventSet::HANG_UP;
            if peer_disconnected && locked_chardev.receiver.is_none() {
                drop(locked_chardev);
                return close_connection();
            }

            let input_ready = event & EventSet::IN == EventSet::IN;
            if input_ready {
                locked_chardev.cancel_unpause_timer();

                if locked_chardev.receiver.is_none() {
                    let wait_port = locked_chardev.wait_for_port(stream_fd);
                    return Some(vec![wait_port]);
                }

                let receiver = locked_chardev.receiver.clone().unwrap();
                let input = locked_chardev.input.clone().unwrap();
                // Release the chardev lock before touching the receiver.
                drop(locked_chardev);

                let mut locked_receiver = receiver.lock().unwrap();
                let buff_size = locked_receiver.remain_size();
                if buff_size == 0 {
                    // Receiver is full: pause input and keep only hang-up events enabled.
                    locked_receiver.set_paused();

                    return Some(vec![EventNotifier::new(
                        NotifierOperation::Modify,
                        stream_fd,
                        None,
                        EventSet::HANG_UP,
                        vec![],
                    )]);
                }

                // Never read more than the receiver can accept.
                let mut buffer = vec![0_u8; buff_size];
                let mut locked_input = input.lock().unwrap();
                if let Ok(bytes_count) = locked_input.chr_read_raw(&mut buffer) {
                    if bytes_count > 0 {
                        locked_receiver.receive(&buffer[..bytes_count]);
                    } else {
                        // A zero-length read is treated as the end of the connection.
                        drop(locked_receiver);
                        drop(locked_input);
                        return close_connection();
                    }
                } else {
                    let os_error = std::io::Error::last_os_error();
                    // WouldBlock is not reported as an error.
                    if os_error.kind() != std::io::ErrorKind::WouldBlock {
                        let locked_chardev = handling_chardev.lock().unwrap();
                        error!(
                            "Failed to read input data from chardev \'{}\', {}",
                            &locked_chardev.id, &os_error
                        );
                    }
                }
            }

            None
        });

        Some(vec![EventNotifier::new(
            NotifierOperation::AddShared,
            stream_fd,
            Some(listener_fd),
            EventSet::IN | EventSet::HANG_UP,
            vec![input_handler],
        )])
    });

    let listener_fd = listener.as_ref().unwrap().as_raw_fd();
    Some(EventNotifier::new(
        NotifierOperation::AddShared,
        listener_fd,
        None,
        EventSet::IN,
        vec![event_handler],
    ))
}

impl EventNotifierHelper for Chardev {
    /// Build the event notifiers for the chardev according to its backend type.
    ///
    /// Terminal-like backends (stdio, pty) and socket backends yield at most one
    /// notifier each; the file backend yields none.
    fn internal_notifiers(chardev: Arc<Mutex<Self>>) -> Vec<EventNotifier> {
        // The lock is released at the end of this statement, before the helper
        // functions lock the chardev again.
        let backend_type = chardev.lock().unwrap().backend.clone();
        let maybe_notifier = match backend_type {
            ChardevType::Stdio | ChardevType::Pty => get_terminal_notifier(chardev),
            ChardevType::UnixSocket { .. } | ChardevType::TcpSocket { .. } => {
                get_socket_notifier(chardev)
            }
            ChardevType::File(_) => None,
        };
        maybe_notifier.into_iter().collect()
    }
}

/// Provide backend trait object the chardev reads its input data from.
pub trait CommunicatInInterface: std::marker::Send + std::os::unix::io::AsRawFd {
    /// Read from the backend's raw fd into `buf` and return the number of bytes read.
    ///
    /// Returns an error describing the failure when the underlying read fails.
    fn chr_read_raw(&mut self, buf: &mut [u8]) -> Result<usize> {
        let fd = self.as_raw_fd();
        match nix::unistd::read(fd, buf) {
            Ok(count) => Ok(count),
            Err(errno) => bail!("Failed to read buffer: {:?}", errno),
        }
    }
}

/// Provide backend trait object processing the output from the guest.
///
/// Marker trait without methods of its own: output is written through the
/// `std::io::Write` supertrait.
pub trait CommunicatOutInterface: std::io::Write + std::marker::Send {}

// Input backends. All of them rely on the default `chr_read_raw` implementation.
impl CommunicatInInterface for SocketStream {}
impl CommunicatInInterface for File {}
impl CommunicatInInterface for Stdin {}

// Output backends. The trait has no methods, so the impls are empty.
impl CommunicatOutInterface for SocketStream {}
impl CommunicatOutInterface for File {}
impl CommunicatOutInterface for Stdout {}