client.py 3.1 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192
  1. # Copyright 2019 gRPC authors.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. """An example of multiprocessing concurrency with gRPC."""
  15. from __future__ import absolute_import
  16. from __future__ import division
  17. from __future__ import print_function
  18. import argparse
  19. import atexit
  20. import logging
  21. import multiprocessing
  22. import operator
  23. import sys
  24. import grpc
  25. import prime_pb2
  26. import prime_pb2_grpc
  27. _PROCESS_COUNT = 8
  28. _MAXIMUM_CANDIDATE = 10000
  29. # Each worker process initializes a single channel after forking.
  30. # It's regrettable, but to ensure that each subprocess only has to instantiate
  31. # a single channel to be reused across all RPCs, we use globals.
  32. _worker_channel_singleton = None
  33. _worker_stub_singleton = None
  34. _LOGGER = logging.getLogger(__name__)
  35. def _shutdown_worker():
  36. _LOGGER.info('Shutting worker process down.')
  37. if _worker_channel_singleton is not None:
  38. _worker_channel_singleton.stop()
  39. def _initialize_worker(server_address):
  40. global _worker_channel_singleton # pylint: disable=global-statement
  41. global _worker_stub_singleton # pylint: disable=global-statement
  42. _LOGGER.info('Initializing worker process.')
  43. _worker_channel_singleton = grpc.insecure_channel(server_address)
  44. _worker_stub_singleton = prime_pb2_grpc.PrimeCheckerStub(
  45. _worker_channel_singleton)
  46. atexit.register(_shutdown_worker)
  47. def _run_worker_query(primality_candidate):
  48. _LOGGER.info('Checking primality of %s.', primality_candidate)
  49. return _worker_stub_singleton.check(
  50. prime_pb2.PrimeCandidate(candidate=primality_candidate))
  51. def _calculate_primes(server_address):
  52. worker_pool = multiprocessing.Pool(processes=_PROCESS_COUNT,
  53. initializer=_initialize_worker,
  54. initargs=(server_address,))
  55. check_range = range(2, _MAXIMUM_CANDIDATE)
  56. primality = worker_pool.map(_run_worker_query, check_range)
  57. primes = zip(check_range, map(operator.attrgetter('isPrime'), primality))
  58. return tuple(primes)
  59. def main():
  60. msg = 'Determine the primality of the first {} integers.'.format(
  61. _MAXIMUM_CANDIDATE)
  62. parser = argparse.ArgumentParser(description=msg)
  63. parser.add_argument('server_address',
  64. help='The address of the server (e.g. localhost:50051)')
  65. args = parser.parse_args()
  66. primes = _calculate_primes(args.server_address)
  67. print(primes)
  68. if __name__ == '__main__':
  69. handler = logging.StreamHandler(sys.stdout)
  70. formatter = logging.Formatter('[PID %(process)d] %(message)s')
  71. handler.setFormatter(formatter)
  72. _LOGGER.addHandler(handler)
  73. _LOGGER.setLevel(logging.INFO)
  74. main()