test-poll.c 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685
  /* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
   *
   * Permission is hereby granted, free of charge, to any person obtaining a copy
   * of this software and associated documentation files (the "Software"), to
   * deal in the Software without restriction, including without limitation the
   * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
   * sell copies of the Software, and to permit persons to whom the Software is
   * furnished to do so, subject to the following conditions:
   *
   * The above copyright notice and this permission notice shall be included in
   * all copies or substantial portions of the Software.
   *
   * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
   * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
   * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
   * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
   * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
   * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
   * IN THE SOFTWARE.
   */
  21. #include <errno.h>
  22. #ifdef _WIN32
  23. # include <fcntl.h>
  24. #else
  25. # include <sys/socket.h>
  26. # include <unistd.h>
  27. #endif
  28. #include "uv.h"
  29. #include "task.h"
  30. #ifdef __linux__
  31. # include <sys/epoll.h>
  32. #endif
  33. #ifdef UV_HAVE_KQUEUE
  34. # include <sys/types.h>
  35. # include <sys/event.h>
  36. # include <sys/time.h>
  37. #endif
  /* Number of client connections opened against the test server. */
  #define NUM_CLIENTS 5

  /* Number of payload bytes a sending side transmits before it sends FIN. */
  #define TRANSFER_BYTES (1 << 16)

  /* Smaller of two values.
   *
   * Expands to a plain parenthesized expression with no trailing semicolon, so
   * it can be used inside larger expressions as well as in a full statement.
   * One of the arguments is evaluated twice; do not pass expressions with side
   * effects.
   */
  #undef MIN
  #define MIN(a, b) (((a) < (b)) ? (a) : (b))
  /* Direction of the payload exchanged over each connection. */
  typedef enum {
    /* Only the client side sends the payload; the server side only reads. */
    UNIDIRECTIONAL,
    /* Both sides send and read the payload. */
    DUPLEX
  } test_mode_t;
  46. typedef struct connection_context_s {
  47. uv_poll_t poll_handle;
  48. uv_timer_t timer_handle;
  49. uv_os_sock_t sock;
  50. size_t read, sent;
  51. int is_server_connection;
  52. int open_handles;
  53. int got_fin, sent_fin, got_disconnect;
  54. unsigned int events, delayed_events;
  55. } connection_context_t;
  56. typedef struct server_context_s {
  57. uv_poll_t poll_handle;
  58. uv_os_sock_t sock;
  59. int connections;
  60. } server_context_t;
  61. static void delay_timer_cb(uv_timer_t* timer);
  62. static test_mode_t test_mode = DUPLEX;
  63. static int closed_connections = 0;
  64. static int valid_writable_wakeups = 0;
  65. static int spurious_writable_wakeups = 0;
  66. #if !defined(__sun) && !defined(_AIX) && !defined(__MVS__)
  67. static int disconnects = 0;
  68. #endif /* !__sun && !_AIX && !__MVS__ */
  /* Report whether the last socket error means "operation would block".
   *
   * On Windows this compares WSAGetLastError() against WSAEWOULDBLOCK.
   * Elsewhere it is true when errno is EAGAIN, EINPROGRESS or, where the
   * platform defines it, EWOULDBLOCK.
   *
   * Returns non-zero if so, zero otherwise. Reads the current error code, so
   * call it directly after the failing call.
   */
  static int got_eagain(void) {
  #ifdef _WIN32
    return WSAGetLastError() == WSAEWOULDBLOCK;
  #else
    /* The statement is terminated once, after the optional clause, so both
     * preprocessor paths yield exactly one return statement.
     */
    return errno == EAGAIN
        || errno == EINPROGRESS
  #ifdef EWOULDBLOCK
        || errno == EWOULDBLOCK
  #endif
        ;
  #endif
  }
  81. static uv_os_sock_t create_bound_socket (struct sockaddr_in bind_addr) {
  82. uv_os_sock_t sock;
  83. int r;
  84. sock = socket(AF_INET, SOCK_STREAM, IPPROTO_IP);
  85. #ifdef _WIN32
  86. ASSERT(sock != INVALID_SOCKET);
  87. #else
  88. ASSERT(sock >= 0);
  89. #endif
  90. #ifndef _WIN32
  91. {
  92. /* Allow reuse of the port. */
  93. int yes = 1;
  94. r = setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof yes);
  95. ASSERT(r == 0);
  96. }
  97. #endif
  98. r = bind(sock, (const struct sockaddr*) &bind_addr, sizeof bind_addr);
  99. ASSERT(r == 0);
  100. return sock;
  101. }
  102. static void close_socket(uv_os_sock_t sock) {
  103. int r;
  104. #ifdef _WIN32
  105. r = closesocket(sock);
  106. #else
  107. r = close(sock);
  108. #endif
  109. /* On FreeBSD close() can fail with ECONNRESET if the socket was shutdown by
  110. * the peer before all pending data was delivered.
  111. */
  112. ASSERT(r == 0 || errno == ECONNRESET);
  113. }
  114. static connection_context_t* create_connection_context(
  115. uv_os_sock_t sock, int is_server_connection) {
  116. int r;
  117. connection_context_t* context;
  118. context = (connection_context_t*) malloc(sizeof *context);
  119. ASSERT(context != NULL);
  120. context->sock = sock;
  121. context->is_server_connection = is_server_connection;
  122. context->read = 0;
  123. context->sent = 0;
  124. context->open_handles = 0;
  125. context->events = 0;
  126. context->delayed_events = 0;
  127. context->got_fin = 0;
  128. context->sent_fin = 0;
  129. context->got_disconnect = 0;
  130. r = uv_poll_init_socket(uv_default_loop(), &context->poll_handle, sock);
  131. context->open_handles++;
  132. context->poll_handle.data = context;
  133. ASSERT(r == 0);
  134. r = uv_timer_init(uv_default_loop(), &context->timer_handle);
  135. context->open_handles++;
  136. context->timer_handle.data = context;
  137. ASSERT(r == 0);
  138. return context;
  139. }
  140. static void connection_close_cb(uv_handle_t* handle) {
  141. connection_context_t* context = (connection_context_t*) handle->data;
  142. if (--context->open_handles == 0) {
  143. if (test_mode == DUPLEX || context->is_server_connection) {
  144. ASSERT(context->read == TRANSFER_BYTES);
  145. } else {
  146. ASSERT(context->read == 0);
  147. }
  148. if (test_mode == DUPLEX || !context->is_server_connection) {
  149. ASSERT(context->sent == TRANSFER_BYTES);
  150. } else {
  151. ASSERT(context->sent == 0);
  152. }
  153. closed_connections++;
  154. free(context);
  155. }
  156. }
  157. static void destroy_connection_context(connection_context_t* context) {
  158. uv_close((uv_handle_t*) &context->poll_handle, connection_close_cb);
  159. uv_close((uv_handle_t*) &context->timer_handle, connection_close_cb);
  160. }
  161. static void connection_poll_cb(uv_poll_t* handle, int status, int events) {
  162. connection_context_t* context = (connection_context_t*) handle->data;
  163. unsigned int new_events;
  164. int r;
  165. ASSERT(status == 0);
  166. ASSERT(events & context->events);
  167. ASSERT(!(events & ~context->events));
  168. new_events = context->events;
  169. if (events & UV_READABLE) {
  170. int action = rand() % 7;
  171. switch (action) {
  172. case 0:
  173. case 1: {
  174. /* Read a couple of bytes. */
  175. static char buffer[74];
  176. do
  177. r = recv(context->sock, buffer, sizeof buffer, 0);
  178. while (r == -1 && errno == EINTR);
  179. ASSERT(r >= 0);
  180. if (r > 0) {
  181. context->read += r;
  182. } else {
  183. /* Got FIN. */
  184. context->got_fin = 1;
  185. new_events &= ~UV_READABLE;
  186. }
  187. break;
  188. }
  189. case 2:
  190. case 3: {
  191. /* Read until EAGAIN. */
  192. static char buffer[931];
  193. for (;;) {
  194. do
  195. r = recv(context->sock, buffer, sizeof buffer, 0);
  196. while (r == -1 && errno == EINTR);
  197. if (r <= 0)
  198. break;
  199. context->read += r;
  200. }
  201. if (r == 0) {
  202. /* Got FIN. */
  203. context->got_fin = 1;
  204. new_events &= ~UV_READABLE;
  205. } else {
  206. ASSERT(got_eagain());
  207. }
  208. break;
  209. }
  210. case 4:
  211. /* Ignore. */
  212. break;
  213. case 5:
  214. /* Stop reading for a while. Restart in timer callback. */
  215. new_events &= ~UV_READABLE;
  216. if (!uv_is_active((uv_handle_t*) &context->timer_handle)) {
  217. context->delayed_events = UV_READABLE;
  218. uv_timer_start(&context->timer_handle, delay_timer_cb, 10, 0);
  219. } else {
  220. context->delayed_events |= UV_READABLE;
  221. }
  222. break;
  223. case 6:
  224. /* Fudge with the event mask. */
  225. uv_poll_start(&context->poll_handle, UV_WRITABLE, connection_poll_cb);
  226. uv_poll_start(&context->poll_handle, UV_READABLE, connection_poll_cb);
  227. context->events = UV_READABLE;
  228. break;
  229. default:
  230. ASSERT(0);
  231. }
  232. }
  233. if (events & UV_WRITABLE) {
  234. if (context->sent < TRANSFER_BYTES &&
  235. !(test_mode == UNIDIRECTIONAL && context->is_server_connection)) {
  236. /* We have to send more bytes. */
  237. int action = rand() % 7;
  238. switch (action) {
  239. case 0:
  240. case 1: {
  241. /* Send a couple of bytes. */
  242. static char buffer[103];
  243. int send_bytes = MIN(TRANSFER_BYTES - context->sent, sizeof buffer);
  244. ASSERT(send_bytes > 0);
  245. do
  246. r = send(context->sock, buffer, send_bytes, 0);
  247. while (r == -1 && errno == EINTR);
  248. if (r < 0) {
  249. ASSERT(got_eagain());
  250. spurious_writable_wakeups++;
  251. break;
  252. }
  253. ASSERT(r > 0);
  254. context->sent += r;
  255. valid_writable_wakeups++;
  256. break;
  257. }
  258. case 2:
  259. case 3: {
  260. /* Send until EAGAIN. */
  261. static char buffer[1234];
  262. int send_bytes = MIN(TRANSFER_BYTES - context->sent, sizeof buffer);
  263. ASSERT(send_bytes > 0);
  264. do
  265. r = send(context->sock, buffer, send_bytes, 0);
  266. while (r == -1 && errno == EINTR);
  267. if (r < 0) {
  268. ASSERT(got_eagain());
  269. spurious_writable_wakeups++;
  270. break;
  271. }
  272. ASSERT(r > 0);
  273. valid_writable_wakeups++;
  274. context->sent += r;
  275. while (context->sent < TRANSFER_BYTES) {
  276. send_bytes = MIN(TRANSFER_BYTES - context->sent, sizeof buffer);
  277. ASSERT(send_bytes > 0);
  278. do
  279. r = send(context->sock, buffer, send_bytes, 0);
  280. while (r == -1 && errno == EINTR);
  281. ASSERT(r != 0);
  282. if (r < 0) {
  283. ASSERT(got_eagain());
  284. break;
  285. }
  286. context->sent += r;
  287. }
  288. break;
  289. }
  290. case 4:
  291. /* Ignore. */
  292. break;
  293. case 5:
  294. /* Stop sending for a while. Restart in timer callback. */
  295. new_events &= ~UV_WRITABLE;
  296. if (!uv_is_active((uv_handle_t*) &context->timer_handle)) {
  297. context->delayed_events = UV_WRITABLE;
  298. uv_timer_start(&context->timer_handle, delay_timer_cb, 100, 0);
  299. } else {
  300. context->delayed_events |= UV_WRITABLE;
  301. }
  302. break;
  303. case 6:
  304. /* Fudge with the event mask. */
  305. uv_poll_start(&context->poll_handle,
  306. UV_READABLE,
  307. connection_poll_cb);
  308. uv_poll_start(&context->poll_handle,
  309. UV_WRITABLE,
  310. connection_poll_cb);
  311. context->events = UV_WRITABLE;
  312. break;
  313. default:
  314. ASSERT(0);
  315. }
  316. } else {
  317. /* Nothing more to write. Send FIN. */
  318. int r;
  319. #ifdef _WIN32
  320. r = shutdown(context->sock, SD_SEND);
  321. #else
  322. r = shutdown(context->sock, SHUT_WR);
  323. #endif
  324. ASSERT(r == 0);
  325. context->sent_fin = 1;
  326. new_events &= ~UV_WRITABLE;
  327. }
  328. }
  329. #if !defined(__sun) && !defined(_AIX) && !defined(__MVS__)
  330. if (events & UV_DISCONNECT) {
  331. context->got_disconnect = 1;
  332. ++disconnects;
  333. new_events &= ~UV_DISCONNECT;
  334. }
  335. if (context->got_fin && context->sent_fin && context->got_disconnect) {
  336. #else /* __sun && _AIX && __MVS__ */
  337. if (context->got_fin && context->sent_fin) {
  338. #endif /* !__sun && !_AIX && !__MVS__ */
  339. /* Sent and received FIN. Close and destroy context. */
  340. close_socket(context->sock);
  341. destroy_connection_context(context);
  342. context->events = 0;
  343. } else if (new_events != context->events) {
  344. /* Poll mask changed. Call uv_poll_start again. */
  345. context->events = new_events;
  346. uv_poll_start(handle, new_events, connection_poll_cb);
  347. }
  348. /* Assert that uv_is_active works correctly for poll handles. */
  349. if (context->events != 0) {
  350. ASSERT(1 == uv_is_active((uv_handle_t*) handle));
  351. } else {
  352. ASSERT(0 == uv_is_active((uv_handle_t*) handle));
  353. }
  354. }
  355. static void delay_timer_cb(uv_timer_t* timer) {
  356. connection_context_t* context = (connection_context_t*) timer->data;
  357. int r;
  358. /* Timer should auto stop. */
  359. ASSERT(0 == uv_is_active((uv_handle_t*) timer));
  360. /* Add the requested events to the poll mask. */
  361. ASSERT(context->delayed_events != 0);
  362. context->events |= context->delayed_events;
  363. context->delayed_events = 0;
  364. r = uv_poll_start(&context->poll_handle,
  365. context->events,
  366. connection_poll_cb);
  367. ASSERT(r == 0);
  368. }
  369. static server_context_t* create_server_context(
  370. uv_os_sock_t sock) {
  371. int r;
  372. server_context_t* context;
  373. context = (server_context_t*) malloc(sizeof *context);
  374. ASSERT(context != NULL);
  375. context->sock = sock;
  376. context->connections = 0;
  377. r = uv_poll_init_socket(uv_default_loop(), &context->poll_handle, sock);
  378. context->poll_handle.data = context;
  379. ASSERT(r == 0);
  380. return context;
  381. }
  382. static void server_close_cb(uv_handle_t* handle) {
  383. server_context_t* context = (server_context_t*) handle->data;
  384. free(context);
  385. }
  386. static void destroy_server_context(server_context_t* context) {
  387. uv_close((uv_handle_t*) &context->poll_handle, server_close_cb);
  388. }
  389. static void server_poll_cb(uv_poll_t* handle, int status, int events) {
  390. server_context_t* server_context = (server_context_t*)
  391. handle->data;
  392. connection_context_t* connection_context;
  393. struct sockaddr_in addr;
  394. socklen_t addr_len;
  395. uv_os_sock_t sock;
  396. int r;
  397. addr_len = sizeof addr;
  398. sock = accept(server_context->sock, (struct sockaddr*) &addr, &addr_len);
  399. #ifdef _WIN32
  400. ASSERT(sock != INVALID_SOCKET);
  401. #else
  402. ASSERT(sock >= 0);
  403. #endif
  404. connection_context = create_connection_context(sock, 1);
  405. connection_context->events = UV_READABLE | UV_WRITABLE | UV_DISCONNECT;
  406. r = uv_poll_start(&connection_context->poll_handle,
  407. UV_READABLE | UV_WRITABLE | UV_DISCONNECT,
  408. connection_poll_cb);
  409. ASSERT(r == 0);
  410. if (++server_context->connections == NUM_CLIENTS) {
  411. close_socket(server_context->sock);
  412. destroy_server_context(server_context);
  413. }
  414. }
  415. static void start_server(void) {
  416. server_context_t* context;
  417. struct sockaddr_in addr;
  418. uv_os_sock_t sock;
  419. int r;
  420. ASSERT(0 == uv_ip4_addr("127.0.0.1", TEST_PORT, &addr));
  421. sock = create_bound_socket(addr);
  422. context = create_server_context(sock);
  423. r = listen(sock, 100);
  424. ASSERT(r == 0);
  425. r = uv_poll_start(&context->poll_handle, UV_READABLE, server_poll_cb);
  426. ASSERT(r == 0);
  427. }
  428. static void start_client(void) {
  429. uv_os_sock_t sock;
  430. connection_context_t* context;
  431. struct sockaddr_in server_addr;
  432. struct sockaddr_in addr;
  433. int r;
  434. ASSERT(0 == uv_ip4_addr("127.0.0.1", TEST_PORT, &server_addr));
  435. ASSERT(0 == uv_ip4_addr("0.0.0.0", 0, &addr));
  436. sock = create_bound_socket(addr);
  437. context = create_connection_context(sock, 0);
  438. context->events = UV_READABLE | UV_WRITABLE | UV_DISCONNECT;
  439. r = uv_poll_start(&context->poll_handle,
  440. UV_READABLE | UV_WRITABLE | UV_DISCONNECT,
  441. connection_poll_cb);
  442. ASSERT(r == 0);
  443. r = connect(sock, (struct sockaddr*) &server_addr, sizeof server_addr);
  444. ASSERT(r == 0 || got_eagain());
  445. }
  446. static void start_poll_test(void) {
  447. int i, r;
  448. #ifdef _WIN32
  449. {
  450. struct WSAData wsa_data;
  451. int r = WSAStartup(MAKEWORD(2, 2), &wsa_data);
  452. ASSERT(r == 0);
  453. }
  454. #endif
  455. start_server();
  456. for (i = 0; i < NUM_CLIENTS; i++)
  457. start_client();
  458. r = uv_run(uv_default_loop(), UV_RUN_DEFAULT);
  459. ASSERT(r == 0);
  460. /* Assert that at most five percent of the writable wakeups was spurious. */
  461. ASSERT(spurious_writable_wakeups == 0 ||
  462. (valid_writable_wakeups + spurious_writable_wakeups) /
  463. spurious_writable_wakeups > 20);
  464. ASSERT(closed_connections == NUM_CLIENTS * 2);
  465. #if !defined(__sun) && !defined(_AIX) && !defined(__MVS__)
  466. ASSERT(disconnects == NUM_CLIENTS * 2);
  467. #endif
  468. MAKE_VALGRIND_HAPPY();
  469. }
  470. TEST_IMPL(poll_duplex) {
  471. #if defined(NO_SELF_CONNECT)
  472. RETURN_SKIP(NO_SELF_CONNECT);
  473. #endif
  474. test_mode = DUPLEX;
  475. start_poll_test();
  476. return 0;
  477. }
  478. TEST_IMPL(poll_unidirectional) {
  479. #if defined(NO_SELF_CONNECT)
  480. RETURN_SKIP(NO_SELF_CONNECT);
  481. #endif
  482. test_mode = UNIDIRECTIONAL;
  483. start_poll_test();
  484. return 0;
  485. }
  486. /* Windows won't let you open a directory so we open a file instead.
  487. * OS X lets you poll a file so open the $PWD instead. Both fail
  488. * on Linux so it doesn't matter which one we pick. Both succeed
  489. * on FreeBSD, Solaris and AIX so skip the test on those platforms.
  490. */
  491. TEST_IMPL(poll_bad_fdtype) {
  492. #if !defined(__DragonFly__) && !defined(__FreeBSD__) && !defined(__sun) && \
  493. !defined(_AIX) && !defined(__MVS__) && !defined(__FreeBSD_kernel__) && \
  494. !defined(__OpenBSD__) && !defined(__CYGWIN__) && !defined(__MSYS__) && \
  495. !defined(__NetBSD__)
  496. uv_poll_t poll_handle;
  497. int fd;
  498. #if defined(_WIN32)
  499. fd = open("test/fixtures/empty_file", O_RDONLY);
  500. #else
  501. fd = open(".", O_RDONLY);
  502. #endif
  503. ASSERT(fd != -1);
  504. ASSERT(0 != uv_poll_init(uv_default_loop(), &poll_handle, fd));
  505. ASSERT(0 == close(fd));
  506. #endif
  507. MAKE_VALGRIND_HAPPY();
  508. return 0;
  509. }
  510. #ifdef __linux__
  511. TEST_IMPL(poll_nested_epoll) {
  512. uv_poll_t poll_handle;
  513. int fd;
  514. fd = epoll_create(1);
  515. ASSERT(fd != -1);
  516. ASSERT(0 == uv_poll_init(uv_default_loop(), &poll_handle, fd));
  517. ASSERT(0 == uv_poll_start(&poll_handle, UV_READABLE, (uv_poll_cb) abort));
  518. ASSERT(0 != uv_run(uv_default_loop(), UV_RUN_NOWAIT));
  519. uv_close((uv_handle_t*) &poll_handle, NULL);
  520. ASSERT(0 == uv_run(uv_default_loop(), UV_RUN_DEFAULT));
  521. ASSERT(0 == close(fd));
  522. MAKE_VALGRIND_HAPPY();
  523. return 0;
  524. }
  525. #endif /* __linux__ */
  #ifdef UV_HAVE_KQUEUE
  /* Poll a kqueue descriptor from inside the loop.
   *
   * The poll callback is abort(), so the test dies if the callback is ever
   * invoked. The handle is closed again before the loop is run to completion.
   */
  TEST_IMPL(poll_nested_kqueue) {
    uv_poll_t poll_handle;
    int fd = kqueue();

    ASSERT(fd != -1);

    ASSERT(0 == uv_poll_init(uv_default_loop(), &poll_handle, fd));
    ASSERT(0 == uv_poll_start(&poll_handle, UV_READABLE, (uv_poll_cb) abort));

    /* One non-blocking pass; the loop must report that work is still pending. */
    ASSERT(0 != uv_run(uv_default_loop(), UV_RUN_NOWAIT));

    uv_close((uv_handle_t*) &poll_handle, NULL);
    ASSERT(0 == uv_run(uv_default_loop(), UV_RUN_DEFAULT));
    ASSERT(0 == close(fd));

    MAKE_VALGRIND_HAPPY();
    return 0;
  }
  #endif /* UV_HAVE_KQUEUE */