123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358 |
- /*
- *
- * Copyright 2018 gRPC authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
- #import <XCTest/XCTest.h>
- #include "src/core/lib/iomgr/port.h"
- #ifdef GRPC_CFSTREAM
- #include <netinet/in.h>
- #include <grpc/grpc.h>
- #include <grpc/impl/codegen/sync.h>
- #include <grpc/support/sync.h>
- #include "src/core/lib/address_utils/parse_address.h"
- #include "src/core/lib/address_utils/sockaddr_utils.h"
- #include "src/core/lib/iomgr/endpoint.h"
- #include "src/core/lib/iomgr/resolve_address.h"
- #include "src/core/lib/iomgr/tcp_client.h"
- #include "src/core/lib/resource_quota/api.h"
- #include "test/core/util/test_config.h"
- #include <chrono>
- #include <future>
- static const int kConnectTimeout = 5;
- static const int kWriteTimeout = 5;
- static const int kReadTimeout = 5;
- static const int kBufferSize = 10000;
- static const int kRunLoopTimeout = 1;
- static void set_error_handle_promise(void *arg, grpc_error_handle error) {
- std::promise<grpc_error_handle> *p = static_cast<std::promise<grpc_error_handle> *>(arg);
- p->set_value(error);
- }
- static void init_event_closure(grpc_closure *closure,
- std::promise<grpc_error_handle> *error_handle) {
- GRPC_CLOSURE_INIT(closure, set_error_handle_promise, static_cast<void *>(error_handle),
- grpc_schedule_on_exec_ctx);
- }
- static bool compare_slice_buffer_with_buffer(grpc_slice_buffer *slices, const char *buffer,
- size_t buffer_len) {
- if (slices->length != buffer_len) {
- return false;
- }
- for (int i = 0; i < slices->count; i++) {
- grpc_slice slice = slices->slices[i];
- if (0 != memcmp(buffer, GRPC_SLICE_START_PTR(slice), GRPC_SLICE_LENGTH(slice))) {
- return false;
- }
- buffer += GRPC_SLICE_LENGTH(slice);
- }
- return true;
- }
- @interface CFStreamEndpointTests : XCTestCase
- @end
- @implementation CFStreamEndpointTests {
- grpc_endpoint *ep_;
- int svr_fd_;
- }
- - (BOOL)waitForEvent:(std::future<grpc_error_handle> *)event timeout:(int)timeout {
- grpc_core::ExecCtx::Get()->Flush();
- return event->wait_for(std::chrono::seconds(timeout)) != std::future_status::timeout;
- }
- + (void)setUp {
- grpc_init();
- }
- + (void)tearDown {
- grpc_shutdown();
- }
- - (void)setUp {
- self.continueAfterFailure = NO;
- // Set up CFStream connection before testing the endpoint
- grpc_core::ExecCtx exec_ctx;
- grpc_resolved_address resolved_addr;
- struct sockaddr_in *addr = reinterpret_cast<struct sockaddr_in *>(resolved_addr.addr);
- int svr_fd;
- int r;
- std::promise<grpc_error_handle> connected_promise;
- grpc_closure done;
- gpr_log(GPR_DEBUG, "test_succeeds");
- GPR_ASSERT(grpc_string_to_sockaddr(&resolved_addr, "127.0.0.1", 0) == GRPC_ERROR_NONE);
- /* create a phony server */
- svr_fd = socket(AF_INET, SOCK_STREAM, 0);
- XCTAssertGreaterThanOrEqual(svr_fd, 0);
- XCTAssertEqual(bind(svr_fd, (struct sockaddr *)addr, (socklen_t)resolved_addr.len), 0);
- XCTAssertEqual(listen(svr_fd, 1), 0);
- /* connect to it */
- XCTAssertEqual(getsockname(svr_fd, (struct sockaddr *)addr, (socklen_t *)&resolved_addr.len), 0);
- init_event_closure(&done, &connected_promise);
- const grpc_channel_args *args =
- grpc_core::CoreConfiguration::Get().channel_args_preconditioning().PreconditionChannelArgs(
- nullptr);
- grpc_tcp_client_connect(&done, &ep_, nullptr, args, &resolved_addr,
- grpc_core::Timestamp::InfFuture());
- grpc_channel_args_destroy(args);
- /* await the connection */
- do {
- resolved_addr.len = sizeof(addr);
- r = accept(svr_fd, reinterpret_cast<struct sockaddr *>(addr),
- reinterpret_cast<socklen_t *>(&resolved_addr.len));
- } while (r == -1 && errno == EINTR);
- XCTAssertGreaterThanOrEqual(r, 0, @"connection failed with return code %@ and errno %@", @(r),
- @(errno));
- svr_fd_ = r;
- /* wait for the connection callback to finish */
- std::future<grpc_error_handle> connected_future = connected_promise.get_future();
- XCTAssertEqual([self waitForEvent:&connected_future timeout:kConnectTimeout], YES);
- XCTAssertEqual(connected_future.get(), GRPC_ERROR_NONE);
- }
- - (void)tearDown {
- grpc_core::ExecCtx exec_ctx;
- close(svr_fd_);
- grpc_endpoint_destroy(ep_);
- }
- - (void)testReadWrite {
- grpc_core::ExecCtx exec_ctx;
- grpc_closure read_done;
- grpc_slice_buffer read_slices;
- grpc_slice_buffer read_one_slice;
- std::promise<grpc_error_handle> write_promise;
- grpc_closure write_done;
- grpc_slice_buffer write_slices;
- grpc_slice slice;
- char write_buffer[kBufferSize];
- char read_buffer[kBufferSize];
- size_t recv_size = 0;
- grpc_slice_buffer_init(&write_slices);
- slice = grpc_slice_from_static_buffer(write_buffer, kBufferSize);
- grpc_slice_buffer_add(&write_slices, slice);
- init_event_closure(&write_done, &write_promise);
- grpc_endpoint_write(ep_, &write_slices, &write_done, nullptr);
- std::future<grpc_error_handle> write_future = write_promise.get_future();
- XCTAssertEqual([self waitForEvent:&write_future timeout:kWriteTimeout], YES);
- XCTAssertEqual(write_future.get(), GRPC_ERROR_NONE);
- while (recv_size < kBufferSize) {
- ssize_t size = recv(svr_fd_, read_buffer, kBufferSize, 0);
- XCTAssertGreaterThanOrEqual(size, 0);
- recv_size += size;
- }
- XCTAssertEqual(recv_size, kBufferSize);
- XCTAssertEqual(memcmp(read_buffer, write_buffer, kBufferSize), 0);
- ssize_t send_size = send(svr_fd_, read_buffer, kBufferSize, 0);
- XCTAssertGreaterThanOrEqual(send_size, 0);
- grpc_slice_buffer_init(&read_slices);
- grpc_slice_buffer_init(&read_one_slice);
- while (read_slices.length < kBufferSize) {
- std::promise<grpc_error_handle> read_promise;
- init_event_closure(&read_done, &read_promise);
- grpc_endpoint_read(ep_, &read_one_slice, &read_done, /*urgent=*/false);
- std::future<grpc_error_handle> read_future = read_promise.get_future();
- XCTAssertEqual([self waitForEvent:&read_future timeout:kReadTimeout], YES);
- XCTAssertEqual(read_future.get(), GRPC_ERROR_NONE);
- grpc_slice_buffer_move_into(&read_one_slice, &read_slices);
- XCTAssertLessThanOrEqual(read_slices.length, kBufferSize);
- }
- XCTAssertTrue(compare_slice_buffer_with_buffer(&read_slices, read_buffer, kBufferSize));
- grpc_endpoint_shutdown(ep_, GRPC_ERROR_NONE);
- grpc_slice_buffer_reset_and_unref(&read_slices);
- grpc_slice_buffer_reset_and_unref(&write_slices);
- grpc_slice_buffer_reset_and_unref(&read_one_slice);
- }
- - (void)testShutdownBeforeRead {
- grpc_core::ExecCtx exec_ctx;
- std::promise<grpc_error_handle> read_promise;
- grpc_closure read_done;
- grpc_slice_buffer read_slices;
- std::promise<grpc_error_handle> write_promise;
- grpc_closure write_done;
- grpc_slice_buffer write_slices;
- grpc_slice slice;
- char write_buffer[kBufferSize];
- char read_buffer[kBufferSize];
- size_t recv_size = 0;
- grpc_slice_buffer_init(&read_slices);
- init_event_closure(&read_done, &read_promise);
- grpc_endpoint_read(ep_, &read_slices, &read_done, /*urgent=*/false);
- grpc_slice_buffer_init(&write_slices);
- slice = grpc_slice_from_static_buffer(write_buffer, kBufferSize);
- grpc_slice_buffer_add(&write_slices, slice);
- init_event_closure(&write_done, &write_promise);
- grpc_endpoint_write(ep_, &write_slices, &write_done, nullptr);
- std::future<grpc_error_handle> write_future = write_promise.get_future();
- XCTAssertEqual([self waitForEvent:&write_future timeout:kWriteTimeout], YES);
- XCTAssertEqual(write_future.get(), GRPC_ERROR_NONE);
- while (recv_size < kBufferSize) {
- ssize_t size = recv(svr_fd_, read_buffer, kBufferSize, 0);
- XCTAssertGreaterThanOrEqual(size, 0);
- recv_size += size;
- }
- XCTAssertEqual(recv_size, kBufferSize);
- XCTAssertEqual(memcmp(read_buffer, write_buffer, kBufferSize), 0);
- std::future<grpc_error_handle> read_future = read_promise.get_future();
- XCTAssertEqual([self waitForEvent:&read_future timeout:kReadTimeout], NO);
- grpc_endpoint_shutdown(ep_, GRPC_ERROR_NONE);
- grpc_core::ExecCtx::Get()->Flush();
- XCTAssertEqual([self waitForEvent:&read_future timeout:kReadTimeout], YES);
- XCTAssertNotEqual(read_future.get(), GRPC_ERROR_NONE);
- grpc_slice_buffer_reset_and_unref(&read_slices);
- grpc_slice_buffer_reset_and_unref(&write_slices);
- }
- - (void)testRemoteClosed {
- grpc_core::ExecCtx exec_ctx;
- std::promise<grpc_error_handle> read_promise;
- grpc_closure read_done;
- grpc_slice_buffer read_slices;
- std::promise<grpc_error_handle> write_promise;
- grpc_closure write_done;
- grpc_slice_buffer write_slices;
- grpc_slice slice;
- char write_buffer[kBufferSize];
- char read_buffer[kBufferSize];
- size_t recv_size = 0;
- init_event_closure(&read_done, &read_promise);
- grpc_slice_buffer_init(&read_slices);
- grpc_endpoint_read(ep_, &read_slices, &read_done, /*urgent=*/false);
- grpc_slice_buffer_init(&write_slices);
- slice = grpc_slice_from_static_buffer(write_buffer, kBufferSize);
- grpc_slice_buffer_add(&write_slices, slice);
- init_event_closure(&write_done, &write_promise);
- grpc_endpoint_write(ep_, &write_slices, &write_done, nullptr);
- std::future<grpc_error_handle> write_future = write_promise.get_future();
- XCTAssertEqual([self waitForEvent:&write_future timeout:kWriteTimeout], YES);
- XCTAssertEqual(write_future.get(), GRPC_ERROR_NONE);
- while (recv_size < kBufferSize) {
- ssize_t size = recv(svr_fd_, read_buffer, kBufferSize, 0);
- XCTAssertGreaterThanOrEqual(size, 0);
- recv_size += size;
- }
- XCTAssertEqual(recv_size, kBufferSize);
- XCTAssertEqual(memcmp(read_buffer, write_buffer, kBufferSize), 0);
- close(svr_fd_);
- std::future<grpc_error_handle> read_future = read_promise.get_future();
- XCTAssertEqual([self waitForEvent:&read_future timeout:kReadTimeout], YES);
- XCTAssertNotEqual(read_future.get(), GRPC_ERROR_NONE);
- grpc_endpoint_shutdown(ep_, GRPC_ERROR_NONE);
- grpc_slice_buffer_reset_and_unref(&read_slices);
- grpc_slice_buffer_reset_and_unref(&write_slices);
- }
- - (void)testRemoteReset {
- grpc_core::ExecCtx exec_ctx;
- std::promise<grpc_error_handle> read_promise;
- grpc_closure read_done;
- grpc_slice_buffer read_slices;
- init_event_closure(&read_done, &read_promise);
- grpc_slice_buffer_init(&read_slices);
- grpc_endpoint_read(ep_, &read_slices, &read_done, /*urgent=*/false);
- struct linger so_linger;
- so_linger.l_onoff = 1;
- so_linger.l_linger = 0;
- setsockopt(svr_fd_, SOL_SOCKET, SO_LINGER, &so_linger, sizeof(so_linger));
- close(svr_fd_);
- std::future<grpc_error_handle> read_future = read_promise.get_future();
- XCTAssertEqual([self waitForEvent:&read_future timeout:kReadTimeout], YES);
- XCTAssertNotEqual(read_future.get(), GRPC_ERROR_NONE);
- grpc_endpoint_shutdown(ep_, GRPC_ERROR_NONE);
- grpc_slice_buffer_reset_and_unref(&read_slices);
- }
- @end
- #else // GRPC_CFSTREAM
- // Phony test suite
- @interface CFStreamEndpointTests : XCTestCase
- @end
- @implementation CFStreamEndpointTests
- - (void)setUp {
- [super setUp];
- }
- - (void)tearDown {
- [super tearDown];
- }
- @end
- #endif // GRPC_CFSTREAM
|