OmniSciDB  a5dc49c757
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
run_benchmark_arrow Namespace Reference

Functions

def execute_query
 
def calculate_query_times
 
def run_query
 
def create_results_dataset
 
def process_arguments
 
def benchmark
 

Function Documentation

def run_benchmark_arrow.benchmark (   input_arguments)

Definition at line 814 of file run_benchmark_arrow.py.

References create_results_dataset(), run_benchmark.get_connection(), run_benchmark.get_gpu_info(), run_benchmark.get_machine_info(), run_benchmark.get_run_vars(), process_arguments(), run_benchmark.read_query_files(), run_query(), run_benchmark.send_results_db(), run_benchmark.send_results_file_json(), run_benchmark.send_results_jenkins_bench(), run_benchmark.send_results_output(), and run_benchmark.verify_destinations().

def benchmark(input_arguments):
    """
    Run the Arrow benchmark end to end.

    Parses ``input_arguments`` with process_arguments(), connects to the
    source database, runs every query read from the queries directory for
    the requested number of iterations, aggregates the timings with
    create_results_dataset() and sends the results to every requested
    destination.

    Args:
        input_arguments(list): Command-line style argument list.

    Exits the process with status 1 when fewer than 2 iterations are
    requested, no valid destination is set, the source db connection
    fails, no queries are read, or sending to any destination fails.
    """
    # Local import: sys.exit works even when the interpreter runs without
    # the `site` module (which is what provides the bare `exit` builtin).
    import sys

    args = process_arguments(input_arguments)
    destinations = args.destination
    source_table = args.table
    iterations = args.iterations
    arrow_cpu_output = args.arrow_cpu_output

    # Hard-coded vars
    trim = 0.15
    jenkins_thresholds_name = "average"
    jenkins_thresholds_field = "query_exec_avg"

    # Set logging output level
    if args.verbose:
        log_level = logging.DEBUG
    elif args.quiet:
        log_level = logging.WARNING
    else:
        log_level = logging.INFO
    logging.basicConfig(level=log_level)

    # Input validation
    if iterations <= 1:
        # Need > 1 iteration as first iteration is dropped from calculations
        logging.error("Iterations must be greater than 1")
        sys.exit(1)
    if not verify_destinations(
        destinations=destinations,
        dest_db_server=args.dest_server,
        output_file_json=args.output_file_json,
        output_file_jenkins=args.output_file_jenkins,
    ):
        logging.error("No valid destination(s) have been set. Exiting.")
        sys.exit(1)
    logging.debug("Destination(s) have been verified.")

    # Establish connection to mapd db
    con = get_connection(
        db_user=args.user,
        db_passwd=args.passwd,
        db_server=args.server,
        db_port=args.port,
        db_name=args.name,
    )
    if not con:
        sys.exit(1)  # Exit if cannot connect to db

    # Everything that uses the source connection runs inside try/finally so
    # the connection is closed even if a step raises or exits early.
    try:
        # Set run-specific variables (time, uid, etc.)
        run_vars = get_run_vars(con=con)
        # Set GPU info depending on availability
        gpu_info = get_gpu_info(
            gpu_name=args.gpu_name,
            no_gather_conn_gpu_info=args.no_gather_conn_gpu_info,
            con=con,
            conn_machine_name=run_vars["conn_machine_name"],
            no_gather_nvml_gpu_info=args.no_gather_nvml_gpu_info,
            gather_nvml_gpu_info=args.gather_nvml_gpu_info,
            gpu_count=args.gpu_count,
        )
        # Set run machine info
        machine_info = get_machine_info(
            conn_machine_name=run_vars["conn_machine_name"],
            machine_name=args.machine_name,
            machine_uname=args.machine_uname,
        )
        # Read queries from files; default to the "queries" dir next to
        # this script when no directory was passed in.
        queries_dir = args.queries_dir or os.path.join(
            os.path.dirname(__file__), "queries"
        )
        query_list = read_query_files(
            queries_dir=queries_dir, source_table=source_table
        )
        if not query_list:
            sys.exit(1)
        # Run queries
        queries_results = [
            run_query(
                query=query,
                iterations=iterations,
                trim=trim,
                con=con,
                arrow_cpu_output=arrow_cpu_output,
            )
            for query in query_list["queries"]
        ]
        logging.info("Completed all queries.")
    finally:
        logging.debug("Closing source db connection.")
        con.close()

    # Generate results dataset
    results_dataset = create_results_dataset(
        run_guid=run_vars["run_guid"],
        run_timestamp=run_vars["run_timestamp"],
        run_connection=run_vars["run_connection"],
        run_machine_name=machine_info["run_machine_name"],
        run_machine_uname=machine_info["run_machine_uname"],
        run_driver=run_vars["run_driver"],
        run_version=run_vars["run_version"],
        run_version_short=run_vars["run_version_short"],
        label=args.label,
        source_db_gpu_count=gpu_info["source_db_gpu_count"],
        source_db_gpu_driver_ver=gpu_info["source_db_gpu_driver_ver"],
        source_db_gpu_name=gpu_info["source_db_gpu_name"],
        source_db_gpu_mem=gpu_info["source_db_gpu_mem"],
        source_table=source_table,
        trim=trim,
        iterations=iterations,
        query_group=query_list["query_group"],
        queries_results=queries_results,
    )
    results_dataset_json = json.dumps(
        results_dataset, default=json_format_handler, indent=2
    )
    # Only entries not marked as failed carry a "results" payload.
    successful_results_dataset_results = [
        entry["results"]
        for entry in results_dataset
        if entry["succeeded"] is not False
    ]

    # Send results to destination(s)
    sent_destination = True
    if "mapd_db" in destinations:
        if not send_results_db(
            results_dataset=successful_results_dataset_results,
            table=args.dest_table,
            db_user=args.dest_user,
            db_passwd=args.dest_passwd,
            db_server=args.dest_server,
            db_port=args.dest_port,
            db_name=args.dest_name,
            table_schema_file=args.dest_table_schema_file,
        ):
            sent_destination = False
    if "file_json" in destinations:
        if not send_results_file_json(
            results_dataset_json=results_dataset_json,
            output_file_json=args.output_file_json,
        ):
            sent_destination = False
    if "jenkins_bench" in destinations:
        if not send_results_jenkins_bench(
            results_dataset=successful_results_dataset_results,
            thresholds_name=jenkins_thresholds_name,
            thresholds_field=jenkins_thresholds_field,
            output_tag_jenkins=args.output_tag_jenkins,
            output_file_jenkins=args.output_file_jenkins,
        ):
            sent_destination = False
    if "output" in destinations:
        if not send_results_output(results_dataset_json=results_dataset_json):
            sent_destination = False
    if not sent_destination:
        logging.error("Sending results to one or more destinations failed")
        sys.exit(1)
    logging.info("Successfully loaded query results info into destination(s)")
999 
def verify_destinations
def send_results_file_json
def send_results_jenkins_bench

+ Here is the call graph for this function:

def run_benchmark_arrow.calculate_query_times (   kwargs)
  Calculates aggregate query times from all iteration times

  Kwargs:
    total_times(list): List of total time calculations
    execution_times(list): List of execution_time calculations
    connect_times(list): List of connect_time calculations
    arrow_conversion_times(list): List of arrow_conversion_time calculations
    trim(float): Amount to trim from iterations set to gather trimmed
                 values. Enter as decimal corresponding to percent to
                 trim - ex: 0.15 to trim 15%.

  Returns:
    query_execution(dict): Query times
    False(bool): The query failed. Exception should be logged.

Definition at line 128 of file run_benchmark_arrow.py.

Referenced by create_results_dataset().

def _trimmed_times(values, trim):
    """Return ``values`` sorted with ``int(trim * len(values))`` entries cut
    from each end; return ``values`` unchanged when nothing would be cut or
    when cutting would leave no values."""
    trim_size = int(trim * len(values))
    if trim_size <= 0 or 2 * trim_size >= len(values):
        return values
    return numpy.sort(values)[trim_size:-trim_size]


def calculate_query_times(**kwargs):
    """
    Calculates aggregate query times from all iteration times

    Kwargs:
        total_times(list): List of total time calculations
        execution_times(list): List of execution_time calculations
        connect_times(list): List of connect_time calculations
        arrow_conversion_times(list): List of arrow_conversion_time
                                      calculations
        trim(float): Fraction of iterations to drop from EACH end of the
                     sorted times when computing the trimmed values. Enter
                     as a decimal corresponding to a percent - ex: 0.15 to
                     trim 15%. If trimming would remove every value, the
                     untrimmed values are used instead.

    Returns:
        query_execution(dict): Query times. Every aggregate is rounded to
            one decimal place; the input lists are also returned unchanged
            under "total_times" and "execution_times".
    """
    total_times = kwargs["total_times"]
    execution_times = kwargs["execution_times"]
    connect_times = kwargs["connect_times"]
    arrow_conversion_times = kwargs["arrow_conversion_times"]
    trim = kwargs["trim"]

    trimmed_total_times = _trimmed_times(total_times, trim)
    trimmed_execution_times = _trimmed_times(execution_times, trim)

    return {
        "total_time_avg": round(numpy.mean(total_times), 1),
        "total_time_min": round(numpy.min(total_times), 1),
        "total_time_max": round(numpy.max(total_times), 1),
        "total_time_85": round(numpy.percentile(total_times, 85), 1),
        "total_time_trimmed_avg": round(numpy.mean(trimmed_total_times), 1),
        "total_times": total_times,
        "execution_time_avg": round(numpy.mean(execution_times), 1),
        "execution_time_min": round(numpy.min(execution_times), 1),
        "execution_time_max": round(numpy.max(execution_times), 1),
        "execution_time_85": round(numpy.percentile(execution_times, 85), 1),
        "execution_time_25": round(numpy.percentile(execution_times, 25), 1),
        "execution_time_std": round(numpy.std(execution_times), 1),
        # Round to one decimal like every other aggregate (previously these
        # two were rounded to an integer whenever trimming applied).
        "execution_time_trimmed_avg": round(
            numpy.mean(trimmed_execution_times), 1
        ),
        "execution_time_trimmed_max": round(
            numpy.max(trimmed_execution_times), 1
        ),
        "execution_times": execution_times,
        "connect_time_avg": round(numpy.mean(connect_times), 1),
        "connect_time_min": round(numpy.min(connect_times), 1),
        "connect_time_max": round(numpy.max(connect_times), 1),
        "connect_time_85": round(numpy.percentile(connect_times, 85), 1),
        "arrow_conversion_time_avg": round(
            numpy.mean(arrow_conversion_times), 1
        ),
        "arrow_conversion_time_min": round(
            numpy.min(arrow_conversion_times), 1
        ),
        "arrow_conversion_time_max": round(
            numpy.max(arrow_conversion_times), 1
        ),
        "arrow_conversion_time_85": round(
            numpy.percentile(arrow_conversion_times, 85), 1
        ),
        "arrow_conversion_time_25": round(
            numpy.percentile(arrow_conversion_times, 25), 1
        ),
        "arrow_conversion_time_std": round(
            numpy.std(arrow_conversion_times), 1
        ),
    }

+ Here is the caller graph for this function:

def run_benchmark_arrow.create_results_dataset (   kwargs)
  Create results dataset

  Kwargs:
    run_guid(str): Run GUID
    run_timestamp(datetime): Run timestamp
    run_connection(str): Connection string
    run_machine_name(str): Run machine name
    run_machine_uname(str): Run machine uname
    run_driver(str): Run driver
    run_version(str): Version of DB
    run_version_short(str): Shortened version of DB
    label(str): Run label
    source_db_gpu_count(int): Number of GPUs on run machine
    source_db_gpu_driver_ver(str): GPU driver version
    source_db_gpu_name(str): GPU name
    source_db_gpu_mem(str): Amount of GPU mem on run machine
    source_table(str): Table to run query against
    trim(float): Trim decimal to remove from top and bottom of results
    iterations(int): Number of iterations of each query to run
    query_group(str): Query group, usually matches table name
    queries_results(list): List of per-query result dicts, each with:::
        query_name(str): Name of query
        query_mapdql(str): Query to run
        query_id(str): Query ID
        query_succeeded(bool): Query succeeded
        query_error_info(str): Query error info
        result_count(int): Number of results returned
        initial_iteration_results(dict):::
            first_execution_time(float): Execution time for first query
                iteration
            first_connect_time(float):  Connect time for first query
                iteration
            first_total_time(float): Total time for first iteration
            first_cpu_mem_usage(float): CPU memory usage for first query
                iteration
            first_gpu_mem_usage(float): GPU memory usage for first query
                iteration
        noninitial_iteration_results(list):::
            execution_time(float): Time (in ms) that pymapd reports
                backend spent on query.
            connect_time(float): Time (in ms) for overhead of query,
                calculated by subtracting backend execution time from
                time spent on the execution function.
            arrow_conversion_time(float): Time (in ms) it took for
                arrow conversion and serialization of results.
            total_time(float): Time (in ms) from adding all above times.
        query_total_elapsed_time(int): Total elapsed time for query

  Returns:
    results_dataset(list):::
        result_dataset(dict): Query results dataset

Definition at line 391 of file run_benchmark_arrow.py.

References calculate_query_times().

Referenced by benchmark().

def create_results_dataset(**kwargs):
    """
    Build one result entry per executed query.

    Kwargs:
        run_guid(str): Run GUID
        run_timestamp(datetime): Run timestamp
        run_connection(str): Connection string
        run_machine_name(str): Run machine name
        run_machine_uname(str): Run machine uname
        run_driver(str): Run driver
        run_version(str): Version of DB
        run_version_short(str): Shortened version of DB
        label(str): Run label
        source_db_gpu_count(int): Number of GPUs on run machine
        source_db_gpu_driver_ver(str): GPU driver version
        source_db_gpu_name(str): GPU name
        source_db_gpu_mem(str): Amount of GPU mem on run machine
        source_table(str): Table to run query against
        trim(float): Trim decimal to remove from top and bottom of results
        iterations(int): Number of iterations of each query to run
        query_group(str): Query group, usually matches table name
        queries_results(list): One dict per query with keys:::
            query_name(str): Name of query
            query_mapdql(str): Query to run
            query_id(str): Query ID
            query_succeeded(bool): Query succeeded
            query_error_info(str): Query error info
            initial_iteration_results(dict): first_execution_time,
                first_connect_time, first_total_time, first_cpu_mem_usage,
                first_gpu_mem_usage for the first (dropped) iteration
            noninitial_iteration_results(list): one dict per remaining
                iteration with execution_time, connect_time,
                arrow_conversion_time, total_time and result_count (ms
                for the times)
            query_total_elapsed_time(int): Total elapsed time for query

    Returns:
        results_dataset(list):::
            result_dataset(dict): always has "name", "mapdql" and
                "succeeded"; successful queries also carry a "results" dict
                of aggregated timings and a "debug" dict of raw times.
    """
    dataset = []
    for query in kwargs["queries_results"]:
        if not query["query_succeeded"]:
            entry = {
                "name": query["query_name"],
                "mapdql": query["query_mapdql"],
                "succeeded": False,
            }
        else:
            samples = query["noninitial_iteration_results"]
            execution_times = [s["execution_time"] for s in samples]
            connect_times = [s["connect_time"] for s in samples]
            arrow_conversion_times = [
                s["arrow_conversion_time"] for s in samples
            ]
            total_times = [s["total_time"] for s in samples]

            logging.debug("Calculating times from query " + query["query_id"])
            times = calculate_query_times(
                total_times=total_times,
                execution_times=execution_times,
                connect_times=connect_times,
                arrow_conversion_times=arrow_conversion_times,
                # Trim top and bottom n% for trimmed calculations
                trim=kwargs["trim"],
            )
            first = query["initial_iteration_results"]
            # Row count is the same for every iteration; take the last one.
            result_count = samples[-1]["result_count"]

            results = {
                "run_guid": kwargs["run_guid"],
                "run_timestamp": kwargs["run_timestamp"],
                "run_connection": kwargs["run_connection"],
                "run_machine_name": kwargs["run_machine_name"],
                "run_machine_uname": kwargs["run_machine_uname"],
                "run_driver": kwargs["run_driver"],
                "run_version": kwargs["run_version"],
                "run_version_short": kwargs["run_version_short"],
                "run_label": kwargs["label"],
                "run_gpu_count": kwargs["source_db_gpu_count"],
                "run_gpu_driver_ver": kwargs["source_db_gpu_driver_ver"],
                "run_gpu_name": kwargs["source_db_gpu_name"],
                "run_gpu_mem_mb": kwargs["source_db_gpu_mem"],
                "run_table": kwargs["source_table"],
                "query_group": kwargs["query_group"],
                "query_id": query["query_id"],
                "query_result_set_count": result_count,
                "query_error_info": query["query_error_info"],
                "query_conn_first": first["first_connect_time"],
                "query_conn_avg": times["connect_time_avg"],
                "query_conn_min": times["connect_time_min"],
                "query_conn_max": times["connect_time_max"],
                "query_conn_85": times["connect_time_85"],
                "query_exec_first": first["first_execution_time"],
                "query_exec_avg": times["execution_time_avg"],
                "query_exec_min": times["execution_time_min"],
                "query_exec_max": times["execution_time_max"],
                "query_exec_85": times["execution_time_85"],
                "query_exec_25": times["execution_time_25"],
                "query_exec_stdd": times["execution_time_std"],
                "query_exec_trimmed_avg": times["execution_time_trimmed_avg"],
                "query_exec_trimmed_max": times["execution_time_trimmed_max"],
                "query_arrow_conversion_avg": times[
                    "arrow_conversion_time_avg"
                ],
                "query_arrow_conversion_min": times[
                    "arrow_conversion_time_min"
                ],
                "query_arrow_conversion_max": times[
                    "arrow_conversion_time_max"
                ],
                "query_arrow_conversion_85": times["arrow_conversion_time_85"],
                "query_arrow_conversion_25": times["arrow_conversion_time_25"],
                "query_arrow_conversion_stdd": times[
                    "arrow_conversion_time_std"
                ],
                "query_total_first": first["first_total_time"],
                "query_total_avg": times["total_time_avg"],
                "query_total_min": times["total_time_min"],
                "query_total_max": times["total_time_max"],
                "query_total_85": times["total_time_85"],
                "query_total_all": query["query_total_elapsed_time"],
                "query_total_trimmed_avg": times["total_time_trimmed_avg"],
                "cpu_mem_usage_mb": first["first_cpu_mem_usage"],
                "gpu_mem_usage_mb": first["first_gpu_mem_usage"],
            }
            entry = {
                "name": query["query_name"],
                "mapdql": query["query_mapdql"],
                "succeeded": True,
                "results": results,
                "debug": {
                    "query_exec_times": times["execution_times"],
                    "query_total_times": times["total_times"],
                },
            }
        dataset.append(entry)
        logging.debug("All values set for query " + query["query_id"])
    return dataset
581 

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

def run_benchmark_arrow.execute_query (   kwargs)
  Executes a query against the connected db using pymapd
  https://pymapd.readthedocs.io/en/latest/usage.html#querying

  Kwargs:
    query_name(str): Name of query
    query_mapdql(str): Query to run
    iteration(int): Iteration number
    con(class): Connection class

  Returns:
    query_execution(dict):::
      result_count(int): Number of results returned
      execution_time(float): Time (in ms) that pymapd reports
                             backend spent on query.
      connect_time(float): Time (in ms) for overhead of query, calculated
                           by subtracting backend execution time
                           from time spent on the execution function.
      arrow_conversion_time(float): Time (in ms) for converting and
                                serializing results in arrow format
      total_time(float): Time (in ms) from adding all above times.
    False(bool): The query failed. Exception should be logged.

Definition at line 30 of file run_benchmark_arrow.py.

Referenced by run_query().

30 
def execute_query(**kwargs):
    """
    Executes a query against the connected db using pymapd
    https://pymapd.readthedocs.io/en/latest/usage.html#querying

    Kwargs:
        query_name(str): Name of query
        query_mapdql(str): Query to run
        iteration(int): Iteration number
        con(class): Connection class
        arrow_cpu_output(bool): If True, run the query through
            con.select_ipc(); otherwise call the client's sql_execute_gdf()
            with device_id=0 and first_n=-1.

    Returns:
        query_execution(dict):::
          result_count(int): Number of results returned (always 0 when
                             arrow_cpu_output is False)
          execution_time(float): Time (in ms) that pymapd reports
                                 backend spent on query.
          connect_time(float): Time (in ms) for overhead of query, calculated
                               by subtracting backend execution time
                               from time spent on the execution function.
          arrow_conversion_time(float): Time (in ms) for converting and
                                        serializing results in arrow format
          total_time(float): Time (in ms) from adding all above times.
        False(bool): The query failed. Exception should be logged.
    """
    con = kwargs["con"]
    query_name = kwargs["query_name"]
    query_mapdql = kwargs["query_mapdql"]
    iteration = kwargs["iteration"]
    arrow_cpu_output = kwargs["arrow_cpu_output"]

    start_time = timeit.default_timer()
    try:
        # Run the query
        if arrow_cpu_output:
            query_result = con.select_ipc(query_mapdql)
        else:
            query_result = con._client.sql_execute_gdf(
                con._session,
                query_mapdql,
                device_id=0,
                first_n=-1,
            )
    except (pymapd.exceptions.ProgrammingError, pymapd.exceptions.Error):
        logging.exception(
            "Error running query %s during iteration %s", query_name, iteration
        )
        return False
    logging.debug("Completed iteration %s of query %s", iteration, query_name)

    # Calculate times
    query_elapsed_time = (timeit.default_timer() - start_time) * 1000
    # The select_ipc result exposes its timing fields through `_tdf`; the
    # sql_execute_gdf result exposes them directly.
    timings = query_result._tdf if arrow_cpu_output else query_result
    execution_time = timings.execution_time_ms
    connect_time = round((query_elapsed_time - execution_time), 1)
    arrow_conversion_time = timings.arrow_conversion_time_ms

    logging.debug(
        "Counting results from query %s iteration %s", query_name, iteration
    )
    # TODO(Wamsi): Add support for computing cuDF size, once cuDF is fixed.
    result_count = len(query_result.index) if arrow_cpu_output else 0

    query_execution = {
        "result_count": result_count,
        "execution_time": execution_time,
        "connect_time": connect_time,
        "arrow_conversion_time": arrow_conversion_time,
        "total_time": execution_time + connect_time + arrow_conversion_time,
    }
    logging.debug(
        "Execution results for query %s iteration %s: %s",
        query_name,
        iteration,
        query_execution,
    )
    return query_execution
127 

+ Here is the caller graph for this function:

def run_benchmark_arrow.process_arguments (   input_arguments)

Definition at line 582 of file run_benchmark_arrow.py.

Referenced by benchmark().

def process_arguments(input_arguments):
    """
    Parse the benchmark's command-line arguments.

    Args:
        input_arguments(list): Argument strings to parse (as for
            ArgumentParser.parse_args(args=...)).

    Returns:
        argparse.Namespace: Parsed arguments.
    """
    parser = ArgumentParser()
    # Move the default "optional arguments" group after a "required
    # arguments" group so required options are listed first in --help.
    optional = parser._action_groups.pop()
    required = parser.add_argument_group("required arguments")
    parser._action_groups.append(optional)
    optional.add_argument(
        "-v", "--verbose", action="store_true", help="Turn on debug logging"
    )
    optional.add_argument(
        "-q",
        "--quiet",
        action="store_true",
        help="Suppress script output (except warnings and errors)",
    )
    # NOTE(review): -u/-p/-s/-n/-d are listed under "required arguments"
    # but all have defaults, so argparse does not actually require them.
    required.add_argument(
        "-u",
        "--user",
        dest="user",
        default="mapd",
        help="Source database user",
    )
    required.add_argument(
        "-p",
        "--passwd",
        dest="passwd",
        default="HyperInteractive",
        help="Source database password",
    )
    required.add_argument(
        "-s",
        "--server",
        dest="server",
        default="localhost",
        help="Source database server hostname",
    )
    optional.add_argument(
        "-o",
        "--port",
        dest="port",
        type=int,
        default=6274,
        help="Source database server port",
    )
    required.add_argument(
        "-n",
        "--name",
        dest="name",
        default="mapd",
        help="Source database name",
    )
    required.add_argument(
        "-t",
        "--table",
        dest="table",
        required=True,
        help="Source db table name",
    )
    required.add_argument(
        "-l",
        "--label",
        dest="label",
        required=True,
        help="Benchmark run label",
    )
    required.add_argument(
        "-d",
        "--queries-dir",
        dest="queries_dir",
        help="Absolute path to dir with query files. "
        + '[Default: "queries" dir in same location as script]',
    )
    required.add_argument(
        "-i",
        "--iterations",
        dest="iterations",
        type=int,
        required=True,
        help="Number of iterations per query. Must be > 1",
    )
    optional.add_argument(
        "-g",
        "--gpu-count",
        dest="gpu_count",
        type=int,
        default=None,
        help="Number of GPUs. Not required when gathering local gpu info",
    )
    optional.add_argument(
        "-G",
        "--gpu-name",
        dest="gpu_name",
        type=str,
        default="",
        help="Name of GPU(s). Not required when gathering local gpu info",
    )
    optional.add_argument(
        "--no-gather-conn-gpu-info",
        dest="no_gather_conn_gpu_info",
        action="store_true",
        help="Do not gather source database GPU info fields "
        + "[run_gpu_count, run_gpu_mem_mb] "
        + "using pymapd connection info. "
        + "Use when testing a CPU-only server.",
    )
    optional.add_argument(
        "--no-gather-nvml-gpu-info",
        dest="no_gather_nvml_gpu_info",
        action="store_true",
        help="Do not gather source database GPU info fields "
        + "[gpu_driver_ver, run_gpu_name] "
        + "from local GPU using pynvml. "
        + 'Defaults to True when source server is not "localhost". '
        + "Use when testing a CPU-only server.",
    )
    optional.add_argument(
        "--gather-nvml-gpu-info",
        dest="gather_nvml_gpu_info",
        action="store_true",
        help="Gather source database GPU info fields "
        + "[gpu_driver_ver, run_gpu_name] "
        + "from local GPU using pynvml. "
        + 'Defaults to True when source server is "localhost". '
        + "Only use when benchmarking against same machine that this script "
        + "is run from.",
    )
    optional.add_argument(
        "-m",
        "--machine-name",
        dest="machine_name",
        help="Name of source machine",
    )
    optional.add_argument(
        "-a",
        "--machine-uname",
        dest="machine_uname",
        help="Uname info from source machine",
    )
    optional.add_argument(
        "-e",
        "--destination",
        dest="destination",
        default="mapd_db",
        help="Destination type: [mapd_db, file_json, output, jenkins_bench] "
        + "Multiple values can be input separated by commas, "
        + 'ex: "mapd_db,file_json"',
    )
    optional.add_argument(
        "-U",
        "--dest-user",
        dest="dest_user",
        default="mapd",
        help="Destination mapd_db database user",
    )
    optional.add_argument(
        "-P",
        "--dest-passwd",
        dest="dest_passwd",
        default="HyperInteractive",
        help="Destination mapd_db database password",
    )
    optional.add_argument(
        "-S",
        "--dest-server",
        dest="dest_server",
        help="Destination mapd_db database server hostname"
        + ' (required if destination = "mapd_db")',
    )
    optional.add_argument(
        "-O",
        "--dest-port",
        dest="dest_port",
        type=int,
        default=6274,
        help="Destination mapd_db database server port",
    )
    optional.add_argument(
        "-N",
        "--dest-name",
        dest="dest_name",
        default="mapd",
        help="Destination mapd_db database name",
    )
    optional.add_argument(
        "-T",
        "--dest-table",
        dest="dest_table",
        default="results_arrow",
        help="Destination mapd_db table name",
    )
    optional.add_argument(
        "-C",
        "--dest-table-schema-file",
        dest="dest_table_schema_file",
        default="results_table_schemas/arrow-results.sql",
        help="Destination table schema file. This must be an executable "
        + "CREATE TABLE statement that matches the output of this script. It "
        + "is required when creating the results_arrow table. Default location is "
        + 'in "./results_table_schemas/arrow-results.sql"',
    )
    optional.add_argument(
        "-j",
        "--output-file-json",
        dest="output_file_json",
        help="Absolute path of .json output file "
        + '(required if destination = "file_json")',
    )
    optional.add_argument(
        "-J",
        "--output-file-jenkins",
        dest="output_file_jenkins",
        help="Absolute path of jenkins benchmark .json output file "
        + '(required if destination = "jenkins_bench")',
    )
    optional.add_argument(
        "-E",
        "--output-tag-jenkins",
        dest="output_tag_jenkins",
        default="",
        help="Jenkins benchmark result tag. "
        + 'Optional, appended to table name in "group" field',
    )
    optional.add_argument(
        "--enable-arrow-cpu-output",
        dest="arrow_cpu_output",
        action="store_true",
        help="Output results in Apache Arrow Serialized format on CPU",
    )
    return parser.parse_args(args=input_arguments)
813 

+ Here is the caller graph for this function:

def run_benchmark_arrow.run_query (   kwargs)
  Takes query name, syntax, and iteration count and calls the
    execute_query function for each iteration. Reports total, iteration,
    and exec timings, memory usage, and failure status.

  Kwargs:
    query(dict):::
        name(str): Name of query
        mapdql(str): Query syntax to run
    iterations(int): Number of iterations of each query to run
    trim(float): Trim decimal to remove from top and bottom of results
    con(class 'pymapd.connection.Connection'): Mapd connection
    arrow_cpu_output(bool): Passed through unchanged to execute_query

  Returns:
    query_results(dict):::
        query_name(str): Name of query
        query_mapdql(str): Query to run
        query_id(str): Query ID
        query_succeeded(bool): Query succeeded
        query_error_info(str): Query error info
        result_count(int): Number of results returned
        initial_iteration_results(dict):::
            first_execution_time(float): Execution time for first query
                iteration
            first_connect_time(float): Connect time for first query
                iteration
            first_results_iter_time(float): Results iteration time for
                first query iteration
            first_total_time(float): Total time for first iteration
            first_cpu_mem_usage(float): CPU memory usage for first query
                iteration
            first_gpu_mem_usage(float): GPU memory usage for first query
                iteration
        noninitial_iteration_results(list):::
            execution_time(float): Time (in ms) that pymapd reports
                backend spent on query.
            connect_time(float): Time (in ms) for overhead of query,
                calculated by subtracting backend execution time from
                time spent on the execution function.
            results_iter_time(float): Time (in ms) it took for
                pymapd.fetchone() to iterate through all of the results.
            total_time(float): Time (in ms) from adding all above times.
        query_total_elapsed_time(int): Total elapsed time for query
    False(bool): The query failed. Exception should be logged.

Definition at line 211 of file run_benchmark_arrow.py.

References File_Namespace.append(), execute_query(), and run_benchmark.get_mem_usage().

Referenced by benchmark().

def run_query(**kwargs):
    """
    Takes query name, syntax, and iteration count and calls the
    execute_query function for each iteration. Reports first-iteration and
    per-iteration timings, memory usage, and failure status.

    CPU and GPU memory usage is sampled with get_mem_usage before and after
    every iteration. The difference in "usedram" is recorded for the first
    iteration, and logged as an error when it is non-zero for any later
    iteration. Iteration stops at the first execute_query call that returns
    a falsy result.

    Kwargs:
        query(dict):::
            name(str): Name of query (file name; everything before the last
                "." becomes the query ID)
            mapdql(str): Query syntax to run
        iterations(int): Number of iterations of each query to run
        con(class 'pymapd.connection.Connection'): Mapd connection
        arrow_cpu_output(bool): Passed through unchanged to execute_query
        trim(float): Not read by this function.

    Returns:
        query_results(dict):::
            query_name(str): Name of query
            query_mapdql(str): Query to run
            query_id(str): Query name with its final extension removed
            query_succeeded(bool): False if an iteration failed
            query_error_info(str): Query error info (currently always "")
            initial_iteration_results(dict): Empty unless the first
                iteration succeeded, otherwise:::
                first_execution_time(float): execute_query's
                    "execution_time" for the first iteration, rounded to
                    one decimal
                first_connect_time(float): execute_query's "connect_time"
                    for the first iteration, rounded to one decimal
                first_total_time(float): Sum of the two values above,
                    rounded to one decimal
                first_cpu_mem_usage(float): Change in CPU "usedram" across
                    the first iteration
                first_gpu_mem_usage(float): Change in GPU "usedram" across
                    the first iteration
            noninitial_iteration_results(list): The dicts returned by
                execute_query for each later successful iteration, stored
                unmodified (presumably execution_time, connect_time,
                total_time in ms -- confirm against execute_query).
            query_total_elapsed_time(float): Wall-clock time (in ms) spent
                on all iterations that ran, rounded to one decimal
    """
    query = kwargs["query"]
    query_name = query["name"]
    iterations = kwargs["iterations"]
    con = kwargs["con"]
    logging.info("Running query: %s iterations: %s", query_name, iterations)
    # Query ID = file name with only its final extension stripped. A plain
    # rsplit(".") splits on every dot and would truncate "q1.v2.sql" to
    # "q1" instead of "q1.v2".
    query_id = query_name.rsplit(".", 1)[0]
    query_results = {
        "query_name": query_name,
        "query_mapdql": query["mapdql"],
        "query_id": query_id,
        "query_succeeded": True,
        # TODO - interpret query error info; nothing populates it yet.
        "query_error_info": "",
        "initial_iteration_results": {},
        "noninitial_iteration_results": [],
        "query_total_elapsed_time": 0,
    }
    query_total_start_time = timeit.default_timer()
    for iteration in range(iterations):
        # Gather memory before running query iteration
        logging.debug("Getting pre-query memory usage on CPU")
        pre_query_cpu_mem_usage = get_mem_usage(con=con, mem_type="cpu")
        logging.debug("Getting pre-query memory usage on GPU")
        pre_query_gpu_mem_usage = get_mem_usage(con=con, mem_type="gpu")
        # Run query iteration
        logging.debug(
            "Running iteration %s of query %s", iteration, query_name
        )
        query_result = execute_query(
            query_name=query_name,
            query_mapdql=query["mapdql"],
            iteration=iteration,
            con=con,
            arrow_cpu_output=kwargs["arrow_cpu_output"],
        )
        # Gather memory after running query iteration
        logging.debug("Getting post-query memory usage on CPU")
        post_query_cpu_mem_usage = get_mem_usage(con=con, mem_type="cpu")
        logging.debug("Getting post-query memory usage on GPU")
        post_query_gpu_mem_usage = get_mem_usage(con=con, mem_type="gpu")
        # Memory used by this iteration = post minus pre "usedram"
        query_cpu_mem_usage = round(
            post_query_cpu_mem_usage["usedram"]
            - pre_query_cpu_mem_usage["usedram"],
            1,
        )
        query_gpu_mem_usage = round(
            post_query_gpu_mem_usage["usedram"]
            - pre_query_gpu_mem_usage["usedram"],
            1,
        )
        if not query_result:
            logging.warning(
                "Error detected during execution of query: %s. This query "
                "will be skipped and times will not be reported",
                query_name,
            )
            query_results.update(query_succeeded=False)
            break
        if iteration == 0:
            first_execution_time = round(query_result["execution_time"], 1)
            first_connect_time = round(query_result["connect_time"], 1)
            query_results.update(
                initial_iteration_results={
                    "first_execution_time": first_execution_time,
                    "first_connect_time": first_connect_time,
                    # Re-round: adding two rounded floats can still yield
                    # representation noise such as 0.30000000000000004.
                    "first_total_time": round(
                        first_execution_time + first_connect_time, 1
                    ),
                    "first_cpu_mem_usage": query_cpu_mem_usage,
                    "first_gpu_mem_usage": query_gpu_mem_usage,
                }
            )
            continue
        # Put noninitial iterations into query_result list
        query_results["noninitial_iteration_results"].append(query_result)
        # Verify no change in memory for noninitial iterations
        for mem_label, mem_usage in (
            ("CPU", query_cpu_mem_usage),
            ("GPU", query_gpu_mem_usage),
        ):
            if mem_usage != 0.0:
                logging.error(
                    "Noninitial iteration (%s) of query (%s) "
                    "shows non-zero %s memory usage: %s",
                    iteration,
                    query_name,
                    mem_label,
                    mem_usage,
                )
    # Calculate time for all iterations to run
    query_results.update(
        query_total_elapsed_time=round(
            (timeit.default_timer() - query_total_start_time) * 1000, 1
        )
    )
    logging.info("Completed all iterations of query %s", query_name)
    return query_results
size_t append(FILE *f, const size_t size, const int8_t *buf)
Appends the specified number of bytes to the end of the file f from buf.
Definition: File.cpp:158

+ Here is the call graph for this function:

+ Here is the caller graph for this function: