# server.py -- gRPC Python cancellation example server.
  1. # Copyright 2019 the gRPC authors.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. """An example of cancelling requests in gRPC."""
  15. from __future__ import absolute_import
  16. from __future__ import division
  17. from __future__ import print_function
  18. import argparse
  19. from concurrent import futures
  20. import logging
  21. import threading
  22. import grpc
  23. import search
  24. from examples.python.cancellation import hash_name_pb2
  25. from examples.python.cancellation import hash_name_pb2_grpc
  26. _LOGGER = logging.getLogger(__name__)
  27. _SERVER_HOST = 'localhost'
  28. _DESCRIPTION = "A server for finding hashes similar to names."
  29. class HashFinder(hash_name_pb2_grpc.HashFinderServicer):
  30. def __init__(self, maximum_hashes):
  31. super(HashFinder, self).__init__()
  32. self._maximum_hashes = maximum_hashes
  33. def Find(self, request, context):
  34. stop_event = threading.Event()
  35. def on_rpc_done():
  36. _LOGGER.debug("Attempting to regain servicer thread.")
  37. stop_event.set()
  38. context.add_callback(on_rpc_done)
  39. candidates = []
  40. try:
  41. candidates = list(
  42. search.search(request.desired_name,
  43. request.ideal_hamming_distance, stop_event,
  44. self._maximum_hashes))
  45. except search.ResourceLimitExceededError:
  46. _LOGGER.info("Cancelling RPC due to exhausted resources.")
  47. context.cancel()
  48. _LOGGER.debug("Servicer thread returning.")
  49. if not candidates:
  50. return hash_name_pb2.HashNameResponse()
  51. return candidates[-1]
  52. def FindRange(self, request, context):
  53. stop_event = threading.Event()
  54. def on_rpc_done():
  55. _LOGGER.debug("Attempting to regain servicer thread.")
  56. stop_event.set()
  57. context.add_callback(on_rpc_done)
  58. secret_generator = search.search(
  59. request.desired_name,
  60. request.ideal_hamming_distance,
  61. stop_event,
  62. self._maximum_hashes,
  63. interesting_hamming_distance=request.interesting_hamming_distance)
  64. try:
  65. for candidate in secret_generator:
  66. yield candidate
  67. except search.ResourceLimitExceededError:
  68. _LOGGER.info("Cancelling RPC due to exhausted resources.")
  69. context.cancel()
  70. _LOGGER.debug("Regained servicer thread.")
  71. def _running_server(port, maximum_hashes):
  72. # We use only a single servicer thread here to demonstrate that, if managed
  73. # carefully, cancelled RPCs can need not continue occupying servicers
  74. # threads.
  75. server = grpc.server(futures.ThreadPoolExecutor(max_workers=1),
  76. maximum_concurrent_rpcs=1)
  77. hash_name_pb2_grpc.add_HashFinderServicer_to_server(
  78. HashFinder(maximum_hashes), server)
  79. address = '{}:{}'.format(_SERVER_HOST, port)
  80. actual_port = server.add_insecure_port(address)
  81. server.start()
  82. print("Server listening at '{}'".format(address))
  83. return server
  84. def main():
  85. parser = argparse.ArgumentParser(description=_DESCRIPTION)
  86. parser.add_argument('--port',
  87. type=int,
  88. default=50051,
  89. nargs='?',
  90. help='The port on which the server will listen.')
  91. parser.add_argument(
  92. '--maximum-hashes',
  93. type=int,
  94. default=1000000,
  95. nargs='?',
  96. help='The maximum number of hashes to search before cancelling.')
  97. args = parser.parse_args()
  98. server = _running_server(args.port, args.maximum_hashes)
  99. server.wait_for_termination()
# Script entry point: install a default stderr logging handler, then run.
if __name__ == "__main__":
    logging.basicConfig()
    main()