CFStreamEndpointTests.mm 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358
  1. /*
  2. *
  3. * Copyright 2018 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. #import <XCTest/XCTest.h>
  19. #include "src/core/lib/iomgr/port.h"
  20. #ifdef GRPC_CFSTREAM
  21. #include <netinet/in.h>
  22. #include <grpc/grpc.h>
  23. #include <grpc/impl/codegen/sync.h>
  24. #include <grpc/support/sync.h>
  25. #include "src/core/lib/address_utils/parse_address.h"
  26. #include "src/core/lib/address_utils/sockaddr_utils.h"
  27. #include "src/core/lib/iomgr/endpoint.h"
  28. #include "src/core/lib/iomgr/resolve_address.h"
  29. #include "src/core/lib/iomgr/tcp_client.h"
  30. #include "src/core/lib/resource_quota/api.h"
  31. #include "test/core/util/test_config.h"
  32. #include <chrono>
  33. #include <future>
  34. static const int kConnectTimeout = 5;
  35. static const int kWriteTimeout = 5;
  36. static const int kReadTimeout = 5;
  37. static const int kBufferSize = 10000;
  38. static const int kRunLoopTimeout = 1;
  39. static void set_error_handle_promise(void *arg, grpc_error_handle error) {
  40. std::promise<grpc_error_handle> *p = static_cast<std::promise<grpc_error_handle> *>(arg);
  41. p->set_value(error);
  42. }
  43. static void init_event_closure(grpc_closure *closure,
  44. std::promise<grpc_error_handle> *error_handle) {
  45. GRPC_CLOSURE_INIT(closure, set_error_handle_promise, static_cast<void *>(error_handle),
  46. grpc_schedule_on_exec_ctx);
  47. }
  48. static bool compare_slice_buffer_with_buffer(grpc_slice_buffer *slices, const char *buffer,
  49. size_t buffer_len) {
  50. if (slices->length != buffer_len) {
  51. return false;
  52. }
  53. for (int i = 0; i < slices->count; i++) {
  54. grpc_slice slice = slices->slices[i];
  55. if (0 != memcmp(buffer, GRPC_SLICE_START_PTR(slice), GRPC_SLICE_LENGTH(slice))) {
  56. return false;
  57. }
  58. buffer += GRPC_SLICE_LENGTH(slice);
  59. }
  60. return true;
  61. }
  62. @interface CFStreamEndpointTests : XCTestCase
  63. @end
  64. @implementation CFStreamEndpointTests {
  65. grpc_endpoint *ep_;
  66. int svr_fd_;
  67. }
  68. - (BOOL)waitForEvent:(std::future<grpc_error_handle> *)event timeout:(int)timeout {
  69. grpc_core::ExecCtx::Get()->Flush();
  70. return event->wait_for(std::chrono::seconds(timeout)) != std::future_status::timeout;
  71. }
  72. + (void)setUp {
  73. grpc_init();
  74. }
  75. + (void)tearDown {
  76. grpc_shutdown();
  77. }
  78. - (void)setUp {
  79. self.continueAfterFailure = NO;
  80. // Set up CFStream connection before testing the endpoint
  81. grpc_core::ExecCtx exec_ctx;
  82. grpc_resolved_address resolved_addr;
  83. struct sockaddr_in *addr = reinterpret_cast<struct sockaddr_in *>(resolved_addr.addr);
  84. int svr_fd;
  85. int r;
  86. std::promise<grpc_error_handle> connected_promise;
  87. grpc_closure done;
  88. gpr_log(GPR_DEBUG, "test_succeeds");
  89. GPR_ASSERT(grpc_string_to_sockaddr(&resolved_addr, "127.0.0.1", 0) == GRPC_ERROR_NONE);
  90. /* create a phony server */
  91. svr_fd = socket(AF_INET, SOCK_STREAM, 0);
  92. XCTAssertGreaterThanOrEqual(svr_fd, 0);
  93. XCTAssertEqual(bind(svr_fd, (struct sockaddr *)addr, (socklen_t)resolved_addr.len), 0);
  94. XCTAssertEqual(listen(svr_fd, 1), 0);
  95. /* connect to it */
  96. XCTAssertEqual(getsockname(svr_fd, (struct sockaddr *)addr, (socklen_t *)&resolved_addr.len), 0);
  97. init_event_closure(&done, &connected_promise);
  98. const grpc_channel_args *args =
  99. grpc_core::CoreConfiguration::Get().channel_args_preconditioning().PreconditionChannelArgs(
  100. nullptr);
  101. grpc_tcp_client_connect(&done, &ep_, nullptr, args, &resolved_addr,
  102. grpc_core::Timestamp::InfFuture());
  103. grpc_channel_args_destroy(args);
  104. /* await the connection */
  105. do {
  106. resolved_addr.len = sizeof(addr);
  107. r = accept(svr_fd, reinterpret_cast<struct sockaddr *>(addr),
  108. reinterpret_cast<socklen_t *>(&resolved_addr.len));
  109. } while (r == -1 && errno == EINTR);
  110. XCTAssertGreaterThanOrEqual(r, 0, @"connection failed with return code %@ and errno %@", @(r),
  111. @(errno));
  112. svr_fd_ = r;
  113. /* wait for the connection callback to finish */
  114. std::future<grpc_error_handle> connected_future = connected_promise.get_future();
  115. XCTAssertEqual([self waitForEvent:&connected_future timeout:kConnectTimeout], YES);
  116. XCTAssertEqual(connected_future.get(), GRPC_ERROR_NONE);
  117. }
  118. - (void)tearDown {
  119. grpc_core::ExecCtx exec_ctx;
  120. close(svr_fd_);
  121. grpc_endpoint_destroy(ep_);
  122. }
  123. - (void)testReadWrite {
  124. grpc_core::ExecCtx exec_ctx;
  125. grpc_closure read_done;
  126. grpc_slice_buffer read_slices;
  127. grpc_slice_buffer read_one_slice;
  128. std::promise<grpc_error_handle> write_promise;
  129. grpc_closure write_done;
  130. grpc_slice_buffer write_slices;
  131. grpc_slice slice;
  132. char write_buffer[kBufferSize];
  133. char read_buffer[kBufferSize];
  134. size_t recv_size = 0;
  135. grpc_slice_buffer_init(&write_slices);
  136. slice = grpc_slice_from_static_buffer(write_buffer, kBufferSize);
  137. grpc_slice_buffer_add(&write_slices, slice);
  138. init_event_closure(&write_done, &write_promise);
  139. grpc_endpoint_write(ep_, &write_slices, &write_done, nullptr);
  140. std::future<grpc_error_handle> write_future = write_promise.get_future();
  141. XCTAssertEqual([self waitForEvent:&write_future timeout:kWriteTimeout], YES);
  142. XCTAssertEqual(write_future.get(), GRPC_ERROR_NONE);
  143. while (recv_size < kBufferSize) {
  144. ssize_t size = recv(svr_fd_, read_buffer, kBufferSize, 0);
  145. XCTAssertGreaterThanOrEqual(size, 0);
  146. recv_size += size;
  147. }
  148. XCTAssertEqual(recv_size, kBufferSize);
  149. XCTAssertEqual(memcmp(read_buffer, write_buffer, kBufferSize), 0);
  150. ssize_t send_size = send(svr_fd_, read_buffer, kBufferSize, 0);
  151. XCTAssertGreaterThanOrEqual(send_size, 0);
  152. grpc_slice_buffer_init(&read_slices);
  153. grpc_slice_buffer_init(&read_one_slice);
  154. while (read_slices.length < kBufferSize) {
  155. std::promise<grpc_error_handle> read_promise;
  156. init_event_closure(&read_done, &read_promise);
  157. grpc_endpoint_read(ep_, &read_one_slice, &read_done, /*urgent=*/false);
  158. std::future<grpc_error_handle> read_future = read_promise.get_future();
  159. XCTAssertEqual([self waitForEvent:&read_future timeout:kReadTimeout], YES);
  160. XCTAssertEqual(read_future.get(), GRPC_ERROR_NONE);
  161. grpc_slice_buffer_move_into(&read_one_slice, &read_slices);
  162. XCTAssertLessThanOrEqual(read_slices.length, kBufferSize);
  163. }
  164. XCTAssertTrue(compare_slice_buffer_with_buffer(&read_slices, read_buffer, kBufferSize));
  165. grpc_endpoint_shutdown(ep_, GRPC_ERROR_NONE);
  166. grpc_slice_buffer_reset_and_unref(&read_slices);
  167. grpc_slice_buffer_reset_and_unref(&write_slices);
  168. grpc_slice_buffer_reset_and_unref(&read_one_slice);
  169. }
  170. - (void)testShutdownBeforeRead {
  171. grpc_core::ExecCtx exec_ctx;
  172. std::promise<grpc_error_handle> read_promise;
  173. grpc_closure read_done;
  174. grpc_slice_buffer read_slices;
  175. std::promise<grpc_error_handle> write_promise;
  176. grpc_closure write_done;
  177. grpc_slice_buffer write_slices;
  178. grpc_slice slice;
  179. char write_buffer[kBufferSize];
  180. char read_buffer[kBufferSize];
  181. size_t recv_size = 0;
  182. grpc_slice_buffer_init(&read_slices);
  183. init_event_closure(&read_done, &read_promise);
  184. grpc_endpoint_read(ep_, &read_slices, &read_done, /*urgent=*/false);
  185. grpc_slice_buffer_init(&write_slices);
  186. slice = grpc_slice_from_static_buffer(write_buffer, kBufferSize);
  187. grpc_slice_buffer_add(&write_slices, slice);
  188. init_event_closure(&write_done, &write_promise);
  189. grpc_endpoint_write(ep_, &write_slices, &write_done, nullptr);
  190. std::future<grpc_error_handle> write_future = write_promise.get_future();
  191. XCTAssertEqual([self waitForEvent:&write_future timeout:kWriteTimeout], YES);
  192. XCTAssertEqual(write_future.get(), GRPC_ERROR_NONE);
  193. while (recv_size < kBufferSize) {
  194. ssize_t size = recv(svr_fd_, read_buffer, kBufferSize, 0);
  195. XCTAssertGreaterThanOrEqual(size, 0);
  196. recv_size += size;
  197. }
  198. XCTAssertEqual(recv_size, kBufferSize);
  199. XCTAssertEqual(memcmp(read_buffer, write_buffer, kBufferSize), 0);
  200. std::future<grpc_error_handle> read_future = read_promise.get_future();
  201. XCTAssertEqual([self waitForEvent:&read_future timeout:kReadTimeout], NO);
  202. grpc_endpoint_shutdown(ep_, GRPC_ERROR_NONE);
  203. grpc_core::ExecCtx::Get()->Flush();
  204. XCTAssertEqual([self waitForEvent:&read_future timeout:kReadTimeout], YES);
  205. XCTAssertNotEqual(read_future.get(), GRPC_ERROR_NONE);
  206. grpc_slice_buffer_reset_and_unref(&read_slices);
  207. grpc_slice_buffer_reset_and_unref(&write_slices);
  208. }
  209. - (void)testRemoteClosed {
  210. grpc_core::ExecCtx exec_ctx;
  211. std::promise<grpc_error_handle> read_promise;
  212. grpc_closure read_done;
  213. grpc_slice_buffer read_slices;
  214. std::promise<grpc_error_handle> write_promise;
  215. grpc_closure write_done;
  216. grpc_slice_buffer write_slices;
  217. grpc_slice slice;
  218. char write_buffer[kBufferSize];
  219. char read_buffer[kBufferSize];
  220. size_t recv_size = 0;
  221. init_event_closure(&read_done, &read_promise);
  222. grpc_slice_buffer_init(&read_slices);
  223. grpc_endpoint_read(ep_, &read_slices, &read_done, /*urgent=*/false);
  224. grpc_slice_buffer_init(&write_slices);
  225. slice = grpc_slice_from_static_buffer(write_buffer, kBufferSize);
  226. grpc_slice_buffer_add(&write_slices, slice);
  227. init_event_closure(&write_done, &write_promise);
  228. grpc_endpoint_write(ep_, &write_slices, &write_done, nullptr);
  229. std::future<grpc_error_handle> write_future = write_promise.get_future();
  230. XCTAssertEqual([self waitForEvent:&write_future timeout:kWriteTimeout], YES);
  231. XCTAssertEqual(write_future.get(), GRPC_ERROR_NONE);
  232. while (recv_size < kBufferSize) {
  233. ssize_t size = recv(svr_fd_, read_buffer, kBufferSize, 0);
  234. XCTAssertGreaterThanOrEqual(size, 0);
  235. recv_size += size;
  236. }
  237. XCTAssertEqual(recv_size, kBufferSize);
  238. XCTAssertEqual(memcmp(read_buffer, write_buffer, kBufferSize), 0);
  239. close(svr_fd_);
  240. std::future<grpc_error_handle> read_future = read_promise.get_future();
  241. XCTAssertEqual([self waitForEvent:&read_future timeout:kReadTimeout], YES);
  242. XCTAssertNotEqual(read_future.get(), GRPC_ERROR_NONE);
  243. grpc_endpoint_shutdown(ep_, GRPC_ERROR_NONE);
  244. grpc_slice_buffer_reset_and_unref(&read_slices);
  245. grpc_slice_buffer_reset_and_unref(&write_slices);
  246. }
  247. - (void)testRemoteReset {
  248. grpc_core::ExecCtx exec_ctx;
  249. std::promise<grpc_error_handle> read_promise;
  250. grpc_closure read_done;
  251. grpc_slice_buffer read_slices;
  252. init_event_closure(&read_done, &read_promise);
  253. grpc_slice_buffer_init(&read_slices);
  254. grpc_endpoint_read(ep_, &read_slices, &read_done, /*urgent=*/false);
  255. struct linger so_linger;
  256. so_linger.l_onoff = 1;
  257. so_linger.l_linger = 0;
  258. setsockopt(svr_fd_, SOL_SOCKET, SO_LINGER, &so_linger, sizeof(so_linger));
  259. close(svr_fd_);
  260. std::future<grpc_error_handle> read_future = read_promise.get_future();
  261. XCTAssertEqual([self waitForEvent:&read_future timeout:kReadTimeout], YES);
  262. XCTAssertNotEqual(read_future.get(), GRPC_ERROR_NONE);
  263. grpc_endpoint_shutdown(ep_, GRPC_ERROR_NONE);
  264. grpc_slice_buffer_reset_and_unref(&read_slices);
  265. }
  266. @end
  267. #else // GRPC_CFSTREAM
  268. // Phony test suite
  269. @interface CFStreamEndpointTests : XCTestCase
  270. @end
  271. @implementation CFStreamEndpointTests
  272. - (void)setUp {
  273. [super setUp];
  274. }
  275. - (void)tearDown {
  276. [super tearDown];
  277. }
  278. @end
  279. #endif // GRPC_CFSTREAM