server_callback.h 29 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799
  1. /*
  2. *
  3. * Copyright 2019 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. */
  17. #ifndef GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_H
  18. #define GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_H
  19. // IWYU pragma: private, include <grpcpp/support/server_callback.h>
  20. #include <atomic>
  21. #include <functional>
  22. #include <type_traits>
  23. #include <grpcpp/impl/codegen/call.h>
  24. #include <grpcpp/impl/codegen/call_op_set.h>
  25. #include <grpcpp/impl/codegen/callback_common.h>
  26. #include <grpcpp/impl/codegen/config.h>
  27. #include <grpcpp/impl/codegen/core_codegen_interface.h>
  28. #include <grpcpp/impl/codegen/message_allocator.h>
  29. #include <grpcpp/impl/codegen/status.h>
  30. #include <grpcpp/impl/codegen/sync.h>
  31. namespace grpc {
  32. // Declare base class of all reactors as internal
  33. namespace internal {
  34. // Forward declarations
  35. template <class Request, class Response>
  36. class CallbackUnaryHandler;
  37. template <class Request, class Response>
  38. class CallbackClientStreamingHandler;
  39. template <class Request, class Response>
  40. class CallbackServerStreamingHandler;
  41. template <class Request, class Response>
  42. class CallbackBidiHandler;
  /// Common base of every server-side reactor. It declares the two reactions
  /// that all reactor kinds share and an internal-only inlineability query.
  class ServerReactor {
   public:
    virtual ~ServerReactor() = default;

    /// Reaction invoked when the RPC is done; must be supplied by subclasses.
    virtual void OnDone() = 0;

    /// Reaction invoked on cancellation; must be supplied by subclasses.
    virtual void OnCancel() = 0;

    // NOT part of the API. Internal use only: reports whether all reactions of
    // this Reactor can be run without an extra executor scheduling. Only
    // internally-defined reactors with trivial reactions should return true.
    virtual bool InternalInlineable() { return false; }

   private:
    // The callback method handlers are granted access to reactor internals.
    template <class, class>
    friend class CallbackUnaryHandler;
    template <class, class>
    friend class CallbackClientStreamingHandler;
    template <class, class>
    friend class CallbackServerStreamingHandler;
    template <class, class>
    friend class CallbackBidiHandler;
  };
  63. /// The base class of ServerCallbackUnary etc.
  64. class ServerCallbackCall {
  65. public:
  66. virtual ~ServerCallbackCall() {}
  67. // This object is responsible for tracking when it is safe to call OnDone and
  68. // OnCancel. OnDone should not be called until the method handler is complete,
  69. // Finish has been called, the ServerContext CompletionOp (which tracks
  70. // cancellation or successful completion) has completed, and all outstanding
  71. // Read/Write actions have seen their reactions. OnCancel should not be called
  72. // until after the method handler is done and the RPC has completed with a
  73. // cancellation. This is tracked by counting how many of these conditions have
  74. // been met and calling OnCancel when none remain unmet.
  75. // Public versions of MaybeDone: one where we don't know the reactor in
  76. // advance (used for the ServerContext CompletionOp), and one for where we
  77. // know the inlineability of the OnDone reaction. You should set the inline
  78. // flag to true if either the Reactor is InternalInlineable() or if this
  79. // callback is already being forced to run dispatched to an executor
  80. // (typically because it contains additional work than just the MaybeDone).
  81. void MaybeDone() {
  82. if (GPR_UNLIKELY(Unref() == 1)) {
  83. ScheduleOnDone(reactor()->InternalInlineable());
  84. }
  85. }
  86. void MaybeDone(bool inline_ondone) {
  87. if (GPR_UNLIKELY(Unref() == 1)) {
  88. ScheduleOnDone(inline_ondone);
  89. }
  90. }
  91. // Fast version called with known reactor passed in, used from derived
  92. // classes, typically in non-cancel case
  93. void MaybeCallOnCancel(ServerReactor* reactor) {
  94. if (GPR_UNLIKELY(UnblockCancellation())) {
  95. CallOnCancel(reactor);
  96. }
  97. }
  98. // Slower version called from object that doesn't know the reactor a priori
  99. // (such as the ServerContext CompletionOp which is formed before the
  100. // reactor). This is used in cancel cases only, so it's ok to be slower and
  101. // invoke a virtual function.
  102. void MaybeCallOnCancel() {
  103. if (GPR_UNLIKELY(UnblockCancellation())) {
  104. CallOnCancel(reactor());
  105. }
  106. }
  107. protected:
  108. /// Increases the reference count
  109. void Ref() { callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed); }
  110. private:
  111. virtual ServerReactor* reactor() = 0;
  112. // CallOnDone performs the work required at completion of the RPC: invoking
  113. // the OnDone function and doing all necessary cleanup. This function is only
  114. // ever invoked on a fully-Unref'fed ServerCallbackCall.
  115. virtual void CallOnDone() = 0;
  116. // If the OnDone reaction is inlineable, execute it inline. Otherwise send it
  117. // to an executor.
  118. void ScheduleOnDone(bool inline_ondone);
  119. // If the OnCancel reaction is inlineable, execute it inline. Otherwise send
  120. // it to an executor.
  121. void CallOnCancel(ServerReactor* reactor);
  122. // Implement the cancellation constraint counter. Return true if OnCancel
  123. // should be called, false otherwise.
  124. bool UnblockCancellation() {
  125. return on_cancel_conditions_remaining_.fetch_sub(
  126. 1, std::memory_order_acq_rel) == 1;
  127. }
  128. /// Decreases the reference count and returns the previous value
  129. int Unref() {
  130. return callbacks_outstanding_.fetch_sub(1, std::memory_order_acq_rel);
  131. }
  132. std::atomic_int on_cancel_conditions_remaining_{2};
  133. std::atomic_int callbacks_outstanding_{
  134. 3}; // reserve for start, Finish, and CompletionOp
  135. };
  136. template <class Request, class Response>
  137. class DefaultMessageHolder : public MessageHolder<Request, Response> {
  138. public:
  139. DefaultMessageHolder() {
  140. this->set_request(&request_obj_);
  141. this->set_response(&response_obj_);
  142. }
  143. void Release() override {
  144. // the object is allocated in the call arena.
  145. this->~DefaultMessageHolder<Request, Response>();
  146. }
  147. private:
  148. Request request_obj_;
  149. Response response_obj_;
  150. };
  151. } // namespace internal
  152. // Forward declarations
  153. class ServerUnaryReactor;
  154. template <class Request>
  155. class ServerReadReactor;
  156. template <class Response>
  157. class ServerWriteReactor;
  158. template <class Request, class Response>
  159. class ServerBidiReactor;
  160. // NOTE: The actual call/stream object classes are provided as API only to
  161. // support mocking. There are no implementations of these class interfaces in
  162. // the API.
  163. class ServerCallbackUnary : public internal::ServerCallbackCall {
  164. public:
  165. ~ServerCallbackUnary() override {}
  166. virtual void Finish(grpc::Status s) = 0;
  167. virtual void SendInitialMetadata() = 0;
  168. protected:
  169. // Use a template rather than explicitly specifying ServerUnaryReactor to
  170. // delay binding and avoid a circular forward declaration issue
  171. template <class Reactor>
  172. void BindReactor(Reactor* reactor) {
  173. reactor->InternalBindCall(this);
  174. }
  175. };
  176. template <class Request>
  177. class ServerCallbackReader : public internal::ServerCallbackCall {
  178. public:
  179. ~ServerCallbackReader() override {}
  180. virtual void Finish(grpc::Status s) = 0;
  181. virtual void SendInitialMetadata() = 0;
  182. virtual void Read(Request* msg) = 0;
  183. protected:
  184. void BindReactor(ServerReadReactor<Request>* reactor) {
  185. reactor->InternalBindReader(this);
  186. }
  187. };
  188. template <class Response>
  189. class ServerCallbackWriter : public internal::ServerCallbackCall {
  190. public:
  191. ~ServerCallbackWriter() override {}
  192. virtual void Finish(grpc::Status s) = 0;
  193. virtual void SendInitialMetadata() = 0;
  194. virtual void Write(const Response* msg, grpc::WriteOptions options) = 0;
  195. virtual void WriteAndFinish(const Response* msg, grpc::WriteOptions options,
  196. grpc::Status s) = 0;
  197. protected:
  198. void BindReactor(ServerWriteReactor<Response>* reactor) {
  199. reactor->InternalBindWriter(this);
  200. }
  201. };
  202. template <class Request, class Response>
  203. class ServerCallbackReaderWriter : public internal::ServerCallbackCall {
  204. public:
  205. ~ServerCallbackReaderWriter() override {}
  206. virtual void Finish(grpc::Status s) = 0;
  207. virtual void SendInitialMetadata() = 0;
  208. virtual void Read(Request* msg) = 0;
  209. virtual void Write(const Response* msg, grpc::WriteOptions options) = 0;
  210. virtual void WriteAndFinish(const Response* msg, grpc::WriteOptions options,
  211. grpc::Status s) = 0;
  212. protected:
  213. void BindReactor(ServerBidiReactor<Request, Response>* reactor) {
  214. reactor->InternalBindStream(this);
  215. }
  216. };
  // The following classes are the reactor interfaces that are to be implemented
  // by the user, returned as the output parameter of the method handler for a
  // callback method. Every reaction except OnDone has a default empty
  // implementation, so the user class only needs to override OnDone (which is
  // pure virtual) plus those reactions that it cares about. The reaction methods
  // will be invoked by the library in response to the completion of various
  // operations. Reactions must not include blocking operations (such as blocking
  // I/O, starting synchronous RPCs, or waiting on condition variables). Reactions
  // may be invoked concurrently, except that OnDone is called after all others
  // (assuming proper API usage). The reactor may not be deleted until OnDone is
  // called.
  227. /// \a ServerBidiReactor is the interface for a bidirectional streaming RPC.
  228. template <class Request, class Response>
  229. class ServerBidiReactor : public internal::ServerReactor {
  230. public:
  231. // NOTE: Initializing stream_ as a constructor initializer rather than a
  232. // default initializer because gcc-4.x requires a copy constructor for
  233. // default initializing a templated member, which isn't ok for atomic.
  234. // TODO(vjpai): Switch to default constructor and default initializer when
  235. // gcc-4.x is no longer supported
  236. ServerBidiReactor() : stream_(nullptr) {}
  237. ~ServerBidiReactor() override = default;
  238. /// Send any initial metadata stored in the RPC context. If not invoked,
  239. /// any initial metadata will be passed along with the first Write or the
  240. /// Finish (if there are no writes).
  241. void StartSendInitialMetadata() ABSL_LOCKS_EXCLUDED(stream_mu_) {
  242. ServerCallbackReaderWriter<Request, Response>* stream =
  243. stream_.load(std::memory_order_acquire);
  244. if (stream == nullptr) {
  245. grpc::internal::MutexLock l(&stream_mu_);
  246. stream = stream_.load(std::memory_order_relaxed);
  247. if (stream == nullptr) {
  248. backlog_.send_initial_metadata_wanted = true;
  249. return;
  250. }
  251. }
  252. stream->SendInitialMetadata();
  253. }
  254. /// Initiate a read operation.
  255. ///
  256. /// \param[out] req Where to eventually store the read message. Valid when
  257. /// the library calls OnReadDone
  258. void StartRead(Request* req) ABSL_LOCKS_EXCLUDED(stream_mu_) {
  259. ServerCallbackReaderWriter<Request, Response>* stream =
  260. stream_.load(std::memory_order_acquire);
  261. if (stream == nullptr) {
  262. grpc::internal::MutexLock l(&stream_mu_);
  263. stream = stream_.load(std::memory_order_relaxed);
  264. if (stream == nullptr) {
  265. backlog_.read_wanted = req;
  266. return;
  267. }
  268. }
  269. stream->Read(req);
  270. }
  271. /// Initiate a write operation.
  272. ///
  273. /// \param[in] resp The message to be written. The library does not take
  274. /// ownership but the caller must ensure that the message is
  275. /// not deleted or modified until OnWriteDone is called.
  276. void StartWrite(const Response* resp) {
  277. StartWrite(resp, grpc::WriteOptions());
  278. }
  279. /// Initiate a write operation with specified options.
  280. ///
  281. /// \param[in] resp The message to be written. The library does not take
  282. /// ownership but the caller must ensure that the message is
  283. /// not deleted or modified until OnWriteDone is called.
  284. /// \param[in] options The WriteOptions to use for writing this message
  285. void StartWrite(const Response* resp, grpc::WriteOptions options)
  286. ABSL_LOCKS_EXCLUDED(stream_mu_) {
  287. ServerCallbackReaderWriter<Request, Response>* stream =
  288. stream_.load(std::memory_order_acquire);
  289. if (stream == nullptr) {
  290. grpc::internal::MutexLock l(&stream_mu_);
  291. stream = stream_.load(std::memory_order_relaxed);
  292. if (stream == nullptr) {
  293. backlog_.write_wanted = resp;
  294. backlog_.write_options_wanted = options;
  295. return;
  296. }
  297. }
  298. stream->Write(resp, options);
  299. }
  300. /// Initiate a write operation with specified options and final RPC Status,
  301. /// which also causes any trailing metadata for this RPC to be sent out.
  302. /// StartWriteAndFinish is like merging StartWriteLast and Finish into a
  303. /// single step. A key difference, though, is that this operation doesn't have
  304. /// an OnWriteDone reaction - it is considered complete only when OnDone is
  305. /// available. An RPC can either have StartWriteAndFinish or Finish, but not
  306. /// both.
  307. ///
  308. /// \param[in] resp The message to be written. The library does not take
  309. /// ownership but the caller must ensure that the message is
  310. /// not deleted or modified until OnDone is called.
  311. /// \param[in] options The WriteOptions to use for writing this message
  312. /// \param[in] s The status outcome of this RPC
  313. void StartWriteAndFinish(const Response* resp, grpc::WriteOptions options,
  314. grpc::Status s) ABSL_LOCKS_EXCLUDED(stream_mu_) {
  315. ServerCallbackReaderWriter<Request, Response>* stream =
  316. stream_.load(std::memory_order_acquire);
  317. if (stream == nullptr) {
  318. grpc::internal::MutexLock l(&stream_mu_);
  319. stream = stream_.load(std::memory_order_relaxed);
  320. if (stream == nullptr) {
  321. backlog_.write_and_finish_wanted = true;
  322. backlog_.write_wanted = resp;
  323. backlog_.write_options_wanted = options;
  324. backlog_.status_wanted = std::move(s);
  325. return;
  326. }
  327. }
  328. stream->WriteAndFinish(resp, options, std::move(s));
  329. }
  330. /// Inform system of a planned write operation with specified options, but
  331. /// allow the library to schedule the actual write coalesced with the writing
  332. /// of trailing metadata (which takes place on a Finish call).
  333. ///
  334. /// \param[in] resp The message to be written. The library does not take
  335. /// ownership but the caller must ensure that the message is
  336. /// not deleted or modified until OnWriteDone is called.
  337. /// \param[in] options The WriteOptions to use for writing this message
  338. void StartWriteLast(const Response* resp, grpc::WriteOptions options) {
  339. StartWrite(resp, options.set_last_message());
  340. }
  341. /// Indicate that the stream is to be finished and the trailing metadata and
  342. /// RPC status are to be sent. Every RPC MUST be finished using either Finish
  343. /// or StartWriteAndFinish (but not both), even if the RPC is already
  344. /// cancelled.
  345. ///
  346. /// \param[in] s The status outcome of this RPC
  347. void Finish(grpc::Status s) ABSL_LOCKS_EXCLUDED(stream_mu_) {
  348. ServerCallbackReaderWriter<Request, Response>* stream =
  349. stream_.load(std::memory_order_acquire);
  350. if (stream == nullptr) {
  351. grpc::internal::MutexLock l(&stream_mu_);
  352. stream = stream_.load(std::memory_order_relaxed);
  353. if (stream == nullptr) {
  354. backlog_.finish_wanted = true;
  355. backlog_.status_wanted = std::move(s);
  356. return;
  357. }
  358. }
  359. stream->Finish(std::move(s));
  360. }
  361. /// Notifies the application that an explicit StartSendInitialMetadata
  362. /// operation completed. Not used when the sending of initial metadata
  363. /// piggybacks onto the first write.
  364. ///
  365. /// \param[in] ok Was it successful? If false, no further write-side operation
  366. /// will succeed.
  367. virtual void OnSendInitialMetadataDone(bool /*ok*/) {}
  368. /// Notifies the application that a StartRead operation completed.
  369. ///
  370. /// \param[in] ok Was it successful? If false, no further read-side operation
  371. /// will succeed.
  372. virtual void OnReadDone(bool /*ok*/) {}
  373. /// Notifies the application that a StartWrite (or StartWriteLast) operation
  374. /// completed.
  375. ///
  376. /// \param[in] ok Was it successful? If false, no further write-side operation
  377. /// will succeed.
  378. virtual void OnWriteDone(bool /*ok*/) {}
  379. /// Notifies the application that all operations associated with this RPC
  380. /// have completed. This is an override (from the internal base class) but
  381. /// still abstract, so derived classes MUST override it to be instantiated.
  382. void OnDone() override = 0;
  383. /// Notifies the application that this RPC has been cancelled. This is an
  384. /// override (from the internal base class) but not final, so derived classes
  385. /// should override it if they want to take action.
  386. void OnCancel() override {}
  387. private:
  388. friend class ServerCallbackReaderWriter<Request, Response>;
  389. // May be overridden by internal implementation details. This is not a public
  390. // customization point.
  391. virtual void InternalBindStream(
  392. ServerCallbackReaderWriter<Request, Response>* stream) {
  393. grpc::internal::MutexLock l(&stream_mu_);
  394. if (GPR_UNLIKELY(backlog_.send_initial_metadata_wanted)) {
  395. stream->SendInitialMetadata();
  396. }
  397. if (GPR_UNLIKELY(backlog_.read_wanted != nullptr)) {
  398. stream->Read(backlog_.read_wanted);
  399. }
  400. if (GPR_UNLIKELY(backlog_.write_and_finish_wanted)) {
  401. stream->WriteAndFinish(backlog_.write_wanted,
  402. std::move(backlog_.write_options_wanted),
  403. std::move(backlog_.status_wanted));
  404. } else {
  405. if (GPR_UNLIKELY(backlog_.write_wanted != nullptr)) {
  406. stream->Write(backlog_.write_wanted,
  407. std::move(backlog_.write_options_wanted));
  408. }
  409. if (GPR_UNLIKELY(backlog_.finish_wanted)) {
  410. stream->Finish(std::move(backlog_.status_wanted));
  411. }
  412. }
  413. // Set stream_ last so that other functions can use it lock-free
  414. stream_.store(stream, std::memory_order_release);
  415. }
  416. grpc::internal::Mutex stream_mu_;
  417. // TODO(vjpai): Make stream_or_backlog_ into a std::variant or absl::variant
  418. // once C++17 or ABSL is supported since stream and backlog are
  419. // mutually exclusive in this class. Do likewise with the
  420. // remaining reactor classes and their backlogs as well.
  421. std::atomic<ServerCallbackReaderWriter<Request, Response>*> stream_{nullptr};
  422. struct PreBindBacklog {
  423. bool send_initial_metadata_wanted = false;
  424. bool write_and_finish_wanted = false;
  425. bool finish_wanted = false;
  426. Request* read_wanted = nullptr;
  427. const Response* write_wanted = nullptr;
  428. grpc::WriteOptions write_options_wanted;
  429. grpc::Status status_wanted;
  430. };
  431. PreBindBacklog backlog_ ABSL_GUARDED_BY(stream_mu_);
  432. };
  433. /// \a ServerReadReactor is the interface for a client-streaming RPC.
  434. template <class Request>
  435. class ServerReadReactor : public internal::ServerReactor {
  436. public:
  437. ServerReadReactor() : reader_(nullptr) {}
  438. ~ServerReadReactor() override = default;
  439. /// The following operation initiations are exactly like ServerBidiReactor.
  440. void StartSendInitialMetadata() ABSL_LOCKS_EXCLUDED(reader_mu_) {
  441. ServerCallbackReader<Request>* reader =
  442. reader_.load(std::memory_order_acquire);
  443. if (reader == nullptr) {
  444. grpc::internal::MutexLock l(&reader_mu_);
  445. reader = reader_.load(std::memory_order_relaxed);
  446. if (reader == nullptr) {
  447. backlog_.send_initial_metadata_wanted = true;
  448. return;
  449. }
  450. }
  451. reader->SendInitialMetadata();
  452. }
  453. void StartRead(Request* req) ABSL_LOCKS_EXCLUDED(reader_mu_) {
  454. ServerCallbackReader<Request>* reader =
  455. reader_.load(std::memory_order_acquire);
  456. if (reader == nullptr) {
  457. grpc::internal::MutexLock l(&reader_mu_);
  458. reader = reader_.load(std::memory_order_relaxed);
  459. if (reader == nullptr) {
  460. backlog_.read_wanted = req;
  461. return;
  462. }
  463. }
  464. reader->Read(req);
  465. }
  466. void Finish(grpc::Status s) ABSL_LOCKS_EXCLUDED(reader_mu_) {
  467. ServerCallbackReader<Request>* reader =
  468. reader_.load(std::memory_order_acquire);
  469. if (reader == nullptr) {
  470. grpc::internal::MutexLock l(&reader_mu_);
  471. reader = reader_.load(std::memory_order_relaxed);
  472. if (reader == nullptr) {
  473. backlog_.finish_wanted = true;
  474. backlog_.status_wanted = std::move(s);
  475. return;
  476. }
  477. }
  478. reader->Finish(std::move(s));
  479. }
  480. /// The following notifications are exactly like ServerBidiReactor.
  481. virtual void OnSendInitialMetadataDone(bool /*ok*/) {}
  482. virtual void OnReadDone(bool /*ok*/) {}
  483. void OnDone() override = 0;
  484. void OnCancel() override {}
  485. private:
  486. friend class ServerCallbackReader<Request>;
  487. // May be overridden by internal implementation details. This is not a public
  488. // customization point.
  489. virtual void InternalBindReader(ServerCallbackReader<Request>* reader)
  490. ABSL_LOCKS_EXCLUDED(reader_mu_) {
  491. grpc::internal::MutexLock l(&reader_mu_);
  492. if (GPR_UNLIKELY(backlog_.send_initial_metadata_wanted)) {
  493. reader->SendInitialMetadata();
  494. }
  495. if (GPR_UNLIKELY(backlog_.read_wanted != nullptr)) {
  496. reader->Read(backlog_.read_wanted);
  497. }
  498. if (GPR_UNLIKELY(backlog_.finish_wanted)) {
  499. reader->Finish(std::move(backlog_.status_wanted));
  500. }
  501. // Set reader_ last so that other functions can use it lock-free
  502. reader_.store(reader, std::memory_order_release);
  503. }
  504. grpc::internal::Mutex reader_mu_;
  505. std::atomic<ServerCallbackReader<Request>*> reader_{nullptr};
  506. struct PreBindBacklog {
  507. bool send_initial_metadata_wanted = false;
  508. bool finish_wanted = false;
  509. Request* read_wanted = nullptr;
  510. grpc::Status status_wanted;
  511. };
  512. PreBindBacklog backlog_ ABSL_GUARDED_BY(reader_mu_);
  513. };
  514. /// \a ServerWriteReactor is the interface for a server-streaming RPC.
  515. template <class Response>
  516. class ServerWriteReactor : public internal::ServerReactor {
  517. public:
  518. ServerWriteReactor() : writer_(nullptr) {}
  519. ~ServerWriteReactor() override = default;
  520. /// The following operation initiations are exactly like ServerBidiReactor.
  521. void StartSendInitialMetadata() ABSL_LOCKS_EXCLUDED(writer_mu_) {
  522. ServerCallbackWriter<Response>* writer =
  523. writer_.load(std::memory_order_acquire);
  524. if (writer == nullptr) {
  525. grpc::internal::MutexLock l(&writer_mu_);
  526. writer = writer_.load(std::memory_order_relaxed);
  527. if (writer == nullptr) {
  528. backlog_.send_initial_metadata_wanted = true;
  529. return;
  530. }
  531. }
  532. writer->SendInitialMetadata();
  533. }
  534. void StartWrite(const Response* resp) {
  535. StartWrite(resp, grpc::WriteOptions());
  536. }
  537. void StartWrite(const Response* resp, grpc::WriteOptions options)
  538. ABSL_LOCKS_EXCLUDED(writer_mu_) {
  539. ServerCallbackWriter<Response>* writer =
  540. writer_.load(std::memory_order_acquire);
  541. if (writer == nullptr) {
  542. grpc::internal::MutexLock l(&writer_mu_);
  543. writer = writer_.load(std::memory_order_relaxed);
  544. if (writer == nullptr) {
  545. backlog_.write_wanted = resp;
  546. backlog_.write_options_wanted = options;
  547. return;
  548. }
  549. }
  550. writer->Write(resp, options);
  551. }
  552. void StartWriteAndFinish(const Response* resp, grpc::WriteOptions options,
  553. grpc::Status s) ABSL_LOCKS_EXCLUDED(writer_mu_) {
  554. ServerCallbackWriter<Response>* writer =
  555. writer_.load(std::memory_order_acquire);
  556. if (writer == nullptr) {
  557. grpc::internal::MutexLock l(&writer_mu_);
  558. writer = writer_.load(std::memory_order_relaxed);
  559. if (writer == nullptr) {
  560. backlog_.write_and_finish_wanted = true;
  561. backlog_.write_wanted = resp;
  562. backlog_.write_options_wanted = options;
  563. backlog_.status_wanted = std::move(s);
  564. return;
  565. }
  566. }
  567. writer->WriteAndFinish(resp, options, std::move(s));
  568. }
  569. void StartWriteLast(const Response* resp, grpc::WriteOptions options) {
  570. StartWrite(resp, options.set_last_message());
  571. }
  572. void Finish(grpc::Status s) ABSL_LOCKS_EXCLUDED(writer_mu_) {
  573. ServerCallbackWriter<Response>* writer =
  574. writer_.load(std::memory_order_acquire);
  575. if (writer == nullptr) {
  576. grpc::internal::MutexLock l(&writer_mu_);
  577. writer = writer_.load(std::memory_order_relaxed);
  578. if (writer == nullptr) {
  579. backlog_.finish_wanted = true;
  580. backlog_.status_wanted = std::move(s);
  581. return;
  582. }
  583. }
  584. writer->Finish(std::move(s));
  585. }
  586. /// The following notifications are exactly like ServerBidiReactor.
  587. virtual void OnSendInitialMetadataDone(bool /*ok*/) {}
  588. virtual void OnWriteDone(bool /*ok*/) {}
  589. void OnDone() override = 0;
  590. void OnCancel() override {}
  591. private:
  592. friend class ServerCallbackWriter<Response>;
  593. // May be overridden by internal implementation details. This is not a public
  594. // customization point.
  595. virtual void InternalBindWriter(ServerCallbackWriter<Response>* writer)
  596. ABSL_LOCKS_EXCLUDED(writer_mu_) {
  597. grpc::internal::MutexLock l(&writer_mu_);
  598. if (GPR_UNLIKELY(backlog_.send_initial_metadata_wanted)) {
  599. writer->SendInitialMetadata();
  600. }
  601. if (GPR_UNLIKELY(backlog_.write_and_finish_wanted)) {
  602. writer->WriteAndFinish(backlog_.write_wanted,
  603. std::move(backlog_.write_options_wanted),
  604. std::move(backlog_.status_wanted));
  605. } else {
  606. if (GPR_UNLIKELY(backlog_.write_wanted != nullptr)) {
  607. writer->Write(backlog_.write_wanted,
  608. std::move(backlog_.write_options_wanted));
  609. }
  610. if (GPR_UNLIKELY(backlog_.finish_wanted)) {
  611. writer->Finish(std::move(backlog_.status_wanted));
  612. }
  613. }
  614. // Set writer_ last so that other functions can use it lock-free
  615. writer_.store(writer, std::memory_order_release);
  616. }
  617. grpc::internal::Mutex writer_mu_;
  618. std::atomic<ServerCallbackWriter<Response>*> writer_{nullptr};
  619. struct PreBindBacklog {
  620. bool send_initial_metadata_wanted = false;
  621. bool write_and_finish_wanted = false;
  622. bool finish_wanted = false;
  623. const Response* write_wanted = nullptr;
  624. grpc::WriteOptions write_options_wanted;
  625. grpc::Status status_wanted;
  626. };
  627. PreBindBacklog backlog_ ABSL_GUARDED_BY(writer_mu_);
  628. };
  629. class ServerUnaryReactor : public internal::ServerReactor {
  630. public:
  631. ServerUnaryReactor() : call_(nullptr) {}
  632. ~ServerUnaryReactor() override = default;
  633. /// StartSendInitialMetadata is exactly like ServerBidiReactor.
  634. void StartSendInitialMetadata() ABSL_LOCKS_EXCLUDED(call_mu_) {
  635. ServerCallbackUnary* call = call_.load(std::memory_order_acquire);
  636. if (call == nullptr) {
  637. grpc::internal::MutexLock l(&call_mu_);
  638. call = call_.load(std::memory_order_relaxed);
  639. if (call == nullptr) {
  640. backlog_.send_initial_metadata_wanted = true;
  641. return;
  642. }
  643. }
  644. call->SendInitialMetadata();
  645. }
  646. /// Finish is similar to ServerBidiReactor except for one detail.
  647. /// If the status is non-OK, any message will not be sent. Instead,
  648. /// the client will only receive the status and any trailing metadata.
  649. void Finish(grpc::Status s) ABSL_LOCKS_EXCLUDED(call_mu_) {
  650. ServerCallbackUnary* call = call_.load(std::memory_order_acquire);
  651. if (call == nullptr) {
  652. grpc::internal::MutexLock l(&call_mu_);
  653. call = call_.load(std::memory_order_relaxed);
  654. if (call == nullptr) {
  655. backlog_.finish_wanted = true;
  656. backlog_.status_wanted = std::move(s);
  657. return;
  658. }
  659. }
  660. call->Finish(std::move(s));
  661. }
  662. /// The following notifications are exactly like ServerBidiReactor.
  663. virtual void OnSendInitialMetadataDone(bool /*ok*/) {}
  664. void OnDone() override = 0;
  665. void OnCancel() override {}
  666. private:
  667. friend class ServerCallbackUnary;
  668. // May be overridden by internal implementation details. This is not a public
  669. // customization point.
  670. virtual void InternalBindCall(ServerCallbackUnary* call)
  671. ABSL_LOCKS_EXCLUDED(call_mu_) {
  672. grpc::internal::MutexLock l(&call_mu_);
  673. if (GPR_UNLIKELY(backlog_.send_initial_metadata_wanted)) {
  674. call->SendInitialMetadata();
  675. }
  676. if (GPR_UNLIKELY(backlog_.finish_wanted)) {
  677. call->Finish(std::move(backlog_.status_wanted));
  678. }
  679. // Set call_ last so that other functions can use it lock-free
  680. call_.store(call, std::memory_order_release);
  681. }
  682. grpc::internal::Mutex call_mu_;
  683. std::atomic<ServerCallbackUnary*> call_{nullptr};
  684. struct PreBindBacklog {
  685. bool send_initial_metadata_wanted = false;
  686. bool finish_wanted = false;
  687. grpc::Status status_wanted;
  688. };
  689. PreBindBacklog backlog_ ABSL_GUARDED_BY(call_mu_);
  690. };
  691. namespace internal {
  692. template <class Base>
  693. class FinishOnlyReactor : public Base {
  694. public:
  695. explicit FinishOnlyReactor(grpc::Status s) { this->Finish(std::move(s)); }
  696. void OnDone() override { this->~FinishOnlyReactor(); }
  697. };
  698. using UnimplementedUnaryReactor = FinishOnlyReactor<ServerUnaryReactor>;
  699. template <class Request>
  700. using UnimplementedReadReactor = FinishOnlyReactor<ServerReadReactor<Request>>;
  701. template <class Response>
  702. using UnimplementedWriteReactor =
  703. FinishOnlyReactor<ServerWriteReactor<Response>>;
  704. template <class Request, class Response>
  705. using UnimplementedBidiReactor =
  706. FinishOnlyReactor<ServerBidiReactor<Request, Response>>;
  707. } // namespace internal
  708. // TODO(vjpai): Remove namespace experimental when last known users are migrated
  709. // off.
  710. namespace experimental {
  711. template <class Request, class Response>
  712. using ServerBidiReactor = ::grpc::ServerBidiReactor<Request, Response>;
  713. } // namespace experimental
  714. } // namespace grpc
  715. #endif // GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_H