// // Created by misaki on 2025/9/2. // #include "CommClass.h" #include // 引入字符串处理库头文件 #include // 时间库头文件 #include #include #include #include // 静态成员初始化 WebSocketManager* WebSocketManager::instance = nullptr; std::mutex WebSocketManager::instance_mutex; // 标签用于日志 static const char* TAG = "WebSocketManager"; // 构造函数 WebSocketManager::WebSocketManager() : client(nullptr), threads_running(false), connected(false), connecting(false), reconnect_attempts(0), stats{0, 0, 0, 0}, wifi(WifiConnectors::getInstance()) { // 初始化统计信息 stats = {0, 0, 0, 0}; } // 析构函数 WebSocketManager::~WebSocketManager() { disconnect(); // 停止线程 threads_running = false; queue_cv.notify_all(); if (reconnect_thread.joinable()) { reconnect_thread.join(); } if (heartbeat_thread.joinable()) { heartbeat_thread.join(); } if (send_thread.joinable()) { send_thread.join(); } } WebSocketManager* WebSocketManager::getInstance() { std::lock_guard lock(instance_mutex); if (instance == nullptr) { instance = new WebSocketManager(); } return instance; } bool WebSocketManager::initialize(const WebSocketConfig& config) { this->config = config; // 注册WiFi事件处理 esp_event_handler_instance_t instance; esp_event_handler_instance_register(IP_EVENT, IP_EVENT_STA_GOT_IP, &WebSocketManager::wifiEventHandler, this, &instance); // 启动发送线程 threads_running = true; ThreadConfig thread_config; thread_config.name = "WS_Send"; thread_config.stack_size = 4096; send_thread = ThreadManager::createMemberThread(thread_config, this, &WebSocketManager::sendThread); return true; } bool WebSocketManager::connect() { if (connecting || connected) { ESP_LOGI(TAG, "Already connected or connecting"); return true; } // 检查WiFi连接 if (!wifi->isWifiConnect()) { ESP_LOGE(TAG, "WiFi not connected, cannot establish WebSocket connection"); if (event_callback) { event_callback(WebSocketEvent::ERROR, "WiFi not connected"); } return false; } connecting = true; stats.connection_attempts++; // 创建WebSocket客户端 if (!createWebSocketClient()) { connecting = false; return false; } // 启动WebSocket客户端 esp_err_t err = esp_websocket_client_start(client); if (err != ESP_OK) { ESP_LOGE(TAG, "Failed to start WebSocket client: %d", err); destroyWebSocketClient(); connecting = false; return false; } // 启动重连和心跳线程 ThreadConfig thread_config; thread_config.name = "WS_Reconnect"; thread_config.stack_size = 3072; reconnect_thread = ThreadManager::createMemberThread(thread_config, this, &WebSocketManager::reconnectThread); thread_config.name = "WS_Heartbeat"; heartbeat_thread = ThreadManager::createMemberThread(thread_config, this, &WebSocketManager::heartbeatThread); return true; } void WebSocketManager::disconnect() { if (client) { esp_websocket_client_stop(client); destroyWebSocketClient(); } connected = false; connecting = false; // 通知事件回调 if (event_callback) { event_callback(WebSocketEvent::DISCONNECTED, "Disconnected by user"); } } bool WebSocketManager::sendJson(cJSON* json) { if (!connected) { // 检查连接状态 ESP_LOGE(TAG, "Not connected, cannot send data"); cJSON_Delete(json); return false; } char* json_str = cJSON_PrintUnformatted(json); // 将JSON对象转换为字符串 cJSON_Delete(json); // 释放JSON对象 if (!json_str) { // 检查转换结果 ESP_LOGE(TAG, "Failed to stringify JSON"); return false; } std::lock_guard lock(queue_mutex); // 锁定队列 send_queue.push(json_str); // 添加到队列 free(json_str); // 释放内存 queue_cv.notify_one(); // 通知发送线程 return true; } bool WebSocketManager::sendRaw(const std::string& data) { if (!connected) { // 检查连接状态 ESP_LOGE(TAG, "Not connected, cannot send data"); return false; } std::lock_guard lock(queue_mutex); // 锁定队列 send_queue.push(data); // 添加到队列 queue_cv.notify_one(); // 通知发送线程 return true; } void WebSocketManager::setJsonCallback(JsonDataCallback callback) { json_callback = callback; } void WebSocketManager::setEventCallback(EventCallback callback) { event_callback = callback; } bool WebSocketManager::isConnected() const { return connected; } WebSocketConfig WebSocketManager::getConfig() const { return config; } void WebSocketManager::updateConfig(const WebSocketConfig& config) { this->config = config; } WebSocketManager::Stats WebSocketManager::getStats() const { return stats; } void WebSocketManager::websocketEventHandler(void* handler_args, esp_event_base_t base, int32_t event_id, void* event_data) { WebSocketManager* instance = static_cast(handler_args); esp_websocket_event_data_t* data = (esp_websocket_event_data_t*)event_data; switch(event_id) { case WEBSOCKET_EVENT_CONNECTED: instance->connected = true; instance->connecting = false; instance->reconnect_attempts = 0; instance->stats.successful_connections++; ESP_LOGI(TAG, "WebSocket connected"); if (instance->event_callback) { instance->event_callback(WebSocketEvent::CONNECTED, "Connected successfully"); } break; case WEBSOCKET_EVENT_DISCONNECTED: instance->connected = false; instance->connecting = false; ESP_LOGI(TAG, "WebSocket disconnected"); if (instance->event_callback) { instance->event_callback(WebSocketEvent::DISCONNECTED, "Disconnected"); } break; case WEBSOCKET_EVENT_DATA: if (data->data_len > 0) { instance->stats.messages_received++; instance->handleReceivedData((const char*)data->data_ptr, data->data_len); } break; case WEBSOCKET_EVENT_ERROR: instance->connected = false; instance->connecting = false; ESP_LOGE(TAG, "WebSocket error"); if (instance->event_callback) { instance->event_callback(WebSocketEvent::ERROR, "WebSocket error occurred"); } break; } } void WebSocketManager::handleReceivedData(const char* data, int len) { // 尝试解析JSON cJSON* json = cJSON_ParseWithLength(data, len); if (json) { // 成功解析为JSON if (json_callback) { json_callback(json); } else { cJSON_Delete(json); } } else { // 不是JSON格式,作为原始数据处理 std::string message(data, len); if (event_callback) { event_callback(WebSocketEvent::DATA_RECEIVED, message); } } } void WebSocketManager::reconnectThread() { while (threads_running) { if (!connected && config.auto_reconnect && (config.max_reconnect_attempts == 0 || reconnect_attempts < config.max_reconnect_attempts)) { // 检查WiFi连接 if (!wifi->isWifiConnect()) { ESP_LOGI(TAG, "WiFi not connected, waiting before WebSocket reconnect"); std::this_thread::sleep_for(std::chrono::milliseconds(config.reconnect_interval)); continue; } reconnect_attempts++; ESP_LOGI(TAG, "Attempting to reconnect (%ld/%d)", reconnect_attempts, config.max_reconnect_attempts); if (connect()) { ESP_LOGI(TAG, "Reconnection attempt initiated"); } else { ESP_LOGE(TAG, "Reconnection attempt failed"); } } std::this_thread::sleep_for(std::chrono::milliseconds(config.reconnect_interval)); } } void WebSocketManager::heartbeatThread() { while (threads_running) { if (connected && config.heartbeat_interval > 0) { // 发送心跳消息 cJSON* heartbeat = cJSON_CreateObject(); cJSON_AddStringToObject(heartbeat, "type", "heartbeat"); cJSON_AddNumberToObject(heartbeat, "timestamp", esp_log_timestamp()); sendJson(heartbeat); std::this_thread::sleep_for(std::chrono::milliseconds(config.heartbeat_interval)); } else { std::this_thread::sleep_for(std::chrono::milliseconds(1000)); } } } void WebSocketManager::sendThread() { while (threads_running) { std::unique_lock lock(queue_mutex); queue_cv.wait(lock, [this]() { return !send_queue.empty() || !threads_running; }); if (!threads_running) { break; } if (!send_queue.empty() && connected) { std::string data = std::move(send_queue.front()); send_queue.pop(); lock.unlock(); // 发送数据 int sent = esp_websocket_client_send_text(client, data.c_str(), static_cast(data.length()), portMAX_DELAY); if (sent >= 0) { stats.messages_sent++; } else { ESP_LOGE(TAG, "Failed to send data"); // 将数据重新放回队列 lock.lock(); send_queue.push(std::move(data)); lock.unlock(); } } } } bool WebSocketManager::createWebSocketClient() { esp_websocket_client_config_t ws_config = {}; ws_config.uri = config.uri.c_str(); ws_config.user_context = this; client = esp_websocket_client_init(&ws_config); if (!client) { ESP_LOGE(TAG, "Failed to initialize WebSocket client"); return false; } // 注册事件处理 esp_websocket_register_events(client, WEBSOCKET_EVENT_ANY, &WebSocketManager::websocketEventHandler, this); return true; } void WebSocketManager::destroyWebSocketClient() { if (client) { esp_websocket_client_stop(client); esp_websocket_client_destroy(client); client = nullptr; } } void WebSocketManager::wifiEventHandler(void* arg, esp_event_base_t event_base, int32_t event_id, void* event_data) { WebSocketManager* instance = static_cast(arg); if (event_base == IP_EVENT && event_id == IP_EVENT_STA_GOT_IP) { // WiFi已连接,尝试重新连接WebSocket if (instance->config.auto_reconnect && !instance->connected) { ESP_LOGI(TAG, "WiFi connected, attempting to reconnect WebSocket"); instance->connect(); } } }