16 package com.mapd.tests;
18 import org.slf4j.Logger;
19 import org.slf4j.LoggerFactory;
21 import java.util.ArrayList;
22 import java.util.Arrays;
23 import java.util.List;
24 import java.util.concurrent.CyclicBarrier;
25 import java.util.regex.Matcher;
26 import java.util.regex.Pattern;
28 import ai.heavy.thrift.server.TColumnType;
29 import ai.heavy.thrift.server.TCopyParams;
30 import ai.heavy.thrift.server.TCreateParams;
31 import ai.heavy.thrift.server.TDBException;
32 import ai.heavy.thrift.server.TImportHeaderRow;
33 import ai.heavy.thrift.server.TSourceType;
43 LoggerFactory.getLogger(ImportAlterValidateSelectConcurrencyTest.class);
51 public static void main(String[]
args)
throws Exception {
55 assert args.length == 2;
58 test.testConcurrency();
62 String db, String dbaUser, String dbaPassword, String dbUser, String dbPassword)
66 final int fragment_size = 10;
67 Exception exceptions[] =
new Exception[num_threads];
69 final CyclicBarrier barrier =
new CyclicBarrier(num_threads,
new Runnable() {
73 "localhost", 6274, db, dbaUser, dbaPassword);
75 +
"(pt GEOMETRY(POINT), ls GEOMETRY(LINESTRING), faii INTEGER[2], fadc DECIMAL(5, 2)[2], fatx TEXT[] ENCODING DICT(32), fatx2 TEXT[2] ENCODING DICT(32)) WITH(FRAGMENT_SIZE = "
76 + fragment_size +
")");
79 +
"( trip INT, mpoly MULTIPOLYGON ) WITH(FRAGMENT_SIZE = "
80 + fragment_size +
")");
82 }
catch (Exception e) {
83 logger.error(
"[" + Thread.currentThread().getId() +
"]"
84 +
" Caught Exception: " + e.getMessage(),
91 ArrayList<Thread> threads =
new ArrayList<>();
92 for (
int i = 0; i < num_threads; i++) {
93 logger.info(
"Starting " + i);
94 final int threadId = i;
96 Thread t =
new Thread(
new Runnable() {
99 long tid = Thread.currentThread().getId();
100 String logPrefix =
"[" + tid +
"]";
103 TCopyParams copy_params =
new TCopyParams();
104 copy_params.has_header = TImportHeaderRow.NO_HEADER;
105 copy_params.delimiter =
",";
106 copy_params.null_str =
"\\N";
107 copy_params.quoted =
true;
108 copy_params.quote =
"\"";
109 copy_params.escape =
"\"";
110 copy_params.line_delim =
"\n";
111 copy_params.array_delim =
",";
112 copy_params.array_begin =
"{";
113 copy_params.array_end =
"}";
114 copy_params.threads = 0;
116 TCopyParams geo_copy_params =
new TCopyParams();
117 geo_copy_params.delimiter =
",";
118 geo_copy_params.null_str =
"\\N";
119 geo_copy_params.quoted =
true;
120 geo_copy_params.quote =
"\"";
121 geo_copy_params.escape =
"\"";
122 geo_copy_params.line_delim =
"\n";
123 geo_copy_params.array_delim =
",";
124 geo_copy_params.array_begin =
"{";
125 geo_copy_params.array_end =
"}";
126 geo_copy_params.threads = 0;
127 geo_copy_params.source_type = TSourceType.GEO_FILE;
133 "localhost", 6274, db, dbUser, dbPassword);
135 if (threadId % 2 == 0) {
136 logger.info(logPrefix +
" IMPORT TABLE");
142 sql =
"COPY " + csvTableName +
" FROM '" + csv_file_path
143 +
"' WITH (header = 'false');";
148 sql =
"DELETE FROM " +
csvTableName +
" WHERE fatx2 IS NULL;";
156 sql =
"ALTER TABLE " +
csvTableName +
" DROP COLUMN faii;";
160 if (threadId % 2 == 1) {
167 logger.info(logPrefix +
" VALIDATE " + sql);
168 final String validateSql = sql;
173 final String alterSql =
"ALTER TABLE " +
geoTableName +
" SET max_rows = 10;";
180 logger.info(logPrefix +
" IMPORT GEO TABLE");
187 new java.util.ArrayList<TColumnType>(),
188 new TCreateParams()),
193 final String selectSql =
"SELECT * FROM " +
geoTableName +
" LIMIT 2;";
200 logger.info(logPrefix +
" VALIDATE " + sql);
201 user.sqlValidate(sql);
203 sql =
"ALTER TABLE " +
csvTableName +
" SET max_rollback_epochs = 0;";
206 sql =
"COPY (SELECT * FROM " +
csvTableName +
") TO 'test_export.csv';";
209 for (
int i = 0; i < 5; i++) {
210 final String insertSql =
"INSERT INTO " +
geoTableName +
" VALUES (" + i
211 +
", 'MULTIPOLYGON(((0 0, 1 1, 2 2)))');";
218 sql =
"COPY (SELECT * FROM " +
csvTableName +
") TO 'test_export.csv';";
225 logger.info(logPrefix +
" VALIDATE " + sql);
226 user.sqlValidate(sql);
233 }
catch (Exception e) {
234 logger.error(logPrefix +
" Caught Exception: " + e.getMessage(), e);
235 exceptions[threadId] = e;
243 for (Thread t : threads) {
248 HeavyDBTestClient.getClient(
"localhost", 6274, db, dbaUser, dbaPassword);
251 for (Exception e : exceptions) {
253 logger.error(
"Exception: " + e.getMessage(), e);
260 logger.info(
"ImportAlterValidateSelectConcurrencyTest()");
262 "localhost", 6274,
"heavyai",
"admin",
"HyperInteractive");
264 su.runSql(
"CREATE USER dba (password = 'password', is_super = 'true');");
265 su.runSql(
"CREATE USER bob (password = 'password', is_super = 'false');");
267 su.runSql(
"GRANT CREATE on DATABASE heavyai TO bob;");
269 su.runSql(
"CREATE DATABASE db1;");
270 su.runSql(
"GRANT CREATE on DATABASE db1 TO bob;");
271 su.runSql(
"GRANT CREATE VIEW on DATABASE db1 TO bob;");
272 su.runSql(
"GRANT DROP on DATABASE db1 TO bob;");
273 su.runSql(
"GRANT DROP VIEW on DATABASE db1 TO bob;");
275 runTest(
"db1",
"admin",
"HyperInteractive",
"admin",
"HyperInteractive");
278 su.runSql(
"DROP DATABASE IF EXISTS db1;");
279 su.runSql(
"DROP USER IF EXISTS bob;");
280 su.runSql(
"DROP USER IF EXISTS dba;");
283 logger.info(
"ImportAlterValidateSelectConcurrencyTest() done");
288 void call()
throws Exception;
295 }
catch (TDBException e) {
296 Pattern pattern = Pattern.compile(
"(Table/View\\s+" + tableName
297 +
".+does not exist|.+Object\\s+'" + tableName +
"'\\s+not found)");
298 Matcher matcher = pattern.matcher(e.error_msg);
299 if (matcher.find()) {
300 logger.info(
"Ignoring missing table error: " + e.error_msg);
309 logger.info(logPrefix +
" " + sql);
314 logger.info(logPrefix +
" Calling load_table API");
315 List<List<String>>
rows =
new ArrayList<>();
316 for (
int i = 0; i < 5; i++) {
317 rows.add(Arrays.asList(
"point(0 0)",
318 "linestring(0 0,1 1)",
329 logger.info(logPrefix +
" Calling load_table_binary_columnar API");
330 List<List<Object>> columns =
new ArrayList<>();
331 for (
int i = 0; i < 3; i++) {
332 columns.add(
new ArrayList<>());
334 for (
int i = 0; i < 5; i++) {
335 columns.get(0).add(Arrays.asList(Long.valueOf(1), Long.valueOf(1)));
336 columns.get(1).add(Arrays.asList(
"1",
"1"));
337 columns.get(2).add(Arrays.asList(
"1",
"1"));
339 user.load_table_binary_columnar(
340 csvTableName, columns, Arrays.asList(
"faii",
"fatx",
"fatx2"));
345 logger.info(logPrefix +
" Calling get_table_details API");
347 logger.info(logPrefix +
" Calling get_table_details_for_database API");
352 -> user.get_table_details_for_database(
geoTableName,
"heavyai"),
358 logger.info(logPrefix +
" Calling get_tables_meta API");
359 user.get_tables_meta();
static void main(String[] args)
void getTableDetails(HeavyDBTestClient user, String logPrefix)
void getTablesMetadata(HeavyDBTestClient user, String logPrefix)
void logAndRunSql(String sql, HeavyDBTestClient user, String logPrefix)
ImportAlterValidateSelectConcurrencyTest(String csv_file_path, String geo_file_path)
static final String geoTableName
static final Logger logger
void runTest(String db, String dbaUser, String dbaPassword, String dbUser, String dbPassword)
void loadTableBinaryColumnar(HeavyDBTestClient user, String logPrefix)
static final String csvTableName
void loadTable(HeavyDBTestClient user, String logPrefix)
void ignoreMissingTable(final VoidFunction function, final String tableName)