123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241 |
- #!/usr/bin/env ruby
- # Copyright 2015 gRPC authors.
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- # pubsub_demo demos accesses the Google PubSub API via its gRPC interface
- #
- # $ GOOGLE_APPLICATION_CREDENTIALS=<path_to_service_account_key_file> \
- # path/to/pubsub_demo.rb \
- # [--action=<chosen_demo_action> ]
- #
- # There are options related to the chosen action, see #parse_args below.
- # - the possible actions are given by the method names of NamedAction class
- # - the default action is list_some_topics
- this_dir = File.expand_path(File.dirname(__FILE__))
- lib_dir = File.join(File.dirname(File.dirname(this_dir)), 'lib')
- $LOAD_PATH.unshift(lib_dir) unless $LOAD_PATH.include?(lib_dir)
- $LOAD_PATH.unshift(this_dir) unless $LOAD_PATH.include?(this_dir)
- require 'optparse'
- require 'grpc'
- require 'googleauth'
- require 'google/protobuf'
- require 'google/protobuf/empty'
- require 'tech/pubsub/proto/pubsub'
- require 'tech/pubsub/proto/pubsub_services'
- # creates a SSL Credentials from the production certificates.
- def ssl_creds
- GRPC::Core::ChannelCredentials.new()
- end
- # Builds the metadata authentication update proc.
- def auth_proc(opts)
- auth_creds = Google::Auth.get_application_default
- return auth_creds.updater_proc
- end
- # Creates a stub for accessing the publisher service.
- def publisher_stub(opts)
- address = "#{opts.host}:#{opts.port}"
- stub_clz = Tech::Pubsub::PublisherService::Stub # shorter
- GRPC.logger.info("... access PublisherService at #{address}")
- call_creds = GRPC::Core::CallCredentials.new(auth_proc(opts))
- combined_creds = ssl_creds.compose(call_creds)
- stub_clz.new(address, creds: combined_creds,
- GRPC::Core::Channel::SSL_TARGET => opts.host)
- end
- # Creates a stub for accessing the subscriber service.
- def subscriber_stub(opts)
- address = "#{opts.host}:#{opts.port}"
- stub_clz = Tech::Pubsub::SubscriberService::Stub # shorter
- GRPC.logger.info("... access SubscriberService at #{address}")
- call_creds = GRPC::Core::CallCredentials.new(auth_proc(opts))
- combined_creds = ssl_creds.compose(call_creds)
- stub_clz.new(address, creds: combined_creds,
- GRPC::Core::Channel::SSL_TARGET => opts.host)
- end
- # defines methods corresponding to each interop test case.
- class NamedActions
- include Tech::Pubsub
- # Initializes NamedActions
- #
- # @param pub [Stub] a stub for accessing the publisher service
- # @param sub [Stub] a stub for accessing the publisher service
- # @param args [Args] provides access to the command line
- def initialize(pub, sub, args)
- @pub = pub
- @sub = sub
- @args = args
- end
- # Removes the test topic if it exists
- def remove_topic
- name = test_topic_name
- p "... removing Topic #{name}"
- @pub.delete_topic(DeleteTopicRequest.new(topic: name))
- p "removed Topic: #{name} OK"
- rescue GRPC::BadStatus => e
- p "Could not delete a topics: rpc failed with '#{e}'"
- end
- # Creates a test topic
- def create_topic
- name = test_topic_name
- p "... creating Topic #{name}"
- resp = @pub.create_topic(Topic.new(name: name))
- p "created Topic: #{resp.name} OK"
- rescue GRPC::BadStatus => e
- p "Could not create a topics: rpc failed with '#{e}'"
- end
- # Lists topics in the project
- def list_some_topics
- p 'Listing topics'
- p '-------------_'
- list_project_topics.topic.each { |t| p t.name }
- rescue GRPC::BadStatus => e
- p "Could not list topics: rpc failed with '#{e}'"
- end
- # Checks if a topics exists in a project
- def check_exists
- name = test_topic_name
- p "... checking for topic #{name}"
- exists = topic_exists?(name)
- p "#{name} is a topic" if exists
- p "#{name} is not a topic" unless exists
- rescue GRPC::BadStatus => e
- p "Could not check for a topics: rpc failed with '#{e}'"
- end
- # Publishes some messages
- def random_pub_sub
- topic_name, sub_name = test_topic_name, test_sub_name
- create_topic_if_needed(topic_name)
- @sub.create_subscription(Subscription.new(name: sub_name,
- topic: topic_name))
- msg_count = rand(10..30)
- msg_count.times do |x|
- msg = PubsubMessage.new(data: "message #{x}")
- @pub.publish(PublishRequest.new(topic: topic_name, message: msg))
- end
- p "Sent #{msg_count} messages to #{topic_name}, checking for them now."
- batch = @sub.pull_batch(PullBatchRequest.new(subscription: sub_name,
- max_events: msg_count))
- ack_ids = batch.pull_responses.map { |x| x.ack_id }
- p "Got #{ack_ids.size} messages; acknowledging them.."
- @sub.acknowledge(AcknowledgeRequest.new(subscription: sub_name,
- ack_id: ack_ids))
- p "Test messages were acknowledged OK, deleting the subscription"
- del_req = DeleteSubscriptionRequest.new(subscription: sub_name)
- @sub.delete_subscription(del_req)
- rescue GRPC::BadStatus => e
- p "Could not do random pub sub: rpc failed with '#{e}'"
- end
- private
- # test_topic_name is the topic name to use in this test.
- def test_topic_name
- unless @args.topic_name.nil?
- return "/topics/#{@args.project_id}/#{@args.topic_name}"
- end
- now_text = Time.now.utc.strftime('%Y%m%d%H%M%S%L')
- "/topics/#{@args.project_id}/#{ENV['USER']}-#{now_text}"
- end
- # test_sub_name is the subscription name to use in this test.
- def test_sub_name
- unless @args.sub_name.nil?
- return "/subscriptions/#{@args.project_id}/#{@args.sub_name}"
- end
- now_text = Time.now.utc.strftime('%Y%m%d%H%M%S%L')
- "/subscriptions/#{@args.project_id}/#{ENV['USER']}-#{now_text}"
- end
- # determines if the topic name exists
- def topic_exists?(name)
- topics = list_project_topics.topic.map { |t| t.name }
- topics.include?(name)
- end
- def create_topic_if_needed(name)
- return if topic_exists?(name)
- @pub.create_topic(Topic.new(name: name))
- end
- def list_project_topics
- q = "cloud.googleapis.com/project in (/projects/#{@args.project_id})"
- @pub.list_topics(ListTopicsRequest.new(query: q))
- end
- end
- # Args is used to hold the command line info.
- Args = Struct.new(:host, :port, :action, :project_id, :topic_name,
- :sub_name)
- # validates the command line options, returning them as an Arg.
- def parse_args
- args = Args.new('pubsub-staging.googleapis.com',
- 443, 'list_some_topics', 'stoked-keyword-656')
- OptionParser.new do |opts|
- opts.on('--server_host SERVER_HOST', 'server hostname') do |v|
- args.host = v
- end
- opts.on('--server_port SERVER_PORT', 'server port') do |v|
- args.port = v
- end
- # instance_methods(false) gives only the methods defined in that class.
- scenes = NamedActions.instance_methods(false).map { |t| t.to_s }
- scene_list = scenes.join(',')
- opts.on("--action CODE", scenes, {}, 'pick a demo action',
- " (#{scene_list})") do |v|
- args.action = v
- end
- # Set the remaining values.
- %w(project_id topic_name sub_name).each do |o|
- opts.on("--#{o} VALUE", "#{o}") do |v|
- args[o] = v
- end
- end
- end.parse!
- _check_args(args)
- end
- def _check_args(args)
- %w(host port action).each do |a|
- if args[a].nil?
- raise OptionParser::MissingArgument.new("please specify --#{a}")
- end
- end
- args
- end
- def main
- args = parse_args
- pub, sub = publisher_stub(args), subscriber_stub(args)
- NamedActions.new(pub, sub, args).method(args.action).call
- end
- main
|