event_engine.h 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399
  1. // Copyright 2021 The gRPC Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. #ifndef GRPC_EVENT_ENGINE_EVENT_ENGINE_H
  15. #define GRPC_EVENT_ENGINE_EVENT_ENGINE_H
  16. #include <grpc/support/port_platform.h>
  17. #include <functional>
  18. #include <vector>
  19. #include "absl/status/status.h"
  20. #include "absl/status/statusor.h"
  21. #include "absl/time/time.h"
  22. #include <grpc/event_engine/endpoint_config.h>
  23. #include <grpc/event_engine/memory_allocator.h>
  24. #include <grpc/event_engine/port.h>
  25. // TODO(hork): Define the Endpoint::Write metrics collection system
  26. namespace grpc_event_engine {
  27. namespace experimental {
  28. ////////////////////////////////////////////////////////////////////////////////
  29. /// The EventEngine encapsulates all platform-specific behaviors related to low
  30. /// level network I/O, timers, asynchronous execution, and DNS resolution.
  31. ///
  32. /// This interface allows developers to provide their own event management and
  33. /// network stacks. Motivating use cases for supporting custom EventEngines
  34. /// include the ability to hook into external event loops, and using different
  35. /// EventEngine instances for each channel to better insulate network I/O and
  36. /// callback processing from other channels.
  37. ///
  38. /// A default cross-platform EventEngine instance is provided by gRPC.
  39. ///
  40. /// LIFESPAN AND OWNERSHIP
  41. ///
  42. /// gRPC takes shared ownership of EventEngines via std::shared_ptrs to ensure
  43. /// that the engines remain available until they are no longer needed. Depending
  44. /// on the use case, engines may live until gRPC is shut down.
  45. ///
  46. /// EXAMPLE USAGE (Not yet implemented)
  47. ///
  48. /// Custom EventEngines can be specified per channel, and allow configuration
  49. /// for both clients and servers. To set a custom EventEngine for a client
  50. /// channel, you can do something like the following:
  51. ///
  52. /// ChannelArguments args;
  53. /// std::shared_ptr<EventEngine> engine = std::make_shared<MyEngine>(...);
  54. /// args.SetEventEngine(engine);
  55. /// MyAppClient client(grpc::CreateCustomChannel(
  56. /// "localhost:50051", grpc::InsecureChannelCredentials(), args));
  57. ///
  58. /// A gRPC server can use a custom EventEngine by calling the
  59. /// ServerBuilder::SetEventEngine method:
  60. ///
  61. /// ServerBuilder builder;
  62. /// std::shared_ptr<EventEngine> engine = std::make_shared<MyEngine>(...);
  63. /// builder.SetEventEngine(engine);
  64. /// std::unique_ptr<Server> server(builder.BuildAndStart());
  65. /// server->Wait();
  66. ///
  67. ////////////////////////////////////////////////////////////////////////////////
  68. class EventEngine {
  69. public:
  /// Interface for a unit of work executed by the EventEngine.
  ///
  /// Across the whole EventEngine API, ownership of a \a Closure stays with
  /// the caller. The EventEngine never deletes a Closure; on cancellation it
  /// simply forgets that the Closure exists. All necessary cleanup is the
  /// caller's responsibility.
  class Closure {
   public:
    Closure() = default;

    // Closures are an interface, and are therefore non-copyable.
    Closure(const Closure&) = delete;
    Closure& operator=(const Closure&) = delete;

    // Polymorphic type, so the destructor is virtual.
    virtual ~Closure() = default;

    // Executes the code held by this Closure.
    virtual void Run() = 0;
  };
  /// Identifies a scheduled task.
  ///
  /// The \a Run* methods return \a TaskHandles, and a handle may be passed to
  /// the \a Cancel method.
  struct TaskHandle {
    intptr_t keys[2];
  };
  /// Identifies a connection attempt that can still be cancelled.
  ///
  /// \a Connect returns one of these, and it may be passed to
  /// \a CancelConnect.
  struct ConnectionHandle {
    intptr_t keys[2];
  };
  /// Thin wrapper around a platform-specific sockaddr type. A sockaddr struct
  /// exists on all platforms that gRPC supports.
  ///
  /// Platforms are expected to provide definitions for:
  /// * sockaddr
  /// * sockaddr_in
  /// * sockaddr_in6
  class ResolvedAddress {
   public:
    /// Capacity, in bytes, of the internal address storage.
    static constexpr socklen_t MAX_SIZE_BYTES = 128;

    /// Constructs from \a address and its length \a size. The definition is
    /// not in this header.
    /// NOTE(review): presumably \a size must not exceed MAX_SIZE_BYTES -
    /// confirm against the out-of-line definition.
    ResolvedAddress(const sockaddr* address, socklen_t size);

    /// Constructs an empty address: the stored size is 0 and the storage is
    /// zero-filled.
    ResolvedAddress() = default;
    ResolvedAddress(const ResolvedAddress&) = default;

    /// Accessors for the stored address and its length; both are defined out
    /// of line.
    const struct sockaddr* address() const;
    socklen_t size() const;

   private:
    // Value-initialized so that a default-constructed (or partially filled)
    // address never carries indeterminate bytes; the defaulted copy
    // constructor copies the entire buffer.
    char address_[MAX_SIZE_BYTES] = {};
    socklen_t size_ = 0;
  };
  119. /// One end of a connection between a gRPC client and server. Endpoints are
  120. /// created when connections are established, and Endpoint operations are
  121. /// gRPC's primary means of communication.
  122. ///
  123. /// Endpoints must use the provided MemoryAllocator for all data buffer memory
  124. /// allocations. gRPC allows applications to set memory constraints per
  125. /// Channel or Server, and the implementation depends on all dynamic memory
  126. /// allocation being handled by the quota system.
  127. class Endpoint {
  128. public:
  129. /// Shuts down all connections and invokes all pending read or write
  130. /// callbacks with an error status.
  131. virtual ~Endpoint() = default;
  132. /// Reads data from the Endpoint.
  133. ///
  134. /// When data is available on the connection, that data is moved into the
  135. /// \a buffer, and the \a on_read callback is called. The caller must ensure
  136. /// that the callback has access to the buffer when executed later.
  137. /// Ownership of the buffer is not transferred. Valid slices *may* be placed
  138. /// into the buffer even if the callback is invoked with a non-OK Status.
  139. ///
  140. /// There can be at most one outstanding read per Endpoint at any given
  141. /// time. An outstanding read is one in which the \a on_read callback has
  142. /// not yet been executed for some previous call to \a Read. If an attempt
  143. /// is made to call \a Read while a previous read is still outstanding, the
  144. /// \a EventEngine must abort.
  145. ///
  146. /// For failed read operations, implementations should pass the appropriate
  147. /// statuses to \a on_read. For example, callbacks might expect to receive
  148. /// CANCELLED on endpoint shutdown.
  149. virtual void Read(std::function<void(absl::Status)> on_read,
  150. SliceBuffer* buffer) = 0;
  151. /// Writes data out on the connection.
  152. ///
  153. /// \a on_writable is called when the connection is ready for more data. The
  154. /// Slices within the \a data buffer may be mutated at will by the Endpoint
  155. /// until \a on_writable is called. The \a data SliceBuffer will remain
  156. /// valid after calling \a Write, but its state is otherwise undefined. All
  157. /// bytes in \a data must have been written before calling \a on_writable
  158. /// unless an error has occurred.
  159. ///
  160. /// There can be at most one outstanding write per Endpoint at any given
  161. /// time. An outstanding write is one in which the \a on_writable callback
  162. /// has not yet been executed for some previous call to \a Write. If an
  163. /// attempt is made to call \a Write while a previous write is still
  164. /// outstanding, the \a EventEngine must abort.
  165. ///
  166. /// For failed write operations, implementations should pass the appropriate
  167. /// statuses to \a on_writable. For example, callbacks might expect to
  168. /// receive CANCELLED on endpoint shutdown.
  169. virtual void Write(std::function<void(absl::Status)> on_writable,
  170. SliceBuffer* data) = 0;
  171. /// Returns an address in the format described in DNSResolver. The returned
  172. /// values are expected to remain valid for the life of the Endpoint.
  173. virtual const ResolvedAddress& GetPeerAddress() const = 0;
  174. virtual const ResolvedAddress& GetLocalAddress() const = 0;
  175. };
  176. /// Called when a new connection is established.
  177. ///
  178. /// If the connection attempt was not successful, implementations should pass
  179. /// the appropriate statuses to this callback. For example, callbacks might
  180. /// expect to receive DEADLINE_EXCEEDED statuses when appropriate, or
  181. /// CANCELLED statuses on EventEngine shutdown.
  182. using OnConnectCallback =
  183. std::function<void(absl::StatusOr<std::unique_ptr<Endpoint>>)>;
  184. /// Listens for incoming connection requests from gRPC clients and initiates
  185. /// request processing once connections are established.
  186. class Listener {
  187. public:
  188. /// Called when the listener has accepted a new client connection.
  189. using AcceptCallback = std::function<void(
  190. std::unique_ptr<Endpoint>, MemoryAllocator memory_allocator)>;
  191. virtual ~Listener() = default;
  192. /// Bind an address/port to this Listener.
  193. ///
  194. /// It is expected that multiple addresses/ports can be bound to this
  195. /// Listener before Listener::Start has been called. Returns either the
  196. /// bound port or an appropriate error status.
  197. virtual absl::StatusOr<int> Bind(const ResolvedAddress& addr) = 0;
  198. virtual absl::Status Start() = 0;
  199. };
  200. /// Factory method to create a network listener / server.
  201. ///
  202. /// Once a \a Listener is created and started, the \a on_accept callback will
  203. /// be called once asynchronously for each established connection. This method
  204. /// may return a non-OK status immediately if an error was encountered in any
  205. /// synchronous steps required to create the Listener. In this case,
  206. /// \a on_shutdown will never be called.
  207. ///
  208. /// If this method returns a Listener, then \a on_shutdown will be invoked
  209. /// exactly once, when the Listener is shut down. The status passed to it will
  210. /// indicate if there was a problem during shutdown.
  211. ///
  212. /// The provided \a MemoryAllocatorFactory is used to create \a
  213. /// MemoryAllocators for Endpoint construction.
  214. virtual absl::StatusOr<std::unique_ptr<Listener>> CreateListener(
  215. Listener::AcceptCallback on_accept,
  216. std::function<void(absl::Status)> on_shutdown,
  217. const EndpointConfig& config,
  218. std::unique_ptr<MemoryAllocatorFactory> memory_allocator_factory) = 0;
  219. /// Creates a client network connection to a remote network listener.
  220. ///
  221. /// Even in the event of an error, it is expected that the \a on_connect
  222. /// callback will be asynchronously executed exactly once by the EventEngine.
  223. /// A connection attempt can be cancelled using the \a CancelConnect method.
  224. ///
  225. /// Implementation Note: it is important that the \a memory_allocator be used
  226. /// for all read/write buffer allocations in the EventEngine implementation.
  227. /// This allows gRPC's \a ResourceQuota system to monitor and control memory
  228. /// usage with graceful degradation mechanisms. Please see the \a
  229. /// MemoryAllocator API for more information.
  230. virtual ConnectionHandle Connect(OnConnectCallback on_connect,
  231. const ResolvedAddress& addr,
  232. const EndpointConfig& args,
  233. MemoryAllocator memory_allocator,
  234. absl::Time deadline) = 0;
  235. /// Request cancellation of a connection attempt.
  236. ///
  237. /// If the associated connection has already been completed, it will not be
  238. /// cancelled, and this method will return false.
  239. ///
  240. /// If the associated connection has not been completed, it will be cancelled,
  241. /// and this method will return true. The \a OnConnectCallback will not be
  242. /// called.
  243. virtual bool CancelConnect(ConnectionHandle handle) = 0;
  244. /// Provides asynchronous resolution.
  245. class DNSResolver {
  246. public:
  247. /// Task handle for DNS Resolution requests.
  248. struct LookupTaskHandle {
  249. intptr_t key[2];
  250. };
  251. /// DNS SRV record type.
  252. struct SRVRecord {
  253. std::string host;
  254. int port = 0;
  255. int priority = 0;
  256. int weight = 0;
  257. };
  258. /// Called with the collection of sockaddrs that were resolved from a given
  259. /// target address.
  260. using LookupHostnameCallback =
  261. std::function<void(absl::StatusOr<std::vector<ResolvedAddress>>)>;
  262. /// Called with a collection of SRV records.
  263. using LookupSRVCallback =
  264. std::function<void(absl::StatusOr<std::vector<SRVRecord>>)>;
  265. /// Called with the result of a TXT record lookup
  266. using LookupTXTCallback = std::function<void(absl::StatusOr<std::string>)>;
  267. virtual ~DNSResolver() = default;
  268. /// Asynchronously resolve an address.
  269. ///
  270. /// \a default_port may be a non-numeric named service port, and will only
  271. /// be used if \a address does not already contain a port component.
  272. ///
  273. /// When the lookup is complete, the \a on_resolve callback will be invoked
  274. /// with a status indicating the success or failure of the lookup.
  275. /// Implementations should pass the appropriate statuses to the callback.
  276. /// For example, callbacks might expect to receive DEADLINE_EXCEEDED or
  277. /// NOT_FOUND.
  278. ///
  279. /// If cancelled, \a on_resolve will not be executed.
  280. virtual LookupTaskHandle LookupHostname(LookupHostnameCallback on_resolve,
  281. absl::string_view address,
  282. absl::string_view default_port,
  283. absl::Time deadline) = 0;
  284. /// Asynchronously perform an SRV record lookup.
  285. ///
  286. /// \a on_resolve has the same meaning and expectations as \a
  287. /// LookupHostname's \a on_resolve callback.
  288. virtual LookupTaskHandle LookupSRV(LookupSRVCallback on_resolve,
  289. absl::string_view name,
  290. absl::Time deadline) = 0;
  291. /// Asynchronously perform a TXT record lookup.
  292. ///
  293. /// \a on_resolve has the same meaning and expectations as \a
  294. /// LookupHostname's \a on_resolve callback.
  295. virtual LookupTaskHandle LookupTXT(LookupTXTCallback on_resolve,
  296. absl::string_view name,
  297. absl::Time deadline) = 0;
  298. /// Cancel an asynchronous lookup operation.
  299. ///
  300. /// This shares the same semantics with \a EventEngine::Cancel: successfully
  301. /// cancelled lookups will not have their callbacks executed, and this
  302. /// method returns true.
  303. virtual bool CancelLookup(LookupTaskHandle handle) = 0;
  304. };
  305. /// At time of destruction, the EventEngine must have no active
  306. /// responsibilities. EventEngine users (applications) are responsible for
  307. /// cancelling all tasks and DNS lookups, shutting down listeners and
  308. /// endpoints, prior to EventEngine destruction. If there are any outstanding
  309. /// tasks, any running listeners, etc. at time of EventEngine destruction,
  310. /// that is an invalid use of the API, and it will result in undefined
  311. /// behavior.
  312. virtual ~EventEngine() = default;
  313. // TODO(nnoble): consider whether we can remove this method before we
  314. // de-experimentalize this API.
  315. virtual bool IsWorkerThread() = 0;
  316. /// Creates and returns an instance of a DNSResolver.
  317. virtual std::unique_ptr<DNSResolver> GetDNSResolver() = 0;
  318. /// Asynchronously executes a task as soon as possible.
  319. ///
  320. /// \a Closures scheduled with \a Run cannot be cancelled. The \a closure will
  321. /// not be deleted after it has been run, ownership remains with the caller.
  322. virtual void Run(Closure* closure) = 0;
  323. /// Asynchronously executes a task as soon as possible.
  324. ///
  325. /// \a Closures scheduled with \a Run cannot be cancelled. Unlike the
  326. /// overloaded \a Closure alternative, the std::function version's \a closure
  327. /// will be deleted by the EventEngine after the closure has been run.
  328. ///
  329. /// This version of \a Run may be less performant than the \a Closure version
  330. /// in some scenarios. This overload is useful in situations where performance
  331. /// is not a critical concern.
  332. virtual void Run(std::function<void()> closure) = 0;
  333. /// Synonymous with scheduling an alarm to run at time \a when.
  334. ///
  335. /// The \a closure will execute when time \a when arrives unless it has been
  336. /// cancelled via the \a Cancel method. If cancelled, the closure will not be
  337. /// run, nor will it be deleted. Ownership remains with the caller.
  338. virtual TaskHandle RunAt(absl::Time when, Closure* closure) = 0;
  339. /// Synonymous with scheduling an alarm to run at time \a when.
  340. ///
  341. /// The \a closure will execute when time \a when arrives unless it has been
  342. /// cancelled via the \a Cancel method. If cancelled, the closure will not be
  343. /// run. Unilke the overloaded \a Closure alternative, the std::function
  344. /// version's \a closure will be deleted by the EventEngine after the closure
  345. /// has been run, or upon cancellation.
  346. ///
  347. /// This version of \a RunAt may be less performant than the \a Closure
  348. /// version in some scenarios. This overload is useful in situations where
  349. /// performance is not a critical concern.
  350. virtual TaskHandle RunAt(absl::Time when, std::function<void()> closure) = 0;
  351. /// Request cancellation of a task.
  352. ///
  353. /// If the associated closure has already been scheduled to run, it will not
  354. /// be cancelled, and this function will return false.
  355. ///
  356. /// If the associated callback has not been scheduled to run, it will be
  357. /// cancelled, and the associated std::function or \a Closure* will not be
  358. /// executed. In this case, Cancel will return true.
  359. virtual bool Cancel(TaskHandle handle) = 0;
  360. };
  361. /// Replace gRPC's default EventEngine factory.
  362. ///
  363. /// Applications may call \a SetDefaultEventEngineFactory at any time to replace
  364. /// the default factory used within gRPC. EventEngines will be created when
  365. /// necessary, when they are otherwise not provided by the application.
  366. ///
  367. /// To be certain that none of the gRPC-provided built-in EventEngines are
  368. /// created, applications must set a custom EventEngine factory method *before*
  369. /// grpc is initialized.
  370. void SetDefaultEventEngineFactory(
  371. const std::function<std::unique_ptr<EventEngine>()>* factory);
  372. /// Create an EventEngine using the default factory.
  373. std::unique_ptr<EventEngine> CreateEventEngine();
  374. } // namespace experimental
  375. } // namespace grpc_event_engine
  376. #endif // GRPC_EVENT_ENGINE_EVENT_ENGINE_H