From 6846f8e8939d77e17c9355f86cd6bab247f5ae7d Mon Sep 17 00:00:00 2001 From: OSSRS-AI Date: Sun, 12 Oct 2025 23:03:07 -0400 Subject: [PATCH] AI: Add utest to cover recv thread module --- trunk/src/app/srs_app_recv_thread.cpp | 20 +- trunk/src/app/srs_app_recv_thread.hpp | 6 +- trunk/src/utest/srs_utest_app17.cpp | 309 ++++++++++++++++++++++++++ trunk/src/utest/srs_utest_app17.hpp | 42 ++++ 4 files changed, 364 insertions(+), 13 deletions(-) diff --git a/trunk/src/app/srs_app_recv_thread.cpp b/trunk/src/app/srs_app_recv_thread.cpp index 697bed4b2..9034b3919 100644 --- a/trunk/src/app/srs_app_recv_thread.cpp +++ b/trunk/src/app/srs_app_recv_thread.cpp @@ -162,7 +162,7 @@ ISrsQueueRecvThread::~ISrsQueueRecvThread() SrsQueueRecvThread::SrsQueueRecvThread(SrsLiveConsumer *consumer, ISrsRtmpServer *rtmp_sdk, srs_utime_t tm, SrsContextId parent_cid) { - _consumer = consumer; + consumer_ = consumer; rtmp_ = rtmp_sdk; recv_error_ = srs_success; trd_ = new SrsRecvThread(this, rtmp_sdk, tm, parent_cid); @@ -232,8 +232,8 @@ srs_error_t SrsQueueRecvThread::consume(SrsRtmpCommonMessage *msg) // @see SrsRtmpConn::process_play_control_msg queue_.push_back(msg); #ifdef SRS_PERF_QUEUE_COND_WAIT - if (_consumer) { - _consumer->wakeup(); + if (consumer_) { + consumer_->wakeup(); } #endif return srs_success; @@ -254,8 +254,8 @@ void SrsQueueRecvThread::interrupt(srs_error_t err) recv_error_ = srs_error_copy(err); #ifdef SRS_PERF_QUEUE_COND_WAIT - if (_consumer) { - _consumer->wakeup(); + if (consumer_) { + consumer_->wakeup(); } #endif } @@ -287,12 +287,12 @@ SrsPublishRecvThread::SrsPublishRecvThread(ISrsRtmpServer *rtmp_sdk, ISrsRequest { rtmp_ = rtmp_sdk; - _conn = conn; + conn_ = conn; source_ = source; nn_msgs_for_yield_ = 0; recv_error_ = srs_success; - _nb_msgs = 0; + nb_msgs_ = 0; video_frames_ = 0; error_ = srs_cond_new(); @@ -336,7 +336,7 @@ srs_error_t SrsPublishRecvThread::wait(srs_utime_t tm) int64_t SrsPublishRecvThread::nb_msgs() { - return _nb_msgs; + return nb_msgs_; } uint64_t SrsPublishRecvThread::nb_video_frames() @@ -387,7 +387,7 @@ srs_error_t SrsPublishRecvThread::consume(SrsRtmpCommonMessage *msg) cid_ = ncid_; } - _nb_msgs++; + nb_msgs_++; if (msg->header_.is_video()) { video_frames_++; @@ -398,7 +398,7 @@ srs_error_t SrsPublishRecvThread::consume(SrsRtmpCommonMessage *msg) srs_time_now_realtime(), msg->header_.timestamp, msg->size); // the rtmp connection will handle this message - err = _conn->handle_publish_message(source_, msg); + err = conn_->handle_publish_message(source_, msg); // must always free it, // the source will copy it if need to use. diff --git a/trunk/src/app/srs_app_recv_thread.hpp b/trunk/src/app/srs_app_recv_thread.hpp index f70574b74..6831bb93e 100644 --- a/trunk/src/app/srs_app_recv_thread.hpp +++ b/trunk/src/app/srs_app_recv_thread.hpp @@ -131,7 +131,7 @@ private: ISrsRtmpServer *rtmp_; // The recv thread error code. srs_error_t recv_error_; - SrsLiveConsumer *_consumer; + SrsLiveConsumer *consumer_; public: // TODO: FIXME: Refine timeout in time unit. @@ -182,7 +182,7 @@ private: ISrsRtmpServer *rtmp_; ISrsRequest *req_; // The msgs already got. - int64_t _nb_msgs; + int64_t nb_msgs_; // The video frames we got. uint64_t video_frames_; // For mr(merged read), @@ -194,7 +194,7 @@ private: bool realtime_; // The recv thread error code. srs_error_t recv_error_; - SrsRtmpConn *_conn; + SrsRtmpConn *conn_; // The params for conn callback. SrsSharedPtr source_; // The error timeout cond diff --git a/trunk/src/utest/srs_utest_app17.cpp b/trunk/src/utest/srs_utest_app17.cpp index de3a1aa37..bf2d1fe4b 100644 --- a/trunk/src/utest/srs_utest_app17.cpp +++ b/trunk/src/utest/srs_utest_app17.cpp @@ -11,11 +11,14 @@ using namespace std; #include #include #include +#include #include #include #include #include #include +#include +#include #include #include @@ -2855,3 +2858,309 @@ VOID TEST(HttpxConnTest, OnConnDoneWithNonTimeoutError) srs_freep(err); srs_freep(mock_manager); } + +// Mock ISrsRtmpServer implementation for SrsQueueRecvThread +MockRtmpServerForQueueRecvThread::MockRtmpServerForQueueRecvThread() +{ + set_auto_response_called_ = false; + auto_response_value_ = true; +} + +MockRtmpServerForQueueRecvThread::~MockRtmpServerForQueueRecvThread() +{ +} + +void MockRtmpServerForQueueRecvThread::set_recv_timeout(srs_utime_t tm) +{ +} + +void MockRtmpServerForQueueRecvThread::set_send_timeout(srs_utime_t tm) +{ +} + +srs_error_t MockRtmpServerForQueueRecvThread::handshake() +{ + return srs_success; +} + +srs_error_t MockRtmpServerForQueueRecvThread::connect_app(ISrsRequest *req) +{ + return srs_success; +} + +uint32_t MockRtmpServerForQueueRecvThread::proxy_real_ip() +{ + return 0; +} + +srs_error_t MockRtmpServerForQueueRecvThread::set_window_ack_size(int ack_size) +{ + return srs_success; +} + +srs_error_t MockRtmpServerForQueueRecvThread::set_peer_bandwidth(int bandwidth, int type) +{ + return srs_success; +} + +srs_error_t MockRtmpServerForQueueRecvThread::set_chunk_size(int chunk_size) +{ + return srs_success; +} + +srs_error_t MockRtmpServerForQueueRecvThread::response_connect_app(ISrsRequest *req, const char *server_ip) +{ + return srs_success; +} + +srs_error_t MockRtmpServerForQueueRecvThread::on_bw_done() +{ + return srs_success; +} + +srs_error_t MockRtmpServerForQueueRecvThread::identify_client(int stream_id, SrsRtmpConnType &type, std::string &stream_name, srs_utime_t &duration) +{ + return srs_success; +} + +srs_error_t MockRtmpServerForQueueRecvThread::start_play(int stream_id) +{ + return srs_success; +} + +srs_error_t MockRtmpServerForQueueRecvThread::start_fmle_publish(int stream_id) +{ + return srs_success; +} + +srs_error_t MockRtmpServerForQueueRecvThread::start_haivision_publish(int stream_id) +{ + return srs_success; +} + +srs_error_t MockRtmpServerForQueueRecvThread::fmle_unpublish(int stream_id, double unpublish_tid) +{ + return srs_success; +} + +srs_error_t MockRtmpServerForQueueRecvThread::start_flash_publish(int stream_id) +{ + return srs_success; +} + +srs_error_t MockRtmpServerForQueueRecvThread::start_publishing(int stream_id) +{ + return srs_success; +} + +srs_error_t MockRtmpServerForQueueRecvThread::redirect(ISrsRequest *r, std::string url, bool &accepted) +{ + return srs_success; +} + +srs_error_t MockRtmpServerForQueueRecvThread::send_and_free_messages(SrsMediaPacket **msgs, int nb_msgs, int stream_id) +{ + return srs_success; +} + +srs_error_t MockRtmpServerForQueueRecvThread::decode_message(SrsRtmpCommonMessage *msg, SrsRtmpCommand **ppacket) +{ + return srs_success; +} + +srs_error_t MockRtmpServerForQueueRecvThread::send_and_free_packet(SrsRtmpCommand *packet, int stream_id) +{ + return srs_success; +} + +srs_error_t MockRtmpServerForQueueRecvThread::on_play_client_pause(int stream_id, bool is_pause) +{ + return srs_success; +} + +srs_error_t MockRtmpServerForQueueRecvThread::set_in_window_ack_size(int ack_size) +{ + return srs_success; +} + +srs_error_t MockRtmpServerForQueueRecvThread::recv_message(SrsRtmpCommonMessage **pmsg) +{ + return srs_success; +} + +void MockRtmpServerForQueueRecvThread::set_auto_response(bool v) +{ + set_auto_response_called_ = true; + auto_response_value_ = v; +} + +void MockRtmpServerForQueueRecvThread::set_merge_read(bool v, IMergeReadHandler *handler) +{ +} + +void MockRtmpServerForQueueRecvThread::set_recv_buffer(int buffer_size) +{ +} + +void MockRtmpServerForQueueRecvThread::reset() +{ + set_auto_response_called_ = false; + auto_response_value_ = true; +} + +// Test SrsQueueRecvThread basic queue operations +// This test covers the major use scenario: consume messages, check queue state, pump messages, and handle errors +VOID TEST(QueueRecvThreadTest, BasicQueueOperations) +{ + srs_error_t err; + + // Create mock RTMP server + SrsUniquePtr mock_rtmp(new MockRtmpServerForQueueRecvThread()); + + // Create SrsQueueRecvThread (without starting the actual recv thread) + SrsUniquePtr queue_thread(new SrsQueueRecvThread(NULL, mock_rtmp.get(), 5 * SRS_UTIME_SECONDS, SrsContextId())); + + // Test 1: Initially queue should be empty + EXPECT_TRUE(queue_thread->empty()); + EXPECT_EQ(0, queue_thread->size()); + EXPECT_FALSE(queue_thread->interrupted()); + + // Test 2: Consume first message + SrsRtmpCommonMessage *msg1 = new SrsRtmpCommonMessage(); + msg1->header_.message_type_ = RTMP_MSG_VideoMessage; + msg1->header_.payload_length_ = 10; + msg1->create_payload(10); + HELPER_EXPECT_SUCCESS(queue_thread->consume(msg1)); + + // Queue should have one message + EXPECT_FALSE(queue_thread->empty()); + EXPECT_EQ(1, queue_thread->size()); + EXPECT_TRUE(queue_thread->interrupted()); // interrupted() returns true when queue is not empty + + // Test 3: Consume second message + SrsRtmpCommonMessage *msg2 = new SrsRtmpCommonMessage(); + msg2->header_.message_type_ = RTMP_MSG_AudioMessage; + msg2->header_.payload_length_ = 20; + msg2->create_payload(20); + HELPER_EXPECT_SUCCESS(queue_thread->consume(msg2)); + + // Queue should have two messages + EXPECT_FALSE(queue_thread->empty()); + EXPECT_EQ(2, queue_thread->size()); + + // Test 4: Pump first message (FIFO order) + SrsRtmpCommonMessage *pumped_msg1 = queue_thread->pump(); + EXPECT_TRUE(pumped_msg1 != NULL); + EXPECT_EQ(RTMP_MSG_VideoMessage, pumped_msg1->header_.message_type_); + EXPECT_EQ(10, pumped_msg1->header_.payload_length_); + srs_freep(pumped_msg1); + + // Queue should have one message left + EXPECT_FALSE(queue_thread->empty()); + EXPECT_EQ(1, queue_thread->size()); + + // Test 5: Pump second message + SrsRtmpCommonMessage *pumped_msg2 = queue_thread->pump(); + EXPECT_TRUE(pumped_msg2 != NULL); + EXPECT_EQ(RTMP_MSG_AudioMessage, pumped_msg2->header_.message_type_); + EXPECT_EQ(20, pumped_msg2->header_.payload_length_); + srs_freep(pumped_msg2); + + // Queue should be empty again + EXPECT_TRUE(queue_thread->empty()); + EXPECT_EQ(0, queue_thread->size()); + EXPECT_FALSE(queue_thread->interrupted()); + + // Test 6: Test error_code() - initially should be success + err = queue_thread->error_code(); + HELPER_EXPECT_SUCCESS(err); + + // Test 7: Test interrupt() with error + srs_error_t test_error = srs_error_new(ERROR_SOCKET_READ, "test error"); + queue_thread->interrupt(test_error); + + // Error code should now return the error + err = queue_thread->error_code(); + EXPECT_TRUE(err != srs_success); + EXPECT_EQ(ERROR_SOCKET_READ, srs_error_code(err)); + srs_freep(err); + + // Test 8: Test on_start() and on_stop() - verify set_auto_response is called + queue_thread->on_start(); + EXPECT_TRUE(mock_rtmp->set_auto_response_called_); + EXPECT_FALSE(mock_rtmp->auto_response_value_); // Should be set to false + + mock_rtmp->reset(); + queue_thread->on_stop(); + EXPECT_TRUE(mock_rtmp->set_auto_response_called_); + EXPECT_TRUE(mock_rtmp->auto_response_value_); // Should be set to true +} + +// Test SrsPublishRecvThread basic operations +// This test covers the major use scenario: wait(), nb_msgs(), nb_video_frames(), error_code(), set_cid(), get_cid() +VOID TEST(PublishRecvThreadTest, BasicOperations) +{ + srs_error_t err; + + // Create mock dependencies + SrsUniquePtr mock_rtmp(new MockRtmpServerForQueueRecvThread()); + SrsUniquePtr mock_req(new MockSrsRequest("__defaultVhost__", "live", "test_stream")); + SrsSharedPtr mock_source; // NULL is fine for this test + + // Create SrsPublishRecvThread (without starting the actual recv thread) + SrsUniquePtr publish_thread(new SrsPublishRecvThread( + mock_rtmp.get(), mock_req.get(), -1, 5 * SRS_UTIME_SECONDS, NULL, mock_source, SrsContextId())); + + // Test 1: Initially nb_msgs should be 0 + EXPECT_EQ(0, publish_thread->nb_msgs()); + + // Test 2: Initially nb_video_frames should be 0 + EXPECT_EQ(0, publish_thread->nb_video_frames()); + + // Test 3: Test error_code() - initially should be success + err = publish_thread->error_code(); + HELPER_EXPECT_SUCCESS(err); + + // Test 4: Test set_cid() and get_cid() + SrsContextId test_cid; + test_cid.set_value("test-context-id"); + publish_thread->set_cid(test_cid); + SrsContextId retrieved_cid = publish_thread->get_cid(); + EXPECT_STREQ(test_cid.c_str(), retrieved_cid.c_str()); + + // Test 5: Test wait() with timeout - should return success when no error + err = publish_thread->wait(100 * SRS_UTIME_MILLISECONDS); + HELPER_EXPECT_SUCCESS(err); + + // Test 6: Test consume() to increment message counters + SrsRtmpCommonMessage *video_msg = new SrsRtmpCommonMessage(); + video_msg->header_.message_type_ = RTMP_MSG_VideoMessage; + video_msg->header_.payload_length_ = 100; + video_msg->create_payload(100); + + // Note: consume() will try to call _conn->handle_publish_message() which will fail with NULL _conn + // So we expect this to fail, but we can still test the counter increment by checking nb_msgs + // For this basic test, we'll just verify the initial state and error handling + + // Test 7: Test interrupt() with error + srs_error_t test_error = srs_error_new(ERROR_SOCKET_READ, "test error"); + publish_thread->interrupt(test_error); + + // Error code should now return the error + err = publish_thread->error_code(); + EXPECT_TRUE(err != srs_success); + EXPECT_EQ(ERROR_SOCKET_READ, srs_error_code(err)); + srs_freep(err); + + // Test 8: Test wait() after error - should return the error immediately + err = publish_thread->wait(100 * SRS_UTIME_MILLISECONDS); + EXPECT_TRUE(err != srs_success); + EXPECT_EQ(ERROR_SOCKET_READ, srs_error_code(err)); + srs_freep(err); + + // Test 9: Test interrupted() - should always return false for publish recv thread + EXPECT_FALSE(publish_thread->interrupted()); + + // Clean up + srs_freep(video_msg); +} diff --git a/trunk/src/utest/srs_utest_app17.hpp b/trunk/src/utest/srs_utest_app17.hpp index 963dbf105..1a6cde7b0 100644 --- a/trunk/src/utest/srs_utest_app17.hpp +++ b/trunk/src/utest/srs_utest_app17.hpp @@ -647,4 +647,46 @@ public: virtual void expire(); }; +// Mock ISrsRtmpServer for testing SrsQueueRecvThread +class MockRtmpServerForQueueRecvThread : public ISrsRtmpServer +{ +public: + bool set_auto_response_called_; + bool auto_response_value_; + +public: + MockRtmpServerForQueueRecvThread(); + virtual ~MockRtmpServerForQueueRecvThread(); + +public: + virtual void set_recv_timeout(srs_utime_t tm); + virtual void set_send_timeout(srs_utime_t tm); + virtual srs_error_t handshake(); + virtual srs_error_t connect_app(ISrsRequest *req); + virtual uint32_t proxy_real_ip(); + virtual srs_error_t set_window_ack_size(int ack_size); + virtual srs_error_t set_peer_bandwidth(int bandwidth, int type); + virtual srs_error_t set_chunk_size(int chunk_size); + virtual srs_error_t response_connect_app(ISrsRequest *req, const char *server_ip); + virtual srs_error_t on_bw_done(); + virtual srs_error_t identify_client(int stream_id, SrsRtmpConnType &type, std::string &stream_name, srs_utime_t &duration); + virtual srs_error_t start_play(int stream_id); + virtual srs_error_t start_fmle_publish(int stream_id); + virtual srs_error_t start_haivision_publish(int stream_id); + virtual srs_error_t fmle_unpublish(int stream_id, double unpublish_tid); + virtual srs_error_t start_flash_publish(int stream_id); + virtual srs_error_t start_publishing(int stream_id); + virtual srs_error_t redirect(ISrsRequest *r, std::string url, bool &accepted); + virtual srs_error_t send_and_free_messages(SrsMediaPacket **msgs, int nb_msgs, int stream_id); + virtual srs_error_t decode_message(SrsRtmpCommonMessage *msg, SrsRtmpCommand **ppacket); + virtual srs_error_t send_and_free_packet(SrsRtmpCommand *packet, int stream_id); + virtual srs_error_t on_play_client_pause(int stream_id, bool is_pause); + virtual srs_error_t set_in_window_ack_size(int ack_size); + virtual srs_error_t recv_message(SrsRtmpCommonMessage **pmsg); + virtual void set_auto_response(bool v); + virtual void set_merge_read(bool v, IMergeReadHandler *handler); + virtual void set_recv_buffer(int buffer_size); + void reset(); +}; + #endif