// // Created by misaki on 2025/9/2. // #include "CommClass.h" #include // 引入字符串处理库头文件 #include // 时间库头文件 #include #include #include #include #include "ToolsClass.h" #include "sys_conf_singleton.h" // 静态成员初始化 WebSocketManager* WebSocketManager::instance = nullptr; std::mutex WebSocketManager::instance_mutex; std::string WebSocketManager::sn = SysConfJson::getInstance()->loadSN(); // 获取SN(同时也能初始化文件系统,如果还没有初始化这个文件系统的话) // 标签用于日志 static const char* TAG = "WebSocketManager"; // 构造函数 WebSocketManager::WebSocketManager() : client(nullptr), threads_running(false), connected(false), connecting(false), reconnect_attempts(0), stats{0, 0, 0, 0}, wifi(WifiConnectors::getInstance()) { } // 析构函数 WebSocketManager::~WebSocketManager() { disconnect(); // 停止线程 threads_running = false; queue_cv.notify_all(); if (reconnect_thread.joinable()) { reconnect_thread.join(); } if (heartbeat_thread.joinable()) { heartbeat_thread.join(); } if (send_thread.joinable()) { send_thread.join(); } } WebSocketManager* WebSocketManager::getInstance() { std::lock_guard lock(instance_mutex); // 加锁 if (instance == nullptr) { instance = new WebSocketManager(); } return instance; } bool WebSocketManager::initialize(const WebSocketConfig& ws_config) { this->config = ws_config; // 注册WiFi事件处理 esp_event_handler_instance_t wifi_instance; esp_event_handler_instance_register(IP_EVENT, IP_EVENT_STA_GOT_IP, &WebSocketManager::wifiEventHandler, this, &wifi_instance); // 启动发送线程 // 只在线程未运行时启动发送线程 if (!send_thread.joinable()) { threads_running = true; ThreadConfig thread_config; thread_config.name = "WS_Send"; thread_config.stack_size = 4096; send_thread = ThreadManager::createMemberThread(thread_config, this, &WebSocketManager::sendThread); } return true; } bool WebSocketManager::connect() { if (connecting || connected) { ESP_LOGI(TAG, "Already connected or connecting"); return true; } // 检查WiFi连接 if (!wifi->isWifiConnect()) { ESP_LOGE(TAG, "WiFi not connected, cannot establish WebSocket connection"); if (event_callback) { event_callback(WebSocketEvent::ERROR, "WiFi not connected"); } return false; } connecting = true; stats.connection_attempts++; // 创建WebSocket客户端 if (!createWebSocketClient()) { connecting = false; return false; } // 启动WebSocket客户端 esp_err_t err = esp_websocket_client_start(client); if (err != ESP_OK) { ESP_LOGE(TAG, "Failed to start WebSocket client: %d", err); destroyWebSocketClient(); connecting = false; return false; } // 启动重连和心跳线程 // 只有在线程未运行时才启动新线程 if (!reconnect_thread.joinable()) { ThreadConfig thread_config; thread_config.name = "WS_Reconnect"; thread_config.stack_size = 3072; reconnect_thread = ThreadManager::createMemberThread(thread_config, this, &WebSocketManager::reconnectThread); } if (!heartbeat_thread.joinable()) { ThreadConfig thread_config; thread_config.name = "WS_Heartbeat"; thread_config.stack_size = 3072; heartbeat_thread = ThreadManager::createMemberThread(thread_config, this, &WebSocketManager::heartbeatThread); } return true; } void WebSocketManager::disconnect() { if (client) { esp_websocket_client_stop(client); destroyWebSocketClient(); } connected = false; connecting = false; // 通知事件回调 if (event_callback) { event_callback(WebSocketEvent::DISCONNECTED, "Disconnected by user"); } } bool WebSocketManager::sendJson(const cppjson::Json& json) { if (!connected) { // 检查连接状态 ESP_LOGE(TAG, "Not connected, cannot send data"); return false; } std::string json_str = json.dump(); // 序列化 JSON if (json_str.empty() || json_str == "null") { // 检查序列化结果 ESP_LOGE(TAG, "Failed to stringify JSON"); return false; } { // 临界区 std::lock_guard lock(queue_mutex); send_queue.emplace(std::move(json_str)); // 移动进队列,零拷贝 } queue_cv.notify_one(); // 通知发送线程 return true; } bool WebSocketManager::sendRaw(const std::string& data) { if (!connected) { // 检查连接状态 ESP_LOGE(TAG, "Not connected, cannot send data"); return false; } std::lock_guard lock(queue_mutex); // 锁定队列 send_queue.push(data); // 添加到队列 queue_cv.notify_one(); // 通知发送线程 return true; } void WebSocketManager::setJsonCallback(const JsonDataCallback &callback) { json_callback = callback; } void WebSocketManager::setEventCallback(const EventCallback &callback) { event_callback = callback; } bool WebSocketManager::isConnected() const { return connected; } WebSocketConfig WebSocketManager::getConfig() const { return config; } void WebSocketManager::updateConfig(const WebSocketConfig& ws_config) { this->config = ws_config; } WebSocketManager::Stats WebSocketManager::getStats() const { return stats; } void WebSocketManager::websocketEventHandler(void* handler_args, esp_event_base_t base, int32_t event_id, void* event_data) { auto* ws_instance = static_cast(handler_args); auto* data = static_cast(event_data); switch(event_id) { case WEBSOCKET_EVENT_CONNECTED: ws_instance->connected = true; ws_instance->connecting = false; ws_instance->reconnect_attempts = 0; ws_instance->stats.successful_connections++; ESP_LOGI(TAG, "WebSocket connected"); if (ws_instance->event_callback) { ws_instance->event_callback(WebSocketEvent::CONNECTED, "Connected successfully"); } break; case WEBSOCKET_EVENT_DISCONNECTED: ws_instance->connected = false; ws_instance->connecting = false; ESP_LOGI(TAG, "WebSocket disconnected"); if (ws_instance->event_callback) { ws_instance->event_callback(WebSocketEvent::DISCONNECTED, "Disconnected"); } break; case WEBSOCKET_EVENT_DATA: if (data->data_len > 0) { ws_instance->stats.messages_received++; ws_instance->handleReceivedData((const char*)data->data_ptr, data->data_len); } break; case WEBSOCKET_EVENT_ERROR: // 错误事件 ws_instance->connected = false; ws_instance->connecting = false; ESP_LOGE(TAG, "WebSocket error"); if (ws_instance->event_callback) { ws_instance->event_callback(WebSocketEvent::ERROR, "WebSocket error occurred"); } break; default: ESP_LOGW(TAG, "Unhandled WebSocket event: %ld", event_id); break; } } void WebSocketManager::handleReceivedData(const char* data, const int len) const { // 解析JSON cppjson::Json json = cppjson::Json::parse(std::string(data, len)); // 失败会得到空 Json if (!json.isNull()) { // 解析成功 if (json_callback) json_callback(json); } else { // 解析失败,当原始文本处理 if (event_callback) event_callback(WebSocketEvent::DATA_RECEIVED, std::string(data, len)); } } void WebSocketManager::reconnectThread() { while (threads_running) { if (!connected && config.auto_reconnect && (config.max_reconnect_attempts == 0 || reconnect_attempts < config.max_reconnect_attempts)) { // 检查重连条件 // 检查WiFi连接 if (!wifi->isWifiConnect()) { ESP_LOGI(TAG, "WiFi not connected, waiting before WebSocket reconnect"); std::this_thread::sleep_for(std::chrono::milliseconds(config.reconnect_interval)); continue; } reconnect_attempts++; ESP_LOGI(TAG, "Attempting to reconnect (%ld/%d)", reconnect_attempts, config.max_reconnect_attempts); if (connect()) { ESP_LOGI(TAG, "Reconnection attempt initiated"); } else { ESP_LOGE(TAG, "Reconnection attempt failed"); } } // sleep std::this_thread::sleep_for(std::chrono::milliseconds(config.reconnect_interval)); } } void WebSocketManager::heartbeatThread() { while (threads_running) { if (connected && config.heartbeat_interval > 0) { // 如果处于连接状态且心跳间隔大于0 // 发送心跳消息 cppjson::Json hb = cppjson::Json::object(); hb.set("type", cppjson::Json("heartbeat")) .set("timestamp", cppjson::Json(esp_log_timestamp())); sendJson(hb); // 已经重载好了,直接塞 std::this_thread::sleep_for(std::chrono::milliseconds(config.heartbeat_interval)); } else { // 否则就让出CPU std::this_thread::sleep_for(std::chrono::milliseconds(1000)); } } } void WebSocketManager::sendThread() { while (threads_running) { std::unique_lock lock(queue_mutex); queue_cv.wait(lock, // 传入已持有的 unique_lock [this]() { // 等待队列非空或者线程停止 return !send_queue.empty() || !threads_running; }); if (!threads_running) { // 其实永远不会进入这里,写上只是保险起见 break; } if (!send_queue.empty() && connected) { // 如果队列非空且已连接 std::string data = std::move(send_queue.front()); send_queue.pop(); lock.unlock(); // 释放锁 // 发送数据 int sent = esp_websocket_client_send_text(client, data.c_str(), static_cast(data.length()), portMAX_DELAY); if (sent >= 0) { stats.messages_sent++; // 发送成功 } else { ESP_LOGE(TAG, "Failed to send data"); // 将数据重新放回队列 lock.lock(); send_queue.push(std::move(data)); lock.unlock(); } } } } bool WebSocketManager::createWebSocketClient() { esp_websocket_client_config_t ws_config = {}; ws_config.uri = config.uri.c_str(); ws_config.user_context = this; client = esp_websocket_client_init(&ws_config); if (!client) { ESP_LOGE(TAG, "Failed to initialize WebSocket client"); return false; } // 处理sn码逻辑 // 一般情况下sn码会在开机的时候从flash当中读出,因此此处判断是否为空 sn = SysConfJson::getInstance()->loadSN(); // 再读一次sn码 if (!sn.empty()) { // 如果sn码不为空 // 则在头部加入sn码提交 esp_websocket_client_append_header(client, "X-SN", sn.c_str()); // 同时在头部加入mac码和芯片序列号,方便服务端做验证 esp_websocket_client_append_header(client, "X-MAC", ToolsClass::getChipMAC().c_str()); esp_websocket_client_append_header(client, "X-CHIP-ID", ToolsClass::getChipSerialNumber().c_str()); } else { // 如果sn码为空,设备可能存在问题 ESP_LOGE(TAG, "SN is empty, please check your device"); return false; } // 注册事件处理 esp_websocket_register_events(client, WEBSOCKET_EVENT_ANY, &WebSocketManager::websocketEventHandler, this); return true; } void WebSocketManager::destroyWebSocketClient() { if (client) { esp_websocket_client_stop(client); esp_websocket_client_destroy(client); client = nullptr; } } void WebSocketManager::wifiEventHandler(void* arg, esp_event_base_t event_base, int32_t event_Id, void* event_data) { auto* ws_instance = static_cast(arg); if (event_base == IP_EVENT && event_Id == IP_EVENT_STA_GOT_IP) { // WiFi已连接,尝试重新连接WebSocket if (ws_instance->config.auto_reconnect && !ws_instance->connected) { ESP_LOGI(TAG, "WiFi connected, attempting to reconnect WebSocket"); ws_instance->connect(); } } }