big_query_utils.py

# Copyright 2015 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import print_function

import argparse
import json
import uuid

from apiclient import discovery
from apiclient.errors import HttpError
import httplib2
from oauth2client.client import GoogleCredentials

# 30 days in milliseconds
_EXPIRATION_MS = 30 * 24 * 60 * 60 * 1000
NUM_RETRIES = 3


def create_big_query():
    """Authenticates with cloud platform and gets a BigQuery service object."""
    # Uses Application Default Credentials (e.g. GOOGLE_APPLICATION_CREDENTIALS
    # or a gcloud login) to authorize the discovery-based client.
    creds = GoogleCredentials.get_application_default()
    return discovery.build('bigquery',
                           'v2',
                           credentials=creds,
                           cache_discovery=False)


def create_dataset(big_query, project_id, dataset_id):
    """Creates a dataset; returns True on success or if it already exists."""
    is_success = True
    body = {
        'datasetReference': {
            'projectId': project_id,
            'datasetId': dataset_id
        }
    }
    try:
        dataset_req = big_query.datasets().insert(projectId=project_id,
                                                  body=body)
        dataset_req.execute(num_retries=NUM_RETRIES)
    except HttpError as http_error:
        if http_error.resp.status == 409:
            print('Warning: The dataset %s already exists' % dataset_id)
        else:
            # Note: For more debugging info, print "http_error.content"
            print('Error in creating dataset: %s. Err: %s' %
                  (dataset_id, http_error))
            is_success = False
    return is_success


def create_table(big_query, project_id, dataset_id, table_id, table_schema,
                 description):
    """Creates a table whose schema is given as (name, type, description) tuples."""
    fields = [{
        'name': field_name,
        'type': field_type,
        'description': field_description
    } for (field_name, field_type, field_description) in table_schema]
    return create_table2(big_query, project_id, dataset_id, table_id, fields,
                         description)
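

# Illustrative sketch (not part of the original module): table_schema, as
# consumed by create_table() and create_partitioned_table(), is a list of
# (column name, column type, description) tuples. The columns below are
# made-up examples.
_EXAMPLE_TABLE_SCHEMA = [
    ('test_name', 'STRING', 'Name of the test case'),
    ('latency_ms', 'FLOAT', 'Observed latency in milliseconds'),
    ('passed', 'BOOLEAN', 'Whether the test passed'),
]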


def create_partitioned_table(big_query,
                             project_id,
                             dataset_id,
                             table_id,
                             table_schema,
                             description,
                             partition_type='DAY',
                             expiration_ms=_EXPIRATION_MS):
    """Creates a partitioned table. By default, a date-partitioned table is
    created and each partition expires 30 days after its partition date.
    """
    fields = [{
        'name': field_name,
        'type': field_type,
        'description': field_description
    } for (field_name, field_type, field_description) in table_schema]
    return create_table2(big_query, project_id, dataset_id, table_id, fields,
                         description, partition_type, expiration_ms)


def create_table2(big_query,
                  project_id,
                  dataset_id,
                  table_id,
                  fields_schema,
                  description,
                  partition_type=None,
                  expiration_ms=None):
    """Creates a table, optionally time-partitioned, from an explicit fields schema."""
    is_success = True
    body = {
        'description': description,
        'schema': {
            'fields': fields_schema
        },
        'tableReference': {
            'datasetId': dataset_id,
            'projectId': project_id,
            'tableId': table_id
        }
    }
    if partition_type and expiration_ms:
        body["timePartitioning"] = {
            "type": partition_type,
            "expirationMs": expiration_ms
        }
    try:
        table_req = big_query.tables().insert(projectId=project_id,
                                              datasetId=dataset_id,
                                              body=body)
        res = table_req.execute(num_retries=NUM_RETRIES)
        print('Successfully created %s "%s"' % (res['kind'], res['id']))
    except HttpError as http_error:
        if http_error.resp.status == 409:
            print('Warning: Table %s already exists' % table_id)
        else:
            print('Error in creating table: %s. Err: %s' %
                  (table_id, http_error))
            is_success = False
    return is_success


def patch_table(big_query, project_id, dataset_id, table_id, fields_schema):
    """Patches the schema of an existing table."""
    is_success = True
    body = {
        'schema': {
            'fields': fields_schema
        },
        'tableReference': {
            'datasetId': dataset_id,
            'projectId': project_id,
            'tableId': table_id
        }
    }
    try:
        table_req = big_query.tables().patch(projectId=project_id,
                                             datasetId=dataset_id,
                                             tableId=table_id,
                                             body=body)
        res = table_req.execute(num_retries=NUM_RETRIES)
        print('Successfully patched %s "%s"' % (res['kind'], res['id']))
    except HttpError as http_error:
        print('Error in patching table: %s. Err: %s' % (table_id, http_error))
        is_success = False
    return is_success
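

# Illustrative sketch (not part of the original module): one way patch_table()
# might be used to append a column, by fetching the current schema via
# tables().get and re-submitting it with one extra field.
def _example_add_column(big_query, project_id, dataset_id, table_id, name,
                        field_type, description):
    table = big_query.tables().get(
        projectId=project_id, datasetId=dataset_id,
        tableId=table_id).execute(num_retries=NUM_RETRIES)
    fields = table['schema']['fields']
    fields.append({
        'name': name,
        'type': field_type,
        'description': description
    })
    return patch_table(big_query, project_id, dataset_id, table_id, fields)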


def insert_rows(big_query, project_id, dataset_id, table_id, rows_list):
    """Streams rows (as produced by make_row) into an existing table."""
    is_success = True
    body = {'rows': rows_list}
    try:
        insert_req = big_query.tabledata().insertAll(projectId=project_id,
                                                     datasetId=dataset_id,
                                                     tableId=table_id,
                                                     body=body)
        res = insert_req.execute(num_retries=NUM_RETRIES)
        if res.get('insertErrors', None):
            print('Error inserting rows! Response: %s' % res)
            is_success = False
    except HttpError as http_error:
        print('Error inserting rows to the table %s' % table_id)
        print('Error message: %s' % http_error)
        is_success = False
    return is_success


def sync_query_job(big_query, project_id, query, timeout=5000):
    """Runs a synchronous query; returns the raw response, or None on error."""
    query_data = {'query': query, 'timeoutMs': timeout}
    query_job = None
    try:
        query_job = big_query.jobs().query(
            projectId=project_id,
            body=query_data).execute(num_retries=NUM_RETRIES)
    except HttpError as http_error:
        print('Query execute job failed with error: %s' % http_error)
        print(http_error.content)
    return query_job
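

# Illustrative sketch (not part of the original module): one way a caller
# might unpack the response returned by sync_query_job(). The jobs.query
# response nests cell values under rows -> 'f' -> 'v', with column names
# taken from the response schema.
def _example_query_rows(big_query, project_id, query):
    response = sync_query_job(big_query, project_id, query)
    if not response or not response.get('rows'):
        return []
    field_names = [field['name'] for field in response['schema']['fields']]
    return [
        dict(zip(field_names, [cell['v'] for cell in row['f']]))
        for row in response['rows']
    ]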


# List of (column name, column type, description) tuples
def make_row(unique_row_id, row_values_dict):
    """row_values_dict is a dictionary of column name and column value."""
    return {'insertId': unique_row_id, 'json': row_values_dict}
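

# Illustrative sketch (not part of the original module): an end-to-end flow
# built from the helpers above. The project, dataset, and table ids are
# hypothetical placeholders.
if __name__ == '__main__':
    bq = create_big_query()
    project = 'my-gcp-project'  # hypothetical project id
    dataset = 'example_dataset'  # hypothetical dataset id
    table = 'example_results'  # hypothetical table id
    schema = [('test_name', 'STRING', 'Name of the test case'),
              ('passed', 'BOOLEAN', 'Whether the test passed')]
    if create_dataset(bq, project, dataset) and create_table(
            bq, project, dataset, table, schema, 'Example results table'):
        row = make_row(str(uuid.uuid4()), {
            'test_name': 'smoke_test',
            'passed': True
        })
        insert_rows(bq, project, dataset, table, [row])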