16 package com.mapd.tests;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Locale;
import java.util.TimeZone;
32 LoggerFactory.getLogger(ForeignTableRefreshConcurrencyTest.class);
35 Calendar cal = Calendar.getInstance();
36 cal.setTime(
new Date());
37 cal.add(Calendar.SECOND, sec_from_now);
38 SimpleDateFormat date_format =
new SimpleDateFormat(
"yyyy-MM-dd HH:mm:ss");
39 date_format.setTimeZone(TimeZone.getTimeZone(
"GMT"));
40 return date_format.format(cal.getTime());
43 public static void main(String[]
args)
throws Exception {
45 test.testConcurrency();
49 Path path_obj = Paths.get(path).toAbsolutePath();
50 assert Files.exists(path_obj);
63 int num_foreign_manual_refresh_threads,
64 int num_foreign_scheduled_refresh_threads,
65 int num_table_join_threads,
66 int num_runs)
throws Exception {
67 ArrayList<Exception> exceptions =
new ArrayList<Exception>();
69 Thread[] foreign_table_manual_refresh_threads =
70 new Thread[num_foreign_manual_refresh_threads];
71 Thread[] foreign_table_scheduled_refresh_threads =
72 new Thread[num_foreign_scheduled_refresh_threads];
73 Thread[] table_join_threads =
new Thread[num_table_join_threads];
75 for (
int i = 0; i < num_foreign_manual_refresh_threads; ++i) {
77 foreign_table_manual_refresh_threads[tid] =
new Thread(
new Runnable() {
80 final String thread_name =
"F[" + tid +
"]";
82 logger.info(
"Starting manual foreign table refresh thread " + thread_name);
84 "localhost", 6274, db, userName, userPassword);
85 for (
int irun = 0; irun < num_runs; ++irun) {
86 runSqlAsUser(
"SELECT * FROM test_foreign_table_manual_refresh_" + tid +
";",
89 runSqlAsUser(
"REFRESH FOREIGN TABLES test_foreign_table_manual_refresh_"
94 logger.info(
"Finished foreign table manual refresh " + thread_name);
95 }
catch (Exception e) {
96 logger.error(
"Foreign table refresh " + thread_name
97 +
" Caught Exception: " + e.getMessage(),
103 foreign_table_manual_refresh_threads[tid].start();
106 for (
int i = 0; i < num_foreign_scheduled_refresh_threads; ++i) {
108 foreign_table_scheduled_refresh_threads[tid] =
new Thread(
new Runnable() {
111 final String thread_name =
"S[" + tid +
"]";
113 logger.info(
"Starting scheduled foreign table refresh thread " + thread_name);
115 "localhost", 6274, db, userName, userPassword);
116 for (
int irun = 0; irun < num_runs; ++irun) {
118 "SELECT * FROM test_foreign_table_scheduled_refresh_" + tid +
";",
124 logger.info(
"Finished foreign table scheduled refresh " + thread_name);
125 }
catch (Exception e) {
126 logger.error(
"Foreign table scheduled refresh " + thread_name
127 +
" Caught Exception: " + e.getMessage(),
133 foreign_table_scheduled_refresh_threads[tid].start();
136 for (
int i = 0; i < num_table_join_threads; ++i) {
138 table_join_threads[tid] =
new Thread(
new Runnable() {
141 final String thread_name =
"T[" + tid +
"]";
143 logger.info(
"Starting table join " + thread_name);
145 "localhost", 6274, db, userName, userPassword);
146 for (
int irun = 0; irun < num_runs; ++irun) {
148 +
"_left AS l JOIN test_table_" + tid
149 +
"_right AS r ON l.id = r.id;",
156 logger.info(
"Finished table join thread T[" + tid +
"]");
157 }
catch (Exception e) {
160 "Table join " + thread_name +
" Caught Exception: " + e.getMessage(),
166 table_join_threads[tid].start();
169 for (Thread t : foreign_table_manual_refresh_threads) {
172 for (Thread t : foreign_table_scheduled_refresh_threads) {
175 for (Thread t : table_join_threads) {
179 for (Exception e : exceptions) {
181 logger.error(
"Exception: " + e.getMessage(), e);
189 logger.info(logPrefix +
" " + sql);
195 dba.runSql(
"CREATE FOREIGN TABLE " + foreign_table_name +
" "
196 +
"(b BOOLEAN, t TINYINT, s SMALLINT, i INTEGER, bi BIGINT, f FLOAT, "
197 +
"dc DECIMAL(10, 5), tm TIME, tp TIMESTAMP, d DATE, txt TEXT, "
198 +
"txt_2 TEXT ENCODING NONE) "
199 +
"SERVER test_server WITH "
200 +
"(file_path = 'scalar_types.csv', "
201 +
"refresh_timing_type = 'scheduled', "
202 +
"refresh_start_date_time = '" +
getTimeStamp(1) +
"', "
203 +
"refresh_interval = '1S', "
204 +
"fragment_size = 2);");
209 dba.runSql(
"CREATE FOREIGN TABLE " + foreign_table_name +
" "
210 +
"(b BOOLEAN, t TINYINT, s SMALLINT, i INTEGER, bi BIGINT, f FLOAT, "
211 +
"dc DECIMAL(10, 5), tm TIME, tp TIMESTAMP, d DATE, txt TEXT, "
212 +
"txt_2 TEXT ENCODING NONE) "
213 +
"SERVER test_server WITH "
214 +
"(file_path = 'scalar_types.csv', "
215 +
"fragment_size = 2);");
221 dba.runSql(
"CREATE TABLE " + table_name
222 +
" (id INTEGER, str TEXT ENCODING DICT(32), x DOUBLE, y BIGINT) WITH (FRAGMENT_SIZE=1, partitions='replicated')");
223 dba.runSql(
"COPY " + table_name +
" FROM '" + copy_from_path.toString()
224 +
"' WITH (header='false');");
235 logger.info(
"ForeignTableRefreshConcurrencyTest()");
238 "localhost", 6274,
"heavyai",
"admin",
"HyperInteractive");
241 su.runSql(
"DROP DATABASE IF EXISTS db1;");
242 su.runSql(
"CREATE DATABASE db1;");
245 final int num_foreign_manual_refresh_threads = 2;
246 final int num_foreign_scheduled_refresh_threads = 2;
247 final int num_table_join_threads = 2;
249 "../java/utility/src/main/java/com/mapd/tests/data/simple_test.csv");
252 "localhost", 6274,
"db1",
"admin",
"HyperInteractive");
253 dba.runSql(
"CREATE SERVER test_server "
254 +
"FOREIGN DATA WRAPPER delimited_file WITH (storage_type = 'LOCAL_FILE', "
255 +
"base_path = '" + foreign_server_path.toString() +
"');");
256 for (
int i = 0; i < num_foreign_manual_refresh_threads; ++i) {
259 for (
int i = 0; i < num_foreign_scheduled_refresh_threads; ++i) {
261 dba,
"test_foreign_table_scheduled_refresh_" + i);
263 for (
int i = 0; i < num_table_join_threads; ++i) {
270 num_foreign_manual_refresh_threads,
271 num_foreign_scheduled_refresh_threads,
272 num_table_join_threads,
276 for (
int i = 0; i < num_foreign_manual_refresh_threads; ++i) {
277 dba.runSql(
"DROP FOREIGN TABLE test_foreign_table_manual_refresh_" + i +
";");
// Drop every scheduled-refresh foreign table. The bound is the scheduled-thread count,
// the same count used when these tables were created; bounding by the manual-thread
// count only worked while the two counts happened to be equal.
for (int i = 0; i < num_foreign_scheduled_refresh_threads; ++i) {
dba.runSql("DROP FOREIGN TABLE test_foreign_table_scheduled_refresh_" + i + ";");
282 for (
int i = 0; i < num_table_join_threads; ++i) {
283 dba.runSql(
"DROP TABLE test_table_" + i +
"_left ;");
284 dba.runSql(
"DROP TABLE test_table_" + i +
"_right ;");
286 dba.runSql(
"DROP SERVER test_server;");
287 su.runSql(
"DROP DATABASE db1;");
289 logger.info(
"ForeignTableRefreshConcurrencyTest() done");
Path getAbsolutePath(String path)
static String getTimeStamp(int sec_from_now)
void createTestTable(HeavyDBTestClient dba, String table_name, Path copy_from_path)
static void main(String[] args)
void runSqlAsUser(String sql, HeavyDBTestClient user, String logPrefix)
static final Logger logger
void createForeignTestTableManualRefresh(HeavyDBTestClient dba, String foreign_table_name)
void createTestTables(HeavyDBTestClient dba, String table_name, Path copy_from_path)
void runTest(String db, String userName, String userPassword, int num_foreign_manual_refresh_threads, int num_foreign_scheduled_refresh_threads, int num_table_join_threads, int num_runs)
void createForeignTestTableScheduledRefresh(HeavyDBTestClient dba, String foreign_table_name)