26 #include <boost/algorithm/string/predicate.hpp>
27 #include <boost/filesystem.hpp>
28 #include <boost/range/adaptor/map.hpp>
29 #include <boost/version.hpp>
42 #if BOOST_VERSION >= 106600
43 #include <boost/uuid/detail/sha1.hpp>
45 #include <boost/uuid/sha1.hpp>
47 #include <rapidjson/document.h>
48 #include <rapidjson/istreamwrapper.h>
49 #include <rapidjson/ostreamwrapper.h>
50 #include <rapidjson/writer.h>
80 #include "MapDRelease.h"
92 using std::runtime_error;
111 namespace Catalog_Namespace {
137 "SELECT name FROM sqlite_master WHERE type='table' AND name='mapd_dashboards'");
144 "CREATE TABLE mapd_dashboards (id integer primary key autoincrement, name text , "
145 "userid integer references mapd_users, state text, image_hash text, update_time "
147 "metadata text, UNIQUE(userid, name) )");
150 "insert into mapd_dashboards (id, name , "
151 "userid, state, image_hash, update_time , "
153 "SELECT viewid , name , userid, view_state, image_hash, update_time, "
155 "from mapd_frontend_views");
156 }
catch (
const std::exception& e) {
166 const std::string& db_name) {
168 db_name +
"_temp_tables.json");
178 std::shared_ptr<Data_Namespace::DataMgr> dataMgr,
179 const std::vector<LeafHostInfo>& string_dict_hosts,
180 std::shared_ptr<Calcite> calcite,
182 : basePath_(basePath)
186 , string_dict_hosts_(string_dict_hosts)
187 , calciteMgr_(calcite)
190 , dcatalogMutex_(std::make_unique<heavyai::DistributedSharedMutex>(
202 ,
dsqliteMutex_(std::make_unique<heavyai::DistributedSharedMutex>(
215 CheckAndExecuteMigrations();
221 createDefaultServersIfNotExists();
224 CheckAndExecuteMigrationsPostBuildMaps();
229 conditionallyInitializeSystemObjects();
241 tableDescIt->second->fragmenter =
nullptr;
242 delete tableDescIt->second;
250 delete columnDescIt->second;
273 std::vector<std::string> cols;
277 if (std::find(cols.begin(), cols.end(), std::string(
"max_chunk_size")) ==
279 string queryString(
"ALTER TABLE mapd_tables ADD max_chunk_size BIGINT DEFAULT " +
283 if (std::find(cols.begin(), cols.end(), std::string(
"shard_column_id")) ==
285 string queryString(
"ALTER TABLE mapd_tables ADD shard_column_id BIGINT DEFAULT " +
289 if (std::find(cols.begin(), cols.end(), std::string(
"shard")) == cols.end()) {
290 string queryString(
"ALTER TABLE mapd_tables ADD shard BIGINT DEFAULT " +
294 if (std::find(cols.begin(), cols.end(), std::string(
"num_shards")) == cols.end()) {
295 string queryString(
"ALTER TABLE mapd_tables ADD num_shards BIGINT DEFAULT " +
299 if (std::find(cols.begin(), cols.end(), std::string(
"key_metainfo")) == cols.end()) {
300 string queryString(
"ALTER TABLE mapd_tables ADD key_metainfo TEXT DEFAULT '[]'");
303 if (std::find(cols.begin(), cols.end(), std::string(
"userid")) == cols.end()) {
304 string queryString(
"ALTER TABLE mapd_tables ADD userid integer DEFAULT " +
308 if (std::find(cols.begin(), cols.end(), std::string(
"sort_column_id")) ==
311 "ALTER TABLE mapd_tables ADD sort_column_id INTEGER DEFAULT 0");
313 if (std::find(cols.begin(), cols.end(), std::string(
"storage_type")) == cols.end()) {
314 string queryString(
"ALTER TABLE mapd_tables ADD storage_type TEXT DEFAULT ''");
317 if (std::find(cols.begin(), cols.end(), std::string(
"max_rollback_epochs")) ==
319 string queryString(
"ALTER TABLE mapd_tables ADD max_rollback_epochs INT DEFAULT " +
323 if (std::find(cols.begin(), cols.end(), std::string(
"is_system_table")) ==
325 string queryString(
"ALTER TABLE mapd_tables ADD is_system_table BOOLEAN DEFAULT 0");
328 }
catch (std::exception& e) {
340 "select name from sqlite_master WHERE type='table' AND "
341 "name='mapd_version_history'");
344 "CREATE TABLE mapd_version_history(version integer, migration_history text "
348 "select * from mapd_version_history where migration_history = "
349 "'notnull_fixlen_arrays'");
359 "INSERT INTO mapd_version_history(version, migration_history) values(?,?)",
361 LOG(
INFO) <<
"Updating mapd_columns, legacy fixlen arrays";
363 string queryString(
"UPDATE mapd_columns SET is_notnull=1 WHERE coltype=" +
366 }
catch (std::exception& e) {
378 "select name from sqlite_master WHERE type='table' AND "
379 "name='mapd_version_history'");
382 "CREATE TABLE mapd_version_history(version integer, migration_history text "
386 "select * from mapd_version_history where migration_history = "
387 "'notnull_geo_columns'");
397 "INSERT INTO mapd_version_history(version, migration_history) values(?,?)",
399 LOG(
INFO) <<
"Updating mapd_columns, legacy geo columns";
408 }
catch (std::exception& e) {
421 "SELECT name FROM sqlite_master WHERE type='table' AND "
422 "name='mapd_frontend_views'");
430 std::vector<std::string> cols;
434 if (std::find(cols.begin(), cols.end(), std::string(
"image_hash")) == cols.end()) {
437 if (std::find(cols.begin(), cols.end(), std::string(
"update_time")) == cols.end()) {
440 if (std::find(cols.begin(), cols.end(), std::string(
"view_metadata")) == cols.end()) {
443 }
catch (std::exception& e) {
455 "CREATE TABLE IF NOT EXISTS mapd_links (linkid integer primary key, userid "
456 "integer references mapd_users, "
457 "link text unique, view_state text, update_time timestamp, view_metadata text)");
459 std::vector<std::string> cols;
463 if (std::find(cols.begin(), cols.end(), std::string(
"view_metadata")) == cols.end()) {
466 }
catch (
const std::exception& e) {
480 "SELECT name FROM sqlite_master WHERE type='table' AND "
481 "name='mapd_frontend_views'");
489 "UPDATE mapd_frontend_views SET userid = 0 WHERE userid IS NULL");
490 }
catch (
const std::exception& e) {
510 std::vector<std::string> cols;
514 if (std::find(cols.begin(), cols.end(), std::string(
"version_num")) == cols.end()) {
515 LOG(
INFO) <<
"Updating mapd_tables updatePageSize";
520 string queryString(
"ALTER TABLE mapd_tables ADD version_num BIGINT DEFAULT " +
524 }
catch (std::exception& e) {
536 std::vector<std::string> cols;
540 if (std::find(cols.begin(), cols.end(), std::string(
"version_num")) == cols.end()) {
541 LOG(
INFO) <<
"Updating mapd_columns updateDeletedColumnIndicator";
543 string queryString(
"ALTER TABLE mapd_columns ADD version_num BIGINT DEFAULT " +
549 "ALTER TABLE mapd_columns ADD is_deletedcol boolean default 0 ");
551 }
catch (std::exception& e) {
563 std::vector<std::string> cols;
567 if (std::find(cols.begin(), cols.end(), std::string(
"default_value")) == cols.end()) {
568 LOG(
INFO) <<
"Adding support for default values to mapd_columns";
571 }
catch (std::exception& e) {
573 LOG(
ERROR) <<
"Failed to make metadata update for default values` support";
591 std::vector<std::string> cols;
595 if (std::find(cols.begin(), cols.end(), std::string(
"version_num")) == cols.end()) {
598 string dictQuery(
"SELECT dictid, name from mapd_dictionaries");
601 for (
size_t r = 0; r < numRows; ++r) {
611 int result = rename(oldName.c_str(), newName.c_str());
614 LOG(
INFO) <<
"Dictionary upgrade: successfully renamed " << oldName <<
" to "
617 LOG(
ERROR) <<
"Failed to rename old dictionary directory " << oldName <<
" to "
623 string queryString(
"ALTER TABLE mapd_dictionaries ADD version_num BIGINT DEFAULT " +
627 }
catch (std::exception& e) {
639 "CREATE TABLE IF NOT EXISTS mapd_logical_to_physical("
640 "logical_table_id integer, physical_table_id integer)");
641 }
catch (
const std::exception& e) {
658 const auto physicalTables = physicalTableIt->second;
659 CHECK(!physicalTables.empty());
660 for (
size_t i = 0; i < physicalTables.size(); i++) {
661 int32_t physical_tb_id = physicalTables[i];
663 "INSERT OR REPLACE INTO mapd_logical_to_physical (logical_table_id, "
664 "physical_table_id) VALUES (?1, ?2)",
669 }
catch (std::exception& e) {
681 std::vector<std::string> cols;
685 if (std::find(cols.begin(), cols.end(), std::string(
"refcount")) == cols.end()) {
688 }
catch (std::exception& e) {
701 }
catch (std::exception& e) {
714 "select name from sqlite_master WHERE type='table' AND "
715 "name='mapd_version_history'");
716 static const std::string migration_name{
"rename_legacy_data_wrappers"};
719 "CREATE TABLE mapd_version_history(version integer, migration_history text "
723 "select * from mapd_version_history where migration_history = "
725 migration_name +
"'");
732 LOG(
INFO) <<
"Executing " << migration_name <<
" migration.";
737 std::map<std::string, std::string> old_to_new_wrapper_names{
738 {
"OMNISCI_CSV", DataWrapperType::CSV},
739 {
"OMNISCI_PARQUET", DataWrapperType::PARQUET},
740 {
"OMNISCI_REGEX_PARSER", DataWrapperType::REGEX_PARSER},
741 {
"OMNISCI_INTERNAL_CATALOG", DataWrapperType::INTERNAL_CATALOG},
742 {
"INTERNAL_OMNISCI_MEMORY_STATS", DataWrapperType::INTERNAL_MEMORY_STATS},
743 {
"INTERNAL_OMNISCI_STORAGE_STATS", DataWrapperType::INTERNAL_STORAGE_STATS}
747 for (
const auto& [old_wrapper_name, new_wrapper_name] : old_to_new_wrapper_names) {
749 "UPDATE omnisci_foreign_servers SET data_wrapper_type = ? WHERE "
750 "data_wrapper_type = ?",
751 std::vector<std::string>{new_wrapper_name, old_wrapper_name});
756 "INSERT INTO mapd_version_history(version, migration_history) values(?,?)",
758 LOG(
INFO) << migration_name <<
" migration completed.";
759 }
catch (std::exception& e) {
771 }
catch (
const std::exception& e) {
779 return "CREATE TABLE " + (if_not_exists ? std::string{
"IF NOT EXISTS "} :
"") +
780 "omnisci_foreign_servers(id integer primary key, name text unique, " +
781 "data_wrapper_type text, owner_user_id integer, creation_time integer, " +
786 return "CREATE TABLE " + (if_not_exists ? std::string{
"IF NOT EXISTS "} :
"") +
787 "omnisci_foreign_tables(table_id integer unique, server_id integer, " +
788 "options text, last_refresh_time integer, next_refresh_time integer, " +
789 "FOREIGN KEY(table_id) REFERENCES mapd_tables(tableid), " +
790 "FOREIGN KEY(server_id) REFERENCES omnisci_foreign_servers(id))";
794 return "CREATE TABLE " + (if_not_exists ? std::string{
"IF NOT EXISTS "} :
"") +
795 "omnisci_custom_expressions(id integer primary key, name text, " +
796 "expression_json text, data_source_type text, " +
797 "data_source_id integer, is_deleted boolean)";
803 std::vector<DBObject> objects;
806 "SELECT name FROM sqlite_master WHERE type='table' AND "
807 "name='mapd_record_ownership_marker'");
834 "INSERT INTO mapd_record_ownership_marker (dummy) VALUES (?1)",
837 static const std::map<const DBObjectType, const AccessPrivileges>
838 object_level_all_privs_lookup{
848 auto _key_place = [&key](
auto type) {
852 for (
auto& it : object_level_all_privs_lookup) {
853 objects.emplace_back(_key_place(it.first), it.second, db.
dbOwner);
860 "SELECT tableid, name, userid, isview FROM mapd_tables WHERE userid > 0");
863 for (
size_t r = 0; r < numRows; ++r) {
882 objects.push_back(obj);
888 string tableQuery(
"SELECT id, name, userid FROM mapd_dashboards WHERE userid > 0");
891 for (
size_t r = 0; r < numRows; ++r) {
907 objects.push_back(obj);
910 }
catch (
const std::exception& e) {
920 }
catch (
const std::exception& e) {
921 LOG(
ERROR) <<
" Issue during migration of DB " <<
name() <<
" issue was " << e.what();
922 throw std::runtime_error(
" Issue during migration of DB " +
name() +
" issue was " +
940 std::unordered_map<std::string, std::pair<int, std::string>> dashboards;
941 std::vector<std::string> dashboard_ids;
942 static const std::string migration_name{
"dashboard_roles_migration"};
950 "select * from mapd_version_history where migration_history = '" +
951 migration_name +
"'");
957 LOG(
INFO) <<
"Performing dashboard internal roles Migration.";
971 }
catch (
const std::exception& e) {
978 const auto active_grantees =
983 for (
auto dash : dashboards) {
988 auto result = active_grantees.find(dash.first);
989 if (
result != active_grantees.end()) {
999 "select * from mapd_version_history where migration_history = '" +
1000 migration_name +
"'");
1005 "INSERT INTO mapd_version_history(version, migration_history) values(?,?)",
1007 }
catch (
const std::exception& e) {
1008 LOG(
ERROR) <<
"Failed to create dashboard system roles during migration: "
1012 LOG(
INFO) <<
"Successfully created dashboard system roles during migration.";
1047 std::map<int32_t, std::string> user_name_by_user_id;
1048 for (
const auto& user : users) {
1049 user_name_by_user_id[user.userId] = user.userName;
1051 return user_name_by_user_id;
1056 const std::map<int32_t, std::string>& user_name_by_user_id) {
1057 auto entry = user_name_by_user_id.find(
id);
1058 if (entry != user_name_by_user_id.end()) {
1059 return entry->second;
1068 if (column_type.is_dict_encoded_string() ||
1069 column_type.is_subtype_dict_encoded_string()) {
1076 std::string dictQuery(
1077 "SELECT dictid, name, nbits, is_shared, refcount from mapd_dictionaries");
1080 for (
size_t r = 0; r < numRows; ++r) {
1090 dict_ref, dictName, dictNBits, is_shared, refcount, fname,
false);
1111 std::list<ColumnDescriptor*> original_cds;
1114 original_td = it1->second;
1115 if (dynamic_cast<foreign_storage::ForeignTable*>(original_td)) {
1126 CHECK_EQ(original_td, it2->second);
1133 for (
int column_id = 0; column_id < original_td->
nColumns; ++column_id) {
1137 original_cds.push_back(original_cd);
1145 td = createTableFromDiskUnlocked(table_id);
1146 }
catch (
const NoTableFoundException& e) {
1151 if (
auto tableDescIt = tableDescriptorMapById_.find(table_id);
1152 tableDescIt != tableDescriptorMapById_.end()) {
1153 tableDescIt->second->fragmenter =
nullptr;
1154 delete tableDescIt->second;
1158 auto cds = sqliteGetColumnsForTableUnlocked(table_id);
1162 td->mutex_ = original_td->mutex_;
1164 original_td =
nullptr;
1169 original_cds.clear();
1170 tableDescriptorMap_[
to_upper(td->tableName)] = td;
1171 tableDescriptorMapById_[td->tableId] = td;
1172 int32_t skip_physical_cols = 0;
1176 if (skip_physical_cols <= 0) {
1177 skip_physical_cols = cd->columnType.get_physical_cols();
1180 if (cd->isDeletedCol) {
1181 td->hasDeletedCol =
true;
1182 setDeletedColumnUnlocked(td, cd);
1183 }
else if (cd->columnType.is_geometry() || skip_physical_cols-- <= 0) {
1184 td->columnIdBySpi_.push_back(cd->columnId);
1189 calciteMgr_->updateMetadata(currentDB_.dbName, td->tableName);
1193 void Catalog::reloadCatalogMetadata(
1194 const std::map<int32_t, std::string>& user_name_by_user_id) {
1201 void Catalog::reloadCatalogMetadataUnlocked(
1202 const std::map<int32_t, std::string>& user_name_by_user_id) {
1212 std::set<int> cluster_table_ids;
1213 std::string tableQuery(
"SELECT tableid from mapd_tables");
1214 sqliteConnector_.query(tableQuery);
1215 auto numRows = sqliteConnector_.getNumRows();
1216 for (
size_t r = 0; r < numRows; ++r) {
1217 const auto table_id = sqliteConnector_.getData<
int>(r, 0);
1218 cluster_table_ids.insert(table_id);
1223 std::set<int> ignored_table_ids;
1236 std::set<int> reload_table_ids;
1237 for (
auto const& cluster_table_id : cluster_table_ids) {
1238 if (ignored_table_ids.find(cluster_table_id) == ignored_table_ids.end()) {
1239 reload_table_ids.insert(cluster_table_id);
1242 for (
auto const& [cached_table_id, td] : tableDescriptorMapById_) {
1243 if (cluster_table_ids.find(cached_table_id) == cluster_table_ids.end()) {
1244 reload_table_ids.insert(cached_table_id);
1249 for (
auto const& reload_table_id : reload_table_ids) {
1250 reloadTableMetadataUnlocked(reload_table_id);
1255 dashboardDescriptorMap_.clear();
1256 linkDescriptorMap_.clear();
1257 linkDescriptorMapById_.clear();
1258 foreignServerMap_.clear();
1259 foreignServerMapById_.clear();
1260 custom_expr_map_by_id_.clear();
1263 buildForeignServerMapUnlocked();
1266 updateViewsInMapUnlocked();
1267 buildDashboardsMapUnlocked(user_name_by_user_id);
1268 buildLinksMapUnlocked();
1269 buildCustomExpressionsMapUnlocked();
1273 calciteMgr_->updateMetadata(currentDB_.dbName, {});
1277 void Catalog::buildTablesMapUnlocked() {
1278 std::string tableQuery(
1279 "SELECT tableid, name, ncolumns, isview, fragments, frag_type, max_frag_rows, "
1280 "max_chunk_size, frag_page_size, "
1281 "max_rows, partitions, shard_column_id, shard, num_shards, key_metainfo, userid, "
1282 "sort_column_id, storage_type, max_rollback_epochs, is_system_table "
1283 "from mapd_tables");
1284 sqliteConnector_.query(tableQuery);
1285 auto numRows = sqliteConnector_.getNumRows();
1286 for (
size_t r = 0; r < numRows; ++r) {
1288 const auto& storage_type = sqliteConnector_.getData<
string>(r, 17);
1290 const auto table_id = sqliteConnector_.getData<
int>(r, 0);
1291 const auto& table_name = sqliteConnector_.getData<
string>(r, 1);
1292 LOG(
FATAL) <<
"Unable to read Catalog metadata: storage type is currently not a "
1293 "supported table option (table "
1294 << table_name <<
" [" << table_id <<
"] in database "
1295 << currentDB_.dbName <<
").";
1305 td->
tableId = sqliteConnector_.getData<
int>(r, 0);
1306 td->
tableName = sqliteConnector_.getData<
string>(r, 1);
1307 td->
nColumns = sqliteConnector_.getData<
int>(r, 2);
1308 td->
isView = sqliteConnector_.getData<
bool>(r, 3);
1309 td->
fragments = sqliteConnector_.getData<
string>(r, 4);
1312 td->
maxFragRows = sqliteConnector_.getData<
int>(r, 6);
1313 td->
maxChunkSize = sqliteConnector_.getData<int64_t>(r, 7);
1314 td->
fragPageSize = sqliteConnector_.getData<
int>(r, 8);
1315 td->
maxRows = sqliteConnector_.getData<int64_t>(r, 9);
1316 td->
partitions = sqliteConnector_.getData<
string>(r, 10);
1318 td->
shard = sqliteConnector_.getData<
int>(r, 12);
1319 td->
nShards = sqliteConnector_.getData<
int>(r, 13);
1320 td->
keyMetainfo = sqliteConnector_.getData<
string>(r, 14);
1321 td->
userId = sqliteConnector_.getData<
int>(r, 15);
1323 sqliteConnector_.isNull(r, 16) ? 0 : sqliteConnector_.getData<
int>(r, 16);
1332 tableDescriptorMapById_[td->
tableId] = td;
1336 void Catalog::buildColumnsMapUnlocked() {
1337 std::string columnQuery(
1338 "SELECT tableid, columnid, name, coltype, colsubtype, coldim, colscale, "
1339 "is_notnull, compression, comp_param, "
1340 "size, chunks, is_systemcol, is_virtualcol, virtual_expr, is_deletedcol, "
1341 "default_value from "
1342 "mapd_columns ORDER BY tableid, "
1344 sqliteConnector_.query(columnQuery);
1345 auto numRows = sqliteConnector_.getNumRows();
1346 int32_t skip_physical_cols = 0;
1347 for (
size_t r = 0; r < numRows; ++r) {
1349 cd->
tableId = sqliteConnector_.getData<
int>(r, 0);
1350 cd->
columnId = sqliteConnector_.getData<
int>(r, 1);
1351 cd->
columnName = sqliteConnector_.getData<
string>(r, 2);
1360 cd->
chunks = sqliteConnector_.getData<
string>(r, 11);
1361 cd->
isSystemCol = sqliteConnector_.getData<
bool>(r, 12);
1362 cd->
isVirtualCol = sqliteConnector_.getData<
bool>(r, 13);
1363 cd->
virtualExpr = sqliteConnector_.getData<
string>(r, 14);
1364 cd->
isDeletedCol = sqliteConnector_.getData<
bool>(r, 15);
1365 if (sqliteConnector_.isNull(r, 16)) {
1368 cd->
default_value = std::make_optional(sqliteConnector_.getData<
string>(r, 16));
1371 cd->
db_id = getDatabaseId();
1375 if (skip_physical_cols <= 0) {
1379 auto td_itr = tableDescriptorMapById_.find(cd->
tableId);
1380 CHECK(td_itr != tableDescriptorMapById_.end());
1383 td_itr->second->hasDeletedCol =
true;
1384 setDeletedColumnUnlocked(td_itr->second, cd);
1386 tableDescriptorMapById_[cd->
tableId]->columnIdBySpi_.push_back(cd->
columnId);
1391 for (
auto& tit : tableDescriptorMapById_) {
1392 std::sort(tit.second->columnIdBySpi_.begin(),
1393 tit.second->columnIdBySpi_.end(),
1394 [](
const size_t a,
const size_t b) ->
bool {
return a < b; });
1399 std::string viewQuery(
"SELECT sql FROM mapd_views where tableid = " +
1401 sqliteConnector_.query(viewQuery);
1402 auto num_rows = sqliteConnector_.getNumRows();
1403 CHECK_EQ(num_rows, 1U) <<
"Expected single entry in mapd_views for view '"
1404 << td.
tableName <<
"', instead got " << num_rows;
1405 td.
viewSQL = sqliteConnector_.getData<
string>(0, 0);
1408 void Catalog::updateViewsInMapUnlocked() {
1409 std::string viewQuery(
"SELECT tableid, sql FROM mapd_views");
1410 sqliteConnector_.query(viewQuery);
1411 auto numRows = sqliteConnector_.getNumRows();
1412 for (
size_t r = 0; r < numRows; ++r) {
1413 auto tableId = sqliteConnector_.getData<
int>(r, 0);
1414 auto td = tableDescriptorMapById_[tableId];
1415 td->viewSQL = sqliteConnector_.getData<
string>(r, 1);
1416 td->fragmenter =
nullptr;
1420 void Catalog::buildDashboardsMapUnlocked(
1421 const std::map<int32_t, std::string>& user_name_by_user_id) {
1422 std::string frontendViewQuery(
1423 "SELECT id, state, name, image_hash, strftime('%Y-%m-%dT%H:%M:%SZ', update_time), "
1426 "FROM mapd_dashboards");
1427 sqliteConnector_.query(frontendViewQuery);
1428 auto numRows = sqliteConnector_.getNumRows();
1429 for (
size_t r = 0; r < numRows; ++r) {
1430 auto vd = std::make_shared<DashboardDescriptor>();
1431 vd->dashboardId = sqliteConnector_.getData<
int>(r, 0);
1432 vd->dashboardState = sqliteConnector_.getData<
string>(r, 1);
1433 vd->dashboardName = sqliteConnector_.getData<
string>(r, 2);
1434 vd->imageHash = sqliteConnector_.getData<
string>(r, 3);
1435 vd->updateTime = sqliteConnector_.getData<
string>(r, 4);
1436 vd->userId = sqliteConnector_.getData<
int>(r, 5);
1437 vd->dashboardMetadata = sqliteConnector_.getData<
string>(r, 6);
1440 std::to_string(currentDB_.dbId), sqliteConnector_.getData<
string>(r, 0));
1441 dashboardDescriptorMap_[
std::to_string(vd->userId) +
":" + vd->dashboardName] = vd;
1445 void Catalog::buildLinksMapUnlocked() {
1446 std::string linkQuery(
1447 "SELECT linkid, userid, link, view_state, strftime('%Y-%m-%dT%H:%M:%SZ', "
1448 "update_time), view_metadata "
1450 sqliteConnector_.query(linkQuery);
1451 auto numRows = sqliteConnector_.getNumRows();
1452 for (
size_t r = 0; r < numRows; ++r) {
1454 ld->linkId = sqliteConnector_.getData<
int>(r, 0);
1455 ld->userId = sqliteConnector_.getData<
int>(r, 1);
1456 ld->link = sqliteConnector_.getData<
string>(r, 2);
1457 ld->viewState = sqliteConnector_.getData<
string>(r, 3);
1458 ld->updateTime = sqliteConnector_.getData<
string>(r, 4);
1459 ld->viewMetadata = sqliteConnector_.getData<
string>(r, 5);
1460 linkDescriptorMap_[
std::to_string(currentDB_.dbId) + ld->link] = ld;
1461 linkDescriptorMapById_[ld->linkId] = ld;
1465 void Catalog::buildLogicalToPhysicalMapUnlocked() {
1467 std::string logicalToPhysicalTableMapQuery(
1468 "SELECT logical_table_id, physical_table_id "
1469 "FROM mapd_logical_to_physical");
1470 sqliteConnector_.query(logicalToPhysicalTableMapQuery);
1471 auto numRows = sqliteConnector_.getNumRows();
1472 for (
size_t r = 0; r < numRows; ++r) {
1473 auto logical_tb_id = sqliteConnector_.getData<
int>(r, 0);
1474 auto physical_tb_id = sqliteConnector_.getData<
int>(r, 1);
1475 const auto physicalTableIt = logicalToPhysicalTableMapById_.find(logical_tb_id);
1476 if (physicalTableIt == logicalToPhysicalTableMapById_.end()) {
1478 std::vector<int32_t> physicalTables{physical_tb_id};
1480 logicalToPhysicalTableMapById_.emplace(logical_tb_id, physicalTables);
1481 CHECK(it_ok.second);
1484 physicalTableIt->second.push_back(physical_tb_id);
1492 void Catalog::buildMaps() {
1501 buildDictionaryMapUnlocked();
1502 buildTablesMapUnlocked();
1505 buildForeignServerMapUnlocked();
1506 updateForeignTablesInMapUnlocked();
1509 buildColumnsMapUnlocked();
1510 updateViewsInMapUnlocked();
1511 buildDashboardsMapUnlocked(user_name_by_user_id);
1512 buildLinksMapUnlocked();
1513 buildLogicalToPhysicalMapUnlocked();
1514 buildCustomExpressionsMapUnlocked();
1517 void Catalog::buildCustomExpressionsMapUnlocked() {
1518 sqliteConnector_.query(
1519 "SELECT id, name, expression_json, data_source_type, data_source_id, "
1521 "FROM omnisci_custom_expressions");
1522 auto num_rows = sqliteConnector_.getNumRows();
1523 for (
size_t row = 0; row < num_rows; row++) {
1524 auto custom_expr = getCustomExpressionFromConnector(row);
1525 custom_expr_map_by_id_[custom_expr->id] = std::move(custom_expr);
1529 std::unique_ptr<CustomExpression> Catalog::getCustomExpressionFromConnector(
size_t row) {
1530 auto id = sqliteConnector_.getData<
int>(row, 0);
1531 auto name = sqliteConnector_.getData<
string>(row, 1);
1532 auto expression_json = sqliteConnector_.getData<
string>(row, 2);
1533 auto data_source_type_str = sqliteConnector_.getData<
string>(row, 3);
1534 auto data_source_id = sqliteConnector_.getData<
int>(row, 4);
1535 auto is_deleted = sqliteConnector_.getData<
bool>(row, 5);
1536 return std::make_unique<CustomExpression>(
1540 CustomExpression::dataSourceTypeFromString(data_source_type_str),
1546 const list<ColumnDescriptor>& columns,
1547 const list<DictDescriptor>& dicts) {
1552 if (foreign_table) {
1554 *new_foreign_table = *foreign_table;
1555 new_td = new_foreign_table;
1561 new_td->
mutex_ = std::make_shared<std::mutex>();
1563 tableDescriptorMapById_[td->
tableId] = new_td;
1564 for (
auto cd : columns) {
1567 addToColumnMap(new_cd);
1570 if (cd.isDeletedCol) {
1572 setDeletedColumnUnlocked(new_td, new_cd);
1578 [](
const size_t a,
const size_t b) ->
bool {
return a < b; });
1582 std::unique_ptr<StringDictionaryClient> client;
1583 DictRef dict_ref(currentDB_.dbId, -1);
1584 if (!string_dict_hosts_.empty()) {
1587 for (
auto dd : dicts) {
1588 if (!dd.dictRef.dictId) {
1592 dict_ref.
dictId = dd.dictRef.dictId;
1594 client->create(dict_ref, dd.dictIsTemp);
1597 dictDescriptorMapByRef_[dict_ref].reset(new_dd);
1598 if (!dd.dictIsTemp) {
1604 void Catalog::removeTableFromMap(
const string& tableName,
1606 const bool is_on_error) {
1608 TableDescriptorMapById::iterator tableDescIt = tableDescriptorMapById_.find(tableId);
1609 if (tableDescIt == tableDescriptorMapById_.end()) {
1616 const auto ret = deletedColumnPerTable_.erase(td);
1620 tableDescriptorMapById_.erase(tableDescIt);
1621 tableDescriptorMap_.erase(
to_upper(tableName));
1623 dict_columns_by_table_id_.erase(tableId);
1628 std::unique_ptr<StringDictionaryClient> client;
1630 CHECK(!string_dict_hosts_.empty());
1631 DictRef dict_ref(currentDB_.dbId, -1);
1637 for (
auto cit = columnDescriptorMapById_.begin();
1638 cit != columnDescriptorMapById_.end();) {
1639 if (tableId != std::get<0>(cit->first)) {
1642 int i = std::get<1>(cit++->first);
1644 ColumnDescriptorMapById::iterator colDescIt = columnDescriptorMapById_.find(cidKey);
1646 columnDescriptorMapById_.erase(colDescIt);
1648 columnDescriptorMap_.erase(cnameKey);
1654 DictRef dict_ref(currentDB_.dbId, dictId);
1655 const auto dictIt = dictDescriptorMapByRef_.find(dict_ref);
1660 CHECK(dictIt != dictDescriptorMapByRef_.end());
1662 if (dictIt == dictDescriptorMapByRef_.end()) {
1666 const auto& dd = dictIt->second;
1669 if (!dd->refcount) {
1670 dd->stringDict.reset();
1675 client->drop(dict_ref);
1677 dictDescriptorMapByRef_.erase(dictIt);
1688 addFrontendViewToMapNoLock(vd);
1694 std::make_shared<DashboardDescriptor>(vd);
1697 std::vector<DBObject> Catalog::parseDashboardObjects(
const std::string& view_meta,
1698 const int& user_id) {
1699 std::vector<DBObject> objects;
1701 key.
dbId = currentDB_.dbId;
1702 auto _key_place = [&key](
auto type,
auto id) {
1708 auto td = getMetadataForTable(object_name,
false);
1712 LOG(
INFO) <<
"Ignoring dashboard source Table/View: " << object_name
1713 <<
" no longer exists in current DB.";
1720 objects.emplace_back(_key_place(object_type, td->tableId), priv, user_id);
1722 objects.back().setName(td->tableName);
1727 void Catalog::createOrUpdateDashboardSystemRole(
const std::string& view_meta,
1728 const int32_t& user_id,
1729 const std::string& dash_role_name) {
1730 auto objects = parseDashboardObjects(view_meta, user_id);
1737 dash_role_name,
false,
false);
1742 std::set<DBObjectKey> revoke_keys;
1744 for (
auto key : *ex_objects | boost::adaptors::map_keys) {
1750 for (
auto obj : objects) {
1751 found = key == obj.getObjectKey() ?
true :
false;
1757 revoke_keys.insert(key);
1760 for (
auto& key : revoke_keys) {
1778 linkDescriptorMapById_[ld.
linkId] = new_ld;
1786 vector<Chunk> chunkVec;
1787 auto columnDescs = getAllColumnMetadataForTable(td->
tableId,
true,
false,
true);
1788 Chunk::translateColumnDescriptorsToChunkVec(columnDescs, chunkVec);
1791 td->
fragmenter = std::make_shared<SortedOrderFragmenter>(chunkKeyPrefix,
1803 td->
fragmenter = std::make_shared<InsertOrderFragmenter>(chunkKeyPrefix,
1817 LOG(
INFO) <<
"Instantiating Fragmenter for table " << td->
tableName <<
" took "
1822 const std::string& tableName)
const {
1823 auto tableDescIt = tableDescriptorMap_.find(
to_upper(tableName));
1824 if (tableDescIt == tableDescriptorMap_.end()) {
1831 const std::string& tableName)
const {
1833 return getForeignTableUnlocked(tableName);
1836 const TableDescriptor* Catalog::getMetadataForTable(
const string& tableName,
1837 const bool populateFragmenter)
const {
1841 auto td = getMutableMetadataForTableUnlocked(tableName);
1846 if (populateFragmenter) {
1847 std::unique_lock<std::mutex> td_lock(*td->mutex_.get());
1848 if (td->fragmenter ==
nullptr && !td->isView) {
1849 instantiateFragmenter(td);
1856 bool populateFragmenter)
const {
1858 auto td = getMutableMetadataForTableUnlocked(table_id);
1863 if (populateFragmenter) {
1864 std::unique_lock<std::mutex> td_lock(*td->mutex_.get());
1865 if (td->fragmenter ==
nullptr && !td->isView) {
1866 instantiateFragmenter(td);
1872 std::optional<std::string> Catalog::getTableName(int32_t table_id)
const {
1874 auto td = getMutableMetadataForTableUnlocked(table_id);
1878 return td->tableName;
1881 std::optional<int32_t> Catalog::getTableId(
const std::string& table_name)
const {
1883 auto td = getMutableMetadataForTableUnlocked(table_name);
1891 const std::string& table_name)
const {
1892 auto it = tableDescriptorMap_.find(
to_upper(table_name));
1893 if (it == tableDescriptorMap_.end()) {
1900 auto tableDescIt = tableDescriptorMapById_.find(table_id);
1901 if (tableDescIt == tableDescriptorMapById_.end()) {
1904 return tableDescIt->second;
1908 const bool load_dict)
const {
1910 const DictRef dictRef(currentDB_.dbId, dict_id);
1911 auto dictDescIt = dictDescriptorMapByRef_.find(dictRef);
1913 dictDescriptorMapByRef_.end()) {
1916 auto& dd = dictDescIt->second;
1920 if (!dd->stringDict) {
1922 if (string_dict_hosts_.empty()) {
1923 if (dd->dictIsTemp) {
1924 dd->stringDict = std::make_shared<StringDictionary>(
1927 dd->stringDict = std::make_shared<StringDictionary>(
1932 std::make_shared<StringDictionary>(string_dict_hosts_.front(), dd->dictRef);
1935 LOG(
INFO) <<
"Time to load Dictionary " << dd->dictRef.dbId <<
"_"
1936 << dd->dictRef.dictId <<
" was " << time_ms <<
"ms";
1943 const std::vector<LeafHostInfo>& Catalog::getStringDictionaryHosts()
const {
1944 return string_dict_hosts_;
1948 const string& columnName)
const {
1952 auto colDescIt = columnDescriptorMap_.find(columnKey);
1954 columnDescriptorMap_.end()) {
1957 return colDescIt->second;
1963 auto colDescIt = columnDescriptorMapById_.find(columnIdKey);
1964 if (colDescIt == columnDescriptorMapById_
1968 return colDescIt->second;
1971 const std::optional<std::string> Catalog::getColumnName(
int table_id,
1972 int column_id)
const {
1974 auto it = columnDescriptorMapById_.find(
ColumnIdKey{table_id, column_id});
1975 if (it == columnDescriptorMapById_.end()) {
1978 return it->second->columnName;
1981 const int Catalog::getColumnIdBySpiUnlocked(
const int table_id,
const size_t spi)
const {
1982 const auto tabDescIt = tableDescriptorMapById_.find(table_id);
1983 CHECK(tableDescriptorMapById_.end() != tabDescIt);
1984 const auto& columnIdBySpi = tabDescIt->second->columnIdBySpi_;
1994 CHECK(0 < spx && spx <= columnIdBySpi.size())
1995 <<
"spx = " << spx <<
", size = " << columnIdBySpi.size();
1996 return columnIdBySpi[spx - 1] + phi;
1999 const int Catalog::getColumnIdBySpi(
const int table_id,
const size_t spi)
const {
2001 return getColumnIdBySpiUnlocked(table_id, spi);
2005 const size_t spi)
const {
2008 const auto columnId = getColumnIdBySpiUnlocked(tableId, spi);
2010 const auto colDescIt = columnDescriptorMapById_.find(columnIdKey);
2011 return columnDescriptorMapById_.end() == colDescIt ?
nullptr : colDescIt->second;
2014 void Catalog::deleteMetadataForDashboards(
const std::vector<int32_t> dashboard_ids,
2016 std::stringstream invalid_ids, restricted_ids;
2018 for (int32_t dashboard_id : dashboard_ids) {
2019 if (!getMetadataForDashboard(dashboard_id)) {
2020 invalid_ids << (!invalid_ids.str().empty() ?
", " :
"") << dashboard_id;
2024 object.loadKey(*
this);
2026 std::vector<DBObject> privs = {
object};
2028 restricted_ids << (!restricted_ids.str().empty() ?
", " :
"") << dashboard_id;
2032 if (invalid_ids.str().size() > 0 || restricted_ids.str().size() > 0) {
2033 std::stringstream error_message;
2034 error_message <<
"Delete dashboard(s) failed with error(s):";
2035 if (invalid_ids.str().size() > 0) {
2036 error_message <<
"\nDashboard id: " << invalid_ids.str()
2037 <<
" - Dashboard id does not exist";
2039 if (restricted_ids.str().size() > 0) {
2041 <<
"\nDashboard id: " << restricted_ids.str()
2042 <<
" - User should be either owner of dashboard or super user to delete it";
2044 throw std::runtime_error(error_message.str());
2046 std::vector<DBObject> dash_objs;
2048 for (int32_t dashboard_id : dashboard_ids) {
2057 sqliteConnector_.query(
"BEGIN TRANSACTION");
2059 for (int32_t dashboard_id : dashboard_ids) {
2060 auto dash = getMetadataForDashboard(dashboard_id);
2064 throw std::runtime_error(
2065 std::string(
"Delete dashboard(s) failed with error(s):\nDashboard id: ") +
2066 std::to_string(dashboard_id) +
" - Dashboard id does not exist ");
2069 std::string dash_name = dash->dashboardName;
2070 auto viewDescIt = dashboardDescriptorMap_.find(user_id +
":" + dash_name);
2071 dashboardDescriptorMap_.erase(viewDescIt);
2072 sqliteConnector_.query_with_text_params(
2073 "DELETE FROM mapd_dashboards WHERE name = ? and userid = ?",
2074 std::vector<std::string>{dash_name, user_id});
2076 }
catch (std::exception& e) {
2077 sqliteConnector_.query(
"ROLLBACK TRANSACTION");
2080 sqliteConnector_.query(
"END TRANSACTION");
2085 const string& userId,
2086 const string& dashName)
const {
2089 auto viewDescIt = dashboardDescriptorMap_.find(userId +
":" + dashName);
2090 if (viewDescIt == dashboardDescriptorMap_.end()) {
2093 return viewDescIt->second.get();
2102 for (
auto descp : dashboardDescriptorMap_) {
2103 auto dash = descp.second.get();
2104 if (dash->dashboardId ==
id) {
2106 name = dash->dashboardName;
2113 return getMetadataForDashboard(userId, name);
2118 const LinkDescriptor* Catalog::getMetadataForLink(
const string& link)
const {
2120 auto linkDescIt = linkDescriptorMap_.find(link);
2121 if (linkDescIt == linkDescriptorMap_.end()) {
2124 return linkDescIt->second;
2129 auto linkDescIt = linkDescriptorMapById_.find(linkId);
2130 if (linkDescIt == linkDescriptorMapById_.end()) {
2133 return linkDescIt->second;
2138 const auto table = getMutableMetadataForTableUnlocked(table_id);
2141 CHECK(foreign_table);
2142 return foreign_table;
2145 void Catalog::getAllColumnMetadataForTableImpl(
2147 list<const ColumnDescriptor*>& columnDescriptors,
2148 const bool fetchSystemColumns,
2149 const bool fetchVirtualColumns,
2150 const bool fetchPhysicalColumns)
const {
2151 int32_t skip_physical_cols = 0;
2152 for (
const auto& columnDescriptor : columnDescriptorMapById_) {
2153 if (!fetchPhysicalColumns && skip_physical_cols > 0) {
2154 --skip_physical_cols;
2157 auto cd = columnDescriptor.second;
2158 if (cd->tableId != td->
tableId) {
2161 if (!fetchSystemColumns && cd->isSystemCol) {
2164 if (!fetchVirtualColumns && cd->isVirtualCol) {
2167 if (!fetchPhysicalColumns) {
2168 const auto& col_ti = cd->columnType;
2169 skip_physical_cols = col_ti.get_physical_cols();
2171 columnDescriptors.push_back(cd);
2175 std::list<const ColumnDescriptor*> Catalog::getAllColumnMetadataForTable(
2177 const bool fetchSystemColumns,
2178 const bool fetchVirtualColumns,
2179 const bool fetchPhysicalColumns)
const {
2181 std::list<const ColumnDescriptor*> columnDescriptors;
2182 const TableDescriptor* td = getMutableMetadataForTableUnlocked(tableId);
2183 getAllColumnMetadataForTableImpl(td,
2186 fetchVirtualColumns,
2187 fetchPhysicalColumns);
2188 return columnDescriptors;
2191 list<const TableDescriptor*> Catalog::getAllTableMetadata()
const {
2193 list<const TableDescriptor*> table_list;
2194 for (
auto p : tableDescriptorMapById_) {
2195 table_list.push_back(p.second);
2200 std::vector<TableDescriptor> Catalog::getAllTableMetadataCopy()
const {
2202 std::vector<TableDescriptor>
tables;
2203 tables.reserve(tableDescriptorMapById_.size());
2204 for (
auto table_entry : tableDescriptorMapById_) {
2205 tables.emplace_back(*table_entry.second);
2206 tables.back().fragmenter =
nullptr;
2211 list<const DashboardDescriptor*> Catalog::getAllDashboardsMetadata()
const {
2213 list<const DashboardDescriptor*> dashboards;
2214 for (
auto dashboard_entry : dashboardDescriptorMap_) {
2215 dashboards.push_back(dashboard_entry.second.get());
2220 std::vector<DashboardDescriptor> Catalog::getAllDashboardsMetadataForSysTable()
const {
2222 std::vector<DashboardDescriptor> dashboards;
2223 dashboards.reserve(dashboardDescriptorMap_.size());
2224 for (
auto dashboard_entry : dashboardDescriptorMap_) {
2225 const auto& cat_dashboard = dashboard_entry.second;
2226 dashboards.emplace_back();
2227 auto& dashboard = dashboards.back();
2228 dashboard.dashboardId = cat_dashboard->dashboardId;
2229 dashboard.dashboardName = cat_dashboard->dashboardName;
2230 dashboard.userId = cat_dashboard->userId;
2231 dashboard.updateTime = cat_dashboard->updateTime;
2232 dashboard.dashboardMetadata = cat_dashboard->dashboardMetadata;
2241 sqliteConnector_.query(
"BEGIN TRANSACTION");
2243 ref = addDictionaryNontransactional(cd);
2244 }
catch (std::exception& e) {
2245 sqliteConnector_.query(
"ROLLBACK TRANSACTION");
2248 sqliteConnector_.query(
"END TRANSACTION");
2254 const auto& td = *tableDescriptorMapById_[cd.
tableId];
2255 list<DictDescriptor> dds;
2256 setColumnDictionary(cd, dds, td,
true);
2257 auto& dd = dds.back();
2258 CHECK(dd.dictRef.dictId);
2260 std::unique_ptr<StringDictionaryClient> client;
2261 if (!string_dict_hosts_.empty()) {
2263 string_dict_hosts_.front(),
DictRef(currentDB_.dbId, -1),
true));
2266 client->create(dd.dictRef, dd.dictIsTemp);
2270 dictDescriptorMapByRef_[dd.dictRef].reset(new_dd);
2271 if (!dd.dictIsTemp) {
2280 sqliteConnector_.query(
"BEGIN TRANSACTION");
2282 delDictionaryNontransactional(cd);
2283 }
catch (std::exception& e) {
2284 sqliteConnector_.query(
"ROLLBACK TRANSACTION");
2287 sqliteConnector_.query(
"END TRANSACTION");
2302 const auto td = getMetadataForTable(cd.
tableId,
false);
2304 sqliteConnector_.query_with_text_param(
2305 "UPDATE mapd_dictionaries SET refcount = refcount - 1 WHERE dictid = ?",
2307 sqliteConnector_.query_with_text_param(
2308 "SELECT refcount FROM mapd_dictionaries WHERE dictid = ?",
std::to_string(dictId));
2309 const auto refcount = sqliteConnector_.getData<
int>(0, 0);
2310 VLOG(3) <<
"Dictionary " << dictId <<
"from dropped table has reference count "
2315 const DictRef dictRef(currentDB_.dbId, dictId);
2316 sqliteConnector_.query_with_text_param(
"DELETE FROM mapd_dictionaries WHERE dictid = ?",
2322 std::unique_ptr<StringDictionaryClient> client;
2323 if (!string_dict_hosts_.empty()) {
2327 client->drop(dictRef);
2330 dictDescriptorMapByRef_.erase(dictRef);
2333 std::list<const DictDescriptor*> Catalog::getAllDictionariesWithColumnInName(
2336 std::list<const DictDescriptor*> dds;
2338 auto table_name_opt = getTableName(cd->
tableId);
2339 CHECK(table_name_opt.has_value());
2340 auto table_name = table_name_opt.value();
2342 for (
const auto& [dkey, dd] : dictDescriptorMapByRef_) {
2343 if (dd->dictName.find(table_name +
"_" + cd->
columnName +
"_dict") !=
2344 std::string::npos) {
2345 dds.push_back(dd.get());
2353 std::map<int, StringDictionary*>& stringDicts) {
2356 CHECK(cit != columnDescriptorMap_.end());
2357 auto& ccd = *cit->second;
2359 if (!(ccd.columnType.is_string() || ccd.columnType.is_string_array())) {
2365 if (!(ccd.columnType.get_comp_param() > 0)) {
2369 auto dictId = ccd.columnType.get_comp_param();
2370 getMetadataForDict(dictId);
2372 const DictRef dictRef(currentDB_.dbId, dictId);
2373 auto dit = dictDescriptorMapByRef_.find(dictRef);
2374 CHECK(dit != dictDescriptorMapByRef_.end());
2376 CHECK(dit->second.get()->stringDict);
2377 stringDicts[ccd.columnId] = dit->second.get()->stringDict.get();
2380 size_t Catalog::getTotalMemorySizeForDictionariesForDatabase()
const {
2382 for (
auto const& kv : dictDescriptorMapByRef_) {
2383 if (kv.first.dbId == currentDB_.dbId) {
2384 auto dictionary = kv.second.get()->stringDict.get();
2386 ret += dictionary->computeCacheSize();
2397 sqliteConnector_.query(
"BEGIN TRANSACTION");
2399 const auto table_id = cd.
tableId;
2401 auto catalog_cd = getMetadataForColumn(table_id, cd.
columnId);
2403 CHECK(catalog_cd) <<
" can not alter non existing column";
2406 std::vector<BindType> types(11, BindType::TEXT);
2408 types[8] = BindType::NULL_TYPE;
2410 sqliteConnector_.query_with_text_params(
2411 "UPDATE mapd_columns SET "
2420 "default_value = ? "
2421 "WHERE tableid = ? and columnid = ?",
2438 ColumnDescriptorMap::iterator columnDescIt =
2440 CHECK(columnDescIt != columnDescriptorMap_.end());
2441 auto ocd = columnDescIt->second;
2443 updateInColumnMap(ncd, ocd);
2444 }
catch (std::exception& e) {
2445 sqliteConnector_.query(
"ROLLBACK TRANSACTION");
2448 sqliteConnector_.query(
"END TRANSACTION");
2453 sqliteConnector_.query_with_text_params(
2454 "SELECT max(columnid) + 1 FROM mapd_columns WHERE tableid = ?",
2456 return sqliteConnector_.getData<
int>(0, 0);
2462 sqliteConnector_.query(
"BEGIN TRANSACTION");
2464 addColumnNontransactional(td, cd);
2465 }
catch (std::exception& e) {
2466 sqliteConnector_.query(
"ROLLBACK TRANSACTION");
2469 sqliteConnector_.query(
"END TRANSACTION");
2476 cd.
db_id = getDatabaseId();
2478 for (
const auto shard : getPhysicalTablesDescriptors(&td)) {
2480 addColumnNontransactional(*shard, shard_cd);
2484 addDictionaryNontransactional(cd);
2488 std::vector<BindType> types(17, BindType::TEXT);
2490 types[16] = BindType::NULL_TYPE;
2492 sqliteConnector_.query_with_text_params(
2493 "INSERT INTO mapd_columns (tableid, columnid, name, coltype, colsubtype, coldim, "
2494 "colscale, is_notnull, "
2495 "compression, comp_param, size, chunks, is_systemcol, is_virtualcol, virtual_expr, "
2496 "is_deletedcol, default_value) "
2498 "(SELECT max(columnid) + 1 FROM mapd_columns WHERE tableid = ?), "
2501 "?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
2521 sqliteConnector_.query_with_text_params(
2522 "UPDATE mapd_tables SET ncolumns = ncolumns + 1 WHERE tableid = ?",
2525 sqliteConnector_.query_with_text_params(
2526 "SELECT columnid FROM mapd_columns WHERE tableid = ? AND name = ?",
2528 cd.
columnId = sqliteConnector_.getData<
int>(0, 0);
2530 ++tableDescriptorMapById_[td.
tableId]->nColumns;
2532 addToColumnMap(ncd);
2533 addColumnDescriptor(ncd);
2534 calciteMgr_->updateMetadata(currentDB_.dbName, td.
tableName);
2541 cd.
db_id = getDatabaseId();
2543 for (
const auto shard : getPhysicalTablesDescriptors(&td)) {
2545 addColumn(*shard, shard_cd);
2549 addDictionaryNontransactional(cd);
2553 std::vector<BindType> types(17, BindType::TEXT);
2555 types[16] = BindType::NULL_TYPE;
2557 sqliteConnector_.query_with_text_params(
2558 "INSERT INTO mapd_columns (tableid, columnid, name, coltype, colsubtype, coldim, "
2559 "colscale, is_notnull, "
2560 "compression, comp_param, size, chunks, is_systemcol, is_virtualcol, virtual_expr, "
2561 "is_deletedcol, default_value) "
2563 "(SELECT max(columnid) + 1 FROM mapd_columns WHERE tableid = ?), "
2566 "?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
2586 sqliteConnector_.query_with_text_params(
2587 "UPDATE mapd_tables SET ncolumns = ncolumns + 1 WHERE tableid = ?",
2590 sqliteConnector_.query_with_text_params(
2591 "SELECT columnid FROM mapd_columns WHERE tableid = ? AND name = ?",
2593 cd.
columnId = sqliteConnector_.getData<
int>(0, 0);
2595 ++tableDescriptorMapById_[td.
tableId]->nColumns;
2597 addToColumnMap(ncd);
2598 columnDescriptorsForRoll.emplace_back(
nullptr, ncd);
2603 dropColumnPolicies(td, cd);
2607 sqliteConnector_.query(
"BEGIN TRANSACTION");
2609 dropColumnNontransactional(td, cd);
2610 }
catch (std::exception& e) {
2611 sqliteConnector_.query(
"ROLLBACK TRANSACTION");
2614 sqliteConnector_.query(
"END TRANSACTION");
2622 sqliteConnector_.query_with_text_params(
2623 "DELETE FROM mapd_columns where tableid = ? and columnid = ?",
2626 sqliteConnector_.query_with_text_params(
2627 "UPDATE mapd_tables SET ncolumns = ncolumns - 1 WHERE tableid = ?",
2630 ColumnDescriptorMap::iterator columnDescIt =
2632 CHECK(columnDescIt != columnDescriptorMap_.end());
2634 auto ocd = columnDescIt->second;
2635 removeFromColumnMap(ocd);
2636 --tableDescriptorMapById_[td.
tableId]->nColumns;
2637 removeColumnDescriptor(ocd);
2638 calciteMgr_->updateMetadata(currentDB_.dbName, td.
tableName);
2642 for (
const auto shard : getPhysicalTablesDescriptors(&td)) {
2643 const auto shard_cd = getMetadataForColumn(shard->tableId, cd.
columnId);
2645 dropColumnNontransactional(*shard, *shard_cd);
2654 for (
const auto shard : getPhysicalTablesDescriptors(&td)) {
2655 const auto shard_cd = getMetadataForColumn(shard->tableId, cd.
columnId);
2657 dropColumnPolicies(*shard, *shard_cd);
2668 sqliteConnector_.query_with_text_params(
2669 "DELETE FROM mapd_columns where tableid = ? and columnid = ?",
2673 sqliteConnector_.query_with_text_params(
2674 "UPDATE mapd_tables SET ncolumns = ncolumns - 1 WHERE tableid = ?",
2677 ColumnDescriptorMap::iterator columnDescIt =
2679 CHECK(columnDescIt != columnDescriptorMap_.end());
2681 columnDescriptorsForRoll.emplace_back(columnDescIt->second,
nullptr);
2682 removeFromColumnMap(columnDescIt->second);
2683 --tableDescriptorMapById_[td.
tableId]->nColumns;
2688 for (
const auto shard : getPhysicalTablesDescriptors(&td)) {
2689 const auto shard_cd = getMetadataForColumn(shard->tableId, cd.
columnId);
2691 dropColumn(*shard, *shard_cd);
2701 auto tabDescIt = tableDescriptorMapById_.find(cd->
tableId);
2702 CHECK(tableDescriptorMapById_.end() != tabDescIt);
2703 auto td = tabDescIt->second;
2704 auto& cd_by_spi = td->columnIdBySpi_;
2705 cd_by_spi.erase(std::remove(cd_by_spi.begin(), cd_by_spi.end(), cd->
columnId),
2708 std::sort(cd_by_spi.begin(), cd_by_spi.end());
2716 auto tabDescIt = tableDescriptorMapById_.find(cd->
tableId);
2717 CHECK(tableDescriptorMapById_.end() != tabDescIt);
2718 auto td = tabDescIt->second;
2719 auto& cd_by_spi = td->columnIdBySpi_;
2721 if (cd_by_spi.end() == std::find(cd_by_spi.begin(), cd_by_spi.end(), cd->
columnId)) {
2724 std::sort(cd_by_spi.begin(), cd_by_spi.end());
2728 void Catalog::rollLegacy(
const bool forward) {
2730 std::set<const TableDescriptor*> tds;
2732 for (
const auto& cdr : columnDescriptorsForRoll) {
2733 auto ocd = cdr.first;
2734 auto ncd = cdr.second;
2736 auto tabDescIt = tableDescriptorMapById_.find((ncd ? ncd : ocd)->tableId);
2737 CHECK(tableDescriptorMapById_.end() != tabDescIt);
2738 auto td = tabDescIt->second;
2739 auto& vc = td->columnIdBySpi_;
2742 if (
nullptr == ncd ||
2743 ncd->columnType.get_comp_param() != ocd->columnType.get_comp_param()) {
2744 delDictionaryNontransactional(*ocd);
2747 vc.erase(std::remove(vc.begin(), vc.end(), ocd->columnId), vc.end());
2753 if (vc.end() == std::find(vc.begin(), vc.end(), ncd->columnId)) {
2754 if (!ncd->isGeoPhyCol) {
2755 vc.push_back(ncd->columnId);
2762 addToColumnMap(ocd);
2766 removeFromColumnMap(ncd);
2767 if (
nullptr == ocd ||
2768 ocd->columnType.get_comp_param() != ncd->columnType.get_comp_param()) {
2769 delDictionaryNontransactional(*ncd);
2775 columnDescriptorsForRoll.clear();
2778 for (
const auto td : tds) {
2779 calciteMgr_->updateMetadata(currentDB_.dbName, td->tableName);
2785 list<ColumnDescriptor>& columns) {
2787 if (
IS_GEO(col_ti.get_type())) {
2788 switch (col_ti.get_type()) {
2797 col_ti.get_comp_param() == 32) {
2798 unit_size = 4 *
sizeof(int8_t);
2801 unit_size = 8 *
sizeof(int8_t);
2805 columns.push_back(physical_cd_coords);
2819 columns.push_back(physical_cd_coords);
2825 bounds_ti.
set_size(4 *
sizeof(
double));
2827 columns.push_back(physical_cd_bounds);
2840 columns.push_back(physical_cd_coords);
2846 physical_cd_linestring_sizes.
columnType = linestring_sizes_ti;
2847 columns.push_back(physical_cd_linestring_sizes);
2853 bounds_ti.
set_size(4 *
sizeof(
double));
2855 columns.push_back(physical_cd_bounds);
2868 columns.push_back(physical_cd_coords);
2874 physical_cd_ring_sizes.
columnType = ring_sizes_ti;
2875 columns.push_back(physical_cd_ring_sizes);
2881 bounds_ti.
set_size(4 *
sizeof(
double));
2883 columns.push_back(physical_cd_bounds);
2896 columns.push_back(physical_cd_coords);
2902 physical_cd_ring_sizes.
columnType = ring_sizes_ti;
2903 columns.push_back(physical_cd_ring_sizes);
2909 physical_cd_poly_rings.
columnType = poly_rings_ti;
2910 columns.push_back(physical_cd_poly_rings);
2916 bounds_ti.
set_size(4 *
sizeof(
double));
2918 columns.push_back(physical_cd_bounds);
2925 throw runtime_error(
"Unrecognized geometry type.");
2933 auto timing_type_entry =
2935 CHECK(timing_type_entry != foreign_table.
options.end());
2936 if (timing_type_entry->second ==
2945 void Catalog::createTable(
2947 const list<ColumnDescriptor>& cols,
2948 const std::vector<Parser::SharedDictionaryDef>& shared_dict_defs,
2949 bool isLogicalTable) {
2951 list<ColumnDescriptor> cds = cols;
2952 list<DictDescriptor> dds;
2953 std::set<std::string> toplevel_column_names;
2954 list<ColumnDescriptor> columns;
2959 throw std::runtime_error(
"Only temporary tables can be backed by foreign storage.");
2961 dataMgr_->getForeignStorageInterface()->prepareTable(getCurrentDB().dbId, td, cds);
2964 for (
auto cd : cds) {
2965 if (cd.columnName ==
"rowid") {
2966 throw std::runtime_error(
2967 "Cannot create column with name rowid. rowid is a system defined column.");
2969 columns.push_back(cd);
2970 toplevel_column_names.insert(cd.columnName);
2971 if (cd.columnType.is_geometry()) {
2972 expandGeoColumn(cd, columns);
2982 #ifdef MATERIALIZED_ROWID
2986 cd.
virtualExpr =
"MAPD_FRAG_ID * MAPD_ROWS_PER_FRAG + MAPD_FRAG_ROW_ID";
2988 columns.push_back(cd);
2999 columns.push_back(cd_del);
3002 for (
auto& column : columns) {
3003 column.db_id = getDatabaseId();
3009 sqliteConnector_.query(
"BEGIN TRANSACTION");
3012 sqliteConnector_.query_with_text_params(
3013 R
"(INSERT INTO mapd_tables (name, userid, ncolumns, isview, fragments, frag_type, max_frag_rows, max_chunk_size, frag_page_size, max_rows, partitions, shard_column_id, shard, num_shards, sort_column_id, storage_type, max_rollback_epochs, is_system_table, key_metainfo) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?))",
3035 sqliteConnector_.query_with_text_param(
3036 "SELECT tableid FROM mapd_tables WHERE name = ?", td.
tableName);
3037 td.
tableId = sqliteConnector_.getData<
int>(0, 0);
3039 for (
auto cd : columns) {
3041 const bool is_foreign_col =
3042 setColumnSharedDictionary(cd, cds, dds, td, shared_dict_defs);
3043 if (!is_foreign_col) {
3050 auto use_temp_dictionary =
false;
3051 setColumnDictionary(cd, dds, td, isLogicalTable, use_temp_dictionary);
3055 if (toplevel_column_names.count(cd.
columnName)) {
3062 std::vector<BindType> types(17, BindType::TEXT);
3064 types[16] = BindType::NULL_TYPE;
3066 sqliteConnector_.query_with_text_params(
3067 "INSERT INTO mapd_columns (tableid, columnid, name, coltype, colsubtype, "
3068 "coldim, colscale, is_notnull, "
3069 "compression, comp_param, size, chunks, is_systemcol, is_virtualcol, "
3070 "virtual_expr, is_deletedcol, default_value) "
3071 "VALUES (?, ?, ?, ?, ?, "
3073 "?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
3097 sqliteConnector_.query_with_text_params(
3098 "INSERT INTO mapd_views (tableid, sql) VALUES (?,?)",
3104 sqliteConnector_.query_with_text_params(
3105 "INSERT INTO omnisci_foreign_tables (table_id, server_id, options, "
3106 "last_refresh_time, next_refresh_time) VALUES (?, ?, ?, ?, ?)",
3109 foreign_table.getOptionsAsJsonString(),
3113 }
catch (std::exception& e) {
3114 sqliteConnector_.query(
"ROLLBACK TRANSACTION");
3118 td.
tableId = nextTempTableId_++;
3120 for (
auto cd : columns) {
3122 const bool is_foreign_col =
3123 setColumnSharedDictionary(cd, cds, dds, td, shared_dict_defs);
3125 if (!is_foreign_col) {
3127 std::string fileName(
"");
3128 std::string folderPath(
"");
3129 DictRef dict_ref(currentDB_.dbId, nextTempDictId_);
3146 if (toplevel_column_names.count(cd.
columnName)) {
3157 serializeTableJsonUnlocked(&td, cds);
3162 auto cache = dataMgr_->getPersistentStorageMgr()->getDiskCache();
3164 CHECK(!cache->hasCachedMetadataForKeyPrefix({getCurrentDB().dbId, td.
tableId}))
3165 <<
"Disk cache at " + cache->getCacheDirectory()
3166 <<
" contains preexisting data for new table. Please "
3167 "delete or clear cache before continuing";
3170 addTableToMap(&td, cds, dds);
3171 calciteMgr_->updateMetadata(currentDB_.dbName, td.
tableName);
3173 dataMgr_->getForeignStorageInterface()->registerTable(
this, td, cds);
3175 }
catch (std::exception& e) {
3176 sqliteConnector_.query(
"ROLLBACK TRANSACTION");
3177 removeTableFromMap(td.tableName, td.tableId,
true);
3180 sqliteConnector_.query(
"END TRANSACTION");
3183 write_lock.unlock();
3184 sqlite_lock.unlock();
3185 getMetadataForTable(td.tableName,
3191 const std::list<ColumnDescriptor>& cds)
const {
3193 using namespace rapidjson;
3195 VLOG(1) <<
"Serializing temporary table " << td->
tableName <<
" to JSON for Calcite.";
3197 const auto db_name = currentDB_.dbName;
3201 if (boost::filesystem::exists(file_path)) {
3203 std::ifstream reader(file_path.string());
3204 CHECK(reader.is_open());
3205 IStreamWrapper json_read_wrapper(reader);
3206 d.ParseStream(json_read_wrapper);
3210 CHECK(d.IsObject());
3213 Value table(kObjectType);
3215 "name",
Value().SetString(StringRef(td->
tableName.c_str())), d.GetAllocator());
3216 table.AddMember(
"id",
Value().SetInt(td->
tableId), d.GetAllocator());
3217 table.AddMember(
"columns",
Value(kArrayType), d.GetAllocator());
3219 for (
const auto& cd : cds) {
3220 Value column(kObjectType);
3222 "name",
Value().SetString(StringRef(cd.columnName)), d.GetAllocator());
3223 column.AddMember(
"coltype",
3224 Value().SetInt(static_cast<int>(cd.columnType.get_type())),
3226 column.AddMember(
"colsubtype",
3227 Value().SetInt(static_cast<int>(cd.columnType.get_subtype())),
3229 column.AddMember(
"compression",
3230 Value().SetInt(static_cast<int>(cd.columnType.get_compression())),
3232 column.AddMember(
"comp_param",
3233 Value().SetInt(static_cast<int>(cd.columnType.get_comp_param())),
3235 column.AddMember(
"size",
3236 Value().SetInt(static_cast<int>(cd.columnType.get_size())),
3239 "coldim",
Value().SetInt(cd.columnType.get_dimension()), d.GetAllocator());
3241 "colscale",
Value().SetInt(cd.columnType.get_scale()), d.GetAllocator());
3243 "is_notnull",
Value().SetBool(cd.columnType.get_notnull()), d.GetAllocator());
3244 column.AddMember(
"is_systemcol",
Value().SetBool(cd.isSystemCol), d.GetAllocator());
3245 column.AddMember(
"is_virtualcol",
Value().SetBool(cd.isVirtualCol), d.GetAllocator());
3246 column.AddMember(
"is_deletedcol",
Value().SetBool(cd.isDeletedCol), d.GetAllocator());
3247 table[
"columns"].PushBack(column, d.GetAllocator());
3249 d.AddMember(StringRef(td->
tableName.c_str()), table, d.GetAllocator());
3252 std::ofstream writer(file_path.string(), std::ios::trunc | std::ios::out);
3253 CHECK(writer.is_open());
3254 OStreamWrapper json_wrapper(writer);
3256 Writer<OStreamWrapper> json_writer(json_wrapper);
3257 d.Accept(json_writer);
3261 void Catalog::dropTableFromJsonUnlocked(
const std::string& table_name)
const {
3263 using namespace rapidjson;
3265 VLOG(1) <<
"Dropping temporary table " << table_name <<
" to JSON for Calcite.";
3267 const auto db_name = currentDB_.dbName;
3270 CHECK(boost::filesystem::exists(file_path));
3273 std::ifstream reader(file_path.string());
3274 CHECK(reader.is_open());
3275 IStreamWrapper json_read_wrapper(reader);
3276 d.ParseStream(json_read_wrapper);
3278 CHECK(d.IsObject());
3279 auto table_name_ref = StringRef(table_name.c_str());
3280 CHECK(d.HasMember(table_name_ref));
3281 CHECK(d.RemoveMember(table_name_ref));
3284 std::ofstream writer(file_path.string(), std::ios::trunc | std::ios::out);
3285 CHECK(writer.is_open());
3286 OStreamWrapper json_wrapper(writer);
3288 Writer<OStreamWrapper> json_writer(json_wrapper);
3289 d.Accept(json_writer);
3293 void Catalog::createForeignServer(
3294 std::unique_ptr<foreign_storage::ForeignServer> foreign_server,
3295 bool if_not_exists) {
3298 createForeignServerNoLocks(std::move(foreign_server), if_not_exists);
3301 void Catalog::createForeignServerNoLocks(
3302 std::unique_ptr<foreign_storage::ForeignServer> foreign_server,
3303 bool if_not_exists) {
3304 const auto&
name = foreign_server->name;
3306 sqliteConnector_.query_with_text_params(
3307 "SELECT name from omnisci_foreign_servers where name = ?",
3308 std::vector<std::string>{
name});
3310 if (sqliteConnector_.getNumRows() == 0) {
3311 foreign_server->creation_time = std::time(
nullptr);
3312 sqliteConnector_.query_with_text_params(
3313 "INSERT INTO omnisci_foreign_servers (name, data_wrapper_type, owner_user_id, "
3316 "VALUES (?, ?, ?, ?, ?)",
3317 std::vector<std::string>{
name,
3318 foreign_server->data_wrapper_type,
3321 foreign_server->getOptionsAsJsonString()});
3322 sqliteConnector_.query_with_text_params(
3323 "SELECT id from omnisci_foreign_servers where name = ?",
3324 std::vector<std::string>{
name});
3325 CHECK_EQ(sqliteConnector_.getNumRows(), size_t(1));
3326 foreign_server->id = sqliteConnector_.getData<int32_t>(0, 0);
3327 std::shared_ptr<foreign_storage::ForeignServer> foreign_server_shared =
3328 std::move(foreign_server);
3329 CHECK(foreignServerMap_.find(
name) == foreignServerMap_.end())
3330 <<
"Attempting to insert a foreign server into foreign server map that already "
3332 foreignServerMap_[
name] = foreign_server_shared;
3333 foreignServerMapById_[foreign_server_shared->id] = foreign_server_shared;
3334 }
else if (!if_not_exists) {
3335 throw std::runtime_error{
"A foreign server with name \"" + foreign_server->name +
3336 "\" already exists."};
3339 const auto& server_it = foreignServerMap_.find(
name);
3340 CHECK(server_it != foreignServerMap_.end());
3341 CHECK(foreignServerMapById_.find(server_it->second->id) != foreignServerMapById_.end());
3345 const std::string& server_name)
const {
3349 if (foreignServerMap_.find(server_name) != foreignServerMap_.end()) {
3350 foreign_server = foreignServerMap_.find(server_name)->second.get();
3352 return foreign_server;
3355 const std::unique_ptr<const foreign_storage::ForeignServer>
3356 Catalog::getForeignServerFromStorage(
const std::string& server_name) {
3357 std::unique_ptr<foreign_storage::ForeignServer> foreign_server =
nullptr;
3359 sqliteConnector_.query_with_text_params(
3360 "SELECT id, name, data_wrapper_type, options, owner_user_id, creation_time "
3361 "FROM omnisci_foreign_servers WHERE name = ?",
3362 std::vector<std::string>{server_name});
3363 if (sqliteConnector_.getNumRows() > 0) {
3364 foreign_server = std::make_unique<foreign_storage::ForeignServer>(
3365 sqliteConnector_.getData<
int>(0, 0),
3366 sqliteConnector_.getData<std::string>(0, 1),
3367 sqliteConnector_.getData<std::string>(0, 2),
3368 sqliteConnector_.getData<std::string>(0, 3),
3369 sqliteConnector_.getData<std::int32_t>(0, 4),
3370 sqliteConnector_.getData<std::int32_t>(0, 5));
3372 return foreign_server;
3375 const std::unique_ptr<const foreign_storage::ForeignTable>
3376 Catalog::getForeignTableFromStorage(
int table_id) {
3377 std::unique_ptr<foreign_storage::ForeignTable> foreign_table =
nullptr;
3379 sqliteConnector_.query_with_text_params(
3380 "SELECT table_id, server_id, options, last_refresh_time, next_refresh_time from "
3381 "omnisci_foreign_tables WHERE table_id = ?",
3382 std::vector<std::string>{
to_string(table_id)});
3383 auto num_rows = sqliteConnector_.getNumRows();
3386 foreign_table = std::make_unique<foreign_storage::ForeignTable>(
3387 sqliteConnector_.getData<
int>(0, 0),
3388 foreignServerMapById_[sqliteConnector_.getData<int32_t>(0, 1)].get(),
3389 sqliteConnector_.getData<std::string>(0, 2),
3390 sqliteConnector_.getData<int64_t>(0, 3),
3391 sqliteConnector_.getData<int64_t>(0, 4));
3393 return foreign_table;
3396 void Catalog::changeForeignServerOwner(
const std::string& server_name,
3397 const int new_owner_id) {
3400 foreignServerMap_.find(server_name)->second.get();
3401 CHECK(foreign_server);
3402 setForeignServerProperty(server_name,
"owner_user_id",
std::to_string(new_owner_id));
3404 foreign_server->
user_id = new_owner_id;
3407 void Catalog::setForeignServerDataWrapper(
const std::string& server_name,
3408 const std::string& data_wrapper) {
3410 auto data_wrapper_type =
to_upper(data_wrapper);
3413 foreignServerMap_.find(server_name)->second.get();
3414 CHECK(foreign_server);
3419 }
catch (
const std::exception& e) {
3425 setForeignServerProperty(server_name,
"data_wrapper_type", data_wrapper_type);
3428 void Catalog::setForeignServerOptions(
const std::string& server_name,
3429 const std::string& options) {
3433 foreignServerMap_.find(server_name)->second.get();
3434 CHECK(foreign_server);
3435 auto saved_options = foreign_server->
options;
3439 }
catch (
const std::exception& e) {
3442 foreign_server->
options = saved_options;
3445 setForeignServerProperty(server_name,
"options", options);
3448 void Catalog::renameForeignServer(
const std::string& server_name,
3449 const std::string&
name) {
3451 auto foreign_server_it = foreignServerMap_.find(server_name);
3452 CHECK(foreign_server_it != foreignServerMap_.end());
3453 setForeignServerProperty(server_name,
"name", name);
3454 auto foreign_server_shared = foreign_server_it->second;
3455 foreign_server_shared->name =
name;
3456 foreignServerMap_[
name] = foreign_server_shared;
3457 foreignServerMap_.erase(foreign_server_it);
3460 void Catalog::dropForeignServer(
const std::string& server_name) {
3464 sqliteConnector_.query_with_text_params(
3465 "SELECT id from omnisci_foreign_servers where name = ?",
3466 std::vector<std::string>{server_name});
3467 auto num_rows = sqliteConnector_.getNumRows();
3470 auto server_id = sqliteConnector_.getData<int32_t>(0, 0);
3471 sqliteConnector_.query_with_text_param(
3472 "SELECT table_id from omnisci_foreign_tables where server_id = ?",
3474 if (sqliteConnector_.getNumRows() > 0) {
3475 throw std::runtime_error{
"Foreign server \"" + server_name +
3477 "by existing foreign tables and cannot be dropped."};
3479 sqliteConnector_.query(
"BEGIN TRANSACTION");
3481 sqliteConnector_.query_with_text_params(
3482 "DELETE FROM omnisci_foreign_servers WHERE name = ?",
3483 std::vector<std::string>{server_name});
3484 }
catch (
const std::exception& e) {
3485 sqliteConnector_.query(
"ROLLBACK TRANSACTION");
3488 sqliteConnector_.query(
"END TRANSACTION");
3489 foreignServerMap_.erase(server_name);
3490 foreignServerMapById_.erase(server_id);
3494 void Catalog::getForeignServersForUser(
3495 const rapidjson::Value* filters,
3497 std::vector<const foreign_storage::ForeignServer*>& results) {
3502 std::map<std::string, std::string> col_names{{
"server_name",
"name"},
3503 {
"data_wrapper",
"data_wrapper_type"},
3504 {
"created_at",
"creation_time"},
3505 {
"options",
"options"}};
3508 std::stringstream filter_string;
3509 std::vector<std::string> arguments;
3511 if (filters !=
nullptr) {
3513 int num_filters = 0;
3514 filter_string <<
" WHERE";
3515 for (
auto& filter_def : filters->GetArray()) {
3516 if (num_filters > 0) {
3517 filter_string <<
" " << std::string(filter_def[
"chain"].GetString());
3521 if (col_names.find(std::string(filter_def[
"attribute"].GetString())) ==
3523 throw std::runtime_error{
"Attribute with name \"" +
3524 std::string(filter_def[
"attribute"].GetString()) +
3525 "\" does not exist."};
3528 filter_string <<
" " << col_names[std::string(filter_def[
"attribute"].GetString())];
3530 bool equals_operator =
false;
3531 if (std::strcmp(filter_def[
"operation"].GetString(),
"EQUALS") == 0) {
3532 filter_string <<
" = ? ";
3533 equals_operator =
true;
3535 filter_string <<
" LIKE ? ";
3538 bool timestamp_column =
3539 (std::strcmp(filter_def[
"attribute"].GetString(),
"created_at") == 0);
3541 if (timestamp_column && !equals_operator) {
3542 throw std::runtime_error{
"LIKE operator is incompatible with TIMESTAMP data"};
3545 if (timestamp_column && equals_operator) {
3547 dateTimeParse<kTIMESTAMP>(filter_def[
"value"].GetString(), 0)));
3549 arguments.emplace_back(filter_def[
"value"].GetString());
3556 std::string query = std::string(
"SELECT name from omnisci_foreign_servers ");
3557 query += filter_string.str();
3559 sqliteConnector_.query_with_text_params(query, arguments);
3560 auto num_rows = sqliteConnector_.getNumRows();
3562 if (sqliteConnector_.getNumRows() == 0) {
3566 CHECK(sqliteConnector_.getNumCols() == 1);
3568 results.reserve(num_rows);
3569 for (
size_t row = 0; row < num_rows; ++row) {
3570 const auto& server_name = sqliteConnector_.getData<std::string>(row, 0);
3575 CHECK(foreign_server !=
nullptr);
3579 std::vector<DBObject> privObjects = {dbObject};
3584 results.push_back(foreign_server);
3589 int32_t Catalog::getTableEpoch(
const int32_t db_id,
const int32_t table_id)
const {
3591 const auto td = getMetadataForTable(table_id,
false);
3593 std::stringstream table_not_found_error_message;
3594 table_not_found_error_message <<
"Table (" << db_id <<
"," << table_id
3596 throw std::runtime_error(table_not_found_error_message.str());
3598 const auto physicalTableIt = logicalToPhysicalTableMapById_.find(table_id);
3599 if (physicalTableIt != logicalToPhysicalTableMapById_.end()) {
3601 const auto physicalTables = physicalTableIt->second;
3602 CHECK(!physicalTables.empty());
3603 size_t curr_epoch{0}, first_epoch{0};
3604 int32_t first_table_id{0};
3605 bool are_epochs_inconsistent{
false};
3606 for (
size_t i = 0; i < physicalTables.size(); i++) {
3607 int32_t physical_tb_id = physicalTables[i];
3608 const TableDescriptor* phys_td = getMetadataForTable(physical_tb_id,
false);
3611 curr_epoch = dataMgr_->getTableEpoch(db_id, physical_tb_id);
3612 LOG(
INFO) <<
"Got sharded table epoch for db id: " << db_id
3613 <<
", table id: " << physical_tb_id <<
", epoch: " << curr_epoch;
3615 first_epoch = curr_epoch;
3616 first_table_id = physical_tb_id;
3617 }
else if (first_epoch != curr_epoch) {
3618 are_epochs_inconsistent =
true;
3619 LOG(
ERROR) <<
"Epochs on shards do not all agree on table id: " << table_id
3620 <<
", db id: " << db_id
3621 <<
". First table (table id: " << first_table_id
3622 <<
") has epoch: " << first_epoch <<
". Table id: " << physical_tb_id
3623 <<
", has inconsistent epoch: " << curr_epoch
3624 <<
". See previous INFO logs for all epochs and their table ids.";
3627 if (are_epochs_inconsistent) {
3633 auto epoch = dataMgr_->getTableEpoch(db_id, table_id);
3634 LOG(
INFO) <<
"Got table epoch for db id: " << db_id <<
", table id: " << table_id
3635 <<
", epoch: " << epoch;
3640 std::vector<const foreign_storage::ForeignTable*>
3641 Catalog::getAllForeignTablesForForeignServer(
const int32_t foreign_server_id) {
3643 std::vector<const foreign_storage::ForeignTable*> foreign_tables;
3644 for (
auto entry : tableDescriptorMapById_) {
3645 auto table_descriptor = entry.second;
3648 CHECK(foreign_table);
3649 if (foreign_table->foreign_server->id == foreign_server_id) {
3650 foreign_tables.emplace_back(foreign_table);
3654 return foreign_tables;
3657 void Catalog::setTableEpoch(
const int db_id,
const int table_id,
int new_epoch) {
3658 LOG(
INFO) <<
"Set table epoch db:" << db_id <<
" Table ID " << table_id
3659 <<
" back to new epoch " << new_epoch;
3660 const auto td = getMetadataForTable(table_id,
false);
3662 std::stringstream table_not_found_error_message;
3663 table_not_found_error_message <<
"Table (" << db_id <<
"," << table_id
3665 throw std::runtime_error(table_not_found_error_message.str());
3668 std::stringstream is_temp_table_error_message;
3669 is_temp_table_error_message <<
"Cannot set epoch on temporary table";
3670 throw std::runtime_error(is_temp_table_error_message.str());
3674 file_mgr_params.
epoch = new_epoch;
3677 const auto physical_tables = getPhysicalTablesDescriptors(td,
false);
3678 CHECK(!physical_tables.empty());
3679 for (
const auto table : physical_tables) {
3680 auto table_id = table->tableId;
3681 LOG(
INFO) <<
"Set sharded table epoch db:" << db_id <<
" Table ID " << table_id
3682 <<
" back to new epoch " << new_epoch;
3685 removeChunks(table_id);
3686 dataMgr_->getGlobalFileMgr()->setFileMgrParams(db_id, table_id, file_mgr_params);
3690 void Catalog::alterPhysicalTableMetadata(
3701 sqliteConnector_.query_with_text_params(
3702 "UPDATE mapd_tables SET max_rollback_epochs = ? WHERE tableid = ?",
3709 sqliteConnector_.query_with_text_params(
3710 "UPDATE mapd_tables SET max_rows = ? WHERE tableid = ?",
3721 sqliteConnector_.query(
"BEGIN TRANSACTION");
3723 const auto physical_table_it = logicalToPhysicalTableMapById_.find(td->
tableId);
3724 if (physical_table_it != logicalToPhysicalTableMapById_.end()) {
3725 const auto physical_tables = physical_table_it->second;
3726 CHECK(!physical_tables.empty());
3727 for (
size_t i = 0; i < physical_tables.size(); i++) {
3728 int32_t physical_tb_id = physical_tables[i];
3729 const TableDescriptor* phys_td = getMetadataForTable(physical_tb_id,
false);
3731 alterPhysicalTableMetadata(phys_td, table_update_params);
3734 alterPhysicalTableMetadata(td, table_update_params);
3735 }
catch (std::exception& e) {
3736 sqliteConnector_.query(
"ROLLBACK TRANSACTION");
3739 sqliteConnector_.query(
"END TRANSACTION");
3742 void Catalog::setMaxRollbackEpochs(
const int32_t table_id,
3743 const int32_t max_rollback_epochs) {
3746 if (max_rollback_epochs <= -1) {
3747 throw std::runtime_error(
"Cannot set max_rollback_epochs < 0.");
3749 const auto td = getMetadataForTable(
3755 if (table_update_params == td) {
3756 LOG(
INFO) <<
"Setting max_rollback_epochs for table " << table_id
3757 <<
" to existing value, skipping operation";
3761 file_mgr_params.
epoch = -1;
3763 setTableFileMgrParams(table_id, file_mgr_params);
3764 alterTableMetadata(td, table_update_params);
3767 void Catalog::setMaxRows(
const int32_t table_id,
const int64_t max_rows) {
3769 throw std::runtime_error(
"Max rows cannot be a negative number.");
3771 const auto td = getMetadataForTable(table_id);
3774 table_update_params.
max_rows = max_rows;
3775 if (table_update_params == td) {
3776 LOG(
INFO) <<
"Max rows value of " << max_rows
3777 <<
" is the same as the existing value. Skipping update.";
3780 alterTableMetadata(td, table_update_params);
3781 CHECK(td->fragmenter);
3782 td->fragmenter->dropFragmentsToSize(max_rows);
3786 void Catalog::setUncappedTableEpoch(
const std::string& table_name) {
3788 auto td_entry = tableDescriptorMap_.find(
to_upper(table_name));
3789 CHECK(td_entry != tableDescriptorMap_.end());
3790 auto td = td_entry->second;
3798 alterTableMetadata(td, table_update_params);
3801 setTableFileMgrParams(td->tableId, file_mgr_params);
3804 void Catalog::setTableFileMgrParams(
3808 const auto td = getMetadataForTable(table_id,
false);
3809 const auto db_id = this->getDatabaseId();
3811 std::stringstream table_not_found_error_message;
3812 table_not_found_error_message <<
"Table (" << db_id <<
"," << table_id
3814 throw std::runtime_error(table_not_found_error_message.str());
3817 std::stringstream is_temp_table_error_message;
3818 is_temp_table_error_message <<
"Cannot set storage params on temporary table";
3819 throw std::runtime_error(is_temp_table_error_message.str());
3822 const auto physical_tables = getPhysicalTablesDescriptors(td,
false);
3823 CHECK(!physical_tables.empty());
3824 for (
const auto table : physical_tables) {
3825 auto table_id = table->tableId;
3826 removeChunks(table_id);
3827 dataMgr_->getGlobalFileMgr()->setFileMgrParams(db_id, table_id, file_mgr_params);
3831 std::vector<TableEpochInfo> Catalog::getTableEpochs(
const int32_t db_id,
3832 const int32_t table_id)
const {
3834 std::vector<TableEpochInfo> table_epochs;
3835 const auto physical_table_it = logicalToPhysicalTableMapById_.find(table_id);
3836 if (physical_table_it != logicalToPhysicalTableMapById_.end()) {
3837 const auto physical_tables = physical_table_it->second;
3838 CHECK(!physical_tables.empty());
3840 for (
const auto physical_tb_id : physical_tables) {
3841 const auto phys_td = getMutableMetadataForTableUnlocked(physical_tb_id);
3844 auto table_id = phys_td->tableId;
3845 auto epoch = dataMgr_->getTableEpoch(db_id, phys_td->tableId);
3846 table_epochs.emplace_back(table_id, epoch);
3847 LOG(
INFO) <<
"Got sharded table epoch for db id: " << db_id
3848 <<
", table id: " << table_id <<
", epoch: " << epoch;
3851 auto epoch = dataMgr_->getTableEpoch(db_id, table_id);
3852 LOG(
INFO) <<
"Got table epoch for db id: " << db_id <<
", table id: " << table_id
3853 <<
", epoch: " << epoch;
3854 table_epochs.emplace_back(table_id, epoch);
3856 return table_epochs;
3859 void Catalog::setTableEpochs(
const int32_t db_id,
3860 const std::vector<TableEpochInfo>& table_epochs)
const {
3861 const auto td = getMetadataForTable(table_epochs[0].table_id,
false);
3866 for (
const auto& table_epoch_info : table_epochs) {
3867 removeChunks(table_epoch_info.table_id);
3868 file_mgr_params.
epoch = table_epoch_info.table_epoch;
3869 dataMgr_->getGlobalFileMgr()->setFileMgrParams(
3870 db_id, table_epoch_info.table_id, file_mgr_params);
3871 LOG(
INFO) <<
"Set table epoch for db id: " << db_id
3872 <<
", table id: " << table_epoch_info.table_id
3873 <<
", back to epoch: " << table_epoch_info.table_epoch;
3879 std::string table_epochs_str{
"["};
3880 bool first_entry{
true};
3881 for (
const auto& table_epoch : table_epochs) {
3883 first_entry =
false;
3885 table_epochs_str +=
", ";
3887 table_epochs_str +=
"(table_id: " +
std::to_string(table_epoch.table_id) +
3890 table_epochs_str +=
"]";
3891 return table_epochs_str;
3895 void Catalog::setTableEpochsLogExceptions(
3896 const int32_t db_id,
3897 const std::vector<TableEpochInfo>& table_epochs)
const {
3899 setTableEpochs(db_id, table_epochs);
3900 }
catch (std::exception& e) {
3901 LOG(
ERROR) <<
"An error occurred when attempting to set table epochs. DB id: "
3903 <<
", Error: " << e.what();
3909 const auto it = deletedColumnPerTable_.find(td);
3910 return it != deletedColumnPerTable_.end() ? it->second :
nullptr;
3914 int delete_column_id)
const {
3919 return fragmenter->hasDeletedRows(delete_column_id);
3927 std::vector<const TableDescriptor*> tds;
3932 const auto it = deletedColumnPerTable_.find(td);
3934 if (it == deletedColumnPerTable_.end()) {
3938 tds = getPhysicalTablesDescriptors(td,
false);
3941 for (
auto tdd : tds) {
3942 if (checkMetadataForDeletedRecs(tdd, cd->
columnId)) {
3952 setDeletedColumnUnlocked(td, cd);
3957 const auto it_ok = deletedColumnPerTable_.emplace(td, cd);
3958 CHECK(it_ok.second);
3977 const bool persist_reference) {
3980 CHECK(foreign_ref_col);
3981 referencing_column.
columnType = foreign_ref_col->columnType;
3983 const DictRef dict_ref(currentDB_.dbId, dict_id);
3984 const auto dictIt = dictDescriptorMapByRef_.find(dict_ref);
3985 CHECK(dictIt != dictDescriptorMapByRef_.end());
3986 const auto& dd = dictIt->second;
3989 if (persist_reference) {
3991 sqliteConnector_.query_with_text_params(
3992 "UPDATE mapd_dictionaries SET refcount = refcount + 1 WHERE dictid = ?",
3997 bool Catalog::setColumnSharedDictionary(
3999 std::list<ColumnDescriptor>& cdd,
4000 std::list<DictDescriptor>& dds,
4002 const std::vector<Parser::SharedDictionaryDef>& shared_dict_defs) {
4006 if (shared_dict_defs.empty()) {
4009 for (
const auto& shared_dict_def : shared_dict_defs) {
4011 const auto& column = shared_dict_def.get_column();
4013 if (!shared_dict_def.get_foreign_table().compare(td.
tableName)) {
4015 const auto& ref_column = shared_dict_def.get_foreign_column();
4017 std::find_if(cdd.begin(), cdd.end(), [ref_column](
const ColumnDescriptor it) {
4018 return !ref_column.compare(it.columnName);
4020 CHECK(colIt != cdd.end());
4025 auto dictIt = std::find_if(
4026 dds.begin(), dds.end(), [
this, dict_id](
const DictDescriptor it) {
4027 return it.dictRef.dbId == this->currentDB_.dbId &&
4028 it.dictRef.dictId == dict_id;
4030 if (dictIt != dds.end()) {
4036 sqliteConnector_.query_with_text_params(
4037 "UPDATE mapd_dictionaries SET refcount = refcount + 1 WHERE dictid = ?",
4047 const auto& foreign_table_name = shared_dict_def.get_foreign_table();
4048 const auto foreign_td = getMetadataForTable(foreign_table_name,
false);
4051 throw std::runtime_error(
4052 "Only temporary tables can share dictionaries with other temporary "
4055 addReferenceToForeignDict(cd, shared_dict_def,
false);
4067 std::list<DictDescriptor>& dds,
4069 bool is_logical_table,
4070 bool use_temp_dictionary) {
4073 std::string dictName{
"Initial_key"};
4075 std::string folderPath;
4076 if (is_logical_table) {
4079 sqliteConnector_.query_with_text_params(
4080 "INSERT INTO mapd_dictionaries (name, nbits, is_shared, refcount) VALUES (?, ?, "
4082 std::vector<std::string>{
4084 sqliteConnector_.query_with_text_param(
4085 "SELECT dictid FROM mapd_dictionaries WHERE name = ?", dictName);
4086 dictId = sqliteConnector_.getData<
int>(0, 0);
4088 sqliteConnector_.query_with_text_param(
4089 "UPDATE mapd_dictionaries SET name = ? WHERE name = 'Initial_key'", dictName);
4100 use_temp_dictionary);
4109 void Catalog::createShardedTable(
4111 const list<ColumnDescriptor>& cols,
4112 const std::vector<Parser::SharedDictionaryDef>& shared_dict_defs) {
4115 createTable(*tdl, cols, shared_dict_defs,
true);
4116 int32_t logical_tb_id = tdl->
tableId;
4117 std::string logical_table_name = tdl->
tableName;
4120 std::vector<int32_t> physicalTables;
4121 for (int32_t i = 1; i <= td.
nShards; i++) {
4123 tdp->
tableName = generatePhysicalTableName(logical_table_name, i);
4125 createTable(*tdp, cols, shared_dict_defs,
false);
4126 int32_t physical_tb_id = tdp->
tableId;
4129 physicalTables.push_back(physical_tb_id);
4132 if (!physicalTables.empty()) {
4136 logicalToPhysicalTableMapById_.emplace(logical_tb_id, physicalTables);
4137 CHECK(it_ok.second);
4140 updateLogicalToPhysicalTableMap(logical_tb_id);
4147 const auto physical_tables = getPhysicalTablesDescriptors(td);
4148 for (
const auto table : physical_tables) {
4149 doTruncateTable(table);
4155 removeFragmenterForTable(td->
tableId);
4157 const int tableId = td->
tableId;
4158 ChunkKey chunkKeyPrefix = {currentDB_.dbId, tableId};
4163 dataMgr_->removeTableRelatedDS(currentDB_.dbId, tableId);
4166 std::unique_ptr<StringDictionaryClient> client;
4168 CHECK(!string_dict_hosts_.empty());
4169 DictRef dict_ref(currentDB_.dbId, -1);
4174 for (
const auto& columnDescriptor : columnDescriptorMapById_) {
4175 auto cd = columnDescriptor.second;
4182 const DictRef dict_ref(currentDB_.dbId, dict_id);
4183 const auto dictIt = dictDescriptorMapByRef_.find(dict_ref);
4184 CHECK(dictIt != dictDescriptorMapByRef_.end());
4185 const auto& dd = dictIt->second;
4188 if (dd->refcount == 1) {
4190 dd->stringDict.reset();
4193 client->drop(dd->dictRef);
4195 if (!dd->dictIsTemp) {
4196 boost::filesystem::create_directory(dd->dictFolderPath);
4207 dictDescriptorMapByRef_.erase(dictIt);
4212 dictDescriptorMapByRef_[new_dd->
dictRef].reset(new_dd);
4220 for (
auto col_id = 0; col_id < td.
nColumns; ++col_id) {
4221 if (
auto it = columnDescriptorMapById_.find({td.
tableId, col_id});
4222 it != columnDescriptorMapById_.end()) {
4223 auto cd = it->second;
4226 DictRef dict_ref(currentDB_.dbId, dict_id);
4227 if (
auto dict_it = dictDescriptorMapByRef_.find(dict_ref);
4228 dict_it != dictDescriptorMapByRef_.end()) {
4230 dict_it->second->stringDict =
nullptr;
4232 getMetadataForDict(dict_id,
true);
4239 void Catalog::invalidateCachesForTable(
const int table_id) {
4242 ChunkKey const table_key{getDatabaseId(), table_id};
4243 auto td = getMutableMetadataForTableUnlocked(table_id);
4249 refreshDictionaryCachesForTableUnlocked(*td);
4253 if (td->fragmenter !=
nullptr) {
4254 auto tableDescIt = tableDescriptorMapById_.find(table_id);
4255 CHECK(tableDescIt != tableDescriptorMapById_.end());
4256 tableDescIt->second->fragmenter =
nullptr;
4257 CHECK(td->fragmenter ==
nullptr);
4261 if (dynamic_cast<foreign_storage::ForeignTable*>(td)) {
4262 dataMgr_->getPersistentStorageMgr()->getForeignStorageMgr()->refreshTable(table_key,
4265 dataMgr_->removeMutableTableDiskCacheData(currentDB_.dbId, table_id);
4267 instantiateFragmenter(td);
4270 void Catalog::removeFragmenterForTable(
const int table_id)
const {
4272 auto td = getMetadataForTable(table_id,
false);
4273 if (td->fragmenter !=
nullptr) {
4274 auto tableDescIt = tableDescriptorMapById_.find(table_id);
4275 CHECK(tableDescIt != tableDescriptorMapById_.end());
4276 tableDescIt->second->fragmenter =
nullptr;
4277 CHECK(td->fragmenter ==
nullptr);
4282 void Catalog::removeChunks(
const int table_id)
const {
4283 removeFragmenterForTable(table_id);
4286 ChunkKey chunkKey = {currentDB_.dbId, table_id};
4295 std::vector<const TableDescriptor*> tables_to_drop;
4298 const auto physicalTableIt = logicalToPhysicalTableMapById_.find(td->
tableId);
4299 if (physicalTableIt != logicalToPhysicalTableMapById_.end()) {
4301 const auto physicalTables = physicalTableIt->second;
4302 CHECK(!physicalTables.empty());
4303 for (
size_t i = 0; i < physicalTables.size(); i++) {
4304 int32_t physical_tb_id = physicalTables[i];
4306 getMutableMetadataForTableUnlocked(physical_tb_id);
4308 tables_to_drop.emplace_back(phys_td);
4311 tables_to_drop.emplace_back(td);
4314 for (
auto table : tables_to_drop) {
4315 eraseTablePhysicalData(table);
4317 deleteTableCatalogMetadata(td, tables_to_drop);
4320 void Catalog::deleteTableCatalogMetadata(
4322 const std::vector<const TableDescriptor*>& physical_tables) {
4325 sqliteConnector_.query(
"BEGIN TRANSACTION");
4328 sqliteConnector_.query_with_text_param(
4329 "DELETE FROM mapd_logical_to_physical WHERE logical_table_id = ?",
4331 logicalToPhysicalTableMapById_.erase(logical_table->
tableId);
4332 for (
auto table : physical_tables) {
4333 eraseTableMetadata(table);
4335 }
catch (std::exception& e) {
4336 sqliteConnector_.query(
"ROLLBACK TRANSACTION");
4339 sqliteConnector_.query(
"END TRANSACTION");
4343 executeDropTableSqliteQueries(td);
4345 dropTableFromJsonUnlocked(td->
tableName);
4347 calciteMgr_->updateMetadata(currentDB_.dbName, td->
tableName);
4355 const int tableId = td->
tableId;
4356 sqliteConnector_.query_with_text_param(
"DELETE FROM mapd_tables WHERE tableid = ?",
4358 sqliteConnector_.query_with_text_params(
4359 "select comp_param from mapd_columns where compression = ? and tableid = ?",
4361 int numRows = sqliteConnector_.getNumRows();
4362 std::vector<int> dict_id_list;
4363 for (
int r = 0; r < numRows; ++r) {
4364 dict_id_list.push_back(sqliteConnector_.getData<
int>(r, 0));
4366 for (
auto dict_id : dict_id_list) {
4367 sqliteConnector_.query_with_text_params(
4368 "UPDATE mapd_dictionaries SET refcount = refcount - 1 WHERE dictid = ?",
4371 sqliteConnector_.query_with_text_params(
4372 "DELETE FROM mapd_dictionaries WHERE dictid in (select comp_param from "
4373 "mapd_columns where compression = ? "
4374 "and tableid = ?) and refcount = 0",
4376 sqliteConnector_.query_with_text_param(
"DELETE FROM mapd_columns WHERE tableid = ?",
4379 sqliteConnector_.query_with_text_param(
"DELETE FROM mapd_views WHERE tableid = ?",
4383 sqliteConnector_.query_with_text_param(
4384 "DELETE FROM omnisci_foreign_tables WHERE table_id = ?",
std::to_string(tableId));
4388 void Catalog::renamePhysicalTable(
const TableDescriptor* td,
const string& newTableName) {
4392 sqliteConnector_.query(
"BEGIN TRANSACTION");
4394 sqliteConnector_.query_with_text_params(
4395 "UPDATE mapd_tables SET name = ? WHERE tableid = ?",
4397 }
catch (std::exception& e) {
4398 sqliteConnector_.query(
"ROLLBACK TRANSACTION");
4401 sqliteConnector_.query(
"END TRANSACTION");
4402 TableDescriptorMap::iterator tableDescIt =
4404 CHECK(tableDescIt != tableDescriptorMap_.end());
4405 calciteMgr_->updateMetadata(currentDB_.dbName, td->
tableName);
4409 tableDescriptorMap_.erase(tableDescIt);
4410 tableDescriptorMap_[
to_upper(newTableName)] = changeTd;
4411 calciteMgr_->updateMetadata(currentDB_.dbName, td->
tableName);
4419 const auto physicalTableIt = logicalToPhysicalTableMapById_.find(td->
tableId);
4420 if (physicalTableIt != logicalToPhysicalTableMapById_.end()) {
4421 const auto physicalTables = physicalTableIt->second;
4422 CHECK(!physicalTables.empty());
4423 for (
size_t i = 0; i < physicalTables.size(); i++) {
4424 int32_t physical_tb_id = physicalTables[i];
4427 std::string newPhysTableName =
4428 generatePhysicalTableName(newTableName, static_cast<int32_t>(i + 1));
4429 renamePhysicalTable(phys_td, newPhysTableName);
4432 renamePhysicalTable(td, newTableName);
4438 key.
dbId = currentDB_.dbId;
4441 object.setObjectKey(key);
4444 for (
auto obj : objdescs) {
4454 void Catalog::renamePhysicalTables(
4455 std::vector<std::pair<std::string, std::string>>& names,
4456 std::vector<int>& tableIds) {
4461 for (
size_t i = 0; i < names.size(); i++) {
4462 int tableId = tableIds[i];
4463 const std::string& newTableName = names[i].second;
4464 sqliteConnector_.query_with_text_params(
4465 "UPDATE mapd_tables SET name = ? WHERE tableid = ?",
4466 std::vector<std::string>{newTableName,
std::to_string(tableId)});
4470 for (
size_t i = 0; i < names.size(); i++) {
4471 const auto& [curTableName, newTableName] = names[i];
4473 TableDescriptorMap::iterator tableDescIt =
4474 tableDescriptorMap_.find(
to_upper(curTableName));
4475 CHECK(tableDescIt != tableDescriptorMap_.end());
4476 calciteMgr_->updateMetadata(currentDB_.dbName, curTableName);
4481 tableDescriptorMap_.erase(tableDescIt);
4482 tableDescriptorMap_[
to_upper(newTableName)] = changeTd;
4483 calciteMgr_->updateMetadata(currentDB_.dbName, curTableName);
4491 const std::map<std::string, int>& cached_table_map,
4492 const std::string& cur_table_name) {
4493 if (
auto it = cached_table_map.find(cur_table_name); it != cached_table_map.end()) {
4494 auto table_id = it->second;
4495 return (table_id == -1) ? NULL : getMetadataForTable(table_id);
4497 return getMetadataForTable(cur_table_name);
4502 const std::string& curTableName,
4503 const std::string& newTableName,
4506 cachedTableMap[curTableName] = -1;
4509 cachedTableMap[newTableName] = tableId;
4513 void Catalog::renameTables(
4514 const std::vector<std::pair<std::string, std::string>>& names) {
4517 std::vector<int> tableIds;
4524 std::map<int, size_t> uniqueOrderedTableIds;
4527 std::map<std::string, int> cachedTableMap;
4532 for (
size_t i = 0; i < names.size(); i++) {
4533 const auto& [curTableName, newTableName] = names[i];
4537 auto td = getCachedTableDescriptor(cachedTableMap, curTableName);
4540 tableIds.push_back(td->tableId);
4541 if (uniqueOrderedTableIds.find(td->tableId) == uniqueOrderedTableIds.end()) {
4543 uniqueOrderedTableIds[td->tableId] = i;
4548 CHECK_EQ(tableIds.size(), names.size());
4562 for (
auto& idPair : uniqueOrderedTableIds) {
4563 const std::string& tableName = names[idPair.second].first;
4564 tableLocks.emplace_back(
4567 *
this, tableName,
false)));
4575 std::vector<std::pair<std::string, std::string>> allNames;
4576 std::vector<int> allTableIds;
4578 for (
size_t i = 0; i < names.size(); i++) {
4579 int tableId = tableIds[i];
4580 const auto& [curTableName, newTableName] = names[i];
4583 const auto physicalTableIt = logicalToPhysicalTableMapById_.find(tableId);
4584 if (physicalTableIt != logicalToPhysicalTableMapById_.end()) {
4585 const auto physicalTables = physicalTableIt->second;
4586 CHECK(!physicalTables.empty());
4587 for (
size_t k = 0; k < physicalTables.size(); k++) {
4588 int32_t physical_tb_id = physicalTables[k];
4591 std::string newPhysTableName = generatePhysicalTableName(newTableName, (k + 1));
4592 allNames.emplace_back(phys_td->
tableName, newPhysTableName);
4593 allTableIds.push_back(phys_td->
tableId);
4596 allNames.emplace_back(curTableName, newTableName);
4597 allTableIds.push_back(tableId);
4601 execInTransaction(&Catalog::renamePhysicalTables, allNames, allTableIds);
4605 for (
size_t i = 0; i < names.size(); i++) {
4606 int tableId = tableIds[i];
4607 const std::string& newTableName = names[i].second;
4611 key.
dbId = currentDB_.dbId;
4616 object.setObjectKey(key);
4620 for (
auto obj : objdescs) {
4633 const string& newColumnName) {
4636 sqliteConnector_.query(
"BEGIN TRANSACTION");
4641 std::string new_column_name = cdx->columnName;
4642 new_column_name.replace(0, cd->
columnName.size(), newColumnName);
4643 sqliteConnector_.query_with_text_params(
4644 "UPDATE mapd_columns SET name = ? WHERE tableid = ? AND columnid = ?",
4645 std::vector<std::string>{new_column_name,
4649 }
catch (std::exception& e) {
4650 sqliteConnector_.query(
"ROLLBACK TRANSACTION");
4653 sqliteConnector_.query(
"END TRANSACTION");
4654 calciteMgr_->updateMetadata(currentDB_.dbName, td->
tableName);
4658 ColumnDescriptorMap::iterator columnDescIt = columnDescriptorMap_.find(
4660 CHECK(columnDescIt != columnDescriptorMap_.end());
4663 columnDescriptorMap_.erase(columnDescIt);
4664 columnDescriptorMap_[std::make_tuple(td->
tableId,
to_upper(changeCd->columnName))] =
4667 calciteMgr_->updateMetadata(currentDB_.dbName, td->
tableName);
4673 sqliteConnector_.query(
"BEGIN TRANSACTION");
4676 sqliteConnector_.query_with_text_params(
4677 "SELECT id FROM mapd_dashboards WHERE name = ? and userid = ?",
4679 if (sqliteConnector_.getNumRows() > 0) {
4680 sqliteConnector_.query_with_text_params(
4681 "UPDATE mapd_dashboards SET state = ?, image_hash = ?, metadata = ?, "
4683 "datetime('now') where name = ? "
4691 sqliteConnector_.query_with_text_params(
4692 "INSERT INTO mapd_dashboards (name, state, image_hash, metadata, "
4697 "datetime('now'), ?)",
4704 }
catch (std::exception& e) {
4705 sqliteConnector_.query(
"ROLLBACK TRANSACTION");
4708 sqliteConnector_.query(
"END TRANSACTION");
4712 sqliteConnector_.query_with_text_params(
4713 "SELECT id, strftime('%Y-%m-%dT%H:%M:%SZ', update_time) FROM mapd_dashboards "
4714 "WHERE name = ? and userid = ?",
4716 vd.
dashboardId = sqliteConnector_.getData<
int>(0, 0);
4717 vd.
updateTime = sqliteConnector_.getData<std::string>(0, 1);
4718 }
catch (std::exception& e) {
4723 addFrontendViewToMap(vd);
4726 if (!isInfoSchemaDb()) {
4728 createOrUpdateDashboardSystemRole(
4738 CHECK(sqliteConnector_.getSqlitePtr());
4739 sqliteConnector_.query(
"BEGIN TRANSACTION");
4741 sqliteConnector_.query_with_text_params(
4742 "SELECT id FROM mapd_dashboards WHERE id = ?",
4744 if (sqliteConnector_.getNumRows() > 0) {
4745 sqliteConnector_.query_with_text_params(
4746 "UPDATE mapd_dashboards SET name = ?, state = ?, image_hash = ?, metadata = "
4747 "?, userid = ?, update_time = datetime('now') where id = ? ",
4756 <<
" does not exist in db";
4757 throw runtime_error(
"Error replacing dashboard id " +
4760 }
catch (std::exception& e) {
4761 sqliteConnector_.query(
"ROLLBACK TRANSACTION");
4764 sqliteConnector_.query(
"END TRANSACTION");
4767 for (
auto descp : dashboardDescriptorMap_) {
4768 auto dash = descp.second.get();
4771 auto viewDescIt = dashboardDescriptorMap_.find(
std::to_string(dash->userId) +
":" +
4772 dash->dashboardName);
4774 dashboardDescriptorMap_.end()) {
4775 LOG(
ERROR) <<
"No metadata for dashboard for user " << dash->userId
4776 <<
" dashboard " << dash->dashboardName <<
" does not exist in map";
4777 throw runtime_error(
"No metadata for dashboard for user " +
4779 dash->dashboardName +
" does not exist in map");
4781 dashboardDescriptorMap_.erase(viewDescIt);
4787 <<
" does not exist in map";
4789 " does not exist in map");
4793 sqliteConnector_.query_with_text_params(
4794 "SELECT id, strftime('%Y-%m-%dT%H:%M:%SZ', update_time) FROM "
4798 vd.
updateTime = sqliteConnector_.getData<
string>(0, 1);
4801 addFrontendViewToMapNoLock(vd);
4804 if (!isInfoSchemaDb()) {
4806 createOrUpdateDashboardSystemRole(
4811 std::string Catalog::calculateSHA1(
const std::string& data) {
4812 boost::uuids::detail::sha1 sha1;
4813 unsigned int digest[5];
4814 sha1.process_bytes(data.c_str(), data.length());
4815 sha1.get_digest(digest);
4816 std::stringstream ss;
4817 for (
size_t i = 0; i < 5; i++) {
4818 ss << std::hex << digest[i];
4826 sqliteConnector_.query(
"BEGIN TRANSACTION");
4830 sqliteConnector_.query_with_text_params(
4831 "SELECT linkid FROM mapd_links WHERE link = ? and userid = ?",
4833 if (sqliteConnector_.getNumRows() > 0) {
4834 sqliteConnector_.query_with_text_params(
4835 "UPDATE mapd_links SET update_time = datetime('now') WHERE userid = ? AND "
4839 sqliteConnector_.query_with_text_params(
4840 "INSERT INTO mapd_links (userid, link, view_state, view_metadata, "
4841 "update_time) VALUES (?,?,?,?, datetime('now'))",
4842 std::vector<std::string>{
4846 sqliteConnector_.query_with_text_param(
4847 "SELECT linkid, strftime('%Y-%m-%dT%H:%M:%SZ', update_time) FROM mapd_links "
4850 ld.
linkId = sqliteConnector_.getData<
int>(0, 0);
4851 ld.
updateTime = sqliteConnector_.getData<std::string>(0, 1);
4852 }
catch (std::exception& e) {
4853 sqliteConnector_.query(
"ROLLBACK TRANSACTION");
4856 sqliteConnector_.query(
"END TRANSACTION");
4865 const auto column_descriptors =
4866 getAllColumnMetadataForTable(td->
tableId,
false,
true,
true);
4870 for (
auto cd_itr = column_descriptors.begin(); cd_itr != column_descriptors.end();
4879 std::vector<const TableDescriptor*> Catalog::getPhysicalTablesDescriptors(
4881 bool populate_fragmenter)
const {
4883 const auto physicalTableIt =
4884 logicalToPhysicalTableMapById_.find(logical_table_desc->
tableId);
4885 if (physicalTableIt == logicalToPhysicalTableMapById_.end()) {
4886 return {logical_table_desc};
4888 const auto physicalTablesIds = physicalTableIt->second;
4889 CHECK(!physicalTablesIds.empty());
4891 std::vector<const TableDescriptor*> physicalTables;
4892 for (
size_t i = 0; i < physicalTablesIds.size(); i++) {
4893 physicalTables.push_back(
4894 getMetadataForTable(physicalTablesIds[i], populate_fragmenter));
4896 return physicalTables;
4899 std::vector<std::pair<int32_t, int32_t>> Catalog::getAllPersistedTableAndShardIds()
4902 std::vector<std::pair<int32_t, int32_t>> table_and_shard_ids;
4903 table_and_shard_ids.reserve(tableDescriptorMapById_.size());
4904 for (
const auto [table_id, td] : tableDescriptorMapById_) {
4906 if (!td->isView && !td->isTemporaryTable() && !td->isForeignTable() &&
4907 logicalToPhysicalTableMapById_.find(table_id) ==
4908 logicalToPhysicalTableMapById_.end()) {
4909 table_and_shard_ids.emplace_back(table_id, td->shard);
4912 return table_and_shard_ids;
4915 const std::map<int, const ColumnDescriptor*> Catalog::getDictionaryToColumnMapping() {
4918 std::map<int, const ColumnDescriptor*> mapping;
4920 const auto tables = getAllTableMetadata();
4921 for (
const auto td :
tables) {
4922 if (td->shard >= 0) {
4927 for (
auto& cd : getAllColumnMetadataForTable(td->tableId,
false,
false,
true)) {
4928 const auto& ti = cd->columnType;
4929 if (ti.is_string()) {
4932 const auto dict_id = ti.get_comp_param();
4935 if (dict_id > 0 && mapping.end() == mapping.find(dict_id)) {
4936 mapping[dict_id] = cd;
4949 if (td->
shard >= 0) {
4953 switch (get_tables_type) {
4971 std::vector<DBObject> privObjects = {dbObject};
4979 std::vector<std::string> Catalog::getTableNamesForUser(
4984 std::vector<std::string> table_names;
4985 const auto tables = getAllTableMetadata();
4986 for (
const auto td :
tables) {
4987 if (filterTableByTypeAndUser(td, user_metadata, get_tables_type)) {
4988 table_names.push_back(td->tableName);
4994 std::vector<TableMetadata> Catalog::getTablesMetadataForUser(
4997 const std::string& filter_table_name)
const {
5001 std::vector<TableMetadata> tables_metadata;
5002 const auto tables = getAllTableMetadata();
5003 for (
const auto td :
tables) {
5004 if (filterTableByTypeAndUser(td, user_metadata, get_tables_type)) {
5005 if (!filter_table_name.empty()) {
5006 if (td->tableName != filter_table_name) {
5012 tables_metadata.emplace_back(table_metadata);
5015 return tables_metadata;
5018 int Catalog::getLogicalTableId(
const int physicalTableId)
const {
5020 for (
const auto& l : logicalToPhysicalTableMapById_) {
5021 if (l.second.end() != std::find_if(l.second.begin(),
5023 [&](decltype(*l.second.begin()) tid) ->
bool {
5024 return physicalTableId == tid;
5029 return physicalTableId;
5032 void Catalog::checkpoint(
const int logicalTableId)
const {
5033 const auto td = getMetadataForTable(logicalTableId);
5034 const auto shards = getPhysicalTablesDescriptors(td);
5035 for (
const auto shard : shards) {
5036 getDataMgr().checkpoint(getCurrentDB().dbId, shard->tableId);
5040 void Catalog::checkpointWithAutoRollback(
const int logical_table_id)
const {
5041 auto table_epochs = getTableEpochs(getDatabaseId(), logical_table_id);
5043 checkpoint(logical_table_id);
5045 setTableEpochsLogExceptions(getDatabaseId(), table_epochs);
5050 void Catalog::resetTableEpochFloor(
const int logicalTableId)
const {
5052 const auto td = getMetadataForTable(logicalTableId,
false);
5053 const auto shards = getPhysicalTablesDescriptors(td,
false);
5054 for (
const auto shard : shards) {
5055 getDataMgr().resetTableEpochFloor(getCurrentDB().dbId, shard->tableId);
5059 void Catalog::eraseDbMetadata() {
5060 const auto tables = getAllTableMetadata();
5061 for (
const auto table :
tables) {
5062 eraseTableMetadata(table);
5067 calciteMgr_->updateMetadata(currentDB_.dbName,
"");
5070 void Catalog::eraseDbPhysicalData() {
5071 const auto tables = getAllTableMetadata();
5072 for (
const auto table :
tables) {
5073 eraseTablePhysicalData(table);
5078 const int tableId = td->
tableId;
5080 removeFragmenterForTable(tableId);
5082 ChunkKey chunkKeyPrefix = {currentDB_.dbId, tableId};
5091 dataMgr_->removeTableRelatedDS(currentDB_.dbId, tableId);
5095 std::string Catalog::generatePhysicalTableName(
const std::string& logicalTableName,
5096 const size_t shardNumber) {
5097 std::string physicalTableName =
5098 logicalTableName + physicalTableNameTag_ +
std::to_string(shardNumber);
5099 return (physicalTableName);
5102 void Catalog::buildForeignServerMapUnlocked() {
5104 sqliteConnector_.query(
5105 "SELECT id, name, data_wrapper_type, options, owner_user_id, creation_time FROM "
5106 "omnisci_foreign_servers");
5107 auto num_rows = sqliteConnector_.getNumRows();
5109 for (
size_t row = 0; row < num_rows; row++) {
5110 auto foreign_server = std::make_shared<foreign_storage::ForeignServer>(
5111 sqliteConnector_.getData<
int>(row, 0),
5112 sqliteConnector_.getData<std::string>(row, 1),
5113 sqliteConnector_.getData<std::string>(row, 2),
5114 sqliteConnector_.getData<std::string>(row, 3),
5115 sqliteConnector_.getData<std::int32_t>(row, 4),
5116 sqliteConnector_.getData<std::int32_t>(row, 5));
5117 foreignServerMap_[foreign_server->name] = foreign_server;
5118 foreignServerMapById_[foreign_server->id] = foreign_server;
5122 void Catalog::updateForeignTablesInMapUnlocked() {
5124 sqliteConnector_.query(
5125 "SELECT table_id, server_id, options, last_refresh_time, next_refresh_time from "
5126 "omnisci_foreign_tables");
5127 auto num_rows = sqliteConnector_.getNumRows();
5128 for (
size_t r = 0; r < num_rows; r++) {
5129 const auto table_id = sqliteConnector_.getData<int32_t>(r, 0);
5130 const auto server_id = sqliteConnector_.getData<int32_t>(r, 1);
5131 const auto& options = sqliteConnector_.getData<std::string>(r, 2);
5132 const auto last_refresh_time = sqliteConnector_.getData<int64_t>(r, 3);
5133 const auto next_refresh_time = sqliteConnector_.getData<int64_t>(r, 4);
5135 CHECK(tableDescriptorMapById_.find(table_id) != tableDescriptorMapById_.end());
5136 auto foreign_table =
5138 CHECK(foreign_table);
5139 foreign_table->foreign_server = foreignServerMapById_[server_id].get();
5140 CHECK(foreign_table->foreign_server);
5141 foreign_table->populateOptionsMap(options);
5142 foreign_table->last_refresh_time = last_refresh_time;
5143 foreign_table->next_refresh_time = next_refresh_time;
5144 if (foreign_table->is_system_table) {
5145 foreign_table->is_in_memory_system_table =
5147 foreign_table->foreign_server->data_wrapper_type);
5155 <<
"reloadForeignTable expects a table with valid id";
5156 sqliteConnector_.query(
5157 "SELECT server_id, options, last_refresh_time, next_refresh_time from "
5158 "omnisci_foreign_tables WHERE table_id == " +
5160 auto num_rows = sqliteConnector_.getNumRows();
5161 CHECK_EQ(num_rows, 1U) <<
"Expected single entry in omnisci_foreign_tables for table'"
5162 << foreign_table.
tableName <<
"', instead got " << num_rows;
5163 const auto server_id = sqliteConnector_.getData<int32_t>(0, 0);
5164 const auto& options = sqliteConnector_.getData<std::string>(0, 1);
5165 const auto last_refresh_time = sqliteConnector_.getData<int64_t>(0, 2);
5166 const auto next_refresh_time = sqliteConnector_.getData<int64_t>(0, 3);
5168 foreign_table.
foreign_server = foreignServerMapById_[server_id].get();
5180 void Catalog::reloadDictionariesFromDiskUnlocked() {
5181 std::string dictQuery(
5182 "SELECT dictid, name, nbits, is_shared, refcount from mapd_dictionaries");
5183 sqliteConnector_.query(dictQuery);
5184 auto numRows = sqliteConnector_.getNumRows();
5185 for (
size_t r = 0; r < numRows; ++r) {
5186 auto dictId = sqliteConnector_.getData<
int>(r, 0);
5187 auto dictName = sqliteConnector_.getData<
string>(r, 1);
5188 auto dictNBits = sqliteConnector_.getData<
int>(r, 2);
5189 auto is_shared = sqliteConnector_.getData<
bool>(r, 3);
5190 auto refcount = sqliteConnector_.getData<
int>(r, 4);
5193 DictRef dict_ref(currentDB_.dbId, dictId);
5194 DictDescriptor dd(dict_ref, dictName, dictNBits, is_shared, refcount, fname,
false);
5195 if (
auto it = dictDescriptorMapByRef_.find(dict_ref);
5196 it == dictDescriptorMapByRef_.end()) {
5197 dictDescriptorMapByRef_[dict_ref] = std::make_unique<DictDescriptor>(dd);
5204 std::list<ColumnDescriptor*> Catalog::sqliteGetColumnsForTableUnlocked(int32_t table_id) {
5205 std::list<ColumnDescriptor*> cds;
5208 std::list<std::unique_ptr<ColumnDescriptor>> smart_cds;
5209 std::string columnQuery(
5210 "SELECT tableid, columnid, name, coltype, colsubtype, coldim, colscale, "
5211 "is_notnull, compression, comp_param, size, chunks, is_systemcol, is_virtualcol, "
5212 "virtual_expr, is_deletedcol, default_value from mapd_columns WHERE tableid = " +
5214 sqliteConnector_.query(columnQuery);
5215 auto numRows = sqliteConnector_.getNumRows();
5216 int32_t skip_physical_cols = 0;
5217 for (
size_t r = 0; r < numRows; ++r) {
5218 std::unique_ptr<ColumnDescriptor> cd = std::make_unique<ColumnDescriptor>();
5219 cd->tableId = sqliteConnector_.getData<
int>(r, 0);
5220 cd->columnId = sqliteConnector_.getData<
int>(r, 1);
5221 cd->columnName = sqliteConnector_.getData<
string>(r, 2);
5222 cd->columnType.set_type((
SQLTypes)sqliteConnector_.getData<
int>(r, 3));
5223 cd->columnType.set_subtype((
SQLTypes)sqliteConnector_.getData<
int>(r, 4));
5224 cd->columnType.set_dimension(sqliteConnector_.getData<
int>(r, 5));
5225 cd->columnType.set_scale(sqliteConnector_.getData<
int>(r, 6));
5226 cd->columnType.set_notnull(sqliteConnector_.getData<
bool>(r, 7));
5227 cd->columnType.set_compression((
EncodingType)sqliteConnector_.getData<
int>(r, 8));
5228 cd->columnType.set_comp_param(sqliteConnector_.getData<
int>(r, 9));
5229 cd->columnType.set_size(sqliteConnector_.getData<
int>(r, 10));
5230 cd->chunks = sqliteConnector_.getData<
string>(r, 11);
5231 cd->isSystemCol = sqliteConnector_.getData<
bool>(r, 12);
5232 cd->isVirtualCol = sqliteConnector_.getData<
bool>(r, 13);
5233 cd->virtualExpr = sqliteConnector_.getData<
string>(r, 14);
5234 cd->isDeletedCol = sqliteConnector_.getData<
bool>(r, 15);
5235 if (sqliteConnector_.isNull(r, 16)) {
5236 cd->default_value = std::nullopt;
5238 cd->default_value = std::make_optional(sqliteConnector_.getData<
string>(r, 16));
5240 cd->isGeoPhyCol = skip_physical_cols-- > 0;
5241 cd->db_id = getDatabaseId();
5243 smart_cds.emplace_back(std::move(cd));
5247 for (
auto& cd : smart_cds) {
5248 cds.emplace_back(cd.release());
5255 "SELECT tableid, name, ncolumns, isview, fragments, frag_type, max_frag_rows, "
5256 "max_chunk_size, frag_page_size, max_rows, partitions, shard_column_id, shard, "
5257 "num_shards, key_metainfo, userid, sort_column_id, storage_type, "
5258 "max_rollback_epochs, is_system_table from mapd_tables WHERE tableid = " +
5260 sqliteConnector_.query(query);
5261 auto numRows = sqliteConnector_.getNumRows();
5266 const auto& storage_type = sqliteConnector_.getData<
string>(0, 17);
5268 const auto& table_name = sqliteConnector_.getData<
string>(0, 1);
5269 LOG(
FATAL) <<
"Unable to read Catalog metadata: storage type is currently not a "
5270 "supported table option (table "
5271 << table_name <<
" [" << table_id <<
"] in database " << currentDB_.dbName
5278 std::unique_ptr<TableDescriptor> td;
5280 ? std::make_unique<foreign_storage::ForeignTable>()
5281 : std::make_unique<TableDescriptor>();
5283 td->tableId = sqliteConnector_.getData<
int>(0, 0);
5284 td->tableName = sqliteConnector_.getData<
string>(0, 1);
5285 td->nColumns = sqliteConnector_.getData<
int>(0, 2);
5286 td->isView = sqliteConnector_.getData<
bool>(0, 3);
5287 td->fragments = sqliteConnector_.getData<
string>(0, 4);
5289 sqliteConnector_.getData<
int>(0, 5));
5290 td->maxFragRows = sqliteConnector_.getData<
int>(0, 6);
5291 td->maxChunkSize = sqliteConnector_.getData<int64_t>(0, 7);
5292 td->fragPageSize = sqliteConnector_.getData<
int>(0, 8);
5293 td->maxRows = sqliteConnector_.getData<int64_t>(0, 9);
5294 td->partitions = sqliteConnector_.getData<
string>(0, 10);
5295 td->shardedColumnId = sqliteConnector_.getData<
int>(0, 11);
5296 td->shard = sqliteConnector_.getData<
int>(0, 12);
5297 td->nShards = sqliteConnector_.getData<
int>(0, 13);
5298 td->keyMetainfo = sqliteConnector_.getData<
string>(0, 14);
5299 td->userId = sqliteConnector_.getData<
int>(0, 15);
5300 td->sortedColumnId =
5301 sqliteConnector_.isNull(0, 16) ? 0 : sqliteConnector_.getData<
int>(0, 16);
5302 td->storageType = storage_type;
5303 td->maxRollbackEpochs = sqliteConnector_.getData<
int>(0, 18);
5304 td->is_system_table = sqliteConnector_.getData<
bool>(0, 19);
5305 td->hasDeletedCol =
false;
5308 updateViewUnlocked(*td);
5310 td->fragmenter =
nullptr;
5313 if (
auto ftd = dynamic_cast<foreign_storage::ForeignTable*>(td.get())) {
5314 reloadForeignTableUnlocked(*ftd);
5317 return td.release();
5320 void Catalog::setForeignServerProperty(
const std::string& server_name,
5321 const std::string& property,
5322 const std::string& value) {
5324 sqliteConnector_.query_with_text_params(
5325 "SELECT id from omnisci_foreign_servers where name = ?",
5326 std::vector<std::string>{server_name});
5327 auto num_rows = sqliteConnector_.getNumRows();
5330 auto server_id = sqliteConnector_.getData<int32_t>(0, 0);
5331 sqliteConnector_.query_with_text_params(
5332 "UPDATE omnisci_foreign_servers SET " + property +
" = ? WHERE id = ?",
5335 throw std::runtime_error{
"Can not change property \"" +
property +
5336 "\" for foreign server." +
" Foreign server \"" +
5337 server_name +
"\" is not found."};
5341 void Catalog::createDefaultServersIfNotExists() {
5347 auto local_csv_server = std::make_unique<foreign_storage::ForeignServer>(
5348 "default_local_delimited",
5352 local_csv_server->validate();
5353 createForeignServerNoLocks(std::move(local_csv_server),
true);
5355 #ifdef ENABLE_IMPORT_PARQUET
5356 auto local_parquet_server = std::make_unique<foreign_storage::ForeignServer>(
5357 "default_local_parquet",
5361 local_parquet_server->validate();
5362 createForeignServerNoLocks(std::move(local_parquet_server),
true);
5365 auto local_regex_parser_server = std::make_unique<foreign_storage::ForeignServer>(
5366 "default_local_regex_parsed",
5370 local_regex_parser_server->validate();
5371 createForeignServerNoLocks(std::move(local_regex_parser_server),
true);
5375 void Catalog::setForReload(
const int32_t tableId) {
5376 const auto td = getMetadataForTable(tableId);
5377 for (
const auto shard : getPhysicalTablesDescriptors(td)) {
5378 const auto tableEpoch = getTableEpoch(currentDB_.dbId, shard->tableId);
5379 setTableEpoch(currentDB_.dbId, shard->tableId, tableEpoch);
5384 std::vector<std::string> Catalog::getTableDataDirectories(
5386 const auto global_file_mgr = getDataMgr().getGlobalFileMgr();
5387 std::vector<std::string> file_paths;
5388 for (
auto shard : getPhysicalTablesDescriptors(td)) {
5390 global_file_mgr->getFileMgr(currentDB_.dbId, shard->tableId));
5391 boost::filesystem::path file_path(file_mgr->getFileMgrBasePath());
5392 file_paths.push_back(file_path.filename().string());
5399 bool file_name_only)
const {