/* * * Copyright 2018 gRPC authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * */ #import #include "src/core/lib/iomgr/port.h" #ifdef GRPC_CFSTREAM #include #include #include #include #include "src/core/lib/address_utils/parse_address.h" #include "src/core/lib/address_utils/sockaddr_utils.h" #include "src/core/lib/iomgr/endpoint.h" #include "src/core/lib/iomgr/resolve_address.h" #include "src/core/lib/iomgr/tcp_client.h" #include "src/core/lib/resource_quota/api.h" #include "test/core/util/test_config.h" #include #include static const int kConnectTimeout = 5; static const int kWriteTimeout = 5; static const int kReadTimeout = 5; static const int kBufferSize = 10000; static const int kRunLoopTimeout = 1; static void set_error_handle_promise(void *arg, grpc_error_handle error) { std::promise *p = static_cast *>(arg); p->set_value(error); } static void init_event_closure(grpc_closure *closure, std::promise *error_handle) { GRPC_CLOSURE_INIT(closure, set_error_handle_promise, static_cast(error_handle), grpc_schedule_on_exec_ctx); } static bool compare_slice_buffer_with_buffer(grpc_slice_buffer *slices, const char *buffer, size_t buffer_len) { if (slices->length != buffer_len) { return false; } for (int i = 0; i < slices->count; i++) { grpc_slice slice = slices->slices[i]; if (0 != memcmp(buffer, GRPC_SLICE_START_PTR(slice), GRPC_SLICE_LENGTH(slice))) { return false; } buffer += GRPC_SLICE_LENGTH(slice); } return true; } @interface CFStreamEndpointTests : XCTestCase @end @implementation CFStreamEndpointTests { grpc_endpoint *ep_; int svr_fd_; } - (BOOL)waitForEvent:(std::future *)event timeout:(int)timeout { grpc_core::ExecCtx::Get()->Flush(); return event->wait_for(std::chrono::seconds(timeout)) != std::future_status::timeout; } + (void)setUp { grpc_init(); } + (void)tearDown { grpc_shutdown(); } - (void)setUp { self.continueAfterFailure = NO; // Set up CFStream connection before testing the endpoint grpc_core::ExecCtx exec_ctx; grpc_resolved_address resolved_addr; struct sockaddr_in *addr = reinterpret_cast(resolved_addr.addr); int svr_fd; int r; std::promise connected_promise; grpc_closure done; gpr_log(GPR_DEBUG, "test_succeeds"); GPR_ASSERT(grpc_string_to_sockaddr(&resolved_addr, "127.0.0.1", 0) == GRPC_ERROR_NONE); /* create a phony server */ svr_fd = socket(AF_INET, SOCK_STREAM, 0); XCTAssertGreaterThanOrEqual(svr_fd, 0); XCTAssertEqual(bind(svr_fd, (struct sockaddr *)addr, (socklen_t)resolved_addr.len), 0); XCTAssertEqual(listen(svr_fd, 1), 0); /* connect to it */ XCTAssertEqual(getsockname(svr_fd, (struct sockaddr *)addr, (socklen_t *)&resolved_addr.len), 0); init_event_closure(&done, &connected_promise); const grpc_channel_args *args = grpc_core::CoreConfiguration::Get().channel_args_preconditioning().PreconditionChannelArgs( nullptr); grpc_tcp_client_connect(&done, &ep_, nullptr, args, &resolved_addr, grpc_core::Timestamp::InfFuture()); grpc_channel_args_destroy(args); /* await the connection */ do { resolved_addr.len = sizeof(addr); r = accept(svr_fd, reinterpret_cast(addr), reinterpret_cast(&resolved_addr.len)); } while (r == -1 && errno == EINTR); XCTAssertGreaterThanOrEqual(r, 0, @"connection failed with return code %@ and errno %@", @(r), @(errno)); svr_fd_ = r; /* wait for the connection callback to finish */ std::future connected_future = connected_promise.get_future(); XCTAssertEqual([self waitForEvent:&connected_future timeout:kConnectTimeout], YES); XCTAssertEqual(connected_future.get(), GRPC_ERROR_NONE); } - (void)tearDown { grpc_core::ExecCtx exec_ctx; close(svr_fd_); grpc_endpoint_destroy(ep_); } - (void)testReadWrite { grpc_core::ExecCtx exec_ctx; grpc_closure read_done; grpc_slice_buffer read_slices; grpc_slice_buffer read_one_slice; std::promise write_promise; grpc_closure write_done; grpc_slice_buffer write_slices; grpc_slice slice; char write_buffer[kBufferSize]; char read_buffer[kBufferSize]; size_t recv_size = 0; grpc_slice_buffer_init(&write_slices); slice = grpc_slice_from_static_buffer(write_buffer, kBufferSize); grpc_slice_buffer_add(&write_slices, slice); init_event_closure(&write_done, &write_promise); grpc_endpoint_write(ep_, &write_slices, &write_done, nullptr); std::future write_future = write_promise.get_future(); XCTAssertEqual([self waitForEvent:&write_future timeout:kWriteTimeout], YES); XCTAssertEqual(write_future.get(), GRPC_ERROR_NONE); while (recv_size < kBufferSize) { ssize_t size = recv(svr_fd_, read_buffer, kBufferSize, 0); XCTAssertGreaterThanOrEqual(size, 0); recv_size += size; } XCTAssertEqual(recv_size, kBufferSize); XCTAssertEqual(memcmp(read_buffer, write_buffer, kBufferSize), 0); ssize_t send_size = send(svr_fd_, read_buffer, kBufferSize, 0); XCTAssertGreaterThanOrEqual(send_size, 0); grpc_slice_buffer_init(&read_slices); grpc_slice_buffer_init(&read_one_slice); while (read_slices.length < kBufferSize) { std::promise read_promise; init_event_closure(&read_done, &read_promise); grpc_endpoint_read(ep_, &read_one_slice, &read_done, /*urgent=*/false); std::future read_future = read_promise.get_future(); XCTAssertEqual([self waitForEvent:&read_future timeout:kReadTimeout], YES); XCTAssertEqual(read_future.get(), GRPC_ERROR_NONE); grpc_slice_buffer_move_into(&read_one_slice, &read_slices); XCTAssertLessThanOrEqual(read_slices.length, kBufferSize); } XCTAssertTrue(compare_slice_buffer_with_buffer(&read_slices, read_buffer, kBufferSize)); grpc_endpoint_shutdown(ep_, GRPC_ERROR_NONE); grpc_slice_buffer_reset_and_unref(&read_slices); grpc_slice_buffer_reset_and_unref(&write_slices); grpc_slice_buffer_reset_and_unref(&read_one_slice); } - (void)testShutdownBeforeRead { grpc_core::ExecCtx exec_ctx; std::promise read_promise; grpc_closure read_done; grpc_slice_buffer read_slices; std::promise write_promise; grpc_closure write_done; grpc_slice_buffer write_slices; grpc_slice slice; char write_buffer[kBufferSize]; char read_buffer[kBufferSize]; size_t recv_size = 0; grpc_slice_buffer_init(&read_slices); init_event_closure(&read_done, &read_promise); grpc_endpoint_read(ep_, &read_slices, &read_done, /*urgent=*/false); grpc_slice_buffer_init(&write_slices); slice = grpc_slice_from_static_buffer(write_buffer, kBufferSize); grpc_slice_buffer_add(&write_slices, slice); init_event_closure(&write_done, &write_promise); grpc_endpoint_write(ep_, &write_slices, &write_done, nullptr); std::future write_future = write_promise.get_future(); XCTAssertEqual([self waitForEvent:&write_future timeout:kWriteTimeout], YES); XCTAssertEqual(write_future.get(), GRPC_ERROR_NONE); while (recv_size < kBufferSize) { ssize_t size = recv(svr_fd_, read_buffer, kBufferSize, 0); XCTAssertGreaterThanOrEqual(size, 0); recv_size += size; } XCTAssertEqual(recv_size, kBufferSize); XCTAssertEqual(memcmp(read_buffer, write_buffer, kBufferSize), 0); std::future read_future = read_promise.get_future(); XCTAssertEqual([self waitForEvent:&read_future timeout:kReadTimeout], NO); grpc_endpoint_shutdown(ep_, GRPC_ERROR_NONE); grpc_core::ExecCtx::Get()->Flush(); XCTAssertEqual([self waitForEvent:&read_future timeout:kReadTimeout], YES); XCTAssertNotEqual(read_future.get(), GRPC_ERROR_NONE); grpc_slice_buffer_reset_and_unref(&read_slices); grpc_slice_buffer_reset_and_unref(&write_slices); } - (void)testRemoteClosed { grpc_core::ExecCtx exec_ctx; std::promise read_promise; grpc_closure read_done; grpc_slice_buffer read_slices; std::promise write_promise; grpc_closure write_done; grpc_slice_buffer write_slices; grpc_slice slice; char write_buffer[kBufferSize]; char read_buffer[kBufferSize]; size_t recv_size = 0; init_event_closure(&read_done, &read_promise); grpc_slice_buffer_init(&read_slices); grpc_endpoint_read(ep_, &read_slices, &read_done, /*urgent=*/false); grpc_slice_buffer_init(&write_slices); slice = grpc_slice_from_static_buffer(write_buffer, kBufferSize); grpc_slice_buffer_add(&write_slices, slice); init_event_closure(&write_done, &write_promise); grpc_endpoint_write(ep_, &write_slices, &write_done, nullptr); std::future write_future = write_promise.get_future(); XCTAssertEqual([self waitForEvent:&write_future timeout:kWriteTimeout], YES); XCTAssertEqual(write_future.get(), GRPC_ERROR_NONE); while (recv_size < kBufferSize) { ssize_t size = recv(svr_fd_, read_buffer, kBufferSize, 0); XCTAssertGreaterThanOrEqual(size, 0); recv_size += size; } XCTAssertEqual(recv_size, kBufferSize); XCTAssertEqual(memcmp(read_buffer, write_buffer, kBufferSize), 0); close(svr_fd_); std::future read_future = read_promise.get_future(); XCTAssertEqual([self waitForEvent:&read_future timeout:kReadTimeout], YES); XCTAssertNotEqual(read_future.get(), GRPC_ERROR_NONE); grpc_endpoint_shutdown(ep_, GRPC_ERROR_NONE); grpc_slice_buffer_reset_and_unref(&read_slices); grpc_slice_buffer_reset_and_unref(&write_slices); } - (void)testRemoteReset { grpc_core::ExecCtx exec_ctx; std::promise read_promise; grpc_closure read_done; grpc_slice_buffer read_slices; init_event_closure(&read_done, &read_promise); grpc_slice_buffer_init(&read_slices); grpc_endpoint_read(ep_, &read_slices, &read_done, /*urgent=*/false); struct linger so_linger; so_linger.l_onoff = 1; so_linger.l_linger = 0; setsockopt(svr_fd_, SOL_SOCKET, SO_LINGER, &so_linger, sizeof(so_linger)); close(svr_fd_); std::future read_future = read_promise.get_future(); XCTAssertEqual([self waitForEvent:&read_future timeout:kReadTimeout], YES); XCTAssertNotEqual(read_future.get(), GRPC_ERROR_NONE); grpc_endpoint_shutdown(ep_, GRPC_ERROR_NONE); grpc_slice_buffer_reset_and_unref(&read_slices); } @end #else // GRPC_CFSTREAM // Phony test suite @interface CFStreamEndpointTests : XCTestCase @end @implementation CFStreamEndpointTests - (void)setUp { [super setUp]; } - (void)tearDown { [super tearDown]; } @end #endif // GRPC_CFSTREAM