10 from argparse
import ArgumentParser
def get_connection(**kwargs):
    """
    Connects to the db using pymapd
    https://pymapd.readthedocs.io/en/latest/usage.html#connecting

    Kwargs:
        db_user(str): DB username
        db_passwd(str): DB password
        db_server(str): DB host
        db_port(int): DB port
        db_name(str): DB name

    Returns:
        con(class): Connection class
        False(bool): The connection failed. Exception should be logged.
    """
    try:
        logging.debug("Connecting to mapd db...")
        # Build the connection from the keyword arguments; any missing key
        # raises KeyError, which is intentionally not caught here.
        connection = pymapd.connect(
            user=kwargs["db_user"],
            password=kwargs["db_passwd"],
            host=kwargs["db_server"],
            port=kwargs["db_port"],
            dbname=kwargs["db_name"],
        )
    except (pymapd.exceptions.OperationalError, pymapd.exceptions.Error):
        # Caller checks for a falsy return; details go to the log.
        logging.exception("Error connecting to database.")
        return False
    logging.info("Succesfully connected to mapd db")
    return connection
def json_format_handler(x):
    """Serialize values json.dumps cannot handle natively.

    Converts datetime objects to their ISO-8601 string form; any other
    unhandled type raises TypeError (the contract json.dumps expects
    from a `default=` handler).
    """
    if not isinstance(x, datetime.datetime):
        raise TypeError("Unknown type")
    return x.isoformat()
# Parse input parameters.
# NOTE(review): reconstructed from a mangled paste — the long option names,
# dest names, defaults, and help strings below are taken verbatim from the
# source fragments; short flags and numeric defaults not visible in the
# fragments are marked and should be confirmed against the original script.
parser = ArgumentParser()
optional = parser._action_groups.pop()
required = parser.add_argument_group("required arguments")
parser._action_groups.append(optional)
optional.add_argument(
    "-v",
    "--verbose",
    action="store_true",
    help="Turn on debug logging",
)
optional.add_argument(
    "-q",  # NOTE(review): short flag assumed — confirm
    "--quiet",
    action="store_true",
    help="Suppress script outuput " + "(except warnings and errors)",
)
required.add_argument(
    "-u", "--user", dest="user", default="mapd", help="Source database user"
)
required.add_argument(
    "-p",  # NOTE(review): short flag assumed — confirm
    "--passwd",
    dest="passwd",
    default="HyperInteractive",
    help="Source database password",
)
required.add_argument(
    "-s",  # NOTE(review): short flag assumed — confirm
    "--server",
    dest="server",
    default="localhost",
    help="Source database server hostname",
)
optional.add_argument(
    "-o",  # NOTE(review): short flag and default assumed — confirm
    "--port",
    dest="port",
    type=int,
    default=6274,
    help="Source database server port",
)
required.add_argument(
    "-n", "--name", dest="name", default="mapd", help="Source database name"
)
required.add_argument(
    "-l", "--label", dest="label", required=True, help="Benchmark run label"
)
required.add_argument(
    "-f",  # NOTE(review): short flag assumed — confirm
    "--import-file",
    dest="import_file",
    required=True,
    help="Absolute path to file on heavydb machine with data for "
    + "import test",
)
required.add_argument(
    "-c",  # NOTE(review): short flag assumed — confirm
    "--table-schema-file",
    dest="table_schema_file",
    required=True,
    help="Path to local file with CREATE TABLE sql statement for the "
    + "import table",
)
optional.add_argument(
    "-t",  # NOTE(review): short flag assumed — confirm
    "--import-table-name",
    dest="import_table_name",
    default="import_benchmark_test",
    help="Name of table to import data to. NOTE: This table will be dropped "
    + "before and after the import test, unless "
    + "--no-drop-table-[before/after] is specified.",
)
optional.add_argument(
    "-T",  # NOTE(review): short flag assumed — confirm
    "--import-query-template-file",
    dest="import_query_template_file",
    help="Path to file containing template for import query. "
    + 'The script will replace "##TAB##" with the value of import_table_name '
    + 'and "##FILE##" with the value of table_schema_file. By default, the '
    + "script will use the COPY FROM command with the default default "
    + "delimiter.",
)
optional.add_argument(
    "--no-drop-table-before",
    dest="no_drop_table_before",
    action="store_true",
    help="Do not drop the import table and recreate it before import "
    + "NOTE: Make sure existing table schema matches import .csv file schema",
)
optional.add_argument(
    "--no-drop-table-after",
    dest="no_drop_table_after",
    action="store_true",
    help="Do not drop the import table after import",
)
optional.add_argument(
    "--import-test-name",
    dest="import_test_name",
    help='Name of import test (ex: "ips"). Required when using '
    + "jenkins_bench_json as output.",
)
optional.add_argument(
    "-m", "--machine-name", dest="machine_name", help="Name of source machine"
)
optional.add_argument(
    "-a",  # NOTE(review): short flag assumed — confirm
    "--machine-uname",
    dest="machine_uname",
    help="Uname info from " + "source machine",
)
optional.add_argument(
    "-e",  # NOTE(review): short flag assumed — confirm
    "--destination",
    dest="destination",
    default="mapd_db",
    help="Destination type: [mapd_db, file_json, output, jenkins_bench] "
    + "Multiple values can be input seperated by commas, "
    + 'ex: "mapd_db,file_json"',
)
optional.add_argument(
    "-U",  # NOTE(review): short flag assumed — confirm
    "--dest-user",
    dest="dest_user",
    default="mapd",
    help="Destination mapd_db database user",
)
optional.add_argument(
    "-P",  # NOTE(review): short flag assumed — confirm
    "--dest-passwd",
    dest="dest_passwd",
    default="HyperInteractive",
    help="Destination mapd_db database password",
)
optional.add_argument(
    "-S",  # NOTE(review): short flag assumed — confirm
    "--dest-server",
    dest="dest_server",
    help="Destination mapd_db database server hostname"
    + ' (required if destination = "mapd_db")',
)
optional.add_argument(
    "-O",  # NOTE(review): short flag and default assumed — confirm
    "--dest-port",
    dest="dest_port",
    type=int,
    default=6274,
    help="Destination mapd_db database server port",
)
optional.add_argument(
    "-N",  # NOTE(review): short flag assumed — confirm
    "--dest-name",
    dest="dest_name",
    default="mapd",
    help="Destination mapd_db database name",
)
optional.add_argument(
    "-C",  # NOTE(review): short flag assumed — confirm
    "--dest-table",
    dest="dest_table",
    default="import_results",
    help="Destination mapd_db table name",
)
optional.add_argument(
    "-D",  # NOTE(review): short flag assumed — confirm
    "--dest-table-schema-file",
    dest="dest_table_schema_file",
    default="results_table_schemas/import-results.sql",
    help="Destination table schema file. This must be an executable CREATE "
    + "TABLE statement that matches the output of this script. It is "
    + "required when creating the results table. Default location is in "
    + '"./results_table_schemas/query-results.sql"',
)
optional.add_argument(
    "-j",  # NOTE(review): short flag assumed — confirm
    "--output-file-json",
    dest="output_file_json",
    help="Absolute path of .json output file "
    + '(required if destination = "file_json")',
)
optional.add_argument(
    "-J",  # NOTE(review): short flag assumed — confirm
    "--output-file-jenkins",
    dest="output_file_jenkins",
    help="Absolute path of jenkins benchmark .json output file "
    + '(required if destination = "jenkins_bench")',
)
optional.add_argument(
    "--fragment-size",
    dest="fragment_size",
    default="32000000",  # NOTE(review): default assumed — confirm
    help="Fragment size to be used to create table. File specified in -c"
    + " is modified to replace ##FRAGMENT_SIZE## with value specified for"
    + " this option.",
)
args = parser.parse_args()
# Configure logging level from the verbosity flags.
if args.verbose:
    logging.basicConfig(level=logging.DEBUG)
elif args.quiet:
    logging.basicConfig(level=logging.WARNING)
else:
    logging.basicConfig(level=logging.INFO)
# Copy parsed args into plain locals used by the rest of the script.
source_db_user = args.user
source_db_passwd = args.passwd
source_db_server = args.server
source_db_port = args.port
source_db_name = args.name
label = args.label
import_file = args.import_file
table_schema_file = args.table_schema_file
import_table_name = args.import_table_name
import_query_template_file = args.import_query_template_file
no_drop_table_before = args.no_drop_table_before
no_drop_table_after = args.no_drop_table_after
import_test_name = args.import_test_name
machine_name = args.machine_name
machine_uname = args.machine_uname
# Destinations is a comma-separated list; each recognized entry marks the
# set as valid and pulls in its required companion arguments.
destinations = args.destination.split(",")
# BUG FIX: valid_destination_set was never initialized, so an unrecognized
# destination string caused a NameError at the final check instead of the
# intended error message. Initialize to False before the checks.
valid_destination_set = False
if "mapd_db" in destinations:
    valid_destination_set = True
    dest_db_user = args.dest_user
    dest_db_passwd = args.dest_passwd
    if args.dest_server is None:
        logging.error('"dest_server" is required when destination = "mapd_db"')
        exit(1)
    else:
        dest_db_server = args.dest_server
    dest_db_port = args.dest_port
    dest_db_name = args.dest_name
    dest_table = args.dest_table
    dest_table_schema_file = args.dest_table_schema_file
if "file_json" in destinations:
    valid_destination_set = True
    if args.output_file_json is None:
        logging.error(
            '"output_file_json" is required when destination = "file_json"'
        )
        exit(1)
    else:
        output_file_json = args.output_file_json
if "output" in destinations:
    valid_destination_set = True
if "jenkins_bench" in destinations:
    valid_destination_set = True
    if args.output_file_jenkins is None:
        logging.error(
            '"output_file_jenkins" is required '
            + 'when destination = "jenkins_bench"'
        )
        exit(1)
    elif args.import_test_name is None:
        logging.error(
            '"import_test_name" is required '
            + 'when destination = "jenkins_bench"'
        )
        exit(1)
    else:
        output_file_jenkins = args.output_file_jenkins
if not valid_destination_set:
    logging.error("No valid destination(s) have been set. Exiting.")
    exit(1)
# Establish the source database connection; bail out if it fails
# (get_connection returns False on failure and logs the exception).
con = get_connection(
    db_user=source_db_user,
    db_passwd=source_db_passwd,
    db_server=source_db_server,
    db_port=source_db_port,
    db_name=source_db_name,
)
if not con:
    exit(1)

# Gather per-run metadata used in the results record.
run_guid = str(uuid.uuid4())
logging.debug("Run guid: " + run_guid)
run_timestamp = datetime.datetime.now()
run_connection = str(con)
logging.debug("Connection string: " + run_connection)
run_driver = ""  # NOTE(review): assumed empty placeholder — confirm
run_version = con._client.get_version()
# Strip any "-<build>" suffix to get the short version.
run_version_short = run_version.split("-")[0] if "-" in run_version else run_version
# Host name is embedded in the connection string as "...@host:port...".
conn_machine_name = re.search(r"@(.*?):", run_connection).group(1)
# When connected to localhost, uname of this machine describes the server.
if conn_machine_name == "localhost":
    local_uname = os.uname()
if machine_name:
    run_machine_name = machine_name
elif conn_machine_name == "localhost":
    run_machine_name = local_uname.nodename
else:
    run_machine_name = conn_machine_name
if machine_uname:
    run_machine_uname = machine_uname
elif conn_machine_name == "localhost":
    run_machine_uname = " ".join(local_uname)
else:
    run_machine_uname = ""

# Drop and recreate the import table unless told not to.
if not no_drop_table_before:
    logging.info("Dropping import table if exists")
    con.execute("drop table if exists " + import_table_name)
    logging.debug("Creating import table.")
    try:
        with open(table_schema_file, "r") as table_schema:
            logging.debug("Reading table_schema_file: " + table_schema_file)
            # Flatten the schema file to one line and substitute placeholders.
            create_table_sql = table_schema.read().replace("\n", " ")
            create_table_sql = create_table_sql.replace(
                "##TAB##", import_table_name
            )
            create_table_sql = create_table_sql.replace(
                "##FRAGMENT_SIZE##", args.fragment_size
            )
    except FileNotFoundError:
        logging.exception("Could not find table_schema_file.")
        exit(1)
    try:
        logging.debug("Creating import table...")
        res = con.execute(create_table_sql)
        logging.debug("Import table created.")
    except (pymapd.exceptions.ProgrammingError, pymapd.exceptions.Error):
        logging.exception("Error running table creation")
        exit(1)
# Build the import query: either from a user-supplied template file
# (with ##TAB##/##FILE## substitution) or the default COPY FROM command.
if import_query_template_file:
    try:
        with open(import_query_template_file, "r") as import_query_template:
            logging.debug(
                "Reading import_query_template_file: " + import_query_template_file
            )
            import_query = import_query_template.read().replace("\n", " ")
            import_query = import_query.replace("##TAB##", import_table_name)
            import_query = import_query.replace("##FILE##", import_file)
    except FileNotFoundError:
        logging.exception("Could not find import_query_template_file.")
        exit(1)
else:
    import_query = "COPY %s FROM '%s';" % (import_table_name, import_file)
logging.debug("Import query: " + import_query)
logging.info("Starting import...")
start_time = timeit.default_timer()
try:
    res = con.execute(import_query)
    end_time = timeit.default_timer()
    logging.info("Completed import.")
except (pymapd.exceptions.ProgrammingError, pymapd.exceptions.Error):
    logging.exception("Error running import query")
    if no_drop_table_before:
        # A stale table with a mismatched schema is the most common cause
        # when the pre-import drop was skipped — point the user at it.
        logging.error(
            'Import failed and "--no-drop-table-before" was '
            + "passed in. Make sure existing table schema matches "
            + "import .csv file schema."
        )
    exit(1)

# Derive timing metrics: wall-clock elapsed (ms), server-side execution
# time, and the difference attributed to connection/transfer overhead.
query_elapsed_time = round(((end_time - start_time) * 1000), 1)
execution_time = res._result.execution_time_ms
connect_time = round((query_elapsed_time - execution_time), 2)
# COPY FROM reports loaded/rejected row counts in its textual result row.
res_output = str(res.fetchall()[0])
logging.debug("Query result output: " + res_output)
rows_loaded = re.search(r"Loaded: (.*?) recs, R", res_output).group(1)
rows_rejected = re.search(r"Rejected: (.*?) recs i", res_output).group(1)
# Import finished: optionally drop the table, then close the connection.
if not no_drop_table_after:
    logging.debug("Dropping import table")
    con.execute("drop table " + import_table_name)
logging.debug("Closing source db connection.")
con.close()

# Assemble the results record for all output destinations.
result = {
    "run_guid": run_guid,
    "run_timestamp": run_timestamp,
    "run_connection": run_connection,
    "run_machine_name": run_machine_name,
    "run_machine_uname": run_machine_uname,
    "run_driver": run_driver,
    "run_version": run_version,
    # NOTE(review): key reconstructed from the assigned `label` local —
    # confirm the exact key name against the results table schema.
    "run_label": label,
    "import_test_name": import_test_name,
    "import_elapsed_time_ms": query_elapsed_time,
    "import_execute_time_ms": execution_time,
    "import_conn_time_ms": connect_time,
    "rows_loaded": rows_loaded,
    "rows_rejected": rows_rejected,
}

# json_format_handler serializes the datetime field.
result_json = json.dumps(result, default=json_format_handler, indent=2)
463 if "mapd_db" in destinations:
465 logging.debug(
"Converting results list to pandas dataframe")
466 results_df = pandas.DataFrame(result, index=[0])
468 logging.debug(
"Connecting to destination mapd db")
470 db_user=dest_db_user,
471 db_passwd=dest_db_passwd,
472 db_server=dest_db_server,
473 db_port=dest_db_port,
474 db_name=dest_db_name,
479 tables = dest_con.get_tables()
480 if dest_table
not in tables:
481 logging.info(
"Destination table does not exist. Creating.")
483 with
open(dest_table_schema_file,
"r") as table_schema:
485 "Reading table_schema_file: " + dest_table_schema_file
487 create_table_sql = table_schema.read().replace(
"\n",
" ")
488 create_table_sql = create_table_sql.replace(
489 "##TAB##", dest_table
491 except FileNotFoundError:
492 logging.exception(
"Could not find table_schema_file.")
495 logging.debug(
"Executing create destination table query")
496 res = dest_con.execute(create_table_sql)
497 logging.debug(
"Destination table created.")
498 except (pymapd.exceptions.ProgrammingError, pymapd.exceptions.Error):
499 logging.exception(
"Error running table creation")
501 logging.info(
"Loading results into destination db")
503 dest_table, results_df, method=
"columnar", create=
False
506 if "file_json" in destinations:
508 logging.debug(
"Opening json output file for writing")
509 file_json_open =
open(output_file_json,
"w")
510 logging.info(
"Writing to output json file: " + output_file_json)
511 file_json_open.write(result_json)
512 if "jenkins_bench" in destinations:
515 logging.debug(
"Constructing output for jenkins benchmark plugin")
516 jenkins_bench_json = json.dumps(
520 "name": import_test_name,
521 "description":
"Import: " + import_test_name,
529 "name": import_test_name +
" average",
532 "dblValue": execution_time,
542 logging.debug(
"Opening jenkins_bench json output file for writing")
543 file_jenkins_open =
open(output_file_jenkins,
"w")
544 logging.info(
"Writing to jenkins_bench json file: " + output_file_jenkins)
545 file_jenkins_open.write(jenkins_bench_json)
546 if "output" in destinations:
547 logging.info(
"Printing query results to output")
int open(const char *path, int flags, int mode)