From f76f31aa126028029e60e28b25bd18f07acda13f Mon Sep 17 00:00:00 2001 From: Xiaoxia Date: Sun, 13 Apr 2025 23:12:44 +0800 Subject: [PATCH] Prevent too many opus packets in queue --- main/application.cc | 19 ++++++++++++++++++- main/application.h | 1 + main/display/display.h | 2 +- main/protocols/mqtt_protocol.cc | 4 ++++ main/protocols/protocol.cc | 4 ++++ main/protocols/protocol.h | 2 ++ main/protocols/websocket_protocol.cc | 3 +++ 7 files changed, 33 insertions(+), 2 deletions(-) diff --git a/main/application.cc b/main/application.cc index af3f9d9..3147c1b 100644 --- a/main/application.cc +++ b/main/application.cc @@ -362,8 +362,11 @@ void Application::Start() { Alert(Lang::Strings::ERROR, message.c_str(), "sad", Lang::Sounds::P3_EXCLAMATION); }); protocol_->OnIncomingAudio([this](std::vector&& data) { + const int max_packets_in_queue = 300 / OPUS_FRAME_DURATION_MS; std::lock_guard lock(mutex_); - audio_decode_queue_.emplace_back(std::move(data)); + if (audio_decode_queue_.size() < max_packets_in_queue) { + audio_decode_queue_.emplace_back(std::move(data)); + } }); protocol_->OnAudioChannelOpened([this, codec, &board]() { board.SetPowerSaveMode(false); @@ -451,6 +454,9 @@ void Application::Start() { audio_processor_.Initialize(codec, realtime_chat_enabled_); audio_processor_.OnOutput([this](std::vector&& data) { background_task_->Schedule([this, data = std::move(data)]() mutable { + if (protocol_->IsAudioChannelBusy()) { + return; + } opus_encoder_->Encode(std::move(data), [this](std::vector&& opus) { Schedule([this, opus = std::move(opus)]() { protocol_->SendAudio(opus); @@ -524,6 +530,8 @@ void Application::OnClockTimer() { // Print the debug info every 10 seconds if (clock_ticks_ % 10 == 0) { + // SystemInfo::PrintRealTimeStats(pdMS_TO_TICKS(1000)); + int free_sram = heap_caps_get_free_size(MALLOC_CAP_INTERNAL); int min_free_sram = heap_caps_get_minimum_free_size(MALLOC_CAP_INTERNAL); ESP_LOGI(TAG, "Free internal: %u minimal internal: %u", free_sram, min_free_sram); @@ -582,6 +590,10 @@ void Application::AudioLoop() { } void Application::OnAudioOutput() { + if (busy_decoding_audio_) { + return; + } + auto now = std::chrono::steady_clock::now(); auto codec = Board::GetInstance().GetAudioCodec(); const int max_silence_seconds = 10; @@ -609,7 +621,9 @@ void Application::OnAudioOutput() { lock.unlock(); audio_decode_cv_.notify_all(); + busy_decoding_audio_ = true; background_task_->Schedule([this, codec, opus = std::move(opus)]() mutable { + busy_decoding_audio_ = false; if (aborted_) { return; } @@ -651,6 +665,9 @@ void Application::OnAudioInput() { std::vector data; ReadAudio(data, 16000, 30 * 16000 / 1000); background_task_->Schedule([this, data = std::move(data)]() mutable { + if (protocol_->IsAudioChannelBusy()) { + return; + } opus_encoder_->Encode(std::move(data), [this](std::vector&& opus) { Schedule([this, opus = std::move(opus)]() { protocol_->SendAudio(opus); diff --git a/main/application.h b/main/application.h index e5d89ea..0fe6d04 100644 --- a/main/application.h +++ b/main/application.h @@ -99,6 +99,7 @@ private: #endif bool aborted_ = false; bool voice_detected_ = false; + bool busy_decoding_audio_ = false; int clock_ticks_ = 0; TaskHandle_t check_new_version_task_handle_ = nullptr; diff --git a/main/display/display.h b/main/display/display.h index 8342a32..ab47d8c 100644 --- a/main/display/display.h +++ b/main/display/display.h @@ -67,7 +67,7 @@ protected: class DisplayLockGuard { public: DisplayLockGuard(Display *display) : display_(display) { - if (!display_->Lock(3000)) { + if (!display_->Lock(30000)) { ESP_LOGE("Display", "Failed to lock display"); } } diff --git a/main/protocols/mqtt_protocol.cc b/main/protocols/mqtt_protocol.cc index 59984d1..d5ac563 100644 --- a/main/protocols/mqtt_protocol.cc +++ b/main/protocols/mqtt_protocol.cc @@ -133,7 +133,10 @@ void MqttProtocol::SendAudio(const std::vector& data) { ESP_LOGE(TAG, "Failed to encrypt audio data"); return; } + + busy_sending_audio_ = true; udp_->Send(encrypted); + busy_sending_audio_ = false; } void MqttProtocol::CloseAudioChannel() { @@ -164,6 +167,7 @@ bool MqttProtocol::OpenAudioChannel() { } } + busy_sending_audio_ = false; error_occurred_ = false; session_id_ = ""; xEventGroupClearBits(event_group_handle_, MQTT_PROTOCOL_SERVER_HELLO_EVENT); diff --git a/main/protocols/protocol.cc b/main/protocols/protocol.cc index e4e0b56..b89bed6 100644 --- a/main/protocols/protocol.cc +++ b/main/protocols/protocol.cc @@ -126,3 +126,7 @@ bool Protocol::IsTimeout() const { return timeout; } +bool Protocol::IsAudioChannelBusy() const { + return busy_sending_audio_; +} + diff --git a/main/protocols/protocol.h b/main/protocols/protocol.h index df7a36b..1dbd268 100644 --- a/main/protocols/protocol.h +++ b/main/protocols/protocol.h @@ -48,6 +48,7 @@ public: virtual bool OpenAudioChannel() = 0; virtual void CloseAudioChannel() = 0; virtual bool IsAudioChannelOpened() const = 0; + virtual bool IsAudioChannelBusy() const; virtual void SendAudio(const std::vector& data) = 0; virtual void SendWakeWordDetected(const std::string& wake_word); virtual void SendStartListening(ListeningMode mode); @@ -66,6 +67,7 @@ protected: int server_sample_rate_ = 24000; int server_frame_duration_ = 60; bool error_occurred_ = false; + bool busy_sending_audio_ = false; std::string session_id_; std::chrono::time_point last_incoming_time_; diff --git a/main/protocols/websocket_protocol.cc b/main/protocols/websocket_protocol.cc index db61093..64e0740 100644 --- a/main/protocols/websocket_protocol.cc +++ b/main/protocols/websocket_protocol.cc @@ -30,7 +30,9 @@ void WebsocketProtocol::SendAudio(const std::vector& data) { return; } + busy_sending_audio_ = true; websocket_->Send(data.data(), data.size(), true); + busy_sending_audio_ = false; } bool WebsocketProtocol::SendText(const std::string& text) { @@ -63,6 +65,7 @@ bool WebsocketProtocol::OpenAudioChannel() { delete websocket_; } + busy_sending_audio_ = false; error_occurred_ = false; std::string url = CONFIG_WEBSOCKET_URL; std::string token = "Bearer " + std::string(CONFIG_WEBSOCKET_ACCESS_TOKEN);