1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283 |
- # Copyright 2019 The gRPC Authors
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- """This example sends out rich error status from server-side."""
- from concurrent import futures
- import logging
- import threading
- from google.protobuf import any_pb2
- from google.rpc import code_pb2
- from google.rpc import error_details_pb2
- from google.rpc import status_pb2
- import grpc
- from grpc_status import rpc_status
- from examples.protos import helloworld_pb2
- from examples.protos import helloworld_pb2_grpc
- def create_greet_limit_exceed_error_status(name):
- detail = any_pb2.Any()
- detail.Pack(
- error_details_pb2.QuotaFailure(violations=[
- error_details_pb2.QuotaFailure.Violation(
- subject="name: %s" % name,
- description="Limit one greeting per person",
- )
- ],))
- return status_pb2.Status(
- code=code_pb2.RESOURCE_EXHAUSTED,
- message='Request limit exceeded.',
- details=[detail],
- )
- class LimitedGreeter(helloworld_pb2_grpc.GreeterServicer):
- def __init__(self):
- self._lock = threading.RLock()
- self._greeted = set()
- def SayHello(self, request, context):
- with self._lock:
- if request.name in self._greeted:
- rich_status = create_greet_limit_exceed_error_status(
- request.name)
- context.abort_with_status(rpc_status.to_status(rich_status))
- else:
- self._greeted.add(request.name)
- return helloworld_pb2.HelloReply(message='Hello, %s!' % request.name)
- def create_server(server_address):
- server = grpc.server(futures.ThreadPoolExecutor())
- helloworld_pb2_grpc.add_GreeterServicer_to_server(LimitedGreeter(), server)
- port = server.add_insecure_port(server_address)
- return server, port
- def serve(server):
- server.start()
- server.wait_for_termination()
- def main():
- server, unused_port = create_server('[::]:50051')
- serve(server)
- if __name__ == '__main__':
- logging.basicConfig()
- main()
|