16 package com.mapd.logrunner;
18 import org.apache.thrift.TException;
19 import org.apache.thrift.protocol.TJSONProtocol;
20 import org.apache.thrift.protocol.TProtocol;
21 import org.apache.thrift.transport.THttpClient;
22 import org.apache.thrift.transport.TTransport;
23 import org.apache.thrift.transport.TTransportException;
24 import org.slf4j.Logger;
25 import org.slf4j.LoggerFactory;
27 import java.io.BufferedReader;
28 import java.io.FileNotFoundException;
29 import java.io.FileOutputStream;
30 import java.io.FileReader;
31 import java.io.IOException;
32 import java.util.ArrayList;
33 import java.util.HashMap;
34 import java.util.List;
36 import java.util.concurrent.ArrayBlockingQueue;
37 import java.util.concurrent.ExecutorService;
38 import java.util.concurrent.Executors;
39 import java.util.concurrent.ThreadPoolExecutor;
40 import java.util.concurrent.TimeUnit;
41 import java.util.logging.Level;
43 import ai.heavy.thrift.server.Heavy;
44 import ai.heavy.thrift.server.TColumn;
45 import ai.heavy.thrift.server.TColumnData;
46 import ai.heavy.thrift.server.TColumnType;
47 import ai.heavy.thrift.server.TDBException;
48 import ai.heavy.thrift.server.TDBInfo;
49 import ai.heavy.thrift.server.TDatum;
50 import ai.heavy.thrift.server.TExecuteMode;
51 import ai.heavy.thrift.server.TPixel;
52 import ai.heavy.thrift.server.TQueryResult;
53 import ai.heavy.thrift.server.TRenderResult;
54 import ai.heavy.thrift.server.TRow;
55 import ai.heavy.thrift.server.TRowSet;
56 import ai.heavy.thrift.server.TTableDetails;
59 final static Logger
logger = LoggerFactory.getLogger(LogRunner.class);
62 private HashMap<Integer, String>
json;
66 public static void main(String[]
args)
throws TException {
67 logger.info(
"Hello, World");
72 }
catch (TTransportException ex) {
73 logger.error(ex.toString());
79 sqlquery =
new HashMap<Integer, String>();
81 json =
new HashMap<Integer, String>();
84 void doWork(String[]
args)
throws TTransportException, TException {
85 logger.info(
"In doWork here");
87 int numberThreads = 3;
97 logger.info(
"got session");
100 ExecutorService executor =
new ThreadPoolExecutor(numberThreads,
103 TimeUnit.MILLISECONDS,
104 new ArrayBlockingQueue<Runnable>(15),
105 new ThreadPoolExecutor.CallerRunsPolicy());
109 BufferedReader in =
new BufferedReader(
new FileReader(
args[2]));
112 while ((str = in.readLine()) != null) {
113 Runnable worker =
new myThread(str, client, session);
118 logger.info(
"############loop complete");
121 }
catch (IOException e) {
122 logger.error(
"IOException " + e.getMessage());
126 private Heavy.Client
getClient(String hostname,
int port)
throws TTransportException {
127 TTransport transport = null;
130 transport =
new THttpClient(
"http://" + hostname +
":" + port);
135 TProtocol protocol =
new TJSONProtocol(transport);
138 return new Heavy.Client(protocol);
142 throws TTransportException, TDBException, TException {
143 String session = client.connect(
"mapd",
"HyperInteractive",
"mapd");
144 logger.info(
"Connected session is " + session);
149 throws TDBException, TException {
151 logger.info(
"Trying to disconnect session " + session);
152 client.disconnect(session);
155 private void theRest(Heavy.Client client, String session)
throws TException {
157 List<TDBInfo> dbs = client.get_databases(session);
159 for (TDBInfo db : dbs) {
160 logger.info(
"db is " + db.toString());
164 List<String>
tables = client.get_tables(session);
166 for (String tab : tables) {
167 logger.info(
"Tables is " + tab);
171 logger.info(
"Version " + client.get_version());
174 TTableDetails table_details = client.get_table_details(session,
"flights");
175 for (TColumnType col : table_details.row_desc) {
176 logger.info(
"col name :" + col.col_name);
177 logger.info(
"\tcol encoding :" + col.col_type.encoding);
178 logger.info(
"\tcol is_array :" + col.col_type.is_array);
179 logger.info(
"\tcol nullable :" + col.col_type.nullable);
184 logger.info(
" -- before query -- ");
186 TQueryResult sql_execute = client.sql_execute(session,
187 "Select uniquecarrier,flightnum from flights LIMIT 3;",
196 logger.info(
" -- after query -- ");
198 logger.info(
"TQueryResult execution time is " + sql_execute.getExecution_time_ms());
199 logger.info(
"TQueryResult is " + sql_execute.toString());
200 logger.info(
"TQueryResult getFieldValue is "
201 + sql_execute.getFieldValue(TQueryResult._Fields.ROW_SET));
203 TRowSet row_set = sql_execute.getRow_set();
204 Object fieldValue = sql_execute.getFieldValue(TQueryResult._Fields.ROW_SET);
206 logger.info(
"fieldValue " + fieldValue);
208 logger.info(
"TRowSet is " + row_set.toString());
210 logger.info(
"Get rows size " + row_set.getRowsSize());
211 logger.info(
"Get col size " + row_set.getRowsSize());
213 List<TRow>
rows = row_set.getRows();
215 for (TRow row : rows) {
216 List<TDatum> cols = row.getCols();
218 for (TDatum dat : cols) {
219 logger.info(
"ROW " + count +
" " + dat.getFieldValue(TDatum._Fields.VAL));
225 List<TColumn> columns = row_set.getColumns();
227 logger.info(
"columns " + columns);
229 for (TColumn col : columns) {
230 TColumnData data = col.getData();
232 logger.info(
"COL " + count +
" " + data.toString());
243 myThread(String str1, Heavy.Client client1, String session1) {
251 int logStart = str.indexOf(
']');
252 if (logStart != -1) {
253 String det = str.substring(logStart + 1).trim();
254 String
header = str.substring(0, logStart).trim();
256 String[] headDet = header.split(
" .");
259 if (headDet.length != 4 || headDet[0].equals(
"Log")) {
262 Integer pid = Integer.valueOf(headDet[2]);
264 if (det.contains(
"sql_execute :")) {
265 logger.info(
"det " + det);
266 String sl[] = det.split(
":query_str:");
267 logger.info(
"run query " + sl[1]);
269 client.sql_execute(
session, sl[1],
true, null, -1, -1);
270 }
catch (TDBException ex1) {
272 "Failed to execute " + sl[1] +
" exception " + ex1.getError_msg());
273 }
catch (TException ex) {
274 logger.error(
"Failed to execute " + sl[1] +
" exception " + ex.toString());
283 if (det.contains(
"get_result_row_for_pixel :")) {
284 logger.info(
"det " + det);
285 String ss[] = det.split(
":");
286 String sl[] = det.split(
":table_col_names:");
287 logger.info(
"run get_result_for_pixel " + sl[1]);
288 Map<String, List<String>> tcn =
new HashMap<String, List<String>>();
290 String tn[] = sl[1].split(
":");
291 for (
int i = 0; i < tn.length; i++) {
292 String
name[] = tn[i].split(
",");
293 List<String> col =
new ArrayList<String>();
294 for (
int j = 1; j < name.length; j++) {
297 tcn.put(name[0], col);
300 client.get_result_row_for_pixel(
session,
301 Integer.parseInt(ss[3]),
302 new TPixel(
Integer.parseInt(ss[5]), Integer.parseInt(ss[7])),
305 Integer.parseInt(ss[11]),
307 }
catch (TDBException ex1) {
308 logger.error(
"Failed to execute get_result_row_for_pixel exception "
309 + ex1.getError_msg());
310 }
catch (TException ex) {
311 logger.error(
"Failed to execute get_result_row_for_pixel exception "
317 if (det.contains(
"render_vega :")) {
318 logger.info(
"det " + det);
319 String ss[] = det.split(
":");
320 String sl[] = det.split(
":vega_json:");
321 json.put(pid, det.substring(det.indexOf(
"render_vega :") + 13, det.length()));
322 logger.info(
"JSON = " + sl[1]);
323 logger.info(
"widget = " + Integer.parseInt(ss[3]));
324 logger.info(
"compressionLevel = " + Integer.parseInt(ss[5]));
325 logger.info(
"run render_vega");
327 logger.info(
"In render: setting gpu mode as we were in CPU mode");
331 client.set_execution_mode(
session, TExecuteMode.GPU);
332 }
catch (TException ex) {
333 logger.error(
"Failed to set_execution_mode exception " + ex.toString());
337 TRenderResult fred = client.render_vega(
session,
338 Integer.parseInt(ss[3]),
344 FileOutputStream fos;
346 fos =
new FileOutputStream(
"/tmp/png.png");
348 fred.image.position(0);
349 byte[] tgxImageDataByte =
new byte[fred.image.limit()];
350 fred.image.get(tgxImageDataByte);
351 fos.write(tgxImageDataByte);
353 }
catch (FileNotFoundException ex) {
354 logger.error(
"Failed to create file exception " + ex.toString());
355 }
catch (IOException ex) {
356 logger.error(
"Failed to create file exception " + ex.toString());
360 }
catch (TException ex) {
361 logger.error(
"Failed to execute render_vega exception " + ex.toString());
366 if (det.contains(
"User mapd sets CPU mode")) {
367 logger.info(
"Set cpu mode");
371 client.set_execution_mode(
session, TExecuteMode.CPU);
372 }
catch (TException ex) {
373 logger.error(
"Failed to set_execution_mode exception " + ex.toString());
378 if (det.contains(
"User mapd sets GPU mode")) {
379 logger.info(
"Set gpu mode");
383 client.set_execution_mode(
session, TExecuteMode.GPU);
384 }
catch (TException ex) {
386 "Failed to execute set_execution_mode exception " + ex.toString());
myThread(String str1, Heavy.Client client1, String session1)
HashMap< Integer, String > sqlquery
void doWork(String[] args)
void theRest(Heavy.Client client, String session)
HashMap< Integer, String > originalSql
static final Logger logger
HashMap< Integer, String > json
String getSession(Heavy.Client client)
static void main(String[] args)
void closeSession(Heavy.Client client, String session)
Heavy.Client getClient(String hostname, int port)