7 from multiprocessing
import Pool, cpu_count
8 from argparse
import ArgumentParser
12 def __init__(self, column_name, sql_type, lower, upper, step=1):
14 assert sql_type.upper()
in [
"INT",
"BIGINT"]
24 ),
"Generated values are larger than 32-bit signed integer."
27 if self.
sql_type in [
"INT",
"BIGINT"]:
30 assert False,
"SQL type " + self.
sql_type +
" not supported yet"
37 Returns the ColumnDetails as expected by pymapd's API
39 result =
"ColumnDetails(name='"
42 result += self.sql_type.upper()
43 result +=
"', nullable=True, precision=0, scale=0, comp_param=0, encoding='NONE', is_array=False)"
51 table_name(str): synthetic table's name in the database
52 fragment_size(int): fragment size (number of entries per fragment)
53 num_fragment(int): total number of fragments for the synthetic table
54 db_user(str): database username
55 db_password(str): database password
56 db_port(int): database port
57 db_name(str): database name
58 db_server(str): database server name
59 data_dir_path(str): path to directory that will include the generated data
60 is_remote_server(Bool): if True, it indicates that this class is not created on the
61 same machine that is going to host the server.
88 "Data already exists in the database, proceeding to the queries:"
96 "Proper data does not exist in the remote server."
100 if not skip_data_generation:
102 current_time = str(datetime.datetime.now()).
split()
107 "Synthetic data created: "
115 print(
"Data imported into the database")
120 column_list.append(
Column(
"x10",
"INT", 1, 10))
121 column_list.append(
Column(
"y10",
"INT", 1, 10))
122 column_list.append(
Column(
"z10",
"INT", 1, 10))
123 column_list.append(
Column(
"x100",
"INT", 1, 100))
124 column_list.append(
Column(
"y100",
"INT", 1, 100))
125 column_list.append(
Column(
"z100",
"INT", 1, 100))
126 column_list.append(
Column(
"x1k",
"INT", 1, 1000))
127 column_list.append(
Column(
"x10k",
"INT", 1, 10000))
128 column_list.append(
Column(
"x100k",
"INT", 1, 100000))
129 column_list.append(
Column(
"x1m",
"INT", 1, 1000000))
130 column_list.append(
Column(
"x10m",
"INT", 1, 10000000))
134 column_list.append(
Column(
"x10k_s10k",
"BIGINT", 1, 10000, 10000))
135 column_list.append(
Column(
"x100k_s10k",
"BIGINT", 1, 100000, 10000))
136 column_list.append(
Column(
"x1m_s10k",
"BIGINT", 1, 1000000, 10000))
140 create_sql =
"CREATE TABLE " + self.
table_name +
" ( "
143 create_sql += column.column_name +
" " + column.sql_type
155 copy_sql =
"COPY " + self.
table_name +
" FROM '"
163 Single-thread random data generation based on the provided schema.
164 Data is stored in CSV format.
169 with
open(file_name,
"w")
as f:
170 for i
in range(size):
183 Uses all available CPU threads to generate random data based on the
184 provided schema. Data is stored in CSV format.
186 num_threads = cpu_count()
187 num_entries_per_thread = int(
190 thread_index = [i
for i
in range(0, num_threads)]
193 num_balanced_entries = [
194 num_entries_per_thread
for _
in range(num_threads)
196 if self.
num_entries != num_entries_per_thread * num_threads:
197 last_threads_portion = (
198 self.
num_entries - num_entries_per_thread * (num_threads - 1)
200 num_balanced_entries[-1] = last_threads_portion
202 arguments = zip(thread_index, num_balanced_entries)
204 with Pool(num_threads)
as pool:
209 Creates table details in the same format as expected
210 from pymapd's get_table_details
213 column.createColumnDetailsString()
for column
in self.
column_list
218 Verifies whether the existing table in the database has the expected
222 con = pymapd.connect(
230 raise Exception(
"Pymapd's connection to the server has failed.")
232 table_details = con.get_table_details(self.
table_name)
235 print(
"Table does not exist in the database")
239 str(table_detail)
for table_detail
in table_details
243 print(
"Schema does not match the expected one:")
245 "Observed table details: "
246 + str([str(table_detail)
for table_detail
in table_details])
249 "Expected table details: "
255 Verifies whether the existing table in the database has the expected
256 number of entries in it as in this class.
259 con = pymapd.connect(
266 result = con.execute(
267 "select count(*) from " + self.
table_name +
";"
272 print(
"Expected num rows did not match:")
275 raise Exception(
"Pymapd's connection to the server has failed.")
279 con = pymapd.connect(
287 con.execute(
"DROP TABLE IF EXISTS " + self.
table_name +
";")
291 raise Exception(
"Failure in creating a new table.")
295 con = pymapd.connect(
305 raise Exception(
"Failure in importing data into the table")
308 if __name__ ==
"__main__":
309 parser = ArgumentParser()
310 required = parser.add_argument_group(
"required arguments")
311 required.add_argument(
"--user", dest=
"user", default=
"admin")
312 required.add_argument(
313 "--password", dest=
"password", default=
"HyperInteractive"
315 required.add_argument(
316 "--table_name", dest=
"table_name", default=
"synthetic_test_table"
318 required.add_argument(
"--fragment_size", dest=
"fragment_size", default=
"1")
319 required.add_argument(
320 "--num_fragments", dest=
"num_fragments", default=
"128"
322 required.add_argument(
"--name", dest=
"name", default=
"heavyai")
323 required.add_argument(
"--server", dest=
"server", default=
"localhost")
324 required.add_argument(
"--port", dest=
"port", default=
"6274")
325 required.add_argument(
328 default=os.getcwd() +
"/../build/synthetic_data",
330 required.add_argument(
331 "--just_data_generation",
332 dest=
"just_data_generation",
334 help=
"Indicates that the code will only generates synthetic data, bypassing all "
335 +
"other capabilities. The generated data will be stored in DATA_DIR.",
337 required.add_argument(
338 "--just_data_import",
339 dest=
"just_data_import",
341 help=
"Indicates that the code assumes the data is generated and exists at data_dir/*.csv. "
342 +
"It then proceeds with table creation and data import. ",
344 args = parser.parse_args()
347 table_name=args.table_name,
348 fragment_size=int(args.fragment_size),
349 num_fragments=int(args.num_fragments),
352 db_password=args.password,
353 db_server=args.server,
354 db_port=int(args.port),
355 data_dir_path=args.data_dir,
356 is_remote_server=
False,
359 if (args.just_data_generation
is False)
and (
360 args.just_data_import
is False
362 synthetic_table.createDataAndImportTable()
363 elif args.just_data_generation
is True:
364 synthetic_table.generateDataParallel()
366 "Synthetic data created: "
367 + str(synthetic_table.num_entries)
371 synthetic_table.createDataAndImportTable(args.just_data_import)
def createExpectedTableDetails
def createColumnDetailsString
int open(const char *path, int flags, int mode)
def importDataIntoTableInDB
def createDataAndImportTable
def doesTableHasExpectedSchemaInDB
def doesTableHasExpectedNumEntriesInDB
def generateColumnsSchema
def getCreateTableCommand