82 int INTERRUPTER_TID = num_threads - 1;
84 final String large_table =
"test_large";
85 final String small_table =
"test_small";
86 final String geo_table =
"test_geo";
87 String loop_join_query =
88 "SELECT /*+ cpu_mode */ COUNT(1) FROM test_large T1, test_large T2;";
89 String hash_join_query =
90 "SELECT /*+ cpu_mode */ COUNT(1) FROM test_large T1, test_small T2 WHERE T1.x = T2.x;";
92 "SELECT /*+ cpu_mode */ x, count(1) FROM test_large T1 GROUP BY x;";
93 Path large_table_path =
94 getAbsolutePath(
"../java/utility/src/main/java/com/mapd/tests/data/1M.csv");
95 Path small_table_path =
96 getAbsolutePath(
"../java/utility/src/main/java/com/mapd/tests/data/1K.csv");
98 "../java/utility/src/main/java/com/mapd/tests/data/geogdal.geojson");
100 HeavyDBTestClient dba =
101 HeavyDBTestClient.getClient(
"localhost", 6274, db, dbaUser, dbaPassword);
102 dba.runSql(
"CREATE TABLE " + large_table +
"(x int not null);");
103 dba.runSql(
"CREATE TABLE " + small_table +
"(x int not null);");
104 dba.runSql(
"CREATE TABLE " + geo_table
105 +
"(trip DOUBLE, pt GEOMETRY(POINT, 4326) ENCODING NONE);");
107 File large_data =
new File(large_table_path.toString());
108 try (BufferedWriter writer =
new BufferedWriter(
new FileWriter(large_data))) {
109 for (
int i = 0; i < 1000000; i++) {
110 writer.write(i +
"\n");
112 }
catch (IOException e) {
116 File small_data =
new File(small_table_path.toString());
117 try (BufferedWriter writer =
new BufferedWriter(
new FileWriter(small_data))) {
118 for (
int i = 0; i < 1000; i++) {
119 writer.write(i +
"\n");
121 }
catch (IOException e) {
125 File geojson_data =
new File(geojson_table_path.toString());
126 ArrayList<String> geojson_header =
new ArrayList<>();
127 ArrayList<String> geojson_footer =
new ArrayList<>();
128 ArrayList<String> geojson_feature =
new ArrayList<>();
129 geojson_header.add(
"{");
130 geojson_header.add(
"\"type\": \"FeatureCollection\",");
131 geojson_header.add(
"\"name\": \"geospatial_point\",");
133 "\"crs\": { \"type\": \"name\", \"properties\": { \"name\": \"urn:ogc:def:crs:OGC:1.3:CRS84\" } },");
134 geojson_header.add(
"\"features\": [");
136 "{ \"type\": \"Feature\", \"properties\": { \"trip\": 10.0 }, \"geometry\": { \"type\": \"Point\", \"coordinates\": [ 10.0, 9.0 ] } }");
137 geojson_footer.add(
"]");
138 geojson_footer.add(
"}");
140 "{ \"type\": \"Feature\", \"properties\": { \"trip\": 0.0 }, \"geometry\": { \"type\": \"Point\", \"coordinates\": [ 0.0, 1.0 ] } },");
142 "{ \"type\": \"Feature\", \"properties\": { \"trip\": 1.0 }, \"geometry\": { \"type\": \"Point\", \"coordinates\": [ 1.0, 2.0 ] } },");
144 "{ \"type\": \"Feature\", \"properties\": { \"trip\": 2.0 }, \"geometry\": { \"type\": \"Point\", \"coordinates\": [ 2.0, 3.0 ] } },");
146 "{ \"type\": \"Feature\", \"properties\": { \"trip\": 3.0 }, \"geometry\": { \"type\": \"Point\", \"coordinates\": [ 3.0, 4.0 ] } },");
148 "{ \"type\": \"Feature\", \"properties\": { \"trip\": 4.0 }, \"geometry\": { \"type\": \"Point\", \"coordinates\": [ 4.0, 5.0 ] } },");
150 "{ \"type\": \"Feature\", \"properties\": { \"trip\": 5.0 }, \"geometry\": { \"type\": \"Point\", \"coordinates\": [ 5.0, 6.0 ] } },");
152 "{ \"type\": \"Feature\", \"properties\": { \"trip\": 6.0 }, \"geometry\": { \"type\": \"Point\", \"coordinates\": [ 6.0, 7.0 ] } },");
154 "{ \"type\": \"Feature\", \"properties\": { \"trip\": 7.0 }, \"geometry\": { \"type\": \"Point\", \"coordinates\": [ 7.0, 8.0 ] } },");
156 "{ \"type\": \"Feature\", \"properties\": { \"trip\": 8.0 }, \"geometry\": { \"type\": \"Point\", \"coordinates\": [ 8.0, 9.0 ] } },");
158 "{ \"type\": \"Feature\", \"properties\": { \"trip\": 9.0 }, \"geometry\": { \"type\": \"Point\", \"coordinates\": [ 9.0, 0.0 ] } },");
159 try (BufferedWriter writer =
new BufferedWriter(
new FileWriter(geojson_data))) {
160 for (String str : geojson_header) {
161 writer.write(str +
"\n");
163 for (
int i = 0; i < 1000; i++) {
164 for (String str : geojson_feature) {
165 writer.write(str +
"\n");
168 for (String str : geojson_footer) {
169 writer.write(str +
"\n");
171 }
catch (IOException e) {
175 dba.runSql(
"COPY " + large_table +
" FROM '" + large_table_path.toString()
176 +
"' WITH (header='false');");
177 dba.runSql(
"COPY " + small_table +
" FROM '" + small_table_path.toString()
178 +
"' WITH (header='false');");
179 dba.runSql(
"COPY " + geo_table +
" FROM '" + geojson_table_path.toString()
180 +
"' WITH (header='false', geo='true');");
181 }
catch (Exception e) {
182 logger.error(
"[" + Thread.currentThread().getId() +
"]"
183 +
" Caught Exception: " + e.getMessage(),
187 ArrayList<Thread> queryThreads =
new ArrayList<>();
188 ArrayList<Thread> interrupterThreads =
new ArrayList<>();
189 Thread query_interrupter =
new Thread(
new Runnable() {
193 int tid = INTERRUPTER_TID;
194 String logPrefix =
"[" + tid +
"]";
195 HeavyDBTestClient interrupter =
getClient(db,
"interrupter");
196 int check_empty_session_queue = 0;
199 List<TQueryInfo> queryInfos = interrupter.get_queries_info();
200 boolean found_target_query =
false;
201 for (TQueryInfo queryInfo : queryInfos) {
202 String session_id = queryInfo.query_public_session_id;
203 boolean select_query =
204 queryInfo.current_status.equals(
"RUNNING_QUERY_KERNEL");
205 boolean import_query = queryInfo.current_status.equals(
"RUNNING_IMPORTER");
206 boolean can_interrupt =
false;
209 && queryInfo.query_str.compareTo(loop_join_query) == 0)) {
210 can_interrupt =
true;
213 interrupter.runSql(
"KILL QUERY '" + session_id +
"';");
214 check_empty_session_queue = 0;
215 found_target_query =
true;
218 if (!found_target_query || queryInfos.isEmpty()) {
219 ++check_empty_session_queue;
221 if (check_empty_session_queue > 20) {
225 }
catch (Exception e) {
226 logger.error(logPrefix +
" Caught Exception: " + e.getMessage(), e);
231 query_interrupter.start();
232 interrupterThreads.add(query_interrupter);
234 for (
int i = 0; i < num_runs; i++) {
235 logger.info(
"Starting run-" + i);
236 for (
int r = 0; r < num_threads; r++) {
238 final String logPrefix =
"[" + tid +
"]";
239 final String user_name =
"u".concat(Integer.toString(tid));
240 if (r < num_threads - 2) {
241 String[] queries = {hash_join_query, gby_query, loop_join_query};
242 Thread select_query_runner =
new Thread(
new Runnable() {
245 logger.info(
"Starting thread-" + tid);
246 final HeavyDBTestClient user =
getClient(db, user_name);
247 for (
int k = 0; k < 5; k++) {
248 boolean interrupted =
false;
249 for (
int q = 0; q < 3; q++) {
251 logger.info(logPrefix +
" Run SELECT query: " + queries[q]);
252 user.runSql(queries[q]);
253 }
catch (Exception e2) {
254 if (e2 instanceof TDBException) {
255 TDBException ee = (TDBException) e2;
256 if (q == 2 && ee.error_msg.contains(
"ERR_INTERRUPTED")) {
259 logPrefix +
" Select query issued has been interrupted");
263 logPrefix +
" Caught Exception: " + e2.getMessage(), e2);
271 select_query_runner.start();
272 queryThreads.add(select_query_runner);
274 Thread import_query_runner =
new Thread(
new Runnable() {
277 logger.info(
"Starting thread-" + tid);
278 final HeavyDBTestClient user =
getClient(db, user_name);
279 for (
int k = 0; k < 2; k++) {
280 boolean interrupted =
false;
283 "../Tests/Import/datafiles/interrupt_table_gdal.geojson");
284 user.runSql(
"COPY " + geo_table +
" FROM '" + geo_table_path.toString()
285 +
"' WITH (geo='true');");
286 logger.info(logPrefix +
" Run Import query");
287 }
catch (Exception e2) {
288 if (e2 instanceof TDBException) {
289 TDBException ee = (TDBException) e2;
290 if (ee.error_msg.contains(
"error code 10")) {
292 logger.info(logPrefix +
" Import query has been interrupted");
295 logger.error(logPrefix +
" Caught Exception: " + e2.getMessage(), e2);
302 import_query_runner.start();
303 queryThreads.add(import_query_runner);
308 for (Thread t : queryThreads) {
311 for (Thread t : interrupterThreads) {
315 HeavyDBTestClient dba =
316 HeavyDBTestClient.getClient(
"localhost", 6274, db, dbaUser, dbaPassword);
317 dba.runSql(
"DROP TABLE " + large_table +
";");
318 dba.runSql(
"DROP TABLE " + small_table +
";");
319 dba.runSql(
"DROP TABLE " + geo_table +
";");
320 File large_data =
new File(large_table_path.toString());
321 File small_data =
new File(small_table_path.toString());
322 File geojson_data =
new File(geojson_table_path.toString());
323 if (large_data.exists()) {
326 if (small_data.exists()) {
329 if (geojson_data.exists()) {
330 geojson_data.delete();
HeavyDBTestClient getClient(String db, String username)
Path getAbsolutePath(String path)