OmniSciDB  a5dc49c757
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
ConcurrencyTest.java
Go to the documentation of this file.
1 /*
2  * Copyright 2023 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package com.mapd.tests;
17 
18 import org.slf4j.Logger;
19 import org.slf4j.LoggerFactory;
20 
21 import java.io.File;
22 import java.lang.Integer;
23 import java.lang.RuntimeException;
24 import java.lang.String;
25 import java.util.ArrayList;
26 import java.util.Arrays;
27 import java.util.List;
28 import java.util.Random;
29 import java.util.concurrent.CyclicBarrier;
30 
31 import ai.heavy.thrift.server.TDBException;
32 
33 public abstract class ConcurrencyTest {
34  public final static String db = "TestDB", userName = "admin",
35  password = "HyperInteractive", localhost = "localhost",
36  defaultDb = "heavyai";
37  public final static int defaultPort = 6274, defaultNumThreads = 5,
39  public final static boolean defaultEnableHeavyConnect = true,
41 
45 
46  static Random rand = new Random();
47 
48  // These need to be initialized in the derived class.
49  public static Logger logger = null;
50  public static String testName = "";
51  public static int[] ports = null;
52 
53  public CyclicBarrier barrier; // Sync point for threads.
54  public ArrayList<String> exceptionTexts = new ArrayList<String>();
55 
56  public static boolean deleteDirectory(File dir) {
57  File[] files = dir.listFiles();
58  if (files != null) {
59  for (File file : files) {
60  deleteDirectory(file);
61  }
62  }
63  return dir.delete();
64  }
65 
66  // Optionally specify the set of ports on which heavyDB servers are running (can run
67  // multiple servers for Horizontal Scaling testing). Threads will be randomly assigned
68  // to ports.
69  public static int[] getPorts() {
70  String portString = System.getProperty("PORT_LIST");
71  int[] retPorts;
72  if (portString != null) {
73  List<String> portList = Arrays.asList(portString.split(","));
74  if (portList.size() > 0) {
75  retPorts = new int[portList.size()];
76  for (int i = 0; i < portList.size(); ++i) {
77  retPorts[i] = Integer.parseInt(portList.get(i));
78  }
79  return retPorts;
80  }
81  }
82  retPorts = new int[1];
83  retPorts[0] = defaultPort;
84  return retPorts;
85  }
86 
87  // Optionally specify the number of threads teach test class will run.
88  public static int getNumThreads() {
89  String propertyString = System.getProperty("NUM_THREADS");
90  if (propertyString == null) {
91  return defaultNumThreads;
92  } else {
93  return Integer.parseInt(propertyString);
94  }
95  }
96 
97  // Optionally specify the number of iterations each thread will run.
98  public static int getNumIterations() {
99  String propertyString = System.getProperty("NUM_ITERATIONS");
100  if (propertyString == null) {
101  return defaultNumIterations;
102  } else {
103  return Integer.parseInt(propertyString);
104  }
105  }
106 
107  // Optionally controll if HeavyConnect testing is enabled.
108  public static boolean getEnableHeavyConnect() {
109  String propertyString = System.getProperty("ENABLE_HEAVY_CONNECT");
110  if (propertyString == null) {
112  } else {
113  return Boolean.parseBoolean(propertyString);
114  }
115  }
116 
117  // Optionally enable a monitoring thread that will periodically display how many threads
118  // are still running (useful for debugging).
119  public static boolean getEnableMonitorThread() {
120  String propertyString = System.getProperty("ENABLE_MONITOR_THREAD");
121  if (propertyString == null) {
123  } else {
124  return Boolean.parseBoolean(propertyString);
125  }
126  }
127 
128  // Dump the configuration.
129  public static String getConfig() {
130  String log = "Config for " + testName + ":\n{\n ports = {\n";
131  for (int port : ports) {
132  log += " " + port + "\n";
133  }
134  log += " }\n num_threads = " + numThreads + "\n num_iterations = " + numIterations
135  + "\n enable_heavy_connect = " + enableHeavyConnect
136  + "\n enable_monitor_thread = " + enableMonitorThread + "\n}";
137  return log;
138  }
139 
140  // Barrier to synchronize all threads.
141  public static CyclicBarrier createBarrier(int numThreadsToWait) {
142  return new CyclicBarrier(numThreadsToWait, new Runnable() {
143  @Override
144  public void run() {
145  logger.info("Threads Synched");
146  }
147  });
148  }
149 
150  // Print all errors that have accumulated.
151  public static void printErrors(ArrayList<String> exceptionTexts) {
152  if (exceptionTexts.size() > 0) {
153  String errors = "\n";
154  for (String s : exceptionTexts) {
155  errors += s + "\n";
156  }
157  logger.error("Found exceptions:" + errors);
158  }
159  }
160 
161  public static int getRandomPort() {
162  return ports[rand.nextInt(ports.length)];
163  }
164 
165  public abstract List<SqlCommandThread[]> createTestThreads();
166  public abstract void runTests(final List<SqlCommandThread[]> tests) throws Exception;
167  public abstract void setUpTests() throws Exception;
168  public abstract void cleanUpTests(final List<SqlCommandThread[]> tests)
169  throws Exception;
170 
171  // Performs test setup, running of all tests, and teardown.
172  public void testConcurrency() throws Exception {
173  if (testName.equals("")) {
174  throw new RuntimeException("Derived test has not set a test name");
175  }
176  if (logger == null) {
177  throw new RuntimeException("Derived test has not initialized logger");
178  }
179  logger.info(testName + "()");
180  final List<SqlCommandThread[]> tests = createTestThreads();
181  setUpTests();
182  runTests(tests);
183  cleanUpTests(tests);
184  logger.info(testName + "() done");
185  }
186 
187  public HeavyDBTestClient getAdminClient(String db) throws Exception {
188  return HeavyDBTestClient.getClient(localhost, ports[0], db, userName, password);
189  }
190 
191  public void runAndLog(HeavyDBTestClient client, String sql) throws Exception {
192  client.runSql(sql);
193  logger.info(" " + sql);
194  }
195 
196  // Optional monitoring thread that will report how many threads are still running once a
197  // minute (useful when debugging).
198  public class MonitoringThread extends Thread {
199  List<SqlCommandThread[]> monitoredThreads;
201 
203  this.monitoredThreads = monitoredThreads;
204  this.numThreads = numThreads;
205  }
206 
207  @Override
208  public void run() {
209  int finishedThreads = 0;
210  int totalThreads = monitoredThreads.size() * numThreads;
211  while (finishedThreads < totalThreads) {
212  finishedThreads = 0;
213  for (SqlCommandThread threadGroup[] : monitoredThreads) {
214  for (SqlCommandThread thread : threadGroup) {
215  if (thread.getState() == Thread.State.TERMINATED) {
216  finishedThreads++;
217  }
218  }
219  }
220  logger.info("Threads running: " + (totalThreads - finishedThreads));
221  try {
222  Thread.sleep(60000);
223  } catch (Exception e) {
224  logger.error("Monitoring thread: " + e.getMessage());
225  }
226  }
227  }
228  }
229 
230  public class SqlCommandThread extends Thread {
232  final String threadName;
233  final int port, iterations, threadId;
234 
236  final List<String> queries,
237  int threadId,
238  int port,
239  int iterations,
240  final List<String> exceptions,
241  final List<String> cleanUpQueries) {
242  this.queries = queries;
243  this.port = port;
244  this.iterations = iterations;
245  this.threadId = threadId;
246  this.threadName = threadName + "[" + port + "][" + threadId + "]";
247  this.expectedExceptionTexts = exceptions;
248  this.cleanUpQueries = cleanUpQueries;
249  }
250 
252  final List<String> queries,
253  int threadId,
254  final List<String> exceptions,
255  final List<String> cleanUpQueries) {
256  this(threadName,
257  queries,
258  threadId,
259  getRandomPort(),
261  exceptions,
263  }
264 
265  @Override
266  public void run() {
267  logger.info(" Starting: " + threadName);
268  try {
269  HeavyDBTestClient user =
270  HeavyDBTestClient.getClient(localhost, port, db, userName, password);
271  barrier.await(); // Synch point.
272  for (int iteration = 0; iteration < iterations; ++iteration) {
273  for (String query : queries) {
274  logger.info(" " + threadName + "[" + iteration + "]: " + query);
275  try {
276  user.runSql(query);
277  } catch (TDBException e) {
278  boolean foundExpected = false;
279  for (String exceptionText : expectedExceptionTexts) {
280  if (e.error_msg.contains(exceptionText)) {
281  foundExpected = true;
282  }
283  }
284  if (!foundExpected) {
285  throw e;
286  }
287  }
288  }
289  }
290  } catch (TDBException e) {
291  logger.error(" " + threadName + ": caught exception - " + e.error_msg);
292  exceptionTexts.add(threadName + ": " + e.error_msg);
293  } catch (Exception e) {
294  logger.error(" " + threadName + ": caught exception - " + e.getMessage());
295  exceptionTexts.add(threadName + ": " + e.getMessage());
296  }
297  logger.info(" Finished: " + threadName);
298  }
299  }
300 }
void runAndLog(HeavyDBTestClient client, String sql)
static boolean deleteDirectory(File dir)
MonitoringThread(List< SqlCommandThread[]> monitoredThreads, int numThreads)
static void printErrors(ArrayList< String > exceptionTexts)
static CyclicBarrier createBarrier(int numThreadsToWait)
ArrayList< String > exceptionTexts
abstract List< SqlCommandThread[]> createTestThreads()
abstract void runTests(final List< SqlCommandThread[]> tests)
SqlCommandThread(final String threadName, final List< String > queries, int threadId, int port, int iterations, final List< String > exceptions, final List< String > cleanUpQueries)
static bool run
SqlCommandThread(final String threadName, final List< String > queries, int threadId, final List< String > exceptions, final List< String > cleanUpQueries)
static final boolean defaultEnableHeavyConnect
static final boolean defaultEnableMonitorThread
abstract void cleanUpTests(final List< SqlCommandThread[]> tests)
HeavyDBTestClient getAdminClient(String db)