OmniSciDB  a5dc49c757
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
SelectCopyFromDeleteConcurrencyTest.java
Go to the documentation of this file.
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package com.mapd.tests;
18 
19 import org.slf4j.Logger;
20 import org.slf4j.LoggerFactory;
21 
22 import java.nio.file.Files;
23 import java.nio.file.Path;
24 import java.nio.file.Paths;
25 import java.util.ArrayList;
26 
28  final static Logger logger =
29  LoggerFactory.getLogger(SelectCopyFromDeleteConcurrencyTest.class);
30 
32 
34  input_file_path_ = path;
35  }
36 
37  public static void main(String[] args) throws Exception {
38  // TODO: take as arg
39  String path_str = "java/utility/src/main/java/com/mapd/tests/data/simple_test.csv";
40 
41  Path path = Paths.get(path_str).toAbsolutePath();
42  assert Files.exists(path);
43 
46  test.testSelecyCopyFromConcurrency(/*shard_count=*/0);
47  // TODO: take as arg
48  int shard_count = 4; // expected configuration: 2 leaves, 2 GPUs per leaf
49  test.testSelecyCopyFromConcurrency(shard_count);
50  }
51 
52  private void run_test(HeavyDBTestClient dba,
53  HeavyDBTestClient user,
54  String prefix,
55  Path filepath,
56  int shard_count,
57  int runs) throws Exception {
58  String table_name = "table_" + prefix + "_";
59  long tid = Thread.currentThread().getId();
60  if (shard_count > 0) {
61  logger.info("[" + tid + "] "
62  + "CREATE " + table_name + " WITH " + shard_count + " SHARDS");
63  user.runSql("CREATE TABLE " + table_name
64  + " (id INTEGER, str TEXT ENCODING DICT(32), x DOUBLE, y BIGINT, SHARD KEY(id)) WITH (FRAGMENT_SIZE=1, SHARD_COUNT="
65  + shard_count + ")");
66  } else {
67  logger.info("[" + tid + "] "
68  + "CREATE " + table_name);
69  user.runSql("CREATE TABLE " + table_name
70  + " (id INTEGER, str TEXT ENCODING DICT(32), x DOUBLE, y BIGINT) WITH (FRAGMENT_SIZE=1)");
71  }
72 
73  for (int i = 0; i < runs; i++) {
74  logger.info("[" + tid + "] "
75  + "SELECT 1 " + table_name);
76  user.runSql("SELECT id, str FROM " + table_name + " WHERE x < 5.0 LIMIT 1;");
77 
78  logger.info("[" + tid + "] "
79  + "COPY 1 " + table_name);
80  user.runSql("COPY " + table_name + " FROM '" + filepath.toString()
81  + "' WITH (header='false');");
82 
83  logger.info("[" + tid + "] "
84  + "SELECT 2 " + table_name);
85  user.runSql("SELECT COUNT(*) FROM " + table_name + " WHERE x = (SELECT MIN(x) FROM "
86  + table_name + ");");
87 
88  logger.info("[" + tid + "] "
89  + "DELETE 1 " + table_name);
90  user.runSql("DELETE FROM " + table_name + " WHERE x = (SELECT MAX(x) FROM "
91  + table_name + ");");
92 
93  logger.info("[" + tid + "] "
94  + "SELECT 2 " + table_name);
95  user.runSql("COPY " + table_name + " FROM '" + filepath.toString()
96  + "' WITH (header='false');");
97 
98  logger.info("[" + tid + "] "
99  + "TRUNCATE 1 " + table_name);
100  user.runSql("TRUNCATE TABLE " + table_name);
101 
102  if (tid % 4 == 0) {
103  logger.info("[" + tid + "] "
104  + "COPY 3 " + table_name);
105  dba.runSql("COPY " + table_name + " FROM '" + filepath.toString()
106  + "' WITH (header='false');");
107  } else {
108  logger.info("[" + tid + "] "
109  + "SELECT 3 " + table_name);
110  user.runSql("SELECT COUNT(*) FROM " + table_name
111  + " WHERE x = (SELECT MIN(x) FROM " + table_name + ");");
112  }
113  }
114 
115  logger.info("[" + tid + "] "
116  + "DROP TABLE " + table_name);
117  dba.runSql("DROP TABLE " + table_name + ";");
118  }
119 
120  private void runTest(int num_threads, int shard_count) throws Exception {
121  final int runs = 25;
122  Exception exceptions[] = new Exception[num_threads];
123 
124  ArrayList<Thread> threads = new ArrayList<>();
125  for (int i = 0; i < num_threads; i++) {
126  logger.info("Starting " + i);
127  final int threadId = i;
128 
129  Thread t = new Thread(new Runnable() {
130  @Override
131  public void run() {
132  try {
133  final String username = threadId % 2 == 0 ? "alice" : "bob";
134  HeavyDBTestClient dba = HeavyDBTestClient.getClient(
135  "localhost", 6274, "heavyai", "admin", "HyperInteractive");
136  HeavyDBTestClient user = HeavyDBTestClient.getClient(
137  "localhost", 6274, "heavyai", username, "password");
138  final String prefix = "for_" + username + "_" + threadId + "_";
139 
140  run_test(dba, user, prefix, input_file_path_, shard_count, runs);
141 
142  } catch (Exception e) {
143  logger.error("[" + Thread.currentThread().getId() + "] "
144  + "Caught Exception: " + e.getMessage());
145  exceptions[threadId] = e;
146  }
147  }
148  });
149  t.start();
150  threads.add(t);
151  }
152 
153  for (Thread t : threads) {
154  t.join();
155  }
156 
157  for (Exception e : exceptions) {
158  if (null != e) {
159  logger.error("Exception: " + e.getMessage(), e);
160  throw new Exception(e.getMessage(), e);
161  }
162  }
163  }
164 
165  public void testSelecyCopyFromConcurrency(int shard_count) throws Exception {
166  logger.info("testSelectCopyFromConcurrency()");
167 
168  logger.info("Using import file: " + input_file_path_.toString());
169 
170  // Use the default database for now
171  HeavyDBTestClient su = HeavyDBTestClient.getClient(
172  "localhost", 6274, "heavyai", "admin", "HyperInteractive");
173  su.runSql("CREATE USER alice (password = 'password', is_super = 'false');");
174  su.runSql("CREATE USER bob (password = 'password', is_super = 'false');");
175 
176  su.runSql("GRANT CREATE on DATABASE heavyai TO alice;");
177  su.runSql("GRANT CREATE on DATABASE heavyai TO bob;");
178 
179  su.runSql("GRANT CREATE VIEW on DATABASE heavyai TO alice;");
180  su.runSql("GRANT CREATE VIEW on DATABASE heavyai TO bob;");
181 
182  su.runSql("GRANT DROP VIEW on DATABASE heavyai TO alice;");
183  su.runSql("GRANT DROP VIEW on DATABASE heavyai TO bob;");
184 
185  su.runSql("GRANT ACCESS on database heavyai TO alice;");
186  su.runSql("GRANT ACCESS on database heavyai TO bob;");
187 
188  final int num_threads = 5;
189  runTest(num_threads, shard_count);
190 
191  su.runSql("DROP USER alice;");
192  su.runSql("DROP USER bob;");
193 
194  logger.info("Pass!");
195  }
196 }
void run_test(HeavyDBTestClient dba, HeavyDBTestClient user, String prefix, Path filepath, int shard_count, int runs)
static bool run