26 #include <boost/algorithm/string/predicate.hpp>
27 #include <boost/filesystem.hpp>
28 #include <boost/range/adaptor/map.hpp>
29 #include <boost/version.hpp>
42 #if BOOST_VERSION >= 106600
43 #include <boost/uuid/detail/sha1.hpp>
45 #include <boost/uuid/sha1.hpp>
47 #include <rapidjson/document.h>
48 #include <rapidjson/istreamwrapper.h>
49 #include <rapidjson/ostreamwrapper.h>
50 #include <rapidjson/writer.h>
80 #include "MapDRelease.h"
92 using std::runtime_error;
111 namespace Catalog_Namespace {
137 "SELECT name FROM sqlite_master WHERE type='table' AND name='mapd_dashboards'");
144 "CREATE TABLE mapd_dashboards (id integer primary key autoincrement, name text , "
145 "userid integer references mapd_users, state text, image_hash text, update_time "
147 "metadata text, UNIQUE(userid, name) )");
150 "insert into mapd_dashboards (id, name , "
151 "userid, state, image_hash, update_time , "
153 "SELECT viewid , name , userid, view_state, image_hash, update_time, "
155 "from mapd_frontend_views");
156 }
catch (
const std::exception& e) {
166 const std::string& db_name) {
168 db_name +
"_temp_tables.json");
178 std::shared_ptr<Data_Namespace::DataMgr> dataMgr,
179 const std::vector<LeafHostInfo>& string_dict_hosts,
180 std::shared_ptr<Calcite> calcite,
182 : basePath_(basePath)
186 , string_dict_hosts_(string_dict_hosts)
187 , calciteMgr_(calcite)
190 , dcatalogMutex_(std::make_unique<heavyai::DistributedSharedMutex>(
202 ,
dsqliteMutex_(std::make_unique<heavyai::DistributedSharedMutex>(
215 CheckAndExecuteMigrations();
221 createDefaultServersIfNotExists();
224 CheckAndExecuteMigrationsPostBuildMaps();
229 conditionallyInitializeSystemObjects();
241 tableDescIt->second->fragmenter =
nullptr;
242 delete tableDescIt->second;
250 delete columnDescIt->second;
273 std::vector<std::string> cols;
277 if (std::find(cols.begin(), cols.end(), std::string(
"max_chunk_size")) ==
279 string queryString(
"ALTER TABLE mapd_tables ADD max_chunk_size BIGINT DEFAULT " +
283 if (std::find(cols.begin(), cols.end(), std::string(
"shard_column_id")) ==
285 string queryString(
"ALTER TABLE mapd_tables ADD shard_column_id BIGINT DEFAULT " +
289 if (std::find(cols.begin(), cols.end(), std::string(
"shard")) == cols.end()) {
290 string queryString(
"ALTER TABLE mapd_tables ADD shard BIGINT DEFAULT " +
294 if (std::find(cols.begin(), cols.end(), std::string(
"num_shards")) == cols.end()) {
295 string queryString(
"ALTER TABLE mapd_tables ADD num_shards BIGINT DEFAULT " +
299 if (std::find(cols.begin(), cols.end(), std::string(
"key_metainfo")) == cols.end()) {
300 string queryString(
"ALTER TABLE mapd_tables ADD key_metainfo TEXT DEFAULT '[]'");
303 if (std::find(cols.begin(), cols.end(), std::string(
"userid")) == cols.end()) {
304 string queryString(
"ALTER TABLE mapd_tables ADD userid integer DEFAULT " +
308 if (std::find(cols.begin(), cols.end(), std::string(
"sort_column_id")) ==
311 "ALTER TABLE mapd_tables ADD sort_column_id INTEGER DEFAULT 0");
313 if (std::find(cols.begin(), cols.end(), std::string(
"storage_type")) == cols.end()) {
314 string queryString(
"ALTER TABLE mapd_tables ADD storage_type TEXT DEFAULT ''");
317 if (std::find(cols.begin(), cols.end(), std::string(
"max_rollback_epochs")) ==
319 string queryString(
"ALTER TABLE mapd_tables ADD max_rollback_epochs INT DEFAULT " +
323 if (std::find(cols.begin(), cols.end(), std::string(
"is_system_table")) ==
325 string queryString(
"ALTER TABLE mapd_tables ADD is_system_table BOOLEAN DEFAULT 0");
328 }
catch (std::exception& e) {
340 "select name from sqlite_master WHERE type='table' AND "
341 "name='mapd_version_history'");
344 "CREATE TABLE mapd_version_history(version integer, migration_history text "
348 "select * from mapd_version_history where migration_history = "
349 "'notnull_fixlen_arrays'");
359 "INSERT INTO mapd_version_history(version, migration_history) values(?,?)",
361 LOG(
INFO) <<
"Updating mapd_columns, legacy fixlen arrays";
363 string queryString(
"UPDATE mapd_columns SET is_notnull=1 WHERE coltype=" +
366 }
catch (std::exception& e) {
378 "select name from sqlite_master WHERE type='table' AND "
379 "name='mapd_version_history'");
382 "CREATE TABLE mapd_version_history(version integer, migration_history text "
386 "select * from mapd_version_history where migration_history = "
387 "'notnull_geo_columns'");
397 "INSERT INTO mapd_version_history(version, migration_history) values(?,?)",
399 LOG(
INFO) <<
"Updating mapd_columns, legacy geo columns";
408 }
catch (std::exception& e) {
421 "SELECT name FROM sqlite_master WHERE type='table' AND "
422 "name='mapd_frontend_views'");
430 std::vector<std::string> cols;
434 if (std::find(cols.begin(), cols.end(), std::string(
"image_hash")) == cols.end()) {
437 if (std::find(cols.begin(), cols.end(), std::string(
"update_time")) == cols.end()) {
440 if (std::find(cols.begin(), cols.end(), std::string(
"view_metadata")) == cols.end()) {
443 }
catch (std::exception& e) {
455 "CREATE TABLE IF NOT EXISTS mapd_links (linkid integer primary key, userid "
456 "integer references mapd_users, "
457 "link text unique, view_state text, update_time timestamp, view_metadata text)");
459 std::vector<std::string> cols;
463 if (std::find(cols.begin(), cols.end(), std::string(
"view_metadata")) == cols.end()) {
466 }
catch (
const std::exception& e) {
480 "SELECT name FROM sqlite_master WHERE type='table' AND "
481 "name='mapd_frontend_views'");
489 "UPDATE mapd_frontend_views SET userid = 0 WHERE userid IS NULL");
490 }
catch (
const std::exception& e) {
510 std::vector<std::string> cols;
514 if (std::find(cols.begin(), cols.end(), std::string(
"version_num")) == cols.end()) {
515 LOG(
INFO) <<
"Updating mapd_tables updatePageSize";
520 string queryString(
"ALTER TABLE mapd_tables ADD version_num BIGINT DEFAULT " +
524 }
catch (std::exception& e) {
536 std::vector<std::string> cols;
540 if (std::find(cols.begin(), cols.end(), std::string(
"version_num")) == cols.end()) {
541 LOG(
INFO) <<
"Updating mapd_columns updateDeletedColumnIndicator";
543 string queryString(
"ALTER TABLE mapd_columns ADD version_num BIGINT DEFAULT " +
549 "ALTER TABLE mapd_columns ADD is_deletedcol boolean default 0 ");
551 }
catch (std::exception& e) {
563 std::vector<std::string> cols;
567 if (std::find(cols.begin(), cols.end(), std::string(
"default_value")) == cols.end()) {
568 LOG(
INFO) <<
"Adding support for default values to mapd_columns";
571 }
catch (std::exception& e) {
573 LOG(
ERROR) <<
"Failed to make metadata update for default values` support";
591 std::vector<std::string> cols;
595 if (std::find(cols.begin(), cols.end(), std::string(
"version_num")) == cols.end()) {
598 string dictQuery(
"SELECT dictid, name from mapd_dictionaries");
601 for (
size_t r = 0; r < numRows; ++r) {
611 int result = rename(oldName.c_str(), newName.c_str());
614 LOG(
INFO) <<
"Dictionary upgrade: successfully renamed " << oldName <<
" to "
617 LOG(
ERROR) <<
"Failed to rename old dictionary directory " << oldName <<
" to "
623 string queryString(
"ALTER TABLE mapd_dictionaries ADD version_num BIGINT DEFAULT " +
627 }
catch (std::exception& e) {
639 "CREATE TABLE IF NOT EXISTS mapd_logical_to_physical("
640 "logical_table_id integer, physical_table_id integer)");
641 }
catch (
const std::exception& e) {
658 const auto physicalTables = physicalTableIt->second;
659 CHECK(!physicalTables.empty());
660 for (
size_t i = 0; i < physicalTables.size(); i++) {
661 int32_t physical_tb_id = physicalTables[i];
663 "INSERT OR REPLACE INTO mapd_logical_to_physical (logical_table_id, "
664 "physical_table_id) VALUES (?1, ?2)",
669 }
catch (std::exception& e) {
681 std::vector<std::string> cols;
685 if (std::find(cols.begin(), cols.end(), std::string(
"refcount")) == cols.end()) {
688 }
catch (std::exception& e) {
701 }
catch (std::exception& e) {
714 "select name from sqlite_master WHERE type='table' AND "
715 "name='mapd_version_history'");
716 static const std::string migration_name{
"rename_legacy_data_wrappers"};
719 "CREATE TABLE mapd_version_history(version integer, migration_history text "
723 "select * from mapd_version_history where migration_history = "
725 migration_name +
"'");
732 LOG(
INFO) <<
"Executing " << migration_name <<
" migration.";
737 std::map<std::string, std::string> old_to_new_wrapper_names{
738 {
"OMNISCI_CSV", DataWrapperType::CSV},
739 {
"OMNISCI_PARQUET", DataWrapperType::PARQUET},
740 {
"OMNISCI_REGEX_PARSER", DataWrapperType::REGEX_PARSER},
741 {
"OMNISCI_INTERNAL_CATALOG", DataWrapperType::INTERNAL_CATALOG},
742 {
"INTERNAL_OMNISCI_MEMORY_STATS", DataWrapperType::INTERNAL_MEMORY_STATS},
743 {
"INTERNAL_OMNISCI_STORAGE_STATS", DataWrapperType::INTERNAL_STORAGE_STATS}
747 for (
const auto& [old_wrapper_name, new_wrapper_name] : old_to_new_wrapper_names) {
749 "UPDATE omnisci_foreign_servers SET data_wrapper_type = ? WHERE "
750 "data_wrapper_type = ?",
751 std::vector<std::string>{new_wrapper_name, old_wrapper_name});
756 "INSERT INTO mapd_version_history(version, migration_history) values(?,?)",
758 LOG(
INFO) << migration_name <<
" migration completed.";
759 }
catch (std::exception& e) {
771 }
catch (
const std::exception& e) {
779 return "CREATE TABLE " + (if_not_exists ? std::string{
"IF NOT EXISTS "} :
"") +
780 "omnisci_foreign_servers(id integer primary key, name text unique, " +
781 "data_wrapper_type text, owner_user_id integer, creation_time integer, " +
786 return "CREATE TABLE " + (if_not_exists ? std::string{
"IF NOT EXISTS "} :
"") +
787 "omnisci_foreign_tables(table_id integer unique, server_id integer, " +
788 "options text, last_refresh_time integer, next_refresh_time integer, " +
789 "FOREIGN KEY(table_id) REFERENCES mapd_tables(tableid), " +
790 "FOREIGN KEY(server_id) REFERENCES omnisci_foreign_servers(id))";
794 return "CREATE TABLE " + (if_not_exists ? std::string{
"IF NOT EXISTS "} :
"") +
795 "omnisci_custom_expressions(id integer primary key, name text, " +
796 "expression_json text, data_source_type text, " +
797 "data_source_id integer, is_deleted boolean)";
803 std::vector<DBObject> objects;
806 "SELECT name FROM sqlite_master WHERE type='table' AND "
807 "name='mapd_record_ownership_marker'");
834 "INSERT INTO mapd_record_ownership_marker (dummy) VALUES (?1)",
837 static const std::map<const DBObjectType, const AccessPrivileges>
838 object_level_all_privs_lookup{
848 auto _key_place = [&key](
auto type) {
852 for (
auto& it : object_level_all_privs_lookup) {
853 objects.emplace_back(_key_place(it.first), it.second, db.
dbOwner);
860 "SELECT tableid, name, userid, isview FROM mapd_tables WHERE userid > 0");
863 for (
size_t r = 0; r < numRows; ++r) {
882 objects.push_back(obj);
888 string tableQuery(
"SELECT id, name, userid FROM mapd_dashboards WHERE userid > 0");
891 for (
size_t r = 0; r < numRows; ++r) {
907 objects.push_back(obj);
910 }
catch (
const std::exception& e) {
920 }
catch (
const std::exception& e) {
921 LOG(
ERROR) <<
" Issue during migration of DB " <<
name() <<
" issue was " << e.what();
922 throw std::runtime_error(
" Issue during migration of DB " +
name() +
" issue was " +
940 std::unordered_map<std::string, std::pair<int, std::string>> dashboards;
941 std::vector<std::string> dashboard_ids;
942 static const std::string migration_name{
"dashboard_roles_migration"};
950 "select * from mapd_version_history where migration_history = '" +
951 migration_name +
"'");
957 LOG(
INFO) <<
"Performing dashboard internal roles Migration.";
971 }
catch (
const std::exception& e) {
978 const auto active_grantees =
983 for (
auto dash : dashboards) {
988 auto result = active_grantees.find(dash.first);
989 if (
result != active_grantees.end()) {
999 "select * from mapd_version_history where migration_history = '" +
1000 migration_name +
"'");
1005 "INSERT INTO mapd_version_history(version, migration_history) values(?,?)",
1007 }
catch (
const std::exception& e) {
1008 LOG(
ERROR) <<
"Failed to create dashboard system roles during migration: "
1012 LOG(
INFO) <<
"Successfully created dashboard system roles during migration.";
1047 std::map<int32_t, std::string> user_name_by_user_id;
1048 for (
const auto& user : users) {
1049 user_name_by_user_id[user.userId] = user.userName;
1051 return user_name_by_user_id;
1056 const std::map<int32_t, std::string>& user_name_by_user_id) {
1057 auto entry = user_name_by_user_id.find(
id);
1058 if (entry != user_name_by_user_id.end()) {
1059 return entry->second;
1068 if (column_type.is_dict_encoded_string() ||
1069 column_type.is_subtype_dict_encoded_string()) {
1076 std::string dictQuery(
1077 "SELECT dictid, name, nbits, is_shared, refcount from mapd_dictionaries");
1080 for (
size_t r = 0; r < numRows; ++r) {
1090 dict_ref, dictName, dictNBits, is_shared, refcount, fname,
false);
1111 std::list<ColumnDescriptor*> original_cds;
1114 original_td = it1->second;
1115 if (dynamic_cast<foreign_storage::ForeignTable*>(original_td)) {
1126 CHECK_EQ(original_td, it2->second);
1133 for (
int column_id = 0; column_id < original_td->
nColumns; ++column_id) {
1137 original_cds.push_back(original_cd);
1145 td = createTableFromDiskUnlocked(table_id);
1146 }
catch (
const NoTableFoundException& e) {
1151 if (
auto tableDescIt = tableDescriptorMapById_.find(table_id);
1152 tableDescIt != tableDescriptorMapById_.end()) {
1153 tableDescIt->second->fragmenter =
nullptr;
1154 delete tableDescIt->second;
1158 auto cds = sqliteGetColumnsForTableUnlocked(table_id);
1162 td->mutex_ = original_td->mutex_;
1164 original_td =
nullptr;
1169 original_cds.clear();
1170 tableDescriptorMap_[
to_upper(td->tableName)] = td;
1171 tableDescriptorMapById_[td->tableId] = td;
1172 int32_t skip_physical_cols = 0;
1176 if (skip_physical_cols <= 0) {
1177 skip_physical_cols = cd->columnType.get_physical_cols();
1180 if (cd->isDeletedCol) {
1181 td->hasDeletedCol =
true;
1182 setDeletedColumnUnlocked(td, cd);
1183 }
else if (cd->columnType.is_geometry() || skip_physical_cols-- <= 0) {
1184 td->columnIdBySpi_.push_back(cd->columnId);
1189 calciteMgr_->updateMetadata(currentDB_.dbName, td->tableName);
1193 void Catalog::reloadCatalogMetadata(
1194 const std::map<int32_t, std::string>& user_name_by_user_id) {
// Refreshes the in-memory catalog maps from the SQLite catalog tables.
// `user_name_by_user_id` is forwarded unchanged to buildDashboardsMapUnlocked().
// NOTE(review): the "Unlocked" suffix suggests the caller is expected to hold the
// relevant catalog locks already - confirm against the call sites.
1201 void Catalog::reloadCatalogMetadataUnlocked(
1202 const std::map<int32_t, std::string>& user_name_by_user_id) {
// Step 1: collect every table id currently stored in mapd_tables.
1212 std::set<int> cluster_table_ids;
1213 std::string tableQuery(
"SELECT tableid from mapd_tables");
1214 sqliteConnector_.query(tableQuery);
1215 auto numRows = sqliteConnector_.getNumRows();
1216 for (
size_t r = 0; r < numRows; ++r) {
1217 const auto table_id = sqliteConnector_.getData<
int>(r, 0);
1218 cluster_table_ids.insert(table_id);
// Table ids that must not be reloaded; ids found in this set are excluded below.
1223 std::set<int> ignored_table_ids;
// Step 2: work out which tables to reload:
//   (a) every id from mapd_tables that is not in ignored_table_ids, and
//   (b) every id still cached in tableDescriptorMapById_ that is no longer
//       present in mapd_tables.
1236 std::set<int> reload_table_ids;
1237 for (
auto const& cluster_table_id : cluster_table_ids) {
1238 if (ignored_table_ids.find(cluster_table_id) == ignored_table_ids.end()) {
1239 reload_table_ids.insert(cluster_table_id);
1242 for (
auto const& [cached_table_id, td] : tableDescriptorMapById_) {
1243 if (cluster_table_ids.find(cached_table_id) == cluster_table_ids.end()) {
1244 reload_table_ids.insert(cached_table_id);
// Step 3: reload each selected table individually.
1249 for (
auto const& reload_table_id : reload_table_ids) {
1250 reloadTableMetadataUnlocked(reload_table_id);
// Step 4: drop the cached dashboard, link, foreign-server and custom-expression
// maps so that they can be rebuilt from scratch below.
1255 dashboardDescriptorMap_.clear();
1256 linkDescriptorMap_.clear();
1257 linkDescriptorMapById_.clear();
1258 foreignServerMap_.clear();
1259 foreignServerMapById_.clear();
1260 custom_expr_map_by_id_.clear();
// Step 5: repopulate the maps cleared above and refresh the view SQL.
1263 buildForeignServerMapUnlocked();
1266 updateViewsInMapUnlocked();
1267 buildDashboardsMapUnlocked(user_name_by_user_id);
1268 buildLinksMapUnlocked();
1269 buildCustomExpressionsMapUnlocked();
// Notify Calcite for the current database, passing an empty second argument.
// NOTE(review): presumably an empty table name means "whole database" - confirm
// against Calcite::updateMetadata.
1273 calciteMgr_->updateMetadata(currentDB_.dbName, {});
// Reads every row of mapd_tables and registers a table descriptor for it in
// tableDescriptorMapById_.
//
// Result-set column indexes, in the order of the SELECT list below:
//    0 tableid         1 name              2 ncolumns        3 isview
//    4 fragments       5 frag_type         6 max_frag_rows   7 max_chunk_size
//    8 frag_page_size  9 max_rows         10 partitions     11 shard_column_id
//   12 shard          13 num_shards       14 key_metainfo   15 userid
//   16 sort_column_id 17 storage_type     18 max_rollback_epochs
//   19 is_system_table
1277 void Catalog::buildTablesMapUnlocked() {
1278 std::string tableQuery(
1279 "SELECT tableid, name, ncolumns, isview, fragments, frag_type, max_frag_rows, "
1280 "max_chunk_size, frag_page_size, "
1281 "max_rows, partitions, shard_column_id, shard, num_shards, key_metainfo, userid, "
1282 "sort_column_id, storage_type, max_rollback_epochs, is_system_table "
1283 "from mapd_tables");
1284 sqliteConnector_.query(tableQuery);
1285 auto numRows = sqliteConnector_.getNumRows();
1286 for (
size_t r = 0; r < numRows; ++r) {
// storage_type is read first so the row can be validated.
1288 const auto& storage_type = sqliteConnector_.getData<
string>(r, 17);
// Table id and name are fetched here only to build the fatal log message.
// NOTE(review): the condition guarding this LOG(FATAL) is not part of this
// excerpt - confirm which storage_type values trigger it.
1290 const auto table_id = sqliteConnector_.getData<
int>(r, 0);
1291 const auto& table_name = sqliteConnector_.getData<
string>(r, 1);
1292 LOG(
FATAL) <<
"Unable to read Catalog metadata: storage type is currently not a "
1293 "supported table option (table "
1294 << table_name <<
" [" << table_id <<
"] in database "
1295 << currentDB_.dbName <<
").";
// Copy the row into the descriptor, field by field (see the index table above).
1305 td->
tableId = sqliteConnector_.getData<
int>(r, 0);
1306 td->
tableName = sqliteConnector_.getData<
string>(r, 1);
1307 td->
nColumns = sqliteConnector_.getData<
int>(r, 2);
1308 td->
isView = sqliteConnector_.getData<
bool>(r, 3);
1309 td->
fragments = sqliteConnector_.getData<
string>(r, 4);
1312 td->
maxFragRows = sqliteConnector_.getData<
int>(r, 6);
// max_chunk_size and max_rows are read as 64-bit values.
1313 td->
maxChunkSize = sqliteConnector_.getData<int64_t>(r, 7);
1314 td->
fragPageSize = sqliteConnector_.getData<
int>(r, 8);
1315 td->
maxRows = sqliteConnector_.getData<int64_t>(r, 9);
1316 td->
partitions = sqliteConnector_.getData<
string>(r, 10);
1318 td->
shard = sqliteConnector_.getData<
int>(r, 12);
1319 td->
nShards = sqliteConnector_.getData<
int>(r, 13);
1320 td->
keyMetainfo = sqliteConnector_.getData<
string>(r, 14);
1321 td->
userId = sqliteConnector_.getData<
int>(r, 15);
// A NULL sort_column_id (column 16) is read as 0.
1323 sqliteConnector_.isNull(r, 16) ? 0 : sqliteConnector_.getData<
int>(r, 16);
// Register the descriptor under its table id.
1332 tableDescriptorMapById_[td->
tableId] = td;
// Reads every row of mapd_columns (ordered by tableid first) and attaches the
// column information to the table descriptors in tableDescriptorMapById_.
//
// Result-set column indexes, in the order of the SELECT list below:
//    0 tableid      1 columnid      2 name          3 coltype     4 colsubtype
//    5 coldim       6 colscale      7 is_notnull    8 compression 9 comp_param
//   10 size        11 chunks       12 is_systemcol 13 is_virtualcol
//   14 virtual_expr 15 is_deletedcol 16 default_value
1336 void Catalog::buildColumnsMapUnlocked() {
1337 std::string columnQuery(
1338 "SELECT tableid, columnid, name, coltype, colsubtype, coldim, colscale, "
1339 "is_notnull, compression, comp_param, "
1340 "size, chunks, is_systemcol, is_virtualcol, virtual_expr, is_deletedcol, "
1341 "default_value from "
1342 "mapd_columns ORDER BY tableid, "
1344 sqliteConnector_.query(columnQuery);
1345 auto numRows = sqliteConnector_.getNumRows();
// Counter carried across rows of the loop below.
// NOTE(review): presumably counts the physical columns that follow a geometry
// column and must be skipped - confirm against the full loop body.
1346 int32_t skip_physical_cols = 0;
1347 for (
size_t r = 0; r < numRows; ++r) {
// Copy the row into the column descriptor (see the index table above).
1349 cd->
tableId = sqliteConnector_.getData<
int>(r, 0);
1350 cd->
columnId = sqliteConnector_.getData<
int>(r, 1);
1351 cd->
columnName = sqliteConnector_.getData<
string>(r, 2);
1360 cd->
chunks = sqliteConnector_.getData<
string>(r, 11);
1361 cd->
isSystemCol = sqliteConnector_.getData<
bool>(r, 12);
1362 cd->
isVirtualCol = sqliteConnector_.getData<
bool>(r, 13);
1363 cd->
virtualExpr = sqliteConnector_.getData<
string>(r, 14);
1364 cd->
isDeletedCol = sqliteConnector_.getData<
bool>(r, 15);
// default_value (column 16) is nullable, so NULL is tested for before the
// string value is wrapped in an optional.
1365 if (sqliteConnector_.isNull(r, 16)) {
1368 cd->
default_value = std::make_optional(sqliteConnector_.getData<
string>(r, 16));
// Stamp the descriptor with the id of this catalog's database.
1371 cd->
db_id = getDatabaseId();
1375 if (skip_physical_cols <= 0) {
// The owning table must already be present in tableDescriptorMapById_.
1379 auto td_itr = tableDescriptorMapById_.find(cd->
tableId);
1380 CHECK(td_itr != tableDescriptorMapById_.end());
// Flag the owning table as having a deleted column and register that column.
1383 td_itr->second->hasDeletedCol =
true;
1384 setDeletedColumnUnlocked(td_itr->second, cd);
// Record the column id in the owning table's columnIdBySpi_ list.
1386 tableDescriptorMapById_[cd->
tableId]->columnIdBySpi_.push_back(cd->
columnId);
// After all rows are processed, sort each table's columnIdBySpi_ ascending.
1391 for (
auto& tit : tableDescriptorMapById_) {
1392 std::sort(tit.second->columnIdBySpi_.begin(),
1393 tit.second->columnIdBySpi_.end(),
1394 [](
const size_t a,
const size_t b) ->
bool {
return a < b; });
1399 std::string viewQuery(
"SELECT sql FROM mapd_views where tableid = " +
1401 sqliteConnector_.query(viewQuery);
1402 auto num_rows = sqliteConnector_.getNumRows();
1403 CHECK_EQ(num_rows, 1U) <<
"Expected single entry in mapd_views for view '"
1404 << td.
tableName <<
"', instead got " << num_rows;
1405 td.
viewSQL = sqliteConnector_.getData<
string>(0, 0);
// Loads the SQL text of every row in mapd_views into the matching cached table
// descriptor and resets that descriptor's fragmenter to null.
1408 void Catalog::updateViewsInMapUnlocked() {
1409 std::string viewQuery(
"SELECT tableid, sql FROM mapd_views");
1410 sqliteConnector_.query(viewQuery);
1411 auto numRows = sqliteConnector_.getNumRows();
1412 for (
size_t r = 0; r < numRows; ++r) {
1413 auto tableId = sqliteConnector_.getData<
int>(r, 0);
// NOTE(review): operator[] is used for the lookup and the result is
// dereferenced without a check. If tableDescriptorMapById_ is a std::map-like
// container, a mapd_views row whose tableid is not cached would insert a null
// entry and crash on the next line - consider find() plus CHECK.
1414 auto td = tableDescriptorMapById_[tableId];
1415 td->viewSQL = sqliteConnector_.getData<
string>(r, 1);
1416 td->fragmenter =
nullptr;
// Reads every row of mapd_dashboards and stores a DashboardDescriptor for it in
// dashboardDescriptorMap_, keyed by "<userId>:<dashboardName>".
//
// Result-set columns as consumed below: 0 id, 1 state, 2 name, 3 image_hash,
// 4 update_time formatted as an ISO-8601 style string by strftime, 5 read as the
// user id, 6 read as the dashboard metadata.
1420 void Catalog::buildDashboardsMapUnlocked(
1421 const std::map<int32_t, std::string>& user_name_by_user_id) {
1422 std::string frontendViewQuery(
1423 "SELECT id, state, name, image_hash, strftime('%Y-%m-%dT%H:%M:%SZ', update_time), "
1426 "FROM mapd_dashboards");
1427 sqliteConnector_.query(frontendViewQuery);
1428 auto numRows = sqliteConnector_.getNumRows();
1429 for (
size_t r = 0; r < numRows; ++r) {
// One shared descriptor per dashboard row.
1430 auto vd = std::make_shared<DashboardDescriptor>();
1431 vd->dashboardId = sqliteConnector_.getData<
int>(r, 0);
1432 vd->dashboardState = sqliteConnector_.getData<
string>(r, 1);
1433 vd->dashboardName = sqliteConnector_.getData<
string>(r, 2);
1434 vd->imageHash = sqliteConnector_.getData<
string>(r, 3);
1435 vd->updateTime = sqliteConnector_.getData<
string>(r, 4);
1436 vd->userId = sqliteConnector_.getData<
int>(r, 5);
1437 vd->dashboardMetadata = sqliteConnector_.getData<
string>(r, 6);
// The database id and the dashboard id (column 0 re-read as a string) are
// passed as string arguments here.
1440 std::to_string(currentDB_.dbId), sqliteConnector_.getData<
string>(r, 0));
// Map key is "<userId>:<dashboardName>".
1441 dashboardDescriptorMap_[
std::to_string(vd->userId) +
":" + vd->dashboardName] = vd;
// Reads link rows and registers each link descriptor in two maps:
// linkDescriptorMap_ (keyed by database id concatenated with the link string)
// and linkDescriptorMapById_ (keyed by link id).
//
// Result-set columns: 0 linkid, 1 userid, 2 link, 3 view_state,
// 4 update_time formatted by strftime, 5 view_metadata.
1445 void Catalog::buildLinksMapUnlocked() {
1446 std::string linkQuery(
1447 "SELECT linkid, userid, link, view_state, strftime('%Y-%m-%dT%H:%M:%SZ', "
1448 "update_time), view_metadata "
1450 sqliteConnector_.query(linkQuery);
1451 auto numRows = sqliteConnector_.getNumRows();
1452 for (
size_t r = 0; r < numRows; ++r) {
// Copy the row into the link descriptor.
1454 ld->linkId = sqliteConnector_.getData<
int>(r, 0);
1455 ld->userId = sqliteConnector_.getData<
int>(r, 1);
1456 ld->link = sqliteConnector_.getData<
string>(r, 2);
1457 ld->viewState = sqliteConnector_.getData<
string>(r, 3);
1458 ld->updateTime = sqliteConnector_.getData<
string>(r, 4);
1459 ld->viewMetadata = sqliteConnector_.getData<
string>(r, 5);
// The string key has no separator between the database id and the link text.
1460 linkDescriptorMap_[
std::to_string(currentDB_.dbId) + ld->link] = ld;
1461 linkDescriptorMapById_[ld->linkId] = ld;
// Rebuilds logicalToPhysicalTableMapById_ from mapd_logical_to_physical: each
// logical table id maps to the list of physical table ids recorded for it, in
// the order the rows are returned.
1465 void Catalog::buildLogicalToPhysicalMapUnlocked() {
1467 std::string logicalToPhysicalTableMapQuery(
1468 "SELECT logical_table_id, physical_table_id "
1469 "FROM mapd_logical_to_physical");
1470 sqliteConnector_.query(logicalToPhysicalTableMapQuery);
1471 auto numRows = sqliteConnector_.getNumRows();
1472 for (
size_t r = 0; r < numRows; ++r) {
1473 auto logical_tb_id = sqliteConnector_.getData<
int>(r, 0);
1474 auto physical_tb_id = sqliteConnector_.getData<
int>(r, 1);
1475 const auto physicalTableIt = logicalToPhysicalTableMapById_.find(logical_tb_id);
// First row seen for this logical table: start a new one-element list and
// assert that the insertion actually took place.
1476 if (physicalTableIt == logicalToPhysicalTableMapById_.end()) {
1478 std::vector<int32_t> physicalTables{physical_tb_id};
1480 logicalToPhysicalTableMapById_.emplace(logical_tb_id, physicalTables);
1481 CHECK(it_ok.second);
// Logical table already known: append this physical table id to its list.
1484 physicalTableIt->second.push_back(physical_tb_id);
// Populates the in-memory catalog maps by running the individual builders in a
// fixed order. Tables are built before columns; buildColumnsMapUnlocked()
// CHECKs that each column's table is already in tableDescriptorMapById_.
1492 void Catalog::buildMaps() {
1501 buildDictionaryMapUnlocked();
1502 buildTablesMapUnlocked();
// Foreign servers are built before the foreign tables in the map are updated.
1505 buildForeignServerMapUnlocked();
1506 updateForeignTablesInMapUnlocked();
1509 buildColumnsMapUnlocked();
1510 updateViewsInMapUnlocked();
1511 buildDashboardsMapUnlocked(user_name_by_user_id);
1512 buildLinksMapUnlocked();
1513 buildLogicalToPhysicalMapUnlocked();
1514 buildCustomExpressionsMapUnlocked();
// Reads every row of omnisci_custom_expressions and stores the resulting
// CustomExpression objects in custom_expr_map_by_id_, keyed by expression id.
1517 void Catalog::buildCustomExpressionsMapUnlocked() {
1518 sqliteConnector_.query(
1519 "SELECT id, name, expression_json, data_source_type, data_source_id, "
1521 "FROM omnisci_custom_expressions");
1522 auto num_rows = sqliteConnector_.getNumRows();
1523 for (
size_t row = 0; row < num_rows; row++) {
// Each row of the current result set is converted by the helper; ownership of
// the resulting object is moved into the map.
1524 auto custom_expr = getCustomExpressionFromConnector(row);
1525 custom_expr_map_by_id_[custom_expr->id] = std::move(custom_expr);
// Builds a CustomExpression from row `row` of the result set currently held by
// sqliteConnector_. Columns are read by position: 0 id, 1 name,
// 2 expression JSON, 3 data source type (as a string), 4 data source id,
// 5 is-deleted flag. The caller must have run a query with that column layout
// immediately beforehand.
1529 std::unique_ptr<CustomExpression> Catalog::getCustomExpressionFromConnector(
size_t row) {
1530 auto id = sqliteConnector_.getData<
int>(row, 0);
1531 auto name = sqliteConnector_.getData<
string>(row, 1);
1532 auto expression_json = sqliteConnector_.getData<
string>(row, 2);
1533 auto data_source_type_str = sqliteConnector_.getData<
string>(row, 3);
1534 auto data_source_id = sqliteConnector_.getData<
int>(row, 4);
1535 auto is_deleted = sqliteConnector_.getData<
bool>(row, 5);
// The textual data source type is converted through
// CustomExpression::dataSourceTypeFromString() before construction.
1536 return std::make_unique<CustomExpression>(
1540 CustomExpression::dataSourceTypeFromString(data_source_type_str),
1546 const list<ColumnDescriptor>& columns,
1547 const list<DictDescriptor>& dicts) {
1552 if (foreign_table) {
1554 *new_foreign_table = *foreign_table;
1555 new_td = new_foreign_table;
1561 new_td->
mutex_ = std::make_shared<std::mutex>();
1563 tableDescriptorMapById_[td->
tableId] = new_td;
1564 for (
auto cd : columns) {
1567 addToColumnMap(new_cd);
1570 if (cd.isDeletedCol) {
1572 setDeletedColumnUnlocked(new_td, new_cd);
1578 [](
const size_t a,
const size_t b) ->
bool {
return a < b; });
1582 std::unique_ptr<StringDictionaryClient> client;
1583 DictRef dict_ref(currentDB_.dbId, -1);
1584 if (!string_dict_hosts_.empty()) {
1587 for (
auto dd : dicts) {
1588 if (!dd.dictRef.dictId) {
1592 dict_ref.
dictId = dd.dictRef.dictId;
1594 client->create(dict_ref, dd.dictIsTemp);
1597 dictDescriptorMapByRef_[dict_ref].reset(new_dd);
1598 if (!dd.dictIsTemp) {
1604 void Catalog::removeTableFromMap(
const string& tableName,
1606 const bool is_on_error) {
1608 TableDescriptorMapById::iterator tableDescIt = tableDescriptorMapById_.find(tableId);
1609 if (tableDescIt == tableDescriptorMapById_.end()) {
1616 const auto ret = deletedColumnPerTable_.erase(td);
1620 tableDescriptorMapById_.erase(tableDescIt);
1621 tableDescriptorMap_.erase(
to_upper(tableName));
1623 dict_columns_by_table_id_.erase(tableId);
1628 std::unique_ptr<StringDictionaryClient> client;
1630 CHECK(!string_dict_hosts_.empty());
1631 DictRef dict_ref(currentDB_.dbId, -1);
1637 for (
auto cit = columnDescriptorMapById_.begin();
1638 cit != columnDescriptorMapById_.end();) {
1639 if (tableId != std::get<0>(cit->first)) {
1642 int i = std::get<1>(cit++->first);
1644 ColumnDescriptorMapById::iterator colDescIt = columnDescriptorMapById_.find(cidKey);
1646 columnDescriptorMapById_.erase(colDescIt);
1648 columnDescriptorMap_.erase(cnameKey);
1654 DictRef dict_ref(currentDB_.dbId, dictId);
1655 const auto dictIt = dictDescriptorMapByRef_.find(dict_ref);
1660 CHECK(dictIt != dictDescriptorMapByRef_.end());
1662 if (dictIt == dictDescriptorMapByRef_.end()) {
1666 const auto& dd = dictIt->second;
1669 if (!dd->refcount) {
1670 dd->stringDict.reset();
1675 client->drop(dict_ref);
1677 dictDescriptorMapByRef_.erase(dictIt);
1688 addFrontendViewToMapNoLock(vd);
1694 std::make_shared<DashboardDescriptor>(vd);
1697 std::vector<DBObject> Catalog::parseDashboardObjects(
const std::string& view_meta,
1698 const int& user_id) {
1699 std::vector<DBObject> objects;
1701 key.
dbId = currentDB_.dbId;
1702 auto _key_place = [&key](
auto type,
auto id) {
1708 auto td = getMetadataForTable(object_name,
false);
1712 LOG(
INFO) <<
"Ignoring dashboard source Table/View: " << object_name
1713 <<
" no longer exists in current DB.";
1720 objects.emplace_back(_key_place(object_type, td->tableId), priv, user_id);
1722 objects.back().setName(td->tableName);
1727 void Catalog::createOrUpdateDashboardSystemRole(
const std::string& view_meta,
1728 const int32_t& user_id,
1729 const std::string& dash_role_name) {
1730 auto objects = parseDashboardObjects(view_meta, user_id);
1737 dash_role_name,
false,
false);
1742 std::set<DBObjectKey> revoke_keys;
1744 for (
auto key : *ex_objects | boost::adaptors::map_keys) {
1750 for (
auto obj : objects) {
1751 found = key == obj.getObjectKey() ?
true :
false;
1757 revoke_keys.insert(key);
1760 for (
auto& key : revoke_keys) {
1778 linkDescriptorMapById_[ld.
linkId] = new_ld;
1786 vector<Chunk> chunkVec;
1787 auto columnDescs = getAllColumnMetadataForTable(td->
tableId,
true,
false,
true);
1788 Chunk::translateColumnDescriptorsToChunkVec(columnDescs, chunkVec);
1791 td->
fragmenter = std::make_shared<SortedOrderFragmenter>(chunkKeyPrefix,
1803 td->
fragmenter = std::make_shared<InsertOrderFragmenter>(chunkKeyPrefix,
1817 LOG(
INFO) <<
"Instantiating Fragmenter for table " << td->
tableName <<
" took "
1822 const std::string& tableName)
const {
1823 auto tableDescIt = tableDescriptorMap_.find(
to_upper(tableName));
1824 if (tableDescIt == tableDescriptorMap_.end()) {
1831 const std::string& tableName)
const {
1833 return getForeignTableUnlocked(tableName);
1836 const TableDescriptor* Catalog::getMetadataForTable(
const string& tableName,
1837 const bool populateFragmenter)
const {
1841 auto td = getMutableMetadataForTableUnlocked(tableName);
1846 if (populateFragmenter) {
1847 std::unique_lock<std::mutex> td_lock(*td->mutex_.get());
1848 if (td->fragmenter ==
nullptr && !td->isView) {
1849 instantiateFragmenter(td);
1856 bool populateFragmenter)
const {
1858 auto td = getMutableMetadataForTableUnlocked(table_id);
1863 if (populateFragmenter) {
1864 std::unique_lock<std::mutex> td_lock(*td->mutex_.get());
1865 if (td->fragmenter ==
nullptr && !td->isView) {
1866 instantiateFragmenter(td);
1872 std::optional<std::string> Catalog::getTableName(int32_t table_id)
const {
1874 auto td = getMutableMetadataForTableUnlocked(table_id);
1878 return td->tableName;
1881 std::optional<int32_t> Catalog::getTableId(
const std::string& table_name)
const {
1883 auto td = getMutableMetadataForTableUnlocked(table_name);
1891 const std::string& table_name)
const {
1892 auto it = tableDescriptorMap_.find(
to_upper(table_name));
1893 if (it == tableDescriptorMap_.end()) {
1900 auto tableDescIt = tableDescriptorMapById_.find(table_id);
1901 if (tableDescIt == tableDescriptorMapById_.end()) {
1904 return tableDescIt->second;
1908 const bool load_dict)
const {
1910 const DictRef dictRef(currentDB_.dbId, dict_id);
1911 auto dictDescIt = dictDescriptorMapByRef_.find(dictRef);
1913 dictDescriptorMapByRef_.end()) {
1916 auto& dd = dictDescIt->second;
1920 if (!dd->stringDict) {
1922 if (string_dict_hosts_.empty()) {
1923 if (dd->dictIsTemp) {
1924 dd->stringDict = std::make_shared<StringDictionary>(
1927 dd->stringDict = std::make_shared<StringDictionary>(
1932 std::make_shared<StringDictionary>(string_dict_hosts_.front(), dd->dictRef);
1935 LOG(
INFO) <<
"Time to load Dictionary " << dd->dictRef.dbId <<
"_"
1936 << dd->dictRef.dictId <<
" was " << time_ms <<
"ms";
// Returns a const reference to the string_dict_hosts_ member. No copy is made,
// so the reference is only valid while this Catalog object is alive.
1943 const std::vector<LeafHostInfo>& Catalog::getStringDictionaryHosts()
const {
1944 return string_dict_hosts_;
1948 const string& columnName)
const {
1952 auto colDescIt = columnDescriptorMap_.find(columnKey);
1954 columnDescriptorMap_.end()) {
1957 return colDescIt->second;
1963 auto colDescIt = columnDescriptorMapById_.find(columnIdKey);
1964 if (colDescIt == columnDescriptorMapById_
1968 return colDescIt->second;
1971 const std::optional<std::string> Catalog::getColumnName(
int table_id,
1972 int column_id)
const {
1974 auto it = columnDescriptorMapById_.find(
ColumnIdKey{table_id, column_id});
1975 if (it == columnDescriptorMapById_.end()) {
1978 return it->second->columnName;
// Translates `spi` into a column id for table `table_id`, using that table's
// columnIdBySpi_ list. The table must be present in tableDescriptorMapById_;
// this is enforced with CHECK.
// NOTE(review): the derivation of `spx` and `phi` from `spi` is not visible in
// this excerpt - confirm how they are computed before relying on this summary.
1981 const int Catalog::getColumnIdBySpiUnlocked(
const int table_id,
const size_t spi)
const {
1982 const auto tabDescIt = tableDescriptorMapById_.find(table_id);
1983 CHECK(tableDescriptorMapById_.end() != tabDescIt);
1984 const auto& columnIdBySpi = tabDescIt->second->columnIdBySpi_;
// spx is used as a 1-based index into columnIdBySpi, hence the range check and
// the "- 1" below; phi is added to the column id that is looked up.
1994 CHECK(0 < spx && spx <= columnIdBySpi.size())
1995 <<
"spx = " << spx <<
", size = " << columnIdBySpi.size();
1996 return columnIdBySpi[spx - 1] + phi;
// Public entry point that forwards to getColumnIdBySpiUnlocked() with the same
// arguments and returns its result.
// NOTE(review): presumably a catalog read lock is taken before the call -
// confirm, since no locking statement is visible here.
1999 const int Catalog::getColumnIdBySpi(
const int table_id,
const size_t spi)
const {
2001 return getColumnIdBySpiUnlocked(table_id, spi);
2005 const size_t spi)
const {
2008 const auto columnId = getColumnIdBySpiUnlocked(tableId, spi);
2010 const auto colDescIt = columnDescriptorMapById_.find(columnIdKey);
2011 return columnDescriptorMapById_.end() == colDescIt ?
nullptr : colDescIt->second;
// Deletes the dashboards identified by `dashboard_ids`.
//
// Phase 1 validates every id and collects the failures into two comma-separated
// lists (ids that do not exist, and ids the user may not delete). If either
// list is non-empty a std::runtime_error with a combined message is thrown.
// Phase 2 removes each dashboard from dashboardDescriptorMap_ and from the
// mapd_dashboards table inside a SQLite transaction, rolling back when an
// exception is caught.
//
// NOTE(review): `dashboard_ids` is taken by const value, so the vector is
// copied on every call; a const reference would avoid that, but it needs a
// matching change to the declaration.
2014 void Catalog::deleteMetadataForDashboards(
const std::vector<int32_t> dashboard_ids,
2016 std::stringstream invalid_ids, restricted_ids;
// Phase 1: validation pass over all requested ids.
2018 for (int32_t dashboard_id : dashboard_ids) {
2019 if (!getMetadataForDashboard(dashboard_id)) {
// A ", " separator is emitted before every id except the first one.
2020 invalid_ids << (!invalid_ids.str().empty() ?
", " :
"") << dashboard_id;
2024 object.loadKey(*
this);
2026 std::vector<DBObject> privs = {
object};
2028 restricted_ids << (!restricted_ids.str().empty() ?
", " :
"") << dashboard_id;
// Report all validation failures at once.
2032 if (invalid_ids.str().size() > 0 || restricted_ids.str().size() > 0) {
2033 std::stringstream error_message;
2034 error_message <<
"Delete dashboard(s) failed with error(s):";
2035 if (invalid_ids.str().size() > 0) {
2036 error_message <<
"\nDashboard id: " << invalid_ids.str()
2037 <<
" - Dashboard id does not exist";
2039 if (restricted_ids.str().size() > 0) {
2041 <<
"\nDashboard id: " << restricted_ids.str()
2042 <<
" - User should be either owner of dashboard or super user to delete it";
2044 throw std::runtime_error(error_message.str());
2046 std::vector<DBObject> dash_objs;
2048 for (int32_t dashboard_id : dashboard_ids) {
// Phase 2: delete inside a SQLite transaction.
2057 sqliteConnector_.query(
"BEGIN TRANSACTION");
2059 for (int32_t dashboard_id : dashboard_ids) {
// The dashboard is looked up again here and a missing one is reported a
// second time.
2060 auto dash = getMetadataForDashboard(dashboard_id);
2064 throw std::runtime_error(
2065 std::string(
"Delete dashboard(s) failed with error(s):\nDashboard id: ") +
2066 std::to_string(dashboard_id) +
" - Dashboard id does not exist ");
2069 std::string dash_name = dash->dashboardName;
// NOTE(review): the iterator returned by find() is passed straight to erase();
// if the "<user_id>:<name>" key were absent this would erase end(), which is
// undefined behavior for standard containers - confirm the key always exists.
2070 auto viewDescIt = dashboardDescriptorMap_.find(user_id +
":" + dash_name);
2071 dashboardDescriptorMap_.erase(viewDescIt);
// The row is removed with a parameterized statement (name and user id bound as
// text parameters).
2072 sqliteConnector_.query_with_text_params(
2073 "DELETE FROM mapd_dashboards WHERE name = ? and userid = ?",
2074 std::vector<std::string>{dash_name, user_id});
2076 }
catch (std::exception& e) {
// Undo the partial deletion in SQLite on any failure.
2077 sqliteConnector_.query(
"ROLLBACK TRANSACTION");
// Commit once every dashboard has been removed.
2080 sqliteConnector_.query(
"END TRANSACTION");
2085 const string& userId,
2086 const string& dashName)
const {
// Dashboards are stored under the composite key "<userId>:<dashName>".
2089 auto viewDescIt = dashboardDescriptorMap_.find(userId +
":" + dashName);
2090 if (viewDescIt == dashboardDescriptorMap_.end()) {
// Return a non-owning raw pointer; the map entry keeps ownership.
2093 return viewDescIt->second.get();
// Linear scan over all dashboards for one whose dashboardId equals id,
// remembering its name.
// NOTE(review): `auto descp` copies each map entry on every iteration.
2102 for (
auto descp : dashboardDescriptorMap_) {
2103 auto dash = descp.second.get();
2104 if (dash->dashboardId ==
id) {
2106 name = dash->dashboardName;
// Delegate to the (userId, name) overload. How userId is obtained is not
// visible here.
2113 return getMetadataForDashboard(userId, name);
// Looks up a link descriptor by its link string in linkDescriptorMap_ and
// returns the mapped descriptor pointer.
// NOTE(review): the body of the not-found branch is not visible; presumably
// it returns nullptr - confirm.
2118 const LinkDescriptor* Catalog::getMetadataForLink(
const string& link)
const {
2120 auto linkDescIt = linkDescriptorMap_.find(link);
2121 if (linkDescIt == linkDescriptorMap_.end()) {
2124 return linkDescIt->second;
// Look up a link descriptor by numeric id in linkDescriptorMapById_.
2129 auto linkDescIt = linkDescriptorMapById_.find(linkId);
2130 if (linkDescIt == linkDescriptorMapById_.end()) {
2133 return linkDescIt->second;
// Fetch the mutable table descriptor for table_id. The conversion from
// `table` to `foreign_table` happens on lines not visible here.
2138 const auto table = getMutableMetadataForTableUnlocked(table_id);
// A null foreign_table is treated as a programming error (CHECK aborts).
2141 CHECK(foreign_table);
2142 return foreign_table;
// Appends to columnDescriptors the descriptors of the columns of table td,
// filtered by the three fetch* flags:
//   fetchSystemColumns   - when false, columns with isSystemCol are left out
//   fetchVirtualColumns  - when false, columns with isVirtualCol are left out
//   fetchPhysicalColumns - when false, after a column is considered the next
//                          get_physical_cols() map entries are skipped
// NOTE(review): the skip counter assumes a column's physical columns follow it
// immediately in columnDescriptorMapById_ iteration order - confirm.
// NOTE(review): every entry of columnDescriptorMapById_ is visited, not only
// those of td.
2145 void Catalog::getAllColumnMetadataForTableImpl(
2147 list<const ColumnDescriptor*>& columnDescriptors,
2148 const bool fetchSystemColumns,
2149 const bool fetchVirtualColumns,
2150 const bool fetchPhysicalColumns)
const {
2151 int32_t skip_physical_cols = 0;
2152 for (
const auto& columnDescriptor : columnDescriptorMapById_) {
// Consume one pending "physical column" skip, if any.
2153 if (!fetchPhysicalColumns && skip_physical_cols > 0) {
2154 --skip_physical_cols;
2157 auto cd = columnDescriptor.second;
2158 if (cd->tableId != td->
tableId) {
2161 if (!fetchSystemColumns && cd->isSystemCol) {
2164 if (!fetchVirtualColumns && cd->isVirtualCol) {
// Record how many following entries belong to this column as physical columns.
2167 if (!fetchPhysicalColumns) {
2168 const auto& col_ti = cd->columnType;
2169 skip_physical_cols = col_ti.get_physical_cols();
2171 columnDescriptors.push_back(cd);
// Returns the column descriptors of table tableId as a new list, honoring the
// three fetch* filter flags. The table descriptor is resolved via
// getMutableMetadataForTableUnlocked() and the filtering is done by
// getAllColumnMetadataForTableImpl().
// NOTE(review): td is passed on without a visible null check - confirm that
// an unknown tableId is handled on the lines not shown.
2175 std::list<const ColumnDescriptor*> Catalog::getAllColumnMetadataForTable(
2177 const bool fetchSystemColumns,
2178 const bool fetchVirtualColumns,
2179 const bool fetchPhysicalColumns)
const {
2181 std::list<const ColumnDescriptor*> columnDescriptors;
2182 const TableDescriptor* td = getMutableMetadataForTableUnlocked(tableId);
2183 getAllColumnMetadataForTableImpl(td,
2186 fetchVirtualColumns,
2187 fetchPhysicalColumns);
2188 return columnDescriptors;
// Collects pointers to every table descriptor in tableDescriptorMapById_.
// The returned pointers refer to catalog-held descriptors, not copies.
// NOTE(review): `auto p` copies each map entry; `const auto&` would avoid it.
2191 list<const TableDescriptor*> Catalog::getAllTableMetadata()
const {
2193 list<const TableDescriptor*> table_list;
2194 for (
auto p : tableDescriptorMapById_) {
2195 table_list.push_back(p.second);
// Returns by-value copies of every table descriptor in
// tableDescriptorMapById_. Each copy has its fragmenter member reset to
// nullptr, so the copies do not reference the catalog's fragmenters.
2200 std::vector<TableDescriptor> Catalog::getAllTableMetadataCopy()
const {
2202 std::vector<TableDescriptor>
tables;
// Reserve up front so the vector allocates only once.
2203 tables.reserve(tableDescriptorMapById_.size());
2204 for (
auto table_entry : tableDescriptorMapById_) {
2205 tables.emplace_back(*table_entry.second);
2206 tables.back().fragmenter =
nullptr;
// Collects non-owning pointers to every dashboard descriptor in
// dashboardDescriptorMap_. Ownership stays with the map entries.
// NOTE(review): `auto dashboard_entry` copies each map entry per iteration.
2211 list<const DashboardDescriptor*> Catalog::getAllDashboardsMetadata()
const {
2213 list<const DashboardDescriptor*> dashboards;
2214 for (
auto dashboard_entry : dashboardDescriptorMap_) {
2215 dashboards.push_back(dashboard_entry.second.get());
// Builds by-value dashboard descriptors for every entry in
// dashboardDescriptorMap_, copying only the fields assigned below (id, name,
// user id, update time, metadata). Other fields of each new descriptor keep
// their default-constructed values as far as this view shows.
2220 std::vector<DashboardDescriptor> Catalog::getAllDashboardsMetadataForSysTable()
const {
2222 std::vector<DashboardDescriptor> dashboards;
2223 dashboards.reserve(dashboardDescriptorMap_.size());
2224 for (
auto dashboard_entry : dashboardDescriptorMap_) {
2225 const auto& cat_dashboard = dashboard_entry.second;
// Append a default-constructed descriptor, then fill it in place.
2226 dashboards.emplace_back();
2227 auto& dashboard = dashboards.back();
2228 dashboard.dashboardId = cat_dashboard->dashboardId;
2229 dashboard.dashboardName = cat_dashboard->dashboardName;
2230 dashboard.userId = cat_dashboard->userId;
2231 dashboard.updateTime = cat_dashboard->updateTime;
2232 dashboard.dashboardMetadata = cat_dashboard->dashboardMetadata;
// Wrap addDictionaryNontransactional() in a SQLite transaction: begin, run
// the add, roll back if a std::exception is caught, otherwise end the
// transaction.
// NOTE(review): whether the exception is rethrown after the rollback is not
// visible here.
2241 sqliteConnector_.query(
"BEGIN TRANSACTION");
2243 ref = addDictionaryNontransactional(cd);
2244 }
catch (std::exception& e) {
2245 sqliteConnector_.query(
"ROLLBACK TRANSACTION");
2248 sqliteConnector_.query(
"END TRANSACTION");
// Create the dictionary descriptor for column cd via setColumnDictionary()
// and take the last element of dds as the new dictionary.
// NOTE(review): operator[] on tableDescriptorMapById_ inserts a null entry if
// cd.tableId is unknown, which is then dereferenced.
2254 const auto& td = *tableDescriptorMapById_[cd.
tableId];
2255 list<DictDescriptor> dds;
2256 setColumnDictionary(cd, dds, td,
true);
2257 auto& dd = dds.back();
// A zero dictId is treated as a programming error.
2258 CHECK(dd.dictRef.dictId);
// When string dictionary hosts are configured, a client for the first host is
// used to create the dictionary there. The client construction is only partly
// visible here.
2260 std::unique_ptr<StringDictionaryClient> client;
2261 if (!string_dict_hosts_.empty()) {
2263 string_dict_hosts_.front(),
DictRef(currentDB_.dbId, -1),
true));
2266 client->create(dd.dictRef, dd.dictIsTemp);
// Register the new descriptor in the in-memory map. new_dd is created on a
// line not visible here.
2270 dictDescriptorMapByRef_[dd.dictRef].reset(new_dd);
2271 if (!dd.dictIsTemp) {
// Wrap delDictionaryNontransactional() in a SQLite transaction: begin, run
// the delete, roll back if a std::exception is caught, otherwise end the
// transaction.
2280 sqliteConnector_.query(
"BEGIN TRANSACTION");
2282 delDictionaryNontransactional(cd);
2283 }
catch (std::exception& e) {
2284 sqliteConnector_.query(
"ROLLBACK TRANSACTION");
2287 sqliteConnector_.query(
"END TRANSACTION");
// Drop one reference to the dictionary used by column cd. dictId is
// determined on lines not visible here.
2302 const auto td = getMetadataForTable(cd.
tableId,
false);
// Decrement the persisted reference count, then read it back.
2304 sqliteConnector_.query_with_text_param(
2305 "UPDATE mapd_dictionaries SET refcount = refcount - 1 WHERE dictid = ?",
2307 sqliteConnector_.query_with_text_param(
2308 "SELECT refcount FROM mapd_dictionaries WHERE dictid = ?",
std::to_string(dictId));
2309 const auto refcount = sqliteConnector_.getData<
int>(0, 0);
2310 VLOG(3) <<
"Dictionary " << dictId <<
"from dropped table has reference count "
// NOTE(review): the condition guarding the deletion below is not visible;
// presumably it runs only once refcount reaches zero - confirm.
2315 const DictRef dictRef(currentDB_.dbId, dictId);
2316 sqliteConnector_.query_with_text_param(
"DELETE FROM mapd_dictionaries WHERE dictid = ?",
// When string dictionary hosts are configured, the dictionary is also dropped
// through a client. The client construction is not visible here.
2322 std::unique_ptr<StringDictionaryClient> client;
2323 if (!string_dict_hosts_.empty()) {
2327 client->drop(dictRef);
// Finally remove the descriptor from the in-memory map.
2330 dictDescriptorMapByRef_.erase(dictRef);
// Returns non-owning pointers to every dictionary descriptor whose dictName
// contains the substring "<table name>_<column name>_dict", where the table
// name is resolved from cd->tableId. Aborts via CHECK if the table name cannot
// be resolved.
2333 std::list<const DictDescriptor*> Catalog::getAllDictionariesWithColumnInName(
2336 std::list<const DictDescriptor*> dds;
2338 auto table_name_opt = getTableName(cd->
tableId);
2339 CHECK(table_name_opt.has_value());
2340 auto table_name = table_name_opt.value();
// Substring match, so the pattern may appear anywhere in dictName.
2342 for (
const auto& [dkey, dd] : dictDescriptorMapByRef_) {
2343 if (dd->dictName.find(table_name +
"_" + cd->
columnName +
"_dict") !=
2344 std::string::npos) {
2345 dds.push_back(dd.get());
2353 std::map<int, StringDictionary*>& stringDicts) {
// cit is the result of a columnDescriptorMap_ lookup done on a line not
// visible here; a missing column is a programming error.
2356 CHECK(cit != columnDescriptorMap_.end());
2357 auto& ccd = *cit->second;
// Only string and string-array columns with a positive comp_param pass the
// two checks below. What happens otherwise is not visible here.
2359 if (!(ccd.columnType.is_string() || ccd.columnType.is_string_array())) {
2365 if (!(ccd.columnType.get_comp_param() > 0)) {
// comp_param is used as the dictionary id. The getMetadataForDict() result is
// discarded; presumably the call is made so the dictionary is loaded before
// the map lookup below - TODO confirm.
2369 auto dictId = ccd.columnType.get_comp_param();
2370 getMetadataForDict(dictId);
2372 const DictRef dictRef(currentDB_.dbId, dictId);
2373 auto dit = dictDescriptorMapByRef_.find(dictRef);
2374 CHECK(dit != dictDescriptorMapByRef_.end());
// Publish a non-owning pointer to the dictionary, keyed by column id.
2376 CHECK(dit->second.get()->stringDict);
2377 stringDicts[ccd.columnId] = dit->second.get()->stringDict.get();
// Sums computeCacheSize() over the string dictionaries in
// dictDescriptorMapByRef_ whose dbId matches the current database.
// NOTE(review): the accumulator `ret` and any null check on `dictionary` are
// on lines not visible here.
2380 size_t Catalog::getTotalMemorySizeForDictionariesForDatabase()
const {
2382 for (
auto const& kv : dictDescriptorMapByRef_) {
2383 if (kv.first.dbId == currentDB_.dbId) {
2384 auto dictionary = kv.second.get()->stringDict.get();
2386 ret += dictionary->computeCacheSize();
// Update the persisted and in-memory metadata of an existing column inside a
// SQLite transaction that is rolled back if a std::exception is caught.
2397 sqliteConnector_.query(
"BEGIN TRANSACTION");
2399 const auto table_id = cd.
tableId;
// The column must already exist in the catalog.
2401 auto catalog_cd = getMetadataForColumn(table_id, cd.
columnId);
2403 CHECK(catalog_cd) <<
" can not alter non existing column";
// All 11 bind parameters are TEXT by default. Index 8 is switched to
// NULL_TYPE under a condition that is not visible here.
2406 std::vector<BindType> types(11, BindType::TEXT);
2408 types[8] = BindType::NULL_TYPE;
2410 sqliteConnector_.query_with_text_params(
2411 "UPDATE mapd_columns SET "
2420 "default_value = ? "
2421 "WHERE tableid = ? and columnid = ?",
// Swap the in-memory descriptor: ocd is the existing one, ncd the replacement
// (ncd is created on lines not visible here).
2438 ColumnDescriptorMap::iterator columnDescIt =
2440 CHECK(columnDescIt != columnDescriptorMap_.end());
2441 auto ocd = columnDescIt->second;
2443 updateInColumnMap(ncd, ocd);
2444 }
catch (std::exception& e) {
2445 sqliteConnector_.query(
"ROLLBACK TRANSACTION");
2448 sqliteConnector_.query(
"END TRANSACTION");
// Ask SQLite for max(columnid) + 1 among the table's columns and return it as
// an int. The bound table id is on a line not visible here.
// NOTE(review): for a table with no rows in mapd_columns, max() yields SQL
// NULL; how getData<int> treats that is not visible here.
2453 sqliteConnector_.query_with_text_params(
2454 "SELECT max(columnid) + 1 FROM mapd_columns WHERE tableid = ?",
2456 return sqliteConnector_.getData<
int>(0, 0);
// Wrap addColumnNontransactional() in a SQLite transaction: begin, add the
// column, roll back if a std::exception is caught, otherwise end the
// transaction.
2462 sqliteConnector_.query(
"BEGIN TRANSACTION");
2464 addColumnNontransactional(td, cd);
2465 }
catch (std::exception& e) {
2466 sqliteConnector_.query(
"ROLLBACK TRANSACTION");
2469 sqliteConnector_.query(
"END TRANSACTION");
// Add column cd to table td without opening a transaction of its own.
// Stamp the column with the current database id first.
2476 cd.
db_id = getDatabaseId();
// Recurse into each physical (shard) table of td. shard_cd is prepared on a
// line not visible here.
2478 for (
const auto shard : getPhysicalTablesDescriptors(&td)) {
2480 addColumnNontransactional(*shard, shard_cd);
// NOTE(review): the condition guarding the dictionary creation below is not
// visible here.
2484 addDictionaryNontransactional(cd);
// All 17 bind parameters are TEXT by default. Index 16 (default_value, the
// last column in the INSERT list) is switched to NULL_TYPE under a condition
// that is not visible here.
2488 std::vector<BindType> types(17, BindType::TEXT);
2490 types[16] = BindType::NULL_TYPE;
// The new columnid is computed inside the INSERT as max(columnid) + 1.
2492 sqliteConnector_.query_with_text_params(
2493 "INSERT INTO mapd_columns (tableid, columnid, name, coltype, colsubtype, coldim, "
2494 "colscale, is_notnull, "
2495 "compression, comp_param, size, chunks, is_systemcol, is_virtualcol, virtual_expr, "
2496 "is_deletedcol, default_value) "
2498 "(SELECT max(columnid) + 1 FROM mapd_columns WHERE tableid = ?), "
2501 "?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
2521 sqliteConnector_.query_with_text_params(
2522 "UPDATE mapd_tables SET ncolumns = ncolumns + 1 WHERE tableid = ?",
// Read back the column id SQLite assigned and store it in cd.
2525 sqliteConnector_.query_with_text_params(
2526 "SELECT columnid FROM mapd_columns WHERE tableid = ? AND name = ?",
2528 cd.
columnId = sqliteConnector_.getData<
int>(0, 0);
// Mirror the change in the in-memory maps and notify Calcite. ncd is created
// on a line not visible here.
2530 ++tableDescriptorMapById_[td.
tableId]->nColumns;
2532 addToColumnMap(ncd);
2533 addColumnDescriptor(ncd);
2534 calciteMgr_->updateMetadata(currentDB_.dbName, td.
tableName);
// Add column cd to table td. Differs from addColumnNontransactional() at the
// end: the new descriptor is recorded in columnDescriptorsForRoll instead of
// calling addColumnDescriptor() and updating Calcite.
2541 cd.
db_id = getDatabaseId();
// Recurse into each physical (shard) table of td. shard_cd is prepared on a
// line not visible here.
2543 for (
const auto shard : getPhysicalTablesDescriptors(&td)) {
2545 addColumn(*shard, shard_cd);
// NOTE(review): the condition guarding the dictionary creation below is not
// visible here.
2549 addDictionaryNontransactional(cd);
// All 17 bind parameters are TEXT by default. Index 16 (default_value) is
// switched to NULL_TYPE under a condition that is not visible here.
2553 std::vector<BindType> types(17, BindType::TEXT);
2555 types[16] = BindType::NULL_TYPE;
2557 sqliteConnector_.query_with_text_params(
2558 "INSERT INTO mapd_columns (tableid, columnid, name, coltype, colsubtype, coldim, "
2559 "colscale, is_notnull, "
2560 "compression, comp_param, size, chunks, is_systemcol, is_virtualcol, virtual_expr, "
2561 "is_deletedcol, default_value) "
2563 "(SELECT max(columnid) + 1 FROM mapd_columns WHERE tableid = ?), "
2566 "?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
2586 sqliteConnector_.query_with_text_params(
2587 "UPDATE mapd_tables SET ncolumns = ncolumns + 1 WHERE tableid = ?",
// Read back the column id SQLite assigned and store it in cd.
2590 sqliteConnector_.query_with_text_params(
2591 "SELECT columnid FROM mapd_columns WHERE tableid = ? AND name = ?",
2593 cd.
columnId = sqliteConnector_.getData<
int>(0, 0);
2595 ++tableDescriptorMapById_[td.
tableId]->nColumns;
2597 addToColumnMap(ncd);
// Record (old = nullptr, new = ncd) so rollLegacy() can process the addition.
2598 columnDescriptorsForRoll.emplace_back(
nullptr, ncd);
// Drop the column's policies before the transaction is opened, then run
// dropColumnNontransactional() between BEGIN and END TRANSACTION, rolling back
// if a std::exception is caught.
2603 dropColumnPolicies(td, cd);
2607 sqliteConnector_.query(
"BEGIN TRANSACTION");
2609 dropColumnNontransactional(td, cd);
2610 }
catch (std::exception& e) {
2611 sqliteConnector_.query(
"ROLLBACK TRANSACTION");
2614 sqliteConnector_.query(
"END TRANSACTION");
// Remove column cd of table td from SQLite (the mapd_columns row and the
// mapd_tables column count) and from the in-memory maps, without opening a
// transaction of its own.
2622 sqliteConnector_.query_with_text_params(
2623 "DELETE FROM mapd_columns where tableid = ? and columnid = ?",
2626 sqliteConnector_.query_with_text_params(
2627 "UPDATE mapd_tables SET ncolumns = ncolumns - 1 WHERE tableid = ?",
// The column must be present in the name-keyed map. The lookup key is on a
// line not visible here.
2630 ColumnDescriptorMap::iterator columnDescIt =
2632 CHECK(columnDescIt != columnDescriptorMap_.end());
// Keep a copy of the pointer first, because removeFromColumnMap() removes the
// map entry the iterator refers to.
2634 auto ocd = columnDescIt->second;
2635 removeFromColumnMap(ocd);
2636 --tableDescriptorMapById_[td.
tableId]->nColumns;
2637 removeColumnDescriptor(ocd);
2638 calciteMgr_->updateMetadata(currentDB_.dbName, td.
tableName);
// Repeat the drop on every physical (shard) table, using the shard's column
// with the same column id.
2642 for (
const auto shard : getPhysicalTablesDescriptors(&td)) {
2643 const auto shard_cd = getMetadataForColumn(shard->tableId, cd.
columnId);
2645 dropColumnNontransactional(*shard, *shard_cd);
// Recurse into every physical (shard) table of td, applying the policy drop
// to the shard's column with the same column id.
// NOTE(review): any null check on shard_cd before it is dereferenced is on a
// line not visible here.
2654 for (
const auto shard : getPhysicalTablesDescriptors(&td)) {
2655 const auto shard_cd = getMetadataForColumn(shard->tableId, cd.
columnId);
2657 dropColumnPolicies(*shard, *shard_cd);
// Remove column cd of table td from SQLite and from the column map. The
// removed descriptor is recorded in columnDescriptorsForRoll rather than being
// passed to removeColumnDescriptor().
2668 sqliteConnector_.query_with_text_params(
2669 "DELETE FROM mapd_columns where tableid = ? and columnid = ?",
2673 sqliteConnector_.query_with_text_params(
2674 "UPDATE mapd_tables SET ncolumns = ncolumns - 1 WHERE tableid = ?",
2677 ColumnDescriptorMap::iterator columnDescIt =
2679 CHECK(columnDescIt != columnDescriptorMap_.end());
// Record (old = descriptor, new = nullptr) so rollLegacy() can process the
// removal.
2681 columnDescriptorsForRoll.emplace_back(columnDescIt->second,
nullptr);
2682 removeFromColumnMap(columnDescIt->second);
2683 --tableDescriptorMapById_[td.
tableId]->nColumns;
// Repeat the drop on every physical (shard) table.
2688 for (
const auto shard : getPhysicalTablesDescriptors(&td)) {
2689 const auto shard_cd = getMetadataForColumn(shard->tableId, cd.
columnId);
2691 dropColumn(*shard, *shard_cd);
// Remove cd's column id from its table's columnIdBySpi_ vector. The table
// must exist in tableDescriptorMapById_ (CHECK).
2701 auto tabDescIt = tableDescriptorMapById_.find(cd->
tableId);
2702 CHECK(tableDescriptorMapById_.end() != tabDescIt);
2703 auto td = tabDescIt->second;
2704 auto& cd_by_spi = td->columnIdBySpi_;
// Remove every occurrence of the id (erase + std::remove), then sort the
// vector ascending.
2705 cd_by_spi.erase(std::remove(cd_by_spi.begin(), cd_by_spi.end(), cd->
columnId),
2708 std::sort(cd_by_spi.begin(), cd_by_spi.end());
// Register cd's column id in its table's columnIdBySpi_ vector. The table
// must exist in tableDescriptorMapById_ (CHECK).
2716 auto tabDescIt = tableDescriptorMapById_.find(cd->
tableId);
2717 CHECK(tableDescriptorMapById_.end() != tabDescIt);
2718 auto td = tabDescIt->second;
2719 auto& cd_by_spi = td->columnIdBySpi_;
// Only ids not already present are handled in this branch (the insertion
// itself is on a line not visible here); the vector is then sorted ascending.
2721 if (cd_by_spi.end() == std::find(cd_by_spi.begin(), cd_by_spi.end(), cd->
columnId)) {
2724 std::sort(cd_by_spi.begin(), cd_by_spi.end());
// Processes the pending (old, new) column descriptor pairs accumulated in
// columnDescriptorsForRoll and then clears that list. Each pair holds the
// descriptor before (ocd) and after (ncd) a change; either may be nullptr, for
// an added or a dropped column respectively.
// NOTE(review): the branch structure on `forward` is only partly visible.
// From the calls shown, one direction deletes the old column's dictionary and
// registers the new column id, while the other restores the old column in the
// column map and removes the new one - confirm against the full source.
2728 void Catalog::rollLegacy(
const bool forward) {
// Tables touched by this roll; Calcite is refreshed for each one at the end.
2730 std::set<const TableDescriptor*> tds;
2732 for (
const auto& cdr : columnDescriptorsForRoll) {
2733 auto ocd = cdr.first;
2734 auto ncd = cdr.second;
// Resolve the owning table from whichever descriptor is non-null.
2736 auto tabDescIt = tableDescriptorMapById_.find((ncd ? ncd : ocd)->tableId);
2737 CHECK(tableDescriptorMapById_.end() != tabDescIt);
2738 auto td = tabDescIt->second;
2739 auto& vc = td->columnIdBySpi_;
// The old dictionary is released when the column went away or when its
// comp_param (dictionary id) changed.
2742 if (
nullptr == ncd ||
2743 ncd->columnType.get_comp_param() != ocd->columnType.get_comp_param()) {
2744 delDictionaryNontransactional(*ocd);
2747 vc.erase(std::remove(vc.begin(), vc.end(), ocd->columnId), vc.end());
// Register the new column id once; geo physical columns are not listed.
2753 if (vc.end() == std::find(vc.begin(), vc.end(), ncd->columnId)) {
2754 if (!ncd->isGeoPhyCol) {
2755 vc.push_back(ncd->columnId);
2762 addToColumnMap(ocd);
// Mirror image of the above: remove the new descriptor and release its
// dictionary when there is no old column or the dictionary id differs.
2766 removeFromColumnMap(ncd);
2767 if (
nullptr == ocd ||
2768 ocd->columnType.get_comp_param() != ncd->columnType.get_comp_param()) {
2769 delDictionaryNontransactional(*ncd);
2775 columnDescriptorsForRoll.clear();
2778 for (
const auto td : tds) {
2779 calciteMgr_->updateMetadata(currentDB_.dbName, td->tableName);
2785 list<ColumnDescriptor>& columns) {
// For a geo-typed column, append its physical helper columns to `columns`.
// Which columns are appended depends on the geo type. The case labels are not
// visible here, but the groups below push, in order: coords only; coords +
// bounds; coords + linestring_sizes + bounds; coords + ring_sizes + bounds;
// coords + ring_sizes + poly_rings + bounds. An unhandled geo type throws
// runtime_error("Unrecognized geometry type.").
2787 if (
IS_GEO(col_ti.get_type())) {
2788 switch (col_ti.get_type()) {
// The unit size is 4 bytes when comp_param is 32 (the other half of that
// condition is not visible here) and 8 bytes otherwise.
2797 col_ti.get_comp_param() == 32) {
2798 unit_size = 4 *
sizeof(int8_t);
2801 unit_size = 8 *
sizeof(int8_t);
2805 columns.push_back(physical_cd_coords);
2819 columns.push_back(physical_cd_coords);
// Bounds are stored as four doubles.
2825 bounds_ti.
set_size(4 *
sizeof(
double));
2827 columns.push_back(physical_cd_bounds);
2840 columns.push_back(physical_cd_coords);
2846 physical_cd_linestring_sizes.
columnType = linestring_sizes_ti;
2847 columns.push_back(physical_cd_linestring_sizes);
2853 bounds_ti.
set_size(4 *
sizeof(
double));
2855 columns.push_back(physical_cd_bounds);
2868 columns.push_back(physical_cd_coords);
2874 physical_cd_ring_sizes.
columnType = ring_sizes_ti;
2875 columns.push_back(physical_cd_ring_sizes);
2881 bounds_ti.
set_size(4 *
sizeof(
double));
2883 columns.push_back(physical_cd_bounds);
2896 columns.push_back(physical_cd_coords);
2902 physical_cd_ring_sizes.
columnType = ring_sizes_ti;
2903 columns.push_back(physical_cd_ring_sizes);
2909 physical_cd_poly_rings.
columnType = poly_rings_ti;
2910 columns.push_back(physical_cd_poly_rings);
2916 bounds_ti.
set_size(4 *
sizeof(
double));
2918 columns.push_back(physical_cd_bounds);
2925 throw runtime_error(
"Unrecognized geometry type.");
// Fragment of a helper whose signature is not visible here. It looks up an
// entry in foreign_table.options (the key is on a line not shown), requires
// the entry to exist (CHECK), and then compares its value.
2933 auto timing_type_entry =
2935 CHECK(timing_type_entry != foreign_table.
options.end());
2936 if (timing_type_entry->second ==
// Creates a new table described by td with the user-supplied columns cols.
//
// Steps visible in this function:
//   1. Copy cols, rejecting any column named "rowid", and expand geometry
//      columns into their physical columns via expandGeoColumn().
//   2. Append system columns (rowid / deleted-flag; their construction is only
//      partly visible) and stamp every column with the current database id.
//   3. Persistent path: inside a SQLite transaction, insert the table row into
//      mapd_tables, read back the assigned tableid, resolve each column's
//      dictionary, insert the column rows into mapd_columns, and insert view /
//      foreign table rows where applicable.
//   4. Temporary path: take the table id from nextTempTableId_, set up
//      temporary dictionaries, and serialize the table to JSON for Calcite.
//   5. Register the table in the in-memory maps, notify Calcite, and end the
//      transaction; on a std::exception roll back and remove the table from
//      the maps.
// NOTE(review): the conditions that select between the persistent and the
// temporary path are on lines not visible here.
2945 void Catalog::createTable(
2947 const list<ColumnDescriptor>& cols,
2948 const std::vector<Parser::SharedDictionaryDef>& shared_dict_defs,
2949 bool isLogicalTable) {
2951 list<ColumnDescriptor> cds = cols;
2952 list<DictDescriptor> dds;
2953 std::set<std::string> toplevel_column_names;
2954 list<ColumnDescriptor> columns;
2959 throw std::runtime_error(
"Only temporary tables can be backed by foreign storage.");
2961 dataMgr_->getForeignStorageInterface()->prepareTable(getCurrentDB().dbId, td, cds);
// Build the full column list: user columns plus expanded geo physical columns.
2964 for (
auto cd : cds) {
2965 if (cd.columnName ==
"rowid") {
2966 throw std::runtime_error(
2967 "Cannot create column with name rowid. rowid is a system defined column.");
2969 columns.push_back(cd);
2970 toplevel_column_names.insert(cd.columnName);
2971 if (cd.columnType.is_geometry()) {
2972 expandGeoColumn(cd, columns);
2982 #ifdef MATERIALIZED_ROWID
2986 cd.
virtualExpr =
"MAPD_FRAG_ID * MAPD_ROWS_PER_FRAG + MAPD_FRAG_ROW_ID";
2988 columns.push_back(cd);
2999 columns.push_back(cd_del);
3002 for (
auto& column : columns) {
3003 column.db_id = getDatabaseId();
3009 sqliteConnector_.query(
"BEGIN TRANSACTION");
3012 sqliteConnector_.query_with_text_params(
3013 R
"(INSERT INTO mapd_tables (name, userid, ncolumns, isview, fragments, frag_type, max_frag_rows, max_chunk_size, frag_page_size, max_rows, partitions, shard_column_id, shard, num_shards, sort_column_id, storage_type, max_rollback_epochs, is_system_table, key_metainfo) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?))",
// Read back the table id SQLite assigned to the new row.
3035 sqliteConnector_.query_with_text_param(
3036 "SELECT tableid FROM mapd_tables WHERE name = ?", td.
tableName);
3037 td.
tableId = sqliteConnector_.getData<
int>(0, 0);
3039 for (
auto cd : columns) {
// A column that shares another column's dictionary is resolved first; only
// columns that do not get their own dictionary.
3041 const bool is_foreign_col =
3042 setColumnSharedDictionary(cd, cds, dds, td, shared_dict_defs);
3043 if (!is_foreign_col) {
3050 auto use_temp_dictionary =
false;
3051 setColumnDictionary(cd, dds, td, isLogicalTable, use_temp_dictionary);
3055 if (toplevel_column_names.count(cd.
columnName)) {
// All 17 bind parameters are TEXT by default. Index 16 (default_value) is
// switched to NULL_TYPE under a condition that is not visible here.
3062 std::vector<BindType> types(17, BindType::TEXT);
3064 types[16] = BindType::NULL_TYPE;
3066 sqliteConnector_.query_with_text_params(
3067 "INSERT INTO mapd_columns (tableid, columnid, name, coltype, colsubtype, "
3068 "coldim, colscale, is_notnull, "
3069 "compression, comp_param, size, chunks, is_systemcol, is_virtualcol, "
3070 "virtual_expr, is_deletedcol, default_value) "
3071 "VALUES (?, ?, ?, ?, ?, "
3073 "?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
3097 sqliteConnector_.query_with_text_params(
3098 "INSERT INTO mapd_views (tableid, sql) VALUES (?,?)",
3104 sqliteConnector_.query_with_text_params(
3105 "INSERT INTO omnisci_foreign_tables (table_id, server_id, options, "
3106 "last_refresh_time, next_refresh_time) VALUES (?, ?, ?, ?, ?)",
3109 foreign_table.getOptionsAsJsonString(),
3113 }
catch (std::exception& e) {
3114 sqliteConnector_.query(
"ROLLBACK TRANSACTION");
// Temporary table path: the id comes from an in-memory counter, not SQLite.
3118 td.
tableId = nextTempTableId_++;
3120 for (
auto cd : columns) {
3122 const bool is_foreign_col =
3123 setColumnSharedDictionary(cd, cds, dds, td, shared_dict_defs);
3125 if (!is_foreign_col) {
3127 std::string fileName(
"");
3128 std::string folderPath(
"");
3129 DictRef dict_ref(currentDB_.dbId, nextTempDictId_);
3146 if (toplevel_column_names.count(cd.
columnName)) {
3157 serializeTableJsonUnlocked(&td, cds);
// A brand-new table must not already have cached metadata under its key
// prefix; if it does, the CHECK aborts with instructions to clear the cache.
3162 auto cache = dataMgr_->getPersistentStorageMgr()->getDiskCache();
3164 CHECK(!cache->hasCachedMetadataForKeyPrefix({getCurrentDB().dbId, td.
tableId}))
3165 <<
"Disk cache at " + cache->getCacheDirectory()
3166 <<
" contains preexisting data for new table. Please "
3167 "delete or clear cache before continuing";
3170 addTableToMap(&td, cds, dds);
3171 calciteMgr_->updateMetadata(currentDB_.dbName, td.
tableName);
3173 dataMgr_->getForeignStorageInterface()->registerTable(
this, td, cds);
3175 }
catch (std::exception& e) {
// Undo both the SQLite changes and the in-memory registration.
3176 sqliteConnector_.query(
"ROLLBACK TRANSACTION");
3177 removeTableFromMap(td.tableName, td.tableId,
true);
3180 sqliteConnector_.query(
"END TRANSACTION");
// Both locks are released explicitly before the final getMetadataForTable()
// call (presumably because that call acquires locks itself - TODO confirm).
3183 write_lock.unlock();
3184 sqlite_lock.unlock();
3185 getMetadataForTable(td.tableName,
3191 const std::list<ColumnDescriptor>& cds)
const {
// Adds table td and its columns cds to the per-database temporary tables JSON
// file read by Calcite (see the VLOG message). An existing file is parsed
// first and the new table is added as a member keyed by table name; the whole
// document is then written back, truncating the file.
// NOTE(review): file_path and the rapidjson document `d` are created on lines
// not visible here.
3193 using namespace rapidjson;
3195 VLOG(1) <<
"Serializing temporary table " << td->
tableName <<
" to JSON for Calcite.";
3197 const auto db_name = currentDB_.dbName;
// Load the existing document so tables already in the file are preserved.
3201 if (boost::filesystem::exists(file_path)) {
3203 std::ifstream reader(file_path.string());
3204 CHECK(reader.is_open());
3205 IStreamWrapper json_read_wrapper(reader);
3206 d.ParseStream(json_read_wrapper);
3210 CHECK(d.IsObject());
// Build the table object: name, id and an initially empty columns array.
// NOTE(review): StringRef does not copy the string, so td->tableName must
// stay alive until the document is written.
3213 Value table(kObjectType);
3215 "name",
Value().SetString(StringRef(td->
tableName.c_str())), d.GetAllocator());
3216 table.AddMember(
"id",
Value().SetInt(td->
tableId), d.GetAllocator());
3217 table.AddMember(
"columns",
Value(kArrayType), d.GetAllocator());
// One JSON object per column, carrying its type information and flags.
3219 for (
const auto& cd : cds) {
3220 Value column(kObjectType);
3222 "name",
Value().SetString(StringRef(cd.columnName)), d.GetAllocator());
3223 column.AddMember(
"coltype",
3224 Value().SetInt(static_cast<int>(cd.columnType.get_type())),
3226 column.AddMember(
"colsubtype",
3227 Value().SetInt(static_cast<int>(cd.columnType.get_subtype())),
3229 column.AddMember(
"compression",
3230 Value().SetInt(static_cast<int>(cd.columnType.get_compression())),
3232 column.AddMember(
"comp_param",
3233 Value().SetInt(static_cast<int>(cd.columnType.get_comp_param())),
3235 column.AddMember(
"size",
3236 Value().SetInt(static_cast<int>(cd.columnType.get_size())),
3239 "coldim",
Value().SetInt(cd.columnType.get_dimension()), d.GetAllocator());
3241 "colscale",
Value().SetInt(cd.columnType.get_scale()), d.GetAllocator());
3243 "is_notnull",
Value().SetBool(cd.columnType.get_notnull()), d.GetAllocator());
3244 column.AddMember(
"is_systemcol",
Value().SetBool(cd.isSystemCol), d.GetAllocator());
3245 column.AddMember(
"is_virtualcol",
Value().SetBool(cd.isVirtualCol), d.GetAllocator());
3246 column.AddMember(
"is_deletedcol",
Value().SetBool(cd.isDeletedCol), d.GetAllocator());
3247 table[
"columns"].PushBack(column, d.GetAllocator());
// Add the table object to the document root under the table's name.
3249 d.AddMember(StringRef(td->
tableName.c_str()), table, d.GetAllocator());
// Rewrite the whole file with the updated document.
3252 std::ofstream writer(file_path.string(), std::ios::trunc | std::ios::out);
3253 CHECK(writer.is_open());
3254 OStreamWrapper json_wrapper(writer);
3256 Writer<OStreamWrapper> json_writer(json_wrapper);
3257 d.Accept(json_writer);
// Removes the member named table_name from the per-database temporary tables
// JSON file and rewrites the file. The file must exist, must parse to a JSON
// object and must contain the table; each of these is enforced with CHECK, so
// a violation aborts rather than throws.
// NOTE(review): file_path and the rapidjson document `d` are created on lines
// not visible here.
3261 void Catalog::dropTableFromJsonUnlocked(
const std::string& table_name)
const {
3263 using namespace rapidjson;
3265 VLOG(1) <<
"Dropping temporary table " << table_name <<
" to JSON for Calcite.";
3267 const auto db_name = currentDB_.dbName;
3270 CHECK(boost::filesystem::exists(file_path));
// Load the current document.
3273 std::ifstream reader(file_path.string());
3274 CHECK(reader.is_open());
3275 IStreamWrapper json_read_wrapper(reader);
3276 d.ParseStream(json_read_wrapper);
3278 CHECK(d.IsObject());
3279 auto table_name_ref = StringRef(table_name.c_str());
3280 CHECK(d.HasMember(table_name_ref));
3281 CHECK(d.RemoveMember(table_name_ref));
// Rewrite the whole file without the removed table.
3284 std::ofstream writer(file_path.string(), std::ios::trunc | std::ios::out);
3285 CHECK(writer.is_open());
3286 OStreamWrapper json_wrapper(writer);
3288 Writer<OStreamWrapper> json_writer(json_wrapper);
3289 d.Accept(json_writer);
// Creates a foreign server by handing ownership of foreign_server to
// createForeignServerNoLocks(); if_not_exists is passed through unchanged.
// NOTE(review): lines between the signature and the call are not visible;
// given the callee's "NoLocks" suffix, presumably the catalog locks are taken
// there - confirm.
3293 void Catalog::createForeignServer(
3294 std::unique_ptr<foreign_storage::ForeignServer> foreign_server,
3295 bool if_not_exists) {
3298 createForeignServerNoLocks(std::move(foreign_server), if_not_exists);
// Persists foreign_server in omnisci_foreign_servers and registers it in the
// in-memory maps (foreignServerMap_ by name, foreignServerMapById_ by id).
//
// If no server with that name is stored yet, the creation time is set to now,
// the row is inserted, the id is read back, and ownership moves into a
// shared_ptr held by both maps. If the name is already stored and
// if_not_exists is false, std::runtime_error is thrown.
// Postcondition (CHECKed at the end): the name is present in both maps.
3301 void Catalog::createForeignServerNoLocks(
3302 std::unique_ptr<foreign_storage::ForeignServer> foreign_server,
3303 bool if_not_exists) {
// NOTE(review): `name` refers into *foreign_server. It stays valid after the
// move below only because the object itself is not relocated when ownership
// moves into the shared_ptr.
3304 const auto&
name = foreign_server->name;
3306 sqliteConnector_.query_with_text_params(
3307 "SELECT name from omnisci_foreign_servers where name = ?",
3308 std::vector<std::string>{
name});
3310 if (sqliteConnector_.getNumRows() == 0) {
3311 foreign_server->creation_time = std::time(
nullptr);
3312 sqliteConnector_.query_with_text_params(
3313 "INSERT INTO omnisci_foreign_servers (name, data_wrapper_type, owner_user_id, "
3316 "VALUES (?, ?, ?, ?, ?)",
3317 std::vector<std::string>{
name,
3318 foreign_server->data_wrapper_type,
3321 foreign_server->getOptionsAsJsonString()});
// Read back the id SQLite assigned to the new row.
3322 sqliteConnector_.query_with_text_params(
3323 "SELECT id from omnisci_foreign_servers where name = ?",
3324 std::vector<std::string>{
name});
3325 CHECK_EQ(sqliteConnector_.getNumRows(), size_t(1));
3326 foreign_server->id = sqliteConnector_.getData<int32_t>(0, 0);
// Convert unique ownership to shared so both maps can hold the server.
3327 std::shared_ptr<foreign_storage::ForeignServer> foreign_server_shared =
3328 std::move(foreign_server);
3329 CHECK(foreignServerMap_.find(
name) == foreignServerMap_.end())
3330 <<
"Attempting to insert a foreign server into foreign server map that already "
3332 foreignServerMap_[
name] = foreign_server_shared;
3333 foreignServerMapById_[foreign_server_shared->id] = foreign_server_shared;
3334 }
else if (!if_not_exists) {
3335 throw std::runtime_error{
"A foreign server with name \"" + foreign_server->name +
3336 "\" already exists."};
// Sanity check: whichever branch ran, the server must now be in both maps.
3339 const auto& server_it = foreignServerMap_.find(
name);
3340 CHECK(server_it != foreignServerMap_.end());
3341 CHECK(foreignServerMapById_.find(server_it->second->id) != foreignServerMapById_.end());
3345 const std::string& server_name)
const {
// Return a non-owning pointer to the in-memory server named server_name.
// foreign_server is declared on a line not visible here and is returned
// unchanged when the name is unknown.
// NOTE(review): the map is searched twice; the iterator from the first find
// could be reused.
3349 if (foreignServerMap_.find(server_name) != foreignServerMap_.end()) {
3350 foreign_server = foreignServerMap_.find(server_name)->second.get();
3352 return foreign_server;
// Reads the foreign server named server_name from the
// omnisci_foreign_servers table and returns a newly constructed ForeignServer,
// or nullptr when no such row exists. The in-memory maps are not consulted.
// NOTE(review): creation_time is read with getData<std::int32_t>; if that
// column holds a 64-bit timestamp this would truncate - confirm.
3355 const std::unique_ptr<const foreign_storage::ForeignServer>
3356 Catalog::getForeignServerFromStorage(
const std::string& server_name) {
3357 std::unique_ptr<foreign_storage::ForeignServer> foreign_server =
nullptr;
3359 sqliteConnector_.query_with_text_params(
3360 "SELECT id, name, data_wrapper_type, options, owner_user_id, creation_time "
3361 "FROM omnisci_foreign_servers WHERE name = ?",
3362 std::vector<std::string>{server_name});
// Constructor arguments follow the SELECT column order.
3363 if (sqliteConnector_.getNumRows() > 0) {
3364 foreign_server = std::make_unique<foreign_storage::ForeignServer>(
3365 sqliteConnector_.getData<
int>(0, 0),
3366 sqliteConnector_.getData<std::string>(0, 1),
3367 sqliteConnector_.getData<std::string>(0, 2),
3368 sqliteConnector_.getData<std::string>(0, 3),
3369 sqliteConnector_.getData<std::int32_t>(0, 4),
3370 sqliteConnector_.getData<std::int32_t>(0, 5));
3372 return foreign_server;
// Reads the foreign table with id table_id from the omnisci_foreign_tables
// table and returns a newly constructed ForeignTable. foreign_table starts as
// nullptr; the condition on num_rows that guards the construction is on a
// line not visible here.
// NOTE(review): foreignServerMapById_[...] uses operator[], which inserts a
// null entry if the stored server_id is not in the map.
3375 const std::unique_ptr<const foreign_storage::ForeignTable>
3376 Catalog::getForeignTableFromStorage(
int table_id) {
3377 std::unique_ptr<foreign_storage::ForeignTable> foreign_table =
nullptr;
3379 sqliteConnector_.query_with_text_params(
3380 "SELECT table_id, server_id, options, last_refresh_time, next_refresh_time from "
3381 "omnisci_foreign_tables WHERE table_id = ?",
3382 std::vector<std::string>{
to_string(table_id)});
3383 auto num_rows = sqliteConnector_.getNumRows();
// The server is passed as a non-owning pointer resolved from the in-memory
// id-keyed map.
3386 foreign_table = std::make_unique<foreign_storage::ForeignTable>(
3387 sqliteConnector_.getData<
int>(0, 0),
3388 foreignServerMapById_[sqliteConnector_.getData<int32_t>(0, 1)].get(),
3389 sqliteConnector_.getData<std::string>(0, 2),
3390 sqliteConnector_.getData<int64_t>(0, 3),
3391 sqliteConnector_.getData<int64_t>(0, 4));
3393 return foreign_table;
// Changes the owner of the foreign server named server_name to new_owner_id:
// the "owner_user_id" property is written via setForeignServerProperty() and
// then the in-memory object's user_id is updated.
// NOTE(review): find(server_name)->second is dereferenced without an end()
// check, so an unknown name is undefined behavior before CHECK can fire.
3396 void Catalog::changeForeignServerOwner(
const std::string& server_name,
3397 const int new_owner_id) {
3400 foreignServerMap_.find(server_name)->second.get();
3401 CHECK(foreign_server);
3402 setForeignServerProperty(server_name,
"owner_user_id",
std::to_string(new_owner_id));
3404 foreign_server->
user_id = new_owner_id;
// Sets the data wrapper type of the foreign server named server_name. The
// supplied value is upper-cased and written as the "data_wrapper_type"
// property via setForeignServerProperty().
// NOTE(review): the try block and the body of its catch handler are not
// visible here; by analogy with setForeignServerOptions() they presumably
// validate the new value and restore the old one on failure - confirm.
// NOTE(review): find(server_name)->second is dereferenced without an end()
// check.
3407 void Catalog::setForeignServerDataWrapper(
const std::string& server_name,
3408 const std::string& data_wrapper) {
3410 auto data_wrapper_type =
to_upper(data_wrapper);
3413 foreignServerMap_.find(server_name)->second.get();
3414 CHECK(foreign_server);
3419 }
catch (
const std::exception& e) {
3425 setForeignServerProperty(server_name,
"data_wrapper_type", data_wrapper_type);
// Replaces the options of the foreign server named server_name and writes the
// "options" property via setForeignServerProperty(). The current options are
// saved first and restored in the catch handler if the try block throws.
// NOTE(review): the try block's contents and whether the handler rethrows are
// on lines not visible here.
// NOTE(review): find(server_name)->second is dereferenced without an end()
// check.
3428 void Catalog::setForeignServerOptions(
const std::string& server_name,
3429 const std::string& options) {
3433 foreignServerMap_.find(server_name)->second.get();
3434 CHECK(foreign_server);
// Snapshot for rollback.
3435 auto saved_options = foreign_server->
options;
3439 }
catch (
const std::exception& e) {
3442 foreign_server->
options = saved_options;
3445 setForeignServerProperty(server_name,
"options", options);
// Renames the foreign server server_name to name: the "name" property is
// written via setForeignServerProperty(), the in-memory object's name is
// updated, and the server is re-keyed in foreignServerMap_. The server must
// exist (CHECK).
// NOTE(review): if name == server_name, the assignment under the new key
// targets the same entry that is then erased, removing the server from the
// map - confirm callers reject that case.
3448 void Catalog::renameForeignServer(
const std::string& server_name,
3449 const std::string&
name) {
3451 auto foreign_server_it = foreignServerMap_.find(server_name);
3452 CHECK(foreign_server_it != foreignServerMap_.end());
3453 setForeignServerProperty(server_name,
"name", name);
// Hold a shared_ptr copy so the server survives the erase of its old key.
3454 auto foreign_server_shared = foreign_server_it->second;
3455 foreign_server_shared->name =
name;
3456 foreignServerMap_[
name] = foreign_server_shared;
3457 foreignServerMap_.erase(foreign_server_it);
// Drops the foreign server named server_name from omnisci_foreign_servers and
// from both in-memory maps. Throws std::runtime_error if any row in
// omnisci_foreign_tables still references the server. The DELETE runs inside
// a SQLite transaction that is rolled back if a std::exception is caught.
// NOTE(review): the handling of num_rows (for example when the server does
// not exist) is on lines not visible here.
3460 void Catalog::dropForeignServer(
const std::string& server_name) {
3464 sqliteConnector_.query_with_text_params(
3465 "SELECT id from omnisci_foreign_servers where name = ?",
3466 std::vector<std::string>{server_name});
3467 auto num_rows = sqliteConnector_.getNumRows();
// Refuse to drop a server that foreign tables still depend on.
3470 auto server_id = sqliteConnector_.getData<int32_t>(0, 0);
3471 sqliteConnector_.query_with_text_param(
3472 "SELECT table_id from omnisci_foreign_tables where server_id = ?",
3474 if (sqliteConnector_.getNumRows() > 0) {
3475 throw std::runtime_error{
"Foreign server \"" + server_name +
3477 "by existing foreign tables and cannot be dropped."};
3479 sqliteConnector_.query(
"BEGIN TRANSACTION");
3481 sqliteConnector_.query_with_text_params(
3482 "DELETE FROM omnisci_foreign_servers WHERE name = ?",
3483 std::vector<std::string>{server_name});
3484 }
catch (
const std::exception& e) {
3485 sqliteConnector_.query(
"ROLLBACK TRANSACTION");
// The in-memory maps are updated only after END TRANSACTION has been issued.
3488 sqliteConnector_.query(
"END TRANSACTION");
3489 foreignServerMap_.erase(server_name);
3490 foreignServerMapById_.erase(server_id);
// Appends to results the in-memory foreign servers whose rows in
// omnisci_foreign_servers match the optional JSON filters.
//
// filters, when non-null, is iterated as a JSON array. Each element supplies
// "attribute", "operation" and "value", plus "chain" for every element after
// the first. Attribute names are translated to column names through
// col_names; an unknown attribute throws std::runtime_error. The operation
// "EQUALS" produces `= ?`; any other operation produces `LIKE ?`. A LIKE
// filter on "created_at" throws std::runtime_error.
// NOTE(review): a per-server privilege check (privObjects) sits between the
// lookup and the push_back, but its condition is not visible here.
3494 void Catalog::getForeignServersForUser(
3495 const rapidjson::Value* filters,
3497 std::vector<const foreign_storage::ForeignServer*>& results) {
// Whitelist that maps user-facing attribute names to table columns.
3502 std::map<std::string, std::string> col_names{{
"server_name",
"name"},
3503 {
"data_wrapper",
"data_wrapper_type"},
3504 {
"created_at",
"creation_time"},
3505 {
"options",
"options"}};
3508 std::stringstream filter_string;
3509 std::vector<std::string> arguments;
3511 if (filters !=
nullptr) {
3513 int num_filters = 0;
3514 filter_string <<
" WHERE";
3515 for (
auto& filter_def : filters->GetArray()) {
// NOTE(review): the "chain" value is concatenated into the SQL text as-is,
// not bound as a parameter. Unless callers validate it (AND/OR), this is a
// SQL injection vector - confirm.
3516 if (num_filters > 0) {
3517 filter_string <<
" " << std::string(filter_def[
"chain"].GetString());
3521 if (col_names.find(std::string(filter_def[
"attribute"].GetString())) ==
3523 throw std::runtime_error{
"Attribute with name \"" +
3524 std::string(filter_def[
"attribute"].GetString()) +
3525 "\" does not exist."};
// Only whitelisted column names reach the SQL text.
3528 filter_string <<
" " << col_names[std::string(filter_def[
"attribute"].GetString())];
3530 bool equals_operator =
false;
3531 if (std::strcmp(filter_def[
"operation"].GetString(),
"EQUALS") == 0) {
3532 filter_string <<
" = ? ";
3533 equals_operator =
true;
3535 filter_string <<
" LIKE ? ";
3538 bool timestamp_column =
3539 (std::strcmp(filter_def[
"attribute"].GetString(),
"created_at") == 0);
3541 if (timestamp_column && !equals_operator) {
3542 throw std::runtime_error{
"LIKE operator is incompatible with TIMESTAMP data"};
// Timestamp equality values go through dateTimeParse<kTIMESTAMP>; all other
// values are added to the bound arguments as raw strings.
3545 if (timestamp_column && equals_operator) {
3547 dateTimeParse<kTIMESTAMP>(filter_def[
"value"].GetString(), 0)));
3549 arguments.emplace_back(filter_def[
"value"].GetString());
3556 std::string query = std::string(
"SELECT name from omnisci_foreign_servers ");
3557 query += filter_string.str();
3559 sqliteConnector_.query_with_text_params(query, arguments);
3560 auto num_rows = sqliteConnector_.getNumRows();
3562 if (sqliteConnector_.getNumRows() == 0) {
3566 CHECK(sqliteConnector_.getNumCols() == 1);
// Resolve each returned name to the in-memory server object.
3568 results.reserve(num_rows);
3569 for (
size_t row = 0; row < num_rows; ++row) {
3570 const auto& server_name = sqliteConnector_.getData<std::string>(row, 0);
3575 CHECK(foreign_server !=
nullptr);
3579 std::vector<DBObject> privObjects = {dbObject};
3584 results.push_back(foreign_server);
// Returns the epoch of table (db_id, table_id) as reported by dataMgr_.
//
// If the table has physical (shard) tables in logicalToPhysicalTableMapById_,
// the epoch of each shard is read and logged; a shard whose epoch differs from
// the first shard's is logged at ERROR level and marks the epochs as
// inconsistent. Otherwise the epoch of the table itself is read and logged.
// Throws std::runtime_error when the table is not found (the guarding
// condition is on a line not visible here).
// NOTE(review): the values returned on the sharded path, including the
// inconsistent case, are on lines not visible here.
3589 int32_t Catalog::getTableEpoch(
const int32_t db_id,
const int32_t table_id)
const {
3591 const auto td = getMetadataForTable(table_id,
false);
3593 std::stringstream table_not_found_error_message;
3594 table_not_found_error_message <<
"Table (" << db_id <<
"," << table_id
3596 throw std::runtime_error(table_not_found_error_message.str());
3598 const auto physicalTableIt = logicalToPhysicalTableMapById_.find(table_id);
3599 if (physicalTableIt != logicalToPhysicalTableMapById_.end()) {
// Sharded table: compare every shard's epoch against the first shard's.
3601 const auto physicalTables = physicalTableIt->second;
3602 CHECK(!physicalTables.empty());
3603 size_t curr_epoch{0}, first_epoch{0};
3604 int32_t first_table_id{0};
3605 bool are_epochs_inconsistent{
false};
3606 for (
size_t i = 0; i < physicalTables.size(); i++) {
3607 int32_t physical_tb_id = physicalTables[i];
3608 const TableDescriptor* phys_td = getMetadataForTable(physical_tb_id,
false);
3611 curr_epoch = dataMgr_->getTableEpoch(db_id, physical_tb_id);
3612 LOG(
INFO) <<
"Got sharded table epoch for db id: " << db_id
3613 <<
", table id: " << physical_tb_id <<
", epoch: " << curr_epoch;
// The first shard becomes the reference (its guarding condition is on a line
// not visible here); later shards are compared against it.
3615 first_epoch = curr_epoch;
3616 first_table_id = physical_tb_id;
3617 }
else if (first_epoch != curr_epoch) {
3618 are_epochs_inconsistent =
true;
3619 LOG(
ERROR) <<
"Epochs on shards do not all agree on table id: " << table_id
3620 <<
", db id: " << db_id
3621 <<
". First table (table id: " << first_table_id
3622 <<
") has epoch: " << first_epoch <<
". Table id: " << physical_tb_id
3623 <<
", has inconsistent epoch: " << curr_epoch
3624 <<
". See previous INFO logs for all epochs and their table ids.";
3627 if (are_epochs_inconsistent) {
// Unsharded table: read and log the table's own epoch.
3633 auto epoch = dataMgr_->getTableEpoch(db_id, table_id);
3634 LOG(
INFO) <<
"Got table epoch for db id: " << db_id <<
", table id: " << table_id
3635 <<
", epoch: " << epoch;
// Scans tableDescriptorMapById_ and returns the foreign tables whose
// foreign_server->id equals `foreign_server_id`.
// NOTE(review): the lines that derive `foreign_table` from `table_descriptor`
// (and any filtering of non-foreign tables) are elided in this excerpt.
3640 std::vector<const foreign_storage::ForeignTable*>
3641 Catalog::getAllForeignTablesForForeignServer(
const int32_t foreign_server_id) {
3643 std::vector<const foreign_storage::ForeignTable*> foreign_tables;
3644 for (
auto entry : tableDescriptorMapById_) {
3645 auto table_descriptor = entry.second;
3648 CHECK(foreign_table);
3649 if (foreign_table->foreign_server->id == foreign_server_id) {
3650 foreign_tables.emplace_back(foreign_table);
3654 return foreign_tables;
// Sets the epoch of table `table_id` (and of every physical table returned by
// getPhysicalTablesDescriptors for it) to `new_epoch`: for each physical table the
// cached chunks are removed via removeChunks() and the new file manager params are
// pushed to the global file manager.
// Throws std::runtime_error with a "Table (db_id,table_id ..." message, and with
// "Cannot set epoch on temporary table".
// NOTE(review): the conditions guarding both throws and the declaration of
// `file_mgr_params` are elided in this excerpt.
3657 void Catalog::setTableEpoch(
const int db_id,
const int table_id,
int new_epoch) {
3658 LOG(
INFO) <<
"Set table epoch db:" << db_id <<
" Table ID " << table_id
3659 <<
" back to new epoch " << new_epoch;
3660 const auto td = getMetadataForTable(table_id,
false);
3662 std::stringstream table_not_found_error_message;
3663 table_not_found_error_message <<
"Table (" << db_id <<
"," << table_id
3665 throw std::runtime_error(table_not_found_error_message.str());
3668 std::stringstream is_temp_table_error_message;
3669 is_temp_table_error_message <<
"Cannot set epoch on temporary table";
3670 throw std::runtime_error(is_temp_table_error_message.str());
3674 file_mgr_params.
epoch = new_epoch;
3677 const auto physical_tables = getPhysicalTablesDescriptors(td,
false);
3678 CHECK(!physical_tables.empty());
3679 for (
const auto table : physical_tables) {
// This local `table_id` shadows the function parameter of the same name.
3680 auto table_id = table->tableId;
3681 LOG(
INFO) <<
"Set sharded table epoch db:" << db_id <<
" Table ID " << table_id
3682 <<
" back to new epoch " << new_epoch;
3685 removeChunks(table_id);
3686 dataMgr_->getGlobalFileMgr()->setFileMgrParams(db_id, table_id, file_mgr_params);
// Issues parameterized UPDATE statements against mapd_tables for the
// max_rollback_epochs and max_rows columns of one table row (keyed by tableid).
// NOTE(review): the parameter list, the bound argument vectors and any
// conditions deciding which UPDATE runs are elided in this excerpt.
3690 void Catalog::alterPhysicalTableMetadata(
3701 sqliteConnector_.query_with_text_params(
3702 "UPDATE mapd_tables SET max_rollback_epochs = ? WHERE tableid = ?",
3709 sqliteConnector_.query_with_text_params(
3710 "UPDATE mapd_tables SET max_rows = ? WHERE tableid = ?",
3721 sqliteConnector_.query(
"BEGIN TRANSACTION");
3723 const auto physical_table_it = logicalToPhysicalTableMapById_.find(td->
tableId);
3724 if (physical_table_it != logicalToPhysicalTableMapById_.end()) {
3725 const auto physical_tables = physical_table_it->second;
3726 CHECK(!physical_tables.empty());
3727 for (
size_t i = 0; i < physical_tables.size(); i++) {
3728 int32_t physical_tb_id = physical_tables[i];
3729 const TableDescriptor* phys_td = getMetadataForTable(physical_tb_id,
false);
3731 alterPhysicalTableMetadata(phys_td, table_update_params);
3734 alterPhysicalTableMetadata(td, table_update_params);
3735 }
catch (std::exception& e) {
3736 sqliteConnector_.query(
"ROLLBACK TRANSACTION");
3739 sqliteConnector_.query(
"END TRANSACTION");
// Changes the max_rollback_epochs setting of table `table_id`.
// Throws std::runtime_error when `max_rollback_epochs` is negative (<= -1).
// When the requested parameters compare equal to the existing descriptor the
// operation is logged as a no-op; otherwise the file manager params are applied via
// setTableFileMgrParams() and the catalog row is updated via alterTableMetadata().
// NOTE(review): construction of `table_update_params` / `file_mgr_params` and the
// branch structure around the "skipping operation" log are elided in this excerpt.
3742 void Catalog::setMaxRollbackEpochs(
const int32_t table_id,
3743 const int32_t max_rollback_epochs) {
3746 if (max_rollback_epochs <= -1) {
3747 throw std::runtime_error(
"Cannot set max_rollback_epochs < 0.");
3749 const auto td = getMetadataForTable(
3755 if (table_update_params == td) {
3756 LOG(
INFO) <<
"Setting max_rollback_epochs for table " << table_id
3757 <<
" to existing value, skipping operation";
// epoch is set to -1 here; presumably a "leave epoch unchanged" sentinel -- confirm.
3761 file_mgr_params.
epoch = -1;
3763 setTableFileMgrParams(table_id, file_mgr_params);
3764 alterTableMetadata(td, table_update_params);
// Changes the max_rows setting of table `table_id`.
// Throws std::runtime_error("Max rows cannot be a negative number.") -- the guard
// for that throw is elided in this excerpt.
// When the requested parameters compare equal to the existing descriptor the update
// is logged as skipped; otherwise the catalog row is updated via
// alterTableMetadata() and the table's fragmenter is asked to drop fragments down
// to `max_rows`.
3767 void Catalog::setMaxRows(
const int32_t table_id,
const int64_t max_rows) {
3769 throw std::runtime_error(
"Max rows cannot be a negative number.");
3771 const auto td = getMetadataForTable(table_id);
3774 table_update_params.
max_rows = max_rows;
3775 if (table_update_params == td) {
3776 LOG(
INFO) <<
"Max rows value of " << max_rows
3777 <<
" is the same as the existing value. Skipping update.";
3780 alterTableMetadata(td, table_update_params);
// The fragmenter must already be instantiated for the size reduction below.
3781 CHECK(td->fragmenter);
3782 td->fragmenter->dropFragmentsToSize(max_rows);
// Looks up `table_name` (upper-cased) in tableDescriptorMap_ -- the table must
// exist (CHECK) -- then updates its catalog metadata via alterTableMetadata() and
// its file manager params via setTableFileMgrParams().
// NOTE(review): the values placed in `table_update_params` and `file_mgr_params`
// are elided in this excerpt.
3786 void Catalog::setUncappedTableEpoch(
const std::string& table_name) {
3788 auto td_entry = tableDescriptorMap_.find(
to_upper(table_name));
3789 CHECK(td_entry != tableDescriptorMap_.end());
3790 auto td = td_entry->second;
3798 alterTableMetadata(td, table_update_params);
3801 setTableFileMgrParams(td->tableId, file_mgr_params);
// Applies `file_mgr_params` to every physical table backing `table_id` in the
// current database: for each one the cached chunks are removed via removeChunks()
// and the params are pushed to the global file manager.
// Throws std::runtime_error with a "Table (db_id,table_id ..." message, and with
// "Cannot set storage params on temporary table".
// NOTE(review): the parameter list and the conditions guarding both throws are
// elided in this excerpt.
3804 void Catalog::setTableFileMgrParams(
3808 const auto td = getMetadataForTable(table_id,
false);
3809 const auto db_id = this->getDatabaseId();
3811 std::stringstream table_not_found_error_message;
3812 table_not_found_error_message <<
"Table (" << db_id <<
"," << table_id
3814 throw std::runtime_error(table_not_found_error_message.str());
3817 std::stringstream is_temp_table_error_message;
3818 is_temp_table_error_message <<
"Cannot set storage params on temporary table";
3819 throw std::runtime_error(is_temp_table_error_message.str());
3822 const auto physical_tables = getPhysicalTablesDescriptors(td,
false);
3823 CHECK(!physical_tables.empty());
3824 for (
const auto table : physical_tables) {
// This local `table_id` shadows the outer `table_id`.
3825 auto table_id = table->tableId;
3826 removeChunks(table_id);
3827 dataMgr_->getGlobalFileMgr()->setFileMgrParams(db_id, table_id, file_mgr_params);
// Returns the (table id, epoch) pairs for table `table_id` in database `db_id`.
// If the table has an entry in logicalToPhysicalTableMapById_, one pair is
// produced per listed physical table; otherwise a single pair for `table_id`
// itself is produced. Each epoch is read from dataMgr_ and logged at INFO level.
// NOTE(review): the lines separating the two paths (and any null check on
// `phys_td`) are elided in this excerpt.
3831 std::vector<TableEpochInfo> Catalog::getTableEpochs(
const int32_t db_id,
3832 const int32_t table_id)
const {
3834 std::vector<TableEpochInfo> table_epochs;
3835 const auto physical_table_it = logicalToPhysicalTableMapById_.find(table_id);
3836 if (physical_table_it != logicalToPhysicalTableMapById_.end()) {
3837 const auto physical_tables = physical_table_it->second;
3838 CHECK(!physical_tables.empty());
3840 for (
const auto physical_tb_id : physical_tables) {
3841 const auto phys_td = getMutableMetadataForTableUnlocked(physical_tb_id);
// This local `table_id` shadows the function parameter of the same name.
3844 auto table_id = phys_td->tableId;
3845 auto epoch = dataMgr_->getTableEpoch(db_id, phys_td->tableId);
3846 table_epochs.emplace_back(table_id, epoch);
3847 LOG(
INFO) <<
"Got sharded table epoch for db id: " << db_id
3848 <<
", table id: " << table_id <<
", epoch: " << epoch;
// Path for a table without an entry in logicalToPhysicalTableMapById_.
3851 auto epoch = dataMgr_->getTableEpoch(db_id, table_id);
3852 LOG(
INFO) <<
"Got table epoch for db id: " << db_id <<
", table id: " << table_id
3853 <<
", epoch: " << epoch;
3854 table_epochs.emplace_back(table_id, epoch);
3856 return table_epochs;
// Restores each table listed in `table_epochs` to its recorded epoch: for every
// entry the cached chunks are removed via removeChunks(), the epoch is written
// into `file_mgr_params`, and the params are pushed to the global file manager.
// NOTE(review): `table_epochs[0]` is read on the first line; no emptiness guard is
// visible in this excerpt, so callers presumably must pass a non-empty vector --
// confirm. The declaration of `file_mgr_params` is also elided.
3859 void Catalog::setTableEpochs(
const int32_t db_id,
3860 const std::vector<TableEpochInfo>& table_epochs)
const {
3861 const auto td = getMetadataForTable(table_epochs[0].table_id,
false);
3866 for (
const auto& table_epoch_info : table_epochs) {
3867 removeChunks(table_epoch_info.table_id);
3868 file_mgr_params.
epoch = table_epoch_info.table_epoch;
3869 dataMgr_->getGlobalFileMgr()->setFileMgrParams(
3870 db_id, table_epoch_info.table_id, file_mgr_params);
3871 LOG(
INFO) <<
"Set table epoch for db id: " << db_id
3872 <<
", table id: " << table_epoch_info.table_id
3873 <<
", back to epoch: " << table_epoch_info.table_epoch;
3879 std::string table_epochs_str{
"["};
3880 bool first_entry{
true};
3881 for (
const auto& table_epoch : table_epochs) {
3883 first_entry =
false;
3885 table_epochs_str +=
", ";
3887 table_epochs_str +=
"(table_id: " +
std::to_string(table_epoch.table_id) +
3890 table_epochs_str +=
"]";
3891 return table_epochs_str;
// Calls setTableEpochs(db_id, table_epochs) and, if a std::exception is thrown,
// logs it at ERROR level together with its what() text.
// NOTE(review): the `try {` line, part of the log message and whatever follows
// the LOG statement (e.g. whether the exception is rethrown) are elided in this
// excerpt.
3895 void Catalog::setTableEpochsLogExceptions(
3896 const int32_t db_id,
3897 const std::vector<TableEpochInfo>& table_epochs)
const {
3899 setTableEpochs(db_id, table_epochs);
3900 }
catch (std::exception& e) {
3901 LOG(
ERROR) <<
"An error occurred when attempting to set table epochs. DB id: "
3903 <<
", Error: " << e.what();
3909 const auto it = deletedColumnPerTable_.find(td);
3910 return it != deletedColumnPerTable_.end() ? it->second :
nullptr;
3914 int delete_column_id)
const {
3919 return fragmenter->hasDeletedRows(delete_column_id);
3927 std::vector<const TableDescriptor*> tds;
3932 const auto it = deletedColumnPerTable_.find(td);
3934 if (it == deletedColumnPerTable_.end()) {
3938 tds = getPhysicalTablesDescriptors(td,
false);
3941 for (
auto tdd : tds) {
3942 if (checkMetadataForDeletedRecs(tdd, cd->
columnId)) {
3952 setDeletedColumnUnlocked(td, cd);
3957 const auto it_ok = deletedColumnPerTable_.emplace(td, cd);
3958 CHECK(it_ok.second);
3977 const bool persist_reference) {
3980 CHECK(foreign_ref_col);
3981 referencing_column.
columnType = foreign_ref_col->columnType;
3983 const DictRef dict_ref(currentDB_.dbId, dict_id);
3984 const auto dictIt = dictDescriptorMapByRef_.find(dict_ref);
3985 CHECK(dictIt != dictDescriptorMapByRef_.end());
3986 const auto& dd = dictIt->second;
3989 if (persist_reference) {
3991 sqliteConnector_.query_with_text_params(
3992 "UPDATE mapd_dictionaries SET refcount = refcount + 1 WHERE dictid = ?",
// Resolves a shared-dictionary definition for a column of table `td`.
// For each entry of `shared_dict_defs`:
//  - when the referenced (foreign) table name equals td.tableName, the referenced
//    column is looked up in `cdd` (it must exist, CHECK) and its dictionary is
//    searched for in `dds`; a refcount-increment UPDATE on mapd_dictionaries is
//    issued on this path;
//  - otherwise the foreign table is looked up by name and
//    addReferenceToForeignDict() is called.
// NOTE(review): the leading parameters, the test that matches `column` against
// the column being defined, the condition guarding the "Only temporary tables..."
// throw and all return statements are elided in this excerpt.
3997 bool Catalog::setColumnSharedDictionary(
3999 std::list<ColumnDescriptor>& cdd,
4000 std::list<DictDescriptor>& dds,
4002 const std::vector<Parser::SharedDictionaryDef>& shared_dict_defs) {
4006 if (shared_dict_defs.empty()) {
4009 for (
const auto& shared_dict_def : shared_dict_defs) {
4011 const auto& column = shared_dict_def.get_column();
// compare() returns 0 on equality, so this branch handles a self-reference.
4013 if (!shared_dict_def.get_foreign_table().compare(td.
tableName)) {
4015 const auto& ref_column = shared_dict_def.get_foreign_column();
4017 std::find_if(cdd.begin(), cdd.end(), [ref_column](
const ColumnDescriptor it) {
4018 return !ref_column.compare(it.columnName);
4020 CHECK(colIt != cdd.end());
4025 auto dictIt = std::find_if(
4026 dds.begin(), dds.end(), [
this, dict_id](
const DictDescriptor it) {
4027 return it.dictRef.dbId == this->currentDB_.dbId &&
4028 it.dictRef.dictId == dict_id;
4030 if (dictIt != dds.end()) {
4036 sqliteConnector_.query_with_text_params(
4037 "UPDATE mapd_dictionaries SET refcount = refcount + 1 WHERE dictid = ?",
4047 const auto& foreign_table_name = shared_dict_def.get_foreign_table();
4048 const auto foreign_td = getMetadataForTable(foreign_table_name,
false);
4051 throw std::runtime_error(
4052 "Only temporary tables can share dictionaries with other temporary "
4055 addReferenceToForeignDict(cd, shared_dict_def,
false);
4067 std::list<DictDescriptor>& dds,
4069 bool is_logical_table,
4070 bool use_temp_dictionary) {
4073 std::string dictName{
"Initial_key"};
4075 std::string folderPath;
4076 if (is_logical_table) {
4079 sqliteConnector_.query_with_text_params(
4080 "INSERT INTO mapd_dictionaries (name, nbits, is_shared, refcount) VALUES (?, ?, "
4082 std::vector<std::string>{
4084 sqliteConnector_.query_with_text_param(
4085 "SELECT dictid FROM mapd_dictionaries WHERE name = ?", dictName);
4086 dictId = sqliteConnector_.getData<
int>(0, 0);
4088 sqliteConnector_.query_with_text_param(
4089 "UPDATE mapd_dictionaries SET name = ? WHERE name = 'Initial_key'", dictName);
4100 use_temp_dictionary);
// Creates a logical table plus td.nShards physical tables. The logical table is
// created first via createTable(..., true); each physical table i (1-based) is
// named generatePhysicalTableName(logical_table_name, i) and created via
// createTable(..., false). The collected physical ids are then registered in
// logicalToPhysicalTableMapById_ (insertion must succeed, CHECK) and
// updateLogicalToPhysicalTableMap() is called for the logical id.
// NOTE(review): the first parameter and the construction of `tdl` / `tdp` are
// elided in this excerpt.
4109 void Catalog::createShardedTable(
4111 const list<ColumnDescriptor>& cols,
4112 const std::vector<Parser::SharedDictionaryDef>& shared_dict_defs) {
4115 createTable(*tdl, cols, shared_dict_defs,
true);
4116 int32_t logical_tb_id = tdl->
tableId;
4117 std::string logical_table_name = tdl->
tableName;
4120 std::vector<int32_t> physicalTables;
4121 for (int32_t i = 1; i <= td.
nShards; i++) {
4123 tdp->
tableName = generatePhysicalTableName(logical_table_name, i);
4125 createTable(*tdp, cols, shared_dict_defs,
false);
4126 int32_t physical_tb_id = tdp->
tableId;
4129 physicalTables.push_back(physical_tb_id);
4132 if (!physicalTables.empty()) {
4136 logicalToPhysicalTableMapById_.emplace(logical_tb_id, physicalTables);
4137 CHECK(it_ok.second);
4140 updateLogicalToPhysicalTableMap(logical_tb_id);
4147 const auto physical_tables = getPhysicalTablesDescriptors(td);
4148 for (
const auto table : physical_tables) {
4149 doTruncateTable(table);
4155 removeFragmenterForTable(td->
tableId);
4157 const int tableId = td->
tableId;
4158 ChunkKey chunkKeyPrefix = {currentDB_.dbId, tableId};
4163 dataMgr_->removeTableRelatedDS(currentDB_.dbId, tableId);
4166 std::unique_ptr<StringDictionaryClient> client;
4168 CHECK(!string_dict_hosts_.empty());
4169 DictRef dict_ref(currentDB_.dbId, -1);
4174 for (
const auto& columnDescriptor : columnDescriptorMapById_) {
4175 auto cd = columnDescriptor.second;
4182 const DictRef dict_ref(currentDB_.dbId, dict_id);
4183 const auto dictIt = dictDescriptorMapByRef_.find(dict_ref);
4184 CHECK(dictIt != dictDescriptorMapByRef_.end());
4185 const auto& dd = dictIt->second;
4188 if (dd->refcount == 1) {
4190 dd->stringDict.reset();
4193 client->drop(dd->dictRef);
4195 if (!dd->dictIsTemp) {
4196 boost::filesystem::create_directory(dd->dictFolderPath);
4207 dictDescriptorMapByRef_.erase(dictIt);
4212 dictDescriptorMapByRef_[new_dd->
dictRef].reset(new_dd);
4220 for (
auto col_id = 0; col_id < td.
nColumns; ++col_id) {
4221 if (
auto it = columnDescriptorMapById_.find({td.
tableId, col_id});
4222 it != columnDescriptorMapById_.end()) {
4223 auto cd = it->second;
4226 DictRef dict_ref(currentDB_.dbId, dict_id);
4227 if (
auto dict_it = dictDescriptorMapByRef_.find(dict_ref);
4228 dict_it != dictDescriptorMapByRef_.end()) {
4230 dict_it->second->stringDict =
nullptr;
4232 getMetadataForDict(dict_id,
true);
// Drops cached state for table `table_id` and rebuilds its fragmenter:
// dictionary caches are refreshed, an existing fragmenter pointer is cleared,
// foreign tables are refreshed through the foreign storage manager, mutable
// disk-cache data is removed through dataMgr_, and instantiateFragmenter() is
// called at the end.
// NOTE(review): several lines are elided in this excerpt (including the remaining
// refreshTable() arguments and the branch structure between the foreign-table and
// disk-cache paths), so the exact conditions are not visible here.
4239 void Catalog::invalidateCachesForTable(
const int table_id) {
4242 ChunkKey const table_key{getDatabaseId(), table_id};
4243 auto td = getMutableMetadataForTableUnlocked(table_id);
4249 refreshDictionaryCachesForTableUnlocked(*td);
4253 if (td->fragmenter !=
nullptr) {
4254 auto tableDescIt = tableDescriptorMapById_.find(table_id);
4255 CHECK(tableDescIt != tableDescriptorMapById_.end());
4256 tableDescIt->second->fragmenter =
nullptr;
// The CHECK below asserts that `td` and the map entry see the same fragmenter.
4257 CHECK(td->fragmenter ==
nullptr);
4261 if (dynamic_cast<foreign_storage::ForeignTable*>(td)) {
4262 dataMgr_->getPersistentStorageMgr()->getForeignStorageMgr()->refreshTable(table_key,
4265 dataMgr_->removeMutableTableDiskCacheData(currentDB_.dbId, table_id);
4267 instantiateFragmenter(td);
// Clears the fragmenter pointer stored in the tableDescriptorMapById_ entry for
// `table_id`, if the table currently has one. The descriptor is fetched with
// populate-fragmenter disabled (second argument false) so the lookup itself does
// not create a fragmenter.
// NOTE(review): one line before the lookup is elided in this excerpt.
4270 void Catalog::removeFragmenterForTable(
const int table_id)
const {
4272 auto td = getMetadataForTable(table_id,
false);
4273 if (td->fragmenter !=
nullptr) {
4274 auto tableDescIt = tableDescriptorMapById_.find(table_id);
4275 CHECK(tableDescIt != tableDescriptorMapById_.end());
4276 tableDescIt->second->fragmenter =
nullptr;
// The CHECK below asserts that `td` and the map entry see the same fragmenter.
4277 CHECK(td->fragmenter ==
nullptr);
// Removes the fragmenter for `table_id` and builds the {database id, table id}
// chunk key prefix for the table.
// NOTE(review): the statements that consume `chunkKey` are elided in this
// excerpt -- presumably the data manager deletes chunks with that prefix; confirm.
4282 void Catalog::removeChunks(
const int table_id)
const {
4283 removeFragmenterForTable(table_id);
4286 ChunkKey chunkKey = {currentDB_.dbId, table_id};
4295 std::vector<const TableDescriptor*> tables_to_drop;
4298 const auto physicalTableIt = logicalToPhysicalTableMapById_.find(td->
tableId);
4299 if (physicalTableIt != logicalToPhysicalTableMapById_.end()) {
4301 const auto physicalTables = physicalTableIt->second;
4302 CHECK(!physicalTables.empty());
4303 for (
size_t i = 0; i < physicalTables.size(); i++) {
4304 int32_t physical_tb_id = physicalTables[i];
4306 getMutableMetadataForTableUnlocked(physical_tb_id);
4308 tables_to_drop.emplace_back(phys_td);
4311 tables_to_drop.emplace_back(td);
4314 for (
auto table : tables_to_drop) {
4315 eraseTablePhysicalData(table);
4317 deleteTableCatalogMetadata(td, tables_to_drop);
// Removes the catalog metadata of a logical table and of the given physical
// tables inside one SQLite transaction: the mapd_logical_to_physical rows for the
// logical table are deleted, its logicalToPhysicalTableMapById_ entry is erased,
// and eraseTableMetadata() is called for each physical table. On std::exception
// the transaction is rolled back.
// NOTE(review): the first parameter, the `try {` line and whatever follows the
// ROLLBACK (e.g. a rethrow) are elided in this excerpt.
4320 void Catalog::deleteTableCatalogMetadata(
4322 const std::vector<const TableDescriptor*>& physical_tables) {
4325 sqliteConnector_.query(
"BEGIN TRANSACTION");
4328 sqliteConnector_.query_with_text_param(
4329 "DELETE FROM mapd_logical_to_physical WHERE logical_table_id = ?",
4331 logicalToPhysicalTableMapById_.erase(logical_table->
tableId);
4332 for (
auto table : physical_tables) {
4333 eraseTableMetadata(table);
4335 }
catch (std::exception& e) {
4336 sqliteConnector_.query(
"ROLLBACK TRANSACTION");
4339 sqliteConnector_.query(
"END TRANSACTION");
4343 executeDropTableSqliteQueries(td);
4345 dropTableFromJsonUnlocked(td->
tableName);
4347 calciteMgr_->updateMetadata(currentDB_.dbName, td->
tableName);
4355 const int tableId = td->
tableId;
4356 sqliteConnector_.query_with_text_param(
"DELETE FROM mapd_tables WHERE tableid = ?",
4358 sqliteConnector_.query_with_text_params(
4359 "select comp_param from mapd_columns where compression = ? and tableid = ?",
4361 int numRows = sqliteConnector_.getNumRows();
4362 std::vector<int> dict_id_list;
4363 for (
int r = 0; r < numRows; ++r) {
4364 dict_id_list.push_back(sqliteConnector_.getData<
int>(r, 0));
4366 for (
auto dict_id : dict_id_list) {
4367 sqliteConnector_.query_with_text_params(
4368 "UPDATE mapd_dictionaries SET refcount = refcount - 1 WHERE dictid = ?",
4371 sqliteConnector_.query_with_text_params(
4372 "DELETE FROM mapd_dictionaries WHERE dictid in (select comp_param from "
4373 "mapd_columns where compression = ? "
4374 "and tableid = ?) and refcount = 0",
4376 sqliteConnector_.query_with_text_param(
"DELETE FROM mapd_columns WHERE tableid = ?",
4379 sqliteConnector_.query_with_text_param(
"DELETE FROM mapd_views WHERE tableid = ?",
4383 sqliteConnector_.query_with_text_param(
4384 "DELETE FROM omnisci_foreign_tables WHERE table_id = ?",
std::to_string(tableId));
// Renames one physical table: the name column of its mapd_tables row is updated
// inside a SQLite transaction (rolled back on std::exception), then the
// tableDescriptorMap_ entry is re-keyed under the upper-cased new name.
// calciteMgr_->updateMetadata() is invoked both before and after the in-memory
// re-keying.
// NOTE(review): the bound SQL arguments, the lookup producing `tableDescIt`, the
// code producing `changeTd`, and whatever follows the ROLLBACK are elided in this
// excerpt.
4388 void Catalog::renamePhysicalTable(
const TableDescriptor* td,
const string& newTableName) {
4392 sqliteConnector_.query(
"BEGIN TRANSACTION");
4394 sqliteConnector_.query_with_text_params(
4395 "UPDATE mapd_tables SET name = ? WHERE tableid = ?",
4397 }
catch (std::exception& e) {
4398 sqliteConnector_.query(
"ROLLBACK TRANSACTION");
4401 sqliteConnector_.query(
"END TRANSACTION");
4402 TableDescriptorMap::iterator tableDescIt =
4404 CHECK(tableDescIt != tableDescriptorMap_.end());
4405 calciteMgr_->updateMetadata(currentDB_.dbName, td->
tableName);
4409 tableDescriptorMap_.erase(tableDescIt);
4410 tableDescriptorMap_[
to_upper(newTableName)] = changeTd;
4411 calciteMgr_->updateMetadata(currentDB_.dbName, td->
tableName);
4419 const auto physicalTableIt = logicalToPhysicalTableMapById_.find(td->
tableId);
4420 if (physicalTableIt != logicalToPhysicalTableMapById_.end()) {
4421 const auto physicalTables = physicalTableIt->second;
4422 CHECK(!physicalTables.empty());
4423 for (
size_t i = 0; i < physicalTables.size(); i++) {
4424 int32_t physical_tb_id = physicalTables[i];
4427 std::string newPhysTableName =
4428 generatePhysicalTableName(newTableName, static_cast<int32_t>(i + 1));
4429 renamePhysicalTable(phys_td, newPhysTableName);
4432 renamePhysicalTable(td, newTableName);
4438 key.
dbId = currentDB_.dbId;
4441 object.setObjectKey(key);
4444 for (
auto obj : objdescs) {
// Batch rename. `names[i]` is a (current name, new name) pair and `tableIds[i]` is
// the id of the same table. First pass: update the name column in mapd_tables for
// every id. Second pass: re-key each tableDescriptorMap_ entry from the upper-cased
// current name to the upper-cased new name, calling calciteMgr_->updateMetadata()
// with the current name before and after the re-keying.
// NOTE(review): no transaction statements are visible in this excerpt; the
// visible caller passes this function to execInTransaction. The code producing
// `changeTd` is elided.
4454 void Catalog::renamePhysicalTables(
4455 std::vector<std::pair<std::string, std::string>>& names,
4456 std::vector<int>& tableIds) {
4461 for (
size_t i = 0; i < names.size(); i++) {
4462 int tableId = tableIds[i];
4463 const std::string& newTableName = names[i].second;
4464 sqliteConnector_.query_with_text_params(
4465 "UPDATE mapd_tables SET name = ? WHERE tableid = ?",
4466 std::vector<std::string>{newTableName,
std::to_string(tableId)});
4470 for (
size_t i = 0; i < names.size(); i++) {
4471 const auto& [curTableName, newTableName] = names[i];
4473 TableDescriptorMap::iterator tableDescIt =
4474 tableDescriptorMap_.find(
to_upper(curTableName));
4475 CHECK(tableDescIt != tableDescriptorMap_.end());
4476 calciteMgr_->updateMetadata(currentDB_.dbName, curTableName);
4481 tableDescriptorMap_.erase(tableDescIt);
4482 tableDescriptorMap_[
to_upper(newTableName)] = changeTd;
4483 calciteMgr_->updateMetadata(currentDB_.dbName, curTableName);
4491 const std::map<std::string, int>& cached_table_map,
4492 const std::string& cur_table_name) {
4493 if (
auto it = cached_table_map.find(cur_table_name); it != cached_table_map.end()) {
4494 auto table_id = it->second;
4495 return (table_id == -1) ? NULL : getMetadataForTable(table_id);
4497 return getMetadataForTable(cur_table_name);
4502 const std::string& curTableName,
4503 const std::string& newTableName,
4506 cachedTableMap[curTableName] = -1;
4509 cachedTableMap[newTableName] = tableId;
// Renames several tables in one operation. `names[i]` is a (current name, new
// name) pair. Visible phases:
//  1. Resolve each current name to a table id through getCachedTableDescriptor(),
//     recording ids in `tableIds` and the first index at which each distinct id
//     appears in `uniqueOrderedTableIds` (a std::map, so iteration is in id order).
//  2. Acquire one table lock per distinct id, iterating `uniqueOrderedTableIds`.
//  3. Expand every rename into its physical tables: for a table with an entry in
//     logicalToPhysicalTableMapById_, each physical table k gets the name
//     generatePhysicalTableName(newTableName, k + 1); the logical rename itself is
//     appended as well.
//  4. Apply all renames via execInTransaction(&Catalog::renamePhysicalTables, ...).
//  5. Update per-table object keys for the renamed tables.
// NOTE(review): many lines are elided in this excerpt (validation of the names,
// cache updates, lock types, and the object/privilege update details).
4513 void Catalog::renameTables(
4514 const std::vector<std::pair<std::string, std::string>>& names) {
4517 std::vector<int> tableIds;
4524 std::map<int, size_t> uniqueOrderedTableIds;
4527 std::map<std::string, int> cachedTableMap;
4532 for (
size_t i = 0; i < names.size(); i++) {
4533 const auto& [curTableName, newTableName] = names[i];
4537 auto td = getCachedTableDescriptor(cachedTableMap, curTableName);
4540 tableIds.push_back(td->tableId);
4541 if (uniqueOrderedTableIds.find(td->tableId) == uniqueOrderedTableIds.end()) {
4543 uniqueOrderedTableIds[td->tableId] = i;
4548 CHECK_EQ(tableIds.size(), names.size());
4562 for (
auto& idPair : uniqueOrderedTableIds) {
4563 const std::string& tableName = names[idPair.second].first;
4564 tableLocks.emplace_back(
4567 *
this, tableName,
false)));
4575 std::vector<std::pair<std::string, std::string>> allNames;
4576 std::vector<int> allTableIds;
4578 for (
size_t i = 0; i < names.size(); i++) {
4579 int tableId = tableIds[i];
4580 const auto& [curTableName, newTableName] = names[i];
4583 const auto physicalTableIt = logicalToPhysicalTableMapById_.find(tableId);
4584 if (physicalTableIt != logicalToPhysicalTableMapById_.end()) {
4585 const auto physicalTables = physicalTableIt->second;
4586 CHECK(!physicalTables.empty());
4587 for (
size_t k = 0; k < physicalTables.size(); k++) {
4588 int32_t physical_tb_id = physicalTables[k];
4591 std::string newPhysTableName = generatePhysicalTableName(newTableName, (k + 1));
4592 allNames.emplace_back(phys_td->
tableName, newPhysTableName);
4593 allTableIds.push_back(phys_td->
tableId);
4596 allNames.emplace_back(curTableName, newTableName);
4597 allTableIds.push_back(tableId);
4601 execInTransaction(&Catalog::renamePhysicalTables, allNames, allTableIds);
4605 for (
size_t i = 0; i < names.size(); i++) {
4606 int tableId = tableIds[i];
4607 const std::string& newTableName = names[i].second;
4611 key.
dbId = currentDB_.dbId;
4616 object.setObjectKey(key);
4620 for (
auto obj : objdescs) {
4633 const string& newColumnName) {
4636 sqliteConnector_.query(
"BEGIN TRANSACTION");
4641 std::string new_column_name = cdx->columnName;
4642 new_column_name.replace(0, cd->
columnName.size(), newColumnName);
4643 sqliteConnector_.query_with_text_params(
4644 "UPDATE mapd_columns SET name = ? WHERE tableid = ? AND columnid = ?",
4645 std::vector<std::string>{new_column_name,
4649 }
catch (std::exception& e) {
4650 sqliteConnector_.query(
"ROLLBACK TRANSACTION");
4653 sqliteConnector_.query(
"END TRANSACTION");
4654 calciteMgr_->updateMetadata(currentDB_.dbName, td->
tableName);
4658 ColumnDescriptorMap::iterator columnDescIt = columnDescriptorMap_.find(
4660 CHECK(columnDescIt != columnDescriptorMap_.end());
4663 columnDescriptorMap_.erase(columnDescIt);
4664 columnDescriptorMap_[std::make_tuple(td->
tableId,
to_upper(changeCd->columnName))] =
4667 calciteMgr_->updateMetadata(currentDB_.dbName, td->
tableName);
4673 sqliteConnector_.query(
"BEGIN TRANSACTION");
4676 sqliteConnector_.query_with_text_params(
4677 "SELECT id FROM mapd_dashboards WHERE name = ? and userid = ?",
4679 if (sqliteConnector_.getNumRows() > 0) {
4680 sqliteConnector_.query_with_text_params(
4681 "UPDATE mapd_dashboards SET state = ?, image_hash = ?, metadata = ?, "
4683 "datetime('now') where name = ? "
4691 sqliteConnector_.query_with_text_params(
4692 "INSERT INTO mapd_dashboards (name, state, image_hash, metadata, "
4697 "datetime('now'), ?)",
4704 }
catch (std::exception& e) {
4705 sqliteConnector_.query(
"ROLLBACK TRANSACTION");
4708 sqliteConnector_.query(
"END TRANSACTION");
4712 sqliteConnector_.query_with_text_params(
4713 "SELECT id, strftime('%Y-%m-%dT%H:%M:%SZ', update_time) FROM mapd_dashboards "
4714 "WHERE name = ? and userid = ?",
4716 vd.
dashboardId = sqliteConnector_.getData<
int>(0, 0);
4717 vd.
updateTime = sqliteConnector_.getData<std::string>(0, 1);
4718 }
catch (std::exception& e) {
4723 addFrontendViewToMap(vd);
4726 if (!isInfoSchemaDb()) {
4728 createOrUpdateDashboardSystemRole(
4738 CHECK(sqliteConnector_.getSqlitePtr());
4739 sqliteConnector_.query(
"BEGIN TRANSACTION");
4741 sqliteConnector_.query_with_text_params(
4742 "SELECT id FROM mapd_dashboards WHERE id = ?",
4744 if (sqliteConnector_.getNumRows() > 0) {
4745 sqliteConnector_.query_with_text_params(
4746 "UPDATE mapd_dashboards SET name = ?, state = ?, image_hash = ?, metadata = "
4747 "?, userid = ?, update_time = datetime('now') where id = ? ",
4756 <<
" does not exist in db";
4757 throw runtime_error(
"Error replacing dashboard id " +
4760 }
catch (std::exception& e) {
4761 sqliteConnector_.query(
"ROLLBACK TRANSACTION");
4764 sqliteConnector_.query(
"END TRANSACTION");
4767 for (
auto descp : dashboardDescriptorMap_) {
4768 auto dash = descp.second.get();
4771 auto viewDescIt = dashboardDescriptorMap_.find(
std::to_string(dash->userId) +
":" +
4772 dash->dashboardName);
4774 dashboardDescriptorMap_.end()) {
4775 LOG(
ERROR) <<
"No metadata for dashboard for user " << dash->userId
4776 <<
" dashboard " << dash->dashboardName <<
" does not exist in map";
4777 throw runtime_error(
"No metadata for dashboard for user " +
4779 dash->dashboardName +
" does not exist in map");
4781 dashboardDescriptorMap_.erase(viewDescIt);
4787 <<
" does not exist in map";
4789 " does not exist in map");
4793 sqliteConnector_.query_with_text_params(
4794 "SELECT id, strftime('%Y-%m-%dT%H:%M:%SZ', update_time) FROM "
4798 vd.
updateTime = sqliteConnector_.getData<
string>(0, 1);
4801 addFrontendViewToMapNoLock(vd);
4804 if (!isInfoSchemaDb()) {
4806 createOrUpdateDashboardSystemRole(
// Computes the SHA-1 digest of `data` with boost::uuids::detail::sha1 and streams
// the five 32-bit digest words in hexadecimal into a stringstream.
// NOTE(review): each word is written with std::hex only -- no std::setw(8) /
// std::setfill('0') -- so a word with leading zero nibbles produces fewer than 8
// characters and the result may be shorter than the canonical 40-character SHA-1
// string. Adding padding would change values already produced by this function,
// so confirm how the result is persisted before changing it.
// NOTE(review): the return statement is elided in this excerpt.
4811 std::string Catalog::calculateSHA1(
const std::string& data) {
4812 boost::uuids::detail::sha1 sha1;
4813 unsigned int digest[5];
4814 sha1.process_bytes(data.c_str(), data.length());
4815 sha1.get_digest(digest);
4816 std::stringstream ss;
4817 for (
size_t i = 0; i < 5; i++) {
4818 ss << std::hex << digest[i];
4826 sqliteConnector_.query(
"BEGIN TRANSACTION");
4830 sqliteConnector_.query_with_text_params(
4831 "SELECT linkid FROM mapd_links WHERE link = ? and userid = ?",
4833 if (sqliteConnector_.getNumRows() > 0) {
4834 sqliteConnector_.query_with_text_params(
4835 "UPDATE mapd_links SET update_time = datetime('now') WHERE userid = ? AND "
4839 sqliteConnector_.query_with_text_params(
4840 "INSERT INTO mapd_links (userid, link, view_state, view_metadata, "
4841 "update_time) VALUES (?,?,?,?, datetime('now'))",
4842 std::vector<std::string>{
4846 sqliteConnector_.query_with_text_param(
4847 "SELECT linkid, strftime('%Y-%m-%dT%H:%M:%SZ', update_time) FROM mapd_links "
4850 ld.
linkId = sqliteConnector_.getData<
int>(0, 0);
4851 ld.
updateTime = sqliteConnector_.getData<std::string>(0, 1);
4852 }
catch (std::exception& e) {
4853 sqliteConnector_.query(
"ROLLBACK TRANSACTION");
4856 sqliteConnector_.query(
"END TRANSACTION");
4865 const auto column_descriptors =
4866 getAllColumnMetadataForTable(td->
tableId,
false,
true,
true);
4870 for (
auto cd_itr = column_descriptors.begin(); cd_itr != column_descriptors.end();
// Returns the descriptors of the physical tables backing `logical_table_desc`.
// If the table has no entry in logicalToPhysicalTableMapById_, the result is a
// single-element vector holding `logical_table_desc` itself. Otherwise each mapped
// physical table id is resolved through
// getMetadataForTable(id, populate_fragmenter), in map order.
// NOTE(review): the first parameter line is elided in this excerpt.
4879 std::vector<const TableDescriptor*> Catalog::getPhysicalTablesDescriptors(
4881 bool populate_fragmenter)
const {
4883 const auto physicalTableIt =
4884 logicalToPhysicalTableMapById_.find(logical_table_desc->
tableId);
4885 if (physicalTableIt == logicalToPhysicalTableMapById_.end()) {
4886 return {logical_table_desc};
4888 const auto physicalTablesIds = physicalTableIt->second;
4889 CHECK(!physicalTablesIds.empty());
4891 std::vector<const TableDescriptor*> physicalTables;
4892 for (
size_t i = 0; i < physicalTablesIds.size(); i++) {
4893 physicalTables.push_back(
4894 getMetadataForTable(physicalTablesIds[i], populate_fragmenter));
4896 return physicalTables;
// Returns (table id, shard) pairs for every descriptor in tableDescriptorMapById_
// that is not a view, not a temporary table, not a foreign table, and has no entry
// of its own in logicalToPhysicalTableMapById_ (i.e. is not a logical table that
// maps to physical tables).
// NOTE(review): the remainder of the signature is elided in this excerpt.
4899 std::vector<std::pair<int32_t, int32_t>> Catalog::getAllPersistedTableAndShardIds()
4902 std::vector<std::pair<int32_t, int32_t>> table_and_shard_ids;
// Upper bound on the result size; some descriptors are filtered out below.
4903 table_and_shard_ids.reserve(tableDescriptorMapById_.size());
4904 for (
const auto [table_id, td] : tableDescriptorMapById_) {
4906 if (!td->isView && !td->isTemporaryTable() && !td->isForeignTable() &&
4907 logicalToPhysicalTableMapById_.find(table_id) ==
4908 logicalToPhysicalTableMapById_.end()) {
4909 table_and_shard_ids.emplace_back(table_id, td->shard);
4912 return table_and_shard_ids;
// Builds a map from dictionary id (the comp_param of a column's type) to a
// column descriptor that uses it. Only string-typed columns with dict_id > 0 are
// considered, and the first column seen for a dictionary id wins.
// NOTE(review): several lines are elided in this excerpt, including what happens
// for tables with shard >= 0 (presumably they are skipped -- confirm), any further
// type/encoding tests, and the return statement.
4915 const std::map<int, const ColumnDescriptor*> Catalog::getDictionaryToColumnMapping() {
4918 std::map<int, const ColumnDescriptor*> mapping;
4920 const auto tables = getAllTableMetadata();
4921 for (
const auto td :
tables) {
4922 if (td->shard >= 0) {
4927 for (
auto& cd : getAllColumnMetadataForTable(td->tableId,
false,
false,
true)) {
4928 const auto& ti = cd->columnType;
4929 if (ti.is_string()) {
4932 const auto dict_id = ti.get_comp_param();
// Keep the first column found for each dictionary id.
4935 if (dict_id > 0 && mapping.end() == mapping.find(dict_id)) {
4936 mapping[dict_id] = cd;
4949 if (td->
shard >= 0) {
4953 switch (get_tables_type) {
4971 std::vector<DBObject> privObjects = {dbObject};
// Returns the names of all tables for which
// filterTableByTypeAndUser(td, user_metadata, get_tables_type) is true.
// NOTE(review): the parameter list and the return statement are elided in this
// excerpt.
4979 std::vector<std::string> Catalog::getTableNamesForUser(
4984 std::vector<std::string> table_names;
4985 const auto tables = getAllTableMetadata();
4986 for (
const auto td :
tables) {
4987 if (filterTableByTypeAndUser(td, user_metadata, get_tables_type)) {
4988 table_names.push_back(td->tableName);
// Returns TableMetadata entries for the tables accepted by
// filterTableByTypeAndUser(td, user_metadata, get_tables_type). When
// `filter_table_name` is non-empty, table names are additionally compared against
// it with an exact, case-sensitive `!=`.
// NOTE(review): the leading parameters, the action taken on a name mismatch
// (presumably the table is skipped -- confirm) and the construction of
// `table_metadata` are elided in this excerpt.
4994 std::vector<TableMetadata> Catalog::getTablesMetadataForUser(
4997 const std::string& filter_table_name)
const {
5001 std::vector<TableMetadata> tables_metadata;
5002 const auto tables = getAllTableMetadata();
5003 for (
const auto td :
tables) {
5004 if (filterTableByTypeAndUser(td, user_metadata, get_tables_type)) {
5005 if (!filter_table_name.empty()) {
5006 if (td->tableName != filter_table_name) {
5012 tables_metadata.emplace_back(table_metadata);
5015 return tables_metadata;
// Searches logicalToPhysicalTableMapById_ for an entry whose physical table id
// list contains `physicalTableId`. When no entry matches, `physicalTableId` itself
// is returned.
// NOTE(review): the value returned on a match is elided in this excerpt --
// presumably the logical table id (the map key); confirm.
5018 int Catalog::getLogicalTableId(
const int physicalTableId)
const {
5020 for (
const auto& l : logicalToPhysicalTableMapById_) {
5021 if (l.second.end() != std::find_if(l.second.begin(),
5023 [&](decltype(*l.second.begin()) tid) ->
bool {
5024 return physicalTableId == tid;
5029 return physicalTableId;
5032 void Catalog::checkpoint(
const int logicalTableId)
const {
5033 const auto td = getMetadataForTable(logicalTableId);
5034 const auto shards = getPhysicalTablesDescriptors(td);
5035 for (
const auto shard : shards) {
5036 getDataMgr().checkpoint(getCurrentDB().dbId, shard->tableId);
// Captures the current epochs of the logical table, attempts a checkpoint, and
// restores the captured epochs via setTableEpochsLogExceptions().
// NOTE(review): the try/catch lines are elided in this excerpt -- presumably the
// epoch restore only runs when checkpoint() throws, followed by a rethrow; confirm.
5040 void Catalog::checkpointWithAutoRollback(
const int logical_table_id)
const {
5041 auto table_epochs = getTableEpochs(getDatabaseId(), logical_table_id);
5043 checkpoint(logical_table_id);
5045 setTableEpochsLogExceptions(getDatabaseId(), table_epochs);
// Calls resetTableEpochFloor() on the data manager for every physical table
// backing the logical table `logicalTableId` in the current database. Descriptors
// are fetched with populate-fragmenter disabled (second argument false).
// NOTE(review): one line before the lookup is elided in this excerpt.
5050 void Catalog::resetTableEpochFloor(
const int logicalTableId)
const {
5052 const auto td = getMetadataForTable(logicalTableId,
false);
5053 const auto shards = getPhysicalTablesDescriptors(td,
false);
5054 for (
const auto shard : shards) {
5055 getDataMgr().resetTableEpochFloor(getCurrentDB().dbId, shard->tableId);
// Calls eraseTableMetadata() for every table returned by getAllTableMetadata(),
// then calls calciteMgr_->updateMetadata() with this database's name and an
// empty string as the second argument.
// NOTE(review): lines between the loop and the Calcite call are not visible in
// this excerpt; any additional cleanup performed there is not described here.
5059 void Catalog::eraseDbMetadata() {
5060 const auto tables = getAllTableMetadata();
5061 for (
const auto table :
tables) {
5062 eraseTableMetadata(table);
5067 calciteMgr_->updateMetadata(currentDB_.dbName,
"");
5070 void Catalog::eraseDbPhysicalData() {
5071 const auto tables = getAllTableMetadata();
5072 for (
const auto table :
tables) {
5073 eraseTablePhysicalData(table);
5078 const int tableId = td->
tableId;
5080 removeFragmenterForTable(tableId);
5082 ChunkKey chunkKeyPrefix = {currentDB_.dbId, tableId};
5091 dataMgr_->removeTableRelatedDS(currentDB_.dbId, tableId);
5095 std::string Catalog::generatePhysicalTableName(
const std::string& logicalTableName,
5096 const size_t shardNumber) {
5097 std::string physicalTableName =
5098 logicalTableName + physicalTableNameTag_ +
std::to_string(shardNumber);
5099 return (physicalTableName);
// Reads every row of omnisci_foreign_servers and registers a shared
// ForeignServer object for it in both foreignServerMap_ (keyed by server name)
// and foreignServerMapById_ (keyed by server id).
// NOTE(review): one line directly after the function header is not visible in
// this excerpt.
5102 void Catalog::buildForeignServerMapUnlocked() {
5104 sqliteConnector_.query(
5105 "SELECT id, name, data_wrapper_type, options, owner_user_id, creation_time FROM "
5106 "omnisci_foreign_servers");
5107 auto num_rows = sqliteConnector_.getNumRows();
// Constructor arguments follow the SELECT column order (0 = id ... 5 =
// creation_time).
// NOTE(review): creation_time (column 5) is read as std::int32_t; confirm that
// is wide enough for the stored timestamp.
5109 for (
size_t row = 0; row < num_rows; row++) {
5110 auto foreign_server = std::make_shared<foreign_storage::ForeignServer>(
5111 sqliteConnector_.getData<
int>(row, 0),
5112 sqliteConnector_.getData<std::string>(row, 1),
5113 sqliteConnector_.getData<std::string>(row, 2),
5114 sqliteConnector_.getData<std::string>(row, 3),
5115 sqliteConnector_.getData<std::int32_t>(row, 4),
5116 sqliteConnector_.getData<std::int32_t>(row, 5));
// The same shared_ptr is stored under both keys.
5117 foreignServerMap_[foreign_server->name] = foreign_server;
5118 foreignServerMapById_[foreign_server->id] = foreign_server;
// For every row of omnisci_foreign_tables, updates the matching in-memory
// foreign table: attaches its foreign server, repopulates its options map and
// copies the last/next refresh times.
// The table id of each row must already be present in tableDescriptorMapById_
// (enforced with CHECK).
5122 void Catalog::updateForeignTablesInMapUnlocked() {
5124 sqliteConnector_.query(
5125 "SELECT table_id, server_id, options, last_refresh_time, next_refresh_time from "
5126 "omnisci_foreign_tables");
5127 auto num_rows = sqliteConnector_.getNumRows();
5128 for (
size_t r = 0; r < num_rows; r++) {
5129 const auto table_id = sqliteConnector_.getData<int32_t>(r, 0);
5130 const auto server_id = sqliteConnector_.getData<int32_t>(r, 1);
5131 const auto& options = sqliteConnector_.getData<std::string>(r, 2);
5132 const auto last_refresh_time = sqliteConnector_.getData<int64_t>(r, 3);
5133 const auto next_refresh_time = sqliteConnector_.getData<int64_t>(r, 4);
5135 CHECK(tableDescriptorMapById_.find(table_id) != tableDescriptorMapById_.end());
// NOTE(review): the expression that produces foreign_table is not visible in
// this excerpt -- presumably a cast of the looked-up descriptor; confirm.
5136 auto foreign_table =
5138 CHECK(foreign_table);
// NOTE(review): operator[] inserts an empty entry when server_id is unknown;
// the CHECK below then fails on the resulting null pointer.
5139 foreign_table->foreign_server = foreignServerMapById_[server_id].get();
5140 CHECK(foreign_table->foreign_server);
5141 foreign_table->populateOptionsMap(options);
5142 foreign_table->last_refresh_time = last_refresh_time;
5143 foreign_table->next_refresh_time = next_refresh_time;
// For system tables, is_in_memory_system_table is derived from the server's
// data_wrapper_type (the call computing it is partly elided here).
5144 if (foreign_table->is_system_table) {
5145 foreign_table->is_in_memory_system_table =
5147 foreign_table->foreign_server->data_wrapper_type);
5155 <<
"reloadForeignTable expects a table with valid id";
5156 sqliteConnector_.query(
5157 "SELECT server_id, options, last_refresh_time, next_refresh_time from "
5158 "omnisci_foreign_tables WHERE table_id == " +
5160 auto num_rows = sqliteConnector_.getNumRows();
5161 CHECK_EQ(num_rows, 1U) <<
"Expected single entry in omnisci_foreign_tables for table'"
5162 << foreign_table.
tableName <<
"', instead got " << num_rows;
5163 const auto server_id = sqliteConnector_.getData<int32_t>(0, 0);
5164 const auto& options = sqliteConnector_.getData<std::string>(0, 1);
5165 const auto last_refresh_time = sqliteConnector_.getData<int64_t>(0, 2);
5166 const auto next_refresh_time = sqliteConnector_.getData<int64_t>(0, 3);
5168 foreign_table.
foreign_server = foreignServerMapById_[server_id].get();
// Reads all rows of mapd_dictionaries and, for every dictionary whose DictRef
// is not yet in dictDescriptorMapByRef_, inserts a new DictDescriptor built
// from the row.
// NOTE(review): the computation of `fname` and the handling of dictionaries
// that are already in the map are not visible in this excerpt.
5180 void Catalog::reloadDictionariesFromDiskUnlocked() {
5181 std::string dictQuery(
5182 "SELECT dictid, name, nbits, is_shared, refcount from mapd_dictionaries");
5183 sqliteConnector_.query(dictQuery);
5184 auto numRows = sqliteConnector_.getNumRows();
5185 for (
size_t r = 0; r < numRows; ++r) {
// Column indexes follow the SELECT order above.
5186 auto dictId = sqliteConnector_.getData<
int>(r, 0);
5187 auto dictName = sqliteConnector_.getData<
string>(r, 1);
5188 auto dictNBits = sqliteConnector_.getData<
int>(r, 2);
5189 auto is_shared = sqliteConnector_.getData<
bool>(r, 3);
5190 auto refcount = sqliteConnector_.getData<
int>(r, 4);
// The dictionary is keyed by (current database id, dictionary id).
5193 DictRef dict_ref(currentDB_.dbId, dictId);
5194 DictDescriptor dd(dict_ref, dictName, dictNBits, is_shared, refcount, fname,
false);
5195 if (
auto it = dictDescriptorMapByRef_.find(dict_ref);
5196 it == dictDescriptorMapByRef_.end()) {
5197 dictDescriptorMapByRef_[dict_ref] = std::make_unique<DictDescriptor>(dd);
// Reads the mapd_columns rows for table_id and turns each row into a
// heap-allocated ColumnDescriptor.
// Descriptors are first held in unique_ptrs (smart_cds) while being built, and
// are only released into the raw-pointer list `cds` at the end.
// NOTE(review): presumably `cds` is returned and the caller takes ownership of
// the raw pointers -- the return statement is not visible here; confirm.
5204 std::list<ColumnDescriptor*> Catalog::sqliteGetColumnsForTableUnlocked(int32_t table_id) {
5205 std::list<ColumnDescriptor*> cds;
5208 std::list<std::unique_ptr<ColumnDescriptor>> smart_cds;
// The query text ends with a concatenation whose right-hand side is elided in
// this excerpt (presumably the table id rendered as text).
5209 std::string columnQuery(
5210 "SELECT tableid, columnid, name, coltype, colsubtype, coldim, colscale, "
5211 "is_notnull, compression, comp_param, size, chunks, is_systemcol, is_virtualcol, "
5212 "virtual_expr, is_deletedcol, default_value from mapd_columns WHERE tableid = " +
5214 sqliteConnector_.query(columnQuery);
5215 auto numRows = sqliteConnector_.getNumRows();
// Countdown used below to flag columns as geo physical columns; the line that
// raises it above zero is not visible in this excerpt.
5216 int32_t skip_physical_cols = 0;
5217 for (
size_t r = 0; r < numRows; ++r) {
5218 std::unique_ptr<ColumnDescriptor> cd = std::make_unique<ColumnDescriptor>();
// Column indexes 0..16 follow the SELECT order above.
5219 cd->tableId = sqliteConnector_.getData<
int>(r, 0);
5220 cd->columnId = sqliteConnector_.getData<
int>(r, 1);
5221 cd->columnName = sqliteConnector_.getData<
string>(r, 2);
5222 cd->columnType.set_type((
SQLTypes)sqliteConnector_.getData<
int>(r, 3));
5223 cd->columnType.set_subtype((
SQLTypes)sqliteConnector_.getData<
int>(r, 4));
5224 cd->columnType.set_dimension(sqliteConnector_.getData<
int>(r, 5));
5225 cd->columnType.set_scale(sqliteConnector_.getData<
int>(r, 6));
5226 cd->columnType.set_notnull(sqliteConnector_.getData<
bool>(r, 7));
5227 cd->columnType.set_compression((
EncodingType)sqliteConnector_.getData<
int>(r, 8));
5228 cd->columnType.set_comp_param(sqliteConnector_.getData<
int>(r, 9));
5229 cd->columnType.set_size(sqliteConnector_.getData<
int>(r, 10));
5230 cd->chunks = sqliteConnector_.getData<
string>(r, 11);
5231 cd->isSystemCol = sqliteConnector_.getData<
bool>(r, 12);
5232 cd->isVirtualCol = sqliteConnector_.getData<
bool>(r, 13);
5233 cd->virtualExpr = sqliteConnector_.getData<
string>(r, 14);
5234 cd->isDeletedCol = sqliteConnector_.getData<
bool>(r, 15);
// A SQL NULL default_value maps to std::nullopt rather than an empty string.
5235 if (sqliteConnector_.isNull(r, 16)) {
5236 cd->default_value = std::nullopt;
5238 cd->default_value = std::make_optional(sqliteConnector_.getData<
string>(r, 16));
// The counter is tested before the post-decrement takes effect.
5240 cd->isGeoPhyCol = skip_physical_cols-- > 0;
5241 cd->db_id = getDatabaseId();
5243 smart_cds.emplace_back(std::move(cd));
// Hand ownership over to the raw-pointer list.
5247 for (
auto& cd : smart_cds) {
5248 cds.emplace_back(cd.release());
5255 "SELECT tableid, name, ncolumns, isview, fragments, frag_type, max_frag_rows, "
5256 "max_chunk_size, frag_page_size, max_rows, partitions, shard_column_id, shard, "
5257 "num_shards, key_metainfo, userid, sort_column_id, storage_type, "
5258 "max_rollback_epochs, is_system_table from mapd_tables WHERE tableid = " +
5260 sqliteConnector_.query(query);
5261 auto numRows = sqliteConnector_.getNumRows();
5266 const auto& storage_type = sqliteConnector_.getData<
string>(0, 17);
5268 const auto& table_name = sqliteConnector_.getData<
string>(0, 1);
5269 LOG(
FATAL) <<
"Unable to read Catalog metadata: storage type is currently not a "
5270 "supported table option (table "
5271 << table_name <<
" [" << table_id <<
"] in database " << currentDB_.dbName
5278 std::unique_ptr<TableDescriptor> td;
5280 ? std::make_unique<foreign_storage::ForeignTable>()
5281 : std::make_unique<TableDescriptor>();
5283 td->tableId = sqliteConnector_.getData<
int>(0, 0);
5284 td->tableName = sqliteConnector_.getData<
string>(0, 1);
5285 td->nColumns = sqliteConnector_.getData<
int>(0, 2);
5286 td->isView = sqliteConnector_.getData<
bool>(0, 3);
5287 td->fragments = sqliteConnector_.getData<
string>(0, 4);
5289 sqliteConnector_.getData<
int>(0, 5));
5290 td->maxFragRows = sqliteConnector_.getData<
int>(0, 6);
5291 td->maxChunkSize = sqliteConnector_.getData<int64_t>(0, 7);
5292 td->fragPageSize = sqliteConnector_.getData<
int>(0, 8);
5293 td->maxRows = sqliteConnector_.getData<int64_t>(0, 9);
5294 td->partitions = sqliteConnector_.getData<
string>(0, 10);
5295 td->shardedColumnId = sqliteConnector_.getData<
int>(0, 11);
5296 td->shard = sqliteConnector_.getData<
int>(0, 12);
5297 td->nShards = sqliteConnector_.getData<
int>(0, 13);
5298 td->keyMetainfo = sqliteConnector_.getData<
string>(0, 14);
5299 td->userId = sqliteConnector_.getData<
int>(0, 15);
5300 td->sortedColumnId =
5301 sqliteConnector_.isNull(0, 16) ? 0 : sqliteConnector_.getData<
int>(0, 16);
5302 td->storageType = storage_type;
5303 td->maxRollbackEpochs = sqliteConnector_.getData<
int>(0, 18);
5304 td->is_system_table = sqliteConnector_.getData<
bool>(0, 19);
5305 td->hasDeletedCol =
false;
5308 updateViewUnlocked(*td);
5310 td->fragmenter =
nullptr;
5313 if (
auto ftd = dynamic_cast<foreign_storage::ForeignTable*>(td.get())) {
5314 reloadForeignTableUnlocked(*ftd);
5317 return td.release();
// Updates one column (`property`) of the omnisci_foreign_servers row whose
// name equals server_name, after first looking up the server id by name.
// Throws std::runtime_error stating that the foreign server is not found.
// NOTE(review): the branch condition selecting update vs. throw is not visible
// in this excerpt -- presumably it tests num_rows; confirm.
5320 void Catalog::setForeignServerProperty(
const std::string& server_name,
5321 const std::string& property,
5322 const std::string& value) {
5324 sqliteConnector_.query_with_text_params(
5325 "SELECT id from omnisci_foreign_servers where name = ?",
5326 std::vector<std::string>{server_name});
5327 auto num_rows = sqliteConnector_.getNumRows();
5330 auto server_id = sqliteConnector_.getData<int32_t>(0, 0);
// NOTE(review): `property` is concatenated into the SQL text as a column name
// (it cannot be bound as a parameter); it must never carry untrusted input.
5331 sqliteConnector_.query_with_text_params(
5332 "UPDATE omnisci_foreign_servers SET " + property +
" = ? WHERE id = ?",
5335 throw std::runtime_error{
"Can not change property \"" +
property +
5336 "\" for foreign server." +
" Foreign server \"" +
5337 server_name +
"\" is not found."};
// Creates the built-in foreign servers "default_local_delimited",
// "default_local_parquet" and "default_local_regex_parsed".
// Each server object is validated and then passed to
// createForeignServerNoLocks() with `true` as the second argument.
// NOTE(review): judging by this function's name, that flag is presumably
// "if not exists"; the remaining constructor arguments are not visible in this
// excerpt.
5341 void Catalog::createDefaultServersIfNotExists() {
5347 auto local_csv_server = std::make_unique<foreign_storage::ForeignServer>(
5348 "default_local_delimited",
5352 local_csv_server->validate();
5353 createForeignServerNoLocks(std::move(local_csv_server),
true);
// The Parquet server is only compiled in when ENABLE_IMPORT_PARQUET is defined.
// NOTE(review): the matching #endif is not visible here -- presumably it
// precedes the regex-parsed server below; confirm.
5355 #ifdef ENABLE_IMPORT_PARQUET
5356 auto local_parquet_server = std::make_unique<foreign_storage::ForeignServer>(
5357 "default_local_parquet",
5361 local_parquet_server->validate();
5362 createForeignServerNoLocks(std::move(local_parquet_server),
true);
5365 auto local_regex_parser_server = std::make_unique<foreign_storage::ForeignServer>(
5366 "default_local_regex_parsed",
5370 local_regex_parser_server->validate();
5371 createForeignServerNoLocks(std::move(local_regex_parser_server),
true);
5375 void Catalog::setForReload(
const int32_t tableId) {
5376 const auto td = getMetadataForTable(tableId);
5377 for (
const auto shard : getPhysicalTablesDescriptors(td)) {
5378 const auto tableEpoch = getTableEpoch(currentDB_.dbId, shard->tableId);
5379 setTableEpoch(currentDB_.dbId, shard->tableId, tableEpoch);
// For each physical table of `td`, looks up its file manager through the
// global file manager and records the last path component of that file
// manager's base path in file_paths.
// NOTE(review): the parameter list, the cast producing `file_mgr` and the
// return statement are not visible in this excerpt.
5384 std::vector<std::string> Catalog::getTableDataDirectories(
5386 const auto global_file_mgr = getDataMgr().getGlobalFileMgr();
5387 std::vector<std::string> file_paths;
5388 for (
auto shard : getPhysicalTablesDescriptors(td)) {
5390 global_file_mgr->getFileMgr(currentDB_.dbId, shard->tableId));
// Only the directory name is kept, not the full path.
5391 boost::filesystem::path file_path(file_mgr->getFileMgrBasePath());
5392 file_paths.push_back(file_path.filename().string());
5399 bool file_name_only)
const {
5404 const DictRef dictRef(currentDB_.dbId, dictId);
5405 const auto dit = dictDescriptorMapByRef_.find(dictRef);
5406 CHECK(dit != dictDescriptorMapByRef_.end());
5408 if (file_name_only) {
5409 boost::filesystem::path file_path(dit->second->dictFolderPath);
5410 return file_path.filename().string();
5412 return dit->second->dictFolderPath;
5415 return std::string();
// Collects the distinct, non-empty results of getColumnDictDirectory() over
// the columns of `td`, in first-seen order.
// NOTE(review): the parameter list and the return statement are not visible in
// this excerpt; the meaning of the three boolean flags passed to
// getAllColumnMetadataForTable() should be confirmed against its declaration.
5419 std::vector<std::string> Catalog::getTableDictDirectories(
5421 std::vector<std::string> file_paths;
5422 for (
auto cd : getAllColumnMetadataForTable(td->
tableId,
false,
false,
true)) {
5423 auto file_base = getColumnDictDirectory(cd);
// Linear search keeps the vector free of duplicates while preserving order.
5424 if (!file_base.empty() &&
5425 file_paths.end() == std::find(file_paths.begin(), file_paths.end(), file_base)) {
5426 file_paths.push_back(file_base);
// Returns the set of non-empty dictionary directories for the columns
// registered under table_id in dict_columns_by_table_id_.
// Returns an empty set when the table has no entry in that map.
// NOTE(review): getColumnDictDirectory() is called with `false` as the second
// argument -- presumably requesting the full path rather than only the
// directory name; confirm against its declaration.
5432 std::set<std::string> Catalog::getTableDictDirectoryPaths(int32_t table_id)
const {
5434 std::set<std::string> directory_paths;
5435 auto it = dict_columns_by_table_id_.find(table_id);
5436 if (it != dict_columns_by_table_id_.end()) {
5437 for (
auto cd : it->second) {
5438 auto directory_path = getColumnDictDirectory(cd,
false);
5439 if (!directory_path.empty()) {
5440 directory_paths.emplace(directory_path);
5444 return directory_paths;
5454 std::ostringstream os;
5455 os <<
"CREATE TABLE @T (";
5457 const auto cds = getAllColumnMetadataForTable(td->
tableId,
false,
false,
false);
5459 std::vector<std::string> shared_dicts;
5460 std::map<const std::string, const ColumnDescriptor*> dict_root_cds;
5461 for (
const auto cd : cds) {
5462 if (!(cd->isSystemCol || cd->isVirtualCol)) {
5463 const auto& ti = cd->columnType;
5464 os << comma << quoteIfRequired(cd->columnName);
5473 os <<
" " << ti.get_type_name();
5475 os << (ti.get_notnull() ?
" NOT NULL" :
"");
5476 if (cd->default_value.has_value()) {
5477 os <<
" DEFAULT " << cd->getDefaultValueLiteral();
5479 if (ti.is_string() || (ti.is_array() && ti.get_subtype() ==
kTEXT)) {
5480 auto size = ti.is_array() ? ti.get_logical_size() : ti.get_size();
5483 const auto dict_id = ti.get_comp_param();
5484 const DictRef dict_ref(currentDB_.dbId, dict_id);
5485 const auto dict_it = dictDescriptorMapByRef_.find(dict_ref);
5486 CHECK(dict_it != dictDescriptorMapByRef_.end());
5487 const auto dict_name = dict_it->second->dictName;
5490 if (dict_root_cds.end() == dict_root_cds.find(dict_name)) {
5491 dict_root_cds[dict_name] = cd;
5492 os <<
" ENCODING " << ti.get_compression_name() <<
"(" << (size * 8) <<
")";
5494 const auto dict_root_cd = dict_root_cds[dict_name];
5495 shared_dicts.push_back(
"SHARED DICTIONARY (" + cd->columnName +
5496 ") REFERENCES @T(" + dict_root_cd->columnName +
")");
5501 os <<
" ENCODING NONE";
5503 }
else if (ti.is_date_in_days() ||
5504 (ti.get_size() > 0 && ti.get_size() != ti.get_logical_size())) {
5505 const auto comp_param = ti.get_comp_param() ? ti.get_comp_param() : 32;
5506 os <<
" ENCODING " << ti.get_compression_name() <<
"(" << comp_param <<
")";
5507 }
else if (ti.is_geometry()) {
5509 os <<
" ENCODING " << ti.get_compression_name() <<
"(" << ti.get_comp_param()
5512 os <<
" ENCODING NONE";
5519 if (shared_dicts.size()) {
5523 std::vector<std::string> with_options;
5528 with_options.emplace_back(td->
hasDeletedCol ?
"VACUUM='DELAYED'"
5529 :
"VACUUM='IMMEDIATE'");
5531 with_options.push_back(
"PARTITIONS='" + td->
partitions +
"'");
5536 os <<
", SHARD KEY(" << shard_cd->columnName <<
")";
5537 with_options.push_back(
5544 with_options.push_back(
"SORT_COLUMN='" + sort_cd->columnName +
"'");
5548 with_options.push_back(
"MAX_ROLLBACK_EPOCHS=" +
5559 return std::find_if(str.begin(), str.end(), [](
const unsigned char& ch) {
5560 return std::isspace(ch);
5566 std::string_view str,
5567 std::string_view chars =
"`~!@#$%^&*()-=+[{]}\\|;:'\",<.>/?") {
5568 return str.find_first_of(chars) != std::string_view::npos;
5578 bool multiline_formatting,
5579 bool dump_defaults)
const {
5581 return dumpCreateTableUnlocked(td, multiline_formatting, dump_defaults);
// Overload taking a table id: resolves the descriptor with
// getMutableMetadataForTableUnlocked() and forwards it, together with both
// formatting flags, to dumpCreateTableUnlocked().
// NOTE(review): lines between the lookup and the return are not visible in
// this excerpt -- presumably an empty optional is returned when no table has
// this id; confirm.
5584 std::optional<std::string> Catalog::dumpCreateTable(int32_t table_id,
5585 bool multiline_formatting,
5586 bool dump_defaults)
const {
5588 const auto td = getMutableMetadataForTableUnlocked(table_id);
5592 return dumpCreateTableUnlocked(td, multiline_formatting, dump_defaults);
5596 bool multiline_formatting,
5597 bool dump_defaults)
const {
5599 std::ostringstream os;
5602 os <<
"CREATE FOREIGN TABLE " << td->
tableName <<
" (";
5603 }
else if (!td->
isView) {
5614 std::vector<std::string> additional_info;
5615 std::set<std::string> shared_dict_column_names;
5617 gatherAdditionalInfo(additional_info, shared_dict_column_names, td);
5620 const auto cds = getAllColumnMetadataForTable(td->
tableId,
false,
false,
false);
5621 std::map<const std::string, const ColumnDescriptor*> dict_root_cds;
5623 for (
const auto cd : cds) {
5624 if (!(cd->isSystemCol || cd->isVirtualCol)) {
5625 const auto& ti = cd->columnType;
5628 if (!multiline_formatting) {
5634 if (multiline_formatting) {
5638 os << quoteIfRequired(cd->columnName);
5647 os <<
" " << ti.get_type_name();
5649 os << (ti.get_notnull() ?
" NOT NULL" :
"");
5650 if (cd->default_value.has_value()) {
5651 os <<
" DEFAULT " << cd->getDefaultValueLiteral();
5653 if (shared_dict_column_names.find(cd->columnName) ==
5654 shared_dict_column_names.end()) {
5657 if (ti.is_string() || (ti.is_array() && ti.get_subtype() ==
kTEXT)) {
5658 auto size = ti.is_array() ? ti.get_logical_size() : ti.get_size();
5660 os <<
" ENCODING " << ti.get_compression_name() <<
"(" << (size * 8) <<
")";
5662 os <<
" ENCODING NONE";
5664 }
else if (ti.is_date_in_days() ||
5665 (ti.get_size() > 0 && ti.get_size() != ti.get_logical_size())) {
5666 const auto comp_param = ti.get_comp_param() ? ti.get_comp_param() : 32;
5667 os <<
" ENCODING " << ti.get_compression_name() <<
"(" << comp_param <<
")";
5668 }
else if (ti.is_geometry()) {
5670 os <<
" ENCODING " << ti.get_compression_name() <<
"(" << ti.get_comp_param()
5673 os <<
" ENCODING NONE";
5680 if (additional_info.size()) {
5682 if (!multiline_formatting) {
5692 std::vector<std::string> with_options;
5694 if (multiline_formatting) {
5699 os <<
"SERVER " << foreign_table->foreign_server->name;
5702 for (
const auto& [option, value] : foreign_table->options) {
5703 with_options.emplace_back(option +
"='" + value +
"'");
5721 with_options.push_back(
"MAX_ROLLBACK_EPOCHS=" +
5724 if (!foreign_table && (dump_defaults || !td->
hasDeletedCol)) {
5725 with_options.emplace_back(td->
hasDeletedCol ?
"VACUUM='DELAYED'"
5726 :
"VACUUM='IMMEDIATE'");
5728 if (!foreign_table && !td->
partitions.empty()) {
5729 with_options.push_back(
"PARTITIONS='" + td->
partitions +
"'");
5731 if (!foreign_table && td->
nShards > 0) {
5734 with_options.push_back(
5741 with_options.push_back(
"SORT_COLUMN='" + sort_cd->columnName +
"'");
5744 if (!with_options.empty()) {
5745 if (!multiline_formatting) {
// Builds the text of a CREATE SERVER statement for the foreign server called
// `name`. Throws std::runtime_error when no such server is in
// foreignServerMap_.
// NOTE(review): the code that appends the collected options and returns the
// final string is not visible in this excerpt.
5756 std::string Catalog::dumpCreateServer(
const std::string&
name,
5757 bool multiline_formatting)
const {
5759 auto server_it = foreignServerMap_.find(name);
5760 if (server_it == foreignServerMap_.end()) {
5761 throw std::runtime_error(
"Foreign server " + name +
" does not exist.");
5763 auto server = server_it->second.get();
5764 std::ostringstream os;
5765 os <<
"CREATE SERVER " << name <<
" FOREIGN DATA WRAPPER " << server->data_wrapper_type;
// Each option is rendered as option='value'.
// NOTE(review): values are inserted verbatim; a value containing a single
// quote would produce malformed DDL -- confirm whether escaping is needed.
5766 std::vector<std::string> with_options;
5767 for (
const auto& [option, value] : server->options) {
5768 with_options.emplace_back(option +
"='" + value +
"'");
5770 if (!with_options.empty()) {
5771 if (!multiline_formatting) {
// Checks whether a table or view called `name` already exists (looked up via
// getMetadataForTable with `false` as the second argument).
// Throws std::runtime_error with an "already exists" message for an existing
// name.
// NOTE(review): the return statements are not visible in this excerpt --
// presumably an existing name with if_not_exists set returns false instead of
// throwing, and a free name returns true; confirm.
5782 bool Catalog::validateNonExistentTableOrView(
const std::string&
name,
5783 const bool if_not_exists) {
5784 if (getMetadataForTable(name,
false)) {
5785 if (if_not_exists) {
5788 throw std::runtime_error(
"Table or View with name \"" + name +
"\" already exists.");
// Walks tableDescriptorMapById_ and collects the foreign tables that are due
// for refresh: their refresh timing-type option equals a specific value and
// their next_refresh_time is not later than current_time.
// NOTE(review): the filter that selects foreign tables, the option key/value
// being compared, the source of current_time and the return statement are not
// visible in this excerpt.
5793 std::vector<const TableDescriptor*> Catalog::getAllForeignTablesForRefresh()
const {
5795 std::vector<const TableDescriptor*>
tables;
// NOTE(review): `entry` is taken by value, so each map pair is copied per
// iteration.
5796 for (
auto entry : tableDescriptorMapById_) {
5797 auto table_descriptor = entry.second;
5800 CHECK(foreign_table);
5801 auto timing_type_entry = foreign_table->options.find(
// The timing-type option is required to be present on every foreign table
// that reaches this point.
5803 CHECK(timing_type_entry != foreign_table->options.end());
5805 if (timing_type_entry->second ==
5807 foreign_table->next_refresh_time <= current_time) {
5808 tables.emplace_back(foreign_table);
// Writes new last_refresh_time / next_refresh_time values for the foreign
// table with the given id to omnisci_foreign_tables, then copies the same
// values onto the in-memory foreign table object.
// The table id must be present in tableDescriptorMapById_ (enforced with
// CHECK).
// NOTE(review): how the two timestamps are computed, and the cast that yields
// foreign_table, are not visible in this excerpt.
5815 void Catalog::updateForeignTableRefreshTimes(
const int32_t table_id) {
5818 CHECK(tableDescriptorMapById_.find(table_id) != tableDescriptorMapById_.end());
5819 auto table_descriptor = tableDescriptorMapById_.find(table_id)->second;
5820 CHECK(table_descriptor);
5822 CHECK(foreign_table);
5825 sqliteConnector_.query_with_text_params(
5826 "UPDATE omnisci_foreign_tables SET last_refresh_time = ?, next_refresh_time = ? "
5827 "WHERE table_id = ?",
// The in-memory copy is updated after the SQL statement has been issued.
5831 foreign_table->last_refresh_time = last_refresh_time;
5832 foreign_table->next_refresh_time = next_refresh_time;
// Applies options_map to the named foreign table, validates the result and
// then stores the table's options as a JSON string through
// setForeignTableProperty() under the "options" property.
// The previous options are copied first and assigned back inside the catch
// handler, so a std::exception raised by validateOptionValues() restores the
// original in-memory options.
// NOTE(review): whether the handler rethrows is not visible in this excerpt --
// presumably it does, so the persistence step is skipped on failure; confirm.
5837 void Catalog::setForeignTableOptions(
const std::string& table_name,
5839 bool clear_existing_options) {
5842 auto foreign_table = getForeignTableUnlocked(table_name);
5843 auto saved_options = foreign_table->options;
5844 foreign_table->populateOptionsMap(std::move(options_map), clear_existing_options);
5846 foreign_table->validateOptionValues();
5847 }
catch (
const std::exception& e) {
5850 foreign_table->options = saved_options;
5853 setForeignTableProperty(
5854 foreign_table,
"options", foreign_table->getOptionsAsJsonString());
5858 const std::string& property,
5859 const std::string& value) {
5861 sqliteConnector_.query_with_text_params(
5862 "SELECT table_id from omnisci_foreign_tables where table_id = ?",
5864 auto num_rows = sqliteConnector_.getNumRows();
5867 sqliteConnector_.query_with_text_params(
5868 "UPDATE omnisci_foreign_tables SET " + property +
" = ? WHERE table_id = ?",
5871 throw std::runtime_error{
"Can not change property \"" +
property +
5872 "\" for foreign table." +
" Foreign table \"" +
5877 std::string Catalog::quoteIfRequired(
const std::string& column_name)
const {
5888 void Catalog::gatherAdditionalInfo(std::vector<std::string>& additional_info,
5889 std::set<std::string>& shared_dict_column_names,
5893 auto scd = columnDescriptorMapById_.find(columnIdKey)->second;
5895 std::string txt =
"SHARD KEY (" + quoteIfRequired(scd->columnName) +
")";
5896 additional_info.emplace_back(txt);
5898 const auto cds = getAllColumnMetadataForTable(td->
tableId,
false,
false,
false);
5899 for (
const auto cd : cds) {
5900 if (!(cd->isSystemCol || cd->isVirtualCol)) {
5909 DictRef dict_ref(currentDB_.dbId, dictId);
5910 const auto dictIt = dictDescriptorMapByRef_.find(dict_ref);
5911 if (dictIt == dictDescriptorMapByRef_.end()) {
5912 LOG(
ERROR) <<
"missing dictionary " << dictId <<
" for table " << td->
tableName;
5916 const auto& dd = dictIt->second;
5917 if (dd->refcount > 1) {
5918 auto lowest_table = td->
tableId;
5919 auto lowest_column = cd->columnId;
5920 std::string lowest_column_name;
5923 for (
auto const& [key, val] : columnDescriptorMap_) {
5925 val->columnType.get_comp_param() == dictId &&
5926 !(val->tableId == td->
tableId && val->columnId == cd->columnId)) {
5927 if (val->tableId < lowest_table) {
5928 lowest_table = val->tableId;
5929 lowest_column = val->columnId;
5930 lowest_column_name = val->columnName;
5932 if (val->columnId < lowest_column) {
5933 lowest_column = val->columnId;
5934 lowest_column_name = val->columnName;
5938 if (lowest_table != td->
tableId || lowest_column != cd->columnId) {
5940 auto lowest_td = tableDescriptorMapById_.find(lowest_table)->second;
5942 std::string txt =
"SHARED DICTIONARY (" + quoteIfRequired(cd->columnName) +
5943 ") REFERENCES " + lowest_td->tableName +
"(" +
5944 quoteIfRequired(lowest_column_name) +
")";
5946 additional_info.emplace_back(txt);
5947 shared_dict_column_names.insert(cd->columnName);
// Inserts a new custom expression into omnisci_custom_expressions inside a
// SQLite transaction, registers it in custom_expr_map_by_id_ and returns the
// id SQLite assigned to it.
// Throws std::runtime_error when a row with the same name, data source type,
// data source id and is_deleted value already exists.
// On a std::exception the transaction is rolled back.
// NOTE(review): whether the handler rethrows, and the trailing parameters of
// both the SELECT and the INSERT, are not visible in this excerpt.
5954 int32_t Catalog::createCustomExpression(
5955 std::unique_ptr<CustomExpression> custom_expression) {
5958 sqliteConnector_.query(
"BEGIN TRANSACTION");
5959 int32_t custom_expression_id{-1};
5961 auto data_source_type_str =
5962 CustomExpression::dataSourceTypeToString(custom_expression->data_source_type);
5963 auto data_source_id_str =
std::to_string(custom_expression->data_source_id);
// The same SELECT is used twice: before the INSERT to detect duplicates and
// after it to read back the generated id.
5964 std::string custom_expr_select_query{
5965 "SELECT id FROM omnisci_custom_expressions WHERE name = ? and data_source_type = "
5966 "? and data_source_id = ? and is_deleted = ?"};
5967 std::vector<std::string> custom_expr_select_params{custom_expression->name,
5968 data_source_type_str,
5971 sqliteConnector_.query_with_text_params(custom_expr_select_query,
5972 custom_expr_select_params);
5973 if (sqliteConnector_.getNumRows() > 0) {
5974 throw std::runtime_error{
5975 "A custom expression with the given "
5976 "name and data source already exists."};
5978 sqliteConnector_.query_with_text_params(
5979 "INSERT INTO omnisci_custom_expressions(name, expression_json, "
5980 "data_source_type, data_source_id, is_deleted) VALUES (?,?,?,?,?)",
5981 std::vector<std::string>{custom_expression->name,
5982 custom_expression->expression_json,
5983 data_source_type_str,
5986 sqliteConnector_.query_with_text_params(custom_expr_select_query,
5987 custom_expr_select_params);
5988 CHECK_EQ(sqliteConnector_.getNumRows(),
static_cast<size_t>(1));
5989 custom_expression->id = sqliteConnector_.getData<int32_t>(0, 0);
// The id is copied out here because ownership of custom_expression is moved
// into the map just below.
5990 custom_expression_id = custom_expression->id;
5991 CHECK(custom_expr_map_by_id_.find(custom_expression->id) ==
5992 custom_expr_map_by_id_.end());
5993 custom_expr_map_by_id_[custom_expression->id] = std::move(custom_expression);
5994 }
catch (std::exception& e) {
5995 sqliteConnector_.query(
"ROLLBACK TRANSACTION");
5998 sqliteConnector_.query(
"END TRANSACTION");
6000 return custom_expression_id;
6005 auto it = custom_expr_map_by_id_.find(custom_expression_id);
6006 if (it != custom_expr_map_by_id_.end()) {
6007 return it->second.get();
// Reads the custom expression with the given id directly from
// omnisci_custom_expressions (not from custom_expr_map_by_id_) and builds it
// from row 0 with getCustomExpressionFromConnector().
// At most one matching row is expected (enforced with CHECK_EQ).
// NOTE(review): the value returned when no row matches is not visible in this
// excerpt -- presumably a null unique_ptr; confirm.
6012 const std::unique_ptr<const CustomExpression> Catalog::getCustomExpressionFromStorage(
6013 int32_t custom_expression_id) {
6015 sqliteConnector_.query_with_text_params(
6016 "SELECT id, name, expression_json, data_source_type, data_source_id, "
6017 "is_deleted FROM omnisci_custom_expressions WHERE id = ?",
6018 std::vector<std::string>{
to_string(custom_expression_id)});
6019 if (sqliteConnector_.getNumRows() > 0) {
6020 CHECK_EQ(sqliteConnector_.getNumRows(),
static_cast<size_t>(1));
6021 return getCustomExpressionFromConnector(0);
// Returns the subset of custom expressions in custom_expr_map_by_id_ that
// passes a per-expression check involving a DBObject.
// The raw pointers are first snapshotted into all_custom_expressions, then
// filtered. Every expression is required to have a TABLE data source
// (enforced with CHECK).
// NOTE(review): the parameter list, the construction of db_object and the
// condition that admits an expression are not visible in this excerpt --
// presumably a privilege check for the given user; confirm.
6026 std::vector<const CustomExpression*> Catalog::getCustomExpressionsForUser(
6028 std::vector<const CustomExpression*> all_custom_expressions;
6033 for (
const auto& [
id, custom_expression] : custom_expr_map_by_id_) {
6034 all_custom_expressions.emplace_back(custom_expression.get());
6038 std::vector<const CustomExpression*> filtered_custom_expressions;
6039 for (
const auto custom_expression : all_custom_expressions) {
6040 CHECK(custom_expression->data_source_type == DataSourceType::TABLE);
6042 db_object.loadKey(*
this);
6045 filtered_custom_expressions.emplace_back(custom_expression);
6048 return filtered_custom_expressions;
// Replaces the expression_json of an existing custom expression, both in
// omnisci_custom_expressions (inside a SQLite transaction) and in
// custom_expr_map_by_id_.
// Throws std::runtime_error when the id is unknown or the expression is marked
// deleted.
// On a std::exception the transaction is rolled back and the in-memory JSON is
// reset to its previous value.
// NOTE(review): whether the handler rethrows is not visible in this excerpt.
6051 void Catalog::updateCustomExpression(int32_t custom_expression_id,
6052 const std::string& expression_json) {
6055 auto it = custom_expr_map_by_id_.find(custom_expression_id);
6056 if (it == custom_expr_map_by_id_.end() || it->second->is_deleted) {
6057 throw std::runtime_error{
"Custom expression with id \"" +
// Kept so the in-memory value can be restored in the catch handler.
6060 auto old_expression_json = it->second->expression_json;
6061 sqliteConnector_.query(
"BEGIN TRANSACTION");
// The row must exist in SQLite as well as in the in-memory map.
6063 sqliteConnector_.query_with_text_params(
6064 "SELECT id FROM omnisci_custom_expressions WHERE id = ?",
6066 CHECK_EQ(sqliteConnector_.getNumRows(),
static_cast<size_t>(1));
6067 sqliteConnector_.query_with_text_params(
6068 "UPDATE omnisci_custom_expressions SET expression_json = ? WHERE id = ?",
6069 std::vector<std::string>{expression_json,
std::to_string(custom_expression_id)});
6070 it->second->expression_json = expression_json;
6071 }
catch (std::exception& e) {
6072 sqliteConnector_.query(
"ROLLBACK TRANSACTION");
6073 it->second->expression_json = old_expression_json;
6076 sqliteConnector_.query(
"END TRANSACTION");
// Deletes the given custom expressions inside one SQLite transaction.
// With do_soft_delete the rows are kept and is_deleted is updated (and set to
// true on the in-memory objects); otherwise the rows are removed and the
// entries are erased from custom_expr_map_by_id_.
// Throws std::runtime_error listing the ids that are not present in
// custom_expr_map_by_id_; this check runs before the transaction is opened.
// On a std::exception the transaction is rolled back.
// NOTE(review): whether the handler rethrows, and whether the in-memory
// changes are undone on rollback, is not visible in this excerpt.
6079 void Catalog::deleteCustomExpressions(
const std::vector<int32_t>& custom_expression_ids,
6080 bool do_soft_delete) {
6084 std::vector<int32_t> invalid_ids;
6085 for (
const auto id : custom_expression_ids) {
6086 if (custom_expr_map_by_id_.find(
id) == custom_expr_map_by_id_.end()) {
6087 invalid_ids.emplace_back(
id);
6090 if (!invalid_ids.empty()) {
6091 throw std::runtime_error{
"Custom expressions with ids: " +
join(invalid_ids,
",") +
6094 sqliteConnector_.query(
"BEGIN TRANSACTION");
// First pass: apply the change in SQLite, one id at a time.
// NOTE(review): with a hard delete, an id listed twice would find no row on
// its second visit and trip the CHECK_EQ below -- confirm callers pass unique
// ids.
6096 for (
const auto id : custom_expression_ids) {
6097 sqliteConnector_.query_with_text_params(
6098 "SELECT id FROM omnisci_custom_expressions WHERE id = ?",
6100 CHECK_EQ(sqliteConnector_.getNumRows(),
static_cast<size_t>(1));
6101 if (do_soft_delete) {
6102 sqliteConnector_.query_with_text_params(
6103 "UPDATE omnisci_custom_expressions SET is_deleted = ? WHERE id = ?",
6106 sqliteConnector_.query_with_text_params(
6107 "DELETE FROM omnisci_custom_expressions WHERE id = ?",
// Second pass: mirror the change in the in-memory map.
6112 for (
const auto id : custom_expression_ids) {
6113 if (do_soft_delete) {
6114 auto it = custom_expr_map_by_id_.find(
id);
6115 CHECK(it != custom_expr_map_by_id_.end());
6116 it->second->is_deleted =
true;
6118 custom_expr_map_by_id_.erase(
id);
6121 }
catch (std::exception& e) {
6122 sqliteConnector_.query(
"ROLLBACK TRANSACTION");
6125 sqliteConnector_.query(
"END TRANSACTION");
6132 throw std::runtime_error{
"User with username \"" + user_name +
"\" does not exist."};
6139 int32_t new_owner_id,
6140 const std::map<int32_t, std::vector<DBObject>>& old_owner_db_objects) {
6141 std::stringstream
result;
6142 for (
const auto& [old_owner_id, db_objects] : old_owner_db_objects) {
6143 result <<
"db_id: " << db_id <<
", new_owner_user_id: " << new_owner_id
6144 <<
", old_owner_user_id: " << old_owner_id <<
", db_objects: [";
6145 bool first_object{
true};
6146 for (
const auto& db_object : db_objects) {
6148 first_object =
false;
6152 result <<
"\"object_id: " << db_object.getObjectKey().objectId
6157 return result.str();
6164 std::map<int32_t, std::vector<DBObject>>& db_objects) {
6165 DBObject db_object{object_name, object_type};
6167 db_objects[user_id].emplace_back(db_object);
// Transfers ownership of this catalog's tables, dashboards and foreign servers
// from the users named in old_owners to new_owner.
//
// @param old_owners  names of the current owners; must not be empty (CHECKed).
// @param new_owner   name of the user that receives ownership.
//
// Visible flow: the owner columns in SQLite are rewritten between
// BEGIN TRANSACTION and END TRANSACTION, the in-memory descriptors are updated
// to match, and std::exception handlers undo the work on failure.
6171 void Catalog::reassignOwners(
const std::set<std::string>& old_owners,
6172 const std::string& new_owner) {
6173 CHECK(!old_owners.empty());
6175 std::map<int32_t, std::string> old_owners_user_name_by_id;
6176 std::set<int32_t> old_owner_ids;
// Keep only old owners whose id differs from the new owner's id, and remember
// their names so they can be restored later.
6177 for (
const auto& old_owner : old_owners) {
6179 if (old_owner_id != new_owner_id) {
6180 old_owner_ids.emplace(old_owner_id);
6181 old_owners_user_name_by_id[old_owner_id] = old_owner;
// Reached with an empty set when every old owner is also the new owner.
6187 if (old_owner_ids.empty()) {
// Objects whose owner changes, grouped by the id of their previous owner.
// This map is what the restore helpers consume if something fails.
6191 std::map<int32_t, std::vector<DBObject>> old_owner_db_objects;
// Persist the new owner id in the catalog's SQLite tables.
6195 sqliteConnector_.query(
"BEGIN TRANSACTION");
6197 for (
const auto old_user_id : old_owner_ids) {
6198 sqliteConnector_.query_with_text_params(
6199 "UPDATE mapd_tables SET userid = ? WHERE userid = ?",
6203 sqliteConnector_.query_with_text_params(
6204 "UPDATE mapd_dashboards SET userid = ? WHERE userid = ?",
6209 sqliteConnector_.query_with_text_params(
6210 "UPDATE omnisci_foreign_servers SET owner_user_id = ? "
6211 "WHERE owner_user_id = ?",
// Mirror the change on the in-memory table descriptors.
6217 for (
const auto& [table_name, td] : tableDescriptorMap_) {
6224 old_owner_db_objects);
6230 old_owner_db_objects);
6232 td->userId = new_owner_id;
// Dashboards are re-keyed: matching entries are erased from
// dashboardDescriptorMap_ while iterating and gathered in
// new_owner_dashboard_map, which is merged back once the loop is done.
6237 for (
auto it = dashboardDescriptorMap_.begin();
6238 it != dashboardDescriptorMap_.end();) {
6239 if (
auto dashboard = it->second;
6243 old_owner_db_objects[dashboard->userId].emplace_back(db_object);
6249 dashboard->dashboardName};
6252 dashboard->dashboardName};
// The re-keyed entry must not collide with a dashboard that is already present.
6253 CHECK(dashboardDescriptorMap_.find(new_key) == dashboardDescriptorMap_.end());
6254 new_owner_dashboard_map[new_key] = dashboard;
6255 dashboard->userId = new_owner_id;
6256 dashboard->user = new_owner;
6257 it = dashboardDescriptorMap_.erase(it);
6262 dashboardDescriptorMap_.merge(new_owner_dashboard_map);
// Mirror the change on the in-memory foreign server entries.
6265 for (
const auto& [server_name, server] : foreignServerMap_) {
6271 old_owner_db_objects);
6272 server->user_id = new_owner_id;
// Reload every recorded object's key and verify that it now reports the new
// owner and belongs to this database.
6278 for (
auto& [old_owner_id, db_objects] : old_owner_db_objects) {
6279 for (
auto& db_object : db_objects) {
6280 db_object.loadKey(*
this);
6281 CHECK_EQ(db_object.getOwner(), new_owner_id);
6282 const auto& object_key = db_object.getObjectKey();
6283 CHECK_EQ(object_key.dbId, getDatabaseId());
// This handler precedes END TRANSACTION: it rolls SQLite back and only the
// in-memory descriptors need to be restored explicitly.
6287 }
catch (std::exception& e) {
6288 sqliteConnector_.query(
"ROLLBACK TRANSACTION");
6289 restoreOldOwnersInMemory(
6290 old_owners_user_name_by_id, old_owner_db_objects, new_owner_id);
6293 sqliteConnector_.query(
"END TRANSACTION");
6298 old_owner_db_objects, new_owner_id, *
this);
// This handler follows END TRANSACTION, so the undo goes through
// restoreOldOwners(), which opens its own SQLite transaction.
6299 }
catch (std::exception& e) {
6300 restoreOldOwners(old_owners_user_name_by_id, old_owner_db_objects, new_owner_id);
// Reverts an ownership reassignment in SQLite and then in memory.
//
// @param old_owners_user_name_by_id  previous owner id -> user name.
// @param old_owner_db_objects        previous owner id -> objects that owner held.
// @param new_owner_id                id that currently owns those objects.
//
// For every recorded object an UPDATE is issued against the table that matches
// its type (mapd_tables, mapd_dashboards or omnisci_foreign_servers); any other
// type reaches UNREACHABLE().
6305 void Catalog::restoreOldOwners(
6306 const std::map<int32_t, std::string>& old_owners_user_name_by_id,
6307 const std::map<int32_t, std::vector<DBObject>>& old_owner_db_objects,
6308 int32_t new_owner_id) {
6311 sqliteConnector_.query(
"BEGIN TRANSACTION");
6313 for (
const auto& [old_owner_id, db_objects] : old_owner_db_objects) {
6314 for (
const auto& db_object : db_objects) {
6315 auto object_id = db_object.getObjectKey().objectId;
6317 std::vector<std::string> query_params{
std::to_string(old_owner_id),
6320 auto object_type = db_object.getType();
// Each UPDATE below also filters on the object's id column, so only the
// recorded object is touched.
6323 sqliteConnector_.query_with_text_params(
6324 "UPDATE mapd_tables SET userid = ? WHERE userid = ? AND tableid = ?",
6327 sqliteConnector_.query_with_text_params(
6328 "UPDATE mapd_dashboards SET userid = ? WHERE userid = ? AND id = ?",
6332 sqliteConnector_.query_with_text_params(
6333 "UPDATE omnisci_foreign_servers SET owner_user_id = ? "
6334 "WHERE owner_user_id = ? AND id = ?",
6337 UNREACHABLE() <<
"Unexpected DB object type: " <<
static_cast<int>(object_type);
// The in-memory restore is invoked ahead of the catch handler below.
6341 restoreOldOwnersInMemory(
6342 old_owners_user_name_by_id, old_owner_db_objects, new_owner_id);
6343 }
// On failure the SQLite transaction is rolled back and a message is emitted
// warning that ownership information may be inconsistent; the message includes
// the output of a call taking the database id, new owner id and object map.
catch (std::exception& e) {
6344 sqliteConnector_.query(
"ROLLBACK TRANSACTION");
6346 <<
"Unable to restore database objects ownership after an error occurred. "
6347 "Database object ownership information may be in an inconsistent state. " +
6349 getDatabaseId(), new_owner_id, old_owner_db_objects);
6351 sqliteConnector_.query(
"END TRANSACTION");
// Restores the previous owner on the in-memory descriptors of every recorded
// object. No SQLite statement is issued in the visible body.
//
// @param old_owners_user_name_by_id  previous owner id -> user name; must
//                                    contain every dashboard owner (CHECKed).
// @param old_owner_db_objects        previous owner id -> objects that owner held.
// @param new_owner_id                id that currently owns those objects.
6354 void Catalog::restoreOldOwnersInMemory(
6355 const std::map<int32_t, std::string>& old_owners_user_name_by_id,
6356 const std::map<int32_t, std::vector<DBObject>>& old_owner_db_objects,
6357 int32_t new_owner_id) {
6358 for (
const auto& [old_owner_id, db_objects] : old_owner_db_objects) {
6359 for (
const auto& db_object : db_objects) {
6360 auto object_id = db_object.getObjectKey().objectId;
6361 auto object_type = db_object.getType();
// Tables: looked up by object id; the descriptor must exist.
6364 auto it = tableDescriptorMapById_.find(object_id);
6365 CHECK(it != tableDescriptorMapById_.end());
6367 it->second->userId = old_owner_id;
// Dashboards: currently stored under "<new_owner_id>:<dashboard name>".
6369 auto it = dashboardDescriptorMap_.find(
std::to_string(new_owner_id) +
":" +
6370 db_object.getName());
6371 CHECK(it != dashboardDescriptorMap_.end());
6373 it->second->userId = old_owner_id;
6374 auto user_name_it = old_owners_user_name_by_id.find(old_owner_id);
6375 CHECK(user_name_it != old_owners_user_name_by_id.end());
6376 it->second->user = user_name_it->second;
// The descriptor is stored under another key ending in the dashboard name
// (presumably prefixed with the old owner id -- TODO confirm) and the entry
// keyed by the new owner is removed.
6378 db_object.getName()] = it->second;
6379 dashboardDescriptorMap_.erase(it);
// Foreign servers: looked up by object id; the entry must exist.
6381 auto it = foreignServerMapById_.find(object_id);
6382 CHECK(it != foreignServerMapById_.end());
6384 it->second->user_id = old_owner_id;
6386 UNREACHABLE() <<
"Unexpected DB object type: " <<
static_cast<int>(object_type);
// Initializes the system foreign servers first and the system tables second.
// NOTE(review): the name implies a guard condition around these calls --
// confirm which condition applies before relying on it.
6392 void Catalog::conditionallyInitializeSystemObjects() {
6394 initializeSystemServers();
6395 initializeSystemTables();
// Const predicate on this catalog.
// NOTE(review): presumably reports whether the current database is the
// information schema database -- confirm against the function body.
6399 bool Catalog::isInfoSchemaDb()
const {
// Registers the system-table foreign servers by calling
// createSystemTableServer() once per server name: catalog, memory stats,
// storage stats, executor stats, ML metadata and logs. The logs server call
// additionally passes log_server_options.
6403 void Catalog::initializeSystemServers() {
6404 createSystemTableServer(CATALOG_SERVER_NAME,
6406 createSystemTableServer(MEMORY_STATS_SERVER_NAME,
6408 createSystemTableServer(STORAGE_STATS_SERVER_NAME,
6410 createSystemTableServer(EXECUTOR_STATS_SERVER_NAME,
6412 createSystemTableServer(ML_METADATA_SERVER_NAME,
6422 createSystemTableServer(LOGS_SERVER_NAME,
6424 log_server_options);
6435 sql_type_info.set_size(-1);
6436 return sql_type_info;
6442 sql_type_info.set_comp_param(32);
6443 return sql_type_info;
6449 foreign_table.
options[ForeignTable::REFRESH_TIMING_TYPE_KEY] =
6450 ForeignTable::SCHEDULE_REFRESH_TIMING_TYPE;
6453 foreign_table.
options[ForeignTable::REFRESH_START_DATE_TIME_KEY] =
6455 foreign_table.
options[ForeignTable::REFRESH_INTERVAL_KEY] =
6458 foreign_table.
options[ForeignTable::REFRESH_TIMING_TYPE_KEY] =
6459 ForeignTable::MANUAL_REFRESH_TIMING_TYPE;
6461 foreign_table.
options[ForeignTable::REFRESH_UPDATE_TYPE_KEY] =
6462 ForeignTable::APPEND_REFRESH_UPDATE_TYPE;
6464 foreign_table.
options[AbstractFileStorageDataWrapper::ALLOW_FILE_ROLL_OFF_KEY] =
"TRUE";
6471 foreign_table.
options[RegexFileBufferParser::LINE_START_REGEX_KEY] =
6472 "^\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{6}";
6474 foreign_table.
options[AbstractFileStorageDataWrapper::REGEX_PATH_FILTER_KEY] =
6475 ".*heavydb\\.INFO\\..*";
6489 for (
const auto& table_name : table_names) {
// Initializes the system tables by invoking the dedicated initialize*()
// helper of each table in a fixed order.
6499 void Catalog::initializeSystemTables() {
6500 initializeUsersSystemTable();
6501 initializeDatabasesSystemTable();
6502 initializePermissionsSystemTable();
6503 initializeRolesSystemTable();
6504 initializeTablesSystemTable();
6505 initializeDashboardsSystemTable();
6506 initializeRoleAssignmentsSystemTable();
6507 initializeMemorySummarySystemTable();
6508 initializeMemoryDetailsSystemTable();
6509 initializeStorageDetailsSystemTable();
6510 initializeExecutorResourcePoolSummarySystemTable();
6511 initializeMLModelMetadataSystemTable();
// Log-backed system tables.
// NOTE(review): presumably gated by a logs-system-tables setting -- confirm.
6514 initializeServerLogsSystemTables();
6515 initializeRequestLogsSystemTables();
// Drops the logs foreign server when one is currently registered.
6522 if (getForeignServer(LOGS_SERVER_NAME)) {
6523 dropForeignServer(LOGS_SERVER_NAME);
// Sets up the users system table: a (foreign_table, columns) pair is built
// against CATALOG_SERVER_NAME (columns include user_id and default_db_id, both
// kINT) and handed to recreateSystemTableIfUpdated().
6528 void Catalog::initializeUsersSystemTable() {
6529 auto [foreign_table, columns] =
6531 CATALOG_SERVER_NAME,
6532 {{
"user_id", {
kINT}},
6535 {
"default_db_id", {
kINT}},
6539 recreateSystemTableIfUpdated(foreign_table, columns);
// Sets up the databases system table: a (foreign_table, columns) pair is built
// against CATALOG_SERVER_NAME (columns include database_id and owner_id, both
// kINT) and handed to recreateSystemTableIfUpdated().
6542 void Catalog::initializeDatabasesSystemTable() {
6543 auto [foreign_table, columns] =
6545 CATALOG_SERVER_NAME,
6546 {{
"database_id", {
kINT}},
6548 {
"owner_id", {
kINT}},
6551 recreateSystemTableIfUpdated(foreign_table, columns);
// Sets up the permissions system table: a (foreign_table, columns) pair is
// built against CATALOG_SERVER_NAME (columns include database_id, object_id and
// object_owner_id, all kINT) and handed to recreateSystemTableIfUpdated().
6554 void Catalog::initializePermissionsSystemTable() {
6555 auto [foreign_table, columns] =
6557 CATALOG_SERVER_NAME,
6560 {
"database_id", {
kINT}},
6563 {
"object_id", {
kINT}},
6564 {
"object_owner_id", {
kINT}},
6569 recreateSystemTableIfUpdated(foreign_table, columns);
// Sets up the roles system table: a (foreign_table, columns) pair is built
// against CATALOG_SERVER_NAME and handed to recreateSystemTableIfUpdated().
6572 void Catalog::initializeRolesSystemTable() {
6573 auto [foreign_table, columns] =
6575 CATALOG_SERVER_NAME,
6578 recreateSystemTableIfUpdated(foreign_table, columns);
// Sets up the tables system table: a (foreign_table, columns) pair is built
// against CATALOG_SERVER_NAME and handed to recreateSystemTableIfUpdated().
// Among the listed columns, max_chunk_size is kBIGINT; the other visible
// id/count/size columns are kINT.
6581 void Catalog::initializeTablesSystemTable() {
6582 auto [foreign_table, columns] =
6584 CATALOG_SERVER_NAME,
6585 {{
"database_id", {
kINT}},
6587 {
"table_id", {
kINT}},
6589 {
"owner_id", {
kINT}},
6591 {
"column_count", {
kINT}},
6594 {
"max_fragment_size", {
kINT}},
6595 {
"max_chunk_size", {
kBIGINT}},
6596 {
"fragment_page_size", {
kINT}},
6598 {
"max_rollback_epochs", {
kINT}},
6599 {
"shard_count", {
kINT}},
6602 recreateSystemTableIfUpdated(foreign_table, columns);
// Sets up the dashboards system table: a (foreign_table, columns) pair is built
// against CATALOG_SERVER_NAME (columns include database_id, dashboard_id and
// owner_id, all kINT) and handed to recreateSystemTableIfUpdated().
6605 void Catalog::initializeDashboardsSystemTable() {
6606 auto [foreign_table, columns] =
6608 CATALOG_SERVER_NAME,
6609 {{
"database_id", {
kINT}},
6611 {
"dashboard_id", {
kINT}},
6613 {
"owner_id", {
kINT}},
6618 recreateSystemTableIfUpdated(foreign_table, columns);
// Sets up the role assignments system table: getSystemTableSchema() builds the
// (foreign_table, columns) pair against CATALOG_SERVER_NAME, which is then
// handed to recreateSystemTableIfUpdated().
6621 void Catalog::initializeRoleAssignmentsSystemTable() {
6622 auto [foreign_table, columns] = getSystemTableSchema(
6624 CATALOG_SERVER_NAME,
6627 recreateSystemTableIfUpdated(foreign_table, columns);
// Sets up the memory summary system table: a (foreign_table, columns) pair is
// built against MEMORY_STATS_SERVER_NAME and handed to
// recreateSystemTableIfUpdated(). The page-count columns are kBIGINT and
// device_id is kINT.
6630 void Catalog::initializeMemorySummarySystemTable() {
6631 auto [foreign_table, columns] =
6633 MEMORY_STATS_SERVER_NAME,
6635 {
"device_id", {
kINT}},
6637 {
"max_page_count", {
kBIGINT}},
6639 {
"allocated_page_count", {
kBIGINT}},
6640 {
"used_page_count", {
kBIGINT}},
6641 {
"free_page_count", {
kBIGINT}}},
6643 recreateSystemTableIfUpdated(foreign_table, columns);
// Sets up the memory details system table: a (foreign_table, columns) pair is
// built against MEMORY_STATS_SERVER_NAME and handed to
// recreateSystemTableIfUpdated(). The visible id columns are kINT and
// last_touch_epoch is kBIGINT.
6646 void Catalog::initializeMemoryDetailsSystemTable() {
6647 auto [foreign_table, columns] =
6649 MEMORY_STATS_SERVER_NAME,
6651 {
"database_id", {
kINT}},
6653 {
"table_id", {
kINT}},
6655 {
"column_id", {
kINT}},
6658 {
"device_id", {
kINT}},
6663 {
"slab_id", {
kINT}},
6665 {
"last_touch_epoch", {
kBIGINT}}},
6667 recreateSystemTableIfUpdated(foreign_table, columns);
// Sets up the storage details system table: a (foreign_table, columns) pair is
// built against STORAGE_STATS_SERVER_NAME and handed to
// recreateSystemTableIfUpdated(). Id and count columns are kINT; the total_*
// size and page-count columns are kBIGINT.
6670 void Catalog::initializeStorageDetailsSystemTable() {
6671 auto [foreign_table, columns] =
6673 STORAGE_STATS_SERVER_NAME,
6675 {
"database_id", {
kINT}},
6677 {
"table_id", {
kINT}},
6680 {
"epoch_floor", {
kINT}},
6681 {
"fragment_count", {
kINT}},
6682 {
"shard_id", {
kINT}},
6683 {
"data_file_count", {
kINT}},
6684 {
"metadata_file_count", {
kINT}},
6685 {
"total_data_file_size", {
kBIGINT}},
6686 {
"total_data_page_count", {
kBIGINT}},
6687 {
"total_free_data_page_count", {
kBIGINT}},
6688 {
"total_metadata_file_size", {
kBIGINT}},
6689 {
"total_metadata_page_count", {
kBIGINT}},
6690 {
"total_free_metadata_page_count", {
kBIGINT}},
6691 {
"total_dictionary_data_file_size", {
kBIGINT}}},
6693 recreateSystemTableIfUpdated(foreign_table, columns);
// Sets up the executor resource pool summary system table: a
// (foreign_table, columns) pair is built against EXECUTOR_STATS_SERVER_NAME and
// handed to recreateSystemTableIfUpdated(). Slot, buffer and outstanding-request
// counters are kINT; memory amounts and total_requests are kBIGINT.
6696 void Catalog::initializeExecutorResourcePoolSummarySystemTable() {
6697 auto [foreign_table, columns] =
6699 EXECUTOR_STATS_SERVER_NAME,
6700 {{
"total_cpu_slots", {
kINT}},
6701 {
"total_gpu_slots", {
kINT}},
6702 {
"total_cpu_result_mem", {
kBIGINT}},
6703 {
"total_cpu_buffer_pool_mem", {
kBIGINT}},
6704 {
"total_gpu_buffer_pool_mem", {
kBIGINT}},
6705 {
"allocated_cpu_slots", {
kINT}},
6706 {
"allocated_gpu_slots", {
kINT}},
6707 {
"allocated_cpu_result_mem", {
kBIGINT}},
6708 {
"allocated_cpu_buffer_pool_mem", {
kBIGINT}},
6709 {
"allocated_gpu_buffer_pool_mem", {
kBIGINT}},
6710 {
"allocated_cpu_buffers", {
kINT}},
6711 {
"allocated_gpu_buffers", {
kINT}},
6712 {
"allocated_temp_cpu_buffer_pool_mem", {
kBIGINT}},
6713 {
"allocated_temp_gpu_buffer_pool_mem", {
kBIGINT}},
6714 {
"total_requests", {
kBIGINT}},
6715 {
"outstanding_requests", {
kINT}},
6716 {
"outstanding_cpu_slots_requests", {
kINT}},
6717 {
"outstanding_gpu_slots_requests", {
kINT}},
6718 {
"outstanding_cpu_result_mem_requests", {
kINT}},
6719 {
"outstanding_cpu_buffer_pool_mem_requests", {
kINT}},
6720 {
"outstanding_gpu_buffer_pool_mem_requests", {
kINT}}},
6722 recreateSystemTableIfUpdated(foreign_table, columns);
// Sets up the ML model metadata system table: a (foreign_table, columns) pair
// is built against ML_METADATA_SERVER_NAME and handed to
// recreateSystemTableIfUpdated(). Feature counts are kBIGINT; train_fraction
// and eval_fraction are kDOUBLE.
6725 void Catalog::initializeMLModelMetadataSystemTable() {
6726 auto [foreign_table, columns] =
6728 ML_METADATA_SERVER_NAME,
6734 {
"num_logical_features", {
kBIGINT}},
6735 {
"num_physical_features", {
kBIGINT}},
6736 {
"num_categorical_features", {
kBIGINT}},
6737 {
"num_numeric_features", {
kBIGINT}},
6738 {
"train_fraction", {
kDOUBLE}},
6739 {
"eval_fraction", {
kDOUBLE}}},
6741 recreateSystemTableIfUpdated(foreign_table, columns);
// Sets up the server logs system table. Columns include process_id, query_id
// and thread_id (kINT). The LINE_REGEX_KEY option is set to a pattern with
// seven capture groups, and further work is done when
// recreateSystemTableIfUpdated() returns true.
6744 void Catalog::initializeServerLogsSystemTables() {
6745 auto [foreign_table, columns] =
6751 {
"process_id", {
kINT}},
6752 {
"query_id", {
kINT}},
6753 {
"thread_id", {
kINT}},
// Per-line pattern: six whitespace-separated fields followed by the remainder
// of the line.
6761 foreign_table.options[RegexFileBufferParser::LINE_REGEX_KEY] =
6762 "^([^\\s]+)\\s(\\w)\\s(\\d+)\\s(\\d+)\\s(\\d+)\\s([^\\s]+)\\s(.+)$";
6763 if (recreateSystemTableIfUpdated(foreign_table, columns)) {
// Sets up the request logs system table. Visible id columns are kINT and the
// *_ms duration columns are kBIGINT. The LINE_REGEX_KEY option is set to a
// pattern that matches lines containing the literal "stdlog" field, and further
// work is done when recreateSystemTableIfUpdated() returns true.
6769 void Catalog::initializeRequestLogsSystemTables() {
6770 auto [foreign_table, columns] =
6775 {
"process_id", {
kINT}},
6776 {
"query_id", {
kINT}},
6777 {
"thread_id", {
kINT}},
6780 {
"request_duration_ms", {
kBIGINT}},
6786 {
"dashboard_id", {
kINT}},
6788 {
"chart_id", {
kINT}},
6789 {
"execution_time_ms", {
kBIGINT}},
6790 {
"total_time_ms", {
kBIGINT}}},
6796 foreign_table.options[RegexFileBufferParser::LINE_REGEX_KEY] =
6797 "^([^\\s]+)\\s(\\w)\\s(\\d+)\\s(\\d+)\\s(\\d+)\\s([^\\s]+)\\s(?:stdlog)\\s(\\w+)"
6798 "\\s(?:\\d+)\\s(\\d+)\\s(\\w+)\\s([^\\s]+)\\s([^\\s]+)\\s(\\{[^\\}]+\\})\\s(\\{[^"
6800 if (recreateSystemTableIfUpdated(foreign_table, columns)) {
// Sets up the web server logs system table. Source files are restricted with
// REGEX_PATH_FILTER_KEY to paths matching heavy_web_server ... ALL, and each
// line is parsed with a time="..." level=... msg="..." pattern. Further work is
// done when recreateSystemTableIfUpdated() returns true.
6806 void Catalog::initializeWebServerLogsSystemTables() {
6807 auto [foreign_table, columns] =
6816 foreign_table.options[AbstractFileStorageDataWrapper::REGEX_PATH_FILTER_KEY] =
6817 ".*heavy_web_server.*ALL\\..*";
6821 foreign_table.options[RegexFileBufferParser::LINE_REGEX_KEY] =
6822 "^time=\"([^\"]+)\"\\slevel=([^\\s]+)\\smsg=\"([^\"]+)\"$";
6823 if (recreateSystemTableIfUpdated(foreign_table, columns)) {
// Sets up the web server access logs system table (response_size is kBIGINT).
// Source files are restricted with REGEX_PATH_FILTER_KEY to paths matching
// heavy_web_server ... ACCESS. The line pattern expects a dotted-quad address,
// a bracketed field, a quoted request ending in HTTP/1.1, and two trailing
// numbers. Further work is done when recreateSystemTableIfUpdated() returns true.
6829 void Catalog::initializeWebServerAccessLogsSystemTables() {
6830 auto [foreign_table, columns] =
6838 {
"response_size", {
kBIGINT}}},
6842 foreign_table.options[AbstractFileStorageDataWrapper::REGEX_PATH_FILTER_KEY] =
6843 ".*heavy_web_server.*ACCESS\\..*";
6847 foreign_table.options[RegexFileBufferParser::LINE_REGEX_KEY] =
6848 "^(\\d+\\.\\d+\\.\\d+\\.\\d+)\\s+\\-\\s+\\-\\s+\\[([^\\]]+)\\]\\s+\"(\\w+)\\s+([^"
6849 "\\s]+)\\s+HTTP\\/1\\.1\"\\s+(\\d+)\\s+(\\d+)$";
6850 if (recreateSystemTableIfUpdated(foreign_table, columns)) {
// Ensures that a system-table foreign server called server_name exists with the
// expected options.
//
// @param server_name        name of the foreign server.
// @param data_wrapper_type  data wrapper type for the server.
//
// A candidate server object is built first. If a stored server exists whose
// options differ from the candidate's, the stored server is dropped (its foreign
// tables are enumerated and logged as dropped beforehand). Whenever no stored
// server remains, the candidate is created.
6856 void Catalog::createSystemTableServer(
const std::string& server_name,
6857 const std::string& data_wrapper_type,
6859 auto server = std::make_unique<foreign_storage::ForeignServer>(
6862 auto stored_server = getForeignServer(server_name);
// Only a mismatch in options triggers the drop-and-recreate path.
6863 if (stored_server && stored_server->options != server->options) {
6865 auto tables = getAllForeignTablesForForeignServer(stored_server->id);
6866 for (
const auto table :
tables) {
6867 LOG(
INFO) <<
"Dropping existing \"" << table->tableName <<
"\" system table for \""
6868 << server_name <<
"\" foreign server.";
6871 LOG(
INFO) <<
"Dropping existing \"" << server_name
6872 <<
"\" system table foreign server.";
6873 dropForeignServer(server_name);
// Cleared so that the creation branch below runs for the dropped server.
6874 stored_server =
nullptr;
6876 if (!stored_server) {
6877 LOG(
INFO) <<
"Creating a new \"" << server_name <<
"\" system table foreign server.";
6878 createForeignServer(std::move(server),
true);
// Builds the descriptor pair for a system table.
//
// @param table_name                 name of the system table.
// @param server_name                name of the foreign server backing it.
// @param column_type_by_name        ordered (column name, SQL type) pairs.
// @param is_in_memory_system_table  flag supplied by the caller.
// @return the ForeignTable descriptor together with one ColumnDescriptor per
//         entry of column_type_by_name, in the same order.
6882 std::pair<foreign_storage::ForeignTable, std::list<ColumnDescriptor>>
6883 Catalog::getSystemTableSchema(
6884 const std::string& table_name,
6885 const std::string& server_name,
6886 const std::vector<std::pair<std::string, SQLTypeInfo>>& column_type_by_name,
6887 bool is_in_memory_system_table) {
6890 foreign_table.
nColumns = column_type_by_name.size();
6891 foreign_table.
isView =
false;
// Each column is a plain user-visible column: neither system nor virtual.
6909 list<ColumnDescriptor> columns;
6910 for (
const auto& [column_name, column_type] : column_type_by_name) {
6911 columns.emplace_back();
6912 auto& cd = columns.back();
6913 cd.columnName = column_name;
6914 cd.columnType = column_type;
6915 cd.isSystemCol =
false;
6916 cd.isVirtualCol =
false;
6918 return {foreign_table, columns};
6922 const std::list<ColumnDescriptor>& columns) {
// Compares the stored definition of foreign_table.tableName with the requested
// one and returns should_recreate, which is set when a difference is detected.
// The stored descriptor is fetched with the second argument false.
6923 auto stored_td = getMetadataForTable(foreign_table.
tableName,
false);
6924 bool should_recreate{
false};
6926 auto stored_foreign_table =
6928 CHECK(stored_foreign_table);
// A different backing server name or different table options forces recreation.
6929 if (stored_foreign_table->foreign_server->name !=
6931 stored_foreign_table->options != foreign_table.
options) {
6932 should_recreate =
true;
6934 auto stored_columns =
6935 getAllColumnMetadataForTable(stored_td->tableId,
false,
false,
false);
// A different number of columns forces recreation.
6936 if (stored_columns.size() != columns.size()) {
6937 should_recreate =
true;
// Walk both column lists in lockstep and compare name, type, subtype,
// dimension, scale, not-null flag, compression and size.
6939 auto it_1 = stored_columns.begin();
6940 auto it_2 = columns.begin();
6941 for (; it_1 != stored_columns.end() && it_2 != columns.end(); it_1++, it_2++) {
6945 if ((*it_1)->columnName != it_2->columnName ||
6946 (*it_1)->columnType.get_type() != it_2->columnType.get_type() ||
6947 (*it_1)->columnType.get_subtype() != it_2->columnType.get_subtype() ||
6948 (*it_1)->columnType.get_dimension() != it_2->columnType.get_dimension() ||
6949 (*it_1)->columnType.get_scale() != it_2->columnType.get_scale() ||
6950 (*it_1)->columnType.get_notnull() != it_2->columnType.get_notnull() ||
6951 (*it_1)->columnType.get_compression() !=
6952 it_2->columnType.get_compression() ||
6953 (*it_1)->columnType.get_size() != it_2->columnType.get_size()) {
6954 should_recreate =
true;
6961 should_recreate =
true;
// The stored catalog metadata is deleted and the table is created from the
// requested definition.
6963 if (should_recreate) {
6966 <<
"\" system table.";
6967 deleteTableCatalogMetadata(stored_td, {stored_td});
6969 LOG(
INFO) <<
"Creating a new \"" << foreign_table.
tableName <<
"\" system table.";
6970 createTable(foreign_table, columns, {},
true);
6972 return should_recreate;
6978 CHECK(table_name_opt.has_value());
6979 return table_name_opt.value();
6988 auto dict_it = dict_columns_by_table_id_.find(cd->
tableId);
6989 if (dict_it != dict_columns_by_table_id_.end()) {
6990 auto& set = dict_it->second;
6991 for (
auto it = set.begin(); it != set.end(); ++it) {
6992 if ((*it)->columnId == cd->
columnId) {
6999 dict_columns_by_table_id_[cd->
tableId].emplace(cd);
7002 removeColumnDescriptor(old_cd);
7003 addColumnDescriptor(cd);
7011 dict_columns_by_table_id_[cd->
tableId].emplace(cd);
7017 dict_columns_by_table_id_[cd->
tableId].erase(cd);
// Invokes the Catalog member function f on this object, forwarding args,
// between BEGIN TRANSACTION and END TRANSACTION on the SQLite connector.
//
// @param f     pointer to a Catalog member function (called as (this->*f)(...)).
// @param args  arguments perfectly forwarded to f.
//
// When f throws a std::exception, ROLLBACK TRANSACTION is issued.
7024 template <
typename F,
typename... Args>
7025 void Catalog::execInTransaction(F&&
f, Args&&...
args) {
7028 sqliteConnector_.query(
"BEGIN TRANSACTION");
7030 (this->*
f)(std::forward<Args>(
args)...);
7031 }
catch (std::exception&) {
7032 sqliteConnector_.query(
"ROLLBACK TRANSACTION");
7035 sqliteConnector_.query(
"END TRANSACTION");
std::lock_guard< T > lock_guard
static constexpr const char * MEMORY_DETAILS_SYS_TABLE_NAME
bool contains(const T &container, const U &element)
int32_t maxRollbackEpochs
int64_t get_next_refresh_time(const foreign_storage::ForeignTable &foreign_table)
const Parser::SharedDictionaryDef compress_reference_path(Parser::SharedDictionaryDef cur_node, const std::vector< Parser::SharedDictionaryDef > &shared_dict_defs)
static std::set< std::string > reserved_keywords
void removeFromColumnMap(ColumnDescriptor *cd)
static constexpr const char * WS_SERVER_ACCESS_LOGS_SYS_TABLE_NAME
HOST DEVICE SQLTypes get_subtype() const
void set_compression(EncodingType c)
const int MAPD_TEMP_TABLE_START_ID
std::vector< int > ChunkKey
std::vector< std::unique_ptr< lockmgr::AbstractLockContainer< const TableDescriptor * >>> LockedTableDescriptors
static constexpr const char * SERVER_LOGS_SYS_TABLE_NAME
specifies the content in-memory of a row in the link metadata view
static constexpr const char * EXECUTOR_RESOURCE_POOL_SUMMARY_SYS_TABLE_NAME
int64_t next_refresh_time
static constexpr char const * REGEX_PARSER
void add_db_object(const std::string &object_name, DBObjectType object_type, int32_t user_id, const AccessPrivileges &privileges, std::map< int32_t, std::vector< DBObject >> &db_objects)
void CheckAndExecuteMigrations()
const std::string kDataDirectoryName
std::string dictFolderPath
std::tuple< int, std::string > ColumnKey
HOST DEVICE int get_size() const
static TableSchemaLockMgr & instance()
~Catalog()
Destructor - deletes all ColumnDescriptor and TableDescriptor structures which were allocated on the ...
SQLTypeInfo get_encoded_text_type()
SQLTypeInfo get_var_array_type(SQLTypes type)
T getData(const int row, const int col)
class for a per-database catalog. also includes metadata for the current database and the current use...
ColumnDescriptorMap columnDescriptorMap_
static const AccessPrivileges ALL_DATABASE
static TimeT::rep execution(F func, Args &&...args)
bool g_enable_logs_system_tables
void updateFrontendViewAndLinkUsers()
static bool dropRenderGroupColumns(const Catalog_Namespace::TableDescriptorMapById &table_descriptors_by_id, Catalog_Namespace::Catalog *cat)
virtual void query_with_text_params(std::string const &query_only)
This file includes the class specification for the FILE manager (FileMgr), and related data structure...
int64_t last_refresh_time
static const std::string LOCAL_FILE_STORAGE_TYPE
static constexpr char const * INTERNAL_STORAGE_STATS
Data_Namespace::DataMgr & getDataMgr() const
void set_common_log_system_table_options(foreign_storage::ForeignTable &foreign_table)
std::string convert_temporal_to_iso_format(const SQLTypeInfo &type_info, int64_t unix_time)
void clearForTablePrefix(const ChunkKey &)
static constexpr const char * DASHBOARDS_SYS_TABLE_NAME
SqliteConnector sqliteConnector_
const DBMetadata currentDB_
DictDescriptorMapById dictDescriptorMapByRef_
HOST DEVICE int get_scale() const
PersistentStorageMgr * getPersistentStorageMgr() const
void updateDictionarySchema()
#define DEFAULT_MAX_CHUNK_SIZE
void updateDictionaryNames()
std::shared_ptr< Data_Namespace::DataMgr > dataMgr_
This file includes the class specification for the FILE manager (FileMgr), and related data structure...
thread_holding_write_lock()
HOST DEVICE void set_subtype(SQLTypes st)
DEVICE void sort(ARGS &&...args)
std::shared_ptr< Catalog > getDummyCatalog()
const std::string & get_foreign_table() const
virtual void query(const std::string &queryString)
fs::path get_log_dir_path()
void setObjectKey(const DBObjectKey &objectKey)
auto table_json_filepath(const std::string &base_path, const std::string &db_name)
const int MAPD_TEMP_DICT_START_ID
bool g_enable_logs_system_tables_auto_refresh
The InsertOrderFragmenter is a child class of AbstractFragmenter, and fragments data in insert order...
void checkDateInDaysColumnMigration()
void clear_cached_table_data(const Data_Namespace::DataMgr *data_mgr, int32_t db_id, int32_t table_id)
int32_t max_rollback_epochs
void setPrivileges(const AccessPrivileges &privs)
bool contains_spaces(std::string_view str)
returns true if the string contains one or more spaces
void reloadCatalogMetadataUnlocked(const std::map< int32_t, std::string > &user_name_by_user_id)
HOST DEVICE SQLTypes get_type() const
const std::string kInfoSchemaDbName
dsqliteMutex_(std::make_unique< heavyai::DistributedSharedMutex >(std::filesystem::path(basePath_)/shared::kLockfilesDirectoryName/shared::kCatalogDirectoryName/(currentDB_.dbName+".sqlite.lockfile")))
static const AccessPrivileges SELECT_FROM_TABLE
std::vector< int > columnIdBySpi_
static constexpr char const * INTERNAL_CATALOG
void updateFrontendViewSchema()
DeletedColumnPerTableMap deletedColumnPerTable_
const ColumnDescriptor * get_foreign_col(const Catalog &cat, const Parser::SharedDictionaryDef &shared_dict_def)
ColumnDescriptorMapById columnDescriptorMapById_
DBObject * findDbObject(const DBObjectKey &objectKey, bool only_direct) const
std::shared_ptr< std::mutex > mutex_
static constexpr std::array< char const *, 5 > IN_MEMORY_DATA_WRAPPERS
bool contains_sql_reserved_chars(std::string_view str, std::string_view chars="`~!@#$%^&*()-=+[{]}\\|;:'\",<.>/?")
returns true if the string contains one or more OmniSci SQL reserved characters
bool is_in_memory_system_table
std::string dashboardMetadata
static const AccessPrivileges ALL_VIEW
void grantRoleBatch(const std::vector< std::string > &roles, const std::vector< std::string > &grantees)
static constexpr const char * REQUEST_LOGS_SYS_TABLE_NAME
void recordOwnershipOfObjectsInObjectPermissions()
void updateFrontendViewsToDashboards()
std::string dashboardSystemRoleName
This file contains the class specification and related data structures for Catalog.
void set_common_db_log_system_table_options(foreign_storage::ForeignTable &foreign_table)
foreign_storage::ForeignStorageCache * getDiskCache() const
int get_physical_cols() const
std::string dashboardState
static constexpr const char * ROLES_SYS_TABLE_NAME
static SysCatalog & instance()
This file contains the class specification and related data structures for SysCatalog.
static const std::string STORAGE_TYPE_KEY
std::string get_user_name_from_id(int32_t id, const std::map< int32_t, std::string > &user_name_by_user_id)
Classes representing a parse tree.
static int64_t getNextRefreshTime(const std::map< std::string, std::string, std::less<>> &foreign_table_options)
const DBMetadata & getCurrentDB() const
bool g_enable_system_tables
static const std::string getForeignTableSchema(bool if_not_exists=false)
std::string g_logs_system_tables_refresh_interval
#define INJECT_TIMER(DESC)
void populateOptionsMap(OptionsMap &&options_map, bool clear=false)
static constexpr char const * INTERNAL_ML_MODEL_METADATA
const std::string & get_foreign_column() const
void dropTable(const TableDescriptor *td)
std::string table_epochs_to_string(const std::vector< TableEpochInfo > &table_epochs)
void updateDefaultColumnValues()
std::vector< std::string > parse_underlying_dashboard_objects(const std::string &meta)
Encapsulates an enumeration of foreign data wrapper type strings.
std::unique_lock< T > unique_lock
const ColumnDescriptor * getMetadataForColumn(int tableId, const std::string &colName) const
#define CHUNK_KEY_TABLE_IDX
static const int32_t MAPD_VERSION
Role * getRoleGrantee(const std::string &name) const
static const std::string getForeignServerSchema(bool if_not_exists=false)
int getDatabaseId() const
static const AccessPrivileges ALL_SERVER
void set_dict_key(ColumnDescriptor &cd)
static constexpr const char * MEMORY_SUMMARY_SYS_TABLE_NAME
TableDescriptorMapById tableDescriptorMapById_
void setOwner(int32_t userId)
bool is_dict_encoded_type() const
std::string get_checked_table_name(const Catalog *catalog, const ColumnDescriptor *cd)
specifies the content in-memory of a row in the column metadata table
#define DEFAULT_MAX_ROLLBACK_EPOCHS
specifies the content in-memory of a row in the table metadata table
static constexpr const char * ROLE_ASSIGNMENTS_SYS_TABLE_NAME
void updateDeletedColumnIndicator()
std::list< UserMetadata > getAllUserMetadata()
void buildDictionaryMapUnlocked()
static constexpr const char * USERS_SYS_TABLE_NAME
void createOrUpdateDashboardSystemRole(const std::string &view_meta, const int32_t &user_id, const std::string &dash_role_name)
std::shared_ptr< Fragmenter_Namespace::AbstractFragmenter > fragmenter
void replace_cached_table_name(std::map< std::string, int > &cachedTableMap, const std::string &curTableName, const std::string &newTableName, int tableId)
bool g_serialize_temp_tables
static constexpr char const * INTERNAL_EXECUTOR_STATS
std::string DBObjectTypeToString(DBObjectType type)
struct dict_ref_t DictRef
static constexpr const char * DATABASES_SYS_TABLE_NAME
#define DEFAULT_PAGE_SIZE
const DBObjectMap * getDbObjects(bool only_direct) const
void set_comp_param(int p)
void updateFixlenArrayColumns()
static constexpr const char * TABLES_SYS_TABLE_NAME
const int DEFAULT_INITIAL_VERSION
std::optional< std::string > default_value
V & get_from_map(std::map< K, V, comp > &map, const K &key)
static const std::string physicalTableNameTag_
int32_t g_distributed_leaf_idx
void reloadTableMetadata(int table_id)
HOST DEVICE EncodingType get_compression() const
static const AccessPrivileges SELECT_FROM_VIEW
bool table_is_temporary(const TableDescriptor *const td)
static constexpr const char * ML_MODEL_METADATA_SYS_TABLE_NAME
void set_dimension(int d)
#define DEFAULT_FRAGMENT_ROWS
void setStringDictKey(const shared::StringDictKey &dict_key)
std::map< std::string, std::shared_ptr< DashboardDescriptor >> DashboardDescriptorMap
static const std::string BASE_PATH_KEY
static const std::string getCustomExpressionsSchema(bool if_not_exists=false)
static constexpr const char * PERMISSIONS_SYS_TABLE_NAME
Fragmenter_Namespace::FragmenterType fragType
static constexpr char const * INTERNAL_MEMORY_STATS
Data_Namespace::MemoryLevel persistenceLevel
const Catalog * getObjForLock()
HOST DEVICE int get_dimension() const
static constexpr const char * STORAGE_DETAILS_SYS_TABLE_NAME
std::string convert_object_owners_map_to_string(int32_t db_id, int32_t new_owner_id, const std::map< int32_t, std::vector< DBObject >> &old_owner_db_objects)
static const AccessPrivileges ALL_DASHBOARD
torch::Tensor f(torch::Tensor x, torch::Tensor W_target, torch::Tensor b_target)
bool checkDropRenderGroupColumnsMigration()
std::string dashboardName
static const AccessPrivileges ALL_TABLE
std::string data_wrapper_type
const std::string kCatalogDirectoryName
HOST DEVICE int get_comp_param() const
int32_t g_distributed_num_leaves
void updateCustomExpressionsSchema()
const ForeignServer * foreign_server
static constexpr const char * REFRESH_TIMING_TYPE_KEY
std::unique_ptr< heavyai::DistributedSharedMutex > dsqliteMutex_
static constexpr char const * CSV
std::map< std::string, std::string, std::less<>> OptionsMap
std::optional< std::string > getTableName(int32_t table_id) const
void reloadTableMetadataUnlocked(int table_id)
static void migrateDateInDaysMetadata(const Catalog_Namespace::TableDescriptorMapById &table_descriptors_by_id, const int database_id, Catalog_Namespace::Catalog *cat, SqliteConnector &sqlite)
static void clearExternalCaches(bool for_update, const TableDescriptor *td, const int current_db_id)
Descriptor for a dictionary for a string column.
void updateLogicalToPhysicalTableLinkSchema()
static constexpr int NULL_REFRESH_TIME
FileBuffer Chunk
A Chunk is the fundamental unit of execution in Map-D.
static int64_t getCurrentTime()
std::unordered_map< std::string, std::vector< std::string > > getGranteesOfSharedDashboards(const std::vector< std::string > &dashboard_ids)
const std::string kLockfilesDirectoryName
void populateRoleDbObjects(const std::vector< DBObject > &objects)
static const AccessPrivileges DELETE_DASHBOARD
const TableDescriptor * getMetadataForTable(const std::string &tableName, const bool populateFragmenter=true) const
Returns a pointer to a const TableDescriptor struct matching the provided tableName.
void drop_tables(Catalog &catalog, const std::vector< std::string > &table_names)
HOST DEVICE bool get_notnull() const
int32_t validate_and_get_user_id(const std::string &user_name)
void renameLegacyDataWrappers()
void updateTableDescriptorSchema()
void renameForDelete(const std::string directoryName)
Renames a directory to DELETE_ME_<EPOCH>_<oldname>.
void reloadDictionariesFromDiskUnlocked()
static constexpr char const * FOREIGN_TABLE
bool is_string_array() const
int32_t max_rollback_epochs
static constexpr char const * INTERNAL_LOGS
static constexpr const char * WS_SERVER_LOGS_SYS_TABLE_NAME
SQLTypeInfo get_var_encoded_text_array_type()
bool is_reserved_sql_keyword(std::string_view str)
returns true if the string equals an OmniSci SQL reserved keyword
thread_holding_sqlite_lock()
std::map< int32_t, std::string > get_user_id_to_user_name_map()
static constexpr char const * PARQUET
virtual void renameDbObject(const DBObject &object)
virtual size_t getNumRows() const
A selection of helper methods for File I/O.
LogicalToPhysicalTableMapById logicalToPhysicalTableMapById_
Catalog()
Constructor builds a hollow catalog used during constructor of other catalogs.
TableDescriptorMap tableDescriptorMap_
static thread_local bool thread_holds_read_lock
void CheckAndExecuteMigrationsPostBuildMaps()
static constexpr const char * SCHEDULE_REFRESH_TIMING_TYPE
std::string generate_dashboard_system_rolename(const std::string &db_id, const std::string &dash_id)
std::tuple< int, int > ColumnIdKey
void createDashboardSystemRoles()
void updateLogicalToPhysicalTableMap(const int32_t logical_tb_id)
HOST DEVICE void set_type(SQLTypes t)