upload_rbe_results.py

#!/usr/bin/env python
# Copyright 2017 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Uploads RBE results to BigQuery"""

import argparse
import json
import os
import ssl
import sys
import urllib.error
import urllib.parse
import urllib.request
import uuid
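
# big_query_utils is a shared helper that lives two directories up
# (../../gcp/utils), so extend sys.path before importing it.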
gcp_utils_dir = os.path.abspath(
    os.path.join(os.path.dirname(__file__), '../../gcp/utils'))
sys.path.append(gcp_utils_dir)
import big_query_utils

_DATASET_ID = 'jenkins_test_results'
_DESCRIPTION = 'Test results from master RBE builds on Kokoro'
# 365 days in milliseconds
_EXPIRATION_MS = 365 * 24 * 60 * 60 * 1000
_PARTITION_TYPE = 'DAY'
_PROJECT_ID = 'grpc-testing'
_RESULTS_SCHEMA = [
    ('job_name', 'STRING', 'Name of Kokoro job'),
    ('build_id', 'INTEGER', 'Build ID of Kokoro job'),
    ('build_url', 'STRING', 'URL of Kokoro build'),
    ('test_target', 'STRING', 'Bazel target path'),
    ('test_class_name', 'STRING', 'Name of test class'),
    ('test_case', 'STRING', 'Name of test case'),
    ('result', 'STRING', 'Test or build result'),
    ('timestamp', 'TIMESTAMP', 'Timestamp of test run'),
    ('duration', 'FLOAT', 'Duration of the test run'),
]
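# Each _RESULTS_SCHEMA tuple is (column name, BigQuery type, description); the
# column names mirror the keys of the per-row 'json' payload assembled in
# __main__ below.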
_TABLE_ID = 'rbe_test_results'


def _get_api_key():
    """Returns string with API key to access ResultStore.
    Intended to be used in Kokoro environment."""
    api_key_directory = os.getenv('KOKORO_GFILE_DIR')
    api_key_file = os.path.join(api_key_directory, 'resultstore_api_key')
    assert os.path.isfile(api_key_file), 'Must add --api_key arg if not on ' \
        'Kokoro or Kokoro environment is not set up properly.'
    with open(api_key_file, 'r') as f:
        return f.read().replace('\n', '')


def _get_invocation_id():
    """Returns string of Bazel invocation ID. Intended to be used in
    Kokoro environment."""
    bazel_id_directory = os.getenv('KOKORO_ARTIFACTS_DIR')
    bazel_id_file = os.path.join(bazel_id_directory, 'bazel_invocation_ids')
    assert os.path.isfile(bazel_id_file), 'bazel_invocation_ids file, written ' \
        'by RBE initialization script, expected but not found.'
    with open(bazel_id_file, 'r') as f:
        return f.read().replace('\n', '')


def _parse_test_duration(duration_str):
    """Parse test duration string in '123.567s' format"""
    try:
        if duration_str.endswith('s'):
            duration_str = duration_str[:-1]
        return float(duration_str)
    except:
        return None


def _upload_results_to_bq(rows):
    """Upload test results to a BQ table.

    Args:
        rows: A list of dictionaries containing data for each row to insert
    """
    bq = big_query_utils.create_big_query()
    big_query_utils.create_partitioned_table(bq,
                                             _PROJECT_ID,
                                             _DATASET_ID,
                                             _TABLE_ID,
                                             _RESULTS_SCHEMA,
                                             _DESCRIPTION,
                                             partition_type=_PARTITION_TYPE,
                                             expiration_ms=_EXPIRATION_MS)

    max_retries = 3
    for attempt in range(max_retries):
        if big_query_utils.insert_rows(bq, _PROJECT_ID, _DATASET_ID, _TABLE_ID,
                                       rows):
            break
        else:
            if attempt < max_retries - 1:
                print('Error uploading result to bigquery, will retry.')
            else:
                print(
                    'Error uploading result to bigquery, all attempts failed.')
                sys.exit(1)


def _get_resultstore_data(api_key, invocation_id):
    """Returns a list of test result actions queried from the ResultStore API.

    Args:
        api_key: String of ResultStore API key
        invocation_id: String of ResultStore invocation ID to fetch results from
    """
    all_actions = []
    page_token = ''
    # ResultStore's API returns data on a limited number of tests per request.
    # When we exceed that limit, the response includes a 'nextPageToken' field;
    # pass it back as the pageToken of the next request and keep requesting
    # until the field is omitted.
    while True:
        req = urllib.request.Request(
            url=
            'https://resultstore.googleapis.com/v2/invocations/%s/targets/-/configuredTargets/-/actions?key=%s&pageToken=%s&fields=next_page_token,actions.id,actions.status_attributes,actions.timing,actions.test_action'
            % (invocation_id, api_key, page_token),
            headers={'Content-Type': 'application/json'})
        # Allow disabling certificate verification via PYTHONHTTPSVERIFY=0.
        ctx_dict = {}
        if os.getenv("PYTHONHTTPSVERIFY") == "0":
            ctx = ssl.create_default_context()
            ctx.check_hostname = False
            ctx.verify_mode = ssl.CERT_NONE
            ctx_dict = {"context": ctx}
        raw_resp = urllib.request.urlopen(req, **ctx_dict).read()
        decoded_resp = raw_resp if isinstance(
            raw_resp, str) else raw_resp.decode('utf-8', 'ignore')
        results = json.loads(decoded_resp)
        all_actions.extend(results['actions'])
        if 'nextPageToken' not in results:
            break
        page_token = results['nextPageToken']
    return all_actions


if __name__ == "__main__":
    # Arguments are necessary if running in a non-Kokoro environment.
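    # Example of a manual, non-Kokoro run (the key and invocation ID values are
    # illustrative; on Kokoro they are read from the environment instead):
    #   python upload_rbe_results.py --api_key=<resultstore_api_key> \
    #       --invocation_id=<bazel_invocation_uuid> \
    #       --skip_upload --bq_dump_file=/tmp/bq_rows.json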
    argp = argparse.ArgumentParser(
        description=
        'Fetches results for given RBE invocation and uploads them to BigQuery table.'
    )
    argp.add_argument('--api_key',
                      default='',
                      type=str,
                      help='The API key to read from ResultStore API')
    argp.add_argument('--invocation_id',
                      default='',
                      type=str,
                      help='UUID of bazel invocation to fetch.')
    argp.add_argument('--bq_dump_file',
                      default=None,
                      type=str,
                      help='Dump JSON data to file just before uploading')
    argp.add_argument('--resultstore_dump_file',
                      default=None,
                      type=str,
                      help='Dump JSON data as received from ResultStore API')
    argp.add_argument('--skip_upload',
                      default=False,
                      action='store_const',
                      const=True,
                      help='Skip uploading to bigquery')
    args = argp.parse_args()

    api_key = args.api_key or _get_api_key()
    invocation_id = args.invocation_id or _get_invocation_id()
    resultstore_actions = _get_resultstore_data(api_key, invocation_id)

    if args.resultstore_dump_file:
        with open(args.resultstore_dump_file, 'w') as f:
            json.dump(resultstore_actions, f, indent=4, sort_keys=True)
        print('Dumped resultstore data to file %s' % args.resultstore_dump_file)

    # google.devtools.resultstore.v2.Action schema:
    # https://github.com/googleapis/googleapis/blob/master/google/devtools/resultstore/v2/action.proto
    bq_rows = []
    for index, action in enumerate(resultstore_actions):
        # Filter out non-test related data, such as build results.
        if 'testAction' not in action:
            continue

        # Some test results contain the fileProcessingErrors field, which
        # indicates an issue with parsing the results of individual test cases.
        if 'fileProcessingErrors' in action:
            test_cases = [{
                'testCase': {
                    'caseName': str(action['id']['actionId']),
                }
            }]
        # Test timeouts have a different dictionary structure compared to pass
        # and fail results.
        elif action['statusAttributes']['status'] == 'TIMED_OUT':
            test_cases = [{
                'testCase': {
                    'caseName': str(action['id']['actionId']),
                    'timedOut': True
                }
            }]
        # When RBE believes its infrastructure is failing, it will abort and
        # mark running tests as UNKNOWN. These infrastructure failures may be
        # related to our tests, so we should investigate if specific tests are
        # repeatedly being marked as UNKNOWN.
        elif action['statusAttributes']['status'] == 'UNKNOWN':
            test_cases = [{
                'testCase': {
                    'caseName': str(action['id']['actionId']),
                    'unknown': True
                }
            }]
            # Take the timestamp from the previous action, which should be
            # a close approximation.
            action['timing'] = {
                'startTime':
                    resultstore_actions[index - 1]['timing']['startTime']
            }
        elif 'testSuite' not in action['testAction']:
            continue
        elif 'tests' not in action['testAction']['testSuite']:
            continue
        else:
            # Regular results nest the individual cases one level down: each
            # entry in the top-level testSuite is itself a testSuite whose
            # 'tests' list holds the actual test cases, so flatten them here.
            test_cases = []
            for tests_item in action['testAction']['testSuite']['tests']:
                test_cases += tests_item['testSuite']['tests']
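
        # Classify each case: any 'errors'/'failures' key means FAILED, the
        # synthetic 'timedOut'/'unknown' markers set above map to TIMEOUT and
        # UNKNOWN, and everything else counts as PASSED.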
        for test_case in test_cases:
            if any(s in test_case['testCase'] for s in ['errors', 'failures']):
                result = 'FAILED'
            elif 'timedOut' in test_case['testCase']:
                result = 'TIMEOUT'
            elif 'unknown' in test_case['testCase']:
                result = 'UNKNOWN'
            else:
                result = 'PASSED'
            try:
                bq_rows.append({
                    'insertId': str(uuid.uuid4()),
                    'json': {
                        'job_name': os.getenv('KOKORO_JOB_NAME'),
                        'build_id': os.getenv('KOKORO_BUILD_NUMBER'),
                        'build_url':
                            'https://source.cloud.google.com/results/invocations/%s'
                            % invocation_id,
                        'test_target': action['id']['targetId'],
                        'test_class_name': test_case['testCase'].get('className', ''),
                        'test_case': test_case['testCase']['caseName'],
                        'result': result,
                        'timestamp': action['timing']['startTime'],
                        'duration': _parse_test_duration(action['timing']['duration']),
                    }
                })
            except Exception as e:
                print('Failed to parse test result. Error: %s' % str(e))
                print(json.dumps(test_case, indent=4))
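                # Still record a placeholder row so the unparseable result is
                # visible in BigQuery rather than silently dropped.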
                bq_rows.append({
                    'insertId': str(uuid.uuid4()),
                    'json': {
                        'job_name': os.getenv('KOKORO_JOB_NAME'),
                        'build_id': os.getenv('KOKORO_BUILD_NUMBER'),
                        'build_url':
                            'https://source.cloud.google.com/results/invocations/%s'
                            % invocation_id,
                        'test_target': action['id']['targetId'],
                        'test_class_name': 'N/A',
                        'test_case': 'N/A',
                        'result': 'UNPARSEABLE',
                        'timestamp': 'N/A',
                    }
                })

    if args.bq_dump_file:
        with open(args.bq_dump_file, 'w') as f:
            json.dump(bq_rows, f, indent=4, sort_keys=True)
        print('Dumped BQ data to file %s' % args.bq_dump_file)

    if not args.skip_upload:
        # BigQuery sometimes fails with large uploads, so batch 1,000 rows at a time.
        MAX_ROWS = 1000
        for i in range(0, len(bq_rows), MAX_ROWS):
            _upload_results_to_bq(bq_rows[i:i + MAX_ROWS])
    else:
        print('Skipped upload to bigquery.')