test-ipc.c 24 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979
  1. /* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
  2. *
  3. * Permission is hereby granted, free of charge, to any person obtaining a copy
  4. * of this software and associated documentation files (the "Software"), to
  5. * deal in the Software without restriction, including without limitation the
  6. * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
  7. * sell copies of the Software, and to permit persons to whom the Software is
  8. * furnished to do so, subject to the following conditions:
  9. *
  10. * The above copyright notice and this permission notice shall be included in
  11. * all copies or substantial portions of the Software.
  12. *
  13. * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  14. * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  15. * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  16. * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  17. * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
  18. * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
  19. * IN THE SOFTWARE.
  20. */
  21. #include "uv.h"
  22. #include "task.h"
  23. #include <stdio.h>
  24. #include <string.h>
  25. static uv_pipe_t channel;
  26. static uv_tcp_t tcp_server;
  27. static uv_tcp_t tcp_server2;
  28. static uv_tcp_t tcp_connection;
  29. static int exit_cb_called;
  30. static int read_cb_called;
  31. static int tcp_write_cb_called;
  32. static int tcp_read_cb_called;
  33. static int on_pipe_read_called;
  34. static int local_conn_accepted;
  35. static int remote_conn_accepted;
  36. static int tcp_server_listening;
  37. static uv_write_t write_req;
  38. static uv_write_t write_req2;
  39. static uv_write_t conn_notify_req;
  40. static int close_cb_called;
  41. static int connection_accepted;
  42. static int tcp_conn_read_cb_called;
  43. static int tcp_conn_write_cb_called;
  44. static int closed_handle_data_read;
  45. static int closed_handle_write;
  46. static int send_zero_write;
  47. typedef struct {
  48. uv_connect_t conn_req;
  49. uv_write_t tcp_write_req;
  50. uv_tcp_t conn;
  51. } tcp_conn;
  52. #define CONN_COUNT 100
  53. #define BACKLOG 128
  54. #define LARGE_SIZE 100000
  55. static uv_buf_t large_buf;
  56. static char buffer[LARGE_SIZE];
  57. static uv_write_t write_reqs[300];
  58. static int write_reqs_completed;
  59. static unsigned int write_until_data_queued(void);
  60. static void send_handle_and_close(void);
  61. static void close_server_conn_cb(uv_handle_t* handle) {
  62. free(handle);
  63. }
  64. static void on_connection(uv_stream_t* server, int status) {
  65. uv_tcp_t* conn;
  66. int r;
  67. if (!local_conn_accepted) {
  68. /* Accept the connection and close it. Also and close the server. */
  69. ASSERT_EQ(status, 0);
  70. ASSERT_PTR_EQ(&tcp_server, server);
  71. conn = malloc(sizeof(*conn));
  72. ASSERT_NOT_NULL(conn);
  73. r = uv_tcp_init(server->loop, conn);
  74. ASSERT_EQ(r, 0);
  75. r = uv_accept(server, (uv_stream_t*)conn);
  76. ASSERT_EQ(r, 0);
  77. uv_close((uv_handle_t*)conn, close_server_conn_cb);
  78. uv_close((uv_handle_t*)server, NULL);
  79. local_conn_accepted = 1;
  80. }
  81. }
  82. static void exit_cb(uv_process_t* process,
  83. int64_t exit_status,
  84. int term_signal) {
  85. printf("exit_cb\n");
  86. exit_cb_called++;
  87. ASSERT_EQ(exit_status, 0);
  88. ASSERT_EQ(term_signal, 0);
  89. uv_close((uv_handle_t*)process, NULL);
  90. }
  91. static void on_alloc(uv_handle_t* handle,
  92. size_t suggested_size,
  93. uv_buf_t* buf) {
  94. buf->base = malloc(suggested_size);
  95. buf->len = suggested_size;
  96. }
  97. static void close_client_conn_cb(uv_handle_t* handle) {
  98. tcp_conn* p = (tcp_conn*)handle->data;
  99. free(p);
  100. }
  101. static void connect_cb(uv_connect_t* req, int status) {
  102. uv_close((uv_handle_t*)req->handle, close_client_conn_cb);
  103. }
  104. static void make_many_connections(void) {
  105. tcp_conn* conn;
  106. struct sockaddr_in addr;
  107. int r, i;
  108. for (i = 0; i < CONN_COUNT; i++) {
  109. conn = malloc(sizeof(*conn));
  110. ASSERT_NOT_NULL(conn);
  111. r = uv_tcp_init(uv_default_loop(), &conn->conn);
  112. ASSERT_EQ(r, 0);
  113. ASSERT_EQ(0, uv_ip4_addr("127.0.0.1", TEST_PORT, &addr));
  114. r = uv_tcp_connect(&conn->conn_req,
  115. (uv_tcp_t*) &conn->conn,
  116. (const struct sockaddr*) &addr,
  117. connect_cb);
  118. ASSERT_EQ(r, 0);
  119. conn->conn.data = conn;
  120. }
  121. }
  122. static void on_read(uv_stream_t* handle,
  123. ssize_t nread,
  124. const uv_buf_t* buf) {
  125. int r;
  126. uv_pipe_t* pipe;
  127. uv_handle_type pending;
  128. uv_buf_t outbuf;
  129. pipe = (uv_pipe_t*) handle;
  130. if (nread == 0) {
  131. /* Everything OK, but nothing read. */
  132. free(buf->base);
  133. return;
  134. }
  135. if (nread < 0) {
  136. if (nread == UV_EOF) {
  137. free(buf->base);
  138. return;
  139. }
  140. printf("error recving on channel: %s\n", uv_strerror(nread));
  141. abort();
  142. }
  143. fprintf(stderr, "got %d bytes\n", (int)nread);
  144. pending = uv_pipe_pending_type(pipe);
  145. if (!tcp_server_listening) {
  146. ASSERT_EQ(1, uv_pipe_pending_count(pipe));
  147. ASSERT_GT(nread, 0);
  148. ASSERT_NOT_NULL(buf->base);
  149. ASSERT_NE(pending, UV_UNKNOWN_HANDLE);
  150. read_cb_called++;
  151. /* Accept the pending TCP server, and start listening on it. */
  152. ASSERT_EQ(pending, UV_TCP);
  153. r = uv_tcp_init(uv_default_loop(), &tcp_server);
  154. ASSERT_EQ(r, 0);
  155. r = uv_accept((uv_stream_t*)pipe, (uv_stream_t*)&tcp_server);
  156. ASSERT_EQ(r, 0);
  157. r = uv_listen((uv_stream_t*)&tcp_server, BACKLOG, on_connection);
  158. ASSERT_EQ(r, 0);
  159. tcp_server_listening = 1;
  160. /* Make sure that the expected data is correctly multiplexed. */
  161. ASSERT_MEM_EQ("hello\n", buf->base, nread);
  162. outbuf = uv_buf_init("world\n", 6);
  163. r = uv_write(&write_req, (uv_stream_t*)pipe, &outbuf, 1, NULL);
  164. ASSERT_EQ(r, 0);
  165. /* Create a bunch of connections to get both servers to accept. */
  166. make_many_connections();
  167. } else if (memcmp("accepted_connection\n", buf->base, nread) == 0) {
  168. /* Remote server has accepted a connection. Close the channel. */
  169. ASSERT_EQ(0, uv_pipe_pending_count(pipe));
  170. ASSERT_EQ(pending, UV_UNKNOWN_HANDLE);
  171. remote_conn_accepted = 1;
  172. uv_close((uv_handle_t*)&channel, NULL);
  173. }
  174. free(buf->base);
  175. }
  176. #ifdef _WIN32
  177. static void on_read_listen_after_bound_twice(uv_stream_t* handle,
  178. ssize_t nread,
  179. const uv_buf_t* buf) {
  180. int r;
  181. uv_pipe_t* pipe;
  182. uv_handle_type pending;
  183. pipe = (uv_pipe_t*) handle;
  184. if (nread == 0) {
  185. /* Everything OK, but nothing read. */
  186. free(buf->base);
  187. return;
  188. }
  189. if (nread < 0) {
  190. if (nread == UV_EOF) {
  191. free(buf->base);
  192. return;
  193. }
  194. printf("error recving on channel: %s\n", uv_strerror(nread));
  195. abort();
  196. }
  197. fprintf(stderr, "got %d bytes\n", (int)nread);
  198. ASSERT_GT(uv_pipe_pending_count(pipe), 0);
  199. pending = uv_pipe_pending_type(pipe);
  200. ASSERT_GT(nread, 0);
  201. ASSERT_NOT_NULL(buf->base);
  202. ASSERT_NE(pending, UV_UNKNOWN_HANDLE);
  203. read_cb_called++;
  204. if (read_cb_called == 1) {
  205. /* Accept the first TCP server, and start listening on it. */
  206. ASSERT_EQ(pending, UV_TCP);
  207. r = uv_tcp_init(uv_default_loop(), &tcp_server);
  208. ASSERT_EQ(r, 0);
  209. r = uv_accept((uv_stream_t*)pipe, (uv_stream_t*)&tcp_server);
  210. ASSERT_EQ(r, 0);
  211. r = uv_listen((uv_stream_t*)&tcp_server, BACKLOG, on_connection);
  212. ASSERT_EQ(r, 0);
  213. } else if (read_cb_called == 2) {
  214. /* Accept the second TCP server, and start listening on it. */
  215. ASSERT_EQ(pending, UV_TCP);
  216. r = uv_tcp_init(uv_default_loop(), &tcp_server2);
  217. ASSERT_EQ(r, 0);
  218. r = uv_accept((uv_stream_t*)pipe, (uv_stream_t*)&tcp_server2);
  219. ASSERT_EQ(r, 0);
  220. r = uv_listen((uv_stream_t*)&tcp_server2, BACKLOG, on_connection);
  221. ASSERT_EQ(r, UV_EADDRINUSE);
  222. uv_close((uv_handle_t*)&tcp_server, NULL);
  223. uv_close((uv_handle_t*)&tcp_server2, NULL);
  224. ASSERT_EQ(0, uv_pipe_pending_count(pipe));
  225. uv_close((uv_handle_t*)&channel, NULL);
  226. }
  227. free(buf->base);
  228. }
  229. #endif
  230. void spawn_helper(uv_pipe_t* channel,
  231. uv_process_t* process,
  232. const char* helper) {
  233. uv_process_options_t options;
  234. size_t exepath_size;
  235. char exepath[1024];
  236. char* args[3];
  237. int r;
  238. uv_stdio_container_t stdio[3];
  239. r = uv_pipe_init(uv_default_loop(), channel, 1);
  240. ASSERT_EQ(r, 0);
  241. ASSERT_NE(channel->ipc, 0);
  242. exepath_size = sizeof(exepath);
  243. r = uv_exepath(exepath, &exepath_size);
  244. ASSERT_EQ(r, 0);
  245. exepath[exepath_size] = '\0';
  246. args[0] = exepath;
  247. args[1] = (char*)helper;
  248. args[2] = NULL;
  249. memset(&options, 0, sizeof(options));
  250. options.file = exepath;
  251. options.args = args;
  252. options.exit_cb = exit_cb;
  253. options.stdio = stdio;
  254. options.stdio_count = ARRAY_SIZE(stdio);
  255. stdio[0].flags = UV_CREATE_PIPE | UV_READABLE_PIPE | UV_WRITABLE_PIPE;
  256. stdio[0].data.stream = (uv_stream_t*) channel;
  257. stdio[1].flags = UV_INHERIT_FD;
  258. stdio[1].data.fd = 1;
  259. stdio[2].flags = UV_INHERIT_FD;
  260. stdio[2].data.fd = 2;
  261. r = uv_spawn(uv_default_loop(), process, &options);
  262. ASSERT_EQ(r, 0);
  263. }
  264. static void on_tcp_write(uv_write_t* req, int status) {
  265. ASSERT_EQ(status, 0);
  266. ASSERT_PTR_EQ(req->handle, &tcp_connection);
  267. tcp_write_cb_called++;
  268. }
  269. static void on_read_alloc(uv_handle_t* handle,
  270. size_t suggested_size,
  271. uv_buf_t* buf) {
  272. buf->base = malloc(suggested_size);
  273. buf->len = suggested_size;
  274. }
  275. static void on_tcp_read(uv_stream_t* tcp, ssize_t nread, const uv_buf_t* buf) {
  276. ASSERT_GT(nread, 0);
  277. ASSERT_MEM_EQ("hello again\n", buf->base, nread);
  278. ASSERT_PTR_EQ(tcp, &tcp_connection);
  279. free(buf->base);
  280. tcp_read_cb_called++;
  281. uv_close((uv_handle_t*)tcp, NULL);
  282. uv_close((uv_handle_t*)&channel, NULL);
  283. }
  284. static void on_read_connection(uv_stream_t* handle,
  285. ssize_t nread,
  286. const uv_buf_t* buf) {
  287. int r;
  288. uv_buf_t outbuf;
  289. uv_pipe_t* pipe;
  290. uv_handle_type pending;
  291. pipe = (uv_pipe_t*) handle;
  292. if (nread == 0) {
  293. /* Everything OK, but nothing read. */
  294. free(buf->base);
  295. return;
  296. }
  297. if (nread < 0) {
  298. if (nread == UV_EOF) {
  299. free(buf->base);
  300. return;
  301. }
  302. printf("error recving on channel: %s\n", uv_strerror(nread));
  303. abort();
  304. }
  305. fprintf(stderr, "got %d bytes\n", (int)nread);
  306. ASSERT_EQ(1, uv_pipe_pending_count(pipe));
  307. pending = uv_pipe_pending_type(pipe);
  308. ASSERT_GT(nread, 0);
  309. ASSERT_NOT_NULL(buf->base);
  310. ASSERT_NE(pending, UV_UNKNOWN_HANDLE);
  311. read_cb_called++;
  312. /* Accept the pending TCP connection */
  313. ASSERT_EQ(pending, UV_TCP);
  314. r = uv_tcp_init(uv_default_loop(), &tcp_connection);
  315. ASSERT_EQ(r, 0);
  316. r = uv_accept(handle, (uv_stream_t*)&tcp_connection);
  317. ASSERT_EQ(r, 0);
  318. /* Make sure that the expected data is correctly multiplexed. */
  319. ASSERT_MEM_EQ("hello\n", buf->base, nread);
  320. /* Write/read to/from the connection */
  321. outbuf = uv_buf_init("world\n", 6);
  322. r = uv_write(&write_req, (uv_stream_t*)&tcp_connection, &outbuf, 1,
  323. on_tcp_write);
  324. ASSERT_EQ(r, 0);
  325. r = uv_read_start((uv_stream_t*)&tcp_connection, on_read_alloc, on_tcp_read);
  326. ASSERT_EQ(r, 0);
  327. free(buf->base);
  328. }
  329. #ifndef _WIN32
  330. static void on_read_closed_handle(uv_stream_t* handle,
  331. ssize_t nread,
  332. const uv_buf_t* buf) {
  333. if (nread == 0 || nread == UV_EOF) {
  334. free(buf->base);
  335. return;
  336. }
  337. if (nread < 0) {
  338. printf("error recving on channel: %s\n", uv_strerror(nread));
  339. abort();
  340. }
  341. closed_handle_data_read += nread;
  342. free(buf->base);
  343. }
  344. #endif
  345. static void on_read_send_zero(uv_stream_t* handle,
  346. ssize_t nread,
  347. const uv_buf_t* buf) {
  348. ASSERT(nread == 0 || nread == UV_EOF);
  349. free(buf->base);
  350. }
  351. static int run_ipc_test(const char* helper, uv_read_cb read_cb) {
  352. uv_process_t process;
  353. int r;
  354. spawn_helper(&channel, &process, helper);
  355. uv_read_start((uv_stream_t*)&channel, on_alloc, read_cb);
  356. r = uv_run(uv_default_loop(), UV_RUN_DEFAULT);
  357. ASSERT_EQ(r, 0);
  358. MAKE_VALGRIND_HAPPY();
  359. return 0;
  360. }
  361. TEST_IMPL(ipc_listen_before_write) {
  362. #if defined(NO_SEND_HANDLE_ON_PIPE)
  363. RETURN_SKIP(NO_SEND_HANDLE_ON_PIPE);
  364. #endif
  365. int r = run_ipc_test("ipc_helper_listen_before_write", on_read);
  366. ASSERT_EQ(local_conn_accepted, 1);
  367. ASSERT_EQ(remote_conn_accepted, 1);
  368. ASSERT_EQ(read_cb_called, 1);
  369. ASSERT_EQ(exit_cb_called, 1);
  370. return r;
  371. }
  372. TEST_IMPL(ipc_listen_after_write) {
  373. #if defined(NO_SEND_HANDLE_ON_PIPE)
  374. RETURN_SKIP(NO_SEND_HANDLE_ON_PIPE);
  375. #endif
  376. int r = run_ipc_test("ipc_helper_listen_after_write", on_read);
  377. ASSERT_EQ(local_conn_accepted, 1);
  378. ASSERT_EQ(remote_conn_accepted, 1);
  379. ASSERT_EQ(read_cb_called, 1);
  380. ASSERT_EQ(exit_cb_called, 1);
  381. return r;
  382. }
  383. TEST_IMPL(ipc_tcp_connection) {
  384. #if defined(NO_SEND_HANDLE_ON_PIPE)
  385. RETURN_SKIP(NO_SEND_HANDLE_ON_PIPE);
  386. #endif
  387. int r = run_ipc_test("ipc_helper_tcp_connection", on_read_connection);
  388. ASSERT_EQ(read_cb_called, 1);
  389. ASSERT_EQ(tcp_write_cb_called, 1);
  390. ASSERT_EQ(tcp_read_cb_called, 1);
  391. ASSERT_EQ(exit_cb_called, 1);
  392. return r;
  393. }
  394. #ifndef _WIN32
  395. TEST_IMPL(ipc_closed_handle) {
  396. int r;
  397. r = run_ipc_test("ipc_helper_closed_handle", on_read_closed_handle);
  398. ASSERT_EQ(r, 0);
  399. return 0;
  400. }
  401. #endif
  402. #ifdef _WIN32
  403. TEST_IMPL(listen_with_simultaneous_accepts) {
  404. uv_tcp_t server;
  405. int r;
  406. struct sockaddr_in addr;
  407. ASSERT_EQ(0, uv_ip4_addr("0.0.0.0", TEST_PORT, &addr));
  408. r = uv_tcp_init(uv_default_loop(), &server);
  409. ASSERT_EQ(r, 0);
  410. r = uv_tcp_bind(&server, (const struct sockaddr*) &addr, 0);
  411. ASSERT_EQ(r, 0);
  412. r = uv_tcp_simultaneous_accepts(&server, 1);
  413. ASSERT_EQ(r, 0);
  414. r = uv_listen((uv_stream_t*)&server, SOMAXCONN, NULL);
  415. ASSERT_EQ(r, 0);
  416. ASSERT_EQ(server.reqs_pending, 32);
  417. MAKE_VALGRIND_HAPPY();
  418. return 0;
  419. }
  420. TEST_IMPL(listen_no_simultaneous_accepts) {
  421. uv_tcp_t server;
  422. int r;
  423. struct sockaddr_in addr;
  424. ASSERT_EQ(0, uv_ip4_addr("0.0.0.0", TEST_PORT, &addr));
  425. r = uv_tcp_init(uv_default_loop(), &server);
  426. ASSERT_EQ(r, 0);
  427. r = uv_tcp_bind(&server, (const struct sockaddr*) &addr, 0);
  428. ASSERT_EQ(r, 0);
  429. r = uv_tcp_simultaneous_accepts(&server, 0);
  430. ASSERT_EQ(r, 0);
  431. r = uv_listen((uv_stream_t*)&server, SOMAXCONN, NULL);
  432. ASSERT_EQ(r, 0);
  433. ASSERT_EQ(server.reqs_pending, 1);
  434. MAKE_VALGRIND_HAPPY();
  435. return 0;
  436. }
  437. TEST_IMPL(ipc_listen_after_bind_twice) {
  438. #if defined(NO_SEND_HANDLE_ON_PIPE)
  439. RETURN_SKIP(NO_SEND_HANDLE_ON_PIPE);
  440. #endif
  441. int r = run_ipc_test("ipc_helper_bind_twice", on_read_listen_after_bound_twice);
  442. ASSERT_EQ(read_cb_called, 2);
  443. ASSERT_EQ(exit_cb_called, 1);
  444. return r;
  445. }
  446. #endif
  447. TEST_IMPL(ipc_send_zero) {
  448. int r;
  449. r = run_ipc_test("ipc_helper_send_zero", on_read_send_zero);
  450. ASSERT_EQ(r, 0);
  451. return 0;
  452. }
  453. /* Everything here runs in a child process. */
  454. static tcp_conn conn;
  455. static void close_cb(uv_handle_t* handle) {
  456. close_cb_called++;
  457. }
  458. static void conn_notify_write_cb(uv_write_t* req, int status) {
  459. uv_close((uv_handle_t*)&tcp_server, close_cb);
  460. uv_close((uv_handle_t*)&channel, close_cb);
  461. }
  462. static void tcp_connection_write_cb(uv_write_t* req, int status) {
  463. ASSERT_PTR_EQ(&conn.conn, req->handle);
  464. uv_close((uv_handle_t*)req->handle, close_cb);
  465. uv_close((uv_handle_t*)&channel, close_cb);
  466. uv_close((uv_handle_t*)&tcp_server, close_cb);
  467. tcp_conn_write_cb_called++;
  468. }
  469. static void closed_handle_large_write_cb(uv_write_t* req, int status) {
  470. ASSERT_EQ(status, 0);
  471. ASSERT(closed_handle_data_read = LARGE_SIZE);
  472. if (++write_reqs_completed == ARRAY_SIZE(write_reqs)) {
  473. write_reqs_completed = 0;
  474. if (write_until_data_queued() > 0)
  475. send_handle_and_close();
  476. }
  477. }
  478. static void closed_handle_write_cb(uv_write_t* req, int status) {
  479. ASSERT_EQ(status, UV_EBADF);
  480. closed_handle_write = 1;
  481. }
  482. static void send_zero_write_cb(uv_write_t* req, int status) {
  483. ASSERT_EQ(status, 0);
  484. send_zero_write++;
  485. }
  486. static void on_tcp_child_process_read(uv_stream_t* tcp,
  487. ssize_t nread,
  488. const uv_buf_t* buf) {
  489. uv_buf_t outbuf;
  490. int r;
  491. if (nread < 0) {
  492. if (nread == UV_EOF) {
  493. free(buf->base);
  494. return;
  495. }
  496. printf("error recving on tcp connection: %s\n", uv_strerror(nread));
  497. abort();
  498. }
  499. ASSERT_GT(nread, 0);
  500. ASSERT_MEM_EQ("world\n", buf->base, nread);
  501. on_pipe_read_called++;
  502. free(buf->base);
  503. /* Write to the socket */
  504. outbuf = uv_buf_init("hello again\n", 12);
  505. r = uv_write(&conn.tcp_write_req, tcp, &outbuf, 1, tcp_connection_write_cb);
  506. ASSERT_EQ(r, 0);
  507. tcp_conn_read_cb_called++;
  508. }
  509. static void connect_child_process_cb(uv_connect_t* req, int status) {
  510. int r;
  511. ASSERT_EQ(status, 0);
  512. r = uv_read_start(req->handle, on_read_alloc, on_tcp_child_process_read);
  513. ASSERT_EQ(r, 0);
  514. }
  515. static void ipc_on_connection(uv_stream_t* server, int status) {
  516. int r;
  517. uv_buf_t buf;
  518. if (!connection_accepted) {
  519. /*
  520. * Accept the connection and close it. Also let the other
  521. * side know.
  522. */
  523. ASSERT_EQ(status, 0);
  524. ASSERT_PTR_EQ(&tcp_server, server);
  525. r = uv_tcp_init(server->loop, &conn.conn);
  526. ASSERT_EQ(r, 0);
  527. r = uv_accept(server, (uv_stream_t*)&conn.conn);
  528. ASSERT_EQ(r, 0);
  529. uv_close((uv_handle_t*)&conn.conn, close_cb);
  530. buf = uv_buf_init("accepted_connection\n", 20);
  531. r = uv_write2(&conn_notify_req, (uv_stream_t*)&channel, &buf, 1,
  532. NULL, conn_notify_write_cb);
  533. ASSERT_EQ(r, 0);
  534. connection_accepted = 1;
  535. }
  536. }
  537. static void ipc_on_connection_tcp_conn(uv_stream_t* server, int status) {
  538. int r;
  539. uv_buf_t buf;
  540. uv_tcp_t* conn;
  541. ASSERT_EQ(status, 0);
  542. ASSERT_PTR_EQ(&tcp_server, server);
  543. conn = malloc(sizeof(*conn));
  544. ASSERT_NOT_NULL(conn);
  545. r = uv_tcp_init(server->loop, conn);
  546. ASSERT_EQ(r, 0);
  547. r = uv_accept(server, (uv_stream_t*)conn);
  548. ASSERT_EQ(r, 0);
  549. /* Send the accepted connection to the other process */
  550. buf = uv_buf_init("hello\n", 6);
  551. r = uv_write2(&conn_notify_req, (uv_stream_t*)&channel, &buf, 1,
  552. (uv_stream_t*)conn, NULL);
  553. ASSERT_EQ(r, 0);
  554. r = uv_read_start((uv_stream_t*) conn,
  555. on_read_alloc,
  556. on_tcp_child_process_read);
  557. ASSERT_EQ(r, 0);
  558. uv_close((uv_handle_t*)conn, close_cb);
  559. }
  560. int ipc_helper(int listen_after_write) {
  561. /*
  562. * This is launched from test-ipc.c. stdin is a duplex channel that we
  563. * over which a handle will be transmitted.
  564. */
  565. struct sockaddr_in addr;
  566. int r;
  567. uv_buf_t buf;
  568. ASSERT_EQ(0, uv_ip4_addr("0.0.0.0", TEST_PORT, &addr));
  569. r = uv_pipe_init(uv_default_loop(), &channel, 1);
  570. ASSERT_EQ(r, 0);
  571. uv_pipe_open(&channel, 0);
  572. ASSERT_EQ(1, uv_is_readable((uv_stream_t*) &channel));
  573. ASSERT_EQ(1, uv_is_writable((uv_stream_t*) &channel));
  574. ASSERT_EQ(0, uv_is_closing((uv_handle_t*) &channel));
  575. r = uv_tcp_init(uv_default_loop(), &tcp_server);
  576. ASSERT_EQ(r, 0);
  577. r = uv_tcp_bind(&tcp_server, (const struct sockaddr*) &addr, 0);
  578. ASSERT_EQ(r, 0);
  579. if (!listen_after_write) {
  580. r = uv_listen((uv_stream_t*)&tcp_server, BACKLOG, ipc_on_connection);
  581. ASSERT_EQ(r, 0);
  582. }
  583. buf = uv_buf_init("hello\n", 6);
  584. r = uv_write2(&write_req, (uv_stream_t*)&channel, &buf, 1,
  585. (uv_stream_t*)&tcp_server, NULL);
  586. ASSERT_EQ(r, 0);
  587. if (listen_after_write) {
  588. r = uv_listen((uv_stream_t*)&tcp_server, BACKLOG, ipc_on_connection);
  589. ASSERT_EQ(r, 0);
  590. }
  591. notify_parent_process();
  592. r = uv_run(uv_default_loop(), UV_RUN_DEFAULT);
  593. ASSERT_EQ(r, 0);
  594. ASSERT_EQ(connection_accepted, 1);
  595. ASSERT_EQ(close_cb_called, 3);
  596. MAKE_VALGRIND_HAPPY();
  597. return 0;
  598. }
  599. int ipc_helper_tcp_connection(void) {
  600. /*
  601. * This is launched from test-ipc.c. stdin is a duplex channel
  602. * over which a handle will be transmitted.
  603. */
  604. int r;
  605. struct sockaddr_in addr;
  606. r = uv_pipe_init(uv_default_loop(), &channel, 1);
  607. ASSERT_EQ(r, 0);
  608. uv_pipe_open(&channel, 0);
  609. ASSERT_EQ(1, uv_is_readable((uv_stream_t*) &channel));
  610. ASSERT_EQ(1, uv_is_writable((uv_stream_t*) &channel));
  611. ASSERT_EQ(0, uv_is_closing((uv_handle_t*) &channel));
  612. r = uv_tcp_init(uv_default_loop(), &tcp_server);
  613. ASSERT_EQ(r, 0);
  614. ASSERT_EQ(0, uv_ip4_addr("0.0.0.0", TEST_PORT, &addr));
  615. r = uv_tcp_bind(&tcp_server, (const struct sockaddr*) &addr, 0);
  616. ASSERT_EQ(r, 0);
  617. r = uv_listen((uv_stream_t*)&tcp_server, BACKLOG, ipc_on_connection_tcp_conn);
  618. ASSERT_EQ(r, 0);
  619. /* Make a connection to the server */
  620. r = uv_tcp_init(uv_default_loop(), &conn.conn);
  621. ASSERT_EQ(r, 0);
  622. ASSERT_EQ(0, uv_ip4_addr("127.0.0.1", TEST_PORT, &addr));
  623. r = uv_tcp_connect(&conn.conn_req,
  624. (uv_tcp_t*) &conn.conn,
  625. (const struct sockaddr*) &addr,
  626. connect_child_process_cb);
  627. ASSERT_EQ(r, 0);
  628. r = uv_run(uv_default_loop(), UV_RUN_DEFAULT);
  629. ASSERT_EQ(r, 0);
  630. ASSERT_EQ(tcp_conn_read_cb_called, 1);
  631. ASSERT_EQ(tcp_conn_write_cb_called, 1);
  632. ASSERT_EQ(close_cb_called, 4);
  633. MAKE_VALGRIND_HAPPY();
  634. return 0;
  635. }
  636. static unsigned int write_until_data_queued() {
  637. unsigned int i;
  638. int r;
  639. i = 0;
  640. do {
  641. r = uv_write(&write_reqs[i],
  642. (uv_stream_t*)&channel,
  643. &large_buf,
  644. 1,
  645. closed_handle_large_write_cb);
  646. ASSERT_EQ(r, 0);
  647. i++;
  648. } while (channel.write_queue_size == 0 &&
  649. i < ARRAY_SIZE(write_reqs));
  650. return channel.write_queue_size;
  651. }
  652. static void send_handle_and_close() {
  653. int r;
  654. struct sockaddr_in addr;
  655. r = uv_tcp_init(uv_default_loop(), &tcp_server);
  656. ASSERT_EQ(r, 0);
  657. ASSERT_EQ(0, uv_ip4_addr("0.0.0.0", TEST_PORT, &addr));
  658. r = uv_tcp_bind(&tcp_server, (const struct sockaddr*) &addr, 0);
  659. ASSERT_EQ(r, 0);
  660. r = uv_write2(&write_req,
  661. (uv_stream_t*)&channel,
  662. &large_buf,
  663. 1,
  664. (uv_stream_t*)&tcp_server,
  665. closed_handle_write_cb);
  666. ASSERT_EQ(r, 0);
  667. uv_close((uv_handle_t*)&tcp_server, NULL);
  668. }
  669. int ipc_helper_closed_handle(void) {
  670. int r;
  671. memset(buffer, '.', LARGE_SIZE);
  672. large_buf = uv_buf_init(buffer, LARGE_SIZE);
  673. r = uv_pipe_init(uv_default_loop(), &channel, 1);
  674. ASSERT_EQ(r, 0);
  675. uv_pipe_open(&channel, 0);
  676. ASSERT_EQ(1, uv_is_readable((uv_stream_t*) &channel));
  677. ASSERT_EQ(1, uv_is_writable((uv_stream_t*) &channel));
  678. ASSERT_EQ(0, uv_is_closing((uv_handle_t*) &channel));
  679. if (write_until_data_queued() > 0)
  680. send_handle_and_close();
  681. r = uv_run(uv_default_loop(), UV_RUN_DEFAULT);
  682. ASSERT_EQ(r, 0);
  683. ASSERT_EQ(closed_handle_write, 1);
  684. MAKE_VALGRIND_HAPPY();
  685. return 0;
  686. }
  687. int ipc_helper_bind_twice(void) {
  688. /*
  689. * This is launched from test-ipc.c. stdin is a duplex channel
  690. * over which two handles will be transmitted.
  691. */
  692. struct sockaddr_in addr;
  693. int r;
  694. uv_buf_t buf;
  695. ASSERT_EQ(0, uv_ip4_addr("0.0.0.0", TEST_PORT, &addr));
  696. r = uv_pipe_init(uv_default_loop(), &channel, 1);
  697. ASSERT_EQ(r, 0);
  698. uv_pipe_open(&channel, 0);
  699. ASSERT_EQ(1, uv_is_readable((uv_stream_t*) &channel));
  700. ASSERT_EQ(1, uv_is_writable((uv_stream_t*) &channel));
  701. ASSERT_EQ(0, uv_is_closing((uv_handle_t*) &channel));
  702. buf = uv_buf_init("hello\n", 6);
  703. r = uv_tcp_init(uv_default_loop(), &tcp_server);
  704. ASSERT_EQ(r, 0);
  705. r = uv_tcp_init(uv_default_loop(), &tcp_server2);
  706. ASSERT_EQ(r, 0);
  707. r = uv_tcp_bind(&tcp_server, (const struct sockaddr*) &addr, 0);
  708. ASSERT_EQ(r, 0);
  709. r = uv_tcp_bind(&tcp_server2, (const struct sockaddr*) &addr, 0);
  710. ASSERT_EQ(r, 0);
  711. r = uv_write2(&write_req, (uv_stream_t*)&channel, &buf, 1,
  712. (uv_stream_t*)&tcp_server, NULL);
  713. ASSERT_EQ(r, 0);
  714. r = uv_write2(&write_req2, (uv_stream_t*)&channel, &buf, 1,
  715. (uv_stream_t*)&tcp_server2, NULL);
  716. ASSERT_EQ(r, 0);
  717. r = uv_run(uv_default_loop(), UV_RUN_DEFAULT);
  718. ASSERT_EQ(r, 0);
  719. MAKE_VALGRIND_HAPPY();
  720. return 0;
  721. }
  722. int ipc_helper_send_zero(void) {
  723. int r;
  724. uv_buf_t zero_buf;
  725. zero_buf = uv_buf_init(0, 0);
  726. r = uv_pipe_init(uv_default_loop(), &channel, 0);
  727. ASSERT_EQ(r, 0);
  728. uv_pipe_open(&channel, 0);
  729. ASSERT_EQ(1, uv_is_readable((uv_stream_t*) &channel));
  730. ASSERT_EQ(1, uv_is_writable((uv_stream_t*) &channel));
  731. ASSERT_EQ(0, uv_is_closing((uv_handle_t*) &channel));
  732. r = uv_write(&write_req,
  733. (uv_stream_t*)&channel,
  734. &zero_buf,
  735. 1,
  736. send_zero_write_cb);
  737. ASSERT_EQ(r, 0);
  738. r = uv_run(uv_default_loop(), UV_RUN_DEFAULT);
  739. ASSERT_EQ(r, 0);
  740. ASSERT_EQ(send_zero_write, 1);
  741. MAKE_VALGRIND_HAPPY();
  742. return 0;
  743. }