17 package com.mapd.tests;
19 import org.slf4j.Logger;
20 import org.slf4j.LoggerFactory;
22 import java.nio.file.Files;
23 import java.nio.file.Path;
24 import java.nio.file.Paths;
25 import java.util.ArrayList;
29 LoggerFactory.getLogger(SelectCopyFromDeleteConcurrencyTest.class);
/**
 * Entry point: locates the CSV fixture and runs the concurrency test twice,
 * first with a shard count of 0 and then with {@code shard_count}.
 *
 * @param args command-line arguments; not read by any statement visible here
 * @throws Exception propagated from the test runs
 */
37 public static void main(String[]
args)
throws Exception {
// The fixture path is relative, so toAbsolutePath() resolves it against the
// JVM's current working directory.
39 String path_str =
"java/utility/src/main/java/com/mapd/tests/data/simple_test.csv";
41 Path path = Paths.get(path_str).toAbsolutePath();
// NOTE(review): `assert` is skipped unless the JVM is started with -ea, so a
// missing fixture is not caught here in a default run — confirm how this is launched.
42 assert Files.exists(path);
// First pass with shard count 0.
46 test.testSelecyCopyFromConcurrency(0);
// Second pass with the shard count held in `shard_count`.
49 test.testSelecyCopyFromConcurrency(shard_count);
57 int runs)
throws Exception {
// Per-worker scenario: creates a table named after `prefix`, then for `runs`
// iterations issues SELECT / COPY / DELETE / TRUNCATE statements against it,
// and finally drops the table. Every step is logged with the current thread id.
58 String table_name =
"table_" + prefix +
"_";
// Thread id is used only as a tag in the log lines visible below.
59 long tid = Thread.currentThread().getId();
// A positive shard count selects the sharded CREATE TABLE (shard key: id).
60 if (shard_count > 0) {
61 logger.info(
"[" + tid +
"] "
62 +
"CREATE " + table_name +
" WITH " + shard_count +
" SHARDS");
63 user.runSql(
"CREATE TABLE " + table_name
64 +
" (id INTEGER, str TEXT ENCODING DICT(32), x DOUBLE, y BIGINT, SHARD KEY(id)) WITH (FRAGMENT_SIZE=1, SHARD_COUNT="
// Same columns without a shard key (presumably the else branch of the check
// above — confirm).
67 logger.info(
"[" + tid +
"] "
68 +
"CREATE " + table_name);
69 user.runSql(
"CREATE TABLE " + table_name
70 +
" (id INTEGER, str TEXT ENCODING DICT(32), x DOUBLE, y BIGINT) WITH (FRAGMENT_SIZE=1)");
// Repeat the statement sequence `runs` times.
73 for (
int i = 0; i < runs; i++) {
// Filtered read (x < 5.0, at most one row).
74 logger.info(
"[" + tid +
"] "
75 +
"SELECT 1 " + table_name);
76 user.runSql(
"SELECT id, str FROM " + table_name +
" WHERE x < 5.0 LIMIT 1;");
// Load the CSV at `filepath` through the `user` connection; header='false'
// is passed to COPY.
78 logger.info(
"[" + tid +
"] "
79 +
"COPY 1 " + table_name);
80 user.runSql(
"COPY " + table_name +
" FROM '" + filepath.toString()
81 +
"' WITH (header='false');");
// Count query whose predicate uses a MIN(x) subquery.
83 logger.info(
"[" + tid +
"] "
84 +
"SELECT 2 " + table_name);
85 user.runSql(
"SELECT COUNT(*) FROM " + table_name +
" WHERE x = (SELECT MIN(x) FROM "
// Delete whose predicate uses a MAX(x) subquery.
88 logger.info(
"[" + tid +
"] "
89 +
"DELETE 1 " + table_name);
90 user.runSql(
"DELETE FROM " + table_name +
" WHERE x = (SELECT MAX(x) FROM "
// NOTE(review): the log label below says "SELECT 2" but the statement that
// follows is a COPY; the label looks copy-pasted — confirm the intended text.
93 logger.info(
"[" + tid +
"] "
94 +
"SELECT 2 " + table_name);
95 user.runSql(
"COPY " + table_name +
" FROM '" + filepath.toString()
96 +
"' WITH (header='false');");
// Truncate through the `user` connection.
98 logger.info(
"[" + tid +
"] "
99 +
"TRUNCATE 1 " + table_name);
100 user.runSql(
"TRUNCATE TABLE " + table_name);
// This load is issued through `dba`, unlike the earlier COPY statements,
// which use `user`.
103 logger.info(
"[" + tid +
"] "
104 +
"COPY 3 " + table_name);
105 dba.runSql(
"COPY " + table_name +
" FROM '" + filepath.toString()
106 +
"' WITH (header='false');");
// Same MIN(x) count query as "SELECT 2".
108 logger.info(
"[" + tid +
"] "
109 +
"SELECT 3 " + table_name);
110 user.runSql(
"SELECT COUNT(*) FROM " + table_name
111 +
" WHERE x = (SELECT MIN(x) FROM " + table_name +
");");
// Cleanup: the table is dropped through the `dba` connection.
115 logger.info(
"[" + tid +
"] "
116 +
"DROP TABLE " + table_name);
117 dba.runSql(
"DROP TABLE " + table_name +
";");
/**
 * Builds {@code num_threads} worker threads, records any exception a worker
 * catches in a per-thread slot, and rethrows a recorded failure to the caller.
 *
 * @param num_threads number of worker threads to create
 * @param shard_count shard count for the run (presumably forwarded to
 *     run_test by each worker — confirm)
 * @throws Exception wrapping a worker's exception, with the original as cause
 */
120 private void runTest(
int num_threads,
int shard_count)
throws Exception {
// One slot per worker; a worker writes only its own index (threadId).
122 Exception exceptions[] =
new Exception[num_threads];
124 ArrayList<Thread> threads =
new ArrayList<>();
125 for (
int i = 0; i < num_threads; i++) {
126 logger.info(
"Starting " + i);
// Final copy of the loop index so the anonymous Runnable can capture it.
127 final int threadId = i;
129 Thread t =
new Thread(
new Runnable() {
// Even thread indices log in as alice, odd ones as bob.
133 final String username = threadId % 2 == 0 ?
"alice" :
"bob";
135 "localhost", 6274,
"heavyai",
"admin",
"HyperInteractive");
137 "localhost", 6274,
"heavyai", username,
"password");
// Prefix embeds the username and thread index, so it differs for every worker.
138 final String prefix =
"for_" + username +
"_" + threadId +
"_";
142 }
catch (Exception e) {
// Log the failure and keep it in this worker's slot; the slots are
// inspected after the thread loop below.
143 logger.error(
"[" + Thread.currentThread().getId() +
"] "
144 +
"Caught Exception: " + e.getMessage());
145 exceptions[threadId] = e;
// presumably waits for each worker to finish — confirm
153 for (Thread t : threads) {
// Surface a recorded worker failure to the caller, keeping the original
// exception as the cause.
157 for (Exception e : exceptions) {
159 logger.error(
"Exception: " + e.getMessage(), e);
160 throw new Exception(e.getMessage(), e);
// Test driver: creates the alice and bob accounts, grants them database
// privileges on heavyai, runs the threaded scenario, then drops both accounts.
166 logger.info(
"testSelectCopyFromConcurrency()");
168 logger.info(
"Using import file: " + input_file_path_.toString());
172 "localhost", 6274,
"heavyai",
"admin",
"HyperInteractive");
// Create the two non-superuser accounts through `su` (presumably the client
// built from the admin login parameters above — confirm).
173 su.runSql(
"CREATE USER alice (password = 'password', is_super = 'false');");
174 su.runSql(
"CREATE USER bob (password = 'password', is_super = 'false');");
// Both accounts receive the same grants: CREATE, CREATE VIEW, DROP VIEW and
// ACCESS on database heavyai.
176 su.runSql(
"GRANT CREATE on DATABASE heavyai TO alice;");
177 su.runSql(
"GRANT CREATE on DATABASE heavyai TO bob;");
179 su.runSql(
"GRANT CREATE VIEW on DATABASE heavyai TO alice;");
180 su.runSql(
"GRANT CREATE VIEW on DATABASE heavyai TO bob;");
182 su.runSql(
"GRANT DROP VIEW on DATABASE heavyai TO alice;");
183 su.runSql(
"GRANT DROP VIEW on DATABASE heavyai TO bob;");
185 su.runSql(
"GRANT ACCESS on database heavyai TO alice;");
186 su.runSql(
"GRANT ACCESS on database heavyai TO bob;");
// Five workers; runTest picks the username by index parity, so both accounts
// are used.
188 final int num_threads = 5;
189 runTest(num_threads, shard_count);
// NOTE(review): no try/finally is visible around runTest, so if it throws,
// the DROP USER statements below appear to be skipped and alice/bob would
// remain — confirm.
191 su.runSql(
"DROP USER alice;");
192 su.runSql(
"DROP USER bob;");
194 logger.info(
"Pass!");
SelectCopyFromDeleteConcurrencyTest(Path path)
static final Logger logger
void runTest(int num_threads, int shard_count)
void testSelecyCopyFromConcurrency(int shard_count)
static void main(String[] args)
void run_test(HeavyDBTestClient dba, HeavyDBTestClient user, String prefix, Path filepath, int shard_count, int runs)