diff --git a/README.md b/README.md index c872b40..abe82d1 100644 --- a/README.md +++ b/README.md @@ -82,4 +82,8 @@ 本项目当前并不完善,还有很多需要优化的地方,并且尚未接入Yosuga_embedded。 -欢迎大家为本项目贡献代码。 \ No newline at end of file +欢迎大家为本项目贡献代码。 + +## Star History + +[](https://www.star-history.com/#Misakityan/Yosuga_server&Date) \ No newline at end of file diff --git a/Test/embeddedServerTest.py b/Test/embeddedServerTest.py new file mode 100644 index 0000000..a55f5bf --- /dev/null +++ b/Test/embeddedServerTest.py @@ -0,0 +1,269 @@ + +import socket +import struct +import threading +import json +import os +from typing import Optional + +from openai import OpenAI # 确保 pip install openai + +from src.server_core.yosuga_embedded_server.device_manager import DeviceManager, DeviceInfo +from src.server_core.yosuga_embedded_server.function_registry import FunctionRegistry +from src.server_core.yosuga_embedded_server.ai_prompt import AIPromptBuilder +from src.server_core.yosuga_embedded_server.json_rpc import JSONRPCHandler, RPCError + + +class DeviceConnection: + """管理一个 TCP 设备连接""" + def __init__(self, sock, addr, server): + self.sock = sock + self.addr = addr + self.server = server + self.device_id: Optional[str] = None + + def send_msg(self, data: str): + """发送带长度前缀的消息""" + encoded = data.encode('utf-8') + self.sock.sendall(struct.pack('!I', len(encoded))) + self.sock.sendall(encoded) + + def recv_msg(self) -> Optional[str]: + try: + # 读满 4 字节长度前缀 + raw_len = b'' + while len(raw_len) < 4: + chunk = self.sock.recv(4 - len(raw_len)) + if not chunk: # 对方关闭连接 + return None + raw_len += chunk + msg_len = struct.unpack('!I', raw_len)[0] + + # 读消息体 + data = b'' + while len(data) < msg_len: + chunk = self.sock.recv(msg_len - len(data)) + if not chunk: + return None + data += chunk + return data.decode('utf-8') + except Exception as e: + print(f"recv error: {e}") + return None + + def handle(self): + try: + # 只接收第一条能力广告 + caps_str = self.recv_msg() + if not caps_str: + return + print(f"[{self.addr}] Capabilities received") + device = self.server.register_device(caps_str) + self.device_id = device.device_id + self.server.set_connection(device.device_id, self) + print(f"[{self.addr}] Registered as {device.name} (id={device.device_id})") + + # 此处直接返回,线程结束。后续通信由 call_device 接管。 + except Exception as e: + print(f"Registration error: {e}") + finally: + # 注意:不要关闭 socket!它还要用于后续 call_device + pass + + +class YosugaServer: + """整合设备管理、函数注册、AI 交互的服务端""" + def __init__(self, deepseek_api_key: str): + self.device_manager = DeviceManager() + self.function_registry = FunctionRegistry() + self.ai_prompt = AIPromptBuilder() + self._lock = threading.Lock() + self._call_id_counter = 0 + + # 关联设备变更 + self.device_manager.on_device_change = self._on_device_change + self.function_registry.on_change = self._on_functions_change + + # DeepSeek 客户端 + self.ai_client = OpenAI( + api_key=deepseek_api_key, + base_url="https://api.deepseek.com" + ) + + # 设备连接表:device_id -> DeviceConnection + self._connections: dict[str, DeviceConnection] = {} + + def _on_device_change(self, event: str, device: DeviceInfo): + print(f"Device event: {event} {device.device_id}") + if event == "removed": + self.function_registry.remove_device_functions(device.device_id) + conn = self._connections.pop(device.device_id, None) + if conn and conn.sock: + try: + conn.sock.close() + except Exception: + pass + elif event in ("added", "updated"): + if device.state.value == "registered": + self.function_registry.add_device_functions( + device.device_id, + device.name, + device.functions or [], + ) + + def _on_functions_change(self): + print(f"Functions updated, total: {self.function_registry.function_count()}") + + def register_device(self, caps_json: str) -> DeviceInfo: + data = json.loads(caps_json) + return self.device_manager.register_from_json(data) + + def remove_device(self, device_id: str): + self.device_manager.remove_device(device_id) + + def set_connection(self, device_id: str, conn: DeviceConnection): + with self._lock: + self._connections[device_id] = conn + + def call_device(self, device_id: str, method: str, params: dict, call_id: int) -> dict: + conn = self._connections.get(device_id) + if not conn: + return {"error": {"code": RPCError.DEVICE_NOT_FOUND, + "message": f"Device {device_id} not connected"}} + + req = JSONRPCHandler.build_call(method, params, call_id) + try: + conn.send_msg(req) + resp_str = conn.recv_msg_with_timeout(timeout=5.0) + if resp_str is None: + # 超时或连接断开 + self.remove_device(device_id) # 自动清理 + return {"error": {"code": RPCError.TIMEOUT, "message": "Device timeout or disconnected"}} + + resp = JSONRPCHandler.parse_response(resp_str) + if resp and resp.is_success(): + return {"result": resp.result} + elif resp and resp.error: + return {"error": resp.error.to_dict()} + else: + return {"error": {"code": RPCError.PARSE_ERROR, "message": "Invalid response"}} + except Exception as e: + self.remove_device(device_id) + return {"error": {"code": RPCError.DEVICE_ERROR, "message": str(e)}} + + def process_device_message(self, device_id: str, message: str) -> str: + """处理从设备主动发来的消息(如 RPC 响应)""" + # 在这个架构中,设备不会主动发消息,所有交互都由服务端发起 + # 这里仅作为占位,返回空 + return "" + + def handle_user_request(self, user_input: str) -> str: + """处理用户自然语言请求:调用 AI,执行函数,返回最终答案""" + # 1. 构建系统提示(包含当前所有函数) + func_list = self.function_registry.to_function_list() + system_prompt = self.ai_prompt.build_system_prompt(func_list) + + # 2. 调用 DeepSeek + print("Calling DeepSeek...") + try: + response = self.ai_client.chat.completions.create( + model="deepseek-chat", + messages=[ + {"role": "system", "content": system_prompt}, + {"role": "user", "content": user_input} + ], + temperature=0.0, # 确保输出稳定 + ) + ai_text = response.choices[0].message.content.strip() + print(f"AI response:\n{ai_text}") + except Exception as e: + return f"AI error: {e}" + + # 3. 解析 AI 返回的调用 + calls = self.ai_prompt.parse_ai_response(ai_text) + if not calls: + return "Failed to parse AI response as JSON-RPC calls" + + # 4. 逐个执行调用 + results = [] + for idx, call in enumerate(calls): + method = call.get("method") + params = call.get("params", {}) + call_id = idx + 1 # 简单自增 ID + + # 找到该函数所属设备 + func_info = self.function_registry.get_function(method) + if not func_info: + results.append(f"❌ Unknown function: {method}") + continue + + device_id = func_info.device_id + print(f"Routing '{method}' to device {device_id}") + + # 发送给设备 + dev_resp = self.call_device(device_id, method, params, call_id) + if "error" in dev_resp: + results.append(f"❌ {method}: {dev_resp['error']['message']}") + else: + results.append(f"✅ {method}: {dev_resp.get('result', 'ok')}") + + # 5. 汇总结果返回 + summary = "\n".join(results) + print(f"Task result:\n{summary}") + return summary + + +# ========== 修改 DeviceConnection 的 recv_msg 支持超时 ========== + +def recv_msg_with_timeout(self, timeout: float = 5.0) -> Optional[str]: + """接收带长度前缀的消息,带超时""" + self.sock.settimeout(timeout) + try: + return self.recv_msg() + except socket.timeout: + return None + finally: + self.sock.settimeout(None) + +DeviceConnection.recv_msg_with_timeout = recv_msg_with_timeout # monkey-patch + + +# ========== 主函数 ========== +def main(): + + + server = YosugaServer("") + + # 启动 TCP 监听,接收设备连接 + def accept_connections(): + listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + listener.bind(('0.0.0.0', 9555)) + listener.listen(5) + print("Server listening on port 9555...") + while True: + sock, addr = listener.accept() + print(f"New connection from {addr}") + conn = DeviceConnection(sock, addr, server) + # 在连接注册后,关联 device_id 需要等到能力广告,暂存一下 + # 我们在 handle 中收到能力后再设置 server.set_connection + threading.Thread(target=conn.handle, daemon=True).start() + + threading.Thread(target=accept_connections, daemon=True).start() + + # 简单交互循环 + print("\nEnter your requests (type 'quit' to exit):") + while True: + try: + user_input = input("> ") + except EOFError: + break + if user_input.lower() in ('quit', 'exit'): + break + if not user_input.strip(): + continue + result = server.handle_user_request(user_input) + print("---\n" + result + "\n---") + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/main.py b/main.py index 0cccc0c..ac49928 100644 --- a/main.py +++ b/main.py @@ -41,5 +41,5 @@ if __name__ == "__main__": try: asyncio.run(main()) except KeyboardInterrupt: - print("\nYosuga服务器已停止喵~~~") + print("\nYosuga服务端已停止喵~~~") \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index 9f1eb20..c725da3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -7,10 +7,15 @@ requires-python = ">=3.11" dependencies = [ "aiofiles>=25.1.0", "aiohttp>=3.13.3", + "eventlet>=0.40.4", "fastapi>=0.128.0", "faster-whisper>=1.2.1", + "flask>=3.1.3", + "flask-cors>=6.0.2", + "flask-socketio>=5.6.1", "loguru>=0.7.3", "openai>=2.16.0", + "psutil>=7.2.2", "pyautogui>=0.9.54", "pydantic>=2.12.5", "pydantic-settings>=2.12.0", diff --git a/src/config/config.py b/src/config/config.py index 071e3a7..4552e86 100644 --- a/src/config/config.py +++ b/src/config/config.py @@ -29,7 +29,7 @@ class AIConfig: model_name: str = "qwen/qwen3-4b-2507" timeout: int = 30 temperature: float = 0.4 - max_tokens: int = 8192 + max_tokens: int = 4096 @dataclass @@ -98,12 +98,22 @@ class AppConfig: # 基础字段 self.version = version self.debug = debug - self.ai = ai if ai is not None else AIConfig() - self.tts = tts if tts is not None else TTSConfig() - self.asr = asr if asr is not None else ASRConfig() - self.auto_agent = auto_agent if auto_agent is not None else AutoAgentConfig() - self.llm_core = llm_core if llm_core is not None else LLMConfig() - self.paths = paths if paths is not None else PathsConfig() + + # self.ai = ai if ai is not None else AIConfig() + # self.tts = tts if tts is not None else TTSConfig() + # self.asr = asr if asr is not None else ASRConfig() + # self.auto_agent = auto_agent if auto_agent is not None else AutoAgentConfig() + # self.llm_core = llm_core if llm_core is not None else LLMConfig() + # self.paths = paths if paths is not None else PathsConfig() + + # 如果是字典则转换,否则使用默认值 + self.ai = AIConfig(**ai) if isinstance(ai, dict) else (ai or AIConfig()) + self.tts = TTSConfig(**tts) if isinstance(tts, dict) else (tts or TTSConfig()) + self.asr = ASRConfig(**asr) if isinstance(asr, dict) else (asr or ASRConfig()) + self.auto_agent = AutoAgentConfig(**auto_agent) if isinstance(auto_agent, dict) else ( + auto_agent or AutoAgentConfig()) + self.llm_core = LLMConfig(**llm_core) if isinstance(llm_core, dict) else (llm_core or LLMConfig()) + self.paths = PathsConfig(**paths) if isinstance(paths, dict) else (paths or PathsConfig()) # 内部状态(非 dataclass 字段,不会被序列化) self._config_path = _config_path @@ -395,6 +405,12 @@ class _LazyConfig: def using_dir(self) -> Path: return _ensure_initialized().using_dir +def ensure_config_initialized(): + """ + 强制立即初始化配置(用于多线程环境) + 返回真正的 AppConfig 实例而非代理 + """ + return _ensure_initialized() # 全局配置对象:导入即用,自动初始化 cfg: AppConfig = _LazyConfig() # type: ignore diff --git a/src/modules/asr_module/asr_core/fast_whisper/asr_interface.py b/src/modules/asr_module/asr_core/fast_whisper/asr_interface.py index 6eeab10..6b2855b 100644 --- a/src/modules/asr_module/asr_core/fast_whisper/asr_interface.py +++ b/src/modules/asr_module/asr_core/fast_whisper/asr_interface.py @@ -39,7 +39,7 @@ class ASRInterface: self.sample_rate = 16000 self._initialized = True - logger.info("🎤 ASR接口初始化完成") + logger.info("ASR接口初始化完成") @classmethod def get_instance(cls, config: Optional[ASRConfig] = None) -> 'ASRInterface': @@ -68,7 +68,7 @@ class ASRInterface: import time start_time = time.time() - logger.info(f"🎵 开始识别: {wav_path.name}") + logger.info(f"开始识别: {wav_path.name}") # 执行识别... audio = self._load_audio(wav_path) @@ -78,14 +78,14 @@ class ASRInterface: # 计算耗时 processing_time = time.time() - start_time logger.info( - f"✅ 识别完成: {lang} | {len(text)}字符 | 置信度:{confidence:.2f} | " + f"识别完成: {lang} | {len(text)}字符 | 置信度:{confidence:.2f} | " f"耗时:{processing_time:.3f}s | RTF:{processing_time/(len(audio)/self.sample_rate):.3f}" ) return text, lang, confidence except Exception as e: - logger.error(f"❌ 识别失败 {wav_path}: {e}") + logger.error(f"识别失败 {wav_path}: {e}") raise RuntimeError(f"Transcription failed: {e}") def _load_audio(self, wav_path: Path) -> numpy.ndarray: @@ -180,5 +180,5 @@ class ASRInterface: def shutdown(self): """优雅关闭""" - logger.info("🛑 关闭ASR接口...") + logger.info("关闭ASR接口...") self.model_manager.unload() \ No newline at end of file diff --git a/src/modules/asr_module/asr_core/fast_whisper/model_manager.py b/src/modules/asr_module/asr_core/fast_whisper/model_manager.py index f1bbaab..2a2cc2e 100644 --- a/src/modules/asr_module/asr_core/fast_whisper/model_manager.py +++ b/src/modules/asr_module/asr_core/fast_whisper/model_manager.py @@ -34,8 +34,8 @@ class ModelManager: def _load_model(self): """加载模型""" - logger.info(f"🚀 初始化模型: {self.config.model_name}") - logger.info(f"📦 设备: {self.config.device}, 计算类型: {self.config.compute_type}") + logger.info(f"初始化模型: {self.config.model_name}") + logger.info(f"设备: {self.config.device}, 计算类型: {self.config.compute_type}") try: self._model = WhisperModel( @@ -52,15 +52,15 @@ class ModelManager: "model_size": self.config.model_name.split("-")[-2] } - logger.info("✅ 模型加载成功") + logger.info("模型加载成功") except Exception as e: - logger.error(f"❌ 模型加载失败: {e}") + logger.error(f"模型加载失败: {e}") raise RuntimeError(f"Failed to load ASR model: {e}") def reload(self, new_config: ASRConfig): """热重载模型""" - logger.info("🔄 热重载模型...") + logger.info("热重载模型...") self.unload() self.config = new_config self._load_model() @@ -68,7 +68,7 @@ class ModelManager: def unload(self): """卸载模型释放资源""" if self._model is not None: - logger.info("🗑️ 卸载模型...") + logger.info("卸载模型...") del self._model self._model = None @@ -77,7 +77,7 @@ class ModelManager: gc.collect() - logger.info("✅ 模型已卸载") + logger.info("模型已卸载") def get_device_info(self) -> dict: """获取设备信息""" diff --git a/src/modules/asr_module/asr_core/fast_whisper/utils.py b/src/modules/asr_module/asr_core/fast_whisper/utils.py index 6fe2a58..bdf1e2f 100644 --- a/src/modules/asr_module/asr_core/fast_whisper/utils.py +++ b/src/modules/asr_module/asr_core/fast_whisper/utils.py @@ -42,4 +42,4 @@ class PerformanceProfiler: if len(self.stats) % 10 == 0: avg_rtf = sum(s["rtf"] for s in self.stats[-10:]) / 10 - logger.info(f"📊 最近10次平均RTF: {avg_rtf:.3f}") \ No newline at end of file + logger.info(f"最近10次平均RTF: {avg_rtf:.3f}") \ No newline at end of file diff --git a/src/modules/asr_module/client/asr_client.py b/src/modules/asr_module/client/asr_client.py index 041c83e..5af4bc4 100644 --- a/src/modules/asr_module/client/asr_client.py +++ b/src/modules/asr_module/client/asr_client.py @@ -82,7 +82,7 @@ class ASRClientSync: if not file_path.exists(): raise FileNotFoundError(f"文件不存在: {file_path}") - logger.info(f"📤 上传文件: {file_path.name}") + logger.info(f"上传文件: {file_path.name}") with open(file_path, 'rb') as f: files = {'file': (file_path.name, f, 'audio/wav')} @@ -106,7 +106,7 @@ class ASRClientSync: audio_bytes = f.read() result = client.transcribe_bytes(audio_bytes) """ - logger.info(f"📤 上传字节流 ({len(audio_data)} bytes)") + logger.info(f"上传字节流 ({len(audio_data)} bytes)") files = {'file': (filename, audio_data, 'audio/wav')} result = self._request('POST', '/transcribe', files=files) @@ -171,7 +171,7 @@ class ASRClientAsync: if not file_path.exists(): raise FileNotFoundError(f"文件不存在: {file_path}") - logger.info(f"📤 上传文件: {file_path.name}") + logger.info(f"上传文件: {file_path.name}") async with aiofiles.open(file_path, 'rb') as f: audio_data = await f.read() @@ -180,7 +180,7 @@ class ASRClientAsync: async def transcribe_bytes(self, audio_data: bytes, filename: str = "audio.wav") -> ASRResponse: """异步转录音频字节流""" - logger.info(f"📤 上传字节流 ({len(audio_data)} bytes)") + logger.info(f"上传字节流 ({len(audio_data)} bytes)") await self._ensure_session() # 确保session已创建 form = aiohttp.FormData() # 创建表单数据 form.add_field('file', audio_data, filename=filename, content_type='audio/wav') # 添加文件字段 diff --git a/src/modules/asr_module/start_api.py b/src/modules/asr_module/start_api.py index 2618309..b19eee8 100644 --- a/src/modules/asr_module/start_api.py +++ b/src/modules/asr_module/start_api.py @@ -20,7 +20,7 @@ def first_test() -> None: """首次启动测试""" time.sleep(5) # 给服务器一些启动时间 # 构造一个测试请求以验证初始化模型加载成功 - logger.info("🚀 测试模型是否加载成功...") + logger.info("测试模型是否加载成功...") import requests from pathlib import Path url = "http://localhost:20260/transcribe" @@ -52,7 +52,7 @@ def first_test() -> None: logger.error(f"测试过程中发生错误: {e}") if __name__ == "__main__": - logger.info("🚀 启动 ASR API 服务...") + logger.info("启动 ASR API 服务...") # 在后台线程启动服务器 server_thread = threading.Thread(target=start_server, daemon=True) diff --git a/src/modules/text_ai_module/text_ai_core/general_text_ai_req.py b/src/modules/text_ai_module/text_ai_core/general_text_ai_req.py index 2a0f89a..8642ed6 100644 --- a/src/modules/text_ai_module/text_ai_core/general_text_ai_req.py +++ b/src/modules/text_ai_module/text_ai_core/general_text_ai_req.py @@ -234,7 +234,7 @@ class OpenAIClient(BaseLLMClient): def _normal_chat_completion(self, params): """非流式响应处理""" - logger.info("📡 发送非流式请求...") + logger.info("发送非流式请求...") response = self.client.chat.completions.create(**params) raw_usage = response.usage normalized_usage = normalize_usage( @@ -251,7 +251,7 @@ class OpenAIClient(BaseLLMClient): def _stream_chat_completion(self, params): """流式响应处理""" - logger.info("📡 发送流式请求...") + logger.info("发送流式请求...") response_stream = self.client.chat.completions.create(**params) full_content = "" diff --git a/src/modules/tts_module/tts_core/async_audio_player.py b/src/modules/tts_module/tts_core/async_audio_player.py index fd42b5a..38aa61c 100644 --- a/src/modules/tts_module/tts_core/async_audio_player.py +++ b/src/modules/tts_module/tts_core/async_audio_player.py @@ -49,7 +49,7 @@ class AsyncAudioPlayer: # 读取PCM数据(去掉头部) pcm_data = wav_file.readframes(wav_file.getnframes()) - logger.info(f"📊 解析WAV头: {self.sample_rate}Hz, {self.channels}ch, {self.sampwidth * 8}bit") + logger.info(f"解析WAV头: {self.sample_rate}Hz, {self.channels}ch, {self.sampwidth * 8}bit") # 转换为numpy数组 if self.sampwidth == 2: @@ -68,7 +68,7 @@ class AsyncAudioPlayer: except wave.Error: # 可能是不完整的WAV头,尝试直接播放 - logger.warning("⚠️ WAV头解析失败,尝试直接播放") + logger.warning("WAV头解析失败,尝试直接播放") await self._play_raw(audio_data) return else: @@ -76,7 +76,7 @@ class AsyncAudioPlayer: await self._play_raw(audio_data) except Exception as e: - logger.error(f"❌ 音频块处理失败: {e}") + logger.error(f"音频块处理失败: {e}") async def _play_raw(self, audio_data: bytes): """播放RAW PCM数据""" @@ -90,11 +90,11 @@ class AsyncAudioPlayer: await self.audio_queue.put(audio_array) except Exception as e: - logger.error(f"❌ RAW音频处理失败: {e}") + logger.error(f"RAW音频处理失败: {e}") async def play_worker(self): """后台播放任务""" - logger.info("🎧 音频播放任务启动") + logger.info("音频播放任务启动") while self.is_playing or not self.audio_queue.empty(): try: @@ -103,7 +103,7 @@ class AsyncAudioPlayer: # 延迟初始化音频流(直到获得第一个数据块) if self.stream is None: - logger.info(f"🔊 打开音频输出流: {self.sample_rate}Hz") + logger.info(f"打开音频输出流: {self.sample_rate}Hz") self.stream = sd.OutputStream( samplerate=self.sample_rate, channels=1, @@ -122,10 +122,10 @@ class AsyncAudioPlayer: except asyncio.TimeoutError: continue except Exception as e: - logger.error(f"❌ 播放任务异常: {e}") + logger.error(f"播放任务异常: {e}") break - logger.info("🛑 音频播放任务结束") + logger.info("音频播放任务结束") async def start(self): """启动播放系统""" @@ -154,7 +154,7 @@ class AsyncAudioPlayer: except: break - logger.info("✅ 音频播放已停止") + logger.info("音频播放已停止") async def __aenter__(self): await self.start() diff --git a/src/modules/websocket_base_module/dto/dto_templates/device_command_dto.py b/src/modules/websocket_base_module/dto/dto_templates/device_command_dto.py new file mode 100644 index 0000000..8d290f6 --- /dev/null +++ b/src/modules/websocket_base_module/dto/dto_templates/device_command_dto.py @@ -0,0 +1,19 @@ +""" +设备命令 DTO - 服务端向客户端发送嵌入式设备控制指令 +""" + +from pydantic import Field, BaseModel +from datetime import datetime, timezone + + +class DeviceCommandDataTransferObject(BaseModel): + """设备命令数据传输对象""" + device_id: str = Field(default="", description="目标设备ID") + payload: str = Field(default="", description="JSON-RPC 调用字符串") + + def to_json(self) -> dict: + return { + "type": "device_command", + "timestamp": datetime.now(timezone.utc).timestamp(), + "data": self.model_dump() + } diff --git a/src/modules/websocket_base_module/dto/second_dtos.py b/src/modules/websocket_base_module/dto/second_dtos.py index 822cd85..af6a8dd 100644 --- a/src/modules/websocket_base_module/dto/second_dtos.py +++ b/src/modules/websocket_base_module/dto/second_dtos.py @@ -38,7 +38,9 @@ class JsonDTO(MessageDTO): super().__init__(ws_server) self.receivers : Dict[str, List[ReceiveCallback]] = { 'audio_data' : [], # 音频数据 - 'screenshot_data' : [] # 截图数据 + 'screenshot_data' : [], # 截图数据 + 'device_data' : [], # 嵌入式设备数据(注册/响应/事件) + 'device_command' : [], # 设备控制指令(服务端→客户端) } # 注册json处理callback function ws_server.register_receiver('json', self._handle_json) diff --git a/src/server_core/core.py b/src/server_core/core.py index e54722e..d83ecfd 100644 --- a/src/server_core/core.py +++ b/src/server_core/core.py @@ -25,6 +25,7 @@ import asyncio from typing import Optional, List, Dict, Any from loguru import logger +import json from src.modules.websocket_base_module.dto.third_dtos import ( AudioDataDTO, AudioDataTransferObject, ScreenShotDataDTO, ScreenShotDataTransferObject @@ -43,9 +44,15 @@ from src.server_core.llm_core.llm_core import ( YosugaLLMCore, ModelProvider, LLMCoreAnalysisBase, YosugaAudioResponseData, YosugaUITARSResponseData, - YosugaUITARSRequestData + YosugaUITARSRequestData, YosugaEmbeddedResponseData ) +from src.server_core.yosuga_embedded_server import ( + YosugaServer, ServerConfig +) +from src.server_core.yosuga_embedded_server.device_dto import DeviceDataDTO +from src.server_core.llm_core.llm_core_prompt_manager import YosugaEmbedded + from src.modules.websocket_base_module.dto.dto_templates.auto_agent_data_dto import AutoAgentDataTransferObject from src.config.config import cfg @@ -70,6 +77,66 @@ class YosugaServerCore: llm_core: YosugaLLMCore = None # llm core + embedded_server: YosugaServer # 嵌入式设备管理框架 + device_dto: DeviceDataDTO # 设备数据分发器 + + # @classmethod + # async def get_instance(cls) -> "YosugaServerCore": + # """异步单例工厂""" + # if cls._instance is None: + # async with cls._lock: + # if cls._instance is None: + # logger.info("Initializing YosugaServerCore...") + # # 创建实例 + # instance = cls.__new__(cls) + # + # # 按依赖顺序初始化数据分发器 + # instance.ws_server = await get_ws_server() + # instance.json_dto = await get_json_dto_instance(instance.ws_server) + # instance.audio_dto = AudioDataDTO(instance.json_dto) # 音频分发器 + # instance.audio_dto.register_audio_callback(instance._handle_audio_data) # 注册音频处理函数 + # instance.screenshot_dto = ScreenShotDataDTO(instance.json_dto) # 截图分发器 + # instance.screenshot_dto.register_screenshot_callback(instance._handle_screenshot_data) # 注册截图处理函数 + # + # instance.asr_client = create_asr_client(use_async=True, base_url=cfg.asr.url) + # instance.tts_client = GPTSoVITSClient(host=cfg.tts.host, port=cfg.tts.port, debug=True) + # # 切换GPT_SoVITS模型 + # await instance.tts_client.set_gpt_weights(cfg.tts.gpt_model_name) + # await instance.tts_client.set_sovits_weights(cfg.tts.sovits_model_name) + # + # instance.auto_agent_client = UITarsClient(UITarsClientConfig( + # deployment_type=cfg.auto_agent.deployment_type, + # base_url=cfg.auto_agent.base_url, + # model_name=cfg.auto_agent.model_name, + # temperature=cfg.auto_agent.temperature, + # max_tokens=cfg.auto_agent.max_tokens + # )) + # + # instance.llm_core = YosugaLLMCore( + # model_config=ModelConfig( # TODO 同上 + # provider=ModelProvider.OPENAI, + # model_name=cfg.ai.model_name, + # base_url=cfg.ai.base_url, + # api_key=cfg.ai.api_key, + # temperature=cfg.ai.temperature, + # max_tokens=cfg.ai.max_tokens + # ), + # core_config=LLMCoreConfig( # TODO 同上 + # max_context_tokens=cfg.llm_core.max_context_tokens, + # enable_history=cfg.llm_core.enable_history, + # role_setting=cfg.llm_core.role_character, + # language=cfg.llm_core.language, # 回复使用语言 + # auto_dispatch=True, + # dispatch_async=True # 启用异步分发 + # ) + # ) + # instance.register_llm_core_analysis() # 注册解析器 + # instance.register_llm_core_action() # 注册分发器 + # instance.llm_core.register_overflow_handler(instance._handle_overflow_logger) # 注册上下文溢出处理回调 + # + # cls._instance = instance + # logger.success("YosugaServerCore initialized") + # return cls._instance @classmethod async def get_instance(cls) -> "YosugaServerCore": """异步单例工厂""" @@ -77,57 +144,141 @@ class YosugaServerCore: async with cls._lock: if cls._instance is None: logger.info("Initializing YosugaServerCore...") + + # 强制初始化配置 + from src.config.config import _ensure_initialized + from dataclasses import asdict, is_dataclass + + real_cfg = _ensure_initialized() + + # 辅助函数:递归转换为 dict + def to_dict(obj): + if isinstance(obj, dict): + return obj + if is_dataclass(obj) and not isinstance(obj, type): + return asdict(obj) + return {} + + # 提取各个配置段并转换为 dict(关键修复) + cfg_dict = { + 'ai': to_dict(getattr(real_cfg, 'ai', {})), + 'tts': to_dict(getattr(real_cfg, 'tts', {})), + 'asr': to_dict(getattr(real_cfg, 'asr', {})), + 'auto_agent': to_dict(getattr(real_cfg, 'auto_agent', {})), + 'llm_core': to_dict(getattr(real_cfg, 'llm_core', {})), + } + + logger.debug(f"配置提取完成: ai={type(cfg_dict['ai'])}, tts={type(cfg_dict['tts'])}") + # 创建实例 instance = cls.__new__(cls) # 按依赖顺序初始化数据分发器 instance.ws_server = await get_ws_server() instance.json_dto = await get_json_dto_instance(instance.ws_server) - instance.audio_dto = AudioDataDTO(instance.json_dto) # 音频分发器 - instance.audio_dto.register_audio_callback(instance._handle_audio_data) # 注册音频处理函数 - instance.screenshot_dto = ScreenShotDataDTO(instance.json_dto) # 截图分发器 - instance.screenshot_dto.register_screenshot_callback(instance._handle_screenshot_data) # 注册截图处理函数 + instance.audio_dto = AudioDataDTO(instance.json_dto) + instance.audio_dto.register_audio_callback(instance._handle_audio_data) + instance.screenshot_dto = ScreenShotDataDTO(instance.json_dto) + instance.screenshot_dto.register_screenshot_callback(instance._handle_screenshot_data) - instance.asr_client = create_asr_client(use_async=True, base_url=cfg.asr.url) - instance.tts_client = GPTSoVITSClient(host=cfg.tts.host, port=cfg.tts.port, debug=True) - # 切换GPT_SoVITS模型 - await instance.tts_client.set_gpt_weights(cfg.tts.gpt_model_name) - await instance.tts_client.set_sovits_weights(cfg.tts.sovits_model_name) + # ASR 客户端 + asr_cfg = cfg_dict.get('asr', {}) + instance.asr_client = create_asr_client( + use_async=True, + base_url=asr_cfg.get('url', 'http://localhost:20260/') + ) + # TTS 客户端 + tts_cfg = cfg_dict.get('tts', {}) + instance.tts_client = GPTSoVITSClient( + host=tts_cfg.get('host', 'localhost'), + port=tts_cfg.get('port', 20261), + debug=True + ) + + # 切换 GPT_SoVITS 模型 + # await instance.tts_client.set_gpt_weights( + # tts_cfg.get('gpt_model_name', 'GPT_weights_v2Pro/Yosuga_Airi-e32.ckpt') + # ) + # await instance.tts_client.set_sovits_weights( + # tts_cfg.get('sovits_model_name', 'SoVITS_weights_v2Pro/Yosuga_Airi_e16_s864.pth') + # ) + + # Auto Agent 客户端 + auto_cfg = cfg_dict.get('auto_agent', {}) instance.auto_agent_client = UITarsClient(UITarsClientConfig( - deployment_type=cfg.auto_agent.deployment_type, - base_url=cfg.auto_agent.base_url, - model_name=cfg.auto_agent.model_name, - temperature=cfg.auto_agent.temperature, - max_tokens=cfg.auto_agent.max_tokens + deployment_type=auto_cfg.get('deployment_type', 'lmstudio'), + base_url=auto_cfg.get('base_url', 'http://localhost:1234/v1'), + model_name=auto_cfg.get('model_name', 'ui-tars-1.5-7b@q4_k_m'), + temperature=auto_cfg.get('temperature', 0.1), + max_tokens=auto_cfg.get('max_tokens', 16384) )) + # LLM Core + ai_cfg = cfg_dict.get('ai', {}) + llm_cfg = cfg_dict.get('llm_core', {}) + instance.llm_core = YosugaLLMCore( - model_config=ModelConfig( # TODO 同上 + model_config=ModelConfig( provider=ModelProvider.OPENAI, - model_name=cfg.ai.model_name, - base_url=cfg.ai.base_url, - api_key=cfg.ai.api_key, - temperature=cfg.ai.temperature, - max_tokens=cfg.ai.max_tokens + model_name=ai_cfg.get('model_name', 'qwen/qwen3-4b-2507'), + base_url=ai_cfg.get('base_url', 'http://localhost:1234/v1'), + api_key=ai_cfg.get('api_key'), + temperature=ai_cfg.get('temperature', 0.4), + max_tokens=ai_cfg.get('max_tokens', 8192) ), - core_config=LLMCoreConfig( # TODO 同上 - max_context_tokens=cfg.llm_core.max_context_tokens, - enable_history=cfg.llm_core.enable_history, - role_setting=cfg.llm_core.role_character, - language=cfg.llm_core.language, # 回复使用语言 + core_config=LLMCoreConfig( + max_context_tokens=llm_cfg.get('max_context_tokens', 2048), + enable_history=llm_cfg.get('enable_history', True), + role_setting=llm_cfg.get('role_character', + '你是由Misakiotoha开发的助手稲葉愛理ちゃん,可以和用户一起玩游戏,聊天,做各种事情,性格抽象,没事爱整整活。'), + language=llm_cfg.get('language', '中文'), auto_dispatch=True, - dispatch_async=True # 启用异步分发 + dispatch_async=True ) ) - instance.register_llm_core_analysis() # 注册解析器 - instance.register_llm_core_action() # 注册分发器 - instance.llm_core.register_overflow_handler(instance._handle_overflow_logger) # 注册上下文溢出处理回调 + + # 注册 YosugaEmbedded 提示词模块 + instance.llm_core.register_prompt_module(YosugaEmbedded()) + logger.info("[Core] 嵌入式设备提示词模块已注册") + + # 初始化嵌入式设备管理框架 + instance.embedded_server = YosugaServer( + config=ServerConfig( + device_conflict_strategy="rename", + max_concurrent_calls=10, + device_timeout=30.0, + ) + ) + instance.device_dto = DeviceDataDTO( + instance.json_dto, instance.embedded_server + ) + # 当 YosugaServer 需要发送 RPC 到设备时,通过 WebSocket 发出 device_command + instance.embedded_server.on_device_message = ( + instance._on_device_message + ) + # 当设备能力变更时,更新 LLM 系统提示词中的状态表 + instance.embedded_server.on_capabilities_changed = ( + instance._on_capabilities_changed + ) + logger.success("[Core] 嵌入式设备管理框架已初始化") + + # 注册设备 RPC 响应回调(设备结果回来后喂回 LLM) + instance.device_dto.register_device_callback( + instance._on_device_rpc_response + ) + instance._pending_rpc: Optional[dict] = None + + instance.register_llm_core_analysis() + instance.register_llm_core_action() + instance.llm_core.register_overflow_handler(instance._handle_overflow_logger) cls._instance = instance logger.success("YosugaServerCore initialized") + return cls._instance + def register_llm_core_action(self): """ 注册llm_core的分发器 @@ -137,6 +288,7 @@ class YosugaServerCore: self.llm_core.register_action_handler("audio_text", self._handle_audio_response, is_async=True) self.llm_core.register_action_handler("auto_agent", self._handle_auto_agent, is_async=True) self.llm_core.register_action_handler("call_auto_agent", self._handle_call_auto_agent, is_async=True) + self.llm_core.register_action_handler("embedded_control", self._handle_embedded_control, is_async=True) self.llm_core.set_fallback_handler(self._handle_fallback) def register_llm_core_analysis(self): @@ -148,6 +300,7 @@ class YosugaServerCore: self.llm_core.register_analysis_model(YosugaAudioResponseData) self.llm_core.register_analysis_model(YosugaUITARSResponseData) self.llm_core.register_analysis_model(YosugaUITARSRequestData) + self.llm_core.register_analysis_model(YosugaEmbeddedResponseData) def _handle_overflow_logger(self, history: List[Any], metadata: Dict[str, Any]): """上下文溢出记录,仅打印日志""" @@ -211,14 +364,23 @@ class YosugaServerCore: try: # 使用最快模式流式输出 chunk_count = 0 + # async for chunk in await self.tts_client.tts( + # text=data.response_text, + # ref_audio_path="uploaded_audio/test_voice.wav", # TODO 需要替换成config或者后续设计情感系统 + # text_lang="ja", + # prompt_lang="ja", + # prompt_text="もう!こんなところで何やってるんだよ!", # 参考语音的真实文本 + # streaming_mode=StreamingMode.FASTEST, # 模式3:快速流式 + # media_type="wav" + # ): async for chunk in await self.tts_client.tts( - text=data.response_text, - ref_audio_path="uploaded_audio/test_voice.wav", # TODO 需要替换成config或者后续设计情感系统 - text_lang="ja", - prompt_lang="ja", - prompt_text="もう!こんなところで何やってるんだよ!", # 参考语音的真实文本 - streaming_mode=StreamingMode.FASTEST, # 模式3:快速流式 - media_type="wav" + text=data.response_text, + ref_audio_path="uploaded_audio/kq.wav", # TODO 需要替换成config或者后续设计情感系统 + text_lang="zh", + prompt_lang="zh", + prompt_text="电闪雷鸣虽然有点吓人,但璃月港的防雷防火工事是一流的,不用担心。", # 参考语音的真实文本 + streaming_mode=StreamingMode.FASTEST, # 模式3:快速流式 + media_type="wav" ): chunk_count += 1 # print(f"🎵 收到音频块 #{chunk_count}: {len(chunk.audio_data)} bytes") @@ -245,7 +407,7 @@ class YosugaServerCore: text=data.response_text ) ) - print(f"✅ 流式TTS完成!共{chunk_count}个音频块") + print(f"流式TTS完成!共{chunk_count}个音频块") # 构造音频尾包发送给客户端(虚假的音频数据) await self.audio_dto.send_audio_data( AudioDataTransferObject( @@ -258,7 +420,7 @@ class YosugaServerCore: ) ) except Exception as e: - print(f"❌ 流式错误: {e}") + print(f"流式错误: {e}") return {"status": "success", "executed": data.response_text} return None @@ -284,6 +446,125 @@ class YosugaServerCore: await self.screenshot_dto.send_screenshot_data(ScreenShotDataTransferObject(LLMResponse=data.llm_translation)) return {"status": "success", "executed": data.type} + async def _handle_embedded_control(self, data: YosugaEmbeddedResponseData): + """ + llm_core异步处理器:嵌入式设备控制 + 将LLM输出的 JSON-RPC 调用列表交由 YosugaServer 框架处理并路由到对应设备 + """ + logger.info(f"Handling embedded control: {len(data.calls)} calls") + + results = self.embedded_server.process_ai_response(json.dumps(data.calls)) + logger.info(f"Embedded control results: {results}") + + # 保存 pending RPC 信息,等设备异步响应回来后喂回 LLM + if results and len(results) > 0: + first_call = results[0] + self._pending_rpc = { + "device_id": first_call.get("device_id"), + "method": first_call.get("method"), + "call_id": first_call.get("id"), + "original_response_text": data.response_text or "", + } + + # 如果 LLM 同时返回了需要回复用户的文本,通过 TTS 播报 + if data.response_text: + try: + chunk_count = 0 + # async for chunk in await self.tts_client.tts( + # text=data.response_text, + # ref_audio_path="uploaded_audio/test_voice.wav", + # text_lang="ja", + # prompt_lang="ja", + # prompt_text="もう!こんなところで何やってるんだよ!", + # streaming_mode=StreamingMode.FASTEST, + # media_type="wav" + # ): + async for chunk in await self.tts_client.tts( + text=data.response_text, + ref_audio_path="uploaded_audio/kq.wav", # TODO 需要替换成config或者后续设计情感系统 + text_lang="zh", + prompt_lang="zh", + prompt_text="电闪雷鸣虽然有点吓人,但璃月港的防雷防火工事是一流的,不用担心。", # 参考语音的真实文本 + streaming_mode=StreamingMode.FASTEST, # 模式3:快速流式 + media_type="wav" + ): + chunk_count += 1 + if chunk_count == 1: + await self.audio_dto.send_audio_data( + AudioDataTransferObject( + data=chunk.audio_data, + isStream=True, isStart=True, + sequence=chunk_count, isEnd=False, + text=data.response_text + ) + ) + else: + await self.audio_dto.send_audio_data( + AudioDataTransferObject( + data=chunk.audio_data, + isStream=True, isStart=False, + sequence=chunk_count, isEnd=False, + text=data.response_text + ) + ) + await self.audio_dto.send_audio_data( + AudioDataTransferObject( + data=b"0", + isStream=True, isStart=False, + sequence=chunk_count + 1, isEnd=True, + text=data.response_text + ) + ) + except Exception as e: + logger.error(f"Embedded control TTS error: {e}") + + return {"status": "success", "calls": len(data.calls)} + + def _on_device_rpc_response(self, device_id: str, payload: dict): + """DeviceDataDTO 回调:设备 RPC 响应回来时触发,喂回 LLM""" + if self._pending_rpc and self._pending_rpc.get("device_id") == device_id: + call_id = payload.get("id") + if call_id is None or call_id == self._pending_rpc.get("call_id"): + pending = self._pending_rpc + self._pending_rpc = None + asyncio.create_task(self._continue_with_device_result(device_id, payload, pending)) + + async def _continue_with_device_result(self, device_id: str, payload: dict, pending: dict): + """设备 RPC 结果回来后,喂回 LLM 生成最终回复并 TTS""" + method = pending.get("method", "unknown") + original_text = pending.get("original_response_text", "") + + result_str = json.dumps(payload.get("result", payload), ensure_ascii=False) + followup_input = ( + f"你之前请求设备 {device_id} 执行了 {method} 操作," + f"现在设备返回了结果:{result_str}。\n" + f"你之前的回复是:'{original_text}'\n" + f"请基于设备返回的实际结果,用自然语言重新组织回复,告诉用户结果。" + ) + + try: + llm_result = await self.llm_core.interact(user_input={"text": followup_input}) + logger.info(f"[Core] 设备结果回送 LLM 完成: {llm_result}") + except Exception as e: + logger.error(f"[Core] 设备结果回送 LLM 失败: {e}") + + def _on_device_message(self, device_id: str, rpc_call: str) -> Optional[str]: + """YosugaServer 的设备消息回调:通过 WebSocket 发送 RPC 到客户端""" + logger.info(f"[Core] 发送设备命令到 {device_id}") + asyncio.create_task(self.device_dto.send_device_command(device_id, rpc_call)) + return None + + def _on_capabilities_changed(self, capabilities: dict): + """设备能力变更回调:更新 LLM 系统提示词中的状态表""" + functions_str = json.dumps(capabilities.get("functions", []), ensure_ascii=False, indent=2) + device_str = json.dumps(capabilities.get("devices", {}), ensure_ascii=False, indent=2) + state_table = ( + f"【当前在线设备】\n{device_str}\n\n" + f"【设备可用函数】\n{functions_str}" + ) + self.llm_core.core_config.system_state_table = state_table + logger.info(f"[Core] 系统状态表已更新 | 设备: {capabilities.get('device_count', 0)} 台 | 函数: {capabilities.get('function_count', 0)} 个") + def _handle_fallback(self, data: LLMCoreAnalysisBase): """ llm_core同步处理器:回退处理器 diff --git a/src/server_core/llm_core/llm_core.py b/src/server_core/llm_core/llm_core.py index 9390dab..c49110d 100644 --- a/src/server_core/llm_core/llm_core.py +++ b/src/server_core/llm_core/llm_core.py @@ -17,6 +17,7 @@ from src.modules.text_ai_module.text_ai_core.general_text_ai_req import ( ) from src.server_core.llm_core.llm_core_analysis import ( LLMCoreAnalysisManager, LLMCoreAnalysisBase, YosugaAudioResponseData, YosugaUITARSResponseData, YosugaUITARSRequestData + , YosugaEmbeddedResponseData ) from src.server_core.llm_core.llm_core_dispatcher import LLMCoreActionDispatcher from src.server_core.llm_core.llm_core_prompt_manager import ( diff --git a/src/server_core/llm_core/llm_core_analysis.py b/src/server_core/llm_core/llm_core_analysis.py index f768423..0ef19fb 100644 --- a/src/server_core/llm_core/llm_core_analysis.py +++ b/src/server_core/llm_core/llm_core_analysis.py @@ -249,18 +249,25 @@ class YosugaLive2DResponseData(LLMCoreAnalysisBase): class YosugaEmbeddedResponseData(LLMCoreAnalysisBase): """ - 嵌入式设备场景的LLM输出数据模型 TODO + 嵌入式设备场景的LLM输出数据模型 + LLM输出 JSON-RPC 风格的函数调用,由服务端解析并路由到对应设备 """ type: str = Field(default="embedded_control", description="固定为embedded_control") - device_id: str = Field(..., description="设备ID") - command: str = Field(..., description="控制指令") - params: Optional[Dict[str, Any]] = Field(default=None, description="参数") + calls: list[dict] = Field(default_factory=list, description="JSON-RPC 调用列表,每项含 method/params/id") + response_text: str = Field(default="", description="同时回复给用户的文本(可选)") @classmethod def type_(cls) -> str: return "embedded_control" + def to_dict(self) -> dict: + return { + "type": self.type, + "calls": self.calls, + "response_text": self.response_text + } + # 使用示例 if __name__ == "__main__": from loguru import logger diff --git a/src/server_core/llm_core/llm_core_prompt_manager.py b/src/server_core/llm_core/llm_core_prompt_manager.py index 8adaed8..116efd5 100644 --- a/src/server_core/llm_core/llm_core_prompt_manager.py +++ b/src/server_core/llm_core/llm_core_prompt_manager.py @@ -9,8 +9,10 @@ from pydantic import BaseModel, Field, field_validator from typing import Callable, List, Optional, Coroutine, Any, ClassVar, Dict from src.server_core.llm_core.llm_core_prompts import YOSUGA_SYSTEM_PROMPT_SCH + class LLMCorePromptBase(BaseModel, ABC): """LLM 提示词基类:定义输入输出结构""" + @abstractmethod def type(self) -> str: """返回该提示词类型的唯一标识""" @@ -62,8 +64,10 @@ class LLMCorePromptManager(LLMCorePromptBase): for type_id, son in self._registry.items() ) + class YosugaAudioASRText(LLMCorePromptBase): """音频ASR文本输入场景""" + def type(self) -> str: return "用户语音asr信息" @@ -92,19 +96,50 @@ class YosugaAudioASRText(LLMCorePromptBase): - `action`: 触发的动作指令,如"wave_hand"、"nod"等,"none"表示无动作 ''' + class YosugaEmbedded(LLMCorePromptBase): - """嵌入式设备输入场景""" + """嵌入式设备控制场景""" + def type(self) -> str: - pass + return "嵌入式设备信息" def describe_input(self) -> str: - pass + return ''' + 当嵌入式设备有数据上报或客户端发来设备状态时,你会收到以下格式: + { + "device_event": { + "device_id": "设备ID", + "event": "事件内容" + } + } + 但大多数情况下,你只需参考系统状态表中的设备函数表来决策。 + ''' def describe_output(self) -> str: - pass + return ''' + 当你需要控制嵌入式设备时,按以下JSON格式返回: + { + "type": "固定为embedded_control", + "calls": [ + { + "method": "函数名(来自系统状态表中的设备能力表)", + "params": { "参数名": 参数值 } + } + ], + "response_text": "同时回复给用户的文本说明(可选,可留空)" + } + - `calls`: JSON-RPC 调用列表,每个调用对应一个设备函数 + - `method`: 必须来自系统状态表中列出的可用函数名 + - `params`: 按函数定义的参数传入,可省略无参数的调用 + - `response_text`: 可选,若同时需要回复用户可在此填写 + 注意:仅当用户意图涉及现实世界控制时,才需要返回 embedded_control。 + 如果只是聊天,只需返回 audio_text。 + ''' + class YosugaUITARS(LLMCorePromptBase): """自动化操作构建场景""" + def type(self) -> str: return "自动化操作信息" @@ -140,12 +175,14 @@ class YosugaUITARS(LLMCorePromptBase): } 自动化agent返回的操作信息不一定包括JSON的全部字段,例如某次返回只有key的内容,或者只有content的内容。 针对自动化agent操作输入的返回,若没有相关内容可以留空相关字段,请不要省略掉任何字段名称。 - + 注意:自动化agent的状态可见YosugaSystemState表。 ''' + class YosugaLive2DControl(LLMCorePromptBase): """对Yosuga Live2D控制场景""" + def type(self) -> str: return "Yosuga Live2D控制信息" @@ -168,7 +205,7 @@ if __name__ == "__main__": OutputInfo=manager.describe_output(), RoleSetting="...", Language="ja", - Memory = "", + Memory="", SystemStateTable="" ) print(system_prompt) \ No newline at end of file diff --git a/src/server_core/llm_core/llm_core_token.py b/src/server_core/llm_core/llm_core_token.py index 8ec422c..5a3e7ad 100644 --- a/src/server_core/llm_core/llm_core_token.py +++ b/src/server_core/llm_core/llm_core_token.py @@ -60,7 +60,7 @@ class TokenManager: ) def _get_tokenizer(self, model_name: str) -> tiktoken.Encoding: - """获取 tokenizer(与之前实现相同,省略重复代码)""" + """获取 tokenizer""" model_tokenizer_map = { "qwen": "gpt-3.5-turbo", "llama": "gpt-3.5-turbo", @@ -178,7 +178,7 @@ class TokenManager: ) def count_text_tokens(self, text: str) -> int: - """计算单段文本的 token 数量(与之前相同)""" + """计算单段文本的 token 数量""" if not isinstance(text, str) or not text: return 0 return len(self.tokenizer.encode(text)) @@ -188,7 +188,7 @@ class TokenManager: messages: List[Any], tokens_per_message: int = 3 ) -> int: - """计算消息列表的总 token 数量(与之前相同,优化实现)""" + """计算消息列表的总 token 数量""" if not messages: return 0 @@ -311,7 +311,7 @@ class TokenManager: ) def get_tokenizer_info(self) -> TokenizerInfo: - """获取当前 tokenizer 的详细信息(与之前相同)""" + """获取当前 tokenizer 的详细信息""" if "cl100k_base" in self.tokenizer.name and "gpt-3.5" not in self.model_name: accuracy = "low" elif self.model_name in self.tokenizer.name: @@ -332,7 +332,7 @@ class TokenManager: limit: int, threshold: float = 0.85 ) -> bool: - """判断 token 使用量是否接近限制(与之前相同)""" + """判断 token 使用量是否接近限制""" return current_tokens > limit * threshold def calculate_chunk_size( @@ -340,7 +340,7 @@ class TokenManager: available_tokens: int, safety_margin: float = 0.1 ) -> int: - """计算安全的消息块大小(与之前相同)""" + """计算安全的消息块大小""" return int(available_tokens * (1 - safety_margin)) def clear_api_usage_cache(self): diff --git a/src/server_core/yosuga_embedded_server/__init__.py b/src/server_core/yosuga_embedded_server/__init__.py new file mode 100644 index 0000000..f3ed54d --- /dev/null +++ b/src/server_core/yosuga_embedded_server/__init__.py @@ -0,0 +1,16 @@ +""" +Yosuga 服务端 - 面向AI的嵌入式设备JSON-RPC框架。 + +管理多个嵌入式设备,每个设备暴露AI可调用的函数。 +服务端维护全局函数注册表、生成描述可用能力的AI提示词, +并在AI和设备之间路由JSON-RPC调用。 +""" + +from .device_manager import DeviceManager, DeviceInfo, DeviceState +from .function_registry import FunctionRegistry, FunctionInfo, ParamInfo, FuncType +from .ai_prompt import AIPromptBuilder +from .json_rpc import JSONRPCHandler, RPCRequest, RPCResponse, RPCError +from .server import YosugaServer, ServerConfig +from .device_dto import DeviceDataDTO + +__version__ = "0.1.0" diff --git a/src/server_core/yosuga_embedded_server/ai_prompt.py b/src/server_core/yosuga_embedded_server/ai_prompt.py new file mode 100644 index 0000000..72c9fa2 --- /dev/null +++ b/src/server_core/yosuga_embedded_server/ai_prompt.py @@ -0,0 +1,169 @@ +""" +AI提示词构建器 - 构造用于AI函数调用的结构化提示词。 + +提示词引导AI输出有效的JSON-RPC调用, +服务端可将这些调用转发到相应的嵌入式设备。 +""" + +import json +from typing import Optional + + +# -- 约束AI输出的系统提示词模板 -- +SYSTEM_PROMPT_TEMPLATE = """你是一个通过函数调用来控制嵌入式设备的AI助手。 + +你可以访问所有已连接设备上的以下函数: + +{functions_table} + +指令: +1. 你可以同时调用一个或多个函数。 +2. 要调用函数,请以JSON数组格式返回RPC调用: + {calls_syntax_example} +3. 始终为所有必填参数提供有效值。 +4. 对于可选参数,仅在需要时包含它们。 +5. 如果函数返回数据,结果将回传给你。 + +规则: +- 只能调用上面列出的函数。 +- 不要虚构函数名或参数。 +- 如果需要的功能没有对应的函数支持,请说明你需要的功能。 +- 如果进行函数调用,只返回JSON数组(不要额外文字)。 +- 如果需要提问或提供信息,添加"_explanation"字段。 + +函数调用的响应格式: +```json +[ + {{ + "jsonrpc": "2.0", + "method": "function_name", + "params": {{ "param1": value1, "param2": value2 }}, + "id": 1 + }} +] +``` + +设备路由: 每个函数属于特定设备(显示为@device_name)。 +系统会自动将调用路由到正确的设备。 +""" + + +FUNCTION_ENTRY_TEMPLATE = """### {name} (类型: {type}) @ {device_name} +描述: {description} +参数: +{params} +""" + +PARAM_ENTRY_TEMPLATE = " - {name} ({type}{optional}): {description}" + + +CALLS_SYNTAX_EXAMPLE = """[ + { + "jsonrpc": "2.0", + "method": "function_name", + "params": { + "param1": "value1", + "param2": 42 + }, + "id": 1 + } +]""" + + +class AIPromptBuilder: + """从函数注册表内容构建AI提示词。 + + 提示词设计目标: + - 清晰列出所有可用函数及其描述 + - 显示每个函数由哪个设备提供 + - 约束AI输出有效的JSON-RPC + - 支持多调用响应 + """ + + def __init__(self): + self._system_prompt = SYSTEM_PROMPT_TEMPLATE + + def set_system_prompt(self, prompt: str): + self._system_prompt = prompt + + def build_system_prompt(self, functions: list[dict]) -> str: + """用当前函数表构建完整的系统提示词。 + + Args: + functions: 来自FunctionRegistry的函数信息字典列表 + + Returns: + 准备发送给AI的完整系统提示词字符串。 + """ + if not functions: + return "当前没有已连接的嵌入式设备,无可用函数。" + + table = self._format_functions_table(functions) + return self._system_prompt.format( + functions_table=table, + calls_syntax_example=CALLS_SYNTAX_EXAMPLE, + ) + + def _format_functions_table(self, functions: list[dict]) -> str: + """将函数列表格式化为可读的函数表。""" + entries = [] + for func in functions: + params_str = self._format_params(func.get("params", [])) + entry = FUNCTION_ENTRY_TEMPLATE.format( + name=func["name"], + type=func.get("type", "ctrl_noret"), + device_name=func.get("device_name", "unknown"), + description=func.get("description", "No description"), + params=params_str if params_str else " (none)", + ) + entries.append(entry) + return "\n".join(entries) + + def _format_params(self, params: list[dict]) -> str: + lines = [] + for p in params: + optional_str = ", optional" if p.get("optional") else "" + lines.append(PARAM_ENTRY_TEMPLATE.format( + name=p.get("name", "?"), + type=p.get("type", "unknown"), + optional=optional_str, + description=p.get("description", ""), + )) + return "\n".join(lines) + + def parse_ai_response(self, response_text: str) -> Optional[list[dict]]: + """从AI响应文本中提取JSON-RPC调用数组。 + + AI可能将JSON包裹在markdown代码块中,或直接输出原始JSON。 + 此方法处理两种情况。 + + Args: + response_text: AI返回的原始文本 + + Returns: + RPC请求字典列表,解析失败时返回None + """ + text = response_text.strip() + + # 尝试从markdown代码块中提取 + if "```json" in text: + start = text.find("```json") + 7 + end = text.find("```", start) + if end > start: + text = text[start:end].strip() + elif "```" in text: + start = text.find("```") + 3 + end = text.find("```", start) + if end > start: + text = text[start:end].strip() + + try: + data = json.loads(text) + except json.JSONDecodeError: + return None + + if isinstance(data, dict): + return [data] + if isinstance(data, list): + return data + return None diff --git a/src/server_core/yosuga_embedded_server/device_dto.py b/src/server_core/yosuga_embedded_server/device_dto.py new file mode 100644 index 0000000..d981b81 --- /dev/null +++ b/src/server_core/yosuga_embedded_server/device_dto.py @@ -0,0 +1,117 @@ +""" +设备数据 DTO - 处理客户端发来的嵌入式设备数据 +通过 WebSocket 的 device_data 类型消息与客户端通信 +""" + +import json +import time +from typing import Callable, Optional, Coroutine +from loguru import logger + + +class DeviceDataDTO: + """设备数据分发器,处理客户端发来的设备注册/响应/事件""" + + def __init__(self, json_dto, embedded_server): + """ + Args: + json_dto: JsonDTO 实例,用于注册接收器 + embedded_server: YosugaServer 实例(嵌入式框架) + """ + json_dto.register_receiver("device_data", self._handle_device_data) + logger.info("[DeviceDataDTO] 设备数据接收业务已注册") + self.json_dto = json_dto + self.embedded_server = embedded_server + self._device_callbacks: list[Callable] = [] + self.on_rpc_response: Optional[Callable[[str, dict], Coroutine]] = None + + async def _handle_device_data(self, data: dict): + """处理设备数据的入口 + + 客户端发来的 device_data JSON 格式: + { + "action": "register" | "rpc_response" | "event", + "device_id": "...", + "payload": { ... } # 根据 action 不同而不同 + } + """ + action = data.get("action", "") + logger.info(f"[DeviceDataDTO] 收到设备数据, action={action}") + + if action == "register": + device_id = data.get("device_id", "") + await self._handle_register(data.get("payload", {}), device_id) + elif action == "rpc_response": + await self._handle_rpc_response( + data.get("device_id", ""), + data.get("payload", {}) + ) + elif action == "event": + await self._handle_device_event( + data.get("device_id", ""), + data.get("payload", {}) + ) + else: + logger.warning(f"[DeviceDataDTO] 未知的 action: {action}") + + async def _handle_register(self, payload: dict, device_id: str = ""): + """处理设备注册 + + payload 格式同 YosugaServer 的设备能力描述: + { + "device": { "name": "...", "description": "..." }, + "functions": [ { "name": "...", "type": "...", ... } ] + } + """ + try: + device = self.embedded_server.register_device_from_dict(payload, device_id) + logger.success(f"[DeviceDataDTO] 设备注册成功: {device.name} ({device.device_id})") + except Exception as e: + logger.error(f"[DeviceDataDTO] 设备注册失败: {e}") + + async def _handle_rpc_response(self, device_id: str, payload: dict): + """处理设备返回的 RPC 响应""" + logger.info(f"[DeviceDataDTO] 收到设备 {device_id} 的 RPC 响应: {payload}") + + if self.on_rpc_response: + try: + await self.on_rpc_response(device_id, payload) + except Exception as e: + logger.error(f"[DeviceDataDTO] on_rpc_response 回调错误: {e}") + + for cb in self._device_callbacks: + try: + cb(device_id, payload) + except Exception as e: + logger.error(f"[DeviceDataDTO] 回调执行错误: {e}") + + async def _handle_device_event(self, device_id: str, payload: dict): + """处理设备主动上报的事件""" + logger.info(f"[DeviceDataDTO] 收到设备 {device_id} 的事件: {payload}") + + def register_device_callback(self, callback: Callable) -> None: + """注册设备消息回调""" + self._device_callbacks.append(callback) + + async def send_device_command(self, device_id: str, rpc_call: str) -> None: + """向客户端发送设备控制命令 + + 发送给客户端的 JSON 格式: + { + "type": "device_command", + "data": { + "device_id": "...", + "payload": "{\\"jsonrpc\\": \\"2.0\\", ...}" + } + } + """ + payload = { + "device_id": device_id, + "payload": rpc_call + } + await self.json_dto.send_json({ + "type": "device_command", + "timestamp": time.time(), + "data": payload + }) + logger.info(f"[DeviceDataDTO] 已发送设备命令到 {device_id}") diff --git a/src/server_core/yosuga_embedded_server/device_manager.py b/src/server_core/yosuga_embedded_server/device_manager.py new file mode 100644 index 0000000..c79bc53 --- /dev/null +++ b/src/server_core/yosuga_embedded_server/device_manager.py @@ -0,0 +1,233 @@ +""" +设备管理器 - 管理多个嵌入式设备的连接。 + +每个设备由唯一ID标识(自动生成或客户端提供)。 +管理器处理设备注册、重复检测,并维护设备ID与其功能表的映射。 +""" + +import time +import threading +from enum import Enum +from typing import Optional, Callable, Any + + +class DeviceState(Enum): + """设备连接的当前状态。""" + DISCONNECTED = "disconnected" + CONNECTED = "connected" + REGISTERED = "registered" + + +class DeviceInfo: + """嵌入式设备的信息。 + + 存储设备的能力、身份和连接元数据。 + 除 device_id 外,所有字段均由设备自身提供。 + """ + + def __init__( + self, + device_id: str, + name: str = "", + description: str = "", + firmware_version: str = "", + hardware_version: str = "", + functions: Optional[list] = None, + ): + self.device_id = device_id + self.name = name + self.description = description + self.firmware_version = firmware_version + self.hardware_version = hardware_version + self.functions = functions or [] + self.state = DeviceState.DISCONNECTED + self.last_seen: float = 0.0 + self.connected_at: float = 0.0 + + def to_dict(self) -> dict: + return { + "device_id": self.device_id, + "name": self.name, + "description": self.description, + "firmware_version": self.firmware_version, + "hardware_version": self.hardware_version, + "state": self.state.value, + "last_seen": self.last_seen, + "connected_at": self.connected_at, + "register_time": self.connected_at, + "functions": self.functions, + "function_count": len(self.functions), + } + + def __repr__(self) -> str: + return f"DeviceInfo({self.device_id}, {self.name}, state={self.state.value})" + + +class DeviceManager: + """管理所有连接到服务端的嵌入式设备。 + + 功能: + - 未提供时自动生成设备ID + - 可配置策略处理重复设备名 + - 跟踪连接状态和最后活跃时间戳 + - 提供设备状态变更的回调 + """ + + class ConflictStrategy(Enum): + """处理重复设备名的策略。""" + REJECT = "reject" + RENAME = "rename" + REPLACE = "replace" + + def __init__(self, conflict_strategy: str = "rename"): + self._devices: dict[str, "DeviceInfo"] = {} + self._lock = threading.Lock() + self._next_id = 1 + self._conflict_strategy = DeviceManager.ConflictStrategy(conflict_strategy) + self._on_device_change: Optional[Callable[[str, "DeviceInfo"], Any]] = None + + @property + def on_device_change(self) -> Optional[Callable[[str, "DeviceInfo"], Any]]: + """设备添加、更新或移除时的回调。 + 签名: callback(event_type: str, device: DeviceInfo) + event_type: 'added', 'updated', 'removed' + """ + return self._on_device_change + + @on_device_change.setter + def on_device_change(self, cb: Optional[Callable[[str, "DeviceInfo"], Any]]): + self._on_device_change = cb + + def _notify(self, event: str, device: "DeviceInfo"): + if self._on_device_change: + try: + self._on_device_change(event, device) + except Exception: + pass + + def _generate_id(self) -> str: + """生成唯一的设备ID。""" + while True: + dev_id = f"device_{self._next_id}" + self._next_id += 1 + if dev_id not in self._devices: + return dev_id + + def register_from_json(self, json_data: dict) -> "DeviceInfo": + """从设备的能力描述JSON注册或更新设备。 + + 期望的JSON格式: + { + "_device_id": "...", # 可选的显式设备 ID(来自客户端转发) + "device": { + "name": "...", + "description": "...", + "firmware_version": "...", + "hardware_version": "..." + }, + "functions": [ ... ] + } + + 返回 DeviceInfo 对象。 + 策略为 REJECT 时,发生冲突会抛出 ValueError。 + """ + device_data = json_data.get("device", {}) + name = device_data.get("name", "") + description = device_data.get("description", "") + + with self._lock: + explicit_id = json_data.get("_device_id", "") + device_id = explicit_id if explicit_id else self._resolve_identity(name) + device = DeviceInfo( + device_id=device_id, + name=name, + description=description, + firmware_version=device_data.get("firmware_version", ""), + hardware_version=device_data.get("hardware_version", ""), + functions=json_data.get("functions", []), + ) + device.state = DeviceState.REGISTERED + now = time.time() + device.last_seen = now + device.connected_at = now + + existing = self._find_by_name(name) + is_new = device_id not in self._devices + + if existing and existing.device_id != device_id: + if self._conflict_strategy == DeviceManager.ConflictStrategy.REJECT: + raise ValueError(f"Device name conflict: '{name}' already registered") + elif existing and existing.device_id == device_id: + is_new = False + + self._devices[device_id] = device + is_new_flag = is_new + device_ref = device + + self._notify("added" if is_new_flag else "updated", device_ref) + return device_ref + + def _resolve_identity(self, name: str) -> str: + """将设备名解析为唯一的设备ID。""" + existing = self._find_by_name(name) + if existing: + if self._conflict_strategy == DeviceManager.ConflictStrategy.RENAME: + return self._generate_id() + elif self._conflict_strategy == DeviceManager.ConflictStrategy.REJECT: + raise ValueError(f"Device name conflict: '{name}' already registered") + elif self._conflict_strategy == DeviceManager.ConflictStrategy.REPLACE: + return existing.device_id + return self._generate_id() + + def _find_by_name(self, name: str) -> Optional["DeviceInfo"]: + for dev in self._devices.values(): + if dev.name == name: + return dev + return None + + def get_device(self, device_id: str) -> Optional["DeviceInfo"]: + with self._lock: + return self._devices.get(device_id) + + def get_device_by_name(self, name: str) -> Optional["DeviceInfo"]: + return self._find_by_name(name) + + def remove_device(self, device_id: str) -> bool: + with self._lock: + device = self._devices.pop(device_id, None) + if device: + device.state = DeviceState.DISCONNECTED + self._notify("removed", device) + return True + return False + + def get_all_devices(self) -> list["DeviceInfo"]: + with self._lock: + return list(self._devices.values()) + + def device_count(self) -> int: + with self._lock: + return len(self._devices) + + def touch_device(self, device_id: str): + with self._lock: + device = self._devices.get(device_id) + if device: + device.last_seen = time.time() + + def to_dict(self) -> dict: + return { + "devices": [d.to_dict() for d in self.get_all_devices()], + "count": self.device_count(), + } + + def get_device_for_function(self, function_name: str) -> Optional[str]: + """Find which device provides a specific function name. + Returns device_id or None if not found. + """ + with self._lock: + for dev_id, dev in self._devices.items(): + for func in dev.functions: + if func.get("name") == function_name: + return dev_id + return None diff --git a/src/server_core/yosuga_embedded_server/function_registry.py b/src/server_core/yosuga_embedded_server/function_registry.py new file mode 100644 index 0000000..a6de060 --- /dev/null +++ b/src/server_core/yosuga_embedded_server/function_registry.py @@ -0,0 +1,190 @@ +""" +函数注册表 - 维护所有设备的全局函数表。 + +这是所有已连接设备的所有函数的汇总视图。 +注册表在运行时动态更新:随设备连接/断开而更新。 +""" + +import copy +import json +import threading +from enum import Enum +from typing import Optional, Callable + + +class FuncType(Enum): + CTRL_NORET = "ctrl_noret" + CTRL_RET = "ctrl_ret" + DATA_RET = "data_ret" + NORET_NODATA = "noret_nodata" + + @classmethod + def from_str(cls, s: str) -> "FuncType": + for ft in cls: + if ft.value == s: + return ft + return cls.CTRL_NORET + + +class ParamInfo: + """单个函数参数的描述符""" + + def __init__(self, name: str = "", description: str = "", + param_type: str = "int", optional: bool = False): + self.name = name + self.description = description + self.type = param_type + self.optional = optional + + @classmethod + def from_dict(cls, d: dict) -> "ParamInfo": + return cls( + name=d.get("name", ""), + description=d.get("description", ""), + param_type=d.get("type", "int"), + optional=d.get("optional", False), + ) + + def to_dict(self) -> dict: + return { + "name": self.name, + "description": self.description, + "type": self.type, + "optional": self.optional, + } + + def __repr__(self) -> str: + return f"ParamInfo({self.name}: {self.type})" + + +class FunctionInfo: + """整个系统中单个函数的完整描述符。 + + 包含函数名、描述、参数信息、类型以及提供该函数的设备。 + """ + + def __init__( + self, + name: str = "", + description: str = "", + func_type: FuncType = FuncType.CTRL_NORET, + params: Optional[list] = None, + device_id: str = "", + device_name: str = "", + ): + self.name = name + self.description = description + self.func_type = func_type + self.params = params or [] + self.device_id = device_id + self.device_name = device_name + + @classmethod + def from_device_func(cls, func_dict: dict, device_id: str, device_name: str) -> "FunctionInfo": + raw_params = func_dict.get("params", []) + params = [ParamInfo.from_dict(p) if isinstance(p, dict) else ParamInfo() + for p in raw_params] + return cls( + name=func_dict.get("name", ""), + description=func_dict.get("description", ""), + func_type=FuncType.from_str(func_dict.get("type", "ctrl_noret")), + params=params, + device_id=device_id, + device_name=device_name, + ) + + def to_dict(self) -> dict: + return { + "name": self.name, + "description": self.description, + "type": self.func_type.value, + "params": [p.to_dict() for p in self.params], + "device_id": self.device_id, + "device_name": self.device_name, + } + + def __repr__(self) -> str: + return f"FunctionInfo({self.name} @ {self.device_name})" + + +class FunctionRegistry: + """所有设备上所有函数的全局注册表。 + + 维护: + - functions_by_name: dict[str, FunctionInfo] - 按名称快速查找 + - functions_by_device: dict[str, list[FunctionInfo]] - 按设备查找 + + 线程安全。变更时触发回调。 + """ + + def __init__(self): + self._lock = threading.Lock() + self._functions_by_name: dict[str, FunctionInfo] = {} + self._functions_by_device: dict[str, list[FunctionInfo]] = {} + self._on_change: Optional[Callable] = None + + @property + def on_change(self) -> Optional[Callable]: + return self._on_change + + @on_change.setter + def on_change(self, cb: Optional[Callable]): + self._on_change = cb + + def _notify(self): + if self._on_change: + try: + self._on_change() + except Exception: + pass + + def add_device_functions(self, device_id: str, device_name: str, + func_list: list[dict]): + """添加或更新某个设备的所有函数。""" + with self._lock: + self._remove_device_functions_locked(device_id) + func_infos = [] + for func_dict in func_list: + fi = FunctionInfo.from_device_func(func_dict, device_id, device_name) + func_infos.append(fi) + self._functions_by_name[fi.name] = fi + self._functions_by_device[device_id] = func_infos + self._notify() + + def _remove_device_functions_locked(self, device_id: str): + """移除设备函数,不加锁(调用者必须持有锁)。""" + funcs = self._functions_by_device.pop(device_id, []) + for fi in funcs: + self._functions_by_name.pop(fi.name, None) + + def remove_device_functions(self, device_id: str): + """移除某个设备的所有函数。""" + with self._lock: + self._remove_device_functions_locked(device_id) + self._notify() + + def get_function(self, name: str) -> Optional[FunctionInfo]: + with self._lock: + return self._functions_by_name.get(name) + + def get_all_functions(self) -> list[FunctionInfo]: + with self._lock: + return list(self._functions_by_name.values()) + + def get_device_functions(self, device_id: str) -> list[FunctionInfo]: + with self._lock: + return list(self._functions_by_device.get(device_id, [])) + + def function_count(self) -> int: + with self._lock: + return len(self._functions_by_name) + + def to_function_list(self) -> list[dict]: + return [fi.to_dict() for fi in self.get_all_functions()] + + def to_json(self) -> str: + return json.dumps(self.to_function_list(), indent=2, ensure_ascii=False) + + def find_device_for_function(self, func_name: str) -> Optional[str]: + fi = self.get_function(func_name) + return fi.device_id if fi else None diff --git a/src/server_core/yosuga_embedded_server/json_rpc.py b/src/server_core/yosuga_embedded_server/json_rpc.py new file mode 100644 index 0000000..044b14f --- /dev/null +++ b/src/server_core/yosuga_embedded_server/json_rpc.py @@ -0,0 +1,232 @@ +""" +JSON-RPC 2.0 协议处理器。 + +处理AI、服务端和嵌入式设备之间JSON-RPC消息的 +解析、验证、构建和路由 +""" + +import json +from typing import Optional + + +class RPCError(Exception): + """JSON-RPC错误,包含标准错误码""" + + # 标准 JSON-RPC 错误码 + PARSE_ERROR = -32700 + INVALID_REQUEST = -32600 + METHOD_NOT_FOUND = -32601 + INVALID_PARAMS = -32602 + INTERNAL_ERROR = -32603 + + # 自定义错误码 + DEVICE_NOT_FOUND = -32000 + DEVICE_ERROR = -32001 + TIMEOUT = -32002 + + def __init__(self, code: int, message: str, data: Optional[dict] = None): + self.code = code + self.message = message + self.data = data + super().__init__(f"[{code}] {message}") + + def to_dict(self) -> dict: + err = {"code": self.code, "message": self.message} + if self.data: + err["data"] = self.data + return err + + +class RPCRequest: + """表示一个JSON-RPC 2.0请求""" + + def __init__(self, method: str, params: Optional[dict] = None, + request_id: Optional[int] = None): + self.method = method + self.params = params or {} + self.id = request_id + + def is_notification(self) -> bool: + return self.id is None + + def to_dict(self) -> dict: + req = { + "jsonrpc": "2.0", + "method": self.method, + } + if self.params: + req["params"] = self.params + if self.id is not None: + req["id"] = self.id + return req + + def to_json(self) -> str: + return json.dumps(self.to_dict(), ensure_ascii=False) + + @classmethod + def from_dict(cls, d: dict) -> "RPCRequest": + return cls( + method=d["method"], + params=d.get("params"), + request_id=d.get("id"), + ) + + def __repr__(self) -> str: + return f"RPCRequest(method={self.method}, id={self.id})" + + +class RPCResponse: + """表示一个JSON-RPC 2.0响应""" + + def __init__(self, result: Optional[dict] = None, + error: Optional[RPCError] = None, + request_id: Optional[int] = None): + self.result = result + self.error = error + self.id = request_id + + def is_success(self) -> bool: + return self.error is None + + def to_dict(self) -> dict: + resp = {"jsonrpc": "2.0"} + if self.id is not None: + resp["id"] = self.id + if self.error: + resp["error"] = self.error.to_dict() + else: + resp["result"] = self.result + return resp + + def to_json(self) -> str: + return json.dumps(self.to_dict(), ensure_ascii=False) + + @classmethod + def success(cls, result: Optional[dict], request_id: Optional[int]) -> "RPCResponse": + return cls(result=result, request_id=request_id) + + @classmethod + def error(cls, code: int, message: str, request_id: Optional[int] = None) -> "RPCResponse": + return cls(error=RPCError(code, message), request_id=request_id) + + def __repr__(self) -> str: + if self.error: + return f"RPCResponse(error={self.error.message}, id={self.id})" + return f"RPCResponse(result={self.result}, id={self.id})" + + +class JSONRPCHandler: + """处理JSON-RPC协议解析和响应构建""" + + @staticmethod + def parse_request(json_str: str) -> Optional[RPCRequest]: + """将JSON字符串解析为RPCRequest""" + try: + data = json.loads(json_str) + except json.JSONDecodeError: + return None + + if not isinstance(data, dict): + return None + if data.get("jsonrpc") != "2.0": + return None + if "method" not in data or not isinstance(data["method"], str): + return None + + return RPCRequest.from_dict(data) + + @staticmethod + def parse_request_batch(json_str: str) -> Optional[list[RPCRequest]]: + """将JSON字符串解析为RPCRequest列表(用于批量调用)""" + try: + data = json.loads(json_str) + except json.JSONDecodeError: + return None + + if isinstance(data, dict): + req = JSONRPCHandler.parse_request(json_str) + return [req] if req else None + + if isinstance(data, list): + results = [] + for item in data: + item_str = json.dumps(item) + req = JSONRPCHandler.parse_request(item_str) + if req: + results.append(req) + return results if results else None + + return None + + @staticmethod + def validate_request(request: dict) -> Optional[RPCError]: + """验证原始请求字典。无效时返回RPCError""" + if not isinstance(request, dict): + return RPCError(RPCError.INVALID_REQUEST, "request must be a JSON object") + if request.get("jsonrpc") != "2.0": + return RPCError(RPCError.INVALID_REQUEST, "jsonrpc must be '2.0'") + if "method" not in request: + return RPCError(RPCError.INVALID_REQUEST, "missing method") + if not isinstance(request["method"], str) or not request["method"]: + return RPCError(RPCError.INVALID_REQUEST, "method must be a non-empty string") + params = request.get("params") + if params is not None and not isinstance(params, dict): + return RPCError(RPCError.INVALID_PARAMS, "params must be a JSON object") + return None + + @staticmethod + def build_error_response(code: int, message: str, + request_id: Optional[int] = None) -> str: + """构建JSON-RPC错误响应字符串""" + resp = RPCResponse.error(code, message, request_id) + return resp.to_json() + + @staticmethod + def build_success_response(result: Optional[dict], + request_id: Optional[int] = None) -> str: + """构建JSON-RPC成功响应字符串""" + resp = RPCResponse.success(result, request_id) + return resp.to_json() + + @staticmethod + def build_call(method: str, params: Optional[dict] = None, + call_id: Optional[int] = None) -> str: + """构建JSON-RPC调用字符串(服务端 -> 设备)""" + req = RPCRequest(method, params, call_id) + return req.to_json() + + @staticmethod + def is_response(json_str: str) -> bool: + """检查JSON字符串是否为JSON-RPC响应""" + try: + data = json.loads(json_str) + if isinstance(data, dict): + return "result" in data or "error" in data + return False + except json.JSONDecodeError: + return False + + @staticmethod + def parse_response(json_str: str) -> Optional[RPCResponse]: + """将JSON字符串解析为RPCResponse""" + try: + data = json.loads(json_str) + except json.JSONDecodeError: + return None + + if not isinstance(data, dict): + return None + if data.get("jsonrpc") != "2.0": + return None + + request_id = data.get("id") + if "error" in data: + err_data = data["error"] + return RPCResponse.error( + err_data.get("code", RPCError.INTERNAL_ERROR), + err_data.get("message", "unknown error"), + request_id, + ) + elif "result" in data: + return RPCResponse.success(data["result"], request_id) + return None diff --git a/src/server_core/yosuga_embedded_server/server.py b/src/server_core/yosuga_embedded_server/server.py new file mode 100644 index 0000000..8d01bbd --- /dev/null +++ b/src/server_core/yosuga_embedded_server/server.py @@ -0,0 +1,270 @@ +""" +YosugaServer - 主服务端类,串联所有组件。 + +架构: + AI <-> YosugaServer <-> DeviceManager <-> 嵌入式设备 + FunctionRegistry + AIPromptBuilder + JSONRPCHandler + +流程: + 1. 设备连接,发送能力描述JSON + 2. 服务端注册设备+函数 + 3. 变更回调触发 -> 服务端更新AI提示词 + 4. 用户通过服务端向AI发送请求 + 5. 服务端用当前函数构建系统提示词 + 6. AI响应函数调用 + 7. 服务端解析调用并路由到设备 + 8. 设备响应被收集并返回 +""" + +import json +import logging +import threading +from typing import Optional, Callable, Any + +from .device_manager import DeviceManager, DeviceInfo +from .function_registry import FunctionRegistry +from .ai_prompt import AIPromptBuilder +from .json_rpc import JSONRPCHandler, RPCRequest, RPCResponse, RPCError + +logger = logging.getLogger(__name__) + + +class ServerConfig: + """服务端配置。""" + + def __init__( + self, + device_conflict_strategy: str = "rename", + max_concurrent_calls: int = 10, + device_timeout: float = 30.0, + ): + self.device_conflict_strategy = device_conflict_strategy + self.max_concurrent_calls = max_concurrent_calls + self.device_timeout = device_timeout + + +class YosugaServer: + """主服务端,协调设备、函数和AI交互。""" + + def __init__(self, config: Optional[ServerConfig] = None): + self.config = config or ServerConfig() + self.device_manager = DeviceManager( + conflict_strategy=self.config.device_conflict_strategy + ) + self.function_registry = FunctionRegistry() + self.ai_prompt = AIPromptBuilder() + self._lock = threading.Lock() + self._call_id_counter = 0 + + # 挂载变更通知 + self.device_manager.on_device_change = self._on_device_change + self.function_registry.on_change = self._on_functions_change + + # 外部集成回调 + self.on_capabilities_changed: Optional[Callable[[dict], Any]] = None + self.on_device_message: Optional[Callable[[str, str], Optional[str]]] = None + + def _on_device_change(self, event: str, device: DeviceInfo): + logger.info("Device %s: %s", event, device.device_id) + if event == "removed": + self.function_registry.remove_device_functions(device.device_id) + elif event in ("added", "updated"): + if device.state.value == "registered": + self.function_registry.add_device_functions( + device.device_id, + device.name, + device.functions or [], + ) + + def _on_functions_change(self): + if self.on_capabilities_changed: + try: + self.on_capabilities_changed(self.get_capabilities_summary()) + except Exception as e: + logger.error("capabilities callback error: %s", e) + + def _next_call_id(self) -> int: + with self._lock: + self._call_id_counter += 1 + return self._call_id_counter + + def register_device(self, device_json_str: str) -> DeviceInfo: + """从设备的JSON能力描述注册设备。 + + Args: + device_json_str: 设备发来的JSON字符串 + + Returns: + 已注册设备的 DeviceInfo + """ + data = json.loads(device_json_str) + return self.device_manager.register_from_json(data) + + def register_device_from_dict(self, device_json: dict, device_id: str = "") -> DeviceInfo: + """从字典注册设备(格式同JSON)。 + Args: + device_json: 设备能力描述字典 + device_id: 可选的显式设备 ID(来自客户端转发,覆盖自动生成) + """ + if device_id: + device_json = dict(device_json) + device_json["_device_id"] = device_id + return self.device_manager.register_from_json(device_json) + + def remove_device(self, device_id: str) -> bool: + """移除设备及其函数。""" + return self.device_manager.remove_device(device_id) + + def build_ai_system_prompt(self) -> str: + """构建当前AI系统提示词。""" + functions = self.function_registry.to_function_list() + return self.ai_prompt.build_system_prompt(functions) + + def process_ai_response(self, response_text: str) -> list[dict]: + """解析AI响应为RPC调用并路由到设备。 + + Args: + response_text: AI返回的原始文本 + + Returns: + 包含设备响应结果的字典列表 + """ + calls = self.ai_prompt.parse_ai_response(response_text) + if not calls: + return [{"error": "无法将AI响应解析为RPC调用"}] + + results = [] + for call in calls: + method = call.get("method") + params = call.get("params", {}) + call_id = call.get("id", self._next_call_id()) + + # 查找哪个设备提供此函数 + func_info = self.function_registry.get_function(method) + if not func_info: + results.append({ + "id": call_id, + "method": method, + "error": {"code": RPCError.METHOD_NOT_FOUND, + "message": f"未找到函数 '{method}'"}, + }) + continue + + device_id = func_info.device_id + device = self.device_manager.get_device(device_id) + if not device: + results.append({ + "id": call_id, + "method": method, + "error": {"code": RPCError.DEVICE_NOT_FOUND, + "message": f"设备 '{device_id}' 不可用"}, + }) + continue + + # 构建发送给设备的RPC调用 + rpc_call = JSONRPCHandler.build_call(method, params, call_id) + + # 如果有设备消息回调,使用它 + if self.on_device_message: + try: + response_str = self.on_device_message(device_id, rpc_call) + if response_str: + resp = JSONRPCHandler.parse_response(response_str) + if resp: + if resp.is_success(): + results.append({ + "id": call_id, + "method": method, + "device_id": device_id, + "result": resp.result, + }) + else: + results.append({ + "id": call_id, + "method": method, + "device_id": device_id, + "error": resp.error.to_dict(), + }) + else: + results.append({ + "id": call_id, + "method": method, + "error": {"code": RPCError.PARSE_ERROR, + "message": "Invalid response from device"}, + }) + else: + results.append({ + "id": call_id, + "method": method, + "device_id": device_id, + "result": None, + "note": "notification (no response expected)", + }) + except Exception as e: + results.append({ + "id": call_id, + "method": method, + "error": {"code": RPCError.DEVICE_ERROR, + "message": str(e)}, + }) + else: + results.append({ + "id": call_id, + "method": method, + "device_id": device_id, + "note": "No on_device_message callback set - call would be routed here", + }) + + return results + + def list_devices(self) -> list[dict]: + """获取所有在线设备的字典列表""" + return [d.to_dict() for d in self.device_manager.get_all_devices()] + + def send_rpc(self, device_id: str, rpc_call: str) -> Optional[str]: + """向指定设备发送 RPC 调用并返回响应""" + if self.on_device_message: + return self.on_device_message(device_id, rpc_call) + return None + + def get_capabilities_summary(self) -> dict: + """获取所有能力的摘要(供外部使用)。""" + return { + "device_count": self.device_manager.device_count(), + "function_count": self.function_registry.function_count(), + "devices": self.device_manager.to_dict(), + "functions": self.function_registry.to_function_list(), + } + + def process_device_message(self, device_id: str, message: str) -> str: + """处理设备发来的消息。 + + 设备发送: + - 能力描述(注册) + - RPC响应 + + 如有需要返回响应字符串。 + """ + try: + data = json.loads(message) + except json.JSONDecodeError: + return JSONRPCHandler.build_error_response( + RPCError.PARSE_ERROR, "Invalid JSON" + ) + + # 检查是否为能力描述(包含"device"和"functions") + if isinstance(data, dict) and "device" in data and "functions" in data: + device = self.register_device_from_dict(data) + return json.dumps({"status": "registered", "device_id": device.device_id}) + + # 检查是否为RPC响应 + if isinstance(data, dict) and ("result" in data or "error" in data): + # 仅确认 - 响应由回调处理 + return json.dumps({"status": "received"}) + + # 可能是转发调用或其他消息 + return JSONRPCHandler.build_error_response( + RPCError.INVALID_REQUEST, "Unknown message type" + ) diff --git a/src/server_view/backend/.gitignore b/src/server_view/backend/.gitignore new file mode 100644 index 0000000..df93c25 --- /dev/null +++ b/src/server_view/backend/.gitignore @@ -0,0 +1,17 @@ +# Python-generated files +__pycache__/ +*.py[oc] +build/ +dist/ +wheels/ +*.egg-info + +# from ide +.vscode +.idea + +# Virtual environments +.venv +venv + +user_preferences.json \ No newline at end of file diff --git a/src/server_view/backend/app.py b/src/server_view/backend/app.py new file mode 100644 index 0000000..67e36e9 --- /dev/null +++ b/src/server_view/backend/app.py @@ -0,0 +1,686 @@ +""" +Yosuga Server Web UI - FastAPI Backend with Socket.IO +""" +import asyncio +import json +import logging +import sys +import time +import psutil +from contextlib import asynccontextmanager +from pathlib import Path +from typing import Dict, Any, Set + +from fastapi import FastAPI, HTTPException, UploadFile, File +from fastapi.middleware.cors import CORSMiddleware +from fastapi.responses import JSONResponse +from fastapi.staticfiles import StaticFiles +import socketio + +# 项目根目录 +project_root = Path(__file__).parent.parent.parent +sys.path.insert(0, str(project_root)) + +from src.server_view.backend.core_manager import ( + start_core, stop_core, get_status, get_core_status +) +from src.server_view.backend.diagnostics import get_diagnostics, HealthStatus, CheckResult + +# Socket.IO 服务器 +sio = socketio.AsyncServer( + cors_allowed_origins="*", + async_mode="asgi", + logger=False, + engineio_logger=False +) + +# 跟踪已连接的客户端 +connected_clients: Set[str] = set() + +# RPC 响应转发回调标志(只注册一次) +_rpc_forwarder_registered: bool = False + +# 日志广播处理器 +class SocketIOLogHandler(logging.Handler): + """将Python标准日志发送到Socket.IO""" + def __init__(self, sio_server): + super().__init__() + self.sio = sio_server + self.setLevel(logging.DEBUG) + + def emit(self, record: logging.LogRecord): + try: + msg = self.format(record) + try: + loop = asyncio.get_running_loop() + loop.create_task(self._broadcast(msg, record.levelname)) + except RuntimeError: + pass + except Exception: + pass + + async def _broadcast(self, message: str, level: str): + try: + await self.sio.emit('log_line', { + 'line': message, + 'timestamp': time.time(), + 'level': level + }) + except Exception: + pass + +def setup_logging(sio_server): + """配置日志系统 - 减少HTTP访问日志噪音""" + root_logger = logging.getLogger() + root_logger.setLevel(logging.DEBUG) + + # 清除现有处理器 + for handler in root_logger.handlers[:]: + root_logger.removeHandler(handler) + + # 控制台处理器 - 过滤掉频繁的系统信息HTTP请求日志 + console = logging.StreamHandler(sys.stdout) + console.setLevel(logging.INFO) + console.setFormatter(logging.Formatter( + '%(asctime)s | %(levelname)-8s | %(name)s:%(lineno)d - %(message)s' + )) + + # 添加过滤器,排除频繁的HTTP轮询日志 + def filter_http_poll(record): + msg = record.getMessage() + # 过滤掉 /api/system/info 和 /api/core/status 的GET请求日志 + if 'GET /api/system/info' in msg or 'GET /api/core/status' in msg: + return False + return True + + console.addFilter(filter_http_poll) + root_logger.addHandler(console) + + # Socket.IO处理器 + sio_handler = SocketIOLogHandler(sio_server) + sio_handler.setLevel(logging.DEBUG) + sio_handler.setFormatter(logging.Formatter( + '%(asctime)s | %(levelname)-8s | %(name)s - %(message)s' + )) + root_logger.addHandler(sio_handler) + + # 设置yosuga logger + yosuga_logger = logging.getLogger("yosuga") + yosuga_logger.setLevel(logging.DEBUG) + + return root_logger + +# 系统状态监控 +async def system_monitor_task(): + """后台任务:定期采集并广播系统状态""" + while True: + try: + # 只有有客户端连接时才采集数据 + if connected_clients: + # 采集系统数据 + cpu = psutil.cpu_percent(interval=0.1) + mem = psutil.virtual_memory() + disk = psutil.disk_usage('/') + proc = psutil.Process() + + system_data = { + "cpu": {"percent": cpu, "count": psutil.cpu_count()}, + "memory": { + "total": mem.total, "available": mem.available, + "percent": mem.percent, "used": mem.used, "free": mem.free + }, + "disk": { + "total": disk.total, "used": disk.used, + "free": disk.free, "percent": (disk.used/disk.total)*100 + }, + "process": { + "memory_percent": proc.memory_percent(), + "cpu_percent": proc.cpu_percent(interval=0.1), + "threads": proc.num_threads(), + "uptime": time.time() - proc.create_time() + }, + "timestamp": time.time() + } + + # 广播给所有客户端 + await sio.emit('system_stats', { + 'success': True, + 'data': system_data + }) + + # 同时广播核心状态(实时推送) + core_status = get_core_status() + await sio.emit('core_status', { + 'success': True, + 'data': core_status + }) + + except Exception as e: + logging.error(f"系统监控任务异常: {e}") + + # 1秒间隔,比HTTP轮询更实时 + await asyncio.sleep(1) + +# FastAPI 应用 +@asynccontextmanager +async def lifespan(app: FastAPI): + """应用生命周期""" + logger = setup_logging(sio) + logger.info("Yosuga Server Web UI 启动") + + monitor_task = asyncio.create_task(system_monitor_task()) + logger.info("系统监控任务已启动") + + yield + + monitor_task.cancel() + try: + await monitor_task + except asyncio.CancelledError: + pass + + # 清理诊断模块 + try: + diag = await get_diagnostics() + # 诊断模块无需要关闭的资源,但保留钩子 + except: + pass + + logger.info("应用关闭") + stop_core() +fastapi_app = FastAPI( + title="Yosuga Server Web UI", + version="1.0.0", + lifespan=lifespan +) + +fastapi_app.add_middleware( + CORSMiddleware, + allow_origins=["*"], + allow_credentials=True, + allow_methods=["*"], + allow_headers=["*"], +) + + +@fastapi_app.post("/api/diagnostics/run") +async def run_diagnostics(): + """执行完整系统体检""" + try: + diag = await get_diagnostics() + report = await diag.run_full_diagnostics() + return report.to_dict() + except Exception as e: + raise HTTPException(status_code=500, detail=str(e)) + + +@fastapi_app.get("/api/diagnostics/check/{module}") +async def check_single_module(module: str): + """检查单个模块状态""" + try: + from src.config.config import cfg + diag = await get_diagnostics() + + # 获取对应配置 + config_map = { + 'asr': cfg.asr, + 'tts': cfg.tts, + 'ai': cfg.ai, + 'auto_agent': cfg.auto_agent, + 'llm_core': cfg.llm_core + } + + if module not in config_map: + raise HTTPException(status_code=400, detail=f"未知模块: {module}") + + # 转换为dict + config_dict = {} + if hasattr(config_map[module], '__dataclass_fields__'): + from dataclasses import asdict + config_dict = asdict(config_map[module]) + else: + config_dict = dict(config_map[module]) + + result = await diag.quick_check_module(module, config_dict) + return {"success": True, "data": result.to_dict()} + + except HTTPException: + raise + except Exception as e: + raise HTTPException(status_code=500, detail=str(e)) + + +@fastapi_app.get("/api/diagnostics/health") +async def quick_health_check(): + """快速健康检查(用于负载均衡/心跳)""" + try: + from src.server_view.backend.core_manager import get_status + status = get_status() + + # 简单检查WebSocket服务器是否活着 + ws_ok = len(connected_clients) >= 0 # 总是True,只要能响应 + + health = { + "status": "healthy" if status.is_running else "degraded", + "web_ui": "up", + "core_running": status.is_running, + "websocket_clients": len(connected_clients), + "timestamp": time.time() + } + + return health + + except Exception as e: + return {"status": "unhealthy", "error": str(e)} + +# Socket.IO ASGI应用 +app = socketio.ASGIApp(sio, fastapi_app) + +# Socket.IO 事件 +@sio.event +async def connect(sid, environ): + """客户端连接""" + connected_clients.add(sid) + print(f"客户端连接: {sid} (当前在线: {len(connected_clients)})") + + # 立即发送一次当前状态(避免前端等待) + await sio.emit('system', { + 'message': '连接成功', + 'timestamp': time.time(), + 'clients_count': len(connected_clients) + }, to=sid) + + # 立即推送一次系统状态(前端无需再发HTTP请求) + try: + core_status = get_core_status() + await sio.emit('core_status', { + 'success': True, + 'data': core_status + }, to=sid) + except Exception as e: + logging.error(f"推送初始状态失败: {e}") + +@sio.event +async def disconnect(sid): + """客户端断开""" + connected_clients.discard(sid) + print(f"客户端断开: {sid} (当前在线: {len(connected_clients)})") + +@sio.on('subscribe_logs') +async def handle_subscribe_logs(sid, data): + """订阅日志(可指定级别过滤)""" + level = data.get('level', 'ALL') if isinstance(data, dict) else 'ALL' + print(f"客户端 {sid} 订阅日志: {level}") + await sio.emit('system', {'message': f'已订阅日志: {level}'}, to=sid) + +@sio.on('control_core') +async def handle_control_core(sid, data): + """WebSocket方式控制核心""" + action = data.get('action') if isinstance(data, dict) else None + + if action == 'start': + try: + # 检查是否已运行 + status = get_status() + if status.is_running: + await sio.emit('core_control_result', { + 'success': True, + 'message': '核心已在运行', + 'data': status.to_dict() + }, to=sid) + return + + # 启动核心 + success, error = start_core(project_root) + if success: + # 等待一下确保启动成功 + await asyncio.sleep(0.5) + new_status = get_status() + # 广播给所有客户端(不仅仅是操作者) + await sio.emit('core_status', { + 'success': True, + 'data': new_status.to_dict(), + 'message': '核心启动成功' + }) + else: + await sio.emit('core_control_result', { + 'success': False, + 'error': error or '启动失败' + }, to=sid) + except Exception as e: + await sio.emit('core_control_result', { + 'success': False, + 'error': str(e) + }, to=sid) + + elif action == 'stop': + try: + status = get_status() + if not status.is_running: + await sio.emit('core_control_result', { + 'success': True, + 'message': '核心已停止', + 'data': status.to_dict() + }, to=sid) + return + + success, error = stop_core() + if success: + await asyncio.sleep(0.5) # 等待停止完成 + new_status = get_status() + await sio.emit('core_status', { + 'success': True, + 'data': new_status.to_dict(), + 'message': '核心已停止' + }) + else: + await sio.emit('core_control_result', { + 'success': False, + 'error': error or '停止失败' + }, to=sid) + except Exception as e: + await sio.emit('core_control_result', { + 'success': False, + 'error': str(e) + }, to=sid) + + +# 设备管理 Socket.IO 事件 +@sio.on('get_devices') +async def handle_get_devices(sid): + """获取当前所有在线设备列表""" + try: + from src.server_core.core import YosugaServerCore + core = await YosugaServerCore.get_instance() + devices = core.embedded_server.list_devices() + await sio.emit('devices_list', { + 'success': True, + 'data': devices + }, to=sid) + except Exception as e: + await sio.emit('devices_list', { + 'success': False, + 'error': str(e) + }, to=sid) + +@sio.on('send_device_rpc') +async def handle_send_device_rpc(sid, data): + """向指定设备发送 RPC 命令""" + device_id = data.get('device_id') if isinstance(data, dict) else None + rpc_call = data.get('rpc_call') if isinstance(data, dict) else None + if not device_id or not rpc_call: + await sio.emit('device_rpc_result', { + 'success': False, + 'error': '缺少 device_id 或 rpc_call' + }, to=sid) + return + try: + from src.server_core.core import YosugaServerCore + core = await YosugaServerCore.get_instance() + + # 注册一次性的 RPC 响应转发回调 + global _rpc_forwarder_registered + if not _rpc_forwarder_registered: + async def forward_rpc_response(dev_id: str, payload: dict): + await sio.emit('device_rpc_response', { + 'device_id': dev_id, + 'payload': payload + }) + core.device_dto.on_rpc_response = forward_rpc_response + _rpc_forwarder_registered = True + + core.embedded_server.send_rpc(device_id, rpc_call) + await sio.emit('device_rpc_result', { + 'success': True, + 'device_id': device_id, + 'message': 'RPC 命令已发送到设备' + }, to=sid) + except Exception as e: + await sio.emit('device_rpc_result', { + 'success': False, + 'error': str(e) + }, to=sid) + +# 设备管理 REST API +@fastapi_app.get("/api/devices") +async def get_devices_api(): + """获取所有在线设备(HTTP 备用)""" + try: + from src.server_core.core import YosugaServerCore + core = await YosugaServerCore.get_instance() + devices = core.embedded_server.list_devices() + return {"success": True, "data": devices} + except Exception as e: + return {"success": False, "error": str(e)} + +@sio.on('check_module_health') +async def handle_check_module(sid, data): + """WebSocket方式检查模块健康""" + module = data.get('module') if isinstance(data, dict) else None + if not module: + await sio.emit('module_health_result', { + "success": False, + "error": "未指定模块" + }, to=sid) + return + + try: + from src.config.config import cfg + diag = await get_diagnostics() + + config_map = { + 'asr': cfg.asr, + 'tts': cfg.tts, + 'ai': cfg.ai, + 'auto_agent': cfg.auto_agent, + 'llm_core': cfg.llm_core + } + + if module not in config_map: + await sio.emit('module_health_result', { + "success": False, + "error": f"未知模块: {module}" + }, to=sid) + return + + # 转换配置 + config_dict = {} + if hasattr(config_map[module], '__dataclass_fields__'): + from dataclasses import asdict + config_dict = asdict(config_map[module]) + else: + config_dict = dict(config_map[module]) + + result = await diag.quick_check_module(module, config_dict) + + await sio.emit('module_health_result', { + "success": True, + "module": module, + "data": result.to_dict() + }, to=sid) + + # 同时广播给所有客户端更新模块状态 + await sio.emit('module_status_update', { + "module": module, + "status": result.to_dict() + }) + + except Exception as e: + await sio.emit('module_health_result', { + "success": False, + "error": str(e) + }, to=sid) + +# REST API +@fastapi_app.get("/api/system/info") +async def get_system_info(): + """HTTP备用接口 - 减少日志""" + try: + cpu = psutil.cpu_percent(interval=0.1) + mem = psutil.virtual_memory() + disk = psutil.disk_usage('/') + proc = psutil.Process() + + return { + "success": True, + "data": { + "cpu": {"percent": cpu, "count": psutil.cpu_count()}, + "memory": { + "total": mem.total, "available": mem.available, + "percent": mem.percent, "used": mem.used, "free": mem.free + }, + "disk": { + "total": disk.total, "used": disk.used, + "free": disk.free, "percent": (disk.used/disk.total)*100 + }, + "process": { + "memory_percent": proc.memory_percent(), + "cpu_percent": proc.cpu_percent(interval=0.1), + "threads": proc.num_threads(), + "uptime": time.time() - proc.create_time() + } + } + } + except Exception as e: + raise HTTPException(status_code=500, detail=str(e)) + +@fastapi_app.get("/api/core/status") +async def get_core_status_api(): + """HTTP备用接口""" + return {"success": True, "data": get_core_status()} + +@fastapi_app.post("/api/core/start") +async def start_core_api(): + """HTTP备用接口""" + try: + status = get_status() + if status.is_running: + return {"success": True, "data": status.to_dict(), "message": "核心已在运行"} + + success, error = start_core(project_root) + if not success: + raise HTTPException(status_code=500, detail=error or "启动失败") + + # 等待初始化完成 + for _ in range(20): + await asyncio.sleep(0.5) + status = get_status() + if status.is_running: + # 通过WebSocket广播状态更新(给所有连接的客户端) + await sio.emit('core_status', { + 'success': True, + 'data': status.to_dict(), + 'message': '核心启动成功' + }) + return {"success": True, "data": status.to_dict(), "message": "核心启动成功"} + + status = get_status() + return {"success": True, "data": status.to_dict(), "message": "核心启动中..."} + + except HTTPException: + raise + except Exception as e: + status = get_status() + if status.is_running: + return {"success": True, "data": status.to_dict(), "message": "核心已启动"} + raise HTTPException(status_code=500, detail=str(e)) + +@fastapi_app.post("/api/core/stop") +async def stop_core_api(): + """HTTP备用接口""" + try: + status = get_status() + if not status.is_running: + return {"success": True, "data": status.to_dict(), "message": "核心已停止"} + + success, error = stop_core() + if success: + for _ in range(20): + await asyncio.sleep(0.5) + status = get_status() + if not status.is_running: + # 广播状态更新 + await sio.emit('core_status', { + 'success': True, + 'data': status.to_dict(), + 'message': '核心已停止' + }) + return {"success": True, "data": status.to_dict(), "message": "核心已停止"} + + status = get_status() + return {"success": True, "data": status.to_dict(), "message": error or "核心停止中..."} + else: + raise HTTPException(status_code=500, detail=error or "停止失败") + + except Exception as e: + raise HTTPException(status_code=500, detail=str(e)) + +@fastapi_app.get("/api/modules/status") +async def get_modules_status(): + """模块状态""" + try: + from src.config.config import cfg + return { + "success": True, + "data": { + "asr": {"enabled": cfg.asr.enabled}, + "tts": {"enabled": cfg.tts.enabled}, + "ai": {"enabled": cfg.ai.api_key is not None}, + "auto_agent": {"enabled": cfg.auto_agent.enabled}, + "llm_core": {"enabled": cfg.llm_core.enabled} + } + } + except: + return { + "success": True, + "data": { + "asr": {"enabled": True}, "tts": {"enabled": True}, + "ai": {"enabled": True}, "auto_agent": {"enabled": True}, + "llm_core": {"enabled": True} + } + } + +@fastapi_app.get("/api/config") +async def get_config(): + from src.config.config import cfg + return {"success": True, "data": cfg.to_dict()} + +@fastapi_app.post("/api/config/{section}") +async def update_config(section: str, data: Dict[str, Any]): + from src.config.config import cfg + cfg.update({section: data}) + return {"success": True, "message": "配置已更新"} + +@fastapi_app.post("/api/config/reload") +async def reload_config(): + from src.config.config import cfg + cfg.reload() + return {"success": True, "message": "配置已重载"} + +@fastapi_app.get("/api/preferences") +async def get_preferences(): + prefs_path = Path(__file__).parent / "user_preferences.json" + if prefs_path.exists(): + with open(prefs_path, 'r', encoding='utf-8') as f: + return {"success": True, "data": json.load(f)} + return {"success": True, "data": {}} + +@fastapi_app.post("/api/preferences") +async def save_preferences(data: Dict[str, Any]): + prefs_path = Path(__file__).parent / "user_preferences.json" + with open(prefs_path, 'w', encoding='utf-8') as f: + json.dump(data, f, indent=2, ensure_ascii=False) + return {"success": True, "message": "偏好已保存"} + +# 静态文件 +static_dir = Path(__file__).parent / "static" +if static_dir.exists(): + fastapi_app.mount("/", StaticFiles(directory=static_dir, html=True), name="static") + +def run_server(host: str = "0.0.0.0", port: int = 8089, debug: bool = False): + import uvicorn + uvicorn.run("backend.app:app", host=host, port=port, reload=debug, + access_log=False) # 禁用默认访问日志(我们使用自定义过滤器) + +if __name__ == "__main__": + run_server() \ No newline at end of file diff --git a/src/server_view/backend/core_manager.py b/src/server_view/backend/core_manager.py new file mode 100644 index 0000000..f2503cf --- /dev/null +++ b/src/server_view/backend/core_manager.py @@ -0,0 +1,248 @@ +""" +Yosuga Server 核心进程管理器 +""" +import asyncio +import threading +import time +import sys +import logging +from pathlib import Path +from typing import Optional, Dict, Any +from dataclasses import dataclass + +# 全局状态 +_core_instance: Optional[Any] = None +_core_thread: Optional[threading.Thread] = None +_core_start_time: Optional[float] = None +_core_stop_event: threading.Event = threading.Event() # 停止信号 +_core_lock = threading.Lock() +_logger: Optional[logging.Logger] = None + + +@dataclass +class CoreStatus: + is_running: bool = False + pid: int = 0 + uptime: float = 0.0 + error: Optional[str] = None + thread_alive: bool = False + start_time: Optional[str] = None + + def to_dict(self) -> Dict[str, Any]: + return { + "is_running": self.is_running, + "pid": self.pid, + "uptime": round(self.uptime, 1), + "error": self.error, + "thread_alive": self.thread_alive, + "start_time": self.start_time + } + +import os +def get_status() -> CoreStatus: + """获取当前核心状态""" + global _core_thread, _core_start_time, _core_instance + + with _core_lock: + status = CoreStatus() + + if _core_thread is not None: + status.thread_alive = _core_thread.is_alive() + status.pid = os.getpid() + + if _core_start_time and (status.thread_alive or _core_instance is not None): + status.uptime = time.time() - _core_start_time + status.is_running = True + status.start_time = time.strftime("%Y-%m-%dT%H:%M:%S", time.localtime(_core_start_time)) + + return status + + +def _setup_loguru_logging(log_queue: Optional[Any] = None): + """配置loguru,同时转发到标准logging以便Socket.IO捕获""" + try: + from loguru import logger + import logging + + logger.remove() + + # 关键:添加一个sink将loguru日志转发到标准logging + class LoguruToStandard: + def write(self, message): + # 解析loguru格式提取level + record = message.record + level = record["level"].name + # 获取标准logger并发送 + std_logger = logging.getLogger("yosuga") + if level == "DEBUG": + std_logger.debug(record["message"]) + elif level == "INFO": + std_logger.info(record["message"]) + elif level == "SUCCESS": + std_logger.info(record["message"]) + elif level == "WARNING": + std_logger.warning(record["message"]) + elif level == "ERROR": + std_logger.error(record["message"]) + elif level == "CRITICAL": + std_logger.critical(record["message"]) + + def flush(self): + pass + + # 添加转发处理器 + logger.add(LoguruToStandard(), format="{message}") + + # 文件日志 + from src.config.config import cfg + log_dir = Path(cfg.log_dir) + log_dir.mkdir(parents=True, exist_ok=True) + logger.add( + f"{log_dir}/Yosuga_server-{{time:YYYY-MM-DD_HH-mm-ss}}.log", + encoding="utf-8", + rotation="100 MB" + ) + + return logger + except Exception as e: + print(f"Loguru配置错误: {e}") + return None + + +def _run_core_thread(project_root: Path): + """在独立线程中运行YosugaServerCore""" + global _core_instance, _core_start_time, _core_stop_event, _logger + + _core_stop_event.clear() + + try: + sys.path.insert(0, str(project_root)) + + from src.server_core.core import YosugaServerCore + from src.config.config import cfg + + # 配置loguru并获取logger + logger = _setup_loguru_logging() + if logger: + logger.info("Yosuga_server 在线程中启动") + _logger = logger + + async def run_core(): + global _core_instance + try: + _core_instance = await YosugaServerCore.get_instance() + + if logger: + logger.success(f"YosugaServerCore 初始化完成,线程ID: {threading.current_thread().ident}") + + # 运行核心,同时检查停止信号 + core_task = asyncio.create_task(_core_instance.run()) + + # 等待任务完成或收到停止信号 + while not core_task.done(): + if _core_stop_event.is_set(): + core_task.cancel() + try: + await core_task + except asyncio.CancelledError: + if logger: + logger.info("核心收到停止信号,正在关闭...") + break + await asyncio.sleep(0.1) + + if logger: + logger.info("核心事件循环已结束") + + except asyncio.CancelledError: + if logger: + logger.info("核心任务已取消") + except Exception as e: + if logger: + logger.exception(f"核心运行异常: {e}") + raise + + # 运行异步核心 + asyncio.run(run_core()) + + except Exception as e: + import traceback + error_msg = f"核心线程异常: {str(e)}\n{traceback.format_exc()}" + print(error_msg, file=sys.stderr) + if _logger: + _logger.error(error_msg) + finally: + _core_instance = None + if logger: + logger.info("核心线程已退出") + + +def start_core(project_root: Path) -> tuple[bool, Optional[str]]: + """启动Yosuga核心""" + global _core_thread, _core_start_time, _core_stop_event + + with _core_lock: + if _core_thread is not None and _core_thread.is_alive(): + return True, None # 已经在运行 + + _core_start_time = None + _core_stop_event.clear() + + try: + _core_thread = threading.Thread( + target=_run_core_thread, + args=(project_root,), + name="YosugaServerCore", + daemon=True + ) + _core_thread.start() + _core_start_time = time.time() # 立即记录 + + time.sleep(0.5) + + if not _core_thread.is_alive(): + _core_start_time = None + return False, "核心线程未能启动" + + return True, None + + except Exception as e: + _core_start_time = None + return False, str(e) + + +def stop_core() -> tuple[bool, Optional[str]]: + """停止Yosuga核心""" + global _core_thread, _core_start_time, _core_stop_event + + with _core_lock: + if _core_thread is None or not _core_thread.is_alive(): + _core_thread = None + _core_start_time = None + return True, None # 已经停止 + + try: + # 发送停止信号 + _core_stop_event.set() + + # 等待线程结束(带超时) + _core_thread.join(timeout=10.0) + + was_alive = _core_thread.is_alive() + _core_thread = None + + if was_alive: + # 线程还在运行,但已经发送了停止信号 + # 由于daemon=True,主进程退出时会强制终止 + _core_start_time = None + return True, "核心停止信号已发送,正在后台停止" + + _core_start_time = None + return True, None + + except Exception as e: + return False, str(e) + + +# 兼容旧接口 +def get_core_status() -> Dict[str, Any]: + return get_status().to_dict() \ No newline at end of file diff --git a/src/server_view/backend/diagnostics.py b/src/server_view/backend/diagnostics.py new file mode 100644 index 0000000..1297a30 --- /dev/null +++ b/src/server_view/backend/diagnostics.py @@ -0,0 +1,524 @@ +""" +Yosuga Server 系统诊断模块 - TCP端口连通性版本 +生产级健康检查与自检工具(不依赖HTTP接口) +""" +import asyncio +import json +import socket +import time +from dataclasses import dataclass, field +from enum import Enum +from pathlib import Path +from typing import Dict, List, Optional, Any, Tuple +from urllib.parse import urlparse +import psutil +from loguru import logger + +class HealthStatus(Enum): + HEALTHY = "healthy" + UNHEALTHY = "unhealthy" + UNKNOWN = "unknown" + CHECKING = "checking" + +@dataclass +class CheckResult: + """单项检查结果""" + name: str + status: HealthStatus + message: str + details: Dict[str, Any] = field(default_factory=dict) + latency_ms: float = 0.0 + timestamp: float = field(default_factory=time.time) + + def to_dict(self) -> dict: + return { + "name": self.name, + "status": self.status.value, + "message": self.message, + "details": self.details, + "latency_ms": round(self.latency_ms, 2), + "timestamp": self.timestamp + } + +@dataclass +class DiagnosticsReport: + """完整诊断报告""" + overall_status: HealthStatus + checks: List[CheckResult] + summary: Dict[str, int] + generated_at: float = field(default_factory=time.time) + version: str = "1.1.0" + + def to_dict(self) -> dict: + return { + "overall_status": self.overall_status.value, + "checks": [c.to_dict() for c in self.checks], + "summary": self.summary, + "generated_at": self.generated_at, + "version": self.version + } + +class SystemDiagnostics: + """系统诊断核心类 - TCP端口连通性检测""" + + def __init__(self, config_path: Optional[Path] = None): + self.config_path = config_path or self._find_config_path() + self._timeout_seconds = 3 + + def _find_config_path(self) -> Path: + """自动查找配置文件路径""" + markers = ['settings.json', 'pyproject.toml'] + current = Path(__file__).resolve().parent.parent.parent.parent + + for path in [current, *current.parents]: + if (path / 'settings.json').exists(): + return path / 'settings.json' + if path == path.parent: + break + return current / 'settings.json' + + async def _check_tcp_port(self, host: str, port: int) -> Tuple[bool, float, Optional[str]]: + """ + 基础TCP端口连通性检查 + + Returns: + (是否连通, 延迟ms, 错误信息) + """ + start = time.time() + try: + reader, writer = await asyncio.wait_for( + asyncio.open_connection(host, port), + timeout=self._timeout_seconds + ) + writer.close() + await writer.wait_closed() + latency = (time.time() - start) * 1000 + return True, latency, None + except asyncio.TimeoutError: + return False, self._timeout_seconds * 1000, "连接超时" + except ConnectionRefusedError: + return False, (time.time() - start) * 1000, "连接被拒绝" + except Exception as e: + return False, (time.time() - start) * 1000, str(e) + + def _parse_url(self, url: str) -> Tuple[str, int]: + """ + 从URL解析主机和端口 + 支持 http://host:port/path 格式 + """ + try: + parsed = urlparse(url) + host = parsed.hostname or 'localhost' + if parsed.port: + port = parsed.port + elif parsed.scheme == 'https': + port = 443 + elif parsed.scheme == 'http': + port = 80 + else: + port = 80 + return host, port + except Exception: + if ':' in url: + parts = url.split(':') + if len(parts) >= 2: + last_part = parts[-1].split('/')[0] + try: + return parts[-2].replace('//', ''), int(last_part) + except: + pass + return 'localhost', 80 + + async def check_asr(self, url: str = "http://localhost:20260") -> CheckResult: + """检查ASR服务 - TCP端口连通性""" + host, port = self._parse_url(url) + if port == 80 and '20260' in url: + port = 20260 + + is_open, latency, error = await self._check_tcp_port(host, port) + + if is_open: + return CheckResult( + name="ASR服务", + status=HealthStatus.HEALTHY, + message=f"端口可连通 {host}:{port}", + details={"host": host, "port": port, "protocol": "TCP"}, + latency_ms=latency + ) + else: + return CheckResult( + name="ASR服务", + status=HealthStatus.UNHEALTHY, + message=f"端口不可达 {host}:{port} - {error}", + details={"host": host, "port": port, "error": error}, + latency_ms=latency + ) + + async def check_tts(self, host: str = "localhost", port: int = 20261) -> CheckResult: + """检查TTS服务 - TCP端口连通性""" + is_open, latency, error = await self._check_tcp_port(host, port) + + if is_open: + return CheckResult( + name="TTS服务", + status=HealthStatus.HEALTHY, + message=f"端口可连通 {host}:{port}", + details={"host": host, "port": port}, + latency_ms=latency + ) + else: + return CheckResult( + name="TTS服务", + status=HealthStatus.UNHEALTHY, + message=f"端口不可达 {host}:{port} - {error}", + details={"host": host, "port": port, "error": error}, + latency_ms=latency + ) + + async def check_ai_service(self, base_url: str, api_key: Optional[str] = None) -> CheckResult: + """检查AI服务 - TCP端口连通性""" + host, port = self._parse_url(base_url) + + is_open, latency, error = await self._check_tcp_port(host, port) + + if is_open: + return CheckResult( + name="AI服务", + status=HealthStatus.HEALTHY, + message=f"端口可连通 {host}:{port}", + details={"host": host, "port": port, "base_url": base_url}, + latency_ms=latency + ) + else: + return CheckResult( + name="AI服务", + status=HealthStatus.UNHEALTHY, + message=f"端口不可达 {host}:{port} - {error}", + details={"host": host, "port": port, "error": error}, + latency_ms=latency + ) + + async def check_auto_agent(self, base_url: str) -> CheckResult: + """检查Auto Agent服务 - TCP端口连通性""" + host, port = self._parse_url(base_url) + + is_open, latency, error = await self._check_tcp_port(host, port) + + if is_open: + return CheckResult( + name="自动代理服务", + status=HealthStatus.HEALTHY, + message=f"端口可连通 {host}:{port}", + details={"host": host, "port": port}, + latency_ms=latency + ) + else: + return CheckResult( + name="自动代理服务", + status=HealthStatus.UNHEALTHY, + message=f"端口不可达 {host}:{port} - {error}", + details={"host": host, "port": port, "error": error}, + latency_ms=latency + ) + + async def check_config_file(self) -> CheckResult: + """检查配置文件合法性""" + try: + if not self.config_path.exists(): + return CheckResult( + name="配置文件", + status=HealthStatus.UNHEALTHY, + message=f"配置文件不存在: {self.config_path}" + ) + + with open(self.config_path, 'r', encoding='utf-8') as f: + content = f.read() + config = json.loads(content) + + required_sections = ['ai', 'tts', 'asr'] + missing = [s for s in required_sections if s not in config] + + if missing: + return CheckResult( + name="配置文件", + status=HealthStatus.UNHEALTHY, + message=f"缺少配置节: {', '.join(missing)}", + details={"missing_sections": missing} + ) + + return CheckResult( + name="配置文件", + status=HealthStatus.HEALTHY, + message=f"配置合法,含 {len(config)} 个配置节", + details={"sections": list(config.keys())} + ) + + except json.JSONDecodeError as e: + return CheckResult( + name="配置文件", + status=HealthStatus.UNHEALTHY, + message=f"JSON格式错误: {str(e)}" + ) + except Exception as e: + return CheckResult( + name="配置文件", + status=HealthStatus.UNHEALTHY, + message=f"读取失败: {str(e)}" + ) + + async def check_model_files(self, config: Optional[Dict] = None) -> CheckResult: + """检查模型文件存在性""" + try: + if config is None: + with open(self.config_path, 'r', encoding='utf-8') as f: + config = json.load(f) + + checks = {} + missing = [] + project_root = self.config_path.parent + + if 'tts' in config: + tts = config['tts'] + gpt_path = tts.get('gpt_model_name', '') + sovits_path = tts.get('sovits_model_name', '') + + if gpt_path: + full_path = project_root / gpt_path + exists = full_path.exists() + checks['gpt_model'] = {"path": str(full_path), "exists": exists} + if not exists: + missing.append(f"GPT模型: {gpt_path}") + + if sovits_path: + full_path = project_root / sovits_path + exists = full_path.exists() + checks['sovits_model'] = {"path": str(full_path), "exists": exists} + if not exists: + missing.append(f"SoVITS模型: {sovits_path}") + + if missing: + return CheckResult( + name="模型文件", + status=HealthStatus.UNHEALTHY, + message=f"缺少 {len(missing)} 个模型文件", + details={"missing": missing, "checks": checks} + ) + + return CheckResult( + name="模型文件", + status=HealthStatus.HEALTHY, + message="所有配置模型文件已找到", + details={"checks": checks} + ) + + except Exception as e: + return CheckResult( + name="模型文件", + status=HealthStatus.UNKNOWN, + message=f"检查失败: {str(e)}" + ) + + async def check_ports(self, ports: Optional[List[int]] = None) -> CheckResult: + """检查关键端口占用情况""" + if ports is None: + ports = [8089, 20260, 20261, 8765] + + try: + current_pid = psutil.Process().pid + current_ports = set() + + proc = psutil.Process(current_pid) + for conn in proc.connections(kind='inet'): + if conn.status == 'LISTEN': + current_ports.add(conn.lport) + + occupied_by_others = [] + for port in ports: + if port in current_ports: + continue + + for p in psutil.process_iter(['pid', 'name']): + try: + for conn in p.connections(kind='inet'): + if conn.lport == port: + occupied_by_others.append({ + "port": port, + "pid": p.pid, + "name": p.name() + }) + break + except (psutil.NoSuchProcess, psutil.AccessDenied): + continue + + if occupied_by_others: + return CheckResult( + name="端口占用", + status=HealthStatus.UNHEALTHY, + message=f"{len(occupied_by_others)} 个端口被占用", + details={"conflicts": occupied_by_others} + ) + + return CheckResult( + name="端口占用", + status=HealthStatus.HEALTHY, + message="关键端口可用", + details={"checked_ports": list(ports), "self_ports": list(current_ports)} + ) + + except Exception as e: + return CheckResult( + name="端口占用", + status=HealthStatus.UNKNOWN, + message=f"检查失败: {str(e)}" + ) + + async def run_full_diagnostics(self) -> DiagnosticsReport: + """执行完整系统体检""" + logger.info("开始系统体检(TCP模式)...") + checks = [] + + checks.append(await self.check_config_file()) + checks.append(await self.check_ports()) + + service_results = await self._check_services_tcp() + checks.extend(service_results) + + checks.append(await self.check_model_files()) + + summary = { + "healthy": sum(1 for c in checks if c.status == HealthStatus.HEALTHY), + "unhealthy": sum(1 for c in checks if c.status == HealthStatus.UNHEALTHY), + "unknown": sum(1 for c in checks if c.status == HealthStatus.UNKNOWN), + "total": len(checks) + } + + if summary["unhealthy"] == 0: + overall = HealthStatus.HEALTHY + elif summary["unhealthy"] <= summary["healthy"]: + overall = HealthStatus.UNKNOWN + else: + overall = HealthStatus.UNHEALTHY + + report = DiagnosticsReport( + overall_status=overall, + checks=checks, + summary=summary + ) + + logger.info(f"体检完成: {summary['healthy']}/{summary['total']} 项正常") + return report + + async def _check_services_tcp(self) -> List[CheckResult]: + """检查各项服务 - 纯TCP连通性""" + results = [] + + try: + with open(self.config_path, 'r', encoding='utf-8') as f: + config = json.load(f) + + if 'asr' in config: + asr_url = config['asr'].get('url', 'http://localhost:20260') + results.append(await self.check_asr(asr_url)) + else: + results.append(CheckResult( + name="ASR服务", + status=HealthStatus.UNKNOWN, + message="配置中未启用ASR" + )) + + if 'tts' in config: + tts_cfg = config['tts'] + results.append(await self.check_tts( + tts_cfg.get('host', 'localhost'), + tts_cfg.get('port', 20261) + )) + else: + results.append(CheckResult( + name="TTS服务", + status=HealthStatus.UNKNOWN, + message="配置中未启用TTS" + )) + + if 'ai' in config: + ai_cfg = config['ai'] + results.append(await self.check_ai_service( + ai_cfg.get('base_url', 'http://localhost:1234/v1') + )) + else: + results.append(CheckResult( + name="AI服务", + status=HealthStatus.UNKNOWN, + message="配置中未启用AI" + )) + + if 'auto_agent' in config: + aa_cfg = config['auto_agent'] + results.append(await self.check_auto_agent( + aa_cfg.get('base_url', 'http://localhost:1234/v1') + )) + else: + results.append(CheckResult( + name="自动代理服务", + status=HealthStatus.UNKNOWN, + message="配置中未启用自动代理" + )) + + except Exception as e: + logger.error(f"服务检查失败: {e}") + results.append(CheckResult( + name="服务检查", + status=HealthStatus.UNHEALTHY, + message=f"配置加载失败: {str(e)}" + )) + + return results + + async def quick_check_module(self, module_name: str, config: Dict) -> CheckResult: + """快速检查单个模块 - TCP连通性""" + if module_name == "asr": + url = config.get('url', 'http://localhost:20260') + return await self.check_asr(url) + elif module_name == "tts": + return await self.check_tts( + config.get('host', 'localhost'), + config.get('port', 20261) + ) + elif module_name == "ai": + return await self.check_ai_service( + config.get('base_url', 'http://localhost:1234/v1') + ) + elif module_name == "auto_agent": + return await self.check_auto_agent( + config.get('base_url', 'http://localhost:1234/v1') + ) + elif module_name == "llm_core": + from src.server_view.backend.core_manager import get_status + status = get_status() + if status.is_running: + return CheckResult( + name="LLM核心", + status=HealthStatus.HEALTHY, + message="核心进程运行中", + details={"uptime": status.uptime, "pid": status.pid} + ) + else: + return CheckResult( + name="LLM核心", + status=HealthStatus.UNHEALTHY, + message="核心进程未启动" + ) + else: + return CheckResult( + name=module_name, + status=HealthStatus.UNKNOWN, + message="未知模块" + ) + +_diagnostics_instance: Optional[SystemDiagnostics] = None + +async def get_diagnostics() -> SystemDiagnostics: + """获取诊断实例(单例)""" + global _diagnostics_instance + if _diagnostics_instance is None: + _diagnostics_instance = SystemDiagnostics() + return _diagnostics_instance \ No newline at end of file diff --git a/src/server_view/frontend/.gitignore b/src/server_view/frontend/.gitignore new file mode 100644 index 0000000..d031e61 --- /dev/null +++ b/src/server_view/frontend/.gitignore @@ -0,0 +1,21 @@ +# Python-generated files +__pycache__/ +*.py[oc] +build/ +dist/ +wheels/ +*.egg-info + +# from ide +.vscode +.idea + +# Virtual environments +.venv +venv + +node_modules +node_modules/ +dist +dist/ +pnpm-lock.yaml diff --git a/src/server_view/frontend/index.html b/src/server_view/frontend/index.html new file mode 100644 index 0000000..a83b93f --- /dev/null +++ b/src/server_view/frontend/index.html @@ -0,0 +1,13 @@ + + +
+ + + +settings.json