pubsub_demo.rb 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241
  1. #!/usr/bin/env ruby
  2. # Copyright 2015 gRPC authors.
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. # pubsub_demo demos accesses the Google PubSub API via its gRPC interface
  16. #
  17. # $ GOOGLE_APPLICATION_CREDENTIALS=<path_to_service_account_key_file> \
  18. # path/to/pubsub_demo.rb \
  19. # [--action=<chosen_demo_action> ]
  20. #
  21. # There are options related to the chosen action, see #parse_args below.
  22. # - the possible actions are given by the method names of NamedAction class
  23. # - the default action is list_some_topics
  24. this_dir = File.expand_path(File.dirname(__FILE__))
  25. lib_dir = File.join(File.dirname(File.dirname(this_dir)), 'lib')
  26. $LOAD_PATH.unshift(lib_dir) unless $LOAD_PATH.include?(lib_dir)
  27. $LOAD_PATH.unshift(this_dir) unless $LOAD_PATH.include?(this_dir)
  28. require 'optparse'
  29. require 'grpc'
  30. require 'googleauth'
  31. require 'google/protobuf'
  32. require 'google/protobuf/empty'
  33. require 'tech/pubsub/proto/pubsub'
  34. require 'tech/pubsub/proto/pubsub_services'
  35. # creates a SSL Credentials from the production certificates.
  36. def ssl_creds
  37. GRPC::Core::ChannelCredentials.new()
  38. end
  39. # Builds the metadata authentication update proc.
  40. def auth_proc(opts)
  41. auth_creds = Google::Auth.get_application_default
  42. return auth_creds.updater_proc
  43. end
  44. # Creates a stub for accessing the publisher service.
  45. def publisher_stub(opts)
  46. address = "#{opts.host}:#{opts.port}"
  47. stub_clz = Tech::Pubsub::PublisherService::Stub # shorter
  48. GRPC.logger.info("... access PublisherService at #{address}")
  49. call_creds = GRPC::Core::CallCredentials.new(auth_proc(opts))
  50. combined_creds = ssl_creds.compose(call_creds)
  51. stub_clz.new(address, creds: combined_creds,
  52. GRPC::Core::Channel::SSL_TARGET => opts.host)
  53. end
  54. # Creates a stub for accessing the subscriber service.
  55. def subscriber_stub(opts)
  56. address = "#{opts.host}:#{opts.port}"
  57. stub_clz = Tech::Pubsub::SubscriberService::Stub # shorter
  58. GRPC.logger.info("... access SubscriberService at #{address}")
  59. call_creds = GRPC::Core::CallCredentials.new(auth_proc(opts))
  60. combined_creds = ssl_creds.compose(call_creds)
  61. stub_clz.new(address, creds: combined_creds,
  62. GRPC::Core::Channel::SSL_TARGET => opts.host)
  63. end
  64. # defines methods corresponding to each interop test case.
  65. class NamedActions
  66. include Tech::Pubsub
  67. # Initializes NamedActions
  68. #
  69. # @param pub [Stub] a stub for accessing the publisher service
  70. # @param sub [Stub] a stub for accessing the publisher service
  71. # @param args [Args] provides access to the command line
  72. def initialize(pub, sub, args)
  73. @pub = pub
  74. @sub = sub
  75. @args = args
  76. end
  77. # Removes the test topic if it exists
  78. def remove_topic
  79. name = test_topic_name
  80. p "... removing Topic #{name}"
  81. @pub.delete_topic(DeleteTopicRequest.new(topic: name))
  82. p "removed Topic: #{name} OK"
  83. rescue GRPC::BadStatus => e
  84. p "Could not delete a topics: rpc failed with '#{e}'"
  85. end
  86. # Creates a test topic
  87. def create_topic
  88. name = test_topic_name
  89. p "... creating Topic #{name}"
  90. resp = @pub.create_topic(Topic.new(name: name))
  91. p "created Topic: #{resp.name} OK"
  92. rescue GRPC::BadStatus => e
  93. p "Could not create a topics: rpc failed with '#{e}'"
  94. end
  95. # Lists topics in the project
  96. def list_some_topics
  97. p 'Listing topics'
  98. p '-------------_'
  99. list_project_topics.topic.each { |t| p t.name }
  100. rescue GRPC::BadStatus => e
  101. p "Could not list topics: rpc failed with '#{e}'"
  102. end
  103. # Checks if a topics exists in a project
  104. def check_exists
  105. name = test_topic_name
  106. p "... checking for topic #{name}"
  107. exists = topic_exists?(name)
  108. p "#{name} is a topic" if exists
  109. p "#{name} is not a topic" unless exists
  110. rescue GRPC::BadStatus => e
  111. p "Could not check for a topics: rpc failed with '#{e}'"
  112. end
  113. # Publishes some messages
  114. def random_pub_sub
  115. topic_name, sub_name = test_topic_name, test_sub_name
  116. create_topic_if_needed(topic_name)
  117. @sub.create_subscription(Subscription.new(name: sub_name,
  118. topic: topic_name))
  119. msg_count = rand(10..30)
  120. msg_count.times do |x|
  121. msg = PubsubMessage.new(data: "message #{x}")
  122. @pub.publish(PublishRequest.new(topic: topic_name, message: msg))
  123. end
  124. p "Sent #{msg_count} messages to #{topic_name}, checking for them now."
  125. batch = @sub.pull_batch(PullBatchRequest.new(subscription: sub_name,
  126. max_events: msg_count))
  127. ack_ids = batch.pull_responses.map { |x| x.ack_id }
  128. p "Got #{ack_ids.size} messages; acknowledging them.."
  129. @sub.acknowledge(AcknowledgeRequest.new(subscription: sub_name,
  130. ack_id: ack_ids))
  131. p "Test messages were acknowledged OK, deleting the subscription"
  132. del_req = DeleteSubscriptionRequest.new(subscription: sub_name)
  133. @sub.delete_subscription(del_req)
  134. rescue GRPC::BadStatus => e
  135. p "Could not do random pub sub: rpc failed with '#{e}'"
  136. end
  137. private
  138. # test_topic_name is the topic name to use in this test.
  139. def test_topic_name
  140. unless @args.topic_name.nil?
  141. return "/topics/#{@args.project_id}/#{@args.topic_name}"
  142. end
  143. now_text = Time.now.utc.strftime('%Y%m%d%H%M%S%L')
  144. "/topics/#{@args.project_id}/#{ENV['USER']}-#{now_text}"
  145. end
  146. # test_sub_name is the subscription name to use in this test.
  147. def test_sub_name
  148. unless @args.sub_name.nil?
  149. return "/subscriptions/#{@args.project_id}/#{@args.sub_name}"
  150. end
  151. now_text = Time.now.utc.strftime('%Y%m%d%H%M%S%L')
  152. "/subscriptions/#{@args.project_id}/#{ENV['USER']}-#{now_text}"
  153. end
  154. # determines if the topic name exists
  155. def topic_exists?(name)
  156. topics = list_project_topics.topic.map { |t| t.name }
  157. topics.include?(name)
  158. end
  159. def create_topic_if_needed(name)
  160. return if topic_exists?(name)
  161. @pub.create_topic(Topic.new(name: name))
  162. end
  163. def list_project_topics
  164. q = "cloud.googleapis.com/project in (/projects/#{@args.project_id})"
  165. @pub.list_topics(ListTopicsRequest.new(query: q))
  166. end
  167. end
  168. # Args is used to hold the command line info.
  169. Args = Struct.new(:host, :port, :action, :project_id, :topic_name,
  170. :sub_name)
  171. # validates the command line options, returning them as an Arg.
  172. def parse_args
  173. args = Args.new('pubsub-staging.googleapis.com',
  174. 443, 'list_some_topics', 'stoked-keyword-656')
  175. OptionParser.new do |opts|
  176. opts.on('--server_host SERVER_HOST', 'server hostname') do |v|
  177. args.host = v
  178. end
  179. opts.on('--server_port SERVER_PORT', 'server port') do |v|
  180. args.port = v
  181. end
  182. # instance_methods(false) gives only the methods defined in that class.
  183. scenes = NamedActions.instance_methods(false).map { |t| t.to_s }
  184. scene_list = scenes.join(',')
  185. opts.on("--action CODE", scenes, {}, 'pick a demo action',
  186. " (#{scene_list})") do |v|
  187. args.action = v
  188. end
  189. # Set the remaining values.
  190. %w(project_id topic_name sub_name).each do |o|
  191. opts.on("--#{o} VALUE", "#{o}") do |v|
  192. args[o] = v
  193. end
  194. end
  195. end.parse!
  196. _check_args(args)
  197. end
  198. def _check_args(args)
  199. %w(host port action).each do |a|
  200. if args[a].nil?
  201. raise OptionParser::MissingArgument.new("please specify --#{a}")
  202. end
  203. end
  204. args
  205. end
  206. def main
  207. args = parse_args
  208. pub, sub = publisher_stub(args), subscriber_stub(args)
  209. NamedActions.new(pub, sub, args).method(args.action).call
  210. end
  211. main