123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538 |
- /*
- *
- * Copyright 2015 gRPC authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
- #include "src/core/lib/iomgr/port.h"
- // This test won't work except with posix sockets enabled
- #ifdef GRPC_POSIX_SOCKET_EV
- #include <ctype.h>
- #include <errno.h>
- #include <fcntl.h>
- #include <netinet/in.h>
- #include <poll.h>
- #include <stdio.h>
- #include <stdlib.h>
- #include <string.h>
- #include <sys/socket.h>
- #include <sys/time.h>
- #include <unistd.h>
- #include <grpc/grpc.h>
- #include <grpc/support/alloc.h>
- #include <grpc/support/log.h>
- #include <grpc/support/sync.h>
- #include <grpc/support/time.h>
- #include "src/core/lib/iomgr/ev_posix.h"
- #include "src/core/lib/iomgr/iomgr.h"
- #include "src/core/lib/iomgr/socket_utils_posix.h"
- #include "test/core/util/test_config.h"
- static gpr_mu* g_mu;
- static grpc_pollset* g_pollset;
- /* buffer size used to send and receive data.
- 1024 is the minimal value to set TCP send and receive buffer. */
- #define BUF_SIZE 1024
- /* Create a test socket with the right properties for testing.
- port is the TCP port to listen or connect to.
- Return a socket FD and sockaddr_in. */
- static void create_test_socket(int port, int* socket_fd,
- struct sockaddr_in* sin) {
- int fd;
- int one = 1;
- int buffer_size_bytes = BUF_SIZE;
- int flags;
- fd = socket(AF_INET, SOCK_STREAM, 0);
- setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
- /* Reset the size of socket send buffer to the minimal value to facilitate
- buffer filling up and triggering notify_on_write */
- GPR_ASSERT(grpc_set_socket_sndbuf(fd, buffer_size_bytes) == GRPC_ERROR_NONE);
- GPR_ASSERT(grpc_set_socket_rcvbuf(fd, buffer_size_bytes) == GRPC_ERROR_NONE);
- /* Make fd non-blocking */
- flags = fcntl(fd, F_GETFL, 0);
- GPR_ASSERT(fcntl(fd, F_SETFL, flags | O_NONBLOCK) == 0);
- *socket_fd = fd;
- /* Use local address for test */
- sin->sin_family = AF_INET;
- sin->sin_addr.s_addr = htonl(0x7f000001);
- GPR_ASSERT(port >= 0 && port < 65536);
- sin->sin_port = htons(static_cast<uint16_t>(port));
- }
- /* Phony gRPC callback */
- void no_op_cb(void* /*arg*/, int /*success*/) {}
- /* =======An upload server to test notify_on_read===========
- The server simply reads and counts a stream of bytes. */
- /* An upload server. */
- typedef struct {
- grpc_fd* em_fd; /* listening fd */
- ssize_t read_bytes_total; /* total number of received bytes */
- int done; /* set to 1 when a server finishes serving */
- grpc_closure listen_closure;
- } server;
- static void server_init(server* sv) {
- sv->read_bytes_total = 0;
- sv->done = 0;
- }
- /* An upload session.
- Created when a new upload request arrives in the server. */
- typedef struct {
- server* sv; /* not owned by a single session */
- grpc_fd* em_fd; /* fd to read upload bytes */
- char read_buf[BUF_SIZE]; /* buffer to store upload bytes */
- grpc_closure session_read_closure;
- } session;
- /* Called when an upload session can be safely shutdown.
- Close session FD and start to shutdown listen FD. */
- static void session_shutdown_cb(void* arg, /*session */
- bool /*success*/) {
- session* se = static_cast<session*>(arg);
- server* sv = se->sv;
- grpc_fd_orphan(se->em_fd, nullptr, nullptr, "a");
- gpr_free(se);
- /* Start to shutdown listen fd. */
- grpc_fd_shutdown(sv->em_fd,
- GRPC_ERROR_CREATE_FROM_STATIC_STRING("session_shutdown_cb"));
- }
- /* Called when data become readable in a session. */
- static void session_read_cb(void* arg, /*session */
- grpc_error_handle error) {
- session* se = static_cast<session*>(arg);
- int fd = grpc_fd_wrapped_fd(se->em_fd);
- ssize_t read_once = 0;
- ssize_t read_total = 0;
- if (error != GRPC_ERROR_NONE) {
- session_shutdown_cb(arg, true);
- return;
- }
- do {
- read_once = read(fd, se->read_buf, BUF_SIZE);
- if (read_once > 0) read_total += read_once;
- } while (read_once > 0);
- se->sv->read_bytes_total += read_total;
- /* read() returns 0 to indicate the TCP connection was closed by the client.
- read(fd, read_buf, 0) also returns 0 which should never be called as such.
- It is possible to read nothing due to spurious edge event or data has
- been drained, In such a case, read() returns -1 and set errno to EAGAIN. */
- if (read_once == 0) {
- session_shutdown_cb(arg, true);
- } else if (read_once == -1) {
- if (errno == EAGAIN) {
- /* An edge triggered event is cached in the kernel until next poll.
- In the current single thread implementation, session_read_cb is called
- in the polling thread, such that polling only happens after this
- callback, and will catch read edge event if data is available again
- before notify_on_read.
- TODO(chenw): in multi-threaded version, callback and polling can be
- run in different threads. polling may catch a persist read edge event
- before notify_on_read is called. */
- grpc_fd_notify_on_read(se->em_fd, &se->session_read_closure);
- } else {
- gpr_log(GPR_ERROR, "Unhandled read error %s", strerror(errno));
- abort();
- }
- }
- }
- /* Called when the listen FD can be safely shutdown.
- Close listen FD and signal that server can be shutdown. */
- static void listen_shutdown_cb(void* arg /*server*/, int /*success*/) {
- server* sv = static_cast<server*>(arg);
- grpc_fd_orphan(sv->em_fd, nullptr, nullptr, "b");
- gpr_mu_lock(g_mu);
- sv->done = 1;
- GPR_ASSERT(
- GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, nullptr)));
- gpr_mu_unlock(g_mu);
- }
- /* Called when a new TCP connection request arrives in the listening port. */
- static void listen_cb(void* arg, /*=sv_arg*/
- grpc_error_handle error) {
- server* sv = static_cast<server*>(arg);
- int fd;
- int flags;
- session* se;
- struct sockaddr_storage ss;
- socklen_t slen = sizeof(ss);
- grpc_fd* listen_em_fd = sv->em_fd;
- if (error != GRPC_ERROR_NONE) {
- listen_shutdown_cb(arg, 1);
- return;
- }
- fd = accept(grpc_fd_wrapped_fd(listen_em_fd),
- reinterpret_cast<struct sockaddr*>(&ss), &slen);
- GPR_ASSERT(fd >= 0);
- GPR_ASSERT(fd < FD_SETSIZE);
- flags = fcntl(fd, F_GETFL, 0);
- fcntl(fd, F_SETFL, flags | O_NONBLOCK);
- se = static_cast<session*>(gpr_malloc(sizeof(*se)));
- se->sv = sv;
- se->em_fd = grpc_fd_create(fd, "listener", false);
- grpc_pollset_add_fd(g_pollset, se->em_fd);
- GRPC_CLOSURE_INIT(&se->session_read_closure, session_read_cb, se,
- grpc_schedule_on_exec_ctx);
- grpc_fd_notify_on_read(se->em_fd, &se->session_read_closure);
- grpc_fd_notify_on_read(listen_em_fd, &sv->listen_closure);
- }
- /* Max number of connections pending to be accepted by listen(). */
- #define MAX_NUM_FD 1024
- /* Start a test server, return the TCP listening port bound to listen_fd.
- listen_cb() is registered to be interested in reading from listen_fd.
- When connection request arrives, listen_cb() is called to accept the
- connection request. */
- static int server_start(server* sv) {
- int port = 0;
- int fd;
- struct sockaddr_in sin;
- socklen_t addr_len;
- create_test_socket(port, &fd, &sin);
- addr_len = sizeof(sin);
- GPR_ASSERT(bind(fd, (struct sockaddr*)&sin, addr_len) == 0);
- GPR_ASSERT(getsockname(fd, (struct sockaddr*)&sin, &addr_len) == 0);
- port = ntohs(sin.sin_port);
- GPR_ASSERT(listen(fd, MAX_NUM_FD) == 0);
- sv->em_fd = grpc_fd_create(fd, "server", false);
- grpc_pollset_add_fd(g_pollset, sv->em_fd);
- /* Register to be interested in reading from listen_fd. */
- GRPC_CLOSURE_INIT(&sv->listen_closure, listen_cb, sv,
- grpc_schedule_on_exec_ctx);
- grpc_fd_notify_on_read(sv->em_fd, &sv->listen_closure);
- return port;
- }
- /* Wait and shutdown a sever. */
- static void server_wait_and_shutdown(server* sv) {
- gpr_mu_lock(g_mu);
- while (!sv->done) {
- grpc_core::ExecCtx exec_ctx;
- grpc_pollset_worker* worker = nullptr;
- GPR_ASSERT(GRPC_LOG_IF_ERROR(
- "pollset_work", grpc_pollset_work(g_pollset, &worker,
- grpc_core::Timestamp::InfFuture())));
- gpr_mu_unlock(g_mu);
- gpr_mu_lock(g_mu);
- }
- gpr_mu_unlock(g_mu);
- }
- /* ===An upload client to test notify_on_write=== */
- /* Client write buffer size */
- #define CLIENT_WRITE_BUF_SIZE 10
- /* Total number of times that the client fills up the write buffer */
- #define CLIENT_TOTAL_WRITE_CNT 3
- /* An upload client. */
- typedef struct {
- grpc_fd* em_fd;
- char write_buf[CLIENT_WRITE_BUF_SIZE];
- ssize_t write_bytes_total;
- /* Number of times that the client fills up the write buffer and calls
- notify_on_write to schedule another write. */
- int client_write_cnt;
- int done; /* set to 1 when a client finishes sending */
- grpc_closure write_closure;
- } client;
- static void client_init(client* cl) {
- memset(cl->write_buf, 0, sizeof(cl->write_buf));
- cl->write_bytes_total = 0;
- cl->client_write_cnt = 0;
- cl->done = 0;
- }
- /* Called when a client upload session is ready to shutdown. */
- static void client_session_shutdown_cb(void* arg /*client*/, int /*success*/) {
- client* cl = static_cast<client*>(arg);
- grpc_fd_orphan(cl->em_fd, nullptr, nullptr, "c");
- cl->done = 1;
- GPR_ASSERT(
- GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, nullptr)));
- }
- /* Write as much as possible, then register notify_on_write. */
- static void client_session_write(void* arg, /*client */
- grpc_error_handle error) {
- client* cl = static_cast<client*>(arg);
- int fd = grpc_fd_wrapped_fd(cl->em_fd);
- ssize_t write_once = 0;
- if (error != GRPC_ERROR_NONE) {
- gpr_mu_lock(g_mu);
- client_session_shutdown_cb(arg, 1);
- gpr_mu_unlock(g_mu);
- return;
- }
- do {
- write_once = write(fd, cl->write_buf, CLIENT_WRITE_BUF_SIZE);
- if (write_once > 0) cl->write_bytes_total += write_once;
- } while (write_once > 0);
- if (errno == EAGAIN) {
- gpr_mu_lock(g_mu);
- if (cl->client_write_cnt < CLIENT_TOTAL_WRITE_CNT) {
- GRPC_CLOSURE_INIT(&cl->write_closure, client_session_write, cl,
- grpc_schedule_on_exec_ctx);
- grpc_fd_notify_on_write(cl->em_fd, &cl->write_closure);
- cl->client_write_cnt++;
- } else {
- client_session_shutdown_cb(arg, 1);
- }
- gpr_mu_unlock(g_mu);
- } else {
- gpr_log(GPR_ERROR, "unknown errno %s", strerror(errno));
- abort();
- }
- }
- /* Start a client to send a stream of bytes. */
- static void client_start(client* cl, int port) {
- int fd;
- struct sockaddr_in sin;
- create_test_socket(port, &fd, &sin);
- if (connect(fd, reinterpret_cast<struct sockaddr*>(&sin), sizeof(sin)) ==
- -1) {
- if (errno == EINPROGRESS) {
- struct pollfd pfd;
- pfd.fd = fd;
- pfd.events = POLLOUT;
- pfd.revents = 0;
- if (poll(&pfd, 1, -1) == -1) {
- gpr_log(GPR_ERROR, "poll() failed during connect; errno=%d", errno);
- abort();
- }
- } else {
- gpr_log(GPR_ERROR, "Failed to connect to the server (errno=%d)", errno);
- abort();
- }
- }
- cl->em_fd = grpc_fd_create(fd, "client", false);
- grpc_pollset_add_fd(g_pollset, cl->em_fd);
- client_session_write(cl, GRPC_ERROR_NONE);
- }
- /* Wait for the signal to shutdown a client. */
- static void client_wait_and_shutdown(client* cl) {
- gpr_mu_lock(g_mu);
- while (!cl->done) {
- grpc_pollset_worker* worker = nullptr;
- grpc_core::ExecCtx exec_ctx;
- GPR_ASSERT(GRPC_LOG_IF_ERROR(
- "pollset_work", grpc_pollset_work(g_pollset, &worker,
- grpc_core::Timestamp::InfFuture())));
- gpr_mu_unlock(g_mu);
- gpr_mu_lock(g_mu);
- }
- gpr_mu_unlock(g_mu);
- }
- /* Test grpc_fd. Start an upload server and client, upload a stream of
- bytes from the client to the server, and verify that the total number of
- sent bytes is equal to the total number of received bytes. */
- static void test_grpc_fd(void) {
- server sv;
- client cl;
- int port;
- grpc_core::ExecCtx exec_ctx;
- server_init(&sv);
- port = server_start(&sv);
- client_init(&cl);
- client_start(&cl, port);
- client_wait_and_shutdown(&cl);
- server_wait_and_shutdown(&sv);
- GPR_ASSERT(sv.read_bytes_total == cl.write_bytes_total);
- gpr_log(GPR_INFO, "Total read bytes %" PRIdPTR, sv.read_bytes_total);
- }
- typedef struct fd_change_data {
- grpc_iomgr_cb_func cb_that_ran;
- } fd_change_data;
- void init_change_data(fd_change_data* fdc) { fdc->cb_that_ran = nullptr; }
- void destroy_change_data(fd_change_data* /*fdc*/) {}
- static void first_read_callback(void* arg /* fd_change_data */,
- grpc_error_handle /*error*/) {
- fd_change_data* fdc = static_cast<fd_change_data*>(arg);
- gpr_mu_lock(g_mu);
- fdc->cb_that_ran = first_read_callback;
- GPR_ASSERT(
- GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, nullptr)));
- gpr_mu_unlock(g_mu);
- }
- static void second_read_callback(void* arg /* fd_change_data */,
- grpc_error_handle /*error*/) {
- fd_change_data* fdc = static_cast<fd_change_data*>(arg);
- gpr_mu_lock(g_mu);
- fdc->cb_that_ran = second_read_callback;
- GPR_ASSERT(
- GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, nullptr)));
- gpr_mu_unlock(g_mu);
- }
- /* Test that changing the callback we use for notify_on_read actually works.
- Note that we have two different but almost identical callbacks above -- the
- point is to have two different function pointers and two different data
- pointers and make sure that changing both really works. */
- static void test_grpc_fd_change(void) {
- grpc_fd* em_fd;
- fd_change_data a, b;
- int flags;
- int sv[2];
- char data;
- ssize_t result;
- grpc_closure first_closure;
- grpc_closure second_closure;
- grpc_core::ExecCtx exec_ctx;
- GRPC_CLOSURE_INIT(&first_closure, first_read_callback, &a,
- grpc_schedule_on_exec_ctx);
- GRPC_CLOSURE_INIT(&second_closure, second_read_callback, &b,
- grpc_schedule_on_exec_ctx);
- init_change_data(&a);
- init_change_data(&b);
- GPR_ASSERT(socketpair(AF_UNIX, SOCK_STREAM, 0, sv) == 0);
- flags = fcntl(sv[0], F_GETFL, 0);
- GPR_ASSERT(fcntl(sv[0], F_SETFL, flags | O_NONBLOCK) == 0);
- flags = fcntl(sv[1], F_GETFL, 0);
- GPR_ASSERT(fcntl(sv[1], F_SETFL, flags | O_NONBLOCK) == 0);
- em_fd = grpc_fd_create(sv[0], "test_grpc_fd_change", false);
- grpc_pollset_add_fd(g_pollset, em_fd);
- /* Register the first callback, then make its FD readable */
- grpc_fd_notify_on_read(em_fd, &first_closure);
- data = 0;
- result = write(sv[1], &data, 1);
- GPR_ASSERT(result == 1);
- /* And now wait for it to run. */
- gpr_mu_lock(g_mu);
- while (a.cb_that_ran == nullptr) {
- grpc_pollset_worker* worker = nullptr;
- GPR_ASSERT(GRPC_LOG_IF_ERROR(
- "pollset_work", grpc_pollset_work(g_pollset, &worker,
- grpc_core::Timestamp::InfFuture())));
- gpr_mu_unlock(g_mu);
- gpr_mu_lock(g_mu);
- }
- GPR_ASSERT(a.cb_that_ran == first_read_callback);
- gpr_mu_unlock(g_mu);
- /* And drain the socket so we can generate a new read edge */
- result = read(sv[0], &data, 1);
- GPR_ASSERT(result == 1);
- /* Now register a second callback with distinct change data, and do the same
- thing again. */
- grpc_fd_notify_on_read(em_fd, &second_closure);
- data = 0;
- result = write(sv[1], &data, 1);
- GPR_ASSERT(result == 1);
- gpr_mu_lock(g_mu);
- while (b.cb_that_ran == nullptr) {
- grpc_pollset_worker* worker = nullptr;
- GPR_ASSERT(GRPC_LOG_IF_ERROR(
- "pollset_work", grpc_pollset_work(g_pollset, &worker,
- grpc_core::Timestamp::InfFuture())));
- gpr_mu_unlock(g_mu);
- gpr_mu_lock(g_mu);
- }
- /* Except now we verify that second_read_callback ran instead */
- GPR_ASSERT(b.cb_that_ran == second_read_callback);
- gpr_mu_unlock(g_mu);
- grpc_fd_orphan(em_fd, nullptr, nullptr, "d");
- destroy_change_data(&a);
- destroy_change_data(&b);
- close(sv[1]);
- }
- static void destroy_pollset(void* p, grpc_error_handle /*error*/) {
- grpc_pollset_destroy(static_cast<grpc_pollset*>(p));
- }
- int main(int argc, char** argv) {
- grpc_closure destroyed;
- grpc::testing::TestEnvironment env(argc, argv);
- grpc_init();
- {
- grpc_core::ExecCtx exec_ctx;
- g_pollset = static_cast<grpc_pollset*>(gpr_zalloc(grpc_pollset_size()));
- grpc_pollset_init(g_pollset, &g_mu);
- test_grpc_fd();
- test_grpc_fd_change();
- GRPC_CLOSURE_INIT(&destroyed, destroy_pollset, g_pollset,
- grpc_schedule_on_exec_ctx);
- grpc_pollset_shutdown(g_pollset, &destroyed);
- grpc_core::ExecCtx::Get()->Flush();
- gpr_free(g_pollset);
- }
- grpc_shutdown();
- return 0;
- }
- #else /* GRPC_POSIX_SOCKET_EV */
- int main(int argc, char** argv) { return 1; }
- #endif /* GRPC_POSIX_SOCKET_EV */
|