17 package com.mapd.metadata;
19 import com.google.common.collect.ImmutableList;
20 import com.google.gson.Gson;
21 import com.google.gson.JsonArray;
22 import com.google.gson.JsonElement;
23 import com.google.gson.JsonObject;
30 import org.apache.calcite.schema.Table;
31 import org.apache.thrift.TException;
32 import org.apache.thrift.protocol.TBinaryProtocol;
33 import org.apache.thrift.protocol.TProtocol;
34 import org.apache.thrift.transport.TServerSocket;
35 import org.apache.thrift.transport.TSocket;
36 import org.apache.thrift.transport.TTransport;
37 import org.apache.thrift.transport.TTransportException;
38 import org.slf4j.Logger;
39 import org.slf4j.LoggerFactory;
42 import java.io.FileInputStream;
43 import java.io.IOException;
44 import java.sql.Connection;
45 import java.sql.DriverManager;
46 import java.sql.ResultSet;
47 import java.sql.SQLException;
48 import java.sql.Statement;
49 import java.util.ArrayList;
50 import java.util.HashSet;
51 import java.util.List;
53 import java.util.Map.Entry;
55 import java.util.concurrent.ConcurrentHashMap;
57 import ai.heavy.thrift.server.Heavy;
58 import ai.heavy.thrift.server.TColumnType;
59 import ai.heavy.thrift.server.TDBException;
60 import ai.heavy.thrift.server.TDBInfo;
61 import ai.heavy.thrift.server.TDatumType;
62 import ai.heavy.thrift.server.TEncodingType;
63 import ai.heavy.thrift.server.TTableDetails;
64 import ai.heavy.thrift.server.TTypeInfo;
67 final static Logger
HEAVYDBLOGGER = LoggerFactory.getLogger(MetaConnect.class);
76 private static final int KCHAR = 2;
80 private static final int KINT = 6;
84 private static final int KTIME = 10;
87 private static final int KTEXT = 13;
88 private static final int KDATE = 14;
89 private static final int KARRAY = 15;
92 private static final int KPOINT = 18;
102 new ConcurrentHashMap<>();
104 new ConcurrentHashMap<>();
115 this.default_db = db;
117 if (currentHeavyDBUser != null) {
118 this.default_db = currentHeavyDBUser.getDB();
120 this.default_db = null;
123 this.currentUser = currentHeavyDBUser;
126 this.sock_transport_properties = skT;
147 List<String> dbList =
new ArrayList<String>(DATABASE_TO_TABLES.size());
157 Class.forName(
"org.sqlite.JDBC");
158 }
catch (ClassNotFoundException ex) {
159 String err =
"Could not find class for metadata connection; DB: '" + catalog
160 +
"' data dir '" +
dataDir +
"', error was " + ex.getMessage();
161 HEAVYDBLOGGER.error(err);
162 throw new RuntimeException(err);
167 catConn = DriverManager.getConnection(connectURL);
168 }
catch (SQLException ex) {
169 String err =
"Could not establish a connection for metadata; DB: '" + catalog
170 +
"' data dir '" +
dataDir +
"', error was " + ex.getMessage();
171 HEAVYDBLOGGER.error(err);
172 throw new RuntimeException(err);
174 HEAVYDBLOGGER.debug(
"Opened database successfully");
179 File directory =
new File(path);
180 if (!directory.isDirectory()) {
181 throw new RuntimeException(
"Catalog directory not found at: " + path);
183 for (File file : directory.listFiles()) {
184 if (file.getName().equalsIgnoreCase(catalog)) {
185 return file.getName();
188 throw new RuntimeException(
"Database file not found for: " + catalog);
194 }
catch (SQLException ex) {
195 String err =
"Could not disconnect from metadata "
196 +
" data dir '" +
dataDir +
"', error was " + ex.getMessage();
197 HEAVYDBLOGGER.error(err);
198 throw new RuntimeException(err);
207 List<String> dbTable =
208 ImmutableList.of(default_db.toUpperCase(), tableName.toUpperCase());
209 Table cTable = DB_TABLE_DETAILS.get(dbTable);
210 if (cTable != null) {
211 HEAVYDBLOGGER.debug(
"Metaconnect DB " +
default_db +
" get table " + tableName
212 +
" details " + cTable);
218 if (td.getView_sql() == null || td.getView_sql().isEmpty()) {
219 HEAVYDBLOGGER.debug(
"Processing a table");
221 DB_TABLE_DETAILS.putIfAbsent(dbTable, rTable);
222 HEAVYDBLOGGER.debug(
"Metaconnect DB " +
default_db +
" get table " + tableName
223 +
" details " + rTable +
" Not in buffer");
226 HEAVYDBLOGGER.debug(
"Processing a view");
228 DB_TABLE_DETAILS.putIfAbsent(dbTable, rTable);
229 HEAVYDBLOGGER.debug(
"Metaconnect DB " +
default_db +
" get view " + tableName
230 +
" details " + rTable +
" Not in buffer");
236 Set<String> mSet = DATABASE_TO_TABLES.get(default_db.toUpperCase());
237 if (mSet != null && mSet.size() > 0) {
238 HEAVYDBLOGGER.debug(
"Metaconnect DB getTables " +
default_db +
" tables " + mSet);
247 DATABASE_TO_TABLES.put(default_db.toUpperCase(), ts);
249 "Metaconnect DB getTables " +
default_db +
" tables " + ts +
" from catDB");
254 TProtocol protocol = null;
255 TTransport transport =
256 sock_transport_properties.openClientTransport(
"localhost",
dbPort);
257 if (!transport.isOpen()) transport.open();
258 protocol =
new TBinaryProtocol(transport);
260 Heavy.Client client =
new Heavy.Client(protocol);
261 List<String> tablesList =
262 client.get_tables_for_database(currentUser.getSession(),
default_db);
263 Set<String> ts =
new HashSet<String>(tablesList.size());
264 for (String tableName : tablesList) {
269 DATABASE_TO_TABLES.put(default_db.toUpperCase(), ts);
270 HEAVYDBLOGGER.debug(
"Metaconnect DB getTables " +
default_db +
" tables " + ts
274 }
catch (TTransportException ex) {
275 HEAVYDBLOGGER.error(
"TTransportException on port [" +
dbPort +
"]");
276 HEAVYDBLOGGER.error(ex.toString());
277 throw new RuntimeException(ex.toString());
278 }
catch (TDBException ex) {
279 HEAVYDBLOGGER.error(ex.getError_msg());
280 throw new RuntimeException(ex.getError_msg());
281 }
catch (TException ex) {
282 HEAVYDBLOGGER.error(ex.toString());
283 throw new RuntimeException(ex.toString());
289 Set<String> tableSet =
new HashSet<String>();
290 Statement stmt = null;
294 stmt = catConn.createStatement();
297 rs = stmt.executeQuery(
"SELECT name FROM mapd_tables ");
299 tableSet.add(rs.getString(
"name"));
301 HEAVYDBLOGGER.debug(
"Object name = " + rs.getString(
"name"));
306 }
catch (Exception e) {
307 String err =
"error trying to get all the tables, error was " + e.getMessage();
308 HEAVYDBLOGGER.error(err);
309 throw new RuntimeException(err);
315 final String filePath =
317 HEAVYDBLOGGER.debug(
"Opening temp table file at " + filePath);
318 String tempTablesJsonStr;
320 File tempTablesFile =
new File(filePath);
321 FileInputStream tempTablesStream =
new FileInputStream(tempTablesFile);
322 byte[] data =
new byte[(int) tempTablesFile.length()];
323 tempTablesStream.read(data);
324 tempTablesStream.close();
326 tempTablesJsonStr =
new String(data,
"UTF-8");
327 }
catch (java.io.FileNotFoundException e) {
331 Gson gson =
new Gson();
332 JsonObject fileParentObject = gson.fromJson(tempTablesJsonStr, JsonObject.class);
333 for (Entry<String, JsonElement> member : fileParentObject.entrySet()) {
334 String tableName = member.getKey();
335 tableSet.add(tableName);
337 HEAVYDBLOGGER.debug(
"Temp table object name = " + tableName);
340 }
catch (Exception e) {
341 String err =
"error trying to load temporary tables from json file, error was "
343 HEAVYDBLOGGER.error(err);
344 throw new RuntimeException(err);
360 TProtocol protocol = null;
362 TTransport transport =
363 sock_transport_properties.openClientTransport(
"localhost",
dbPort);
364 if (!transport.isOpen()) transport.open();
365 protocol =
new TBinaryProtocol(transport);
367 Heavy.Client client =
new Heavy.Client(protocol);
368 TTableDetails td = client.get_internal_table_details_for_database(
369 currentUser.getSession(), tableName,
default_db);
373 }
catch (TTransportException ex) {
374 HEAVYDBLOGGER.error(ex.toString());
375 throw new RuntimeException(ex.toString());
376 }
catch (TDBException ex) {
377 HEAVYDBLOGGER.error(ex.getError_msg());
378 throw new RuntimeException(ex.getError_msg());
379 }
catch (TException ex) {
380 HEAVYDBLOGGER.error(ex.toString());
381 throw new RuntimeException(ex.toString());
410 TTableDetails td =
new TTableDetails();
411 td.getRow_descIterator();
417 tempTableTd.is_temporary =
true;
419 }
catch (Exception e) {
421 "Table '" + tableName +
"' does not exist for DB '" +
default_db +
"'";
422 HEAVYDBLOGGER.error(err);
423 throw new RuntimeException(err);
428 Statement stmt = null;
431 stmt = catConn.createStatement();
432 HEAVYDBLOGGER.debug(
"table id is " + id);
433 HEAVYDBLOGGER.debug(
"table name is " + tableName);
434 String query = String.format(
435 "SELECT * FROM mapd_columns where tableid = %d and not is_deletedcol order by columnid;",
437 HEAVYDBLOGGER.debug(query);
438 rs = stmt.executeQuery(query);
439 int skip_physical_cols = 0;
441 String colName = rs.getString(
"name");
442 HEAVYDBLOGGER.debug(
"name = " + colName);
443 int colType = rs.getInt(
"coltype");
444 HEAVYDBLOGGER.debug(
"coltype = " + colType);
445 int colSubType = rs.getInt(
"colsubtype");
446 HEAVYDBLOGGER.debug(
"colsubtype = " + colSubType);
447 int compression = rs.getInt(
"compression");
448 HEAVYDBLOGGER.debug(
"compression = " + compression);
449 int compression_param = rs.getInt(
"comp_param");
450 HEAVYDBLOGGER.debug(
"comp_param = " + compression_param);
451 int size = rs.getInt(
"size");
452 HEAVYDBLOGGER.debug(
"size = " + size);
453 int colDim = rs.getInt(
"coldim");
454 HEAVYDBLOGGER.debug(
"coldim = " + colDim);
455 int colScale = rs.getInt(
"colscale");
456 HEAVYDBLOGGER.debug(
"colscale = " + colScale);
457 boolean isNotNull = rs.getBoolean(
"is_notnull");
458 HEAVYDBLOGGER.debug(
"is_notnull = " + isNotNull);
459 boolean isSystemCol = rs.getBoolean(
"is_systemcol");
460 HEAVYDBLOGGER.debug(
"is_systemcol = " + isSystemCol);
461 boolean isVirtualCol = rs.getBoolean(
"is_virtualcol");
462 HEAVYDBLOGGER.debug(
"is_vitrualcol = " + isVirtualCol);
463 HEAVYDBLOGGER.debug(
"");
464 TColumnType tct =
new TColumnType();
465 TTypeInfo tti =
new TTypeInfo();
472 tti.is_array =
false;
476 tti.nullable = !isNotNull;
478 tti.comp_param = compression_param;
481 tti.scale = colScale;
482 tti.precision = colDim;
484 tct.col_name = colName;
486 tct.is_system = isSystemCol;
489 if (
is_geometry(colType) || skip_physical_cols-- <= 0) td.addToRow_desc(tct);
491 }
catch (Exception e) {
492 String err =
"error trying to read from mapd_columns, error was " + e.getMessage();
493 HEAVYDBLOGGER.error(err);
494 throw new RuntimeException(err);
499 }
catch (SQLException ex) {
500 String err =
"Could not close resultset, error was " + ex.getMessage();
501 HEAVYDBLOGGER.error(err);
502 throw new RuntimeException(err);
508 }
catch (SQLException ex) {
509 String err =
"Could not close stmt, error was " + ex.getMessage();
510 HEAVYDBLOGGER.error(err);
511 throw new RuntimeException(err);
516 td.setView_sqlIsSet(
true);
523 throws IOException, RuntimeException {
524 TTableDetails td =
new TTableDetails();
525 td.getRow_descIterator();
528 final String filePath =
530 HEAVYDBLOGGER.debug(
"Opening temp table file at " + filePath);
532 String tempTablesJsonStr;
534 File tempTablesFile =
new File(filePath);
535 FileInputStream tempTablesStream =
new FileInputStream(tempTablesFile);
536 byte[] data =
new byte[(int) tempTablesFile.length()];
537 tempTablesStream.read(data);
538 tempTablesStream.close();
540 tempTablesJsonStr =
new String(data,
"UTF-8");
541 }
catch (java.io.FileNotFoundException e) {
542 throw new RuntimeException(
"Failed to read temporary tables file.");
545 Gson gson =
new Gson();
546 JsonObject fileParentObject = gson.fromJson(tempTablesJsonStr, JsonObject.class);
547 if (fileParentObject == null) {
548 throw new IOException(
"Malformed temporary tables file.");
551 JsonObject tableObject = fileParentObject.getAsJsonObject(tableName);
552 if (tableObject == null) {
553 throw new RuntimeException(
554 "Failed to find table " + tableName +
" in temporary tables file.");
557 String jsonTableName = tableObject.get(
"name").getAsString();
558 assert (tableName == jsonTableName);
559 int id = tableObject.get(
"id").getAsInt();
560 HEAVYDBLOGGER.debug(
"table id is " + id);
561 HEAVYDBLOGGER.debug(
"table name is " + tableName);
563 JsonArray jsonColumns = tableObject.getAsJsonArray(
"columns");
564 assert (jsonColumns != null);
566 int skip_physical_cols = 0;
567 for (JsonElement columnElement : jsonColumns) {
568 JsonObject columnObject = columnElement.getAsJsonObject();
570 String colName = columnObject.get(
"name").getAsString();
571 HEAVYDBLOGGER.debug(
"name = " + colName);
572 int colType = columnObject.get(
"coltype").getAsInt();
573 HEAVYDBLOGGER.debug(
"coltype = " + colType);
574 int colSubType = columnObject.get(
"colsubtype").getAsInt();
575 HEAVYDBLOGGER.debug(
"colsubtype = " + colSubType);
576 int compression = columnObject.get(
"compression").getAsInt();
577 HEAVYDBLOGGER.debug(
"compression = " + compression);
578 int compression_param = columnObject.get(
"comp_param").getAsInt();
579 HEAVYDBLOGGER.debug(
"comp_param = " + compression_param);
580 int size = columnObject.get(
"size").getAsInt();
581 HEAVYDBLOGGER.debug(
"size = " + size);
582 int colDim = columnObject.get(
"coldim").getAsInt();
583 HEAVYDBLOGGER.debug(
"coldim = " + colDim);
584 int colScale = columnObject.get(
"colscale").getAsInt();
585 HEAVYDBLOGGER.debug(
"colscale = " + colScale);
586 boolean isNotNull = columnObject.get(
"is_notnull").getAsBoolean();
587 HEAVYDBLOGGER.debug(
"is_notnull = " + isNotNull);
588 boolean isSystemCol = columnObject.get(
"is_systemcol").getAsBoolean();
589 HEAVYDBLOGGER.debug(
"is_systemcol = " + isSystemCol);
590 boolean isVirtualCol = columnObject.get(
"is_virtualcol").getAsBoolean();
591 HEAVYDBLOGGER.debug(
"is_vitrualcol = " + isVirtualCol);
592 boolean isDeletedCol = columnObject.get(
"is_deletedcol").getAsBoolean();
593 HEAVYDBLOGGER.debug(
"is_deletedcol = " + isDeletedCol);
594 HEAVYDBLOGGER.debug(
"");
597 HEAVYDBLOGGER.debug(
"Skipping delete column.");
601 TColumnType tct =
new TColumnType();
602 TTypeInfo tti =
new TTypeInfo();
609 tti.is_array =
false;
613 tti.nullable = !isNotNull;
615 tti.comp_param = compression_param;
618 tti.scale = colScale;
619 tti.precision = colDim;
621 tct.col_name = colName;
623 tct.is_system = isSystemCol;
626 if (
is_geometry(colType) || skip_physical_cols-- <= 0) td.addToRow_desc(tct);
633 Statement stmt = null;
637 stmt = catConn.createStatement();
638 rs = stmt.executeQuery(String.format(
639 "SELECT tableid FROM mapd_tables where name = '%s' COLLATE NOCASE;",
642 tableId = rs.getInt(
"tableid");
643 HEAVYDBLOGGER.debug(
"tableId = " + tableId);
644 HEAVYDBLOGGER.debug(
"");
648 }
catch (Exception e) {
649 String err =
"Error trying to read from metadata table mapd_tables;DB: "
651 HEAVYDBLOGGER.error(err);
652 throw new RuntimeException(err);
657 }
catch (SQLException ex) {
658 String err =
"Could not close resultset, error was " + ex.getMessage();
659 HEAVYDBLOGGER.error(err);
660 throw new RuntimeException(err);
666 }
catch (SQLException ex) {
667 String err =
"Could not close stmt, error was " + ex.getMessage();
668 HEAVYDBLOGGER.error(err);
669 throw new RuntimeException(err);
676 private boolean isView(String tableName) {
681 stmt = catConn.createStatement();
682 rs = stmt.executeQuery(String.format(
683 "SELECT isview FROM mapd_tables where name = '%s' COLLATE NOCASE;",
686 viewFlag = rs.getInt(
"isview");
687 HEAVYDBLOGGER.debug(
"viewFlag = " + viewFlag);
688 HEAVYDBLOGGER.debug(
"");
692 }
catch (Exception e) {
693 String err =
"error trying to read from mapd_views, error was " + e.getMessage();
694 HEAVYDBLOGGER.error(err);
695 throw new RuntimeException(err);
697 return (viewFlag == 1);
710 TProtocol protocol = null;
712 TTransport transport =
713 sock_transport_properties.openClientTransport(
"localhost",
dbPort);
714 if (!transport.isOpen()) transport.open();
715 protocol =
new TBinaryProtocol(transport);
717 Heavy.Client client =
new Heavy.Client(protocol);
718 TTableDetails td = client.get_table_details_for_database(
719 currentUser.getSession(), tableName,
default_db);
722 sqlText = td.getView_sql();
724 }
catch (TTransportException ex) {
725 HEAVYDBLOGGER.error(ex.toString());
726 throw new RuntimeException(ex.toString());
727 }
catch (TDBException ex) {
728 HEAVYDBLOGGER.error(ex.getError_msg());
729 throw new RuntimeException(ex.getError_msg());
730 }
catch (TException ex) {
731 HEAVYDBLOGGER.error(ex.toString());
732 throw new RuntimeException(ex.toString());
736 if (sqlText.charAt(sqlText.length() - 1) ==
';') {
737 return (sqlText.substring(0, sqlText.length() - 1));
749 stmt = catConn.createStatement();
750 rs = stmt.executeQuery(String.format(
751 "SELECT sql FROM mapd_views where tableid = '%s' COLLATE NOCASE;",
754 sqlText = rs.getString(
"sql");
755 HEAVYDBLOGGER.debug(
"View definition = " + sqlText);
756 HEAVYDBLOGGER.debug(
"");
760 }
catch (Exception e) {
761 String err =
"error trying to read from mapd_views, error was " + e.getMessage();
762 HEAVYDBLOGGER.error(err);
763 throw new RuntimeException(err);
765 if (sqlText == null || sqlText.length() == 0) {
766 String err =
"No view text found";
767 HEAVYDBLOGGER.error(err);
768 throw new RuntimeException(err);
776 return TDatumType.BOOL;
778 return TDatumType.TINYINT;
780 return TDatumType.SMALLINT;
782 return TDatumType.INT;
784 return TDatumType.BIGINT;
786 return TDatumType.FLOAT;
789 return TDatumType.DECIMAL;
791 return TDatumType.DOUBLE;
795 return TDatumType.STR;
797 return TDatumType.TIME;
799 return TDatumType.TIMESTAMP;
801 return TDatumType.DATE;
803 return TDatumType.INTERVAL_DAY_TIME;
805 return TDatumType.INTERVAL_YEAR_MONTH;
807 return TDatumType.POINT;
809 return TDatumType.MULTIPOINT;
811 return TDatumType.LINESTRING;
813 return TDatumType.MULTILINESTRING;
815 return TDatumType.POLYGON;
817 return TDatumType.MULTIPOLYGON;
826 return TEncodingType.NONE;
828 return TEncodingType.FIXED;
830 return TEncodingType.RL;
832 return TEncodingType.DIFF;
834 return TEncodingType.DICT;
836 return TEncodingType.SPARSE;
838 return TEncodingType.GEOINT;
840 return TEncodingType.DATE_IN_DAYS;
863 for (String dbName : dbNames) {
864 Set<String> ts =
new HashSet<String>();
865 DATABASE_TO_TABLES.putIfAbsent(dbName.toUpperCase(), ts);
871 TProtocol protocol = null;
872 TTransport transport =
873 sock_transport_properties.openClientTransport(
"localhost",
dbPort);
874 if (!transport.isOpen()) transport.open();
875 protocol =
new TBinaryProtocol(transport);
877 Heavy.Client client =
new Heavy.Client(protocol);
879 List<TDBInfo> dbList = client.get_databases(currentUser.getSession());
880 for (TDBInfo dbInfo : dbList) {
881 Set<String> ts =
new HashSet<String>();
882 DATABASE_TO_TABLES.putIfAbsent(dbInfo.db_name.toUpperCase(), ts);
886 }
catch (TTransportException ex) {
887 HEAVYDBLOGGER.error(
"TTransportException on port [" +
dbPort +
"]");
888 HEAVYDBLOGGER.error(ex.toString());
889 throw new RuntimeException(ex.toString());
890 }
catch (TDBException ex) {
891 HEAVYDBLOGGER.error(ex.getError_msg());
892 throw new RuntimeException(ex.getError_msg());
893 }
catch (TException ex) {
894 HEAVYDBLOGGER.error(ex.toString());
895 throw new RuntimeException(ex.toString());
900 Set<String> dbSet =
new HashSet<String>();
901 Statement stmt = null;
905 stmt = catConn.createStatement();
908 rs = stmt.executeQuery(
"SELECT name FROM mapd_databases ");
910 dbSet.add(rs.getString(
"name"));
912 HEAVYDBLOGGER.debug(
"Object name = " + rs.getString(
"name"));
917 }
catch (Exception e) {
918 String err =
"error trying to get all the databases, error was " + e.getMessage();
919 HEAVYDBLOGGER.error(err);
920 throw new RuntimeException(err);
928 if (table.equals(
"")) {
931 Set<List<String>> all =
new HashSet<>(DB_TABLE_DETAILS.keySet());
932 for (List<String> keys : all) {
933 if (keys.get(0).equals(schema.toUpperCase())) {
935 "removing all for schema " + keys.get(0) +
" table " + keys.get(1));
936 DB_TABLE_DETAILS.remove(keys);
940 HEAVYDBLOGGER.debug(
"removing schema " + schema.toUpperCase() +
" table "
941 + table.toUpperCase());
942 DB_TABLE_DETAILS.remove(
943 ImmutableList.of(schema.toUpperCase(), table.toUpperCase()));
946 Set<List<String>> all =
new HashSet<>(DB_TABLE_DETAILS.keySet());
947 for (List<String> keys : all) {
948 if (keys.get(0).equals(schema.toUpperCase())) {
949 Table ttable = DB_TABLE_DETAILS.get(keys);
952 "removing view in schema " + keys.get(0) +
" view " + keys.get(1));
953 DB_TABLE_DETAILS.remove(keys);
958 Set<String> mSet = DATABASE_TO_TABLES.get(schema.toUpperCase());
960 if (table.isEmpty()) {
962 HEAVYDBLOGGER.debug(
"removing schema " + schema.toUpperCase());
963 DATABASE_TO_TABLES.remove(schema.toUpperCase());
970 Set<String> ts =
new HashSet<String>();
971 DATABASE_TO_TABLES.putIfAbsent(schema.toUpperCase(), ts);