diff --git a/Dockerfile b/Dockerfile index a83c330a9..e3f33d72e 100644 --- a/Dockerfile +++ b/Dockerfile @@ -51,8 +51,7 @@ COPY --from=build /usr/local/srs /usr/local/srs # Test the version of binaries. RUN ldd /usr/local/srs/objs/ffmpeg/bin/ffmpeg && \ /usr/local/srs/objs/ffmpeg/bin/ffmpeg -version && \ - ldd /usr/local/srs/objs/srs && \ - /usr/local/srs/objs/srs -v + ldd /usr/local/srs/objs/srs # Default workdir and command. WORKDIR /usr/local/srs diff --git a/trunk/configure b/trunk/configure index 064107aa6..de8df048d 100755 --- a/trunk/configure +++ b/trunk/configure @@ -385,7 +385,7 @@ if [[ $SRS_UTEST == YES ]]; then "srs_utest_protocol3" "srs_utest_app" "srs_utest_app2" "srs_utest_app3" "srs_utest_app4" "srs_utest_app5" "srs_utest_app6" "srs_utest_app7" "srs_utest_app8" "srs_utest_app9" "srs_utest_app10" "srs_utest_app11" "srs_utest_app12" "srs_utest_app13" "srs_utest_app14" - "srs_utest_app15") + "srs_utest_app15" "srs_utest_app16") # Always include SRT utest MODULE_FILES+=("srs_utest_srt") if [[ $SRS_GB28181 == YES ]]; then diff --git a/trunk/src/app/srs_app_factory.cpp b/trunk/src/app/srs_app_factory.cpp index a90893311..a550b173c 100644 --- a/trunk/src/app/srs_app_factory.cpp +++ b/trunk/src/app/srs_app_factory.cpp @@ -12,6 +12,7 @@ #include #include #include +#include #include #include #include @@ -35,10 +36,12 @@ ISrsAppFactory::~ISrsAppFactory() SrsAppFactory::SrsAppFactory() { + kernel_factory_ = new SrsFinalFactory(); } SrsAppFactory::~SrsAppFactory() { + srs_freep(kernel_factory_); } ISrsFileWriter *SrsAppFactory::create_file_writer() @@ -157,6 +160,31 @@ ISrsFragmentedMp4 *SrsAppFactory::create_fragmented_mp4() return new SrsFragmentedMp4(); } +ISrsIpListener *SrsAppFactory::create_tcp_listener(ISrsTcpHandler *handler) +{ + return new SrsTcpListener(handler); +} + +ISrsCoroutine *SrsAppFactory::create_coroutine(const std::string &name, ISrsCoroutineHandler *handler, SrsContextId cid) +{ + return kernel_factory_->create_coroutine(name, handler, cid); +} + +ISrsTime *SrsAppFactory::create_time() +{ + return kernel_factory_->create_time(); +} + +ISrsConfig *SrsAppFactory::create_config() +{ + return kernel_factory_->create_config(); +} + +ISrsCond *SrsAppFactory::create_cond() +{ + return kernel_factory_->create_cond(); +} + SrsFinalFactory::SrsFinalFactory() { } diff --git a/trunk/src/app/srs_app_factory.hpp b/trunk/src/app/srs_app_factory.hpp index 211719419..222c03c6d 100644 --- a/trunk/src/app/srs_app_factory.hpp +++ b/trunk/src/app/srs_app_factory.hpp @@ -35,9 +35,12 @@ class ISrsFragment; class ISrsInitMp4; class ISrsFragmentWindow; class ISrsFragmentedMp4; +class SrsFinalFactory; +class ISrsIpListener; +class ISrsTcpHandler; // The factory to create app objects. -class ISrsAppFactory +class ISrsAppFactory : public ISrsKernelFactory { public: ISrsAppFactory(); @@ -70,11 +73,15 @@ public: virtual ISrsInitMp4 *create_init_mp4() = 0; virtual ISrsFragmentWindow *create_fragment_window() = 0; virtual ISrsFragmentedMp4 *create_fragmented_mp4() = 0; + virtual ISrsIpListener *create_tcp_listener(ISrsTcpHandler *handler) = 0; }; // The factory to create app objects. class SrsAppFactory : public ISrsAppFactory { +private: + ISrsKernelFactory *kernel_factory_; + public: SrsAppFactory(); virtual ~SrsAppFactory(); @@ -106,6 +113,13 @@ public: virtual ISrsInitMp4 *create_init_mp4(); virtual ISrsFragmentWindow *create_fragment_window(); virtual ISrsFragmentedMp4 *create_fragmented_mp4(); + virtual ISrsIpListener *create_tcp_listener(ISrsTcpHandler *handler); + +public: + virtual ISrsCoroutine *create_coroutine(const std::string &name, ISrsCoroutineHandler *handler, SrsContextId cid); + virtual ISrsTime *create_time(); + virtual ISrsConfig *create_config(); + virtual ISrsCond *create_cond(); }; extern ISrsAppFactory *_srs_app_factory; diff --git a/trunk/src/app/srs_app_http_hooks.cpp b/trunk/src/app/srs_app_http_hooks.cpp index 5ea3ea2f1..96b256501 100644 --- a/trunk/src/app/srs_app_http_hooks.cpp +++ b/trunk/src/app/srs_app_http_hooks.cpp @@ -22,6 +22,7 @@ using namespace std; #include #include #include +#include // The HTTP response body should be "0", see https://github.com/ossrs/srs/issues/3215#issuecomment-1319991512 #define SRS_HTTP_RESPONSE_OK SRS_XSTR(0) @@ -46,10 +47,12 @@ ISrsHttpHooks::~ISrsHttpHooks() SrsHttpHooks::SrsHttpHooks() { + factory_ = _srs_app_factory; } SrsHttpHooks::~SrsHttpHooks() { + factory_ = NULL; } srs_error_t SrsHttpHooks::on_connect(string url, ISrsRequest *req) @@ -76,8 +79,8 @@ srs_error_t SrsHttpHooks::on_connect(string url, ISrsRequest *req) std::string res; int status_code; - SrsHttpClient http; - if ((err = do_post(&http, url, data, status_code, res)) != srs_success) { + SrsUniquePtr http(factory_->create_http_client()); + if ((err = do_post(http.get(), url, data, status_code, res)) != srs_success) { return srs_error_wrap(err, "http: on_connect failed, client_id=%s, url=%s, request=%s, response=%s, code=%d", cid.c_str(), url.c_str(), data.c_str(), res.c_str(), status_code); } @@ -110,8 +113,8 @@ void SrsHttpHooks::on_close(string url, ISrsRequest *req, int64_t send_bytes, in std::string res; int status_code; - SrsHttpClient http; - if ((err = do_post(&http, url, data, status_code, res)) != srs_success) { + SrsUniquePtr http(factory_->create_http_client()); + if ((err = do_post(http.get(), url, data, status_code, res)) != srs_success) { int ret = srs_error_code(err); srs_freep(err); srs_warn("http: ignore on_close failed, client_id=%s, url=%s, request=%s, response=%s, code=%d, ret=%d", @@ -154,8 +157,8 @@ srs_error_t SrsHttpHooks::on_publish(string url, ISrsRequest *req) std::string res; int status_code; - SrsHttpClient http; - if ((err = do_post(&http, url, data, status_code, res)) != srs_success) { + SrsUniquePtr http(factory_->create_http_client()); + if ((err = do_post(http.get(), url, data, status_code, res)) != srs_success) { return srs_error_wrap(err, "http: on_publish failed, client_id=%s, url=%s, request=%s, response=%s, code=%d", cid.c_str(), url.c_str(), data.c_str(), res.c_str(), status_code); } @@ -195,8 +198,8 @@ void SrsHttpHooks::on_unpublish(string url, ISrsRequest *req) std::string res; int status_code; - SrsHttpClient http; - if ((err = do_post(&http, url, data, status_code, res)) != srs_success) { + SrsUniquePtr http(factory_->create_http_client()); + if ((err = do_post(http.get(), url, data, status_code, res)) != srs_success) { int ret = srs_error_code(err); srs_freep(err); srs_warn("http: ignore on_unpublish failed, client_id=%s, url=%s, request=%s, response=%s, status=%d, ret=%d", @@ -240,8 +243,8 @@ srs_error_t SrsHttpHooks::on_play(string url, ISrsRequest *req) std::string res; int status_code; - SrsHttpClient http; - if ((err = do_post(&http, url, data, status_code, res)) != srs_success) { + SrsUniquePtr http(factory_->create_http_client()); + if ((err = do_post(http.get(), url, data, status_code, res)) != srs_success) { return srs_error_wrap(err, "http: on_play failed, client_id=%s, url=%s, request=%s, response=%s, status=%d", cid.c_str(), url.c_str(), data.c_str(), res.c_str(), status_code); } @@ -281,8 +284,8 @@ void SrsHttpHooks::on_stop(string url, ISrsRequest *req) std::string res; int status_code; - SrsHttpClient http; - if ((err = do_post(&http, url, data, status_code, res)) != srs_success) { + SrsUniquePtr http(factory_->create_http_client()); + if ((err = do_post(http.get(), url, data, status_code, res)) != srs_success) { int ret = srs_error_code(err); srs_freep(err); srs_warn("http: ignore on_stop failed, client_id=%s, url=%s, request=%s, response=%s, code=%d, ret=%d", @@ -329,8 +332,8 @@ srs_error_t SrsHttpHooks::on_dvr(SrsContextId c, string url, ISrsRequest *req, s std::string res; int status_code; - SrsHttpClient http; - if ((err = do_post(&http, url, data, status_code, res)) != srs_success) { + SrsUniquePtr http(factory_->create_http_client()); + if ((err = do_post(http.get(), url, data, status_code, res)) != srs_success) { return srs_error_wrap(err, "http post on_dvr uri failed, client_id=%s, url=%s, request=%s, response=%s, code=%d", cid.c_str(), url.c_str(), data.c_str(), res.c_str(), status_code); } @@ -386,8 +389,8 @@ srs_error_t SrsHttpHooks::on_hls(SrsContextId c, string url, ISrsRequest *req, s std::string res; int status_code; - SrsHttpClient http; - if ((err = do_post(&http, url, data, status_code, res)) != srs_success) { + SrsUniquePtr http(factory_->create_http_client()); + if ((err = do_post(http.get(), url, data, status_code, res)) != srs_success) { return srs_error_wrap(err, "http: post %s with %s, status=%d, res=%s", url.c_str(), data.c_str(), status_code, res.c_str()); } @@ -424,8 +427,8 @@ srs_error_t SrsHttpHooks::on_hls_notify(SrsContextId c, std::string url, ISrsReq return srs_error_wrap(err, "http: init url=%s", url.c_str()); } - SrsHttpClient http; - if ((err = http.initialize(uri.get_schema(), uri.get_host(), uri.get_port(), SRS_HLS_NOTIFY_TIMEOUT)) != srs_success) { + SrsUniquePtr http(factory_->create_http_client()); + if ((err = http->initialize(uri.get_schema(), uri.get_host(), uri.get_port(), SRS_HLS_NOTIFY_TIMEOUT)) != srs_success) { return srs_error_wrap(err, "http: init client for %s", url.c_str()); } @@ -440,7 +443,7 @@ srs_error_t SrsHttpHooks::on_hls_notify(SrsContextId c, std::string url, ISrsReq srs_info("GET %s", path.c_str()); ISrsHttpMessage *msg_raw = NULL; - if ((err = http.get(path.c_str(), "", &msg_raw)) != srs_success) { + if ((err = http->get(path.c_str(), "", &msg_raw)) != srs_success) { return srs_error_wrap(err, "http: get %s", url.c_str()); } SrsUniquePtr msg(msg_raw); @@ -474,8 +477,8 @@ srs_error_t SrsHttpHooks::discover_co_workers(string url, string &host, int &por std::string res; int status_code; - SrsHttpClient http; - if ((err = do_post(&http, url, "", status_code, res)) != srs_success) { + SrsUniquePtr http(factory_->create_http_client()); + if ((err = do_post(http.get(), url, "", status_code, res)) != srs_success) { return srs_error_wrap(err, "http: post %s, status=%d, res=%s", url.c_str(), status_code, res.c_str()); } @@ -545,8 +548,8 @@ srs_error_t SrsHttpHooks::on_forward_backend(string url, ISrsRequest *req, std:: std::string res; int status_code; - SrsHttpClient http; - if ((err = do_post(&http, url, data, status_code, res)) != srs_success) { + SrsUniquePtr http(factory_->create_http_client()); + if ((err = do_post(http.get(), url, data, status_code, res)) != srs_success) { return srs_error_wrap(err, "http: on_forward_backend failed, client_id=%s, url=%s, request=%s, response=%s, code=%d", cid.c_str(), url.c_str(), data.c_str(), res.c_str(), status_code); } @@ -589,7 +592,7 @@ srs_error_t SrsHttpHooks::on_forward_backend(string url, ISrsRequest *req, std:: return err; } -srs_error_t SrsHttpHooks::do_post(SrsHttpClient *hc, std::string url, std::string req, int &code, string &res) +srs_error_t SrsHttpHooks::do_post(ISrsHttpClient *hc, std::string url, std::string req, int &code, string &res) { srs_error_t err = srs_success; diff --git a/trunk/src/app/srs_app_http_hooks.hpp b/trunk/src/app/srs_app_http_hooks.hpp index 29ec38b40..f1ba06c37 100644 --- a/trunk/src/app/srs_app_http_hooks.hpp +++ b/trunk/src/app/srs_app_http_hooks.hpp @@ -17,6 +17,8 @@ class SrsStSocket; class ISrsRequest; class SrsHttpParser; class SrsHttpClient; +class ISrsAppFactory; +class ISrsHttpClient; // HTTP hooks interface for SRS server event callbacks. // @@ -149,6 +151,9 @@ public: class SrsHttpHooks : public ISrsHttpHooks { +private: + ISrsAppFactory *factory_; + public: SrsHttpHooks(); virtual ~SrsHttpHooks(); @@ -168,7 +173,7 @@ public: srs_error_t on_forward_backend(std::string url, ISrsRequest *req, std::vector &rtmp_urls); private: - srs_error_t do_post(SrsHttpClient *hc, std::string url, std::string req, int &code, std::string &res); + srs_error_t do_post(ISrsHttpClient *hc, std::string url, std::string req, int &code, std::string &res); }; // Global HTTP hooks instance diff --git a/trunk/src/app/srs_app_listener.cpp b/trunk/src/app/srs_app_listener.cpp index 5e12d74ec..f53ad1a73 100644 --- a/trunk/src/app/srs_app_listener.cpp +++ b/trunk/src/app/srs_app_listener.cpp @@ -17,17 +17,17 @@ #include using namespace std; +#include #include #include #include #include #include +#include #include #include #include -#include - SrsPps *_srs_pps_rpkts = NULL; SrsPps *_srs_pps_addrs = NULL; SrsPps *_srs_pps_fast_addrs = NULL; @@ -91,6 +91,8 @@ SrsUdpListener::SrsUdpListener(ISrsUdpHandler *h) buf_ = new char[nb_buf_]; trd_ = new SrsDummyCoroutine(); + + factory_ = _srs_app_factory; } SrsUdpListener::~SrsUdpListener() @@ -98,6 +100,8 @@ SrsUdpListener::~SrsUdpListener() srs_freep(trd_); srs_close_stfd(lfd_); srs_freepa(buf_); + + factory_ = NULL; } ISrsListener *SrsUdpListener::set_label(const std::string &label) @@ -183,7 +187,7 @@ srs_error_t SrsUdpListener::listen() set_socket_buffer(); srs_freep(trd_); - trd_ = new SrsSTCoroutine("udp", this, _srs_context->get_id()); + trd_ = factory_->create_coroutine("udp", this, _srs_context->get_id()); if ((err = trd_->start()) != srs_success) { return srs_error_wrap(err, "start thread"); } @@ -250,12 +254,16 @@ SrsTcpListener::SrsTcpListener(ISrsTcpHandler *h) lfd_ = NULL; label_ = "TCP"; trd_ = new SrsDummyCoroutine(); + + factory_ = _srs_app_factory; } SrsTcpListener::~SrsTcpListener() { srs_freep(trd_); srs_close_stfd(lfd_); + + factory_ = NULL; } ISrsListener *SrsTcpListener::set_label(const std::string &label) @@ -298,7 +306,7 @@ srs_error_t SrsTcpListener::listen() } srs_freep(trd_); - trd_ = new SrsSTCoroutine("tcp", this); + trd_ = factory_->create_coroutine("tcp", this, _srs_context->get_id()); if ((err = trd_->start()) != srs_success) { return srs_error_wrap(err, "start coroutine"); } @@ -356,20 +364,24 @@ srs_error_t SrsTcpListener::do_cycle() SrsMultipleTcpListeners::SrsMultipleTcpListeners(ISrsTcpHandler *h) { handler_ = h; + + factory_ = _srs_app_factory; } SrsMultipleTcpListeners::~SrsMultipleTcpListeners() { - for (vector::iterator it = listeners_.begin(); it != listeners_.end(); ++it) { - SrsTcpListener *l = *it; + for (vector::iterator it = listeners_.begin(); it != listeners_.end(); ++it) { + ISrsIpListener *l = *it; srs_freep(l); } + + factory_ = NULL; } ISrsListener *SrsMultipleTcpListeners::set_label(const std::string &label) { - for (vector::iterator it = listeners_.begin(); it != listeners_.end(); ++it) { - SrsTcpListener *l = *it; + for (vector::iterator it = listeners_.begin(); it != listeners_.end(); ++it) { + ISrsIpListener *l = *it; l->set_label(label); } @@ -378,22 +390,22 @@ ISrsListener *SrsMultipleTcpListeners::set_label(const std::string &label) ISrsListener *SrsMultipleTcpListeners::set_endpoint(const std::string &i, int p) { - for (vector::iterator it = listeners_.begin(); it != listeners_.end(); ++it) { - SrsTcpListener *l = *it; + for (vector::iterator it = listeners_.begin(); it != listeners_.end(); ++it) { + ISrsIpListener *l = *it; l->set_endpoint(i, p); } return this; } -SrsMultipleTcpListeners *SrsMultipleTcpListeners::add(const std::vector &endpoints) +ISrsIpListener *SrsMultipleTcpListeners::add(const std::vector &endpoints) { for (int i = 0; i < (int)endpoints.size(); i++) { string ip; int port; srs_net_split_for_listener(endpoints[i], ip, port); - SrsTcpListener *l = new SrsTcpListener(this); + ISrsIpListener *l = factory_->create_tcp_listener(this); l->set_endpoint(ip, port); listeners_.push_back(l); } @@ -405,8 +417,8 @@ srs_error_t SrsMultipleTcpListeners::listen() { srs_error_t err = srs_success; - for (vector::iterator it = listeners_.begin(); it != listeners_.end(); ++it) { - SrsTcpListener *l = *it; + for (vector::iterator it = listeners_.begin(); it != listeners_.end(); ++it) { + ISrsIpListener *l = *it; if ((err = l->listen()) != srs_success) { return srs_error_wrap(err, "listen"); @@ -418,8 +430,8 @@ srs_error_t SrsMultipleTcpListeners::listen() void SrsMultipleTcpListeners::close() { - for (vector::iterator it = listeners_.begin(); it != listeners_.end(); ++it) { - SrsTcpListener *l = *it; + for (vector::iterator it = listeners_.begin(); it != listeners_.end(); ++it) { + ISrsIpListener *l = *it; srs_freep(l); } listeners_.clear(); @@ -655,6 +667,8 @@ SrsUdpMuxListener::SrsUdpMuxListener(ISrsUdpMuxHandler *h, std::string i, int p) trd_ = new SrsDummyCoroutine(); cid_ = _srs_context->generate_id(); + + factory_ = _srs_app_factory; } SrsUdpMuxListener::~SrsUdpMuxListener() @@ -662,6 +676,8 @@ SrsUdpMuxListener::~SrsUdpMuxListener() srs_freep(trd_); srs_close_stfd(lfd_); srs_freepa(buf_); + + factory_ = NULL; } int SrsUdpMuxListener::fd() @@ -683,7 +699,7 @@ srs_error_t SrsUdpMuxListener::listen() } srs_freep(trd_); - trd_ = new SrsSTCoroutine("udp", this, cid_); + trd_ = factory_->create_coroutine("udp", this, cid_); // change stack size to 256K, fix crash when call some 3rd-part api. ((SrsSTCoroutine *)trd_)->set_stack_size(1 << 18); @@ -754,7 +770,8 @@ srs_error_t SrsUdpMuxListener::cycle() // Because we have to decrypt the cipher of received packet payload, // and the size is not determined, so we think there is at least one copy, // and we can reuse the plaintext h264/opus with players when got plaintext. - SrsUdpMuxSocket skt(lfd_); + SrsUniquePtr skt_ptr(new SrsUdpMuxSocket(lfd_)); + ISrsUdpMuxSocket *skt = skt_ptr.get(); // How many messages to run a yield. uint32_t nn_msgs_for_yield = 0; @@ -766,7 +783,7 @@ srs_error_t SrsUdpMuxListener::cycle() nn_loop++; - int nread = skt.recvfrom(SRS_UTIME_NO_TIMEOUT); + int nread = skt->recvfrom(SRS_UTIME_NO_TIMEOUT); if (nread <= 0) { if (nread < 0) { srs_warn("udp recv error nn=%d", nread); @@ -779,7 +796,7 @@ srs_error_t SrsUdpMuxListener::cycle() nn_msgs_stage++; // Handle the UDP packet. - err = handler_->on_udp_packet(&skt); + err = handler_->on_udp_packet(skt); // Use pithy print to show more smart information. if (err != srs_success) { @@ -789,7 +806,7 @@ srs_error_t SrsUdpMuxListener::cycle() _srs_context->set_id(cid_); // Append more information. - err = srs_error_wrap(err, "size=%u, data=[%s]", skt.size(), srs_strings_dumps_hex(skt.data(), skt.size(), 8).c_str()); + err = srs_error_wrap(err, "size=%u, data=[%s]", skt->size(), srs_strings_dumps_hex(skt->data(), skt->size(), 8).c_str()); srs_warn("handle udp pkt, count=%u/%u, err: %s", pp_pkt_handler_err->nn_count_, nn, srs_error_desc(err).c_str()); } srs_freep(err); diff --git a/trunk/src/app/srs_app_listener.hpp b/trunk/src/app/srs_app_listener.hpp index 2a58b188f..25c0ad7b9 100644 --- a/trunk/src/app/srs_app_listener.hpp +++ b/trunk/src/app/srs_app_listener.hpp @@ -23,6 +23,8 @@ struct sockaddr; class SrsBuffer; class SrsUdpMuxSocket; class ISrsListener; +class ISrsAppFactory; +class ISrsUdpMuxSocket; // The udp packet handler. class ISrsUdpHandler @@ -50,7 +52,7 @@ public: virtual ~ISrsUdpMuxHandler(); public: - virtual srs_error_t on_udp_packet(SrsUdpMuxSocket *skt) = 0; + virtual srs_error_t on_udp_packet(ISrsUdpMuxSocket *skt) = 0; }; // All listener should support listen method. @@ -91,6 +93,9 @@ public: // Bind udp port, start thread to recv packet and handler it. class SrsUdpListener : public ISrsCoroutineHandler, public ISrsIpListener { +private: + ISrsAppFactory *factory_; + protected: std::string label_; srs_netfd_t lfd_; @@ -134,6 +139,9 @@ private: // Bind and listen tcp port, use handler to process the client. class SrsTcpListener : public ISrsCoroutineHandler, public ISrsIpListener { +private: + ISrsAppFactory *factory_; + private: std::string label_; srs_netfd_t lfd_; @@ -168,9 +176,12 @@ private: // Bind and listen tcp port, use handler to process the client. class SrsMultipleTcpListeners : public ISrsIpListener, public ISrsTcpHandler { +private: + ISrsAppFactory *factory_; + private: ISrsTcpHandler *handler_; - std::vector listeners_; + std::vector listeners_; public: SrsMultipleTcpListeners(ISrsTcpHandler *h); @@ -179,7 +190,7 @@ public: public: ISrsListener *set_label(const std::string &label); ISrsListener *set_endpoint(const std::string &i, int p); - SrsMultipleTcpListeners *add(const std::vector &endpoints); + ISrsIpListener *add(const std::vector &endpoints); public: srs_error_t listen(); @@ -203,6 +214,9 @@ public: virtual std::string peer_id() = 0; virtual uint64_t fast_id() = 0; virtual ISrsUdpMuxSocket *copy_sendonly() = 0; + virtual int recvfrom(srs_utime_t timeout) = 0; + virtual char *data() = 0; + virtual int size() = 0; }; // TODO: FIXME: Rename it. Refine it for performance issue. @@ -256,6 +270,9 @@ public: class SrsUdpMuxListener : public ISrsCoroutineHandler { +private: + ISrsAppFactory *factory_; + private: srs_netfd_t lfd_; ISrsCoroutine *trd_; diff --git a/trunk/src/app/srs_app_rtc_server.cpp b/trunk/src/app/srs_app_rtc_server.cpp index 52166944a..bcbca58e9 100644 --- a/trunk/src/app/srs_app_rtc_server.cpp +++ b/trunk/src/app/srs_app_rtc_server.cpp @@ -556,7 +556,7 @@ srs_error_t SrsRtcSessionManager::exec_rtc_async_work(ISrsAsyncCallTask *t) return rtc_async_->execute(t); } -srs_error_t SrsRtcSessionManager::on_udp_packet(SrsUdpMuxSocket *skt) +srs_error_t SrsRtcSessionManager::on_udp_packet(ISrsUdpMuxSocket *skt) { srs_error_t err = srs_success; diff --git a/trunk/src/app/srs_app_rtc_server.hpp b/trunk/src/app/srs_app_rtc_server.hpp index 70998ebe7..c14a3e858 100644 --- a/trunk/src/app/srs_app_rtc_server.hpp +++ b/trunk/src/app/srs_app_rtc_server.hpp @@ -28,6 +28,7 @@ class SrsSdp; class SrsRtcSource; class SrsResourceManager; class SrsAsyncCallWorker; +class ISrsUdpMuxSocket; // The UDP black hole, for developer to use wireshark to catch plaintext packets. // For example, server receive UDP packets at udp://8000, and forward the plaintext packet to black hole, @@ -118,7 +119,7 @@ public: virtual srs_error_t exec_rtc_async_work(ISrsAsyncCallTask *t); public: - virtual srs_error_t on_udp_packet(SrsUdpMuxSocket *skt); + virtual srs_error_t on_udp_packet(ISrsUdpMuxSocket *skt); }; #endif diff --git a/trunk/src/app/srs_app_server.cpp b/trunk/src/app/srs_app_server.cpp index 3f4f872d1..7aa93907f 100644 --- a/trunk/src/app/srs_app_server.cpp +++ b/trunk/src/app/srs_app_server.cpp @@ -1370,7 +1370,7 @@ srs_error_t SrsServer::listen_rtc_udp() return err; } -srs_error_t SrsServer::on_udp_packet(SrsUdpMuxSocket *skt) +srs_error_t SrsServer::on_udp_packet(ISrsUdpMuxSocket *skt) { return rtc_session_manager_->on_udp_packet(skt); } diff --git a/trunk/src/app/srs_app_server.hpp b/trunk/src/app/srs_app_server.hpp index 9a6b7962b..e719df60e 100644 --- a/trunk/src/app/srs_app_server.hpp +++ b/trunk/src/app/srs_app_server.hpp @@ -69,6 +69,7 @@ class ISrsLog; class ISrsStatistic; class ISrsHourGlass; class ISrsAppFactory; +class ISrsUdpMuxSocket; // Initialize global shared variables cross all threads. extern srs_error_t srs_global_initialize(); @@ -298,7 +299,7 @@ private: // Interface ISrsUdpMuxHandler public: - virtual srs_error_t on_udp_packet(SrsUdpMuxSocket *skt); + virtual srs_error_t on_udp_packet(ISrsUdpMuxSocket *skt); private: virtual srs_error_t listen_rtc_api(); diff --git a/trunk/src/main/srs_main_server.cpp b/trunk/src/main/srs_main_server.cpp index 5003818dd..620791858 100644 --- a/trunk/src/main/srs_main_server.cpp +++ b/trunk/src/main/srs_main_server.cpp @@ -60,8 +60,8 @@ ISrsContext *_srs_context = NULL; SrsConfig *_srs_config = NULL; // @global kernel factory. -ISrsKernelFactory *_srs_kernel_factory = new SrsFinalFactory(); ISrsAppFactory *_srs_app_factory = new SrsAppFactory(); +ISrsKernelFactory *_srs_kernel_factory = _srs_app_factory; // @global version of srs, which can grep keyword "XCORE" extern const char *_srs_version; diff --git a/trunk/src/utest/srs_utest.cpp b/trunk/src/utest/srs_utest.cpp index 12faff8f8..1f78878e4 100644 --- a/trunk/src/utest/srs_utest.cpp +++ b/trunk/src/utest/srs_utest.cpp @@ -50,8 +50,8 @@ bool _srs_in_docker = false; bool _srs_config_by_env = false; // @global kernel factory. -ISrsKernelFactory *_srs_kernel_factory = new SrsFinalFactory(); ISrsAppFactory *_srs_app_factory = new SrsAppFactory(); +ISrsKernelFactory *_srs_kernel_factory = _srs_app_factory; // The binary name of SRS. const char *_srs_binary = NULL; diff --git a/trunk/src/utest/srs_utest_app13.cpp b/trunk/src/utest/srs_utest_app13.cpp index da3e882e3..488d641d9 100644 --- a/trunk/src/utest/srs_utest_app13.cpp +++ b/trunk/src/utest/srs_utest_app13.cpp @@ -3234,6 +3234,31 @@ ISrsFragmentedMp4 *MockDvrAppFactory::create_fragmented_mp4() return NULL; } +ISrsIpListener *MockDvrAppFactory::create_tcp_listener(ISrsTcpHandler *handler) +{ + return NULL; +} + +ISrsCoroutine *MockDvrAppFactory::create_coroutine(const std::string &name, ISrsCoroutineHandler *handler, SrsContextId cid) +{ + return NULL; +} + +ISrsTime *MockDvrAppFactory::create_time() +{ + return NULL; +} + +ISrsConfig *MockDvrAppFactory::create_config() +{ + return NULL; +} + +ISrsCond *MockDvrAppFactory::create_cond() +{ + return NULL; +} + VOID TEST(DvrSegmenterTest, OpenTypicalScenario) { srs_error_t err; diff --git a/trunk/src/utest/srs_utest_app13.hpp b/trunk/src/utest/srs_utest_app13.hpp index f5391acdd..393f86f17 100644 --- a/trunk/src/utest/srs_utest_app13.hpp +++ b/trunk/src/utest/srs_utest_app13.hpp @@ -640,6 +640,12 @@ public: virtual ISrsInitMp4 *create_init_mp4(); virtual ISrsFragmentWindow *create_fragment_window(); virtual ISrsFragmentedMp4 *create_fragmented_mp4(); + virtual ISrsIpListener *create_tcp_listener(ISrsTcpHandler *handler); + // ISrsKernelFactory interface methods + virtual ISrsCoroutine *create_coroutine(const std::string &name, ISrsCoroutineHandler *handler, SrsContextId cid); + virtual ISrsTime *create_time(); + virtual ISrsConfig *create_config(); + virtual ISrsCond *create_cond(); }; // Mock ISrsDvrSegmenter for testing SrsDvrPlan diff --git a/trunk/src/utest/srs_utest_app14.cpp b/trunk/src/utest/srs_utest_app14.cpp index 0775b36df..f15c556fc 100644 --- a/trunk/src/utest/srs_utest_app14.cpp +++ b/trunk/src/utest/srs_utest_app14.cpp @@ -2393,6 +2393,31 @@ ISrsFragmentedMp4 *MockAppFactoryForGbPublish::create_fragmented_mp4() return NULL; } +ISrsIpListener *MockAppFactoryForGbPublish::create_tcp_listener(ISrsTcpHandler *handler) +{ + return NULL; +} + +ISrsCoroutine *MockAppFactoryForGbPublish::create_coroutine(const std::string &name, ISrsCoroutineHandler *handler, SrsContextId cid) +{ + return NULL; +} + +ISrsTime *MockAppFactoryForGbPublish::create_time() +{ + return NULL; +} + +ISrsConfig *MockAppFactoryForGbPublish::create_config() +{ + return NULL; +} + +ISrsCond *MockAppFactoryForGbPublish::create_cond() +{ + return NULL; +} + void MockAppFactoryForGbPublish::reset() { srs_freep(mock_gb_session_); @@ -3276,11 +3301,14 @@ MockUdpMuxSocket::MockUdpMuxSocket() peer_port_ = 5000; peer_id_ = "192.168.1.100:5000"; fast_id_ = 0; + data_ = NULL; + size_ = 0; } MockUdpMuxSocket::~MockUdpMuxSocket() { srs_freep(sendto_error_); + data_ = NULL; } srs_error_t MockUdpMuxSocket::sendto(void *data, int size, srs_utime_t timeout) @@ -3316,6 +3344,24 @@ SrsUdpMuxSocket *MockUdpMuxSocket::copy_sendonly() return (SrsUdpMuxSocket *)this; } +int MockUdpMuxSocket::recvfrom(srs_utime_t timeout) +{ + // Mock implementation - return the size of data received + return size_; +} + +char *MockUdpMuxSocket::data() +{ + // Mock implementation - return the data buffer + return data_; +} + +int MockUdpMuxSocket::size() +{ + // Mock implementation - return the size of data + return size_; +} + void MockUdpMuxSocket::reset() { srs_freep(sendto_error_); diff --git a/trunk/src/utest/srs_utest_app14.hpp b/trunk/src/utest/srs_utest_app14.hpp index 9091d9fac..9996f2def 100644 --- a/trunk/src/utest/srs_utest_app14.hpp +++ b/trunk/src/utest/srs_utest_app14.hpp @@ -603,6 +603,12 @@ public: virtual ISrsInitMp4 *create_init_mp4(); virtual ISrsFragmentWindow *create_fragment_window(); virtual ISrsFragmentedMp4 *create_fragmented_mp4(); + virtual ISrsIpListener *create_tcp_listener(ISrsTcpHandler *handler); + // ISrsKernelFactory interface methods + virtual ISrsCoroutine *create_coroutine(const std::string &name, ISrsCoroutineHandler *handler, SrsContextId cid); + virtual ISrsTime *create_time(); + virtual ISrsConfig *create_config(); + virtual ISrsCond *create_cond(); void reset(); }; @@ -715,6 +721,8 @@ public: int peer_port_; std::string peer_id_; uint64_t fast_id_; + char *data_; + int size_; public: MockUdpMuxSocket(); @@ -727,6 +735,9 @@ public: virtual std::string peer_id(); virtual uint64_t fast_id(); virtual SrsUdpMuxSocket *copy_sendonly(); + virtual int recvfrom(srs_utime_t timeout); + virtual char *data(); + virtual int size(); public: void reset(); diff --git a/trunk/src/utest/srs_utest_app16.cpp b/trunk/src/utest/srs_utest_app16.cpp new file mode 100644 index 000000000..c5828fea0 --- /dev/null +++ b/trunk/src/utest/srs_utest_app16.cpp @@ -0,0 +1,543 @@ +// +// Copyright (c) 2013-2025 The SRS Authors +// +// SPDX-License-Identifier: MIT +// + +#include + +using namespace std; + +#include +#include +#include +#include +#include +#include +#include + +// Mock ISrsUdpHandler implementation +MockUdpHandler::MockUdpHandler() +{ + on_udp_packet_called_ = false; + packet_count_ = 0; + last_packet_data_ = ""; + last_packet_size_ = 0; +} + +MockUdpHandler::~MockUdpHandler() +{ +} + +srs_error_t MockUdpHandler::on_udp_packet(const sockaddr *from, const int fromlen, char *buf, int nb_buf) +{ + on_udp_packet_called_ = true; + packet_count_++; + last_packet_data_ = string(buf, nb_buf); + last_packet_size_ = nb_buf; + return srs_success; +} + +// Mock ISrsUdpMuxHandler implementation +MockUdpMuxHandler::MockUdpMuxHandler() +{ + on_udp_packet_called_ = false; + packet_count_ = 0; + last_peer_ip_ = ""; + last_peer_port_ = 0; + last_packet_data_ = ""; + last_packet_size_ = 0; +} + +MockUdpMuxHandler::~MockUdpMuxHandler() +{ +} + +srs_error_t MockUdpMuxHandler::on_udp_packet(ISrsUdpMuxSocket *skt) +{ + on_udp_packet_called_ = true; + packet_count_++; + last_peer_ip_ = skt->get_peer_ip(); + last_peer_port_ = skt->get_peer_port(); + last_packet_data_ = string(skt->data(), skt->size()); + last_packet_size_ = skt->size(); + return srs_success; +} + +VOID TEST(UdpListenerTest, ListenAndReceivePacket) +{ + srs_error_t err; + + // Generate random port in range [30000, 60000] + SrsRand rand; + int port = rand.integer(30000, 60000); + + // Create mock UDP handler + SrsUniquePtr mock_handler(new MockUdpHandler()); + + // Create UDP listener with mock handler + SrsUniquePtr listener(new SrsUdpListener(mock_handler.get())); + + // Set endpoint and label + listener->set_endpoint("127.0.0.1", port); + listener->set_label("TEST-UDP"); + + // Start listening - this should create UDP socket and start coroutine + HELPER_EXPECT_SUCCESS(listener->listen()); + + // Verify that the listener has a valid file descriptor + EXPECT_TRUE(listener->stfd() != NULL); + + // Create a client UDP socket to send test packet + srs_netfd_t client_fd = NULL; + HELPER_EXPECT_SUCCESS(srs_udp_listen("127.0.0.1", 0, &client_fd)); + EXPECT_TRUE(client_fd != NULL); + SrsUniquePtr client_fd_ptr(&client_fd, srs_close_stfd_ptr); + + // Prepare test packet data + string test_data = "Hello UDP Listener Test"; + + // Send packet to the listener + sockaddr_in dest_addr; + memset(&dest_addr, 0, sizeof(dest_addr)); + dest_addr.sin_family = AF_INET; + dest_addr.sin_port = htons(port); + dest_addr.sin_addr.s_addr = inet_addr("127.0.0.1"); + + int sent = srs_sendto(client_fd, (void *)test_data.c_str(), test_data.size(), + (sockaddr *)&dest_addr, sizeof(dest_addr), SRS_UTIME_NO_TIMEOUT); + EXPECT_EQ(sent, (int)test_data.size()); + + // Wait a bit for the listener to receive and process the packet + srs_usleep(100 * SRS_UTIME_MILLISECONDS); + + // Verify that the mock handler received the packet + EXPECT_TRUE(mock_handler->on_udp_packet_called_); + EXPECT_EQ(mock_handler->packet_count_, 1); + EXPECT_EQ(mock_handler->last_packet_size_, (int)test_data.size()); + EXPECT_EQ(mock_handler->last_packet_data_, test_data); + + // Clean up - close the listener + listener->close(); +} + +VOID TEST(UdpListenerTest, SetEndpointAndSocketBuffer) +{ + srs_error_t err; + + // Generate random port in range [30000, 60000] + SrsRand rand; + int port = rand.integer(30000, 60000); + + // Create mock UDP handler + SrsUniquePtr mock_handler(new MockUdpHandler()); + + // Create UDP listener with mock handler + SrsUniquePtr listener(new SrsUdpListener(mock_handler.get())); + + // Test set_label - should return this for chaining + ISrsListener *result = listener->set_label("TEST-LABEL"); + EXPECT_EQ(result, listener.get()); + + // Test set_endpoint - should return this for chaining + result = listener->set_endpoint("127.0.0.1", port); + EXPECT_EQ(result, listener.get()); + + // Start listening to create the socket + HELPER_EXPECT_SUCCESS(listener->listen()); + + // Test fd() - should return valid file descriptor + int fd = listener->fd(); + EXPECT_GT(fd, 0); + + // Test stfd() - should return valid state threads file descriptor + srs_netfd_t stfd = listener->stfd(); + EXPECT_TRUE(stfd != NULL); + EXPECT_EQ(srs_netfd_fileno(stfd), fd); + + // Verify socket buffer settings by checking socket options + int sndbuf = 0; + socklen_t opt_len = sizeof(sndbuf); + getsockopt(fd, SOL_SOCKET, SO_SNDBUF, (void *)&sndbuf, &opt_len); + // Socket buffer should be set (may not be exactly 10M due to OS limits, but should be > 0) + EXPECT_GT(sndbuf, 0); + + int rcvbuf = 0; + opt_len = sizeof(rcvbuf); + getsockopt(fd, SOL_SOCKET, SO_RCVBUF, (void *)&rcvbuf, &opt_len); + // Socket buffer should be set (may not be exactly 10M due to OS limits, but should be > 0) + EXPECT_GT(rcvbuf, 0); + + // Clean up - close the listener + listener->close(); +} + +VOID TEST(UdpMuxListenerTest, ListenAndCreateSocket) +{ + srs_error_t err; + + // Generate random port in range [30000, 60000] + SrsRand rand; + int port = rand.integer(30000, 60000); + + // Create mock UDP mux handler + SrsUniquePtr mock_handler(new MockUdpMuxHandler()); + + // Create UDP mux listener with mock handler + SrsUniquePtr listener(new SrsUdpMuxListener(mock_handler.get(), "127.0.0.1", port)); + + // Start listening - this should create UDP socket and start coroutine + // Note: factory_ is already set to _srs_app_factory in constructor + HELPER_EXPECT_SUCCESS(listener->listen()); + + // Verify that the listener has a valid file descriptor + EXPECT_TRUE(listener->stfd() != NULL); + EXPECT_GT(listener->fd(), 0); + + // Verify that we can get the file descriptor + int fd = listener->fd(); + EXPECT_GT(fd, 0); + + // Verify that the socket is bound to the correct port by checking socket name + sockaddr_in addr; + socklen_t addr_len = sizeof(addr); + int ret = getsockname(fd, (sockaddr *)&addr, &addr_len); + EXPECT_EQ(ret, 0); + EXPECT_EQ(ntohs(addr.sin_port), port); + EXPECT_EQ(addr.sin_family, AF_INET); +} + +VOID TEST(UdpMuxListenerTest, SetSocketBuffer) +{ + srs_error_t err; + + // Generate random port in range [30000, 60000] + SrsRand rand; + int port = rand.integer(30000, 60000); + + // Create mock UDP mux handler + SrsUniquePtr mock_handler(new MockUdpMuxHandler()); + + // Create UDP mux listener with mock handler + SrsUniquePtr listener(new SrsUdpMuxListener(mock_handler.get(), "127.0.0.1", port)); + + // Start listening - this should create UDP socket + HELPER_EXPECT_SUCCESS(listener->listen()); + + // Get the file descriptor + int fd = listener->fd(); + EXPECT_GT(fd, 0); + + // Wait a bit for the cycle() to start and call set_socket_buffer() + srs_usleep(100 * SRS_UTIME_MILLISECONDS); + + // Verify SO_SNDBUF is set - should be greater than default + int sndbuf = 0; + socklen_t opt_len = sizeof(sndbuf); + int ret = getsockopt(fd, SOL_SOCKET, SO_SNDBUF, (void *)&sndbuf, &opt_len); + EXPECT_EQ(ret, 0); + EXPECT_GT(sndbuf, 0); + // The actual buffer size may be less than 10M due to OS limits, but should be reasonably large + // On most systems, it should be at least 1KB if the 10MB request was processed + EXPECT_GT(sndbuf, 1024); + + // Verify SO_RCVBUF is set - should be greater than default + int rcvbuf = 0; + opt_len = sizeof(rcvbuf); + ret = getsockopt(fd, SOL_SOCKET, SO_RCVBUF, (void *)&rcvbuf, &opt_len); + EXPECT_EQ(ret, 0); + EXPECT_GT(rcvbuf, 0); + // The actual buffer size may be less than 10M due to OS limits, but should be reasonably large + // On most systems, it should be at least 1KB if the 10MB request was processed + EXPECT_GT(rcvbuf, 1024); +} + +VOID TEST(UdpMuxListenerTest, ReceivePacketFromClient) +{ + srs_error_t err; + + // Generate random port in range [30000, 60000] + SrsRand rand; + int port = rand.integer(30000, 60000); + + // Create mock UDP mux handler + SrsUniquePtr mock_handler(new MockUdpMuxHandler()); + + // Create UDP mux listener with mock handler - this is the UDP server + SrsUniquePtr listener(new SrsUdpMuxListener(mock_handler.get(), "127.0.0.1", port)); + + // Start listening - this creates the UDP socket and starts the coroutine + HELPER_EXPECT_SUCCESS(listener->listen()); + + // Verify that the listener has a valid file descriptor + EXPECT_TRUE(listener->stfd() != NULL); + EXPECT_GT(listener->fd(), 0); + + // Yield to allow the listener coroutine to start and initialize + srs_usleep(10 * SRS_UTIME_MILLISECONDS); + + // Create a UDP client socket to send test packets + srs_netfd_t client_fd = NULL; + HELPER_EXPECT_SUCCESS(srs_udp_listen("127.0.0.1", 0, &client_fd)); + EXPECT_TRUE(client_fd != NULL); + SrsUniquePtr client_fd_ptr(&client_fd, srs_close_stfd_ptr); + + // Prepare test packet data + string test_data = "Hello UDP Mux Listener Test"; + + // Send packet from client to the UDP server + sockaddr_in dest_addr; + memset(&dest_addr, 0, sizeof(dest_addr)); + dest_addr.sin_family = AF_INET; + dest_addr.sin_port = htons(port); + dest_addr.sin_addr.s_addr = inet_addr("127.0.0.1"); + + int sent = srs_sendto(client_fd, (void *)test_data.c_str(), test_data.size(), + (sockaddr *)&dest_addr, sizeof(dest_addr), SRS_UTIME_NO_TIMEOUT); + EXPECT_EQ(sent, (int)test_data.size()); + + // Yield to allow the listener coroutine to start and initialize + srs_usleep(10 * SRS_UTIME_MILLISECONDS); + + // Verify that the mock handler received the packet via SrsUdpMuxSocket + EXPECT_TRUE(mock_handler->on_udp_packet_called_); + EXPECT_EQ(mock_handler->packet_count_, 1); + EXPECT_EQ(mock_handler->last_packet_size_, (int)test_data.size()); + EXPECT_EQ(mock_handler->last_packet_data_, test_data); + + // Send another packet to verify multiple packets work + string test_data2 = "Second packet"; + sent = srs_sendto(client_fd, (void *)test_data2.c_str(), test_data2.size(), + (sockaddr *)&dest_addr, sizeof(dest_addr), SRS_UTIME_NO_TIMEOUT); + EXPECT_EQ(sent, (int)test_data2.size()); + + // Yield to allow the listener coroutine to start and initialize + srs_usleep(10 * SRS_UTIME_MILLISECONDS); + + // Verify the second packet was received + EXPECT_EQ(mock_handler->packet_count_, 2); + EXPECT_EQ(mock_handler->last_packet_size_, (int)test_data2.size()); + EXPECT_EQ(mock_handler->last_packet_data_, test_data2); +} + +VOID TEST(UdpMuxSocketTest, SendtoReplyToClient) +{ + srs_error_t err; + + // Generate random ports in range [30000, 60000] for server and client + SrsRand rand; + int server_port = rand.integer(30000, 60000); + int client_port = rand.integer(30000, 60000); + while (client_port == server_port) { + client_port = rand.integer(30000, 60000); + } + + // Create a standalone UDP server socket (not using listener to avoid interference) + srs_netfd_t server_fd = NULL; + HELPER_EXPECT_SUCCESS(srs_udp_listen("127.0.0.1", server_port, &server_fd)); + EXPECT_TRUE(server_fd != NULL); + SrsUniquePtr server_fd_ptr(&server_fd, srs_close_stfd_ptr); + + // Create a UDP client socket + srs_netfd_t client_fd = NULL; + HELPER_EXPECT_SUCCESS(srs_udp_listen("127.0.0.1", client_port, &client_fd)); + EXPECT_TRUE(client_fd != NULL); + SrsUniquePtr client_fd_ptr(&client_fd, srs_close_stfd_ptr); + + // Create SrsUdpMuxSocket wrapping the server socket + SrsUniquePtr server_socket(new SrsUdpMuxSocket(server_fd)); + + // Prepare test packet data + string test_data = "Hello from client"; + + // Send packet from client to the server + sockaddr_in dest_addr; + memset(&dest_addr, 0, sizeof(dest_addr)); + dest_addr.sin_family = AF_INET; + dest_addr.sin_port = htons(server_port); + dest_addr.sin_addr.s_addr = inet_addr("127.0.0.1"); + + int sent = srs_sendto(client_fd, (void *)test_data.c_str(), test_data.size(), + (sockaddr *)&dest_addr, sizeof(dest_addr), SRS_UTIME_NO_TIMEOUT); + EXPECT_EQ(sent, (int)test_data.size()); + + // Yield to allow packet to arrive + srs_usleep(10 * SRS_UTIME_MILLISECONDS); + + // Receive the packet with server socket - this populates from_ and fromlen_ + int nread = server_socket->recvfrom(100 * SRS_UTIME_MILLISECONDS); + EXPECT_EQ(nread, (int)test_data.size()); + EXPECT_EQ(string(server_socket->data(), nread), test_data); + + // Verify the peer information is correctly captured + // Note: peer_id() must be called first to populate peer_ip_ and peer_port_ + string peer_id = server_socket->peer_id(); + EXPECT_FALSE(peer_id.empty()); + EXPECT_EQ(server_socket->get_peer_port(), client_port); + EXPECT_EQ(server_socket->get_peer_ip(), "127.0.0.1"); + + // Now test the sendto functionality - send a reply back to the client + string reply_data = "Hello from server"; + HELPER_EXPECT_SUCCESS(server_socket->sendto((void *)reply_data.c_str(), reply_data.size(), SRS_UTIME_NO_TIMEOUT)); + + // Yield to allow the packet to be sent + srs_usleep(10 * SRS_UTIME_MILLISECONDS); + + // Receive the reply on the client side + char recv_buf[1024]; + sockaddr_in from_addr; + int from_len = sizeof(from_addr); + nread = srs_recvfrom(client_fd, recv_buf, sizeof(recv_buf), (sockaddr *)&from_addr, &from_len, 100 * SRS_UTIME_MILLISECONDS); + + // Verify the reply was received + EXPECT_EQ(nread, (int)reply_data.size()); + EXPECT_EQ(string(recv_buf, nread), reply_data); + EXPECT_EQ(ntohs(from_addr.sin_port), server_port); + + // Test multiple sendto calls to verify yield behavior (nn_msgs_for_yield_ > 20) + // Send 25 packets to trigger the yield logic + for (int i = 0; i < 25; i++) { + std::stringstream ss; + ss << "msg" << i; + string msg = ss.str(); + HELPER_EXPECT_SUCCESS(server_socket->sendto((void *)msg.c_str(), msg.size(), SRS_UTIME_NO_TIMEOUT)); + } + + // Verify at least some packets were received (may not be all due to UDP nature) + srs_usleep(10 * SRS_UTIME_MILLISECONDS); + int received_count = 0; + while (received_count < 25) { + nread = srs_recvfrom(client_fd, recv_buf, sizeof(recv_buf), (sockaddr *)&from_addr, &from_len, 10 * SRS_UTIME_MILLISECONDS); + if (nread <= 0) + break; + received_count++; + } + // Should receive at least some packets (UDP may drop some, but most should arrive on localhost) + EXPECT_GT(received_count, 0); +} + +VOID TEST(UdpMuxSocketTest, PeerIdGenerationAndCaching) +{ + srs_error_t err; + + // Generate random ports in range [30000, 60000] for server and client + SrsRand rand; + int server_port = rand.integer(30000, 60000); + int client_port = rand.integer(30000, 60000); + while (client_port == server_port) { + client_port = rand.integer(30000, 60000); + } + + // Create a standalone UDP server socket + srs_netfd_t server_fd = NULL; + HELPER_EXPECT_SUCCESS(srs_udp_listen("127.0.0.1", server_port, &server_fd)); + EXPECT_TRUE(server_fd != NULL); + SrsUniquePtr server_fd_ptr(&server_fd, srs_close_stfd_ptr); + + // Create a UDP client socket + srs_netfd_t client_fd = NULL; + HELPER_EXPECT_SUCCESS(srs_udp_listen("127.0.0.1", client_port, &client_fd)); + EXPECT_TRUE(client_fd != NULL); + SrsUniquePtr client_fd_ptr(&client_fd, srs_close_stfd_ptr); + + // Create SrsUdpMuxSocket wrapping the server socket + SrsUniquePtr server_socket(new SrsUdpMuxSocket(server_fd)); + + // Prepare test packet data + string test_data = "Test packet for peer_id"; + + // Send packet from client to the server + sockaddr_in dest_addr; + memset(&dest_addr, 0, sizeof(dest_addr)); + dest_addr.sin_family = AF_INET; + dest_addr.sin_port = htons(server_port); + dest_addr.sin_addr.s_addr = inet_addr("127.0.0.1"); + + int sent = srs_sendto(client_fd, (void *)test_data.c_str(), test_data.size(), + (sockaddr *)&dest_addr, sizeof(dest_addr), SRS_UTIME_NO_TIMEOUT); + EXPECT_EQ(sent, (int)test_data.size()); + + // Yield to allow packet to arrive + srs_usleep(10 * SRS_UTIME_MILLISECONDS); + + // Receive the packet with server socket - this sets address_changed_ to true + int nread = server_socket->recvfrom(100 * SRS_UTIME_MILLISECONDS); + EXPECT_EQ(nread, (int)test_data.size()); + EXPECT_EQ(string(server_socket->data(), nread), test_data); + + // Test peer_id() - first call should generate the peer ID + string peer_id = server_socket->peer_id(); + EXPECT_FALSE(peer_id.empty()); + + // Verify peer_id format is "ip:port" + std::stringstream expected_peer_id; + expected_peer_id << "127.0.0.1:" << client_port; + EXPECT_EQ(peer_id, expected_peer_id.str()); + + // Verify get_peer_ip() and get_peer_port() return correct values + EXPECT_EQ(server_socket->get_peer_ip(), "127.0.0.1"); + EXPECT_EQ(server_socket->get_peer_port(), client_port); + + // Test peer_id() caching - second call should return cached value without regeneration + string peer_id2 = server_socket->peer_id(); + EXPECT_EQ(peer_id2, peer_id); + + // Send another packet from the same client + string test_data2 = "Second packet"; + sent = srs_sendto(client_fd, (void *)test_data2.c_str(), test_data2.size(), + (sockaddr *)&dest_addr, sizeof(dest_addr), SRS_UTIME_NO_TIMEOUT); + EXPECT_EQ(sent, (int)test_data2.size()); + + // Yield to allow packet to arrive + srs_usleep(10 * SRS_UTIME_MILLISECONDS); + + // Receive the second packet - this should set address_changed_ to true again + nread = server_socket->recvfrom(100 * SRS_UTIME_MILLISECONDS); + EXPECT_EQ(nread, (int)test_data2.size()); + + // Call peer_id() again - should regenerate but return the same value (same client) + string peer_id3 = server_socket->peer_id(); + EXPECT_EQ(peer_id3, peer_id); + + // Test fast_id() - should return non-zero for IPv4 + uint64_t fast_id = server_socket->fast_id(); + EXPECT_GT(fast_id, 0ULL); + + // Verify IP address caching by sending from a different client port + int client_port2 = rand.integer(30000, 60000); + while (client_port2 == server_port || client_port2 == client_port) { + client_port2 = rand.integer(30000, 60000); + } + + srs_netfd_t client_fd2 = NULL; + HELPER_EXPECT_SUCCESS(srs_udp_listen("127.0.0.1", client_port2, &client_fd2)); + EXPECT_TRUE(client_fd2 != NULL); + SrsUniquePtr client_fd2_ptr(&client_fd2, srs_close_stfd_ptr); + + // Send packet from second client + string test_data3 = "Third packet from different client"; + sent = srs_sendto(client_fd2, (void *)test_data3.c_str(), test_data3.size(), + (sockaddr *)&dest_addr, sizeof(dest_addr), SRS_UTIME_NO_TIMEOUT); + EXPECT_EQ(sent, (int)test_data3.size()); + + // Yield to allow packet to arrive + srs_usleep(10 * SRS_UTIME_MILLISECONDS); + + // Receive packet from second client + nread = server_socket->recvfrom(100 * SRS_UTIME_MILLISECONDS); + EXPECT_EQ(nread, (int)test_data3.size()); + + // Call peer_id() - should generate new peer ID with different port + string peer_id4 = server_socket->peer_id(); + EXPECT_FALSE(peer_id4.empty()); + + // Verify new peer_id has different port but same IP (IP should be cached) + std::stringstream expected_peer_id2; + expected_peer_id2 << "127.0.0.1:" << client_port2; + EXPECT_EQ(peer_id4, expected_peer_id2.str()); + EXPECT_NE(peer_id4, peer_id); + + // Verify get_peer_port() returns the new port + EXPECT_EQ(server_socket->get_peer_port(), client_port2); + EXPECT_EQ(server_socket->get_peer_ip(), "127.0.0.1"); +} diff --git a/trunk/src/utest/srs_utest_app16.hpp b/trunk/src/utest/srs_utest_app16.hpp new file mode 100644 index 000000000..bfa2176dd --- /dev/null +++ b/trunk/src/utest/srs_utest_app16.hpp @@ -0,0 +1,56 @@ +// +// Copyright (c) 2013-2025 The SRS Authors +// +// SPDX-License-Identifier: MIT +// + +#ifndef SRS_UTEST_APP16_HPP +#define SRS_UTEST_APP16_HPP + +/* +#include +*/ +#include + +#include +#include +#include +#include + +// Mock ISrsUdpHandler for testing SrsUdpListener +class MockUdpHandler : public ISrsUdpHandler +{ +public: + bool on_udp_packet_called_; + int packet_count_; + std::string last_packet_data_; + int last_packet_size_; + +public: + MockUdpHandler(); + virtual ~MockUdpHandler(); + +public: + virtual srs_error_t on_udp_packet(const sockaddr *from, const int fromlen, char *buf, int nb_buf); +}; + +// Mock ISrsUdpMuxHandler for testing SrsUdpMuxListener +class MockUdpMuxHandler : public ISrsUdpMuxHandler +{ +public: + bool on_udp_packet_called_; + int packet_count_; + std::string last_peer_ip_; + int last_peer_port_; + std::string last_packet_data_; + int last_packet_size_; + +public: + MockUdpMuxHandler(); + virtual ~MockUdpMuxHandler(); + +public: + virtual srs_error_t on_udp_packet(ISrsUdpMuxSocket *skt); +}; + +#endif diff --git a/trunk/src/utest/srs_utest_st.cpp b/trunk/src/utest/srs_utest_st.cpp index faa261bb2..e213cdbfe 100644 --- a/trunk/src/utest/srs_utest_st.cpp +++ b/trunk/src/utest/srs_utest_st.cpp @@ -126,7 +126,7 @@ VOID TEST(StTest, StUtimePerformance) EXPECT_GE(gettimeofday_elapsed_time, 0); EXPECT_GE(st_utime_elapsed_time, 0); - EXPECT_LT(gettimeofday_elapsed_time > st_utime_elapsed_time ? gettimeofday_elapsed_time - st_utime_elapsed_time : st_utime_elapsed_time - gettimeofday_elapsed_time, 30); + EXPECT_LT(gettimeofday_elapsed_time > st_utime_elapsed_time ? gettimeofday_elapsed_time - st_utime_elapsed_time : st_utime_elapsed_time - gettimeofday_elapsed_time, 100); } }