16 package com.mapd.tests;
18 import org.apache.commons.cli.*;
19 import org.slf4j.Logger;
20 import org.slf4j.LoggerFactory;
22 import java.util.ArrayList;
23 import java.util.Random;
24 import java.util.concurrent.CyclicBarrier;
28 LoggerFactory.getLogger(UpdateDeleteInsertConcurrencyTest.class);
// Command-line entry point. Declares the "t" / "temptables" option, parses
// args, and passes whether the flag was present to
// testUpdateDeleteInsertConcurrency().
51 public static void main(String[]
args)
throws Exception {
52 Options options =
new Options();
// Option "t" with long name "temptables". The end of this builder chain is
// not visible in this excerpt.
53 options.addOption(Option.builder(
"t")
54 .longOpt(
"temptables")
55 .desc(
"Use temporary tables for test")
// Parse the raw arguments against the options declared above.
57 CommandLineParser clp =
new DefaultParser();
58 final CommandLine commandLine = clp.parse(options,
args);
// True when the "temptables" flag was supplied on the command line.
60 Boolean useTemporaryTables = commandLine.hasOption(
"temptables");
// NOTE(review): `test` is declared on a line not visible here; presumably an
// instance of this test class -- confirm.
62 test.testUpdateDeleteInsertConcurrency(useTemporaryTables);
// Tail of a method signature whose beginning is not visible in this excerpt;
// presumably runTest(db, dbaUser, dbaPassword, dbUser, dbPassword,
// concurrentInserts) -- confirm against the full file.
//
// From the visible lines: creates the table named by tableName through the
// `dba` client, populates it either up front (concurrentInserts == false) or
// from the worker threads (concurrentInserts == true), builds and logs a fixed
// sequence of DELETE / SELECT / INSERT statements in each worker, drops the
// table, and logs any exception a worker recorded.
72 Boolean concurrentInserts)
throws Exception {
// Table size and fragment size used in the CREATE TABLE statement below.
75 final int num_rows = 1000;
76 final int fragment_size = 10;
77 final String tableName =
"test";
// One slot per worker; a worker stores its caught exception at exceptions[threadId].
// NOTE(review): num_threads is declared on a line not visible here.
78 Exception exceptions[] =
new Exception[num_threads];
// Barrier for num_threads parties with a Runnable barrier action; the action
// body and the places where the barrier is awaited are not visible here.
80 final CyclicBarrier barrier =
new CyclicBarrier(num_threads,
new Runnable() {
// Argument list of a call whose start is not visible; presumably the call
// that creates the `dba` client used below -- confirm.
84 "localhost", 6274, db, dbaUser, dbaPassword);
// NOTE(review): the value assigned to createPrefix is not visible here.
85 final String createPrefix =
// Create the test table with the configured fragment size.
87 dba.runSql(createPrefix + tableName
88 +
"(x BIGINT, y INTEGER, z SMALLINT, a TINYINT, f FLOAT, d DOUBLE, deci DECIMAL(18,6), str TEXT ENCODING NONE) WITH (FRAGMENT_SIZE = "
89 + fragment_size +
")");
// Without concurrent inserts, load all num_rows rows up front through dba.
90 if (!concurrentInserts) {
91 for (
int i = 0; i < num_rows; i++) {
// Column values derived from the row index i.
92 final String integer_val = Integer.toString(i);
93 final String small_val = Integer.toString(i % 128);
94 final String fp_val = Double.toString(i * 1.1);
95 final String deci_val = Double.toString(i + 0.01);
// Text value quoted as a SQL string literal; cycles through text_values.
96 final String str_val =
"'" +
text_values[i % text_values.length] +
"'";
// Values joined with " , "; the remaining join arguments are not visible here.
97 final String values_string = String.join(
" , ",
106 dba.runSql(
"INSERT INTO " + tableName +
" VALUES "
107 +
"(" + values_string +
")");
// The start of the try block that this catch closes is not visible here.
110 }
catch (Exception e) {
111 logger.error(
"[" + Thread.currentThread().getId() +
"]"
112 +
" Caught Exception: " + e.getMessage(),
// Build one worker thread per index in [0, num_threads).
119 ArrayList<Thread> threads =
new ArrayList<>();
120 for (
int i = 0; i < num_threads; i++) {
121 logger.info(
"Starting " + i);
// Final copy of the loop index so the anonymous Runnable can capture it.
122 final int threadId = i;
124 Thread t =
new Thread(
new Runnable() {
// tid is the JVM thread id; it differs from threadId, which indexes exceptions[].
127 long tid = Thread.currentThread().getId();
128 String logPrefix =
"[" + tid +
"]";
// Argument list of a call whose start is not visible; presumably the call
// that creates the `user` client used below -- confirm.
135 "localhost", 6274, db, dbUser, dbPassword);
// With concurrent inserts, each worker inserts num_rows / num_threads rows itself.
137 if (concurrentInserts) {
138 for (
int i = 0; i < num_rows / num_threads; i++) {
// Column values derived from the worker-local index i.
139 final String integer_val = Integer.toString(i);
140 final String small_val = Integer.toString(i % 128);
141 final String fp_val = Double.toString(i * 1.1);
142 final String deci_val = Double.toString(i + 0.01);
143 final String str_val =
"'" +
text_values[i % text_values.length] +
"'";
// Values joined with " , "; the remaining join arguments are not visible here.
144 final String values_string = String.join(
" , ",
153 user.runSql(
"INSERT INTO " + tableName +
" VALUES "
154 +
"(" + values_string +
")");
// Random generator seeded with the thread id.
158 Random rand =
new Random(tid);
// Each statement below is built and logged; the lines that execute them are
// not visible in this excerpt.
160 sql =
"DELETE FROM " + tableName +
" WHERE x = " + (tid * 2) +
";";
161 logger.info(logPrefix +
" " + sql);
// Delete by a random y below num_rows; the end of this expression is not visible.
164 sql =
"DELETE FROM " + tableName +
" WHERE y = " + rand.nextInt(num_rows)
166 logger.info(logPrefix +
" " + sql);
169 sql =
"SELECT COUNT(*) FROM " + tableName +
" WHERE x > " + (tid * 2) +
";";
170 logger.info(logPrefix +
" " + sql);
// Delete rows whose str equals a randomly chosen entry of text_values.
173 sql =
"DELETE FROM " + tableName +
" WHERE str = '"
174 +
text_values[rand.nextInt(text_values.length)] +
"';";
175 logger.info(logPrefix +
" " + sql);
// Select rows whose str equals another randomly chosen entry of text_values.
178 sql =
"SELECT * FROM " + tableName +
" WHERE str = '"
179 +
text_values[rand.nextInt(text_values.length)] +
"';";
180 logger.info(logPrefix +
" " + sql);
// Delete by a random d threshold below num_rows / 4; the end of this expression is not visible.
183 sql =
"DELETE FROM " + tableName +
" WHERE d < " + rand.nextInt(num_rows / 4)
185 logger.info(logPrefix +
" " + sql);
// Insert a row whose first seven values are tid and whose text value depends
// on the parity of tid; the end of this expression is not visible.
188 sql =
"INSERT INTO " + tableName +
" VALUES "
189 +
"(" + tid +
"," + tid +
"," + tid +
"," + tid +
"," + tid +
","
190 + tid +
"," + tid +
"," + (tid % 2 == 0 ?
"'value_1'" :
"'value_2'")
192 logger.info(logPrefix +
" " + sql);
195 sql =
"DELETE FROM " + tableName +
" WHERE z = " + tid +
";";
196 logger.info(logPrefix +
" " + sql);
// Log the worker's failure and record it in this worker's slot.
199 }
catch (Exception e) {
200 logger.error(logPrefix +
" Caught Exception: " + e.getMessage(), e);
201 exceptions[threadId] = e;
// Loop over the workers; the loop body is not visible here (presumably a join -- confirm).
209 for (Thread t : threads) {
// Second client creation with the dba credentials; the assignment target is not visible here.
214 HeavyDBTestClient.getClient(
"localhost", 6274, db, dbaUser, dbaPassword);
// Remove the test table.
215 dba.runSql(
"DROP TABLE " + tableName +
";");
// Log exceptions recorded by the workers; any guard or rethrow around this
// call is not visible here.
217 for (Exception e : exceptions) {
219 logger.error(
"Exception: " + e.getMessage(), e);
// Visible part of a method body whose header is not shown in this excerpt;
// presumably testUpdateDeleteInsertConcurrency(Boolean useTemporaryTables),
// judging by the log messages -- confirm against the full file.
//
// From the visible lines: creates users dba and bob and database db1, grants
// bob privileges, and afterwards drops db1 and both users. The section between
// the grants and the teardown is not visible here.
227 logger.info(
"testUpdateDeleteInsertConcurrency()");
// Log when the useTemporaryTables_ field is set.
230 if (useTemporaryTables_) {
231 logger.info(
"Using temporary tables");
// Argument list of a call whose start is not visible; presumably the call
// that creates the `su` client used below, with "heavyai", "admin" and
// "HyperInteractive" as database, user and password -- confirm.
234 "localhost", 6274,
"heavyai",
"admin",
"HyperInteractive");
// Create a superuser (dba) and a regular user (bob).
235 su.runSql(
"CREATE USER dba (password = 'password', is_super = 'true');");
236 su.runSql(
"CREATE USER bob (password = 'password', is_super = 'false');");
// Let bob create objects in the heavyai database.
238 su.runSql(
"GRANT CREATE on DATABASE heavyai TO bob;");
// Create db1 and grant bob create/drop privileges for tables and views in it.
240 su.runSql(
"CREATE DATABASE db1;");
241 su.runSql(
"GRANT CREATE on DATABASE db1 TO bob;");
242 su.runSql(
"GRANT CREATE VIEW on DATABASE db1 TO bob;");
243 su.runSql(
"GRANT DROP on DATABASE db1 TO bob;");
244 su.runSql(
"GRANT DROP VIEW on DATABASE db1 TO bob;");
// NOTE(review): the lines between the grants and the teardown are not visible
// here; presumably the test runs happen there -- confirm.
// Teardown: drop the database and the users created above.
260 su.runSql(
"DROP DATABASE db1;");
261 su.runSql(
"DROP USER bob;");
262 su.runSql(
"DROP USER dba;");
264 logger.info(
"testUpdateDeleteInsertConcurrency() done");
void runTest(String db, String dbaUser, String dbaPassword, String dbUser, String dbPassword, Boolean concurrentInserts)
static final Logger logger
static final String[] text_values
Boolean useTemporaryTables_
static void main(String[] args)
void testUpdateDeleteInsertConcurrency(Boolean useTemporaryTables)