OmniSciDB  a5dc49c757
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
run_benchmark.py
Go to the documentation of this file.
1 import os
2 import timeit
3 import logging
4 import uuid
5 import datetime
6 import json
7 import numpy
8 import pandas as pd
9 import pymapd
10 import re
11 import sys
12 from pandas import DataFrame
13 from argparse import ArgumentParser
14 
15 
16 # For usage info, run: `./<script_name>.py --help`
17 
18 
def verify_destinations(**kwargs):
    """
    Verify script output destination(s)

    Kwargs:
      destinations (list): List of destinations
      dest_db_server (str): DB output destination server
      output_file_json (str): Location of .json file output
      output_file_jenkins (str): Location of .json jenkins file output

    Returns:
      True(bool): At least one recognized destination was requested
      False(bool): No recognized destination was requested
    """
    # (destination, kwarg that destination needs, error logged when the
    # kwarg is None). "output" needs no companion option.
    known_destinations = (
        (
            "mapd_db",
            "dest_db_server",
            '"dest_server" is required when destination = "mapd_db"',
        ),
        (
            "file_json",
            "output_file_json",
            '"output_file_json" is required when destination = "file_json"',
        ),
        ("output", None, None),
        (
            "jenkins_bench",
            "output_file_jenkins",
            '"output_file_jenkins" is required '
            + 'when destination = "jenkins_bench"',
        ),
    )
    destinations = kwargs["destinations"]
    # Start from False so that a destination list containing no recognized
    # entry yields False instead of reading an unassigned local.
    valid_destination_set = False
    for destination, required_kwarg, error_message in known_destinations:
        if destination not in destinations:
            continue
        valid_destination_set = True
        # NOTE: a missing companion option is only logged; the return value
        # is not affected by it.
        if required_kwarg is not None and kwargs[required_kwarg] is None:
            logging.error(error_message)
    return valid_destination_set
61 
62 
def get_connection(**kwargs):
    """
    Open a pymapd connection to the database
    https://pymapd.readthedocs.io/en/latest/usage.html#connecting

    Kwargs:
      db_user(str): DB username
      db_passwd(str): DB password
      db_server(str): DB host
      db_port(int): DB port
      db_name(str): DB name

    Returns:
      con(class): Connection class
      False(bool): The connection failed. The exception is logged.
    """
    try:
        logging.debug("Connecting to mapd db...")
        connection = pymapd.connect(
            **{
                "user": kwargs["db_user"],
                "password": kwargs["db_passwd"],
                "host": kwargs["db_server"],
                "port": kwargs["db_port"],
                "dbname": kwargs["db_name"],
            }
        )
    except (pymapd.exceptions.OperationalError, pymapd.exceptions.Error):
        logging.exception("Error connecting to database.")
        return False
    else:
        logging.info("Succesfully connected to mapd db")
        return connection
93 
94 
def get_run_vars(**kwargs):
    """
    Build the per-run identification values (guid, timestamp, version...)

    Kwargs:
      con(class 'pymapd.connection.Connection'): Mapd connection

    Returns:
      run_vars(dict):::
        run_guid(str): Run GUID
        run_timestamp(datetime): Run timestamp
        run_connection(str): Connection string
        run_driver(str): Run driver
        run_version(str): Version of DB
        run_version_short(str): Shortened version of DB
        conn_machine_name(str): Name of run machine
    """
    guid = str(uuid.uuid4())
    logging.debug("Run guid: " + guid)
    timestamp = datetime.datetime.now()
    connection_string = str(kwargs["con"])
    logging.debug("Connection string: " + connection_string)
    driver = ""  # TODO
    version = kwargs["con"]._client.get_version()
    # Everything before the first "-"; split() hands back the whole string
    # when the version contains no "-".
    version_short = version.split("-")[0]
    # Host is whatever sits between "@" and the next ":" in str(con).
    host_match = re.search(r"@(.*?):", connection_string)
    machine_name = host_match.group(1)
    return {
        "run_guid": guid,
        "run_timestamp": timestamp,
        "run_connection": connection_string,
        "run_driver": driver,
        "run_version": version,
        "run_version_short": version_short,
        "conn_machine_name": machine_name,
    }
134 
135 
def get_gpu_info(**kwargs):
    """
    Collect GPU details for the run machine

    Kwargs:
      gpu_name(str): GPU name from input param
      no_gather_conn_gpu_info(bool): Skip the GPU fields read from the
                                     pymapd connection
      con(class 'pymapd.connection.Connection'): Mapd connection
      conn_machine_name(str): Name of run machine
      no_gather_nvml_gpu_info(bool): Do not gather GPU info using nvml
      gather_nvml_gpu_info(bool): Gather GPU info using nvml
      gpu_count(int): Number of GPUs on run machine

    Returns:
      gpu_info(dict):::
        conn_gpu_count(int): Number of GPUs gathered from pymapd con
        source_db_gpu_count(int): Number of GPUs on run machine
        source_db_gpu_mem(str): Amount of GPU mem on run machine
        source_db_gpu_driver_ver(str): GPU driver version
        source_db_gpu_name(str): GPU name
    """
    # Defaults used whenever a value cannot (or must not) be gathered
    gpus_from_conn = None
    gpu_count = None
    gpu_mem = None
    driver_version = ""
    gpu_name = ""

    if not kwargs["no_gather_conn_gpu_info"]:
        logging.debug(
            "Gathering source database GPU info fields "
            "[run_gpu_count, run_gpu_mem_mb] "
            "using pymapd connection info. "
        )
        hardware = kwargs["con"]._client.get_hardware_info(
            kwargs["con"]._session
        )
        gpus_from_conn = hardware.hardware_info[0].num_gpu_allocated
    else:
        logging.debug(
            "--no-gather-conn-gpu-info passed, "
            "using blank values for source database GPU info fields "
            "[run_gpu_count, run_gpu_mem_mb] "
        )

    # nvml gathering is forced off unless the connection reported GPUs
    skip_nvml = True
    if gpus_from_conn == 0:
        logging.warning(
            "0 GPUs detected from connection info, "
            "using blank values for source database GPU info fields "
            "If running against cpu-only server, make sure to set "
            "--no-gather-nvml-gpu-info and --no-gather-conn-gpu-info."
        )
    elif gpus_from_conn is not None:
        skip_nvml = kwargs["no_gather_nvml_gpu_info"]
        gpu_count = gpus_from_conn
        try:
            first_gpu = hardware.hardware_info[0].gpu_info[0]
            gpu_mem = int(first_gpu.memory / 1000000)
        except IndexError:
            logging.error("GPU memory info not available from connection.")

    if skip_nvml:
        logging.debug(
            "--no-gather-nvml-gpu-info passed, "
            "using blank values for source database GPU info fields "
            "[gpu_driver_ver, run_gpu_name] "
        )
    elif (
        kwargs["conn_machine_name"] == "localhost"
        or kwargs["gather_nvml_gpu_info"]
    ):
        logging.debug(
            "Gathering source database GPU info fields "
            "[gpu_driver_ver, run_gpu_name] "
            "from local GPU using pynvml. "
        )
        import pynvml

        pynvml.nvmlInit()
        driver_version = pynvml.nvmlSystemGetDriverVersion().decode()
        for gpu_index in range(gpu_count):
            gpu_handle = pynvml.nvmlDeviceGetHandleByIndex(gpu_index)
            # Assume all cards are the same, overwrite name value
            gpu_name = pynvml.nvmlDeviceGetName(gpu_handle).decode()
        pynvml.nvmlShutdown()

    # Explicitly passed (truthy) arguments win over gathered values
    gpu_count = kwargs["gpu_count"] or gpu_count
    gpu_name = kwargs["gpu_name"] or gpu_name
    return {
        "conn_gpu_count": gpus_from_conn,
        "source_db_gpu_count": gpu_count,
        "source_db_gpu_mem": gpu_mem,
        "source_db_gpu_driver_ver": driver_version,
        "source_db_gpu_name": gpu_name,
    }
235 
236 
def get_machine_info(**kwargs):
    """
    Resolve the run machine name and uname

    Values passed in explicitly take priority. Otherwise local `os.uname()`
    data is used when the connection points at "localhost", and the
    connection's machine name (with a blank uname) is used for any other host.

    Kwargs:
      conn_machine_name(str): Name of machine from pymapd con
      machine_name(str): Name of machine if passed in
      machine_uname(str): Uname of machine if passed in

    Returns:
      machine_info(dict):::
        run_machine_name(str): Run machine name
        run_machine_uname(str): Run machine uname
    """
    connected_locally = kwargs["conn_machine_name"] == "localhost"
    # Local uname is only looked up for localhost connections
    uname_fields = os.uname() if connected_locally else None

    # --machine-name, when passed in, overrides the derived value
    name = kwargs["machine_name"]
    if not name:
        if connected_locally:
            name = uname_fields.nodename.split(".")[0]
        else:
            name = kwargs["conn_machine_name"]

    # --machine-uname, when passed in, overrides the derived value
    uname = kwargs["machine_uname"]
    if not uname:
        uname = " ".join(uname_fields) if connected_locally else ""

    return {"run_machine_name": name, "run_machine_uname": uname}
275 
276 
def read_query_files(**kwargs):
    """
    Read every valid query file found in a directory

    Kwargs:
      queries_dir(str): Directory with query files
      source_table(str): Table to run query against

    Returns:
      query_list(dict):::
        query_group(str): Query group, usually matches table name
        queries(list)
          query(dict):::
            name(str): Name of query
            mapdql(str): Query syntax to run
      False(bool): Unable to find queries dir
    """
    queries_dir = kwargs["queries_dir"]
    # The query group is the last "/"-separated component of the directory
    query_list = {"query_group": queries_dir.split("/")[-1], "queries": []}
    logging.debug("Queries dir: " + queries_dir)
    try:
        for query_filename in sorted(os.listdir(queries_dir)):
            logging.debug("Validating query filename: " + query_filename)
            if not validate_query_file(query_filename=query_filename):
                continue
            with open(queries_dir + "/" + query_filename, "r") as query_file:
                logging.debug(
                    "Reading query with filename: " + query_filename
                )
                file_text = query_file.read()
            # Flatten the query to one line, then substitute the table name
            mapdql = file_text.replace("\n", " ").replace(
                "##TAB##", kwargs["source_table"]
            )
            query_list["queries"].append(
                {"name": query_filename, "mapdql": mapdql}
            )
    except FileNotFoundError:
        logging.exception("Could not find queries directory.")
        return False
    logging.info("Read all query files")
    return query_list
321 
322 
324  """
325  Get queries to run for setup and teardown from directory
326 
327  Kwargs:
328  queries_dir(str): Directory with query files
329  source_table(str): Table to run query against
330  foreign_table_filename(str): File to create foreign table from
331 
332  Returns:
333  setup_queries(query_list): List of setup queries
334  teardown_queries(query_list): List of teardown queries
335  False(bool): Unable to find queries dir
336 
337  query_list is described by:
338  query_list(dict):::
339  query_group(str): Query group, usually matches table name
340  queries(list)
341  query(dict):::
342  name(str): Name of query
343  mapdql(str): Query syntax to run
344  """
345  setup_teardown_queries_dir = kwargs['queries_dir']
346  source_table = kwargs['source_table']
347  # Read setup/tear-down queries if they exist
348  setup_teardown_query_list = None
349  if setup_teardown_queries_dir is not None:
350  setup_teardown_query_list = read_query_files(
351  queries_dir=setup_teardown_queries_dir,
352  source_table=source_table
353  )
354  if kwargs["foreign_table_filename"] is not None:
355  for query in setup_teardown_query_list['queries']:
356  query['mapdql'] = query['mapdql'].replace(
357  "##FILE##", kwargs["foreign_table_filename"])
358  # Filter setup queries
359  setup_query_list = None
360  if setup_teardown_query_list is not None:
361  setup_query_list = filter(
363  query_filename=x['name'], check_which='setup', quiet=True),
364  setup_teardown_query_list['queries'])
365  setup_query_list = list(setup_query_list)
366  # Filter teardown queries
367  teardown_query_list = None
368  if setup_teardown_query_list is not None:
369  teardown_query_list = filter(
371  query_filename=x['name'], check_which='teardown', quiet=True),
372  setup_teardown_query_list['queries'])
373  teardown_query_list = list(teardown_query_list)
374  return setup_query_list, teardown_query_list
375 
376 
378  """
379  Validates query file. Currently only checks the query file name, and
380  checks for setup or teardown in basename
381 
382  Kwargs:
383  query_filename(str): Name of query file
384  check_which(bool): either 'setup' or 'teardown', decide which to
385  check
386  quiet(bool): optional, if True, no warning is logged
387 
388  Returns:
389  True(bool): Query succesfully validated
390  False(bool): Query failed validation
391  """
392  qfilename = kwargs["query_filename"]
393  basename = os.path.basename(qfilename)
394  check_str = False
395  if kwargs["check_which"] == 'setup':
396  check_str = basename.lower().find('setup') > -1
397  elif kwargs["check_which"] == 'teardown':
398  check_str = basename.lower().find('teardown') > -1
399  else:
400  raise TypeError('Unsupported `check_which` parameter.')
401  return_val = True
402  if not qfilename.endswith(".sql"):
403  logging.warning(
404  "Query filename "
405  + qfilename
406  + ' is invalid - does not end in ".sql". Skipping'
407  )
408  return_val = False
409  elif not check_str:
410  quiet = True if 'quiet' in kwargs and kwargs['quiet'] else False
411  if not quiet:
412  logging.warning(
413  "Query filename "
414  + qfilename
415  + ' does not match "setup" or "teardown". Skipping'
416  )
417  return_val = False
418  return return_val
419 
420 
def validate_query_file(**kwargs):
    """
    Check that a query file is usable. Only the file name is inspected:
    it has to end in ".sql".

    Kwargs:
      query_filename(str): Name of query file

    Returns:
      True(bool): Query file name passed validation
      False(bool): Query file name failed validation (a warning is logged)
    """
    query_filename = kwargs["query_filename"]
    if query_filename.endswith(".sql"):
        return True
    logging.warning(
        "Query filename "
        + query_filename
        + ' is invalid - does not end in ".sql". Skipping'
    )
    return False
441 
442 
def execute_query(**kwargs):
    """
    Run one query through the pymapd connection and time it
    https://pymapd.readthedocs.io/en/latest/usage.html#querying

    Kwargs:
      query_name(str): Name of query
      query_mapdql(str): Query to run
      iteration(int): Iteration number
      con(class): Connection class

    Returns:
      query_execution(dict):::
        result_count(int): Number of results returned
        execution_time(float): Time (in ms) that pymapd reports
                               backend spent on query.
        connect_time(float): Time (in ms) for overhead of query, calculated
                             by subtracting backend execution time
                             from time spent on the execution function.
        results_iter_time(float): Time (in ms) it took to for
                                  pymapd.fetchone() to iterate through all
                                  of the results.
        total_time(float): Time (in ms) from adding all above times.
        debug_info: The `debug` attribute of the pymapd result
      False(bool): The query failed. The exception is logged.
    """
    clock = timeit.default_timer
    execute_started = clock()
    try:
        # Run the query
        cursor = kwargs["con"].execute(kwargs["query_mapdql"])
        logging.debug(
            "".join(
                [
                    "Completed iteration ",
                    str(kwargs["iteration"]),
                    " of query ",
                    kwargs["query_name"],
                ]
            )
        )
    except (pymapd.exceptions.ProgrammingError, pymapd.exceptions.Error):
        logging.exception(
            "".join(
                [
                    "Error running query ",
                    kwargs["query_name"],
                    " during iteration ",
                    str(kwargs["iteration"]),
                ]
            )
        )
        return False

    # Wall-clock time of the execute call vs. time reported by the backend
    elapsed_ms = (clock() - execute_started) * 1000
    backend_ms = cursor._result.execution_time_ms
    debug_info = cursor._result.debug
    overhead_ms = round((elapsed_ms - backend_ms), 1)

    # Walk the result set, counting rows until fetchone() returns a
    # falsy value
    logging.debug(
        "".join(
            [
                "Counting results from query",
                kwargs["query_name"],
                " iteration ",
                str(kwargs["iteration"]),
            ]
        )
    )
    rows_seen = 0
    fetch_started = clock()
    while cursor.fetchone():
        rows_seen += 1
    fetch_ms = round(((clock() - fetch_started) * 1000), 1)

    query_execution = {
        "result_count": rows_seen,
        "execution_time": backend_ms,
        "connect_time": overhead_ms,
        "results_iter_time": fetch_ms,
        "total_time": backend_ms + overhead_ms + fetch_ms,
        "debug_info": debug_info,
    }
    logging.debug(
        "".join(
            [
                "Execution results for query",
                kwargs["query_name"],
                " iteration ",
                str(kwargs["iteration"]),
                ": ",
                str(query_execution),
            ]
        )
    )
    return query_execution
523 
524 
def _trimmed_window(values, trim_size):
    """Return `values` sorted with `trim_size` items cut from each end, or
    `values` unchanged when nothing is trimmed or nothing would be left."""
    if trim_size > 0:
        window = numpy.sort(values)[trim_size:-trim_size]
        if len(window) > 0:
            return window
    return values


def _basic_stats(prefix, values):
    """Return avg/min/max/85th-percentile of `values`, rounded to one
    decimal, keyed as `<prefix>_avg`, `<prefix>_min`, ..."""
    return {
        prefix + "_avg": round(numpy.mean(values), 1),
        prefix + "_min": round(numpy.min(values), 1),
        prefix + "_max": round(numpy.max(values), 1),
        prefix + "_85": round(numpy.percentile(values, 85), 1),
    }


def calculate_query_times(**kwargs):
    """
    Calculates aggregate query times from all iteration times

    Every aggregate is rounded to one decimal place. The trimmed aggregates
    drop `int(trim * len(total_times))` samples from each end of the sorted
    list; when that is zero, or would leave no samples at all, the trimmed
    aggregates are computed over the full list instead.

    Kwargs:
      total_times(list): List of total time calculations
      execution_times(list): List of execution_time calculations
      results_iter_times(list): List of results_iter_time calculations
      connect_times(list): List of connect_time calculations
      trim(float): Amount to trim from iterations set to gather trimmed
                   values. Enter as decimal corresponding to percent to
                   trim - ex: 0.15 to trim 15%.

    Returns:
      query_execution(dict): Query times
    """
    total_times = kwargs["total_times"]
    execution_times = kwargs["execution_times"]
    # The trim size is derived from total_times and applied to both lists
    trim_size = int(kwargs["trim"] * len(total_times))
    total_window = _trimmed_window(total_times, trim_size)
    execution_window = _trimmed_window(execution_times, trim_size)

    # Keys are inserted in the same order the result has always used
    query_times = _basic_stats("total_time", total_times)
    query_times["total_time_trimmed_avg"] = round(
        numpy.mean(total_window), 1
    )
    query_times["total_times"] = total_times

    query_times.update(_basic_stats("execution_time", execution_times))
    query_times["execution_time_25"] = round(
        numpy.percentile(execution_times, 25), 1
    )
    query_times["execution_time_std"] = round(
        numpy.std(execution_times), 1
    )
    # Rounded to one decimal like every other metric (round() without a
    # digit count would collapse these to whole numbers)
    query_times["execution_time_trimmed_avg"] = round(
        numpy.mean(execution_window), 1
    )
    query_times["execution_time_trimmed_max"] = round(
        numpy.max(execution_window), 1
    )
    query_times["execution_times"] = execution_times

    query_times.update(_basic_stats("connect_time", kwargs["connect_times"]))
    query_times.update(
        _basic_stats("results_iter_time", kwargs["results_iter_times"])
    )
    return query_times
601 
602 
def clear_memory(**kwargs):
    """
    Ask the server to clear its CPU or GPU memory

    Any failure, including an unsupported `mem_type`, is logged as an
    error rather than raised.

    Kwargs:
      con(class 'pymapd.connection.Connection'): Mapd connection
      mem_type(str): [gpu, cpu] Type of memory to clear

    Returns:
      None
    """
    try:
        session = kwargs["con"]._session
        mem_type = kwargs["mem_type"]
        if mem_type not in ('cpu', 'gpu'):
            raise TypeError("Unkown mem_type '" + str(mem_type) + "' supplied to 'clear_memory'")
        client = kwargs["con"]._client
        if mem_type == 'cpu':
            client.clear_cpu_memory(session)
        else:
            client.clear_gpu_memory(session)
    except Exception as e:
        errormessage = "Clear memory failed with error: " + str(e)
        logging.error(errormessage)
626 
627 
629  """
630  Clears system caches
631  """
632  try:
633  os.system('sudo sh -c "/bin/echo 3 > /proc/sys/vm/drop_caches"')
634  except Exception as e:
635  errormessage = "Clear system caches failed with error: " + str(e)
636  logging.error(errormessage)
637 
638 
def get_mem_usage(**kwargs):
    """
    Calculates memory statistics from mapd_server _client.get_memory call

    If the get_memory call (or reading its result) fails, the error is
    logged and the returned dict carries zeroed memory figures together
    with the error text, so callers can always read every key.

    Kwargs:
      con(class 'pymapd.connection.Connection'): Mapd connection
      mem_type(str): [gpu, cpu] Type of memory to gather metrics for

    Returns:
      ramusage(dict):::
        usedram(float): Amount of memory (in MB) used
        freeram(float): Amount of memory (in MB) free
        totalallocated(float): Total amount of memory (in MB) allocated
        errormessage(str): Error if returned by get_memory call
    """
    # Built before the try block so the except handler always has a dict
    # to record the error in.
    ramusage = {
        "usedram": 0,
        "freeram": 0,
        "totalallocated": 0,
        "errormessage": "",
    }
    try:
        con_mem_data_list = kwargs["con"]._client.get_memory(
            session=kwargs["con"]._session, memory_level=kwargs["mem_type"]
        )
        usedram = 0
        freeram = 0
        for con_mem_data in con_mem_data_list:
            page_size = con_mem_data.page_size
            for node_memory_data in con_mem_data.node_memory_data:
                # Size of this entry: page count times the page size
                ram = node_memory_data.num_pages * page_size
                if node_memory_data.is_free:
                    freeram += ram
                else:
                    usedram += ram
        totalallocated = usedram + freeram
        if totalallocated > 0:
            # Scale by 1024 * 1024 (bytes -> MB per the documented units)
            totalallocated = round(totalallocated / 1024 / 1024, 1)
            usedram = round(usedram / 1024 / 1024, 1)
            freeram = round(freeram / 1024 / 1024, 1)
        ramusage["usedram"] = usedram
        ramusage["freeram"] = freeram
        ramusage["totalallocated"] = totalallocated
    except Exception as e:
        errormessage = "Get memory failed with error: " + str(e)
        logging.error(errormessage)
        ramusage["errormessage"] = errormessage
    return ramusage
686 
687 
688 def run_query(**kwargs):
689  """
690  Takes query name, syntax, and iteration count and calls the
691  execute_query function for each iteration. Reports total, iteration,
692  and exec timings, memory usage, and failure status.
693 
694  Kwargs:
695  query(dict):::
696  name(str): Name of query
697  mapdql(str): Query syntax to run
698  iterations(int): Number of iterations of each query to run
699  trim(float): Trim decimal to remove from top and bottom of results
700  con(class 'pymapd.connection.Connection'): Mapd connection
701  clear_all_memory_pre_query(bool,optional): Flag to determine if memory is cleared
702  between query runs
703 
704  Returns:
705  query_results(dict):::
706  query_name(str): Name of query
707  query_mapdql(str): Query to run
708  query_id(str): Query ID
709  query_succeeded(bool): Query succeeded
710  query_error_info(str): Query error info
711  result_count(int): Number of results returned
712  initial_iteration_results(dict):::
713  first_execution_time(float): Execution time for first query
714  iteration
715  first_connect_time(float): Connect time for first query
716  iteration
717  first_results_iter_time(float): Results iteration time for
718  first query iteration
719  first_total_time(float): Total time for first iteration
720  first_cpu_mem_usage(float): CPU memory usage for first query
721  iteration
722  first_gpu_mem_usage(float): GPU memory usage for first query
723  iteration
724  noninitial_iteration_results(list):::
725  execution_time(float): Time (in ms) that pymapd reports
726  backend spent on query.
727  connect_time(float): Time (in ms) for overhead of query,
728  calculated by subtracting backend execution time from
729  time spent on the execution function.
730  results_iter_time(float): Time (in ms) it took to for
731  pymapd.fetchone() to iterate through all of the results.
732  total_time(float): Time (in ms) from adding all above times.
733  query_total_elapsed_time(int): Total elapsed time for query
734  False(bool): The query failed. Exception should be logged.
735  """
736  logging.info(
737  "Running query: "
738  + kwargs["query"]["name"]
739  + " iterations: "
740  + str(kwargs["iterations"])
741  )
742  query_id = kwargs["query"]["name"].rsplit(".")[
743  0
744  ] # Query ID = filename without extention
745  query_results = {
746  "query_name": kwargs["query"]["name"],
747  "query_mapdql": kwargs["query"]["mapdql"],
748  "query_id": query_id,
749  "query_succeeded": True,
750  "query_error_info": "",
751  "initial_iteration_results": {},
752  "noninitial_iteration_results": [],
753  "query_total_elapsed_time": 0,
754  }
755  query_total_start_time = timeit.default_timer()
756  # Run iterations of query
757  for iteration in range(kwargs["iterations"]):
758  # Gather memory before running query iteration
759  logging.debug("Getting pre-query memory usage on CPU")
760  pre_query_cpu_mem_usage = get_mem_usage(
761  con=kwargs["con"], mem_type="cpu"
762  )
763  logging.debug("Getting pre-query memory usage on GPU")
764  pre_query_gpu_mem_usage = get_mem_usage(
765  con=kwargs["con"], mem_type="gpu"
766  )
767  if "clear_all_memory_pre_query" in kwargs and kwargs["clear_all_memory_pre_query"]:
768  # Clear GPU & CPU memory
769  clear_memory(
770  con=kwargs["con"], mem_type="cpu"
771  )
772  clear_memory(
773  con=kwargs["con"], mem_type="gpu"
774  )
776  # Run query iteration
777  logging.debug(
778  "Running iteration "
779  + str(iteration)
780  + " of query "
781  + kwargs["query"]["name"]
782  )
783  query_result = execute_query(
784  query_name=kwargs["query"]["name"],
785  query_mapdql=kwargs["query"]["mapdql"],
786  iteration=iteration,
787  con=kwargs["con"],
788  )
789  # Gather memory after running query iteration
790  logging.debug("Getting post-query memory usage on CPU")
791  post_query_cpu_mem_usage = get_mem_usage(
792  con=kwargs["con"], mem_type="cpu"
793  )
794  logging.debug("Getting post-query memory usage on GPU")
795  post_query_gpu_mem_usage = get_mem_usage(
796  con=kwargs["con"], mem_type="gpu"
797  )
798  # Calculate total (post minus pre) memory usage after query iteration
799  query_cpu_mem_usage = round(
800  post_query_cpu_mem_usage["usedram"]
801  - pre_query_cpu_mem_usage["usedram"],
802  1,
803  )
804  query_gpu_mem_usage = round(
805  post_query_gpu_mem_usage["usedram"]
806  - pre_query_gpu_mem_usage["usedram"],
807  1,
808  )
809  if query_result:
810  query_results.update(
811  query_error_info="" # TODO - interpret query error info
812  )
813  # Assign first query iteration times
814  if iteration == 0:
815  first_execution_time = round(query_result["execution_time"], 1)
816  first_connect_time = round(query_result["connect_time"], 1)
817  first_results_iter_time = round(
818  query_result["results_iter_time"], 1
819  )
820  first_total_time = (
821  first_execution_time
822  + first_connect_time
823  + first_results_iter_time
824  )
825  query_results.update(
826  initial_iteration_results={
827  "first_execution_time": first_execution_time,
828  "first_connect_time": first_connect_time,
829  "first_results_iter_time": first_results_iter_time,
830  "first_total_time": first_total_time,
831  "first_cpu_mem_usage": query_cpu_mem_usage,
832  "first_gpu_mem_usage": query_gpu_mem_usage,
833  }
834  )
835  else:
836  # Put noninitial iterations into query_result list
837  query_results["noninitial_iteration_results"].append(
838  query_result
839  )
840  # Verify no change in memory for noninitial iterations
841  if query_cpu_mem_usage != 0.0:
842  logging.error(
843  (
844  "Noninitial iteration ({0}) of query ({1}) "
845  + "shows non-zero CPU memory usage: {2}"
846  ).format(
847  iteration,
848  kwargs["query"]["name"],
849  query_cpu_mem_usage,
850  )
851  )
852  if query_gpu_mem_usage != 0.0:
853  logging.error(
854  (
855  "Noninitial iteration ({0}) of query ({1}) "
856  + "shows non-zero GPU memory usage: {2}"
857  ).format(
858  iteration,
859  kwargs["query"]["name"],
860  query_gpu_mem_usage,
861  )
862  )
863  else:
864  logging.warning(
865  "Error detected during execution of query: "
866  + kwargs["query"]["name"]
867  + ". This query will be skipped and "
868  + "times will not reported"
869  )
870  query_results.update(query_succeeded=False)
871  break
872  # Calculate time for all iterations to run
873  query_total_elapsed_time = round(
874  ((timeit.default_timer() - query_total_start_time) * 1000), 1
875  )
876  query_results.update(query_total_elapsed_time=query_total_elapsed_time)
877  logging.info(
878  "Completed all iterations of query " + kwargs["query"]["name"]
879  )
880  return query_results
881 
882 
884  """
885  Convenience wrapper around `run_query` to run a setup or
886  teardown query
887 
888  Kwargs:
889  queries(query_list): List of queries to run
890  do_run(bool): If true will run query, otherwise do nothing
891  trim(float): Trim decimal to remove from top and bottom of results
892  con(class 'pymapd.connection.Connection'): Mapd connection
893 
894  Returns:
895  See return value for `run_query`
896 
897  query_list is described by:
898  queries(list)
899  query(dict):::
900  name(str): Name of query
901  mapdql(str): Query syntax to run
902  [setup : queries(list)]
903  [teardown : queries(list)]
904  """
905  query_results = list()
906  if kwargs['do_run']:
907  for query in kwargs['queries']:
908  result = run_query(
909  query=query, iterations=1,
910  trim=kwargs['trim'],
911  con=kwargs['con']
912  )
913  if not result['query_succeeded']:
914  logging.warning(
915  "Error setup or teardown query: "
916  + query["name"]
917  + ". did not complete."
918  )
919  else:
920  query_results.append(result)
921  return query_results
922 
923 
925  # Function to allow json to deal with datetime and numpy int
926  if isinstance(x, datetime.datetime):
927  return x.isoformat()
928  if isinstance(x, numpy.int64):
929  return int(x)
930  raise TypeError("Unknown type")
931 
932 
def create_results_dataset(**kwargs):
    """
      Create results dataset

      Kwargs:
        run_guid(str): Run GUID
        run_timestamp(datetime): Run timestamp
        run_connection(str): Connection string
        run_machine_name(str): Run machine name
        run_machine_uname(str): Run machine uname
        run_driver(str): Run driver
        run_version(str): Version of DB
        run_version_short(str): Shortened version of DB
        label(str): Run label
        source_db_gpu_count(int): Number of GPUs on run machine
        source_db_gpu_driver_ver(str): GPU driver version
        source_db_gpu_name(str): GPU name
        source_db_gpu_mem(str): Amount of GPU mem on run machine
        source_table(str): Table to run query against
        trim(float): Trim decimal to remove from top and bottom of results
        iterations(int): Number of iterations of each query to run
        query_group(str): Query group, usually matches table name
        queries_results(list):::
          query_results(dict):::
            query_name(str): Name of query
            query_mapdql(str): Query to run
            query_id(str): Query ID
            query_succeeded(bool): Query succeeded
            query_error_info(str): Query error info
            initial_iteration_results(dict):::
              first_execution_time(float): Execution time for first query
                iteration
              first_connect_time(float): Connect time for first query
                iteration
              first_results_iter_time(float): Results iteration time for
                first query iteration
              first_total_time(float): Total time for first iteration
              first_cpu_mem_usage(float): CPU memory usage for first query
                iteration
              first_gpu_mem_usage(float): GPU memory usage for first query
                iteration
            noninitial_iteration_results(list):::
              execution_time(float): Time (in ms) that pymapd reports
                backend spent on query.
              connect_time(float): Time (in ms) for overhead of query,
                calculated by subtracting backend execution time from
                time spent on the execution function.
              results_iter_time(float): Time (in ms) it took to for
                pymapd.fetchone() to iterate through all of the results.
              total_time(float): Time (in ms) from adding all above times.
              result_count(int): Number of results returned
              debug_info(str): Optional json string; its "timer" entry is
                reported as the last iteration's detailed timing
            query_total_elapsed_time(int): Total elapsed time for query

      Returns:
        results_dataset(list):::
          result_dataset(dict): Query results dataset
    """
    results_dataset = []
    for query_results in kwargs["queries_results"]:
        if query_results["query_succeeded"]:
            initial_result = query_results["initial_iteration_results"]
            noninitial_results = query_results[
                "noninitial_iteration_results"
            ]
            detailed_timing_last_iteration = {}
            if len(noninitial_results) == 0:
                # A single query run (most likely a setup or teardown query)
                execution_times = [initial_result["first_execution_time"]]
                connect_times = [initial_result["first_connect_time"]]
                results_iter_times = [
                    initial_result["first_results_iter_time"]
                ]
                total_times = [initial_result["first_total_time"]]
                # Special case
                result_count = 1
            else:
                # More than one query run: aggregate the iteration values
                execution_times = [
                    r["execution_time"] for r in noninitial_results
                ]
                connect_times = [
                    r["connect_time"] for r in noninitial_results
                ]
                results_iter_times = [
                    r["results_iter_time"] for r in noninitial_results
                ]
                total_times = [r["total_time"] for r in noninitial_results]
                last_result = noninitial_results[-1]
                # Result count is the same for each iteration
                result_count = last_result["result_count"]
                # If available, get the last iteration's component-wise
                # timing information as a json structure. Missing debug
                # info must not discard an otherwise complete run.
                debug_info = last_result.get("debug_info")
                if debug_info:
                    detailed_timing_last_iteration = json.loads(
                        debug_info
                    ).get("timer", {})
            # Calculate query times
            logging.debug(
                "Calculating times from query " + query_results["query_id"]
            )
            query_times = calculate_query_times(
                total_times=total_times,
                execution_times=execution_times,
                connect_times=connect_times,
                results_iter_times=results_iter_times,
                # Trim top and bottom n% for trimmed calculations
                trim=kwargs["trim"],
            )
            result_dataset = {
                "name": query_results["query_name"],
                "mapdql": query_results["query_mapdql"],
                "succeeded": True,
                "results": {
                    "run_guid": kwargs["run_guid"],
                    "run_timestamp": kwargs["run_timestamp"],
                    "run_connection": kwargs["run_connection"],
                    "run_machine_name": kwargs["run_machine_name"],
                    "run_machine_uname": kwargs["run_machine_uname"],
                    "run_driver": kwargs["run_driver"],
                    "run_version": kwargs["run_version"],
                    "run_version_short": kwargs["run_version_short"],
                    "run_label": kwargs["label"],
                    "run_gpu_count": kwargs["source_db_gpu_count"],
                    "run_gpu_driver_ver": kwargs["source_db_gpu_driver_ver"],
                    "run_gpu_name": kwargs["source_db_gpu_name"],
                    "run_gpu_mem_mb": kwargs["source_db_gpu_mem"],
                    "run_table": kwargs["source_table"],
                    "query_group": kwargs["query_group"],
                    "query_id": query_results["query_id"],
                    "query_result_set_count": result_count,
                    "query_error_info": query_results["query_error_info"],
                    "query_conn_first": initial_result["first_connect_time"],
                    "query_conn_avg": query_times["connect_time_avg"],
                    "query_conn_min": query_times["connect_time_min"],
                    "query_conn_max": query_times["connect_time_max"],
                    "query_conn_85": query_times["connect_time_85"],
                    "query_exec_first": initial_result[
                        "first_execution_time"
                    ],
                    "query_exec_avg": query_times["execution_time_avg"],
                    "query_exec_min": query_times["execution_time_min"],
                    "query_exec_max": query_times["execution_time_max"],
                    "query_exec_85": query_times["execution_time_85"],
                    "query_exec_25": query_times["execution_time_25"],
                    "query_exec_stdd": query_times["execution_time_std"],
                    "query_exec_trimmed_avg": query_times[
                        "execution_time_trimmed_avg"
                    ],
                    "query_exec_trimmed_max": query_times[
                        "execution_time_trimmed_max"
                    ],
                    # Render queries not supported yet
                    "query_render_first": None,
                    "query_render_avg": None,
                    "query_render_min": None,
                    "query_render_max": None,
                    "query_render_85": None,
                    "query_render_25": None,
                    "query_render_stdd": None,
                    "query_total_first": initial_result["first_total_time"],
                    "query_total_avg": query_times["total_time_avg"],
                    "query_total_min": query_times["total_time_min"],
                    "query_total_max": query_times["total_time_max"],
                    "query_total_85": query_times["total_time_85"],
                    "query_total_all": query_results[
                        "query_total_elapsed_time"
                    ],
                    "query_total_trimmed_avg": query_times[
                        "total_time_trimmed_avg"
                    ],
                    "results_iter_count": kwargs["iterations"],
                    "results_iter_first": initial_result[
                        "first_results_iter_time"
                    ],
                    "results_iter_avg": query_times["results_iter_time_avg"],
                    "results_iter_min": query_times["results_iter_time_min"],
                    "results_iter_max": query_times["results_iter_time_max"],
                    "results_iter_85": query_times["results_iter_time_85"],
                    "cpu_mem_usage_mb": initial_result["first_cpu_mem_usage"],
                    "gpu_mem_usage_mb": initial_result["first_gpu_mem_usage"],
                },
                "debug": {
                    "query_exec_times": query_times["execution_times"],
                    "query_total_times": query_times["total_times"],
                    "detailed_timing_last_iteration": (
                        detailed_timing_last_iteration
                    ),
                },
            }
        else:
            # Failed queries carry no timings; record only their identity
            result_dataset = {
                "name": query_results["query_name"],
                "mapdql": query_results["query_mapdql"],
                "succeeded": False,
            }
        results_dataset.append(result_dataset)
        logging.debug("All values set for query " + query_results["query_id"])
    return results_dataset
1143 
1144 
def send_results_db(**kwargs):
    """
      Send results dataset to a database using pymapd

      Kwargs:
        results_dataset(list):::
          result_dataset(dict): Query results dataset
        table(str): Results destination table name
        db_user(str): Results destination user name
        db_passwd(str): Results destination password
        db_server(str): Results destination server address
        db_port(int): Results destination server port
        db_name(str): Results destination database name
        table_schema_file(str): Path to destination database schema file

      Returns:
        True(bool): Sending results to destination database succeeded
        False(bool): Sending results to destination database failed. Exception
          should be logged.
    """
    # Create dataframe from list of query results
    logging.debug("Converting results list to pandas dataframe")
    results_df = DataFrame(kwargs["results_dataset"])
    # Establish connection to destination db
    logging.debug("Connecting to destination db")
    dest_con = get_connection(
        db_user=kwargs["db_user"],
        db_passwd=kwargs["db_passwd"],
        db_server=kwargs["db_server"],
        db_port=kwargs["db_port"],
        db_name=kwargs["db_name"],
    )
    if not dest_con:
        # No exception is being handled here, so logging.exception would
        # only append an empty "NoneType: None" traceback
        logging.error("Could not connect to destination db.")
        return False
    # From here on every exit path must release the connection
    try:
        # Load results into db, creating table if it does not exist
        tables = dest_con.get_tables()
        if kwargs["table"] not in tables:
            logging.info("Destination table does not exist. Creating.")
            try:
                with open(kwargs["table_schema_file"], "r") as table_schema:
                    logging.debug(
                        "Reading table_schema_file: "
                        + kwargs["table_schema_file"]
                    )
                    create_table_sql = table_schema.read().replace("\n", " ")
                    # The schema file uses ##TAB## as the table placeholder
                    create_table_sql = create_table_sql.replace(
                        "##TAB##", kwargs["table"]
                    )
            except FileNotFoundError:
                logging.exception(
                    "Could not find destination table_schema_file."
                )
                return False
            try:
                logging.debug("Executing create destination table query")
                dest_con.execute(create_table_sql)
                logging.debug("Destination table created.")
            except (
                pymapd.exceptions.ProgrammingError,
                pymapd.exceptions.Error,
            ):
                logging.exception("Error running destination table creation")
                return False
        logging.info("Loading results into destination db")
        try:
            dest_con.load_table_columnar(
                kwargs["table"],
                results_df,
                preserve_index=False,
                chunk_size_bytes=0,
                col_names_from_schema=True,
            )
        except (pymapd.exceptions.ProgrammingError, pymapd.exceptions.Error):
            logging.exception("Error loading results into destination db")
            return False
        return True
    finally:
        dest_con.close()
1218 
1219 
def send_results_file_json(**kwargs):
    """
      Send results dataset to a local json file

      Kwargs:
        results_dataset_json(str): Json-formatted query results dataset
        output_file_json (str): Location of .json file output

      Returns:
        True(bool): Sending results to json file succeeded
        False(bool): Sending results to json file failed. Exception
          should be logged.
    """
    output_file_json = kwargs["output_file_json"]
    if output_file_json is None:
        # open(None) would raise an uncaught TypeError; report it as a
        # failed destination instead
        logging.error(
            '"output_file_json" is required when destination = "file_json"'
        )
        return False
    try:
        logging.debug("Opening json output file for writing")
        with open(output_file_json, "w") as file_json_open:
            logging.info("Writing to output json file: " + output_file_json)
            file_json_open.write(kwargs["results_dataset_json"])
    except IOError:
        logging.exception("Error writing results to json output file")
        return False
    return True
1244 
1245 
def send_results_jenkins_bench(**kwargs):
    """
      Send results dataset to a local json file formatted for use with jenkins
      benchmark plugin: https://github.com/jenkinsci/benchmark-plugin

      Kwargs:
        results_dataset(list):::
          result_dataset(dict): Query results dataset
        thresholds_name(str): Name to use for Jenkins result field
        thresholds_field(str): Field to use for query threshold in jenkins
        output_tag_jenkins(str): Jenkins benchmark result tag, for different
          sets from same table
        output_file_jenkins (str): Location of .json jenkins file output

      Returns:
        True(bool): Sending results to json file succeeded
        False(bool): Sending results to json file failed. Exception
          should be logged.
    """
    results_dataset = kwargs["results_dataset"]
    output_file_jenkins = kwargs["output_file_jenkins"]
    if output_file_jenkins is None:
        # open(None) would raise an uncaught TypeError; report it as a
        # failed destination instead
        logging.error(
            '"output_file_jenkins" is required '
            + 'when destination = "jenkins_bench"'
        )
        return False
    if not results_dataset:
        # The group name is taken from a result entry, so there is nothing
        # to report when no query succeeded
        logging.error(
            "No query results available for jenkins json output file"
        )
        return False
    logging.debug("Constructing output for jenkins benchmark plugin")
    jenkins_bench_results = [
        {
            "name": result_dataset["query_id"],
            "description": "",
            "parameters": [],
            "results": [
                {
                    "name": result_dataset["query_id"]
                    + "_"
                    + kwargs["thresholds_name"],
                    "description": "",
                    "unit": "ms",
                    "dblValue": result_dataset[kwargs["thresholds_field"]],
                }
            ],
        }
        for result_dataset in results_dataset
    ]
    # The group is named after the table of the last result entry
    run_table = results_dataset[-1]["run_table"]
    jenkins_bench_json = json.dumps(
        {
            "groups": [
                {
                    "name": run_table + kwargs["output_tag_jenkins"],
                    "description": "Source table: " + run_table,
                    "tests": jenkins_bench_results,
                }
            ]
        }
    )
    try:
        logging.debug("Opening jenkins_bench json output file for writing")
        with open(output_file_jenkins, "w") as file_jenkins_open:
            logging.info(
                "Writing to jenkins_bench json file: " + output_file_jenkins
            )
            file_jenkins_open.write(jenkins_bench_json)
    except IOError:
        logging.exception("Error writing results to jenkins json output file")
        return False
    return True
1310 
1311 
def send_results_output(**kwargs):
    """
      Print the results dataset to the script's standard output

      Kwargs:
        results_dataset_json(str): Json-formatted query results dataset

      Returns:
        True(bool): Results were printed
    """
    logging.info("Printing query results to output")
    results_json = kwargs["results_dataset_json"]
    print(results_json)
    return True
1325 
1326 
def process_arguments(input_arguments):
    """
      Parse the benchmark script's command-line arguments

      Args:
        input_arguments(list): Argument strings without the script name.
          Passed straight to ArgumentParser.parse_args.

      Returns:
        args(argparse.Namespace): Parsed arguments, one attribute per `dest`
    """
    # Parse input parameters
    parser = ArgumentParser()
    # Temporarily remove the default optional group so that the
    # "required arguments" group is listed before it in --help
    optional = parser._action_groups.pop()
    required = parser.add_argument_group("required arguments")
    parser._action_groups.append(optional)
    optional.add_argument(
        "-v", "--verbose", action="store_true", help="Turn on debug logging"
    )
    optional.add_argument(
        "-q",
        "--quiet",
        action="store_true",
        help="Suppress script output (except warnings and errors)",
    )
    required.add_argument(
        "-u",
        "--user",
        dest="user",
        default="mapd",
        help="Source database user",
    )
    required.add_argument(
        "-p",
        "--passwd",
        dest="passwd",
        default="HyperInteractive",
        help="Source database password",
    )
    required.add_argument(
        "-s",
        "--server",
        dest="server",
        default="localhost",
        help="Source database server hostname",
    )
    optional.add_argument(
        "-o",
        "--port",
        dest="port",
        type=int,
        default=6274,
        help="Source database server port",
    )
    required.add_argument(
        "-n",
        "--name",
        dest="name",
        default="mapd",
        help="Source database name",
    )
    required.add_argument(
        "-t",
        "--table",
        dest="table",
        required=True,
        help="Source db table name",
    )
    required.add_argument(
        "-l",
        "--label",
        dest="label",
        required=True,
        help="Benchmark run label",
    )
    required.add_argument(
        "-d",
        "--queries-dir",
        dest="queries_dir",
        help="Absolute path to dir with query files. "
        '[Default: "queries" dir in same location as script]',
    )
    required.add_argument(
        "-i",
        "--iterations",
        dest="iterations",
        type=int,
        required=True,
        help="Number of iterations per query. Must be > 1",
    )
    optional.add_argument(
        "-g",
        "--gpu-count",
        dest="gpu_count",
        type=int,
        default=None,
        help="Number of GPUs. Not required when gathering local gpu info",
    )
    optional.add_argument(
        "-G",
        "--gpu-name",
        dest="gpu_name",
        type=str,
        default="",
        help="Name of GPU(s). Not required when gathering local gpu info",
    )
    optional.add_argument(
        "--no-gather-conn-gpu-info",
        dest="no_gather_conn_gpu_info",
        action="store_true",
        help="Do not gather source database GPU info fields "
        "[run_gpu_count, run_gpu_mem_mb] "
        "using pymapd connection info. "
        "Use when testing a CPU-only server.",
    )
    optional.add_argument(
        "--no-gather-nvml-gpu-info",
        dest="no_gather_nvml_gpu_info",
        action="store_true",
        help="Do not gather source database GPU info fields "
        "[gpu_driver_ver, run_gpu_name] "
        "from local GPU using pynvml. "
        'Defaults to True when source server is not "localhost". '
        "Use when testing a CPU-only server.",
    )
    optional.add_argument(
        "--gather-nvml-gpu-info",
        dest="gather_nvml_gpu_info",
        action="store_true",
        help="Gather source database GPU info fields "
        "[gpu_driver_ver, run_gpu_name] "
        "from local GPU using pynvml. "
        'Defaults to True when source server is "localhost". '
        "Only use when benchmarking against same machine that this script "
        "is run from.",
    )
    optional.add_argument(
        "-m",
        "--machine-name",
        dest="machine_name",
        help="Name of source machine",
    )
    optional.add_argument(
        "-a",
        "--machine-uname",
        dest="machine_uname",
        help="Uname info from source machine",
    )
    optional.add_argument(
        "-e",
        "--destination",
        dest="destination",
        default="mapd_db",
        help="Destination type: [mapd_db, file_json, output, jenkins_bench] "
        "Multiple values can be input separated by commas, "
        'ex: "mapd_db,file_json"',
    )
    optional.add_argument(
        "-U",
        "--dest-user",
        dest="dest_user",
        default="mapd",
        help="Destination mapd_db database user",
    )
    optional.add_argument(
        "-P",
        "--dest-passwd",
        dest="dest_passwd",
        default="HyperInteractive",
        help="Destination mapd_db database password",
    )
    optional.add_argument(
        "-S",
        "--dest-server",
        dest="dest_server",
        help="Destination mapd_db database server hostname"
        ' (required if destination = "mapd_db")',
    )
    optional.add_argument(
        "-O",
        "--dest-port",
        dest="dest_port",
        type=int,
        default=6274,
        help="Destination mapd_db database server port",
    )
    optional.add_argument(
        "-N",
        "--dest-name",
        dest="dest_name",
        default="mapd",
        help="Destination mapd_db database name",
    )
    optional.add_argument(
        "-T",
        "--dest-table",
        dest="dest_table",
        default="results",
        help="Destination mapd_db table name",
    )
    optional.add_argument(
        "-C",
        "--dest-table-schema-file",
        dest="dest_table_schema_file",
        default="results_table_schemas/query-results.sql",
        help="Destination table schema file. This must be an executable "
        "CREATE TABLE statement that matches the output of this script. It "
        "is required when creating the results table. Default location is "
        'in "./results_table_schemas/query-results.sql"',
    )
    optional.add_argument(
        "-j",
        "--output-file-json",
        dest="output_file_json",
        help="Absolute path of .json output file "
        '(required if destination = "file_json")',
    )
    optional.add_argument(
        "-J",
        "--output-file-jenkins",
        dest="output_file_jenkins",
        help="Absolute path of jenkins benchmark .json output file "
        '(required if destination = "jenkins_bench")',
    )
    optional.add_argument(
        "-E",
        "--output-tag-jenkins",
        dest="output_tag_jenkins",
        default="",
        help="Jenkins benchmark result tag. "
        'Optional, appended to table name in "group" field',
    )
    optional.add_argument(
        "--setup-teardown-queries-dir",
        dest="setup_teardown_queries_dir",
        type=str,
        default=None,
        help="Absolute path to dir with setup & teardown query files. "
        'Query files with "setup" in the filename will be executed in '
        'the setup stage, likewise query files with "teardown" in '
        "the filename will be executed in the tear-down stage. Queries "
        "execute in lexical order. [Default: None, meaning this option is "
        "not used.]",
    )
    optional.add_argument(
        "--clear-all-memory-pre-query",
        dest="clear_all_memory_pre_query",
        action="store_true",
        help="Clear gpu & cpu memory before every query."
        " [Default: False]",
    )
    optional.add_argument(
        "--run-setup-teardown-per-query",
        dest="run_setup_teardown_per_query",
        action="store_true",
        help="Run setup & teardown steps per query. "
        "If set, setup-teardown-queries-dir must be specified. "
        "If not set, but setup-teardown-queries-dir is specified "
        "setup & tear-down queries will run globally, that is, "
        "once per script invocation."
        " [Default: False]",
    )
    optional.add_argument(
        "-F",
        "--foreign-table-filename",
        dest="foreign_table_filename",
        default=None,
        help="Path to file containing template for import query. "
        "Path must be relative to the FOREIGN SERVER. "
        'Occurrences of "##FILE##" within setup/teardown queries will be'
        " replaced with this. ",
    )
    optional.add_argument(
        "--jenkins-thresholds-name",
        dest="jenkins_thresholds_name",
        default="average",
        help="Name of Jenkins output field.",
    )
    optional.add_argument(
        "--jenkins-thresholds-field",
        dest="jenkins_thresholds_field",
        default="query_exec_trimmed_avg",
        help="Field to report as jenkins output value.",
    )
    optional.add_argument(
        "--cuda-block-grid-perf-test",
        dest="cuda_block_grid_perf_test",
        action="store_true",
        help="Performance test while varying cuda block and grid sizes.",
    )
    optional.add_argument(
        "--show-simplified-result",
        dest="show_simplified_result",
        action="store_true",
        help="Show simplified benchmark result per query via stdout.",
    )
    optional.add_argument(
        "--cuda-grid-sizes",
        dest="cuda_grid_sizes",
        nargs="+",
        type=float,
        help="List of grid size multipliers used to benchmark "
        "(valid iff --cuda-block-grid-perf-test is enabled).",
    )
    optional.add_argument(
        "--cuda-block-sizes",
        dest="cuda_block_sizes",
        nargs="+",
        type=int,
        help="List of block sizes used to benchmark "
        "(valid iff --cuda-block-grid-perf-test is enabled).",
    )
    args = parser.parse_args(args=input_arguments)
    return args
1627 
1628 
def _inject_select_hint(query, hint):
    """Replace the first SELECT keyword of query, in any letter case, with hint."""
    # count and flags must be passed by keyword: the fourth positional
    # parameter of re.sub is count, not flags
    return re.sub(
        r"\bselect\b", lambda _match: hint, query, count=1,
        flags=re.IGNORECASE
    )


def _print_simplified_result(results_dataset, iterations):
    """Print per-query execution times and the best block/grid combinations."""
    res_header = ['Query', 'Block', 'Grid', 'First', 'Min', 'Max', 'Avg']
    res_header.extend("Run-" + str(i) for i in range(1, iterations))
    # Names produced by the cuda block/grid perf test look like
    # <query>_block_size_<block>_grid_size_<grid>; parse from the end so
    # that query names containing underscores are kept intact
    name_pattern = re.compile(
        r"^(?P<query>.*)_block_size_(?P<block>-?\d+)"
        r"_grid_size_(?P<grid>[^_]+)$"
    )
    res_data = []
    for perf_info in results_dataset:
        if not perf_info["succeeded"]:
            # Failed queries have no "results" or "debug" entries
            continue
        parsed_name = name_pattern.match(perf_info["name"])
        if parsed_name is None:
            logging.warning(
                "Skipping query without block/grid size info "
                "in simplified result: " + perf_info["name"]
            )
            continue
        results = perf_info["results"]
        cur_query_perf = [
            parsed_name.group("query"),
            parsed_name.group("block"),
            parsed_name.group("grid"),
            str(results["query_exec_first"]),
            str(results["query_exec_min"]),
            str(results["query_exec_max"]),
            str(results["query_exec_avg"]),
        ]
        cur_query_perf.extend(
            str(query_time)
            for query_time in perf_info["debug"]["query_exec_times"]
        )
        res_data.append(cur_query_perf)
    if not res_data:
        logging.warning("No query results to show in simplified result")
        return
    res_df = pd.DataFrame(res_data, columns=res_header)
    res_df['Query'] = res_df['Query'].astype(str)
    res_df['Block'] = res_df['Block'].astype(int)
    res_df['Grid'] = res_df['Grid'].astype(float)
    res_df['Min'] = res_df['Min'].astype(int)
    res_str = res_df.to_string(header=True, index=False)
    col_desc = "(Block: cuda block size, Grid: cuda grid size multiplier (cuda grid size = # SMs * multiplier))"
    print("[Benchmark result in ms.]\n" + col_desc + "\n" + res_str)

    # Sorting by the group keys and then by Min puts the fastest row first
    # in every group, so head(1) picks the best row per group
    best_per_block = res_df.sort_values(['Query', 'Block', 'Min']).groupby(
        ["Query", "Block"]).head(1)
    per_block_str = best_per_block[
        ['Query', 'Block', 'Grid', 'Min']].to_string(index=False)
    # Block/grid size -1 marks the query run with the cuda_opt_param hint
    per_block_str = per_block_str.replace("-1.00", "opt")
    per_block_str = per_block_str.replace("-1", "opt")
    print("[Best performance per block size in ms.]\n" + per_block_str)

    best_per_grid = res_df.sort_values(['Query', 'Grid', 'Min']).groupby(
        ["Query", "Grid"]).head(1)
    per_grid_str = best_per_grid[
        ['Query', 'Grid', 'Block', 'Min']].to_string(index=False)
    per_grid_str = per_grid_str.replace("-1.00", "opt")
    per_grid_str = per_grid_str.replace("-1", "opt")
    print("[Best performance per grid size]\n" + per_grid_str)

    overall_best = res_df.sort_values('Min', ascending=True)[
        ['Query', 'Block', 'Grid', 'Min']].head(1)
    print("[Best performance in all conditions in ms.]\n"
          + overall_best.to_string(index=False))


def benchmark(input_arguments):
    """
      Run the benchmark queries and send the results to the destination(s)

      Args:
        input_arguments(list): Command-line argument strings, without the
          script name, as accepted by process_arguments

      Returns:
        None. Exits the interpreter with status 1 when the iteration count
        or destinations are invalid, the source db connection or the query
        files cannot be obtained, or sending results to a destination fails.
    """
    # Set input args to vars
    args = process_arguments(input_arguments)
    verbose = args.verbose
    quiet = args.quiet
    source_db_user = args.user
    source_db_passwd = args.passwd
    source_db_server = args.server
    source_db_port = args.port
    source_db_name = args.name
    source_table = args.table
    label = args.label
    queries_dir = args.queries_dir
    iterations = args.iterations
    gpu_count = args.gpu_count
    gpu_name = args.gpu_name
    no_gather_conn_gpu_info = args.no_gather_conn_gpu_info
    no_gather_nvml_gpu_info = args.no_gather_nvml_gpu_info
    gather_nvml_gpu_info = args.gather_nvml_gpu_info
    machine_name = args.machine_name
    machine_uname = args.machine_uname
    destinations = args.destination
    dest_db_user = args.dest_user
    dest_db_passwd = args.dest_passwd
    dest_db_server = args.dest_server
    dest_db_port = args.dest_port
    dest_db_name = args.dest_name
    dest_table = args.dest_table
    dest_table_schema_file = args.dest_table_schema_file
    output_file_json = args.output_file_json
    output_file_jenkins = args.output_file_jenkins
    output_tag_jenkins = args.output_tag_jenkins
    setup_teardown_queries_dir = args.setup_teardown_queries_dir
    run_setup_teardown_per_query = args.run_setup_teardown_per_query
    foreign_table_filename = args.foreign_table_filename
    jenkins_thresholds_name = args.jenkins_thresholds_name
    jenkins_thresholds_field = args.jenkins_thresholds_field
    clear_all_memory_pre_query = args.clear_all_memory_pre_query
    cuda_block_grid_perf_test = args.cuda_block_grid_perf_test
    show_simplified_result = args.show_simplified_result
    if args.cuda_grid_sizes is None:
        grid_size_range = [2]
    else:
        grid_size_range = args.cuda_grid_sizes
    if args.cuda_block_sizes is None:
        block_size_range = [1024]
    else:
        block_size_range = args.cuda_block_sizes

    # Hard-coded vars
    trim = 0.15

    # Set logging output level
    if verbose:
        logging.basicConfig(level=logging.DEBUG)
    elif quiet:
        logging.basicConfig(level=logging.WARNING)
    else:
        logging.basicConfig(level=logging.INFO)

    # Input validation
    if iterations <= 1:
        # Need > 1 iteration as first iteration is dropped from calculations
        logging.error("Iterations must be greater than 1")
        sys.exit(1)
    if verify_destinations(
        destinations=destinations,
        dest_db_server=dest_db_server,
        output_file_json=output_file_json,
        output_file_jenkins=output_file_jenkins,
    ):
        logging.debug("Destination(s) have been verified.")
    else:
        logging.error("No valid destination(s) have been set. Exiting.")
        sys.exit(1)

    # Establish connection to mapd db
    con = get_connection(
        db_user=source_db_user,
        db_passwd=source_db_passwd,
        db_server=source_db_server,
        db_port=source_db_port,
        db_name=source_db_name,
    )
    if not con:
        sys.exit(1)  # Exit if cannot connect to db
    # Set run-specific variables (time, uid, etc.)
    run_vars = get_run_vars(con=con)
    # Set GPU info depending on availability
    gpu_info = get_gpu_info(
        gpu_name=gpu_name,
        no_gather_conn_gpu_info=no_gather_conn_gpu_info,
        con=con,
        conn_machine_name=run_vars["conn_machine_name"],
        no_gather_nvml_gpu_info=no_gather_nvml_gpu_info,
        gather_nvml_gpu_info=gather_nvml_gpu_info,
        gpu_count=gpu_count,
    )
    # Set run machine info
    machine_info = get_machine_info(
        conn_machine_name=run_vars["conn_machine_name"],
        machine_name=machine_name,
        machine_uname=machine_uname,
    )
    # Read queries from files, set to queries dir in PWD if not passed in
    if not queries_dir:
        queries_dir = os.path.join(os.path.dirname(__file__), "queries")
    query_list = read_query_files(
        queries_dir=queries_dir, source_table=source_table
    )
    if not query_list:
        sys.exit(1)
    # Read setup/teardown queries if they exist
    setup_query_list, teardown_query_list = read_setup_teardown_query_files(
        queries_dir=setup_teardown_queries_dir,
        source_table=source_table,
        foreign_table_filename=foreign_table_filename,
    )
    # Check at what granularity we want to run setup or teardown queries at
    run_global_setup_queries = (
        setup_query_list is not None and not run_setup_teardown_per_query
    )
    run_per_query_setup_queries = (
        setup_query_list is not None and run_setup_teardown_per_query
    )
    run_global_teardown_queries = (
        teardown_query_list is not None and not run_setup_teardown_per_query
    )
    run_per_query_teardown_queries = (
        teardown_query_list is not None and run_setup_teardown_per_query
    )
    # Run global setup queries if they exist
    queries_results = []
    st_qr = run_setup_teardown_query(
        queries=setup_query_list,
        do_run=run_global_setup_queries, trim=trim, con=con)
    queries_results.extend(st_qr)
    if cuda_block_grid_perf_test:
        # Replace every query with one hinted variant per block/grid size
        # combination plus one variant using the cuda_opt_param hint
        new_query_list = {
            "query_group": query_list["query_group"],
            "queries": [],
        }
        for query_info in query_list["queries"]:
            query = query_info["mapdql"]
            for block_size in block_size_range:
                for grid_size in grid_size_range:
                    query_hint = (
                        "SELECT /*+ g_cuda_block_size("
                        + str(block_size) + "), "
                        + "g_cuda_grid_size_multiplier("
                        + str(grid_size) + ") */ "
                    )
                    query_name = (
                        query_info["name"]
                        + "_block_size_" + str(block_size)
                        + "_grid_size_" + str(grid_size)
                    )
                    new_query_list["queries"].append({
                        "name": query_name,
                        "mapdql": _inject_select_hint(query, query_hint),
                    })
            cuda_opt_query_hint = "SELECT /*+ cuda_opt_param */ "
            cuda_opt_query_name = (
                query_info["name"] + "_block_size_-1_grid_size_-1"
            )
            new_query_list["queries"].append({
                "name": cuda_opt_query_name,
                "mapdql": _inject_select_hint(query, cuda_opt_query_hint),
            })
        query_list = new_query_list

    # Run queries
    for query in query_list["queries"]:
        # Run setup queries
        st_qr = run_setup_teardown_query(
            queries=setup_query_list,
            do_run=run_per_query_setup_queries, trim=trim, con=con)
        queries_results.extend(st_qr)
        # Run benchmark query
        query_result = run_query(
            query=query, iterations=iterations, trim=trim, con=con,
            clear_all_memory_pre_query=clear_all_memory_pre_query
        )
        queries_results.append(query_result)
        # Run tear-down queries
        st_qr = run_setup_teardown_query(
            queries=teardown_query_list,
            do_run=run_per_query_teardown_queries, trim=trim, con=con)
        queries_results.extend(st_qr)
    logging.info("Completed all queries.")
    # Run global tear-down queries if they exist
    st_qr = run_setup_teardown_query(
        queries=teardown_query_list,
        do_run=run_global_teardown_queries, trim=trim, con=con)
    queries_results.extend(st_qr)
    logging.debug("Closing source db connection.")
    con.close()
    # Generate results dataset
    results_dataset = create_results_dataset(
        run_guid=run_vars["run_guid"],
        run_timestamp=run_vars["run_timestamp"],
        run_connection=run_vars["run_connection"],
        run_machine_name=machine_info["run_machine_name"],
        run_machine_uname=machine_info["run_machine_uname"],
        run_driver=run_vars["run_driver"],
        run_version=run_vars["run_version"],
        run_version_short=run_vars["run_version_short"],
        label=label,
        source_db_gpu_count=gpu_info["source_db_gpu_count"],
        source_db_gpu_driver_ver=gpu_info["source_db_gpu_driver_ver"],
        source_db_gpu_name=gpu_info["source_db_gpu_name"],
        source_db_gpu_mem=gpu_info["source_db_gpu_mem"],
        source_table=source_table,
        trim=trim,
        iterations=iterations,
        query_group=query_list["query_group"],
        queries_results=queries_results,
    )
    results_dataset_json = json.dumps(
        results_dataset, default=json_format_handler, indent=2
    )
    # Only successful queries carry a "results" entry
    successful_results_dataset_results = [
        results_dataset_entry["results"]
        for results_dataset_entry in results_dataset
        if results_dataset_entry["succeeded"] is not False
    ]

    # Send results to destination(s)
    sent_destination = True
    if "mapd_db" in destinations:
        if not send_results_db(
            results_dataset=successful_results_dataset_results,
            table=dest_table,
            db_user=dest_db_user,
            db_passwd=dest_db_passwd,
            db_server=dest_db_server,
            db_port=dest_db_port,
            db_name=dest_db_name,
            table_schema_file=dest_table_schema_file,
        ):
            sent_destination = False
    if "file_json" in destinations:
        if not send_results_file_json(
            results_dataset_json=results_dataset_json,
            output_file_json=output_file_json,
        ):
            sent_destination = False
    if "jenkins_bench" in destinations:
        if not send_results_jenkins_bench(
            results_dataset=successful_results_dataset_results,
            thresholds_name=jenkins_thresholds_name,
            thresholds_field=jenkins_thresholds_field,
            output_tag_jenkins=output_tag_jenkins,
            output_file_jenkins=output_file_jenkins,
        ):
            sent_destination = False
    if "output" in destinations:
        if not send_results_output(results_dataset_json=results_dataset_json):
            sent_destination = False
    if not sent_destination:
        logging.error("Sending results to one or more destinations failed")
        sys.exit(1)
    logging.info(
        "Succesfully loaded query results info into destination(s)"
    )
    if show_simplified_result:
        _print_simplified_result(results_dataset, iterations)
1913 
1914 
if __name__ == "__main__":
    # Script entry point: pass the command-line arguments, minus the
    # script name, to benchmark()
    benchmark(sys.argv[1:])
size_t append(FILE *f, const size_t size, const int8_t *buf)
Appends the specified number of bytes to the end of the file f from buf.
Definition: File.cpp:158
std::string join(T const &container, std::string const &delim)
std::string to_string(char const *&&v)
std::vector< std::string > split(std::string_view str, std::string_view delim, std::optional< size_t > maxsplit)
split apart a string into a vector of substrings
def validate_setup_teardown_query_file
int open(const char *path, int flags, int mode)
Definition: heavyai_fs.cpp:66
def verify_destinations
def create_results_dataset
def read_setup_teardown_query_files
def send_results_file_json
def calculate_query_times
def run_setup_teardown_query
def send_results_jenkins_bench