passthru_endpoint.cc

/*
 *
 * Copyright 2016 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

#include "test/core/util/passthru_endpoint.h"

#include <inttypes.h>
#include <string.h>

#include <string>

#include "absl/strings/str_format.h"

#include <grpc/support/alloc.h>
#include <grpc/support/string_util.h>

#include "src/core/lib/gprpp/time.h"
#include "src/core/lib/iomgr/sockaddr.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/slice/slice_internal.h"

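// A passthru endpoint is a pair of in-memory endpoints: whatever one half
// writes becomes readable on the other half. With simulate_channel_actions
// enabled, per-direction byte budgets (allowed_read_bytes /
// allowed_write_bytes) throttle the transfer, and a timer-driven action list
// periodically raises those budgets to mimic a flow-controlled channel.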
typedef struct passthru_endpoint passthru_endpoint;

typedef struct {
  bool is_armed;
  grpc_endpoint* ep;
  grpc_slice_buffer* slices;
  grpc_closure* cb;
} pending_op;

typedef struct {
  grpc_timer timer;
  uint64_t allowed_write_bytes;
  uint64_t allowed_read_bytes;
  std::vector<grpc_passthru_endpoint_channel_action> actions;
  std::function<void()> on_complete;
} grpc_passthru_endpoint_channel_effects;

typedef struct {
  grpc_endpoint base;
  passthru_endpoint* parent;
  grpc_slice_buffer read_buffer;
  grpc_slice_buffer write_buffer;
  grpc_slice_buffer* on_read_out;
  grpc_closure* on_read;
  pending_op pending_read_op;
  pending_op pending_write_op;
  uint64_t bytes_read_so_far;
  uint64_t bytes_written_so_far;
} half;

struct passthru_endpoint {
  gpr_mu mu;
  int halves;
  grpc_passthru_endpoint_stats* stats;
  grpc_passthru_endpoint_channel_effects* channel_effects;
  bool simulate_channel_actions;
  bool shutdown;
  half client;
  half server;
};

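// Completes (or keeps pending) an armed read against the current read byte
// budget. Called under m->parent->mu.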
static void do_pending_read_op_locked(half* m, grpc_error_handle error) {
  GPR_ASSERT(m->pending_read_op.is_armed);
  GPR_ASSERT(m->bytes_read_so_far <=
             m->parent->channel_effects->allowed_read_bytes);
  if (m->parent->shutdown) {
    grpc_core::ExecCtx::Run(
        DEBUG_LOCATION, m->pending_read_op.cb,
        GRPC_ERROR_CREATE_FROM_STATIC_STRING("Already shutdown"));
    // Move any pending data into pending_read_op.slices so that it may be
    // freed by the executing callback.
    grpc_slice_buffer_move_into(&m->read_buffer, m->pending_read_op.slices);
    m->pending_read_op.is_armed = false;
    return;
  }
  if (m->bytes_read_so_far == m->parent->channel_effects->allowed_read_bytes) {
    // Keep it in pending state.
    return;
  }
  // This delayed processing should only be invoked when read_buffer has
  // something in it.
  GPR_ASSERT(m->read_buffer.count > 0);
  uint64_t readable_length = std::min<uint64_t>(
      m->read_buffer.length,
      m->parent->channel_effects->allowed_read_bytes - m->bytes_read_so_far);
  GPR_ASSERT(readable_length > 0);
  grpc_slice_buffer_move_first(&m->read_buffer, readable_length,
                               m->pending_read_op.slices);
  grpc_core::ExecCtx::Run(DEBUG_LOCATION, m->pending_read_op.cb, error);
  if (m->parent->simulate_channel_actions) {
    m->bytes_read_so_far += readable_length;
  }
  m->pending_read_op.is_armed = false;
}

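// Endpoint read vtable entry: if data is already buffered, arm a pending read
// and try to satisfy it immediately; otherwise park the callback in on_read
// until the peer's next write delivers data.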
static void me_read(grpc_endpoint* ep, grpc_slice_buffer* slices,
                    grpc_closure* cb, bool /*urgent*/) {
  half* m = reinterpret_cast<half*>(ep);
  gpr_mu_lock(&m->parent->mu);
  if (m->parent->shutdown) {
    grpc_core::ExecCtx::Run(
        DEBUG_LOCATION, cb,
        GRPC_ERROR_CREATE_FROM_STATIC_STRING("Already shutdown"));
  } else if (m->read_buffer.count > 0) {
    GPR_ASSERT(!m->pending_read_op.is_armed);
    GPR_ASSERT(!m->on_read);
    m->pending_read_op.is_armed = true;
    m->pending_read_op.cb = cb;
    m->pending_read_op.ep = ep;
    m->pending_read_op.slices = slices;
    do_pending_read_op_locked(m, GRPC_ERROR_NONE);
  } else {
    GPR_ASSERT(!m->pending_read_op.is_armed);
    m->on_read = cb;
    m->on_read_out = slices;
  }
  gpr_mu_unlock(&m->parent->mu);
}

// Copy the src slice and split the copy at n bytes into two separate slices.
void grpc_slice_copy_split(grpc_slice src, uint64_t n, grpc_slice& split1,
                           grpc_slice& split2) {
  GPR_ASSERT(n <= GRPC_SLICE_LENGTH(src));
  if (n == GRPC_SLICE_LENGTH(src)) {
    split1 = grpc_slice_copy(src);
    split2 = grpc_empty_slice();
    return;
  }
  split1 = GRPC_SLICE_MALLOC(n);
  memcpy(GRPC_SLICE_START_PTR(split1), GRPC_SLICE_START_PTR(src), n);
  split2 = GRPC_SLICE_MALLOC(GRPC_SLICE_LENGTH(src) - n);
  memcpy(GRPC_SLICE_START_PTR(split2), GRPC_SLICE_START_PTR(src) + n,
         GRPC_SLICE_LENGTH(src) - n);
}

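// Returns the opposite half of the passthru pair.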
static half* other_half(half* h) {
  if (h == &h->parent->client) return &h->parent->server;
  return &h->parent->client;
}

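// Moves as many bytes as the write budget allows from the armed write into
// the other half's destination: the reader's output buffer if a read is
// parked, else its read_buffer. Bytes beyond the reader's budget always land
// in read_buffer, with slices split at the budget boundaries. Called under
// m->parent->mu.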
static void do_pending_write_op_locked(half* m, grpc_error_handle error) {
  GPR_ASSERT(m->pending_write_op.is_armed);
  GPR_ASSERT(m->bytes_written_so_far <=
             m->parent->channel_effects->allowed_write_bytes);
  if (m->parent->shutdown) {
    grpc_core::ExecCtx::Run(
        DEBUG_LOCATION, m->pending_write_op.cb,
        GRPC_ERROR_CREATE_FROM_STATIC_STRING("Already shutdown"));
    m->pending_write_op.is_armed = false;
    grpc_slice_buffer_reset_and_unref(m->pending_write_op.slices);
    return;
  }
  if (m->bytes_written_so_far ==
      m->parent->channel_effects->allowed_write_bytes) {
    // Keep it in pending state.
    return;
  }
  half* other = other_half(m);
  uint64_t max_writable =
      std::min<uint64_t>(m->pending_write_op.slices->length,
                         m->parent->channel_effects->allowed_write_bytes -
                             m->bytes_written_so_far);
  uint64_t max_readable = other->parent->channel_effects->allowed_read_bytes -
                          other->bytes_read_so_far;
  uint64_t immediate_bytes_read =
      other->on_read != nullptr ? std::min<uint64_t>(max_readable, max_writable)
                                : 0;
  GPR_ASSERT(max_writable > 0);
  // At the end of this process, we should have written max_writable bytes.
  if (m->parent->simulate_channel_actions) {
    m->bytes_written_so_far += max_writable;
  }
  // Estimate whether the original write would still be pending at the end of
  // this process.
  bool would_write_be_pending =
      max_writable < m->pending_write_op.slices->length;
  if (!m->parent->simulate_channel_actions) {
    GPR_ASSERT(!would_write_be_pending);
  }
  grpc_slice_buffer* slices = m->pending_write_op.slices;
  grpc_slice_buffer* dest =
      other->on_read != nullptr ? other->on_read_out : &other->read_buffer;
  while (max_writable > 0) {
    grpc_slice slice = grpc_slice_buffer_take_first(slices);
    uint64_t slice_length = GRPC_SLICE_LENGTH(slice);
    GPR_ASSERT(slice_length > 0);
    grpc_slice split1, split2;
    uint64_t split_length = 0;
    if (slice_length <= max_readable) {
      split_length = std::min<uint64_t>(slice_length, max_writable);
    } else if (max_readable > 0) {
      // slice_length > max_readable
      split_length = std::min<uint64_t>(max_readable, max_writable);
    } else {
      // slice_length still > max_readable, but max_readable is 0.
      // In this case put the bytes into other->read_buffer. During a future
      // read, if max_readable still remains zero at the time of the read, the
      // pending read logic will kick in.
      dest = &other->read_buffer;
      split_length = std::min<uint64_t>(slice_length, max_writable);
    }
    grpc_slice_copy_split(slice, split_length, split1, split2);
    grpc_slice_unref_internal(slice);
    // Write a copy of the slice to the destination to be read.
    grpc_slice_buffer_add_indexed(dest, split1);
    // Re-insert split2 into the source for the next iteration.
    if (GRPC_SLICE_LENGTH(split2) > 0) {
      grpc_slice_buffer_undo_take_first(slices, split2);
    } else {
      grpc_slice_unref_internal(split2);
    }
    if (max_readable > 0) {
      GPR_ASSERT(max_readable >= split_length);
      max_readable -= split_length;
    }
    GPR_ASSERT(max_writable >= split_length);
    max_writable -= split_length;
  }
  if (immediate_bytes_read > 0) {
    GPR_ASSERT(!other->pending_read_op.is_armed);
    if (m->parent->simulate_channel_actions) {
      other->bytes_read_so_far += immediate_bytes_read;
    }
    grpc_core::ExecCtx::Run(DEBUG_LOCATION, other->on_read, error);
    other->on_read = nullptr;
  }
  if (!would_write_be_pending) {
    // No slices should be left.
    GPR_ASSERT(m->pending_write_op.slices->count == 0);
    grpc_slice_buffer_reset_and_unref(m->pending_write_op.slices);
    m->pending_write_op.is_armed = false;
    grpc_core::ExecCtx::Run(DEBUG_LOCATION, m->pending_write_op.cb, error);
  }
}

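// Endpoint write vtable entry: copies the caller's slices into this half's
// write_buffer, arms a pending write, and attempts to flush it immediately.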
static void me_write(grpc_endpoint* ep, grpc_slice_buffer* slices,
                     grpc_closure* cb, void* /*arg*/) {
  half* m = reinterpret_cast<half*>(ep);
  gpr_mu_lock(&m->parent->mu);
  gpr_atm_no_barrier_fetch_add(&m->parent->stats->num_writes, (gpr_atm)1);
  if (m->parent->shutdown) {
    grpc_core::ExecCtx::Run(
        DEBUG_LOCATION, cb,
        GRPC_ERROR_CREATE_FROM_STATIC_STRING("Endpoint already shutdown"));
  } else {
    GPR_ASSERT(!m->pending_write_op.is_armed);
    // Copy slices into m->pending_write_op.slices.
    m->pending_write_op.slices = &m->write_buffer;
    GPR_ASSERT(m->pending_write_op.slices->count == 0);
    for (size_t i = 0; i < slices->count; i++) {
      if (GRPC_SLICE_LENGTH(slices->slices[i]) > 0) {
        grpc_slice_buffer_add_indexed(m->pending_write_op.slices,
                                      grpc_slice_copy(slices->slices[i]));
      }
    }
    if (m->pending_write_op.slices->count > 0) {
      m->pending_write_op.is_armed = true;
      m->pending_write_op.cb = cb;
      m->pending_write_op.ep = ep;
      do_pending_write_op_locked(m, GRPC_ERROR_NONE);
    } else {
      // There is nothing to write. Schedule the callback to run right away.
      grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb, GRPC_ERROR_NONE);
    }
  }
  gpr_mu_unlock(&m->parent->mu);
}

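// Re-attempts any armed read/write, e.g. after a channel action raises the
// byte budgets or after shutdown.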
void flush_pending_ops_locked(half* m, grpc_error_handle error) {
  if (m->pending_read_op.is_armed) {
    do_pending_read_op_locked(m, error);
  }
  if (m->pending_write_op.is_armed) {
    do_pending_write_op_locked(m, error);
  }
}

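// Polling hooks are no-ops: passthru endpoints are purely in-memory.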
static void me_add_to_pollset(grpc_endpoint* /*ep*/,
                              grpc_pollset* /*pollset*/) {}

static void me_add_to_pollset_set(grpc_endpoint* /*ep*/,
                                  grpc_pollset_set* /*pollset*/) {}

static void me_delete_from_pollset_set(grpc_endpoint* /*ep*/,
                                       grpc_pollset_set* /*pollset*/) {}

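// Marks the pair shut down, flushes armed ops, and fails any parked read
// callbacks on both halves.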
static void shutdown_locked(half* m, grpc_error_handle why) {
  m->parent->shutdown = true;
  flush_pending_ops_locked(m, GRPC_ERROR_NONE);
  if (m->on_read) {
    grpc_core::ExecCtx::Run(
        DEBUG_LOCATION, m->on_read,
        GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING("Shutdown", &why, 1));
    m->on_read = nullptr;
  }
  m = other_half(m);
  flush_pending_ops_locked(m, GRPC_ERROR_NONE);
  if (m->on_read) {
    grpc_core::ExecCtx::Run(
        DEBUG_LOCATION, m->on_read,
        GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING("Shutdown", &why, 1));
    m->on_read = nullptr;
  }
}

static void me_shutdown(grpc_endpoint* ep, grpc_error_handle why) {
  half* m = reinterpret_cast<half*>(ep);
  gpr_mu_lock(&m->parent->mu);
  shutdown_locked(m, why);
  gpr_mu_unlock(&m->parent->mu);
  GRPC_ERROR_UNREF(why);
}

void grpc_passthru_endpoint_destroy(passthru_endpoint* p) {
  gpr_mu_destroy(&p->mu);
  grpc_passthru_endpoint_stats_destroy(p->stats);
  delete p->channel_effects;
  grpc_slice_buffer_destroy_internal(&p->client.read_buffer);
  grpc_slice_buffer_destroy_internal(&p->server.read_buffer);
  grpc_slice_buffer_destroy_internal(&p->client.write_buffer);
  grpc_slice_buffer_destroy_internal(&p->server.write_buffer);
  gpr_free(p);
}

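// Each half holds a reference on the parent; the parent is freed when the
// last half goes away, unless channel actions are still scheduled, in which
// case do_next_sched_channel_action performs the deferred destroy.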
static void me_destroy(grpc_endpoint* ep) {
  passthru_endpoint* p = (reinterpret_cast<half*>(ep))->parent;
  gpr_mu_lock(&p->mu);
  if (0 == --p->halves && p->channel_effects->actions.empty()) {
    // No pending channel actions exist.
    gpr_mu_unlock(&p->mu);
    grpc_passthru_endpoint_destroy(p);
  } else {
    if (p->halves == 0 && p->simulate_channel_actions) {
      grpc_timer_cancel(&p->channel_effects->timer);
    }
    gpr_mu_unlock(&p->mu);
  }
}

static absl::string_view me_get_peer(grpc_endpoint* ep) {
  passthru_endpoint* p = (reinterpret_cast<half*>(ep))->parent;
  return (reinterpret_cast<half*>(ep)) == &p->client
             ? "fake:mock_client_endpoint"
             : "fake:mock_server_endpoint";
}

static absl::string_view me_get_local_address(grpc_endpoint* ep) {
  passthru_endpoint* p = (reinterpret_cast<half*>(ep))->parent;
  return (reinterpret_cast<half*>(ep)) == &p->client
             ? "fake:mock_client_endpoint"
             : "fake:mock_server_endpoint";
}

static int me_get_fd(grpc_endpoint* /*ep*/) { return -1; }

static bool me_can_track_err(grpc_endpoint* /*ep*/) { return false; }

static const grpc_endpoint_vtable vtable = {
    me_read,
    me_write,
    me_add_to_pollset,
    me_add_to_pollset_set,
    me_delete_from_pollset_set,
    me_shutdown,
    me_destroy,
    me_get_peer,
    me_get_local_address,
    me_get_fd,
    me_can_track_err,
};

static void half_init(half* m, passthru_endpoint* parent,
                      const char* half_name) {
  m->base.vtable = &vtable;
  m->parent = parent;
  grpc_slice_buffer_init(&m->read_buffer);
  grpc_slice_buffer_init(&m->write_buffer);
  m->pending_write_op.slices = nullptr;
  m->on_read = nullptr;
  m->bytes_read_so_far = 0;
  m->bytes_written_so_far = 0;
  m->pending_write_op.is_armed = false;
  m->pending_read_op.is_armed = false;
  std::string name =
      absl::StrFormat("passthru_endpoint_%s_%p", half_name, parent);
}

void grpc_passthru_endpoint_create(grpc_endpoint** client,
                                   grpc_endpoint** server,
                                   grpc_passthru_endpoint_stats* stats,
                                   bool simulate_channel_actions) {
  passthru_endpoint* m =
      static_cast<passthru_endpoint*>(gpr_malloc(sizeof(*m)));
  m->halves = 2;
  m->shutdown = false;
  if (stats == nullptr) {
    m->stats = grpc_passthru_endpoint_stats_create();
  } else {
    gpr_ref(&stats->refs);
    m->stats = stats;
  }
  m->channel_effects = new grpc_passthru_endpoint_channel_effects();
  m->simulate_channel_actions = simulate_channel_actions;
  if (!simulate_channel_actions) {
    m->channel_effects->allowed_read_bytes = UINT64_MAX;
    m->channel_effects->allowed_write_bytes = UINT64_MAX;
  }
  half_init(&m->client, m, "client");
  half_init(&m->server, m, "server");
  gpr_mu_init(&m->mu);
  *client = &m->client.base;
  *server = &m->server.base;
}

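// A minimal usage sketch: create the pair and drive it like any other
// grpc_endpoint (the variable names on the left are the caller's own).
//
//   grpc_endpoint* client;
//   grpc_endpoint* server;
//   grpc_passthru_endpoint_create(&client, &server, /*stats=*/nullptr,
//                                 /*simulate_channel_actions=*/false);
//   // Bytes passed to grpc_endpoint_write(client, ...) become readable via
//   // grpc_endpoint_read(server, ...), and vice versa.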
grpc_passthru_endpoint_stats* grpc_passthru_endpoint_stats_create() {
  grpc_passthru_endpoint_stats* stats =
      static_cast<grpc_passthru_endpoint_stats*>(
          gpr_malloc(sizeof(grpc_passthru_endpoint_stats)));
  memset(stats, 0, sizeof(*stats));
  gpr_ref_init(&stats->refs, 1);
  return stats;
}

void grpc_passthru_endpoint_stats_destroy(grpc_passthru_endpoint_stats* stats) {
  if (gpr_unref(&stats->refs)) {
    gpr_free(stats);
  }
}

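// Channel-action machinery: actions are consumed one at a time. Each action
// waits wait_ms on a timer, then credits the read/write byte budgets and
// retries any pending ops on both halves.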
static void sched_next_channel_action_locked(half* m);

static void do_next_sched_channel_action(void* arg, grpc_error_handle error) {
  half* m = reinterpret_cast<half*>(arg);
  gpr_mu_lock(&m->parent->mu);
  GPR_ASSERT(!m->parent->channel_effects->actions.empty());
  if (m->parent->halves == 0) {
    gpr_mu_unlock(&m->parent->mu);
    grpc_passthru_endpoint_destroy(m->parent);
    return;
  }
  auto curr_action = m->parent->channel_effects->actions[0];
  m->parent->channel_effects->actions.erase(
      m->parent->channel_effects->actions.begin());
  m->parent->channel_effects->allowed_read_bytes +=
      curr_action.add_n_readable_bytes;
  m->parent->channel_effects->allowed_write_bytes +=
      curr_action.add_n_writable_bytes;
  flush_pending_ops_locked(m, error);
  flush_pending_ops_locked(other_half(m), error);
  sched_next_channel_action_locked(m);
  gpr_mu_unlock(&m->parent->mu);
}

static void sched_next_channel_action_locked(half* m) {
  if (m->parent->channel_effects->actions.empty()) {
    grpc_error_handle err =
        GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel actions complete");
    shutdown_locked(m, err);
    GRPC_ERROR_UNREF(err);
    return;
  }
  grpc_timer_init(&m->parent->channel_effects->timer,
                  grpc_core::Duration::Milliseconds(
                      m->parent->channel_effects->actions[0].wait_ms) +
                      grpc_core::ExecCtx::Get()->Now(),
                  GRPC_CLOSURE_CREATE(do_next_sched_channel_action, m,
                                      grpc_schedule_on_exec_ctx));
}

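// Installs the action list and kicks off the first timer. No-op unless the
// pair was created with simulate_channel_actions and is not yet shut down.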
void start_scheduling_grpc_passthru_endpoint_channel_effects(
    grpc_endpoint* ep,
    const std::vector<grpc_passthru_endpoint_channel_action>& actions) {
  half* m = reinterpret_cast<half*>(ep);
  gpr_mu_lock(&m->parent->mu);
  if (!m->parent->simulate_channel_actions || m->parent->shutdown) {
    gpr_mu_unlock(&m->parent->mu);
    return;
  }
  m->parent->channel_effects->actions = actions;
  sched_next_channel_action_locked(m);
  gpr_mu_unlock(&m->parent->mu);
}
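
// A sketch of driving channel effects. Field order in the braced initializer
// is illustrative only; grpc_passthru_endpoint_channel_action and its
// wait_ms / add_n_readable_bytes / add_n_writable_bytes members are declared
// in passthru_endpoint.h.
//
//   std::vector<grpc_passthru_endpoint_channel_action> actions = {
//       {/*wait_ms=*/10, /*add_n_writable_bytes=*/1024,
//        /*add_n_readable_bytes=*/1024}};
//   start_scheduling_grpc_passthru_endpoint_channel_effects(client, actions);
//   // Once the action list is exhausted, the pair is shut down with
//   // "Channel actions complete".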