def _str2bool(text):
    """Convert a command-line string such as "true", "False", "1" or "no" to bool.

    ``type=bool`` cannot be used for this: ``bool("False")`` is True because
    every non-empty string is truthy.

    Raises:
        argparse.ArgumentTypeError: if ``text`` is not a recognised boolean.
    """
    lowered = text.strip().lower()
    if lowered in ("1", "true", "t", "yes", "y", "on"):
        return True
    if lowered in ("", "0", "false", "f", "no", "n", "off"):
        return False
    raise argparse.ArgumentTypeError("expected a boolean value, got %r" % (text,))


# NOTE(review): the def line is missing from this extraction; the name and the
# `args` parameter are inferred from `parser.parse_args(args)` -- confirm.
def parse_args(args):
    """Parse the benchmark's command-line options.

    Args:
        args: sequence of argument strings handed to
            ``ArgumentParser.parse_args``.

    Returns:
        argparse.Namespace with host, port, db, user, password,
        max_rollback_epochs, temp_table and num_rows.
    """
    parser = argparse.ArgumentParser(
        description="Benchmark HEAVY.AI batch and streaming table ingest"
    )
    parser.add_argument(
        "-s", "--host", help="HEAVY.AI server address", default="localhost"
    )
    # type=int rejects a non-numeric port at parse time instead of at connect time.
    parser.add_argument(
        "-p", "--port", help="HEAVY.AI server port", type=int, default=6273
    )
    parser.add_argument(
        "-d", "--db", help="HEAVY.AI database name", default="heavyai"
    )
    parser.add_argument(
        "-u", "--user", help="HEAVY.AI user name", default="admin"
    )
    parser.add_argument(
        "-w", "--password", help="HEAVY.AI password", default="HyperInteractive"
    )
    parser.add_argument(
        "-e",
        "--max_rollback_epochs",
        help="Max Rollback Epochs",
        type=int,
        default=-1,
    )
    # nargs="?" with const=True accepts both a bare "-t" and "-t true/false".
    # The previous type=bool turned "-t False" into True.
    parser.add_argument(
        "-t",
        "--temp_table",
        help="Use temporary table",
        type=_str2bool,
        nargs="?",
        const=True,
        default=False,
    )
    parser.add_argument(
        "-r",
        "--num_rows",
        help="Number of rows to benchmark with",
        type=int,
        default=10000,
    )
    return parser.parse_args(args)
# NOTE(review): the class header, both def lines and the cursor assignment are
# missing from this extraction and are reconstructed here -- confirm.
class OmniCon:
    """Thin wrapper holding a pymapd connection and a cursor on it."""

    def __init__(self, user, pw, dbname, host="localhost", port=None):
        """Open a pymapd connection and a cursor on it.

        Args:
            user: database user name.
            pw: password for ``user``.
            dbname: database to connect to.
            host: server address; defaults to "localhost", the value that
                was previously hard-coded here.
            port: server port; when None it is not passed, so pymapd's own
                default applies.
        """
        connect_kwargs = {
            "user": user,
            "password": pw,
            "dbname": dbname,
            "host": host,
        }
        if port is not None:
            # Accept an int or a numeric string from the command line.
            connect_kwargs["port"] = int(port)
        self.con = pymapd.connect(**connect_kwargs)
        self.cursor = self.con.cursor()

    def query(self, sql):
        """Execute ``sql`` on the held cursor and return what execute() returns."""
        return self.cursor.execute(sql)
def create_table(omni_con, table_name, is_temporary=False, max_rollback_epochs=-1):
    """Drop and recreate a four-column INTEGER table for the benchmark.

    Issues ``DROP TABLE IF EXISTS`` followed by ``CREATE TABLE`` with columns
    a, b, c, d (all INTEGER).

    Args:
        omni_con: object whose ``query(sql)`` method runs one SQL statement.
        table_name: name of the table to drop and recreate.
        is_temporary: when truthy, the table is created as TEMPORARY.
        max_rollback_epochs: when >= 0, appended as a
            ``WITH (max_rollback_epochs=...)`` option; negative values omit it.
    """
    if is_temporary:
        temp_clause = "TEMPORARY"
    else:
        temp_clause = ""

    rollback_clause = ""
    if max_rollback_epochs >= 0:
        rollback_clause = f"WITH (max_rollback_epochs={max_rollback_epochs})"

    columns = "(a INTEGER, b INTEGER, c INTEGER, d INTEGER)"
    # Empty clauses leave doubled/trailing spaces in the statement; the
    # server accepts them, so the text is kept exactly as before.
    statements = (
        "DROP TABLE IF EXISTS " + table_name,
        f"CREATE {temp_clause} TABLE {table_name} {columns} {rollback_clause}",
    )
    for statement in statements:
        omni_con.query(statement)
# NOTE(review): the def line and the return statement are missing from this
# extraction; name and signature are inferred from the body -- confirm.
def gen_data(num_rows):
    """Build a DataFrame of random benchmark rows.

    Returns a ``num_rows`` x 4 DataFrame with int32 columns a, b, c, d whose
    values come from ``np.random.randint(0, 100)``, i.e. lie in [0, 100).
    """
    values = np.random.randint(0, 100, size=(num_rows, 4))
    frame = pd.DataFrame(values, columns=list("abcd"))
    return frame.astype(np.int32)
# NOTE(review): the def line is missing from this extraction; the name comes
# from the trailing residue and the parameters are inferred from the body.
def bench_streaming_sql_inserts(omni_con, table_name, data):
    """Time row-by-row SQL INSERTs of ``data`` into ``table_name`` and print the rate.

    One INSERT statement per row is built from the first four columns of
    ``data`` before the clock starts, so only statement execution is timed.

    Args:
        omni_con: object whose ``query(sql)`` method runs one SQL statement.
        table_name: destination table with columns a, b, c, d.
        data: pandas DataFrame whose first four columns are inserted.
    """
    num_rows = len(data.index)
    base_insert_sql = (
        "INSERT INTO " + table_name + "(a, b, c, d) VALUES ({0}, {1}, {2}, {3})"
    )
    # Iterate rows directly rather than indexing each cell with .iat.
    insert_statements = [
        base_insert_sql.format(a, b, c, d)
        for a, b, c, d in data.iloc[:, :4].itertuples(index=False, name=None)
    ]

    start_time = time.perf_counter()
    for statement in insert_statements:
        omni_con.query(statement)
    time_diff = time.perf_counter() - start_time

    # An empty frame makes the timed loop empty; on a coarse clock the
    # interval can then be exactly zero, so avoid dividing by it.
    rows_per_second = num_rows / time_diff if time_diff > 0 else float("inf")
    print(
        "Streaming – SQL Inserts: {0} rows in {1} seconds at {2} rows/sec".format(
            num_rows, time_diff, rows_per_second
        )
    )
# NOTE(review): the def line is missing from this extraction; the name and the
# parameters are inferred from the body and the printed label -- confirm.
def bench_bulk_columnar(omni_con, table_name, data):
    """Time a single bulk ``load_table_columnar`` of the whole frame and print rows/sec.

    Args:
        omni_con: object exposing a pymapd-style connection as ``.con``.
        table_name: destination table.
        data: pandas DataFrame to upload; its index is not sent.
    """
    row_count = len(data.index)
    started = time.perf_counter()
    omni_con.con.load_table_columnar(table_name, data, preserve_index=False)
    elapsed = time.perf_counter() - started
    throughput = row_count / elapsed
    message = "Bulk load – Columnar: {0} rows in {1} seconds at {2} rows/sec".format(
        row_count, elapsed, throughput
    )
    print(message)
# NOTE(review): the def line is missing from this extraction; the name and the
# parameters are inferred from the body and the printed label -- confirm.
def bench_bulk_arrow(omni_con, table_name, data):
    """Time a single bulk ``load_table_arrow`` of the whole frame and print rows/sec.

    The pandas -> Arrow conversion runs before the clock starts, so only the
    upload call is timed.

    Args:
        omni_con: object exposing a pymapd-style connection as ``.con``.
        table_name: destination table.
        data: pandas DataFrame to convert and upload.
    """
    row_count = len(data.index)
    arrow_table = pa.Table.from_pandas(data)
    started = time.perf_counter()
    omni_con.con.load_table_arrow(table_name, arrow_table, preserve_index=False)
    elapsed = time.perf_counter() - started
    throughput = row_count / elapsed
    message = "Bulk load – Arrow: {0} rows in {1} seconds at {2} rows/sec".format(
        row_count, elapsed, throughput
    )
    print(message)
# NOTE(review): the def line is missing from this extraction; the name comes
# from the trailing residue and the parameters are inferred from the body.
def bench_streaming_columnar(omni_con, table_name, data):
    """Time loading ``data`` one row per ``load_table_columnar`` call and print the rate.

    Each row is sliced into its own one-row DataFrame inside the timed loop
    and sent with a separate call, index excluded.

    Args:
        omni_con: object exposing a pymapd-style connection as ``.con``.
        table_name: destination table.
        data: pandas DataFrame to stream row by row.
    """
    num_rows = len(data.index)
    start_time = time.perf_counter()
    for r in range(num_rows):
        # iloc[r:r + 1] keeps a one-row DataFrame (iloc[r] would give a Series).
        row_df = data.iloc[r:r + 1]
        omni_con.con.load_table_columnar(table_name, row_df, preserve_index=False)
    time_diff = time.perf_counter() - start_time

    # An empty frame makes the timed loop empty; on a coarse clock the
    # interval can then be exactly zero, so avoid dividing by it.
    rows_per_second = num_rows / time_diff if time_diff > 0 else float("inf")
    print(
        "Streaming – Columnar: {0} rows in {1} seconds at {2} rows/sec".format(
            num_rows, time_diff, rows_per_second
        )
    )
# Fragments of the script's driver: its def line and the statements between
# these fragments (including the benchmark calls) are not present in this
# extraction.
# NOTE(review): options.host and options.port are parsed but not passed to
# OmniCon here, so --host/--port have no effect on this connection.
90 omni_con =
OmniCon(options.user, options.password, options.db)
# Drop and recreate the table presumably used by the streaming SQL-insert run.
94 table_name =
"stream_insert_sql"
95 create_table(omni_con, table_name, options.temp_table, options.max_rollback_epochs)
# Drop and recreate the table presumably used by the bulk columnar load.
103 table_name =
"bulk_columnar"
104 create_table(omni_con, table_name, options.temp_table, options.max_rollback_epochs)
# Drop and recreate the table presumably used by the bulk Arrow load.
107 table_name =
"bulk_arrow"
108 create_table(omni_con, table_name, options.temp_table, options.max_rollback_epochs)
# Script entry-point guard; the statement it guards is not present in this
# extraction.
111 if __name__ ==
"__main__":
# NOTE(review): the two lines below look like stray extraction residue (bare
# `def` keywords with no parameter list or body) and are not valid Python.
def bench_streaming_columnar
def bench_streaming_sql_inserts