proxy.cc 15 KB


  1. /*
  2. *
  3. * Copyright 2015 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. #include "test/core/end2end/fixtures/proxy.h"
  19. #include <string.h>
  20. #include <grpc/support/alloc.h>
  21. #include <grpc/support/log.h>
  22. #include <grpc/support/sync.h>
  23. #include "src/core/lib/gpr/useful.h"
  24. #include "src/core/lib/gprpp/host_port.h"
  25. #include "src/core/lib/gprpp/memory.h"
  26. #include "src/core/lib/gprpp/thd.h"
  27. #include "src/core/lib/surface/call.h"
  28. #include "test/core/util/port.h"
  29. struct grpc_end2end_proxy {
  30. grpc_end2end_proxy()
  31. : cq(nullptr),
  32. server(nullptr),
  33. client(nullptr),
  34. shutdown(false),
  35. new_call(nullptr) {
  36. memset(&new_call_details, 0, sizeof(new_call_details));
  37. memset(&new_call_metadata, 0, sizeof(new_call_metadata));
  38. }
  39. grpc_core::Thread thd;
  40. std::string proxy_port;
  41. std::string server_port;
  42. grpc_completion_queue* cq;
  43. grpc_server* server;
  44. grpc_channel* client;
  45. int shutdown;
  46. /* requested call */
  47. grpc_call* new_call;
  48. grpc_call_details new_call_details;
  49. grpc_metadata_array new_call_metadata;
  50. };
  51. typedef struct {
  52. void (*func)(void* arg, int success);
  53. void* arg;
  54. } closure;
  55. typedef struct {
  56. gpr_refcount refs;
  57. grpc_end2end_proxy* proxy;
  58. grpc_call* c2p;
  59. grpc_call* p2s;
  60. grpc_metadata_array c2p_initial_metadata;
  61. grpc_metadata_array p2s_initial_metadata;
  62. grpc_byte_buffer* c2p_msg;
  63. grpc_byte_buffer* p2s_msg;
  64. grpc_metadata_array p2s_trailing_metadata;
  65. grpc_status_code p2s_status;
  66. grpc_slice p2s_status_details;
  67. int c2p_server_cancelled;
  68. } proxy_call;
  69. static void thread_main(void* arg);
  70. static void request_call(grpc_end2end_proxy* proxy);
  71. grpc_end2end_proxy* grpc_end2end_proxy_create(
  72. const grpc_end2end_proxy_def* def, const grpc_channel_args* client_args,
  73. const grpc_channel_args* server_args) {
  74. int proxy_port = grpc_pick_unused_port_or_die();
  75. int server_port = grpc_pick_unused_port_or_die();
  76. grpc_end2end_proxy* proxy = new grpc_end2end_proxy();
  77. proxy->proxy_port = grpc_core::JoinHostPort("localhost", proxy_port);
  78. proxy->server_port = grpc_core::JoinHostPort("localhost", server_port);
  79. gpr_log(GPR_DEBUG, "PROXY ADDR:%s BACKEND:%s", proxy->proxy_port.c_str(),
  80. proxy->server_port.c_str());
  81. proxy->cq = grpc_completion_queue_create_for_next(nullptr);
  82. proxy->server = def->create_server(proxy->proxy_port.c_str(), server_args);
  83. const char* arg_to_remove = GRPC_ARG_ENABLE_RETRIES;
  84. grpc_arg arg_to_add = grpc_channel_arg_integer_create(
  85. const_cast<char*>(GRPC_ARG_ENABLE_RETRIES), 0);
  86. const grpc_channel_args* proxy_client_args =
  87. grpc_channel_args_copy_and_add_and_remove(client_args, &arg_to_remove, 1,
  88. &arg_to_add, 1);
  89. proxy->client =
  90. def->create_client(proxy->server_port.c_str(), proxy_client_args);
  91. grpc_channel_args_destroy(proxy_client_args);
  92. grpc_server_register_completion_queue(proxy->server, proxy->cq, nullptr);
  93. grpc_server_start(proxy->server);
  94. grpc_call_details_init(&proxy->new_call_details);
  95. proxy->thd = grpc_core::Thread("grpc_end2end_proxy", thread_main, proxy);
  96. proxy->thd.Start();
  97. request_call(proxy);
  98. return proxy;
  99. }
  100. static closure* new_closure(void (*func)(void* arg, int success), void* arg) {
  101. closure* cl = static_cast<closure*>(gpr_malloc(sizeof(*cl)));
  102. cl->func = func;
  103. cl->arg = arg;
  104. return cl;
  105. }
  106. static void shutdown_complete(void* arg, int /*success*/) {
  107. grpc_end2end_proxy* proxy = static_cast<grpc_end2end_proxy*>(arg);
  108. proxy->shutdown = 1;
  109. grpc_completion_queue_shutdown(proxy->cq);
  110. }
  111. void grpc_end2end_proxy_destroy(grpc_end2end_proxy* proxy) {
  112. grpc_server_shutdown_and_notify(proxy->server, proxy->cq,
  113. new_closure(shutdown_complete, proxy));
  114. proxy->thd.Join();
  115. grpc_server_destroy(proxy->server);
  116. grpc_channel_destroy(proxy->client);
  117. grpc_completion_queue_destroy(proxy->cq);
  118. grpc_call_details_destroy(&proxy->new_call_details);
  119. delete proxy;
  120. }
  121. static void unrefpc(proxy_call* pc, const char* /*reason*/) {
  122. if (gpr_unref(&pc->refs)) {
  123. grpc_call_unref(pc->c2p);
  124. grpc_call_unref(pc->p2s);
  125. grpc_metadata_array_destroy(&pc->c2p_initial_metadata);
  126. grpc_metadata_array_destroy(&pc->p2s_initial_metadata);
  127. grpc_metadata_array_destroy(&pc->p2s_trailing_metadata);
  128. grpc_slice_unref(pc->p2s_status_details);
  129. gpr_free(pc);
  130. }
  131. }
  132. static void refpc(proxy_call* pc, const char* /*reason*/) {
  133. gpr_ref(&pc->refs);
  134. }
  135. static void on_c2p_sent_initial_metadata(void* arg, int /*success*/) {
  136. proxy_call* pc = static_cast<proxy_call*>(arg);
  137. unrefpc(pc, "on_c2p_sent_initial_metadata");
  138. }
  139. static void on_p2s_recv_initial_metadata(void* arg, int /*success*/) {
  140. proxy_call* pc = static_cast<proxy_call*>(arg);
  141. grpc_op op;
  142. grpc_call_error err;
  143. memset(&op, 0, sizeof(op));
  144. if (!pc->proxy->shutdown && !grpc_call_is_trailers_only(pc->p2s)) {
  145. op.op = GRPC_OP_SEND_INITIAL_METADATA;
  146. op.flags = 0;
  147. op.reserved = nullptr;
  148. op.data.send_initial_metadata.count = pc->p2s_initial_metadata.count;
  149. op.data.send_initial_metadata.metadata = pc->p2s_initial_metadata.metadata;
  150. refpc(pc, "on_c2p_sent_initial_metadata");
  151. err = grpc_call_start_batch(pc->c2p, &op, 1,
  152. new_closure(on_c2p_sent_initial_metadata, pc),
  153. nullptr);
  154. GPR_ASSERT(err == GRPC_CALL_OK);
  155. }
  156. unrefpc(pc, "on_p2s_recv_initial_metadata");
  157. }
  158. static void on_p2s_sent_initial_metadata(void* arg, int /*success*/) {
  159. proxy_call* pc = static_cast<proxy_call*>(arg);
  160. unrefpc(pc, "on_p2s_sent_initial_metadata");
  161. }
  162. static void on_c2p_recv_msg(void* arg, int success);
  163. static void on_p2s_sent_message(void* arg, int success) {
  164. proxy_call* pc = static_cast<proxy_call*>(arg);
  165. grpc_op op;
  166. grpc_call_error err;
  167. grpc_byte_buffer_destroy(pc->c2p_msg);
  168. if (!pc->proxy->shutdown && success) {
  169. op.op = GRPC_OP_RECV_MESSAGE;
  170. op.flags = 0;
  171. op.reserved = nullptr;
  172. op.data.recv_message.recv_message = &pc->c2p_msg;
  173. refpc(pc, "on_c2p_recv_msg");
  174. err = grpc_call_start_batch(pc->c2p, &op, 1,
  175. new_closure(on_c2p_recv_msg, pc), nullptr);
  176. GPR_ASSERT(err == GRPC_CALL_OK);
  177. }
  178. unrefpc(pc, "on_p2s_sent_message");
  179. }
  180. static void on_p2s_sent_close(void* arg, int /*success*/) {
  181. proxy_call* pc = static_cast<proxy_call*>(arg);
  182. unrefpc(pc, "on_p2s_sent_close");
  183. }
  184. static void on_c2p_recv_msg(void* arg, int success) {
  185. proxy_call* pc = static_cast<proxy_call*>(arg);
  186. grpc_op op;
  187. grpc_call_error err;
  188. if (!pc->proxy->shutdown && success) {
  189. if (pc->c2p_msg != nullptr) {
  190. op.op = GRPC_OP_SEND_MESSAGE;
  191. op.flags = 0;
  192. op.reserved = nullptr;
  193. op.data.send_message.send_message = pc->c2p_msg;
  194. refpc(pc, "on_p2s_sent_message");
  195. err = grpc_call_start_batch(
  196. pc->p2s, &op, 1, new_closure(on_p2s_sent_message, pc), nullptr);
  197. GPR_ASSERT(err == GRPC_CALL_OK);
  198. } else {
  199. op.op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
  200. op.flags = 0;
  201. op.reserved = nullptr;
  202. refpc(pc, "on_p2s_sent_close");
  203. err = grpc_call_start_batch(pc->p2s, &op, 1,
  204. new_closure(on_p2s_sent_close, pc), nullptr);
  205. GPR_ASSERT(err == GRPC_CALL_OK);
  206. }
  207. } else {
  208. if (pc->c2p_msg != nullptr) {
  209. grpc_byte_buffer_destroy(pc->c2p_msg);
  210. }
  211. }
  212. unrefpc(pc, "on_c2p_recv_msg");
  213. }
  214. static void on_p2s_recv_msg(void* arg, int success);
  215. static void on_c2p_sent_message(void* arg, int success) {
  216. proxy_call* pc = static_cast<proxy_call*>(arg);
  217. grpc_op op;
  218. grpc_call_error err;
  219. grpc_byte_buffer_destroy(pc->p2s_msg);
  220. if (!pc->proxy->shutdown && success) {
  221. op.op = GRPC_OP_RECV_MESSAGE;
  222. op.flags = 0;
  223. op.reserved = nullptr;
  224. op.data.recv_message.recv_message = &pc->p2s_msg;
  225. refpc(pc, "on_p2s_recv_msg");
  226. err = grpc_call_start_batch(pc->p2s, &op, 1,
  227. new_closure(on_p2s_recv_msg, pc), nullptr);
  228. GPR_ASSERT(err == GRPC_CALL_OK);
  229. }
  230. unrefpc(pc, "on_c2p_sent_message");
  231. }
  232. static void on_p2s_recv_msg(void* arg, int success) {
  233. proxy_call* pc = static_cast<proxy_call*>(arg);
  234. grpc_op op;
  235. grpc_call_error err;
  236. if (!pc->proxy->shutdown && success && pc->p2s_msg) {
  237. op.op = GRPC_OP_SEND_MESSAGE;
  238. op.flags = 0;
  239. op.reserved = nullptr;
  240. op.data.send_message.send_message = pc->p2s_msg;
  241. refpc(pc, "on_c2p_sent_message");
  242. err = grpc_call_start_batch(pc->c2p, &op, 1,
  243. new_closure(on_c2p_sent_message, pc), nullptr);
  244. GPR_ASSERT(err == GRPC_CALL_OK);
  245. } else {
  246. grpc_byte_buffer_destroy(pc->p2s_msg);
  247. }
  248. unrefpc(pc, "on_p2s_recv_msg");
  249. }
  250. static void on_c2p_sent_status(void* arg, int /*success*/) {
  251. proxy_call* pc = static_cast<proxy_call*>(arg);
  252. unrefpc(pc, "on_c2p_sent_status");
  253. }
  254. static void on_p2s_status(void* arg, int success) {
  255. proxy_call* pc = static_cast<proxy_call*>(arg);
  256. grpc_op op[2]; // Possibly send empty initial metadata also if trailers-only
  257. grpc_call_error err;
  258. memset(op, 0, sizeof(op));
  259. if (!pc->proxy->shutdown) {
  260. GPR_ASSERT(success);
  261. int op_count = 0;
  262. if (grpc_call_is_trailers_only(pc->p2s)) {
  263. op[op_count].op = GRPC_OP_SEND_INITIAL_METADATA;
  264. op_count++;
  265. }
  266. op[op_count].op = GRPC_OP_SEND_STATUS_FROM_SERVER;
  267. op[op_count].flags = 0;
  268. op[op_count].reserved = nullptr;
  269. op[op_count].data.send_status_from_server.trailing_metadata_count =
  270. pc->p2s_trailing_metadata.count;
  271. op[op_count].data.send_status_from_server.trailing_metadata =
  272. pc->p2s_trailing_metadata.metadata;
  273. op[op_count].data.send_status_from_server.status = pc->p2s_status;
  274. op[op_count].data.send_status_from_server.status_details =
  275. &pc->p2s_status_details;
  276. op_count++;
  277. refpc(pc, "on_c2p_sent_status");
  278. err = grpc_call_start_batch(pc->c2p, op, op_count,
  279. new_closure(on_c2p_sent_status, pc), nullptr);
  280. GPR_ASSERT(err == GRPC_CALL_OK);
  281. }
  282. unrefpc(pc, "on_p2s_status");
  283. }
  284. static void on_c2p_closed(void* arg, int /*success*/) {
  285. proxy_call* pc = static_cast<proxy_call*>(arg);
  286. unrefpc(pc, "on_c2p_closed");
  287. }
  288. static void on_new_call(void* arg, int success) {
  289. grpc_end2end_proxy* proxy = static_cast<grpc_end2end_proxy*>(arg);
  290. grpc_call_error err;
  291. if (success) {
  292. grpc_op op;
  293. memset(&op, 0, sizeof(op));
  294. proxy_call* pc = static_cast<proxy_call*>(gpr_malloc(sizeof(*pc)));
  295. memset(pc, 0, sizeof(*pc));
  296. pc->proxy = proxy;
  297. std::swap(pc->c2p_initial_metadata, proxy->new_call_metadata);
  298. pc->c2p = proxy->new_call;
  299. pc->p2s = grpc_channel_create_call(
  300. proxy->client, pc->c2p, GRPC_PROPAGATE_DEFAULTS, proxy->cq,
  301. proxy->new_call_details.method, &proxy->new_call_details.host,
  302. proxy->new_call_details.deadline, nullptr);
  303. gpr_ref_init(&pc->refs, 1);
  304. op.reserved = nullptr;
  305. op.op = GRPC_OP_RECV_INITIAL_METADATA;
  306. op.flags = 0;
  307. op.data.recv_initial_metadata.recv_initial_metadata =
  308. &pc->p2s_initial_metadata;
  309. refpc(pc, "on_p2s_recv_initial_metadata");
  310. err = grpc_call_start_batch(pc->p2s, &op, 1,
  311. new_closure(on_p2s_recv_initial_metadata, pc),
  312. nullptr);
  313. GPR_ASSERT(err == GRPC_CALL_OK);
  314. op.op = GRPC_OP_SEND_INITIAL_METADATA;
  315. op.flags = proxy->new_call_details.flags;
  316. op.data.send_initial_metadata.count = pc->c2p_initial_metadata.count;
  317. op.data.send_initial_metadata.metadata = pc->c2p_initial_metadata.metadata;
  318. refpc(pc, "on_p2s_sent_initial_metadata");
  319. err = grpc_call_start_batch(pc->p2s, &op, 1,
  320. new_closure(on_p2s_sent_initial_metadata, pc),
  321. nullptr);
  322. GPR_ASSERT(err == GRPC_CALL_OK);
  323. op.op = GRPC_OP_RECV_MESSAGE;
  324. op.flags = 0;
  325. op.data.recv_message.recv_message = &pc->c2p_msg;
  326. refpc(pc, "on_c2p_recv_msg");
  327. err = grpc_call_start_batch(pc->c2p, &op, 1,
  328. new_closure(on_c2p_recv_msg, pc), nullptr);
  329. GPR_ASSERT(err == GRPC_CALL_OK);
  330. op.op = GRPC_OP_RECV_MESSAGE;
  331. op.flags = 0;
  332. op.data.recv_message.recv_message = &pc->p2s_msg;
  333. refpc(pc, "on_p2s_recv_msg");
  334. err = grpc_call_start_batch(pc->p2s, &op, 1,
  335. new_closure(on_p2s_recv_msg, pc), nullptr);
  336. GPR_ASSERT(err == GRPC_CALL_OK);
  337. op.op = GRPC_OP_RECV_STATUS_ON_CLIENT;
  338. op.flags = 0;
  339. op.data.recv_status_on_client.trailing_metadata =
  340. &pc->p2s_trailing_metadata;
  341. op.data.recv_status_on_client.status = &pc->p2s_status;
  342. op.data.recv_status_on_client.status_details = &pc->p2s_status_details;
  343. refpc(pc, "on_p2s_status");
  344. err = grpc_call_start_batch(pc->p2s, &op, 1, new_closure(on_p2s_status, pc),
  345. nullptr);
  346. GPR_ASSERT(err == GRPC_CALL_OK);
  347. op.op = GRPC_OP_RECV_CLOSE_ON_SERVER;
  348. op.flags = 0;
  349. op.data.recv_close_on_server.cancelled = &pc->c2p_server_cancelled;
  350. refpc(pc, "on_c2p_closed");
  351. err = grpc_call_start_batch(pc->c2p, &op, 1, new_closure(on_c2p_closed, pc),
  352. nullptr);
  353. GPR_ASSERT(err == GRPC_CALL_OK);
  354. request_call(proxy);
  355. grpc_call_details_destroy(&proxy->new_call_details);
  356. grpc_call_details_init(&proxy->new_call_details);
  357. unrefpc(pc, "init");
  358. } else {
  359. GPR_ASSERT(proxy->new_call == nullptr);
  360. }
  361. }
  362. static void request_call(grpc_end2end_proxy* proxy) {
  363. proxy->new_call = nullptr;
  364. GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(
  365. proxy->server, &proxy->new_call,
  366. &proxy->new_call_details,
  367. &proxy->new_call_metadata, proxy->cq,
  368. proxy->cq, new_closure(on_new_call, proxy)));
  369. }
  370. static void thread_main(void* arg) {
  371. grpc_end2end_proxy* proxy = static_cast<grpc_end2end_proxy*>(arg);
  372. closure* cl;
  373. for (;;) {
  374. grpc_event ev = grpc_completion_queue_next(
  375. proxy->cq, gpr_inf_future(GPR_CLOCK_MONOTONIC), nullptr);
  376. switch (ev.type) {
  377. case GRPC_QUEUE_TIMEOUT:
  378. gpr_log(GPR_ERROR, "Should never reach here");
  379. abort();
  380. case GRPC_QUEUE_SHUTDOWN:
  381. return;
  382. case GRPC_OP_COMPLETE:
  383. cl = static_cast<closure*>(ev.tag);
  384. cl->func(cl->arg, ev.success);
  385. gpr_free(cl);
  386. break;
  387. }
  388. }
  389. }
  390. const char* grpc_end2end_proxy_get_client_target(grpc_end2end_proxy* proxy) {
  391. return proxy->proxy_port.c_str();
  392. }
  393. const char* grpc_end2end_proxy_get_server_port(grpc_end2end_proxy* proxy) {
  394. return proxy->server_port.c_str();
  395. }