// // Created by misaki on 2025/9/2. // #include "CommClass.h" #include // 引入字符串处理库头文件 #include // 时间库头文件 #include #include #include #include #include "ToolsClass.h" #include "sys_conf_singleton.h" // 静态成员初始化 WebSocketManager* WebSocketManager::instance = nullptr; std::mutex WebSocketManager::instance_mutex; std::string WebSocketManager::sn = SYS_CONF_JSON().loadSN(); // 获取SN(同时也能初始化文件系统,如果还没有初始化这个文件系统的话) // 标签用于日志 static const char* TAG = "WebSocketManager"; // 构造函数 WebSocketManager::WebSocketManager() : client(nullptr), threads_running(false), connected(false), connecting(false), reconnect_attempts(0), stats{0, 0, 0, 0}, wifi(WifiConnectors::getInstance()) { } // 析构函数 WebSocketManager::~WebSocketManager() { disconnect(); // 停止线程 threads_running = false; queue_cv.notify_all(); if (reconnect_thread.joinable()) { reconnect_thread.join(); } if (heartbeat_thread.joinable()) { heartbeat_thread.join(); } if (send_thread.joinable()) { send_thread.join(); } } WebSocketManager* WebSocketManager::getInstance() { std::lock_guard lock(instance_mutex); // 加锁 if (instance == nullptr) { instance = new WebSocketManager(); } return instance; } bool WebSocketManager::initialize(const WebSocketConfig& ws_config) { this->config = ws_config; // 注册WiFi事件处理 esp_event_handler_instance_t wifi_instance; esp_event_handler_instance_register(IP_EVENT, IP_EVENT_STA_GOT_IP, &WebSocketManager::wifiEventHandler, this, &wifi_instance); // 启动发送线程 threads_running = true; ThreadConfig thread_config; thread_config.name = "WS_Send"; thread_config.stack_size = 4096; send_thread = ThreadManager::createMemberThread(thread_config, this, &WebSocketManager::sendThread); return true; } bool WebSocketManager::connect() { if (connecting || connected) { ESP_LOGI(TAG, "Already connected or connecting"); return true; } // 检查WiFi连接 if (!wifi->isWifiConnect()) { ESP_LOGE(TAG, "WiFi not connected, cannot establish WebSocket connection"); if (event_callback) { event_callback(WebSocketEvent::ERROR, "WiFi not connected"); } return false; } connecting = true; stats.connection_attempts++; // 创建WebSocket客户端 if (!createWebSocketClient()) { connecting = false; return false; } // 启动WebSocket客户端 esp_err_t err = esp_websocket_client_start(client); if (err != ESP_OK) { ESP_LOGE(TAG, "Failed to start WebSocket client: %d", err); destroyWebSocketClient(); connecting = false; return false; } // 启动重连和心跳线程 ThreadConfig thread_config; thread_config.name = "WS_Reconnect"; thread_config.stack_size = 3072; reconnect_thread = ThreadManager::createMemberThread(thread_config, this, &WebSocketManager::reconnectThread); thread_config.name = "WS_Heartbeat"; heartbeat_thread = ThreadManager::createMemberThread(thread_config, this, &WebSocketManager::heartbeatThread); return true; } void WebSocketManager::disconnect() { if (client) { esp_websocket_client_stop(client); destroyWebSocketClient(); } connected = false; connecting = false; // 通知事件回调 if (event_callback) { event_callback(WebSocketEvent::DISCONNECTED, "Disconnected by user"); } } bool WebSocketManager::sendJson(cJSON* json) { if (!connected) { // 检查连接状态 ESP_LOGE(TAG, "Not connected, cannot send data"); cJSON_Delete(json); return false; } char* json_str = cJSON_PrintUnformatted(json); // 将JSON对象转换为字符串 cJSON_Delete(json); // 释放JSON对象 if (!json_str) { // 检查转换结果 ESP_LOGE(TAG, "Failed to stringify JSON"); return false; } std::lock_guard lock(queue_mutex); // 锁定队列 send_queue.emplace(json_str); // 添加到队列 free(json_str); // 释放json_str的内存 queue_cv.notify_one(); // 通知发送线程 return true; } bool WebSocketManager::sendRaw(const std::string& data) { if (!connected) { // 检查连接状态 ESP_LOGE(TAG, "Not connected, cannot send data"); return false; } std::lock_guard lock(queue_mutex); // 锁定队列 send_queue.push(data); // 添加到队列 queue_cv.notify_one(); // 通知发送线程 return true; } void WebSocketManager::setJsonCallback(const JsonDataCallback &callback) { json_callback = callback; } void WebSocketManager::setEventCallback(const EventCallback &callback) { event_callback = callback; } bool WebSocketManager::isConnected() const { return connected; } WebSocketConfig WebSocketManager::getConfig() const { return config; } void WebSocketManager::updateConfig(const WebSocketConfig& ws_config) { this->config = ws_config; } WebSocketManager::Stats WebSocketManager::getStats() const { return stats; } void WebSocketManager::websocketEventHandler(void* handler_args, esp_event_base_t base, int32_t event_id, void* event_data) { auto* ws_instance = static_cast(handler_args); auto* data = static_cast(event_data); switch(event_id) { case WEBSOCKET_EVENT_CONNECTED: ws_instance->connected = true; ws_instance->connecting = false; ws_instance->reconnect_attempts = 0; ws_instance->stats.successful_connections++; ESP_LOGI(TAG, "WebSocket connected"); if (ws_instance->event_callback) { ws_instance->event_callback(WebSocketEvent::CONNECTED, "Connected successfully"); } break; case WEBSOCKET_EVENT_DISCONNECTED: ws_instance->connected = false; ws_instance->connecting = false; ESP_LOGI(TAG, "WebSocket disconnected"); if (ws_instance->event_callback) { ws_instance->event_callback(WebSocketEvent::DISCONNECTED, "Disconnected"); } break; case WEBSOCKET_EVENT_DATA: if (data->data_len > 0) { ws_instance->stats.messages_received++; ws_instance->handleReceivedData((const char*)data->data_ptr, data->data_len); } break; case WEBSOCKET_EVENT_ERROR: ws_instance->connected = false; ws_instance->connecting = false; ESP_LOGE(TAG, "WebSocket error"); if (ws_instance->event_callback) { ws_instance->event_callback(WebSocketEvent::ERROR, "WebSocket error occurred"); } break; default: ESP_LOGW(TAG, "Unhandled WebSocket event: %ld", event_id); break; } } void WebSocketManager::handleReceivedData(const char* data, const int len) const { // 尝试解析JSON if (cJSON* json = cJSON_ParseWithLength(data, len)) { // 成功解析为JSON if (json_callback) { json_callback(json); } else { cJSON_Delete(json); } } else { // 不是JSON格式,作为原始数据处理 std::string message(data, len); if (event_callback) { event_callback(WebSocketEvent::DATA_RECEIVED, message); } } } void WebSocketManager::reconnectThread() { while (threads_running) { if (!connected && config.auto_reconnect && (config.max_reconnect_attempts == 0 || reconnect_attempts < config.max_reconnect_attempts)) { // 检查WiFi连接 if (!wifi->isWifiConnect()) { ESP_LOGI(TAG, "WiFi not connected, waiting before WebSocket reconnect"); std::this_thread::sleep_for(std::chrono::milliseconds(config.reconnect_interval)); continue; } reconnect_attempts++; ESP_LOGI(TAG, "Attempting to reconnect (%ld/%d)", reconnect_attempts, config.max_reconnect_attempts); if (connect()) { ESP_LOGI(TAG, "Reconnection attempt initiated"); } else { ESP_LOGE(TAG, "Reconnection attempt failed"); } } std::this_thread::sleep_for(std::chrono::milliseconds(config.reconnect_interval)); } } void WebSocketManager::heartbeatThread() { while (threads_running) { if (connected && config.heartbeat_interval > 0) { // 如果处于连接状态且心跳间隔大于0 // 发送心跳消息 cJSON* heartbeat = cJSON_CreateObject(); cJSON_AddStringToObject(heartbeat, "type", "heartbeat"); cJSON_AddNumberToObject(heartbeat, "timestamp", static_cast(esp_log_timestamp())); sendJson(heartbeat); std::this_thread::sleep_for(std::chrono::milliseconds(config.heartbeat_interval)); } else { // 否则就让出CPU std::this_thread::sleep_for(std::chrono::milliseconds(1000)); } } } void WebSocketManager::sendThread() { while (threads_running) { std::unique_lock lock(queue_mutex); queue_cv.wait(lock, // 传入已持有的 unique_lock [this]() { // 等待队列非空或者线程停止 return !send_queue.empty() || !threads_running; }); if (!threads_running) { // 其实永远不会进入这里,写上只是保险起见 break; } if (!send_queue.empty() && connected) { // 如果队列非空且已连接 std::string data = std::move(send_queue.front()); send_queue.pop(); lock.unlock(); // 释放锁 // 发送数据 int sent = esp_websocket_client_send_text(client, data.c_str(), static_cast(data.length()), portMAX_DELAY); if (sent >= 0) { stats.messages_sent++; // 发送成功 } else { ESP_LOGE(TAG, "Failed to send data"); // 将数据重新放回队列 lock.lock(); send_queue.push(std::move(data)); lock.unlock(); } } } } bool WebSocketManager::createWebSocketClient() { esp_websocket_client_config_t ws_config = {}; ws_config.uri = config.uri.c_str(); ws_config.user_context = this; client = esp_websocket_client_init(&ws_config); if (!client) { ESP_LOGE(TAG, "Failed to initialize WebSocket client"); return false; } // 处理sn码逻辑 // 一般情况下sn码会在开机的时候从flash当中读出,因此此处判断是否为空 if (!sn.empty()) { // 如果sn码不为空 // 则在头部加入sn码提交 esp_websocket_client_append_header(client, "sn", sn.c_str()); } else { // 否则在头部加入mac码和芯片序列号 esp_websocket_client_append_header(client, "mac", ToolsClass::getChipMAC().c_str()); esp_websocket_client_append_header(client, "chip_id", ToolsClass::getChipSerialNumber().c_str()); } // 注册事件处理 esp_websocket_register_events(client, WEBSOCKET_EVENT_ANY, &WebSocketManager::websocketEventHandler, this); return true; } void WebSocketManager::destroyWebSocketClient() { if (client) { esp_websocket_client_stop(client); esp_websocket_client_destroy(client); client = nullptr; } } void WebSocketManager::wifiEventHandler(void* arg, esp_event_base_t event_base, int32_t event_Id, void* event_data) { auto* ws_instance = static_cast(arg); if (event_base == IP_EVENT && event_Id == IP_EVENT_STA_GOT_IP) { // WiFi已连接,尝试重新连接WebSocket if (ws_instance->config.auto_reconnect && !ws_instance->connected) { ESP_LOGI(TAG, "WiFi connected, attempting to reconnect WebSocket"); ws_instance->connect(); } } }