From 8976ce4c8d1abaf0802fc818d09083680f8160c1 Mon Sep 17 00:00:00 2001 From: Winlin Date: Sat, 6 Sep 2025 08:10:49 -0400 Subject: [PATCH] AI: Support anonymous coroutine with code block. v7.0.80 (#4475) This PR introduces anonymous coroutine macros for easier coroutine creation and improves the State Threads (ST) mutex and condition variable handling in SRS. - **Added coroutine macros**: `SRS_COROUTINE_GO`, `SRS_COROUTINE_GO2`, `SRS_COROUTINE_GO_CTX`, `SRS_COROUTINE_GO_CTX2` - **Added `SrsCoroutineChan`**: Channel for sharing data between coroutines with coroutine-safe operations - **Simplified coroutine creation**: Go-like syntax for creating anonymous coroutines with code blocks --------- Co-authored-by: Jacob Su Co-authored-by: OSSRS-AI --- trunk/configure | 2 +- trunk/doc/CHANGELOG.md | 1 + trunk/src/app/srs_app_async_call.cpp | 2 +- trunk/src/app/srs_app_rtc_conn.cpp | 12 +- trunk/src/app/srs_app_rtc_source.cpp | 4 +- trunk/src/app/srs_app_rtsp_source.cpp | 4 +- trunk/src/app/srs_app_source.cpp | 4 +- trunk/src/app/srs_app_srt_source.cpp | 4 +- trunk/src/app/srs_app_stream_token.cpp | 4 +- trunk/src/core/srs_core_version7.hpp | 2 +- trunk/src/protocol/srs_protocol_st.cpp | 68 ++++++- trunk/src/protocol/srs_protocol_st.hpp | 41 +++- trunk/src/utest/srs_utest.cpp | 38 ++++ trunk/src/utest/srs_utest.hpp | 143 ++++++++++++++ trunk/src/utest/srs_utest_st.cpp | 20 ++ trunk/src/utest/srs_utest_st2.cpp | 252 +++++++++++++++++++++++++ trunk/src/utest/srs_utest_st2.hpp | 14 ++ 17 files changed, 593 insertions(+), 22 deletions(-) create mode 100644 trunk/src/utest/srs_utest_st2.cpp create mode 100644 trunk/src/utest/srs_utest_st2.hpp diff --git a/trunk/configure b/trunk/configure index de78bd3c8..bb8c7dc37 100755 --- a/trunk/configure +++ b/trunk/configure @@ -378,7 +378,7 @@ if [[ $SRS_UTEST == YES ]]; then "srs_utest_mp4" "srs_utest_service" "srs_utest_app" "srs_utest_rtc" "srs_utest_config2" "srs_utest_protocol" "srs_utest_protocol2" "srs_utest_kernel2" "srs_utest_protocol3" "srs_utest_st" "srs_utest_rtc2" "srs_utest_rtc3" "srs_utest_fmp4" "srs_utest_source_lock" - "srs_utest_stream_token" "srs_utest_rtc_recv_track") + "srs_utest_stream_token" "srs_utest_rtc_recv_track" "srs_utest_st2") # Always include SRT utest MODULE_FILES+=("srs_utest_srt") if [[ $SRS_GB28181 == YES ]]; then diff --git a/trunk/doc/CHANGELOG.md b/trunk/doc/CHANGELOG.md index f0048ec23..fadeaca91 100644 --- a/trunk/doc/CHANGELOG.md +++ b/trunk/doc/CHANGELOG.md @@ -7,6 +7,7 @@ The changelog for SRS. ## SRS 7.0 Changelog +* v7.0, 2025-09-06, Merge [#4475](https://github.com/ossrs/srs/pull/4475): AI: Support anonymous coroutine with code block. v7.0.80 (#4475) * v7.0, 2025-09-05, Merge [#4474](https://github.com/ossrs/srs/pull/4474): WebRTC: Fix race condition in RTC nack timer callbacks. v7.0.79 (#4474) * v7.0, 2025-09-04, Merge [#4467](https://github.com/ossrs/srs/pull/4467): WebRTC: Fix NACK recovered packets not being added to receive queue. v7.0.78 (#4467) * v7.0, 2025-09-03, Merge [#4469](https://github.com/ossrs/srs/pull/4469): Upgrade HTTP parser from http-parser to llhttp. v7.0.77 (#4469) diff --git a/trunk/src/app/srs_app_async_call.cpp b/trunk/src/app/srs_app_async_call.cpp index c9a08b93e..c5d225a81 100644 --- a/trunk/src/app/srs_app_async_call.cpp +++ b/trunk/src/app/srs_app_async_call.cpp @@ -103,7 +103,7 @@ void SrsAsyncCallWorker::flush_tasks() // Avoid the async call blocking other coroutines. std::vector copy; if (true) { - SrsLocker(lock); + SrsLocker(&lock); if (tasks.empty()) { return; diff --git a/trunk/src/app/srs_app_rtc_conn.cpp b/trunk/src/app/srs_app_rtc_conn.cpp index 144392688..aeebff5fd 100644 --- a/trunk/src/app/srs_app_rtc_conn.cpp +++ b/trunk/src/app/srs_app_rtc_conn.cpp @@ -935,7 +935,7 @@ SrsRtcPublishRtcpTimer::SrsRtcPublishRtcpTimer(SrsRtcPublishStream *p) : p_(p) SrsRtcPublishRtcpTimer::~SrsRtcPublishRtcpTimer() { if (true) { - SrsLocker(lock_); + SrsLocker(&lock_); _srs_shared_timer->timer1s()->unsubscribe(this); } srs_mutex_destroy(lock_); @@ -949,7 +949,7 @@ srs_error_t SrsRtcPublishRtcpTimer::on_timer(srs_utime_t interval) // Therefore, during this function, the 'this' pointer might become invalid because // the object could be freed by another thread. As a result, we must lock the object // to prevent it from being freed. - SrsLocker(lock_); + SrsLocker(&lock_); ++_srs_pps_pub->sugar; @@ -982,7 +982,7 @@ SrsRtcPublishTwccTimer::SrsRtcPublishTwccTimer(SrsRtcPublishStream *p) : p_(p) SrsRtcPublishTwccTimer::~SrsRtcPublishTwccTimer() { if (true) { - SrsLocker(lock_); + SrsLocker(&lock_); _srs_shared_timer->timer100ms()->unsubscribe(this); } srs_mutex_destroy(lock_); @@ -996,7 +996,7 @@ srs_error_t SrsRtcPublishTwccTimer::on_timer(srs_utime_t interval) // Therefore, during this function, the 'this' pointer might become invalid because // the object could be freed by another thread. As a result, we must lock the object // to prevent it from being freed. - SrsLocker(lock_); + SrsLocker(&lock_); ++_srs_pps_pub->sugar; @@ -1752,7 +1752,7 @@ SrsRtcConnectionNackTimer::SrsRtcConnectionNackTimer(SrsRtcConnection *p) : p_(p SrsRtcConnectionNackTimer::~SrsRtcConnectionNackTimer() { if (true) { - SrsLocker(lock_); + SrsLocker(&lock_); _srs_shared_timer->timer20ms()->unsubscribe(this); } srs_mutex_destroy(lock_); @@ -1766,7 +1766,7 @@ srs_error_t SrsRtcConnectionNackTimer::on_timer(srs_utime_t interval) // Therefore, during this function, the 'this' pointer might become invalid because // the object could be freed by another thread. As a result, we must lock the object // to prevent it from being freed. - SrsLocker(lock_); + SrsLocker(&lock_); if (!p_->nack_enabled_) { return err; diff --git a/trunk/src/app/srs_app_rtc_source.cpp b/trunk/src/app/srs_app_rtc_source.cpp index 6136a0f91..b44e79217 100644 --- a/trunk/src/app/srs_app_rtc_source.cpp +++ b/trunk/src/app/srs_app_rtc_source.cpp @@ -302,7 +302,7 @@ srs_error_t SrsRtcSourceManager::fetch_or_create(ISrsRequest *r, SrsSharedPtrget_stream_url(); std::map >::iterator it = pool.find(stream_url); @@ -339,7 +339,7 @@ SrsSharedPtr SrsRtcSourceManager::fetch(ISrsRequest *r) { // Use lock to protect coroutine switch. // @bug https://github.com/ossrs/srs/issues/1230 - SrsLocker(lock); + SrsLocker(&lock); string stream_url = r->get_stream_url(); std::map >::iterator it = pool.find(stream_url); diff --git a/trunk/src/app/srs_app_rtsp_source.cpp b/trunk/src/app/srs_app_rtsp_source.cpp index 22ed3478c..21eb188ae 100644 --- a/trunk/src/app/srs_app_rtsp_source.cpp +++ b/trunk/src/app/srs_app_rtsp_source.cpp @@ -176,7 +176,7 @@ srs_error_t SrsRtspSourceManager::fetch_or_create(ISrsRequest *r, SrsSharedPtrget_stream_url(); std::map >::iterator it = pool.find(stream_url); @@ -213,7 +213,7 @@ SrsSharedPtr SrsRtspSourceManager::fetch(ISrsRequest *r) { // Use lock to protect coroutine switch. // @bug https://github.com/ossrs/srs/issues/1230 - SrsLocker(lock); + SrsLocker(&lock); string stream_url = r->get_stream_url(); std::map >::iterator it = pool.find(stream_url); diff --git a/trunk/src/app/srs_app_source.cpp b/trunk/src/app/srs_app_source.cpp index df6d728b1..c143d73a8 100644 --- a/trunk/src/app/srs_app_source.cpp +++ b/trunk/src/app/srs_app_source.cpp @@ -1579,7 +1579,7 @@ srs_error_t SrsLiveSourceManager::fetch_or_create(ISrsRequest *r, SrsSharedPtrget_stream_url(); std::map >::iterator it = pool.find(stream_url); @@ -1620,7 +1620,7 @@ SrsSharedPtr SrsLiveSourceManager::fetch(ISrsRequest *r) // Use lock to protect coroutine switch. // @bug https://github.com/ossrs/srs/issues/1230 // TODO: FIXME: Use smaller scope lock. - SrsLocker(lock); + SrsLocker(&lock); string stream_url = r->get_stream_url(); std::map >::iterator it = pool.find(stream_url); diff --git a/trunk/src/app/srs_app_srt_source.cpp b/trunk/src/app/srs_app_srt_source.cpp index 6f0b1e612..98dc3bf46 100644 --- a/trunk/src/app/srs_app_srt_source.cpp +++ b/trunk/src/app/srs_app_srt_source.cpp @@ -160,7 +160,7 @@ srs_error_t SrsSrtSourceManager::fetch_or_create(ISrsRequest *r, SrsSharedPtrget_stream_url(); std::map >::iterator it = pool.find(stream_url); @@ -196,7 +196,7 @@ SrsSharedPtr SrsSrtSourceManager::fetch(ISrsRequest *r) { // Use lock to protect coroutine switch. // @bug https://github.com/ossrs/srs/issues/1230 - SrsLocker(lock); + SrsLocker(&lock); string stream_url = r->get_stream_url(); std::map >::iterator it = pool.find(stream_url); diff --git a/trunk/src/app/srs_app_stream_token.cpp b/trunk/src/app/srs_app_stream_token.cpp index 6650750bf..2c0b2588e 100644 --- a/trunk/src/app/srs_app_stream_token.cpp +++ b/trunk/src/app/srs_app_stream_token.cpp @@ -84,7 +84,7 @@ srs_error_t SrsStreamPublishTokenManager::acquire_token(ISrsRequest *req, SrsStr std::string stream_url = req->get_stream_url(); SrsContextId current_cid = _srs_context->get_id(); - SrsLocker(mutex_); + SrsLocker(&mutex_); // Get or create token for this stream SrsStreamPublishToken *stream_token = NULL; @@ -116,7 +116,7 @@ srs_error_t SrsStreamPublishTokenManager::acquire_token(ISrsRequest *req, SrsStr void SrsStreamPublishTokenManager::release_token(const std::string &stream_url) { - SrsLocker(mutex_); + SrsLocker(&mutex_); // Find and erase the token from the map std::map::iterator it = tokens_.find(stream_url); diff --git a/trunk/src/core/srs_core_version7.hpp b/trunk/src/core/srs_core_version7.hpp index d03d1a01f..d434e9bad 100644 --- a/trunk/src/core/srs_core_version7.hpp +++ b/trunk/src/core/srs_core_version7.hpp @@ -9,6 +9,6 @@ #define VERSION_MAJOR 7 #define VERSION_MINOR 0 -#define VERSION_REVISION 79 +#define VERSION_REVISION 80 #endif \ No newline at end of file diff --git a/trunk/src/protocol/srs_protocol_st.cpp b/trunk/src/protocol/srs_protocol_st.cpp index 6d741ea06..08f736b4f 100644 --- a/trunk/src/protocol/srs_protocol_st.cpp +++ b/trunk/src/protocol/srs_protocol_st.cpp @@ -392,7 +392,13 @@ srs_cond_t srs_cond_new() int srs_cond_destroy(srs_cond_t cond) { - return st_cond_destroy((st_cond_t)cond); + if (!cond) { + return 0; + } + + int r0 = st_cond_destroy((st_cond_t)cond); + srs_assert(r0 == 0); + return r0; } int srs_cond_wait(srs_cond_t cond) @@ -425,7 +431,10 @@ int srs_mutex_destroy(srs_mutex_t mutex) if (!mutex) { return 0; } - return st_mutex_destroy((st_mutex_t)mutex); + + int r0 = st_mutex_destroy((st_mutex_t)mutex); + srs_assert(r0 == 0); + return r0; } int srs_mutex_lock(srs_mutex_t mutex) @@ -438,6 +447,61 @@ int srs_mutex_unlock(srs_mutex_t mutex) return st_mutex_unlock((st_mutex_t)mutex); } +SrsCond::SrsCond() +{ + cond_ = srs_cond_new(); +} + +SrsCond::~SrsCond() +{ + srs_cond_destroy(cond_); +} + +int SrsCond::wait() +{ + return srs_cond_wait(cond_); +} + +int SrsCond::timedwait(srs_utime_t timeout) +{ + return srs_cond_timedwait(cond_, timeout); +} + +int SrsCond::signal() +{ + return srs_cond_signal(cond_); +} + +int SrsCond::broadcast() +{ + return srs_cond_broadcast(cond_); +} + +SrsMutex::SrsMutex() +{ + mutex_ = srs_mutex_new(); +} + +SrsMutex::~SrsMutex() +{ + srs_mutex_destroy(mutex_); +} + +int SrsMutex::lock() +{ + return srs_mutex_lock(mutex_); +} + +int SrsMutex::unlock() +{ + return srs_mutex_unlock(mutex_); +} + +srs_mutex_t *SrsMutex::get() +{ + return &mutex_; +} + int srs_key_create(int *keyp, void (*destructor)(void *)) { return st_key_create(keyp, destructor); diff --git a/trunk/src/protocol/srs_protocol_st.hpp b/trunk/src/protocol/srs_protocol_st.hpp index 6888f84e1..162fb3d2e 100644 --- a/trunk/src/protocol/srs_protocol_st.hpp +++ b/trunk/src/protocol/srs_protocol_st.hpp @@ -84,6 +84,45 @@ extern int srs_mutex_destroy(srs_mutex_t mutex); extern int srs_mutex_lock(srs_mutex_t mutex); extern int srs_mutex_unlock(srs_mutex_t mutex); +// Wrap as ptr, so you can use SrsUniquePtr and SrsSharedPtr to manage it. +// For example: +// SrsUniquePtr cond(new SrsCond()); +// cond->signal(); +class SrsCond +{ +private: + srs_cond_t cond_; + +public: + SrsCond(); + virtual ~SrsCond(); + +public: + int wait(); + int timedwait(srs_utime_t timeout); + int signal(); + int broadcast(); +}; + +// Wrap as ptr, so you can use SrsUniquePtr and SrsSharedPtr to manage it. +// For example: +// SrsUniquePtr mutex(new SrsMutex()); +// SrsLocker(mutex->get()); +class SrsMutex +{ +private: + srs_mutex_t mutex_; + +public: + SrsMutex(); + virtual ~SrsMutex(); + +public: + int lock(); + int unlock(); + srs_mutex_t *get(); +}; + extern int srs_key_create(int *keyp, void (*destructor)(void *)); extern int srs_thread_setspecific(int key, void *value); extern int srs_thread_setspecific2(srs_thread_t thread, int key, void *value); @@ -109,7 +148,7 @@ extern bool srs_is_never_timeout(srs_utime_t tm); // The mutex locker. #define SrsLocker(instance) \ - impl__SrsLocker _SRS_free_##instance(&instance) + impl__SrsLocker _SRS_free_instance(instance) class impl__SrsLocker { diff --git a/trunk/src/utest/srs_utest.cpp b/trunk/src/utest/srs_utest.cpp index 3f66bca55..4850946f4 100644 --- a/trunk/src/utest/srs_utest.cpp +++ b/trunk/src/utest/srs_utest.cpp @@ -10,8 +10,10 @@ #include #include #include +#include #include #include +#include #include using namespace std; @@ -253,3 +255,39 @@ int MockProtectedBuffer::alloc(int size) return 0; } + +SrsCoroutineChan::SrsCoroutineChan() +{ + lock_ = srs_mutex_new(); +} + +SrsCoroutineChan::~SrsCoroutineChan() +{ + srs_mutex_destroy(lock_); +} + +SrsCoroutineChan &SrsCoroutineChan::push(void *value) +{ + SrsLocker(&lock_); + + args_.push_back(value); + return *this; +} + +void *SrsCoroutineChan::pop() +{ + SrsLocker(&lock_); + + void *arg = *args_.begin(); + args_.erase(args_.begin()); + return arg; +} + +SrsCoroutineChan *SrsCoroutineChan::copy() +{ + SrsLocker(&lock_); + + SrsCoroutineChan *cp = new SrsCoroutineChan(); + cp->args_ = args_; + return cp; +} diff --git a/trunk/src/utest/srs_utest.hpp b/trunk/src/utest/srs_utest.hpp index b9af5fe10..dfb9fa1ed 100644 --- a/trunk/src/utest/srs_utest.hpp +++ b/trunk/src/utest/srs_utest.hpp @@ -131,4 +131,147 @@ public: int alloc(int size); }; +// The chan for anonymous coroutine to share variables. +// The chan never free the args, you must manage the memory. +class SrsCoroutineChan +{ +private: + std::vector args_; + srs_mutex_t lock_; + +public: + SrsCoroutineChan(); + virtual ~SrsCoroutineChan(); + +public: + SrsCoroutineChan &push(void *value); + void *pop(); + SrsCoroutineChan *copy(); +}; + +// A helper to create a anonymous coroutine like goroutine in Go. +// * The context is used to share variables between coroutines. +// * The id is used to identify the coroutine. +// * The code_block is the code to run in the coroutine. +// +// The correct way is to avoid the block, unless you intend to do it, +// so you should create in the same scope, and use id to distinguish them. +// For example: +// SrsCoroutineChan ctx; +// +// SRS_COROUTINE_GO_IMPL(&ctx, coroutine1, { +// srs_usleep(1000 * SRS_UTIME_MILLISECONDS); +// }); +// +// SRS_COROUTINE_GO_IMPL(&ctx, coroutine2, { +// srs_usleep(1000 * SRS_UTIME_MILLISECONDS); +// }); +// +// // It won't wait for the coroutine to terminate. +// // So you will expect to run to here immediately. +// +// CAUTION: Note that if use a block to run the coroutine, it will +// stop and wait for the coroutine to terminate. So it maybe crash +// for the current thread is interrupted and stopping, such as the +// ctx.pop() will crash for requiring a lock on a stopping thread. +// For example: +// SrsCoroutineChan ctx; +// +// // Generally we SHOULD NOT do this, unless you intend to. +// if (true) { +// SRS_COROUTINE_GO_IMPL(&ctx, coroutine, { +// srs_usleep(1000 * SRS_UTIME_MILLISECONDS); +// }); +// } +// if (true) { +// SRS_COROUTINE_GO_IMPL(&ctx, coroutine, { +// srs_usleep(1000 * SRS_UTIME_MILLISECONDS); +// }); +// } +// +// // The coroutine will be stopped and wait for it to terminate. +// // So maybe it won't execute all your code there. +// +// Enjoiy the sugar for coroutines. +#define SRS_COROUTINE_GO_IMPL(context, id, code_block) \ + class AnonymousCoroutineHandler##id : public ISrsCoroutineHandler \ + { \ + private: \ + SrsCoroutineChan *ctx_; \ + \ + public: \ + AnonymousCoroutineHandler##id(SrsCoroutineChan *c) \ + { \ + /* Copy the context so that we can pop it in different coroutines. */ \ + ctx_ = c->copy(); \ + } \ + ~AnonymousCoroutineHandler##id() \ + { \ + srs_freep(ctx_); \ + } \ + \ + public: \ + virtual srs_error_t cycle() \ + { \ + SrsCoroutineChan &ctx = *ctx_; \ + (void)ctx; \ + code_block; \ + return srs_success; \ + } \ + }; \ + AnonymousCoroutineHandler##id handler##id(context); \ + SrsSTCoroutine st##id("anonymous", &handler##id); \ + srs_error_t err_coroutine##id = st##id.start(); \ + srs_assert(err_coroutine##id == srs_success) + +// A helper to create a anonymous coroutine like goroutine in Go. +// For example: +// SRS_COROUTINE_GO({ +// srs_usleep(1 * SRS_UTIME_MILLISECONDS); +// }); +#define SRS_COROUTINE_GO(code_block) \ + SrsCoroutineChan context##id; \ + SRS_COROUTINE_GO_IMPL(&context##id, coroutine0, code_block) + +// A helper to create a anonymous coroutine like goroutine in Go. +// With the id, it allows you to create multiple coroutines. +// For example: +// SRS_COROUTINE_GO2(coroutine1, { +// srs_usleep(1 * SRS_UTIME_MILLISECONDS); +// }); +// SRS_COROUTINE_GO2(coroutine2, { +// srs_usleep(1 * SRS_UTIME_MILLISECONDS); +// }); +#define SRS_COROUTINE_GO2(id, code_block) \ + SrsCoroutineChan context##id; \ + SRS_COROUTINE_GO_IMPL(&context##id, id, code_block) + +// A helper to create a anonymous coroutine like goroutine in Go. +// With the context, it allows you to share variables between coroutines. +// For example: +// SrsCoroutineChan ctx; +// ctx.push(1); +// SRS_COROUTINE_GO_CTX(ctx, { +// int v = (int)ctx.pop(); +// srs_usleep(v * SRS_UTIME_MILLISECONDS); +// }); +#define SRS_COROUTINE_GO_CTX(ctx, code_block) \ + SRS_COROUTINE_GO_IMPL(ctx, coroutine0, code_block) + +// A helper to create a anonymous coroutine like goroutine in Go. +// With the context and id, it allows you to create multiple coroutines. +// For example: +// SrsCoroutineChan ctx; +// ctx.push(1); +// SRS_COROUTINE_GO_CTX2(ctx, coroutine1, { +// int v = (int)ctx.pop(); +// srs_usleep(v * SRS_UTIME_MILLISECONDS); +// }); +// SRS_COROUTINE_GO_CTX2(ctx, coroutine2, { +// int v = (int)ctx.pop(); +// srs_usleep(v * SRS_UTIME_MILLISECONDS); +// }); +#define SRS_COROUTINE_GO_CTX2(ctx, id, code_block) \ + SRS_COROUTINE_GO_IMPL(ctx, id, code_block) + #endif diff --git a/trunk/src/utest/srs_utest_st.cpp b/trunk/src/utest/srs_utest_st.cpp index 57a6b3621..7e6bd1bda 100644 --- a/trunk/src/utest/srs_utest_st.cpp +++ b/trunk/src/utest/srs_utest_st.cpp @@ -14,6 +14,26 @@ using namespace std; +VOID TEST(StTest, CondPtrSugar) +{ + SrsUniquePtr cond(new SrsCond()); + cond->signal(); +} + +VOID TEST(StTest, MutexPtrSugar) +{ + if (true) { + SrsUniquePtr mutex(new SrsMutex()); + SrsLocker(mutex->get()); + } + + if (true) { + SrsUniquePtr mutex(new SrsMutex()); + mutex->lock(); + mutex->unlock(); + } +} + VOID TEST(StTest, StUtimeInMicroseconds) { st_utime_t st_time_1 = st_utime(); diff --git a/trunk/src/utest/srs_utest_st2.cpp b/trunk/src/utest/srs_utest_st2.cpp new file mode 100644 index 000000000..bc25e89e7 --- /dev/null +++ b/trunk/src/utest/srs_utest_st2.cpp @@ -0,0 +1,252 @@ +// +// Copyright (c) 2013-2025 The SRS Authors +// +// SPDX-License-Identifier: MIT +// + +#include + +#include +#include +#include +#include +#include + +#include +#include +#include + +using namespace std; + +VOID TEST(StTest, AnonymouseSingleCoroutine) +{ + SRS_COROUTINE_GO({ + srs_usleep(1 * SRS_UTIME_MILLISECONDS); + }); + + // Wait for coroutine to terminate. Otherwise, it will be stopped + // and terminated, which cause some of the code not executed. + srs_usleep(50 * SRS_UTIME_MILLISECONDS); +} + +VOID TEST(StTest, AnonymouseMultipleCoroutines) +{ + SRS_COROUTINE_GO2(coroutine1, { + srs_usleep(1 * SRS_UTIME_MILLISECONDS); + }); + + // If multiple coroutines in the same scope, we should use different id. + SRS_COROUTINE_GO2(coroutine2, { + srs_usleep(1 * SRS_UTIME_MILLISECONDS); + }); + + // Wait for coroutine to terminate. Otherwise, it will be stopped + // and terminated, which cause some of the code not executed. + srs_usleep(50 * SRS_UTIME_MILLISECONDS); +} + +VOID TEST(StTest, AnonymouseCoroutineWithContext) +{ + int counter = 0; + + SrsCoroutineChan ctx; + ctx.push(&counter); + + SRS_COROUTINE_GO_CTX(&ctx, { + int *counter = (int *)ctx.pop(); + (*counter)++; + }); + + // Coroutine not terminated, so the counter is not increased. + EXPECT_TRUE(counter == 0); + + // Wait for coroutine to run and terminated, or it will crash + // because the ctx.pop is called after coroutine terminated. + srs_usleep(50 * SRS_UTIME_MILLISECONDS); + EXPECT_TRUE(counter == 1); + + // Wait for coroutine to terminate. Otherwise, it will be stopped + // and terminated, which cause some of the code not executed. + srs_usleep(50 * SRS_UTIME_MILLISECONDS); +} + +VOID TEST(StTest, AnonymouseCoroutineWithSync) +{ + SrsUniquePtr cond(new SrsCond()); + int counter = 0; + + SrsCoroutineChan ctx; + ctx.push(cond.get()); + ctx.push(&counter); + + SRS_COROUTINE_GO_CTX(&ctx, { + SrsCond *cond = (SrsCond *)ctx.pop(); + int *counter = (int *)ctx.pop(); + + (*counter)++; + + // Notify main thread the work is done. + cond->signal(); + }); + + // The coroutine not terminated, so the counter is not increased. + EXPECT_TRUE(counter == 0); + + // Wait for the coroutine to terminate. The counter is increased. + cond->wait(); + EXPECT_TRUE(counter == 1); +} + +VOID TEST(StTest, AnonymouseCoroutineWithWaitgroup) +{ + SrsWaitGroup wg; + int counter = 0; + + SrsCoroutineChan ctx; + ctx.push(&wg); + ctx.push(&counter); + + wg.add(1); + SRS_COROUTINE_GO_CTX(&ctx, { + SrsWaitGroup *wg = (SrsWaitGroup *)ctx.pop(); + int *counter = (int *)ctx.pop(); + + (*counter)++; + + // Notify main thread the work is done. + wg->done(); + }); + + // The coroutine not terminated, so the counter is not increased. + EXPECT_TRUE(counter == 0); + + // Wait for the coroutine to terminate. The counter is increased. + wg.wait(); + EXPECT_TRUE(counter == 1); +} + +VOID TEST(StTest, AnonymouseCoroutineWithWaitgroups) +{ + SrsWaitGroup wg; + int counter = 0; + + SrsCoroutineChan ctx; + ctx.push(&wg); + ctx.push(&counter); + + wg.add(1); + SRS_COROUTINE_GO_CTX2(&ctx, coroutine1, { + // The ctx is copied, so we can pop it again in different coroutines. + SrsWaitGroup *wg = (SrsWaitGroup *)ctx.pop(); + int *counter = (int *)ctx.pop(); + + (*counter)++; + + // Notify main thread the work is done. + wg->done(); + }); + + wg.add(1); + SRS_COROUTINE_GO_CTX2(&ctx, coroutine2, { + // The ctx is copied, so we can pop it again in different coroutines. + SrsWaitGroup *wg = (SrsWaitGroup *)ctx.pop(); + int *counter = (int *)ctx.pop(); + + (*counter)++; + + // Notify main thread the work is done. + wg->done(); + }); + + // The coroutine not terminated, so the counter is not increased. + EXPECT_TRUE(counter == 0); + + // Wait for the coroutine to terminate. The counter is increased. + wg.wait(); + EXPECT_TRUE(counter == 2); +} + +VOID TEST(StTest, VerifyUsingRawCoroutine) +{ + srs_error_t err; + + class NormalThread : public ISrsCoroutineHandler + { + public: + virtual srs_error_t cycle() + { + srs_usleep(1 * SRS_UTIME_MILLISECONDS); + return srs_success; + } + }; + + NormalThread trd; + SrsSTCoroutine st("test", &trd); + HELPER_ASSERT_SUCCESS(st.start()); +} + +VOID TEST(StTest, VerifyMultipleAnonymousClasses) +{ + do { + class AnonymousCoroutineHandler + { + }; + } while (0); + + do { + class AnonymousCoroutineHandler + { + }; + } while (0); + + SrsUniquePtr cond(new SrsCond()); + cond->signal(); + + SrsUniquePtr mutex(new SrsMutex()); + SrsLocker(mutex->get()); +} + +// CAUTION: Badcase, you should not follow this style. +VOID TEST(StTest, AnonymouseBadcase) +{ + // Generally we SHOULD NOT do this, unless you intend to. + if (true) { + SRS_COROUTINE_GO({ + srs_usleep(1 * SRS_UTIME_MILLISECONDS); + }); + } + + // CAUTION: If multiple coroutines in the different scope, it's ok without id, + // but it's not recommended, becuase it will be stopped and your code + // maybe not executed. + // Generally we SHOULD NOT do this, unless you intend to. + if (true) { + SRS_COROUTINE_GO({ + srs_usleep(1 * SRS_UTIME_MILLISECONDS); + }); + } +} + +// CAUTION: Badcase, you should not follow this style. +VOID TEST(StTest, AnonymouseBadcase2) +{ + int counter = 0; + + SrsCoroutineChan ctx; + ctx.push(&counter); + + // Generally we SHOULD NOT do this, unless you intend to. + if (true) { + SRS_COROUTINE_GO_CTX(&ctx, { + int *counter = (int *)ctx.pop(); + (*counter)++; + }); + + // Wait for coroutine to terminate. Otherwise, it will crash, for the + // coroutine is terminated while ctx.pop(), the lock is invalid. + srs_usleep(100 * SRS_UTIME_MILLISECONDS); + } + + // Coroutine terminated, so the counter is increased. + EXPECT_TRUE(counter == 1); +} diff --git a/trunk/src/utest/srs_utest_st2.hpp b/trunk/src/utest/srs_utest_st2.hpp new file mode 100644 index 000000000..39d20604b --- /dev/null +++ b/trunk/src/utest/srs_utest_st2.hpp @@ -0,0 +1,14 @@ +// +// Copyright (c) 2013-2025 The SRS Authors +// +// SPDX-License-Identifier: MIT +// + +#ifndef SRS_UTEST_ST2_HPP +#define SRS_UTEST_ST2_HPP + +#include + +#include + +#endif // SRS_UTEST_ST2_HPP