# coding: utf8 """yiwa后台-语音、浏览器、数据库等处理""" import time from importlib import import_module from fuzzywuzzy import fuzz from asr import play_audio_file from asr.awake import read_keywords from asr.stt import listening, wakeup from nlp import comparison from util.string import StringUtil from yiwa.browser import create as create_browser from yiwa.db import DataConveyor from yiwa.log import Log from yiwa.memory import SystemCache from yiwa.rpi import dpms_on from yiwa.settings import HOST, PORT, TIME, CappedFailAmount logger = Log().logger ROOT = f"http://{HOST}:{PORT}" if __name__ == "__main__": WAKEUP = False FAILURE = 0 # 指令匹配失败次数 data_conveyor = DataConveyor() data_conveyor._init() keywords = read_keywords() browser = create_browser() browser.get(ROOT) def _todo(browser, action: str, param=""): """ 做action对应的动作 :param browser: 浏览器对象 :param action: 动作路径, 例如: apps.self_discipline.action.encourage :param param: 动作命令参数名, 例如: '放学写作业' :return: 无 """ # 解析action路径 if not action.startswith("apps."): action = f"apps.{action}" mothed_path = action.split(".") package_path = ".".join(mothed_path[:-1]) # 包 mothed_name = mothed_path[-1] # 方法 # 执行action if package_path and mothed_name: try: package = import_module(package_path) # 动态导入包 todo = package.__getattribute__(mothed_name) # 获取包中的方法 if param: todo(browser, param) else: todo(browser) except Exception as e: logger.error(f"动态执行页面动作失败:{e}") def _command_filter(command): """过滤指令,缩小遍历范围""" # 相同指令 same_commands = data_conveyor.filter_command(command) return same_commands or [] def _exec(): """执行指令""" voice2text = listening() SystemCache().say(voice2text) # 去除标点符号(也可以采用jieba分词中的去除停用词实现) voice2text = StringUtil.remove_punctuation(voice2text) logger.info(f"发出指令>>> {voice2text}") data_conveyor.stt(voice2text) if voice2text is None: data_conveyor.stt("我没听到声音") return False if StringUtil.same_word(voice2text): data_conveyor.stt("声音太小啦") return False def __matched(command, action, appid): """已匹配到指令""" play_audio_file() logger.info(f"指令命中: {command}") data_conveyor.access(command) if action.startswith("/"): # 更新当前所在一级目录 data_conveyor.update_cur_appid(appid) # 页面访问 browser.get(ROOT + action) else: # 页面动作,部分动作带有动态参数 if ":" in action: # 带参数 _todo(browser, action.split(":")[0], command) else: # 不带参数 _todo(browser, action) data_conveyor.hot(action) # +指令热度 # sql关键词筛选指令(限定app) filtered_commands = _command_filter(voice2text) if len(filtered_commands) >= 1: commands, action, appid = filtered_commands[0] __matched(commands, action, appid) return True # 如果过滤不到指令,则采用text2vec搜索,限定当前app指令或yiwa全局指令 cur_appid = data_conveyor.cur_appid() sim_cmd = comparison.search(cur_appid, voice2text, only_one=True) if not sim_cmd: sim_cmd = comparison.search("yiwa", voice2text, only_one=True) if sim_cmd and len(sim_cmd) == 3 and isinstance(sim_cmd[0], str): command, action, appid = sim_cmd # 使用fuzzywuzzy库中的token_sort_ratio函数计算相似度,基本无关则跳过 similarity_ratio = fuzz.token_sort_ratio(voice2text, command) if similarity_ratio > 5: __matched(command, action, appid) return True logger.info("~_~ 指令不匹配") data_conveyor.info("~_~ 指令不匹配") return False while True: try: if WAKEUP: FAILURE = 0 if _exec() else (FAILURE + 1) else: word, up = wakeup(keywords) data_conveyor.stt(word) if up: WAKEUP = True logger.info("^_^ 唤醒成功") dpms_on() data_conveyor.wakeup() FAILURE = 0 if _exec() else (FAILURE + 1) else: data_conveyor.stt("") # 达到匹配上限次数则睡眠 if FAILURE >= CappedFailAmount: logger.info("睡眠待命") data_conveyor.sleep() WAKEUP = False FAILURE = 0 except Exception as e: logger.error(f"接收语音错误,{e}") data_conveyor.error() time.sleep(TIME) browser.close()