OmniSciDB  a5dc49c757
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
Catalog.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
23 #include "Catalog/Catalog.h"
24 
25 #include <algorithm>
26 #include <boost/algorithm/string/predicate.hpp>
27 #include <boost/filesystem.hpp>
28 #include <boost/range/adaptor/map.hpp>
29 #include <boost/version.hpp>
30 #include <cassert>
31 #include <cerrno>
32 #include <cstdio>
33 #include <cstring>
34 #include <exception>
35 #include <fstream>
36 #include <list>
37 #include <memory>
38 #include <random>
39 #include <regex>
40 #include <sstream>
41 
42 #if BOOST_VERSION >= 106600
43 #include <boost/uuid/detail/sha1.hpp>
44 #else
45 #include <boost/uuid/sha1.hpp>
46 #endif
47 #include <rapidjson/document.h>
48 #include <rapidjson/istreamwrapper.h>
49 #include <rapidjson/ostreamwrapper.h>
50 #include <rapidjson/writer.h>
51 
52 #include "Catalog/SysCatalog.h"
53 
54 #include "QueryEngine/Execute.h"
56 
63 #include "Fragmenter/Fragmenter.h"
65 #include "LockMgr/LockMgr.h"
68 #include "Parser/ParserNode.h"
69 #include "QueryEngine/Execute.h"
71 #include "RefreshTimeCalculator.h"
72 #include "Shared/DateTimeParser.h"
73 #include "Shared/File.h"
74 #include "Shared/StringTransform.h"
75 #include "Shared/SysDefinitions.h"
76 #include "Shared/measure.h"
77 #include "Shared/misc.h"
79 
80 #include "MapDRelease.h"
81 #include "RWLocks.h"
83 
84 #include "Shared/distributed.h"
85 
86 using Chunk_NS::Chunk;
89 using std::list;
90 using std::map;
91 using std::pair;
92 using std::runtime_error;
93 using std::string;
94 using std::vector;
95 
96 bool g_enable_fsi{true};
97 bool g_enable_s3_fsi{false};
102 // 10 minutes refresh interval by default
104 extern bool g_cache_string_hash;
105 extern bool g_enable_system_tables;
106 
107 // Serialize temp tables to a json file in the Catalogs directory for Calcite parsing
108 // under unit testing.
110 
111 namespace Catalog_Namespace {
112 
113 const int DEFAULT_INITIAL_VERSION = 1; // start at version 1
115  1073741824; // 2^30, give room for over a billion non-temp tables
117  1073741824; // 2^30, give room for over a billion non-temp dictionaries
118 
119 const std::string Catalog::physicalTableNameTag_("_shard_#");
120 
121 thread_local bool Catalog::thread_holds_read_lock = false;
122 
127 
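128 // Migration is done as a two-step process: this release creates the new
129 // mapd_dashboards table, copies the rows of mapd_frontend_views into it and uses it
130 // from then on; the next release will remove the old table. Keeping the old table
131 // around provides a fallback path in case the migration fails.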
134  sqliteConnector_.query("BEGIN TRANSACTION");
135  try {
137  "SELECT name FROM sqlite_master WHERE type='table' AND name='mapd_dashboards'");
138  if (sqliteConnector_.getNumRows() != 0) {
139  // already done
140  sqliteConnector_.query("END TRANSACTION");
141  return;
142  }
144  "CREATE TABLE mapd_dashboards (id integer primary key autoincrement, name text , "
145  "userid integer references mapd_users, state text, image_hash text, update_time "
146  "timestamp, "
147  "metadata text, UNIQUE(userid, name) )");
148  // now copy content from old table to new table
150  "insert into mapd_dashboards (id, name , "
151  "userid, state, image_hash, update_time , "
152  "metadata) "
153  "SELECT viewid , name , userid, view_state, image_hash, update_time, "
154  "view_metadata "
155  "from mapd_frontend_views");
156  } catch (const std::exception& e) {
157  sqliteConnector_.query("ROLLBACK TRANSACTION");
158  throw;
159  }
160  sqliteConnector_.query("END TRANSACTION");
161 }
162 
163 namespace {
164 
165 inline auto table_json_filepath(const std::string& base_path,
166  const std::string& db_name) {
167  return boost::filesystem::path(base_path + "/" + shared::kCatalogDirectoryName + "/" +
168  db_name + "_temp_tables.json");
169 }
170 
171 std::map<int32_t, std::string> get_user_id_to_user_name_map();
172 } // namespace
173 
175 
176 Catalog::Catalog(const string& basePath,
177  const DBMetadata& curDB,
178  std::shared_ptr<Data_Namespace::DataMgr> dataMgr,
179  const std::vector<LeafHostInfo>& string_dict_hosts,
180  std::shared_ptr<Calcite> calcite,
181  bool is_new_db)
182  : basePath_(basePath)
183  , sqliteConnector_(curDB.dbName, basePath + "/" + shared::kCatalogDirectoryName + "/")
184  , currentDB_(curDB)
185  , dataMgr_(dataMgr)
186  , string_dict_hosts_(string_dict_hosts)
187  , calciteMgr_(calcite)
188  , nextTempTableId_(MAPD_TEMP_TABLE_START_ID)
189  , nextTempDictId_(MAPD_TEMP_DICT_START_ID)
190  , dcatalogMutex_(std::make_unique<heavyai::DistributedSharedMutex>(
191  std::filesystem::path(basePath_) / shared::kLockfilesDirectoryName /
192  shared::kCatalogDirectoryName / (currentDB_.dbName + ".lockfile"),
193  [this](size_t) {
194  if (!initialized_) {
195  return;
196  }
197  const auto user_name_by_user_id = get_user_id_to_user_name_map();
199  *dsqliteMutex_);
200  reloadCatalogMetadataUnlocked(user_name_by_user_id);
201  }))
202  , dsqliteMutex_(std::make_unique<heavyai::DistributedSharedMutex>(
203  std::filesystem::path(basePath_) / shared::kLockfilesDirectoryName /
204  shared::kCatalogDirectoryName / (currentDB_.dbName + ".sqlite.lockfile")))
205  , sqliteMutex_()
206  , sharedMutex_()
209  if (!g_enable_fsi) {
210  CHECK(!g_enable_system_tables) << "System tables require FSI to be enabled";
211  CHECK(!g_enable_s3_fsi) << "S3 FSI requires FSI to be enabled";
212  }
213 
214  if (!is_new_db) {
215  CheckAndExecuteMigrations();
216  }
217 
218  buildMaps();
219 
220  if (g_enable_fsi) {
221  createDefaultServersIfNotExists();
222  }
223  if (!is_new_db) {
224  CheckAndExecuteMigrationsPostBuildMaps();
225  }
227  boost::filesystem::remove(table_json_filepath(basePath_, currentDB_.dbName));
228  }
229  conditionallyInitializeSystemObjects();
230  // once all initialized use real object
231  initialized_ = true;
232 }
233 
235  // cat_write_lock write_lock(this);
236 
237  // must clean up heap-allocated TableDescriptor and ColumnDescriptor structs
238  for (TableDescriptorMap::iterator tableDescIt = tableDescriptorMap_.begin();
239  tableDescIt != tableDescriptorMap_.end();
240  ++tableDescIt) {
241  tableDescIt->second->fragmenter = nullptr;
242  delete tableDescIt->second;
243  }
244 
245  // TableDescriptorMapById points to the same descriptors. No need to delete
246 
247  for (ColumnDescriptorMap::iterator columnDescIt = columnDescriptorMap_.begin();
248  columnDescIt != columnDescriptorMap_.end();
249  ++columnDescIt) {
250  delete columnDescIt->second;
251  }
252 
253  // ColumnDescriptorMapById points to the same descriptors. No need to delete
254 
256  boost::filesystem::remove(table_json_filepath(basePath_, currentDB_.dbName));
257  }
258 }
259 
261  if (initialized_) {
262  return this;
263  } else {
264  return SysCatalog::instance().getDummyCatalog().get();
265  }
266 }
267 
270  sqliteConnector_.query("BEGIN TRANSACTION");
271  try {
272  sqliteConnector_.query("PRAGMA TABLE_INFO(mapd_tables)");
273  std::vector<std::string> cols;
274  for (size_t i = 0; i < sqliteConnector_.getNumRows(); i++) {
275  cols.push_back(sqliteConnector_.getData<std::string>(i, 1));
276  }
277  if (std::find(cols.begin(), cols.end(), std::string("max_chunk_size")) ==
278  cols.end()) {
279  string queryString("ALTER TABLE mapd_tables ADD max_chunk_size BIGINT DEFAULT " +
281  sqliteConnector_.query(queryString);
282  }
283  if (std::find(cols.begin(), cols.end(), std::string("shard_column_id")) ==
284  cols.end()) {
285  string queryString("ALTER TABLE mapd_tables ADD shard_column_id BIGINT DEFAULT " +
286  std::to_string(0));
287  sqliteConnector_.query(queryString);
288  }
289  if (std::find(cols.begin(), cols.end(), std::string("shard")) == cols.end()) {
290  string queryString("ALTER TABLE mapd_tables ADD shard BIGINT DEFAULT " +
291  std::to_string(-1));
292  sqliteConnector_.query(queryString);
293  }
294  if (std::find(cols.begin(), cols.end(), std::string("num_shards")) == cols.end()) {
295  string queryString("ALTER TABLE mapd_tables ADD num_shards BIGINT DEFAULT " +
296  std::to_string(0));
297  sqliteConnector_.query(queryString);
298  }
299  if (std::find(cols.begin(), cols.end(), std::string("key_metainfo")) == cols.end()) {
300  string queryString("ALTER TABLE mapd_tables ADD key_metainfo TEXT DEFAULT '[]'");
301  sqliteConnector_.query(queryString);
302  }
303  if (std::find(cols.begin(), cols.end(), std::string("userid")) == cols.end()) {
304  string queryString("ALTER TABLE mapd_tables ADD userid integer DEFAULT " +
306  sqliteConnector_.query(queryString);
307  }
308  if (std::find(cols.begin(), cols.end(), std::string("sort_column_id")) ==
309  cols.end()) {
311  "ALTER TABLE mapd_tables ADD sort_column_id INTEGER DEFAULT 0");
312  }
313  if (std::find(cols.begin(), cols.end(), std::string("storage_type")) == cols.end()) {
314  string queryString("ALTER TABLE mapd_tables ADD storage_type TEXT DEFAULT ''");
315  sqliteConnector_.query(queryString);
316  }
317  if (std::find(cols.begin(), cols.end(), std::string("max_rollback_epochs")) ==
318  cols.end()) {
319  string queryString("ALTER TABLE mapd_tables ADD max_rollback_epochs INT DEFAULT " +
320  std::to_string(-1));
321  sqliteConnector_.query(queryString);
322  }
323  if (std::find(cols.begin(), cols.end(), std::string("is_system_table")) ==
324  cols.end()) {
325  string queryString("ALTER TABLE mapd_tables ADD is_system_table BOOLEAN DEFAULT 0");
326  sqliteConnector_.query(queryString);
327  }
328  } catch (std::exception& e) {
329  sqliteConnector_.query("ROLLBACK TRANSACTION");
330  throw;
331  }
332  sqliteConnector_.query("END TRANSACTION");
333 }
334 
337  sqliteConnector_.query("BEGIN TRANSACTION");
338  try {
340  "select name from sqlite_master WHERE type='table' AND "
341  "name='mapd_version_history'");
342  if (sqliteConnector_.getNumRows() == 0) {
344  "CREATE TABLE mapd_version_history(version integer, migration_history text "
345  "unique)");
346  } else {
348  "select * from mapd_version_history where migration_history = "
349  "'notnull_fixlen_arrays'");
350  if (sqliteConnector_.getNumRows() != 0) {
351  // legacy fixlen arrays had migrated
352  // no need for further execution
353  sqliteConnector_.query("END TRANSACTION");
354  return;
355  }
356  }
357  // Insert check for migration
359  "INSERT INTO mapd_version_history(version, migration_history) values(?,?)",
360  std::vector<std::string>{std::to_string(MAPD_VERSION), "notnull_fixlen_arrays"});
361  LOG(INFO) << "Updating mapd_columns, legacy fixlen arrays";
362  // Upating all fixlen array columns
363  string queryString("UPDATE mapd_columns SET is_notnull=1 WHERE coltype=" +
364  std::to_string(kARRAY) + " AND size>0;");
365  sqliteConnector_.query(queryString);
366  } catch (std::exception& e) {
367  sqliteConnector_.query("ROLLBACK TRANSACTION");
368  throw;
369  }
370  sqliteConnector_.query("END TRANSACTION");
371 }
372 
375  sqliteConnector_.query("BEGIN TRANSACTION");
376  try {
378  "select name from sqlite_master WHERE type='table' AND "
379  "name='mapd_version_history'");
380  if (sqliteConnector_.getNumRows() == 0) {
382  "CREATE TABLE mapd_version_history(version integer, migration_history text "
383  "unique)");
384  } else {
386  "select * from mapd_version_history where migration_history = "
387  "'notnull_geo_columns'");
388  if (sqliteConnector_.getNumRows() != 0) {
389  // legacy geo columns had migrated
390  // no need for further execution
391  sqliteConnector_.query("END TRANSACTION");
392  return;
393  }
394  }
395  // Insert check for migration
397  "INSERT INTO mapd_version_history(version, migration_history) values(?,?)",
398  std::vector<std::string>{std::to_string(MAPD_VERSION), "notnull_geo_columns"});
399  LOG(INFO) << "Updating mapd_columns, legacy geo columns";
400  // Upating all geo columns
401  string queryString(
402  "UPDATE mapd_columns SET is_notnull=1 WHERE coltype=" + std::to_string(kPOINT) +
403  " OR coltype=" + std::to_string(kMULTIPOINT) + " OR coltype=" +
405  " OR coltype=" + std::to_string(kPOLYGON) +
406  " OR coltype=" + std::to_string(kMULTIPOLYGON) + ";");
407  sqliteConnector_.query(queryString);
408  } catch (std::exception& e) {
409  sqliteConnector_.query("ROLLBACK TRANSACTION");
410  throw;
411  }
412  sqliteConnector_.query("END TRANSACTION");
413 }
414 
417  sqliteConnector_.query("BEGIN TRANSACTION");
418  try {
419  // check table still exists
421  "SELECT name FROM sqlite_master WHERE type='table' AND "
422  "name='mapd_frontend_views'");
423  if (sqliteConnector_.getNumRows() == 0) {
424  // table does not exists
425  // no need to migrate
426  sqliteConnector_.query("END TRANSACTION");
427  return;
428  }
429  sqliteConnector_.query("PRAGMA TABLE_INFO(mapd_frontend_views)");
430  std::vector<std::string> cols;
431  for (size_t i = 0; i < sqliteConnector_.getNumRows(); i++) {
432  cols.push_back(sqliteConnector_.getData<std::string>(i, 1));
433  }
434  if (std::find(cols.begin(), cols.end(), std::string("image_hash")) == cols.end()) {
435  sqliteConnector_.query("ALTER TABLE mapd_frontend_views ADD image_hash text");
436  }
437  if (std::find(cols.begin(), cols.end(), std::string("update_time")) == cols.end()) {
438  sqliteConnector_.query("ALTER TABLE mapd_frontend_views ADD update_time timestamp");
439  }
440  if (std::find(cols.begin(), cols.end(), std::string("view_metadata")) == cols.end()) {
441  sqliteConnector_.query("ALTER TABLE mapd_frontend_views ADD view_metadata text");
442  }
443  } catch (std::exception& e) {
444  sqliteConnector_.query("ROLLBACK TRANSACTION");
445  throw;
446  }
447  sqliteConnector_.query("END TRANSACTION");
448 }
449 
452  sqliteConnector_.query("BEGIN TRANSACTION");
453  try {
455  "CREATE TABLE IF NOT EXISTS mapd_links (linkid integer primary key, userid "
456  "integer references mapd_users, "
457  "link text unique, view_state text, update_time timestamp, view_metadata text)");
458  sqliteConnector_.query("PRAGMA TABLE_INFO(mapd_links)");
459  std::vector<std::string> cols;
460  for (size_t i = 0; i < sqliteConnector_.getNumRows(); i++) {
461  cols.push_back(sqliteConnector_.getData<std::string>(i, 1));
462  }
463  if (std::find(cols.begin(), cols.end(), std::string("view_metadata")) == cols.end()) {
464  sqliteConnector_.query("ALTER TABLE mapd_links ADD view_metadata text");
465  }
466  } catch (const std::exception& e) {
467  sqliteConnector_.query("ROLLBACK TRANSACTION");
468  throw;
469  }
470  sqliteConnector_.query("END TRANSACTION");
471 }
472 
475  sqliteConnector_.query("BEGIN TRANSACTION");
476  try {
477  sqliteConnector_.query("UPDATE mapd_links SET userid = 0 WHERE userid IS NULL");
478  // check table still exists
480  "SELECT name FROM sqlite_master WHERE type='table' AND "
481  "name='mapd_frontend_views'");
482  if (sqliteConnector_.getNumRows() == 0) {
483  // table does not exists
484  // no need to migrate
485  sqliteConnector_.query("END TRANSACTION");
486  return;
487  }
489  "UPDATE mapd_frontend_views SET userid = 0 WHERE userid IS NULL");
490  } catch (const std::exception& e) {
491  sqliteConnector_.query("ROLLBACK TRANSACTION");
492  throw;
493  }
494  sqliteConnector_.query("END TRANSACTION");
495 }
496 
497 // Introduce a DB version (the version_num column) into the mapd_tables table.
498 // If the DB does not have a version yet, reset all page sizes to 2097152 to stay
499 // compatible with the old default page size.
500 
503  if (currentDB_.dbName.length() == 0) {
504  // updateDictionaryNames dbName length is zero nothing to do here
505  return;
506  }
507  sqliteConnector_.query("BEGIN TRANSACTION");
508  try {
509  sqliteConnector_.query("PRAGMA TABLE_INFO(mapd_tables)");
510  std::vector<std::string> cols;
511  for (size_t i = 0; i < sqliteConnector_.getNumRows(); i++) {
512  cols.push_back(sqliteConnector_.getData<std::string>(i, 1));
513  }
514  if (std::find(cols.begin(), cols.end(), std::string("version_num")) == cols.end()) {
515  LOG(INFO) << "Updating mapd_tables updatePageSize";
516  // No version number
517  // need to update the defaul tpagesize to old correct value
518  sqliteConnector_.query("UPDATE mapd_tables SET frag_page_size = 2097152 ");
519  // need to add new version info
520  string queryString("ALTER TABLE mapd_tables ADD version_num BIGINT DEFAULT " +
522  sqliteConnector_.query(queryString);
523  }
524  } catch (std::exception& e) {
525  sqliteConnector_.query("ROLLBACK TRANSACTION");
526  throw;
527  }
528  sqliteConnector_.query("END TRANSACTION");
529 }
530 
533  sqliteConnector_.query("BEGIN TRANSACTION");
534  try {
535  sqliteConnector_.query("PRAGMA TABLE_INFO(mapd_columns)");
536  std::vector<std::string> cols;
537  for (size_t i = 0; i < sqliteConnector_.getNumRows(); i++) {
538  cols.push_back(sqliteConnector_.getData<std::string>(i, 1));
539  }
540  if (std::find(cols.begin(), cols.end(), std::string("version_num")) == cols.end()) {
541  LOG(INFO) << "Updating mapd_columns updateDeletedColumnIndicator";
542  // need to add new version info
543  string queryString("ALTER TABLE mapd_columns ADD version_num BIGINT DEFAULT " +
545  sqliteConnector_.query(queryString);
546  // need to add new column to table defintion to indicate deleted column, column used
547  // as bitmap for deleted rows.
549  "ALTER TABLE mapd_columns ADD is_deletedcol boolean default 0 ");
550  }
551  } catch (std::exception& e) {
552  sqliteConnector_.query("ROLLBACK TRANSACTION");
553  throw;
554  }
555  sqliteConnector_.query("END TRANSACTION");
556 }
557 
560  sqliteConnector_.query("BEGIN TRANSACTION");
561  try {
562  sqliteConnector_.query("PRAGMA TABLE_INFO(mapd_columns)");
563  std::vector<std::string> cols;
564  for (size_t i = 0; i < sqliteConnector_.getNumRows(); i++) {
565  cols.push_back(sqliteConnector_.getData<std::string>(i, 1));
566  }
567  if (std::find(cols.begin(), cols.end(), std::string("default_value")) == cols.end()) {
568  LOG(INFO) << "Adding support for default values to mapd_columns";
569  sqliteConnector_.query("ALTER TABLE mapd_columns ADD default_value TEXT");
570  }
571  } catch (std::exception& e) {
572  sqliteConnector_.query("ROLLBACK TRANSACTION");
573  LOG(ERROR) << "Failed to make metadata update for default values` support";
574  throw;
575  }
576  sqliteConnector_.query("END TRANSACTION");
577 }
578 
579 // Introduce a DB version (version_num) into mapd_dictionaries. If the DB has no
580 // version yet, rename every dictionary directory to the DB_<dbId>_DICT_<dictId> scheme.
581 
584  if (currentDB_.dbName.length() == 0) {
585  // updateDictionaryNames dbName length is zero nothing to do here
586  return;
587  }
588  sqliteConnector_.query("BEGIN TRANSACTION");
589  try {
590  sqliteConnector_.query("PRAGMA TABLE_INFO(mapd_dictionaries)");
591  std::vector<std::string> cols;
592  for (size_t i = 0; i < sqliteConnector_.getNumRows(); i++) {
593  cols.push_back(sqliteConnector_.getData<std::string>(i, 1));
594  }
595  if (std::find(cols.begin(), cols.end(), std::string("version_num")) == cols.end()) {
596  // No version number
597  // need to rename dictionaries
598  string dictQuery("SELECT dictid, name from mapd_dictionaries");
599  sqliteConnector_.query(dictQuery);
600  size_t numRows = sqliteConnector_.getNumRows();
601  for (size_t r = 0; r < numRows; ++r) {
602  int dictId = sqliteConnector_.getData<int>(r, 0);
603  std::string dictName = sqliteConnector_.getData<string>(r, 1);
604 
605  std::string oldName = g_base_path + "/" + shared::kDataDirectoryName + "/" +
606  currentDB_.dbName + "_" + dictName;
607  std::string newName = g_base_path + "/" + shared::kDataDirectoryName + "/DB_" +
608  std::to_string(currentDB_.dbId) + "_DICT_" +
609  std::to_string(dictId);
610 
611  int result = rename(oldName.c_str(), newName.c_str());
612 
613  if (result == 0) {
614  LOG(INFO) << "Dictionary upgrade: successfully renamed " << oldName << " to "
615  << newName;
616  } else {
617  LOG(ERROR) << "Failed to rename old dictionary directory " << oldName << " to "
618  << newName + " dbname '" << currentDB_.dbName << "' error code "
619  << std::to_string(result);
620  }
621  }
622  // need to add new version info
623  string queryString("ALTER TABLE mapd_dictionaries ADD version_num BIGINT DEFAULT " +
625  sqliteConnector_.query(queryString);
626  }
627  } catch (std::exception& e) {
628  sqliteConnector_.query("ROLLBACK TRANSACTION");
629  throw;
630  }
631  sqliteConnector_.query("END TRANSACTION");
632 }
633 
636  sqliteConnector_.query("BEGIN TRANSACTION");
637  try {
639  "CREATE TABLE IF NOT EXISTS mapd_logical_to_physical("
640  "logical_table_id integer, physical_table_id integer)");
641  } catch (const std::exception& e) {
642  sqliteConnector_.query("ROLLBACK TRANSACTION");
643  throw;
644  }
645  sqliteConnector_.query("END TRANSACTION");
646 }
647 
648 void Catalog::updateLogicalToPhysicalTableMap(const int32_t logical_tb_id) {
649  /* this proc inserts/updates all pairs of (logical_tb_id, physical_tb_id) in
650  * sqlite mapd_logical_to_physical table for given logical_tb_id as needed
651  */
652 
654  sqliteConnector_.query("BEGIN TRANSACTION");
655  try {
656  const auto physicalTableIt = logicalToPhysicalTableMapById_.find(logical_tb_id);
657  if (physicalTableIt != logicalToPhysicalTableMapById_.end()) {
658  const auto physicalTables = physicalTableIt->second;
659  CHECK(!physicalTables.empty());
660  for (size_t i = 0; i < physicalTables.size(); i++) {
661  int32_t physical_tb_id = physicalTables[i];
663  "INSERT OR REPLACE INTO mapd_logical_to_physical (logical_table_id, "
664  "physical_table_id) VALUES (?1, ?2)",
665  std::vector<std::string>{std::to_string(logical_tb_id),
666  std::to_string(physical_tb_id)});
667  }
668  }
669  } catch (std::exception& e) {
670  sqliteConnector_.query("ROLLBACK TRANSACTION");
671  throw;
672  }
673  sqliteConnector_.query("END TRANSACTION");
674 }
675 
678  sqliteConnector_.query("BEGIN TRANSACTION");
679  try {
680  sqliteConnector_.query("PRAGMA TABLE_INFO(mapd_dictionaries)");
681  std::vector<std::string> cols;
682  for (size_t i = 0; i < sqliteConnector_.getNumRows(); i++) {
683  cols.push_back(sqliteConnector_.getData<std::string>(i, 1));
684  }
685  if (std::find(cols.begin(), cols.end(), std::string("refcount")) == cols.end()) {
686  sqliteConnector_.query("ALTER TABLE mapd_dictionaries ADD refcount DEFAULT 1");
687  }
688  } catch (std::exception& e) {
689  sqliteConnector_.query("ROLLBACK TRANSACTION");
690  throw;
691  }
692  sqliteConnector_.query("END TRANSACTION");
693 }
694 
697  sqliteConnector_.query("BEGIN TRANSACTION");
698  try {
701  } catch (std::exception& e) {
702  sqliteConnector_.query("ROLLBACK TRANSACTION");
703  throw;
704  }
705  sqliteConnector_.query("END TRANSACTION");
706 }
707 
709  // TODO: Move common migration logic to a shared function.
711  sqliteConnector_.query("BEGIN TRANSACTION");
712  try {
714  "select name from sqlite_master WHERE type='table' AND "
715  "name='mapd_version_history'");
716  static const std::string migration_name{"rename_legacy_data_wrappers"};
717  if (sqliteConnector_.getNumRows() == 0) {
719  "CREATE TABLE mapd_version_history(version integer, migration_history text "
720  "unique)");
721  } else {
723  "select * from mapd_version_history where migration_history = "
724  "'" +
725  migration_name + "'");
726  if (sqliteConnector_.getNumRows() != 0) {
727  // Migration already done.
728  sqliteConnector_.query("END TRANSACTION");
729  return;
730  }
731  }
732  LOG(INFO) << "Executing " << migration_name << " migration.";
733 
734  // Update legacy data wrapper names
736  // clang-format off
737  std::map<std::string, std::string> old_to_new_wrapper_names{
738  {"OMNISCI_CSV", DataWrapperType::CSV},
739  {"OMNISCI_PARQUET", DataWrapperType::PARQUET},
740  {"OMNISCI_REGEX_PARSER", DataWrapperType::REGEX_PARSER},
741  {"OMNISCI_INTERNAL_CATALOG", DataWrapperType::INTERNAL_CATALOG},
742  {"INTERNAL_OMNISCI_MEMORY_STATS", DataWrapperType::INTERNAL_MEMORY_STATS},
743  {"INTERNAL_OMNISCI_STORAGE_STATS", DataWrapperType::INTERNAL_STORAGE_STATS}
744  };
745  // clang-format on
746 
747  for (const auto& [old_wrapper_name, new_wrapper_name] : old_to_new_wrapper_names) {
749  "UPDATE omnisci_foreign_servers SET data_wrapper_type = ? WHERE "
750  "data_wrapper_type = ?",
751  std::vector<std::string>{new_wrapper_name, old_wrapper_name});
752  }
753 
754  // Record migration.
756  "INSERT INTO mapd_version_history(version, migration_history) values(?,?)",
757  std::vector<std::string>{std::to_string(MAPD_VERSION), migration_name});
758  LOG(INFO) << migration_name << " migration completed.";
759  } catch (std::exception& e) {
760  sqliteConnector_.query("ROLLBACK TRANSACTION");
761  throw;
762  }
763  sqliteConnector_.query("END TRANSACTION");
764 }
765 
768  sqliteConnector_.query("BEGIN TRANSACTION");
769  try {
771  } catch (const std::exception& e) {
772  sqliteConnector_.query("ROLLBACK TRANSACTION");
773  throw;
774  }
775  sqliteConnector_.query("END TRANSACTION");
776 }
777 
778 const std::string Catalog::getForeignServerSchema(bool if_not_exists) {
779  return "CREATE TABLE " + (if_not_exists ? std::string{"IF NOT EXISTS "} : "") +
780  "omnisci_foreign_servers(id integer primary key, name text unique, " +
781  "data_wrapper_type text, owner_user_id integer, creation_time integer, " +
782  "options text)";
783 }
784 
785 const std::string Catalog::getForeignTableSchema(bool if_not_exists) {
786  return "CREATE TABLE " + (if_not_exists ? std::string{"IF NOT EXISTS "} : "") +
787  "omnisci_foreign_tables(table_id integer unique, server_id integer, " +
788  "options text, last_refresh_time integer, next_refresh_time integer, " +
789  "FOREIGN KEY(table_id) REFERENCES mapd_tables(tableid), " +
790  "FOREIGN KEY(server_id) REFERENCES omnisci_foreign_servers(id))";
791 }
792 
793 const std::string Catalog::getCustomExpressionsSchema(bool if_not_exists) {
794  return "CREATE TABLE " + (if_not_exists ? std::string{"IF NOT EXISTS "} : "") +
795  "omnisci_custom_expressions(id integer primary key, name text, " +
796  "expression_json text, data_source_type text, " +
797  "data_source_id integer, is_deleted boolean)";
798 }
799 
802  sqliteConnector_.query("BEGIN TRANSACTION");
803  std::vector<DBObject> objects;
804  try {
806  "SELECT name FROM sqlite_master WHERE type='table' AND "
807  "name='mapd_record_ownership_marker'");
808  // check if mapd catalog - marker exists
809  if (sqliteConnector_.getNumRows() != 0 && currentDB_.dbId == 1) {
810  // already done
811  sqliteConnector_.query("END TRANSACTION");
812  return;
813  }
814  // check if different catalog - marker exists
815  else if (sqliteConnector_.getNumRows() != 0 && currentDB_.dbId != 1) {
816  sqliteConnector_.query("SELECT dummy FROM mapd_record_ownership_marker");
817  // Check if migration is being performed on existing non mapd catalogs
818  // Older non mapd dbs will have table but no record in them
819  if (sqliteConnector_.getNumRows() != 0) {
820  // already done
821  sqliteConnector_.query("END TRANSACTION");
822  return;
823  }
824  }
825  // marker not exists - create one
826  else {
827  sqliteConnector_.query("CREATE TABLE mapd_record_ownership_marker (dummy integer)");
828  }
829 
830  DBMetadata db;
831  CHECK(SysCatalog::instance().getMetadataForDB(currentDB_.dbName, db));
832  // place dbId as a refernce for migration being performed
834  "INSERT INTO mapd_record_ownership_marker (dummy) VALUES (?1)",
835  std::vector<std::string>{std::to_string(db.dbOwner)});
836 
837  static const std::map<const DBObjectType, const AccessPrivileges>
838  object_level_all_privs_lookup{
844 
845  // grant owner all permissions on DB
846  DBObjectKey key;
847  key.dbId = currentDB_.dbId;
848  auto _key_place = [&key](auto type) {
849  key.permissionType = type;
850  return key;
851  };
852  for (auto& it : object_level_all_privs_lookup) {
853  objects.emplace_back(_key_place(it.first), it.second, db.dbOwner);
854  objects.back().setName(currentDB_.dbName);
855  }
856 
857  {
858  // other users tables and views
859  string tableQuery(
860  "SELECT tableid, name, userid, isview FROM mapd_tables WHERE userid > 0");
861  sqliteConnector_.query(tableQuery);
862  size_t numRows = sqliteConnector_.getNumRows();
863  for (size_t r = 0; r < numRows; ++r) {
864  int32_t tableid = sqliteConnector_.getData<int>(r, 0);
865  std::string tableName = sqliteConnector_.getData<string>(r, 1);
866  int32_t ownerid = sqliteConnector_.getData<int>(r, 2);
867  bool isview = sqliteConnector_.getData<bool>(r, 3);
868 
871  DBObjectKey key;
872  key.dbId = currentDB_.dbId;
873  key.objectId = tableid;
874  key.permissionType = type;
875 
876  DBObject obj(tableName, type);
877  obj.setObjectKey(key);
878  obj.setOwner(ownerid);
881 
882  objects.push_back(obj);
883  }
884  }
885 
886  {
887  // other users dashboards
888  string tableQuery("SELECT id, name, userid FROM mapd_dashboards WHERE userid > 0");
889  sqliteConnector_.query(tableQuery);
890  size_t numRows = sqliteConnector_.getNumRows();
891  for (size_t r = 0; r < numRows; ++r) {
892  int32_t dashId = sqliteConnector_.getData<int>(r, 0);
893  std::string dashName = sqliteConnector_.getData<string>(r, 1);
894  int32_t ownerid = sqliteConnector_.getData<int>(r, 2);
895 
897  DBObjectKey key;
898  key.dbId = currentDB_.dbId;
899  key.objectId = dashId;
900  key.permissionType = type;
901 
902  DBObject obj(dashName, type);
903  obj.setObjectKey(key);
904  obj.setOwner(ownerid);
906 
907  objects.push_back(obj);
908  }
909  }
910  } catch (const std::exception& e) {
911  sqliteConnector_.query("ROLLBACK TRANSACTION");
912  throw;
913  }
914  sqliteConnector_.query("END TRANSACTION");
915 
916  // now apply the objects to the syscat to track the permisisons
917  // moved outside transaction to avoid lock in sqlite
918  try {
920  } catch (const std::exception& e) {
921  LOG(ERROR) << " Issue during migration of DB " << name() << " issue was " << e.what();
922  throw std::runtime_error(" Issue during migration of DB " + name() + " issue was " +
923  e.what());
924  // will need to remove the mapd_record_ownership_marker table and retry
925  }
926 }
927 
932 }
933 
935  // do not take cat_sqlite_lock here as Catalog functions do that themselves
937 }
938 
940  std::unordered_map<std::string, std::pair<int, std::string>> dashboards;
941  std::vector<std::string> dashboard_ids;
942  static const std::string migration_name{"dashboard_roles_migration"};
943  {
945  sqliteConnector_.query("BEGIN TRANSACTION");
946  try {
947  // migration_history should be present in all catalogs by now
948  // if not then would be created before this migration
950  "select * from mapd_version_history where migration_history = '" +
951  migration_name + "'");
952  if (sqliteConnector_.getNumRows() != 0) {
953  // no need for further execution
954  sqliteConnector_.query("END TRANSACTION");
955  return;
956  }
957  LOG(INFO) << "Performing dashboard internal roles Migration.";
958  sqliteConnector_.query("select id, userid, metadata from mapd_dashboards");
959  for (size_t i = 0; i < sqliteConnector_.getNumRows(); ++i) {
962  sqliteConnector_.getData<string>(i, 0)))) {
963  // Successfully created roles during previous migration/crash
964  // No need to include them
965  continue;
966  }
967  dashboards[sqliteConnector_.getData<string>(i, 0)] = std::make_pair(
968  sqliteConnector_.getData<int>(i, 1), sqliteConnector_.getData<string>(i, 2));
969  dashboard_ids.push_back(sqliteConnector_.getData<string>(i, 0));
970  }
971  } catch (const std::exception& e) {
972  sqliteConnector_.query("ROLLBACK TRANSACTION");
973  throw;
974  }
975  sqliteConnector_.query("END TRANSACTION");
976  }
977  // All current grantees with shared dashboards.
978  const auto active_grantees =
980 
981  try {
982  // NOTE(wamsi): Transactionally unsafe
983  for (auto dash : dashboards) {
984  createOrUpdateDashboardSystemRole(dash.second.second,
985  dash.second.first,
987  std::to_string(currentDB_.dbId), dash.first));
988  auto result = active_grantees.find(dash.first);
989  if (result != active_grantees.end()) {
992  dash.first)},
993  result->second);
994  }
995  }
997  // check if this has already been completed
999  "select * from mapd_version_history where migration_history = '" +
1000  migration_name + "'");
1001  if (sqliteConnector_.getNumRows() != 0) {
1002  return;
1003  }
1005  "INSERT INTO mapd_version_history(version, migration_history) values(?,?)",
1006  std::vector<std::string>{std::to_string(MAPD_VERSION), migration_name});
1007  } catch (const std::exception& e) {
1008  LOG(ERROR) << "Failed to create dashboard system roles during migration: "
1009  << e.what();
1010  throw;
1011  }
1012  LOG(INFO) << "Successfully created dashboard system roles during migration.";
1013 }
1014 
1016  cat_write_lock write_lock(this);
1020  updateGeoColumns();
1023  updateLinkSchema();
1027  updatePageSize();
1031  if (g_enable_fsi) {
1032  updateFsiSchemas();
1034  }
1037 }
1038 
1042 }
1043 
1044 namespace {
1045 std::map<int32_t, std::string> get_user_id_to_user_name_map() {
1046  auto users = SysCatalog::instance().getAllUserMetadata();
1047  std::map<int32_t, std::string> user_name_by_user_id;
1048  for (const auto& user : users) {
1049  user_name_by_user_id[user.userId] = user.userName;
1050  }
1051  return user_name_by_user_id;
1052 }
1053 
1055  int32_t id,
1056  const std::map<int32_t, std::string>& user_name_by_user_id) {
1057  auto entry = user_name_by_user_id.find(id);
1058  if (entry != user_name_by_user_id.end()) {
1059  return entry->second;
1060  }
1061  // a user could be deleted and a dashboard still exist?
1062  return "Unknown";
1063 }
1064 
1066  CHECK_GT(cd.db_id, 0);
1067  auto& column_type = cd.columnType;
1068  if (column_type.is_dict_encoded_string() ||
1069  column_type.is_subtype_dict_encoded_string()) {
1070  column_type.setStringDictKey({cd.db_id, column_type.get_comp_param()});
1071  }
1072 }
1073 } // namespace
1074 
1076  std::string dictQuery(
1077  "SELECT dictid, name, nbits, is_shared, refcount from mapd_dictionaries");
1078  sqliteConnector_.query(dictQuery);
1079  auto numRows = sqliteConnector_.getNumRows();
1080  for (size_t r = 0; r < numRows; ++r) {
1081  auto dictId = sqliteConnector_.getData<int>(r, 0);
1082  auto dictName = sqliteConnector_.getData<string>(r, 1);
1083  auto dictNBits = sqliteConnector_.getData<int>(r, 2);
1084  auto is_shared = sqliteConnector_.getData<bool>(r, 3);
1085  auto refcount = sqliteConnector_.getData<int>(r, 4);
1086  auto fname = g_base_path + "/" + shared::kDataDirectoryName + "/DB_" +
1087  std::to_string(currentDB_.dbId) + "_DICT_" + std::to_string(dictId);
1088  DictRef dict_ref(currentDB_.dbId, dictId);
1089  auto dd = new DictDescriptor(
1090  dict_ref, dictName, dictNBits, is_shared, refcount, fname, false);
1091  dictDescriptorMapByRef_[dict_ref].reset(dd);
1092  }
1093 }
1094 
1095 // NOTE(sy): Only used by --multi-instance clusters.
1096 void Catalog::reloadTableMetadata(int table_id) {
1097  cat_write_lock write_lock(this);
1099  reloadTableMetadataUnlocked(table_id);
1100 }
1101 
1102 // NOTE(sy): Only used by --multi-instance clusters.
1104  // Reload dictionaries first.
1105  // TODO(sy): Does dictionary reloading really belong here?
1106  // We don't have dictionary locks in the system but maybe we need them.
1108 
1109  // Delete this table's metadata from the in-memory cache before reloading.
1110  TableDescriptor* original_td = nullptr;
1111  std::list<ColumnDescriptor*> original_cds;
1112  if (auto it1 = tableDescriptorMapById_.find(table_id);
1113  it1 != tableDescriptorMapById_.end()) {
1114  original_td = it1->second;
1115  if (dynamic_cast<foreign_storage::ForeignTable*>(original_td)) {
1116  // If have a foreign table then we need to destroy the local data wrapper or it will
1117  // contain (potentially invalid) cached data. This needs to be done before we
1118  // remove the table descriptors.
1119  dataMgr_->removeTableRelatedDS(currentDB_.dbId, table_id);
1120  } else {
1121  dataMgr_->removeMutableTableDiskCacheData(currentDB_.dbId, table_id);
1122  }
1123  tableDescriptorMapById_.erase(it1);
1124  if (auto it2 = tableDescriptorMap_.find(to_upper(original_td->tableName));
1125  it2 != tableDescriptorMap_.end()) {
1126  CHECK_EQ(original_td, it2->second);
1127  tableDescriptorMap_.erase(it2);
1128  }
1129  if (original_td->hasDeletedCol) {
1130  const auto ret = deletedColumnPerTable_.erase(original_td);
1131  CHECK_EQ(ret, size_t(1));
1132  }
1133  for (int column_id = 0; column_id < original_td->nColumns; ++column_id) {
1134  if (auto it3 = columnDescriptorMapById_.find({table_id, column_id});
1135  it3 != columnDescriptorMapById_.end()) {
1136  ColumnDescriptor* original_cd = it3->second;
1137  original_cds.push_back(original_cd);
1138  removeFromColumnMap(original_cd);
1139  }
1140  }
1141  }
1142 
1143  TableDescriptor* td;
1144  try {
1145  td = createTableFromDiskUnlocked(table_id);
1146  } catch (const NoTableFoundException& e) {
1147  // No entry found for table on disk. Another node may have deleted it.
1148  return;
1149  }
1150 
1151  if (auto tableDescIt = tableDescriptorMapById_.find(table_id);
1152  tableDescIt != tableDescriptorMapById_.end()) {
1153  tableDescIt->second->fragmenter = nullptr;
1154  delete tableDescIt->second;
1155  }
1156 
1157  // Reload the column descriptors.
1158  auto cds = sqliteGetColumnsForTableUnlocked(table_id);
1159 
1160  // Store the descriptors into the cache.
1161  if (original_td) {
1162  td->mutex_ = original_td->mutex_; // TODO(sy): Unnecessary?
1163  delete original_td;
1164  original_td = nullptr;
1165  }
1166  for (ColumnDescriptor* original_cd : original_cds) {
1167  delete original_cd;
1168  }
1169  original_cds.clear();
1170  tableDescriptorMap_[to_upper(td->tableName)] = td;
1171  tableDescriptorMapById_[td->tableId] = td;
1172  int32_t skip_physical_cols = 0;
1173  for (ColumnDescriptor* cd : cds) {
1174  addToColumnMap(cd);
1175 
1176  if (skip_physical_cols <= 0) {
1177  skip_physical_cols = cd->columnType.get_physical_cols();
1178  }
1179 
1180  if (cd->isDeletedCol) {
1181  td->hasDeletedCol = true;
1182  setDeletedColumnUnlocked(td, cd);
1183  } else if (cd->columnType.is_geometry() || skip_physical_cols-- <= 0) {
1184  td->columnIdBySpi_.push_back(cd->columnId);
1185  }
1186  }
1187 
1188  // Notify Calcite about the reloaded table.
1189  calciteMgr_->updateMetadata(currentDB_.dbName, td->tableName);
1190 }
1191 
1192 // NOTE(sy): Only used by --multi-instance clusters.
1193 void Catalog::reloadCatalogMetadata(
1194  const std::map<int32_t, std::string>& user_name_by_user_id) {
1195  cat_write_lock write_lock(this);
1196  cat_sqlite_lock sqlite_lock(getObjForLock());
1197  reloadCatalogMetadataUnlocked(get_user_id_to_user_name_map());
1198 }
1199 
1200 // NOTE(sy): Only used by --multi-instance clusters.
1201 void Catalog::reloadCatalogMetadataUnlocked(
1202  const std::map<int32_t, std::string>& user_name_by_user_id) {
1204 
1205  sqliteConnector_.reset(basePath_ + "/" + shared::kCatalogDirectoryName + "/" +
1206  currentDB_.dbName);
1207 
1208  // Notice when tables or columns have been created, dropped, or changed by other nodes.
1209  // Needed so that users will see reasonably-correct lists of what objects exist.
1210 
1211  // Load the list of table ID's that exist on disk storage.
1212  std::set<int> cluster_table_ids;
1213  std::string tableQuery("SELECT tableid from mapd_tables");
1214  sqliteConnector_.query(tableQuery);
1215  auto numRows = sqliteConnector_.getNumRows();
1216  for (size_t r = 0; r < numRows; ++r) {
1217  const auto table_id = sqliteConnector_.getData<int>(r, 0);
1218  cluster_table_ids.insert(table_id);
1219  }
1220 
1221  // Ignore any table ID's locked by other threads on this node.
1222  // Those other threads are already handling any necessary reloading for those tables.
1223  std::set<int> ignored_table_ids;
1224  for (ChunkKey const& k : lockmgr::TableSchemaLockMgr::instance().getLockedTables()) {
1225  CHECK_EQ(k.size(), 2U);
1226  if (k[CHUNK_KEY_DB_IDX] != currentDB_.dbId) {
1227  continue;
1228  }
1229  ignored_table_ids.insert(k[CHUNK_KEY_TABLE_IDX]);
1230  }
1231 
1232  // For this node's Catalog cache:
1233  // Newly created table schemas created by other nodes need to be loaded.
1234  // Unlocked table schemas might have been renamed by other nodes; just reload them all.
1235  // Deleted table schemas still in this node's cache need to be flushed.
1236  std::set<int> reload_table_ids;
1237  for (auto const& cluster_table_id : cluster_table_ids) {
1238  if (ignored_table_ids.find(cluster_table_id) == ignored_table_ids.end()) {
1239  reload_table_ids.insert(cluster_table_id);
1240  }
1241  }
1242  for (auto const& [cached_table_id, td] : tableDescriptorMapById_) {
1243  if (cluster_table_ids.find(cached_table_id) == cluster_table_ids.end()) {
1244  reload_table_ids.insert(cached_table_id);
1245  }
1246  }
1247 
1248  // Reload tables.
1249  for (auto const& reload_table_id : reload_table_ids) {
1250  reloadTableMetadataUnlocked(reload_table_id);
1251  }
1252 
1254 
1255  dashboardDescriptorMap_.clear();
1256  linkDescriptorMap_.clear();
1257  linkDescriptorMapById_.clear();
1258  foreignServerMap_.clear();
1259  foreignServerMapById_.clear();
1260  custom_expr_map_by_id_.clear();
1261 
1262  if (g_enable_fsi) {
1263  buildForeignServerMapUnlocked();
1264  }
1265 
1266  updateViewsInMapUnlocked();
1267  buildDashboardsMapUnlocked(user_name_by_user_id);
1268  buildLinksMapUnlocked();
1269  buildCustomExpressionsMapUnlocked();
1270 
1271  // Notify Calcite about the reloaded database.
1272  if (calciteMgr_) {
1273  calciteMgr_->updateMetadata(currentDB_.dbName, {});
1274  }
1275 }
1276 
1277 void Catalog::buildTablesMapUnlocked() {
1278  std::string tableQuery(
1279  "SELECT tableid, name, ncolumns, isview, fragments, frag_type, max_frag_rows, "
1280  "max_chunk_size, frag_page_size, "
1281  "max_rows, partitions, shard_column_id, shard, num_shards, key_metainfo, userid, "
1282  "sort_column_id, storage_type, max_rollback_epochs, is_system_table "
1283  "from mapd_tables");
1284  sqliteConnector_.query(tableQuery);
1285  auto numRows = sqliteConnector_.getNumRows();
1286  for (size_t r = 0; r < numRows; ++r) {
1287  TableDescriptor* td;
1288  const auto& storage_type = sqliteConnector_.getData<string>(r, 17);
1289  if (!storage_type.empty() && storage_type != StorageType::FOREIGN_TABLE) {
1290  const auto table_id = sqliteConnector_.getData<int>(r, 0);
1291  const auto& table_name = sqliteConnector_.getData<string>(r, 1);
1292  LOG(FATAL) << "Unable to read Catalog metadata: storage type is currently not a "
1293  "supported table option (table "
1294  << table_name << " [" << table_id << "] in database "
1295  << currentDB_.dbName << ").";
1296  }
1297 
1298  if (storage_type == StorageType::FOREIGN_TABLE) {
1299  td = new foreign_storage::ForeignTable();
1300  } else {
1301  td = new TableDescriptor();
1302  }
1303 
1304  td->storageType = storage_type;
1305  td->tableId = sqliteConnector_.getData<int>(r, 0);
1306  td->tableName = sqliteConnector_.getData<string>(r, 1);
1307  td->nColumns = sqliteConnector_.getData<int>(r, 2);
1308  td->isView = sqliteConnector_.getData<bool>(r, 3);
1309  td->fragments = sqliteConnector_.getData<string>(r, 4);
1310  td->fragType =
1311  (Fragmenter_Namespace::FragmenterType)sqliteConnector_.getData<int>(r, 5);
1312  td->maxFragRows = sqliteConnector_.getData<int>(r, 6);
1313  td->maxChunkSize = sqliteConnector_.getData<int64_t>(r, 7);
1314  td->fragPageSize = sqliteConnector_.getData<int>(r, 8);
1315  td->maxRows = sqliteConnector_.getData<int64_t>(r, 9);
1316  td->partitions = sqliteConnector_.getData<string>(r, 10);
1317  td->shardedColumnId = sqliteConnector_.getData<int>(r, 11);
1318  td->shard = sqliteConnector_.getData<int>(r, 12);
1319  td->nShards = sqliteConnector_.getData<int>(r, 13);
1320  td->keyMetainfo = sqliteConnector_.getData<string>(r, 14);
1321  td->userId = sqliteConnector_.getData<int>(r, 15);
1322  td->sortedColumnId =
1323  sqliteConnector_.isNull(r, 16) ? 0 : sqliteConnector_.getData<int>(r, 16);
1324  if (!td->isView) {
1325  td->fragmenter = nullptr;
1326  }
1327  td->maxRollbackEpochs = sqliteConnector_.getData<int>(r, 18);
1328  td->is_system_table = sqliteConnector_.getData<bool>(r, 19);
1329  td->hasDeletedCol = false;
1330 
1331  tableDescriptorMap_[to_upper(td->tableName)] = td;
1332  tableDescriptorMapById_[td->tableId] = td;
1333  }
1334 }
1335 
1336 void Catalog::buildColumnsMapUnlocked() {
1337  std::string columnQuery(
1338  "SELECT tableid, columnid, name, coltype, colsubtype, coldim, colscale, "
1339  "is_notnull, compression, comp_param, "
1340  "size, chunks, is_systemcol, is_virtualcol, virtual_expr, is_deletedcol, "
1341  "default_value from "
1342  "mapd_columns ORDER BY tableid, "
1343  "columnid");
1344  sqliteConnector_.query(columnQuery);
1345  auto numRows = sqliteConnector_.getNumRows();
1346  int32_t skip_physical_cols = 0;
1347  for (size_t r = 0; r < numRows; ++r) {
1348  ColumnDescriptor* cd = new ColumnDescriptor();
1349  cd->tableId = sqliteConnector_.getData<int>(r, 0);
1350  cd->columnId = sqliteConnector_.getData<int>(r, 1);
1351  cd->columnName = sqliteConnector_.getData<string>(r, 2);
1352  cd->columnType.set_type((SQLTypes)sqliteConnector_.getData<int>(r, 3));
1353  cd->columnType.set_subtype((SQLTypes)sqliteConnector_.getData<int>(r, 4));
1354  cd->columnType.set_dimension(sqliteConnector_.getData<int>(r, 5));
1355  cd->columnType.set_scale(sqliteConnector_.getData<int>(r, 6));
1356  cd->columnType.set_notnull(sqliteConnector_.getData<bool>(r, 7));
1357  cd->columnType.set_compression((EncodingType)sqliteConnector_.getData<int>(r, 8));
1358  cd->columnType.set_comp_param(sqliteConnector_.getData<int>(r, 9));
1359  cd->columnType.set_size(sqliteConnector_.getData<int>(r, 10));
1360  cd->chunks = sqliteConnector_.getData<string>(r, 11);
1361  cd->isSystemCol = sqliteConnector_.getData<bool>(r, 12);
1362  cd->isVirtualCol = sqliteConnector_.getData<bool>(r, 13);
1363  cd->virtualExpr = sqliteConnector_.getData<string>(r, 14);
1364  cd->isDeletedCol = sqliteConnector_.getData<bool>(r, 15);
1365  if (sqliteConnector_.isNull(r, 16)) {
1366  cd->default_value = std::nullopt;
1367  } else {
1368  cd->default_value = std::make_optional(sqliteConnector_.getData<string>(r, 16));
1369  }
1370  cd->isGeoPhyCol = skip_physical_cols > 0;
1371  cd->db_id = getDatabaseId();
1372  set_dict_key(*cd);
1373  addToColumnMap(cd);
1374 
1375  if (skip_physical_cols <= 0) {
1376  skip_physical_cols = cd->columnType.get_physical_cols();
1377  }
1378 
1379  auto td_itr = tableDescriptorMapById_.find(cd->tableId);
1380  CHECK(td_itr != tableDescriptorMapById_.end());
1381 
1382  if (cd->isDeletedCol) {
1383  td_itr->second->hasDeletedCol = true;
1384  setDeletedColumnUnlocked(td_itr->second, cd);
1385  } else if (cd->columnType.is_geometry() || skip_physical_cols-- <= 0) {
1386  tableDescriptorMapById_[cd->tableId]->columnIdBySpi_.push_back(cd->columnId);
1387  }
1388  }
1389 
1390  // sort columnIdBySpi_ based on columnId
1391  for (auto& tit : tableDescriptorMapById_) {
1392  std::sort(tit.second->columnIdBySpi_.begin(),
1393  tit.second->columnIdBySpi_.end(),
1394  [](const size_t a, const size_t b) -> bool { return a < b; });
1395  }
1396 }
1397 
1398 void Catalog::updateViewUnlocked(TableDescriptor& td) {
1399  std::string viewQuery("SELECT sql FROM mapd_views where tableid = " +
1400  std::to_string(td.tableId));
1401  sqliteConnector_.query(viewQuery);
1402  auto num_rows = sqliteConnector_.getNumRows();
1403  CHECK_EQ(num_rows, 1U) << "Expected single entry in mapd_views for view '"
1404  << td.tableName << "', instead got " << num_rows;
1405  td.viewSQL = sqliteConnector_.getData<string>(0, 0);
1406 }
1407 
1408 void Catalog::updateViewsInMapUnlocked() {
1409  std::string viewQuery("SELECT tableid, sql FROM mapd_views");
1410  sqliteConnector_.query(viewQuery);
1411  auto numRows = sqliteConnector_.getNumRows();
1412  for (size_t r = 0; r < numRows; ++r) {
1413  auto tableId = sqliteConnector_.getData<int>(r, 0);
1414  auto td = tableDescriptorMapById_[tableId];
1415  td->viewSQL = sqliteConnector_.getData<string>(r, 1);
1416  td->fragmenter = nullptr;
1417  }
1418 }
1419 
1420 void Catalog::buildDashboardsMapUnlocked(
1421  const std::map<int32_t, std::string>& user_name_by_user_id) {
1422  std::string frontendViewQuery(
1423  "SELECT id, state, name, image_hash, strftime('%Y-%m-%dT%H:%M:%SZ', update_time), "
1424  "userid, "
1425  "metadata "
1426  "FROM mapd_dashboards");
1427  sqliteConnector_.query(frontendViewQuery);
1428  auto numRows = sqliteConnector_.getNumRows();
1429  for (size_t r = 0; r < numRows; ++r) {
1430  auto vd = std::make_shared<DashboardDescriptor>();
1431  vd->dashboardId = sqliteConnector_.getData<int>(r, 0);
1432  vd->dashboardState = sqliteConnector_.getData<string>(r, 1);
1433  vd->dashboardName = sqliteConnector_.getData<string>(r, 2);
1434  vd->imageHash = sqliteConnector_.getData<string>(r, 3);
1435  vd->updateTime = sqliteConnector_.getData<string>(r, 4);
1436  vd->userId = sqliteConnector_.getData<int>(r, 5);
1437  vd->dashboardMetadata = sqliteConnector_.getData<string>(r, 6);
1438  vd->user = get_user_name_from_id(vd->userId, user_name_by_user_id);
1439  vd->dashboardSystemRoleName = generate_dashboard_system_rolename(
1440  std::to_string(currentDB_.dbId), sqliteConnector_.getData<string>(r, 0));
1441  dashboardDescriptorMap_[std::to_string(vd->userId) + ":" + vd->dashboardName] = vd;
1442  }
1443 }
1444 
1445 void Catalog::buildLinksMapUnlocked() {
1446  std::string linkQuery(
1447  "SELECT linkid, userid, link, view_state, strftime('%Y-%m-%dT%H:%M:%SZ', "
1448  "update_time), view_metadata "
1449  "FROM mapd_links");
1450  sqliteConnector_.query(linkQuery);
1451  auto numRows = sqliteConnector_.getNumRows();
1452  for (size_t r = 0; r < numRows; ++r) {
1453  auto ld = new LinkDescriptor();
1454  ld->linkId = sqliteConnector_.getData<int>(r, 0);
1455  ld->userId = sqliteConnector_.getData<int>(r, 1);
1456  ld->link = sqliteConnector_.getData<string>(r, 2);
1457  ld->viewState = sqliteConnector_.getData<string>(r, 3);
1458  ld->updateTime = sqliteConnector_.getData<string>(r, 4);
1459  ld->viewMetadata = sqliteConnector_.getData<string>(r, 5);
1460  linkDescriptorMap_[std::to_string(currentDB_.dbId) + ld->link] = ld;
1461  linkDescriptorMapById_[ld->linkId] = ld;
1462  }
1463 }
1464 
1465 void Catalog::buildLogicalToPhysicalMapUnlocked() {
1466  /* rebuild map linking logical tables to corresponding physical ones */
1467  std::string logicalToPhysicalTableMapQuery(
1468  "SELECT logical_table_id, physical_table_id "
1469  "FROM mapd_logical_to_physical");
1470  sqliteConnector_.query(logicalToPhysicalTableMapQuery);
1471  auto numRows = sqliteConnector_.getNumRows();
1472  for (size_t r = 0; r < numRows; ++r) {
1473  auto logical_tb_id = sqliteConnector_.getData<int>(r, 0);
1474  auto physical_tb_id = sqliteConnector_.getData<int>(r, 1);
1475  const auto physicalTableIt = logicalToPhysicalTableMapById_.find(logical_tb_id);
1476  if (physicalTableIt == logicalToPhysicalTableMapById_.end()) {
1477  /* add new entity to the map logicalToPhysicalTableMapById_ */
1478  std::vector<int32_t> physicalTables{physical_tb_id};
1479  const auto it_ok =
1480  logicalToPhysicalTableMapById_.emplace(logical_tb_id, physicalTables);
1481  CHECK(it_ok.second);
1482  } else {
1483  /* update map logicalToPhysicalTableMapById_ */
1484  physicalTableIt->second.push_back(physical_tb_id);
1485  }
1486  }
1487 }
1488 
1489 // The catalog uses a series of maps to cache data that have been read from the sqlite
1490 // tables. Usually we update these maps whenever we write using sqlite, so this function
1491 // is responsible for initializing all of them based on the sqlite db state.
1492 void Catalog::buildMaps() {
1493  // Get all user id to username mapping here in order to avoid making a call to
1494  // SysCatalog (and attempting to acquire SysCatalog locks) while holding locks for this
1495  // catalog.
1496  const auto user_name_by_user_id = get_user_id_to_user_name_map();
1497 
1498  cat_write_lock write_lock(this);
1499  cat_sqlite_lock sqlite_lock(getObjForLock());
1500 
1501  buildDictionaryMapUnlocked();
1502  buildTablesMapUnlocked();
1503 
1504  if (g_enable_fsi) {
1505  buildForeignServerMapUnlocked();
1506  updateForeignTablesInMapUnlocked();
1507  }
1508 
1509  buildColumnsMapUnlocked();
1510  updateViewsInMapUnlocked();
1511  buildDashboardsMapUnlocked(user_name_by_user_id);
1512  buildLinksMapUnlocked();
1513  buildLogicalToPhysicalMapUnlocked();
1514  buildCustomExpressionsMapUnlocked();
1515 }
1516 
1517 void Catalog::buildCustomExpressionsMapUnlocked() {
1518  sqliteConnector_.query(
1519  "SELECT id, name, expression_json, data_source_type, data_source_id, "
1520  "is_deleted "
1521  "FROM omnisci_custom_expressions");
1522  auto num_rows = sqliteConnector_.getNumRows();
1523  for (size_t row = 0; row < num_rows; row++) {
1524  auto custom_expr = getCustomExpressionFromConnector(row);
1525  custom_expr_map_by_id_[custom_expr->id] = std::move(custom_expr);
1526  }
1527 }
1528 
1529 std::unique_ptr<CustomExpression> Catalog::getCustomExpressionFromConnector(size_t row) {
1530  auto id = sqliteConnector_.getData<int>(row, 0);
1531  auto name = sqliteConnector_.getData<string>(row, 1);
1532  auto expression_json = sqliteConnector_.getData<string>(row, 2);
1533  auto data_source_type_str = sqliteConnector_.getData<string>(row, 3);
1534  auto data_source_id = sqliteConnector_.getData<int>(row, 4);
1535  auto is_deleted = sqliteConnector_.getData<bool>(row, 5);
1536  return std::make_unique<CustomExpression>(
1537  id,
1538  name,
1539  expression_json,
1540  CustomExpression::dataSourceTypeFromString(data_source_type_str),
1541  data_source_id,
1542  is_deleted);
1543 }
1544 
1545 void Catalog::addTableToMap(const TableDescriptor* td,
1546  const list<ColumnDescriptor>& columns,
1547  const list<DictDescriptor>& dicts) {
1548  cat_write_lock write_lock(this);
1549  TableDescriptor* new_td;
1550 
1551  auto foreign_table = dynamic_cast<const foreign_storage::ForeignTable*>(td);
1552  if (foreign_table) {
1553  auto new_foreign_table = new foreign_storage::ForeignTable();
1554  *new_foreign_table = *foreign_table;
1555  new_td = new_foreign_table;
1556  } else {
1557  new_td = new TableDescriptor();
1558  *new_td = *td;
1559  }
1560 
1561  new_td->mutex_ = std::make_shared<std::mutex>();
1562  tableDescriptorMap_[to_upper(td->tableName)] = new_td;
1563  tableDescriptorMapById_[td->tableId] = new_td;
1564  for (auto cd : columns) {
1565  ColumnDescriptor* new_cd = new ColumnDescriptor();
1566  *new_cd = cd;
1567  addToColumnMap(new_cd);
1568 
1569  // Add deleted column to the map
1570  if (cd.isDeletedCol) {
1571  CHECK(new_td->hasDeletedCol);
1572  setDeletedColumnUnlocked(new_td, new_cd);
1573  }
1574  }
1575 
1576  std::sort(new_td->columnIdBySpi_.begin(),
1577  new_td->columnIdBySpi_.end(),
1578  [](const size_t a, const size_t b) -> bool { return a < b; });
1579  // TODO(sy): Why does addTableToMap() sort columnIdBySpi_ but not insert into it while
1580  // buildColumnsMapUnlocked() does both?
1581 
1582  std::unique_ptr<StringDictionaryClient> client;
1583  DictRef dict_ref(currentDB_.dbId, -1);
1584  if (!string_dict_hosts_.empty()) {
1585  client.reset(new StringDictionaryClient(string_dict_hosts_.front(), dict_ref, true));
1586  }
1587  for (auto dd : dicts) {
1588  if (!dd.dictRef.dictId) {
1589  // Dummy entry created for a shard of a logical table, nothing to do.
1590  continue;
1591  }
1592  dict_ref.dictId = dd.dictRef.dictId;
1593  if (client) {
1594  client->create(dict_ref, dd.dictIsTemp);
1595  }
1596  DictDescriptor* new_dd = new DictDescriptor(dd);
1597  dictDescriptorMapByRef_[dict_ref].reset(new_dd);
1598  if (!dd.dictIsTemp) {
1599  boost::filesystem::create_directory(new_dd->dictFolderPath);
1600  }
1601  }
1602 }
1603 
1604 void Catalog::removeTableFromMap(const string& tableName,
1605  const int tableId,
1606  const bool is_on_error) {
1607  cat_write_lock write_lock(this);
1608  TableDescriptorMapById::iterator tableDescIt = tableDescriptorMapById_.find(tableId);
1609  if (tableDescIt == tableDescriptorMapById_.end()) {
1610  throw TableNotFoundException(tableName, currentDB_.dbName, " Cannot remove.");
1611  }
1612 
1613  TableDescriptor* td = tableDescIt->second;
1614 
1615  if (td->hasDeletedCol) {
1616  const auto ret = deletedColumnPerTable_.erase(td);
1617  CHECK_EQ(ret, size_t(1));
1618  }
1619 
1620  tableDescriptorMapById_.erase(tableDescIt);
1621  tableDescriptorMap_.erase(to_upper(tableName));
1622  td->fragmenter = nullptr;
1623  dict_columns_by_table_id_.erase(tableId);
1624 
1626  delete td;
1627 
1628  std::unique_ptr<StringDictionaryClient> client;
1629  if (SysCatalog::instance().isAggregator()) {
1630  CHECK(!string_dict_hosts_.empty());
1631  DictRef dict_ref(currentDB_.dbId, -1);
1632  client.reset(new StringDictionaryClient(string_dict_hosts_.front(), dict_ref, true));
1633  }
1634 
1635  // delete all column descriptors for the table
1636  // no more link columnIds to sequential indexes!
1637  for (auto cit = columnDescriptorMapById_.begin();
1638  cit != columnDescriptorMapById_.end();) {
1639  if (tableId != std::get<0>(cit->first)) {
1640  ++cit;
1641  } else {
1642  int i = std::get<1>(cit++->first);
1643  ColumnIdKey cidKey(tableId, i);
1644  ColumnDescriptorMapById::iterator colDescIt = columnDescriptorMapById_.find(cidKey);
1645  ColumnDescriptor* cd = colDescIt->second;
1646  columnDescriptorMapById_.erase(colDescIt);
1647  ColumnKey cnameKey(tableId, to_upper(cd->columnName));
1648  columnDescriptorMap_.erase(cnameKey);
1649  const int dictId = cd->columnType.get_comp_param();
1650  // Dummy dictionaries created for a shard of a logical table have the id set to
1651  // zero.
1652  if (cd->columnType.get_compression() == kENCODING_DICT && dictId) {
1653  INJECT_TIMER(removingDicts);
1654  DictRef dict_ref(currentDB_.dbId, dictId);
1655  const auto dictIt = dictDescriptorMapByRef_.find(dict_ref);
1656  // If we're removing this table due to an error, it is possible that the string
1657  // dictionary reference was never populated. Don't crash, just continue cleaning
1658  // up the TableDescriptor and ColumnDescriptors
1659  if (!is_on_error) {
1660  CHECK(dictIt != dictDescriptorMapByRef_.end());
1661  } else {
1662  if (dictIt == dictDescriptorMapByRef_.end()) {
1663  continue;
1664  }
1665  }
1666  const auto& dd = dictIt->second;
1667  CHECK_GE(dd->refcount, 1);
1668  --dd->refcount;
1669  if (!dd->refcount) {
1670  dd->stringDict.reset();
1671  if (!isTemp) {
1672  File_Namespace::renameForDelete(dd->dictFolderPath);
1673  }
1674  if (client) {
1675  client->drop(dict_ref);
1676  }
1677  dictDescriptorMapByRef_.erase(dictIt);
1678  }
1679  }
1680 
1681  delete cd;
1682  }
1683  }
1684 }
1685 
1686 void Catalog::addFrontendViewToMap(DashboardDescriptor& vd) {
1687  cat_write_lock write_lock(this);
1688  addFrontendViewToMapNoLock(vd);
1689 }
1690 
1691 void Catalog::addFrontendViewToMapNoLock(DashboardDescriptor& vd) {
1692  cat_write_lock write_lock(this);
1693  dashboardDescriptorMap_[std::to_string(vd.userId) + ":" + vd.dashboardName] =
1694  std::make_shared<DashboardDescriptor>(vd);
1695 }
1696 
1697 std::vector<DBObject> Catalog::parseDashboardObjects(const std::string& view_meta,
1698  const int& user_id) {
1699  std::vector<DBObject> objects;
1700  DBObjectKey key;
1701  key.dbId = currentDB_.dbId;
1702  auto _key_place = [&key](auto type, auto id) {
1703  key.permissionType = type;
1704  key.objectId = id;
1705  return key;
1706  };
1707  for (auto object_name : parse_underlying_dashboard_objects(view_meta)) {
1708  auto td = getMetadataForTable(object_name, false);
1709  if (!td) {
1710  // Parsed object source is not present in current database
1711  // LOG the info and ignore
1712  LOG(INFO) << "Ignoring dashboard source Table/View: " << object_name
1713  << " no longer exists in current DB.";
1714  continue;
1715  }
1716  // Dashboard source can be Table or View
1717  const auto object_type = td->isView ? ViewDBObjectType : TableDBObjectType;
1718  const auto priv = td->isView ? AccessPrivileges::SELECT_FROM_VIEW
1720  objects.emplace_back(_key_place(object_type, td->tableId), priv, user_id);
1721  objects.back().setObjectType(td->isView ? ViewDBObjectType : TableDBObjectType);
1722  objects.back().setName(td->tableName);
1723  }
1724  return objects;
1725 }
1726 
1727 void Catalog::createOrUpdateDashboardSystemRole(const std::string& view_meta,
1728  const int32_t& user_id,
1729  const std::string& dash_role_name) {
1730  auto objects = parseDashboardObjects(view_meta, user_id);
1731  Role* rl = SysCatalog::instance().getRoleGrantee(dash_role_name);
1732  if (!rl) {
1733  // Dashboard role does not exist
1734  // create role and grant privileges
1735  // NOTE(wamsi): Transactionally unsafe
1736  SysCatalog::instance().createRole(
1737  dash_role_name, /*user_private_role=*/false, /*is_temporary=*/false);
1738  SysCatalog::instance().grantDBObjectPrivilegesBatch({dash_role_name}, objects, *this);
1739  } else {
1740  // Dashboard system role already exists
1741  // Add/remove privileges on objects
1742  std::set<DBObjectKey> revoke_keys;
1743  auto ex_objects = rl->getDbObjects(true);
1744  for (auto key : *ex_objects | boost::adaptors::map_keys) {
1745  if (key.permissionType != TableDBObjectType &&
1746  key.permissionType != ViewDBObjectType) {
1747  continue;
1748  }
1749  bool found = false;
1750  for (auto obj : objects) {
1751  found = key == obj.getObjectKey() ? true : false;
1752  if (found) {
1753  break;
1754  }
1755  }
1756  if (!found) {
1757  revoke_keys.insert(key);
1758  }
1759  }
1760  for (auto& key : revoke_keys) {
1761  // revoke privs on object since the object is no
1762  // longer used by the dashboard as source
1763  // NOTE(wamsi): Transactionally unsafe
1764  SysCatalog::instance().revokeDBObjectPrivileges(
1765  dash_role_name, *rl->findDbObject(key, true), *this);
1766  }
1767  // Update privileges on remaining objects
1768  // NOTE(wamsi): Transactionally unsafe
1769  SysCatalog::instance().grantDBObjectPrivilegesBatch({dash_role_name}, objects, *this);
1770  }
1771 }
1772 
1773 void Catalog::addLinkToMap(LinkDescriptor& ld) {
1774  cat_write_lock write_lock(this);
1775  LinkDescriptor* new_ld = new LinkDescriptor();
1776  *new_ld = ld;
1777  linkDescriptorMap_[std::to_string(currentDB_.dbId) + ld.link] = new_ld;
1778  linkDescriptorMapById_[ld.linkId] = new_ld;
1779 }
1780 
1781 void Catalog::instantiateFragmenter(TableDescriptor* td) const {
1782  auto time_ms = measure<>::execution([&]() {
1783  // instantiate table fragmenter upon first use
1784  // assume only insert order fragmenter is supported
1786  vector<Chunk> chunkVec;
1787  auto columnDescs = getAllColumnMetadataForTable(td->tableId, true, false, true);
1788  Chunk::translateColumnDescriptorsToChunkVec(columnDescs, chunkVec);
1789  ChunkKey chunkKeyPrefix = {currentDB_.dbId, td->tableId};
1790  if (td->sortedColumnId > 0) {
1791  td->fragmenter = std::make_shared<SortedOrderFragmenter>(chunkKeyPrefix,
1792  chunkVec,
1793  dataMgr_.get(),
1794  const_cast<Catalog*>(this),
1795  td->tableId,
1796  td->shard,
1797  td->maxFragRows,
1798  td->maxChunkSize,
1799  td->fragPageSize,
1800  td->maxRows,
1801  td->persistenceLevel);
1802  } else {
1803  td->fragmenter = std::make_shared<InsertOrderFragmenter>(chunkKeyPrefix,
1804  chunkVec,
1805  dataMgr_.get(),
1806  const_cast<Catalog*>(this),
1807  td->tableId,
1808  td->shard,
1809  td->maxFragRows,
1810  td->maxChunkSize,
1811  td->fragPageSize,
1812  td->maxRows,
1813  td->persistenceLevel,
1814  !td->storageType.empty());
1815  }
1816  });
1817  LOG(INFO) << "Instantiating Fragmenter for table " << td->tableName << " took "
1818  << time_ms << "ms";
1819 }
1820 
1821 foreign_storage::ForeignTable* Catalog::getForeignTableUnlocked(
1822  const std::string& tableName) const {
1823  auto tableDescIt = tableDescriptorMap_.find(to_upper(tableName));
1824  if (tableDescIt == tableDescriptorMap_.end()) { // check to make sure table exists
1825  return nullptr;
1826  }
1827  return dynamic_cast<foreign_storage::ForeignTable*>(tableDescIt->second);
1828 }
1829 
1830 const foreign_storage::ForeignTable* Catalog::getForeignTable(
1831  const std::string& tableName) const {
1832  cat_read_lock read_lock(this);
1833  return getForeignTableUnlocked(tableName);
1834 }
1835 
1836 const TableDescriptor* Catalog::getMetadataForTable(const string& tableName,
1837  const bool populateFragmenter) const {
1838  // we give option not to populate fragmenter (default true/yes) as it can be heavy for
1839  // pure metadata calls
1840  cat_read_lock read_lock(this);
1841  auto td = getMutableMetadataForTableUnlocked(tableName);
1842  if (!td) {
1843  return nullptr;
1844  }
1845  read_lock.unlock();
1846  if (populateFragmenter) {
1847  std::unique_lock<std::mutex> td_lock(*td->mutex_.get());
1848  if (td->fragmenter == nullptr && !td->isView) {
1849  instantiateFragmenter(td);
1850  }
1851  }
1852  return td; // returns pointer to table descriptor
1853 }
1854 
1855 const TableDescriptor* Catalog::getMetadataForTable(int table_id,
1856  bool populateFragmenter) const {
1857  cat_read_lock read_lock(this);
1858  auto td = getMutableMetadataForTableUnlocked(table_id);
1859  if (!td) {
1860  return nullptr;
1861  }
1862  read_lock.unlock();
1863  if (populateFragmenter) {
1864  std::unique_lock<std::mutex> td_lock(*td->mutex_.get());
1865  if (td->fragmenter == nullptr && !td->isView) {
1866  instantiateFragmenter(td);
1867  }
1868  }
1869  return td;
1870 }
1871 
1872 std::optional<std::string> Catalog::getTableName(int32_t table_id) const {
1873  cat_read_lock read_lock(this);
1874  auto td = getMutableMetadataForTableUnlocked(table_id);
1875  if (!td) {
1876  return {};
1877  }
1878  return td->tableName;
1879 }
1880 
1881 std::optional<int32_t> Catalog::getTableId(const std::string& table_name) const {
1882  cat_read_lock read_lock(this);
1883  auto td = getMutableMetadataForTableUnlocked(table_name);
1884  if (!td) {
1885  return {};
1886  }
1887  return td->tableId;
1888 }
1889 
1890 TableDescriptor* Catalog::getMutableMetadataForTableUnlocked(
1891  const std::string& table_name) const {
1892  auto it = tableDescriptorMap_.find(to_upper(table_name));
1893  if (it == tableDescriptorMap_.end()) {
1894  return nullptr;
1895  }
1896  return it->second;
1897 }
1898 
1899 TableDescriptor* Catalog::getMutableMetadataForTableUnlocked(int table_id) const {
1900  auto tableDescIt = tableDescriptorMapById_.find(table_id);
1901  if (tableDescIt == tableDescriptorMapById_.end()) { // check to make sure table exists
1902  return nullptr;
1903  }
1904  return tableDescIt->second;
1905 }
1906 
1907 const DictDescriptor* Catalog::getMetadataForDict(const int dict_id,
1908  const bool load_dict) const {
1909  cat_read_lock read_lock(this);
1910  const DictRef dictRef(currentDB_.dbId, dict_id);
1911  auto dictDescIt = dictDescriptorMapByRef_.find(dictRef);
1912  if (dictDescIt ==
1913  dictDescriptorMapByRef_.end()) { // check to make sure dictionary exists
1914  return nullptr;
1915  }
1916  auto& dd = dictDescIt->second;
1917 
1918  if (load_dict) {
1919  std::lock_guard string_dict_lock(*dd->string_dict_mutex);
1920  if (!dd->stringDict) {
1921  auto time_ms = measure<>::execution([&]() {
1922  if (string_dict_hosts_.empty()) {
1923  if (dd->dictIsTemp) {
1924  dd->stringDict = std::make_shared<StringDictionary>(
1925  dd->dictRef, dd->dictFolderPath, true, true, g_cache_string_hash);
1926  } else {
1927  dd->stringDict = std::make_shared<StringDictionary>(
1928  dd->dictRef, dd->dictFolderPath, false, true, g_cache_string_hash);
1929  }
1930  } else {
1931  dd->stringDict =
1932  std::make_shared<StringDictionary>(string_dict_hosts_.front(), dd->dictRef);
1933  }
1934  });
1935  LOG(INFO) << "Time to load Dictionary " << dd->dictRef.dbId << "_"
1936  << dd->dictRef.dictId << " was " << time_ms << "ms";
1937  }
1938  }
1939 
1940  return dd.get();
1941 }
1942 
1943 const std::vector<LeafHostInfo>& Catalog::getStringDictionaryHosts() const {
1944  return string_dict_hosts_;
1945 }
1946 
1947 const ColumnDescriptor* Catalog::getMetadataForColumn(int tableId,
1948  const string& columnName) const {
1949  cat_read_lock read_lock(this);
1950 
1951  ColumnKey columnKey(tableId, to_upper(columnName));
1952  auto colDescIt = columnDescriptorMap_.find(columnKey);
1953  if (colDescIt ==
1954  columnDescriptorMap_.end()) { // need to check to make sure column exists for table
1955  return nullptr;
1956  }
1957  return colDescIt->second;
1958 }
1959 
1960 const ColumnDescriptor* Catalog::getMetadataForColumn(int table_id, int column_id) const {
1961  cat_read_lock read_lock(this);
1962  ColumnIdKey columnIdKey(table_id, column_id);
1963  auto colDescIt = columnDescriptorMapById_.find(columnIdKey);
1964  if (colDescIt == columnDescriptorMapById_
1965  .end()) { // need to check to make sure column exists for table
1966  return nullptr;
1967  }
1968  return colDescIt->second;
1969 }
1970 
1971 const std::optional<std::string> Catalog::getColumnName(int table_id,
1972  int column_id) const {
1973  cat_read_lock read_lock(this);
1974  auto it = columnDescriptorMapById_.find(ColumnIdKey{table_id, column_id});
1975  if (it == columnDescriptorMapById_.end()) {
1976  return {};
1977  }
1978  return it->second->columnName;
1979 }
1980 
1981 const int Catalog::getColumnIdBySpiUnlocked(const int table_id, const size_t spi) const {
1982  const auto tabDescIt = tableDescriptorMapById_.find(table_id);
1983  CHECK(tableDescriptorMapById_.end() != tabDescIt);
1984  const auto& columnIdBySpi = tabDescIt->second->columnIdBySpi_;
1985 
1986  auto spx = spi;
1987  int phi = 0;
1988  if (spx >= SPIMAP_MAGIC1) // see Catalog.h
1989  {
1990  phi = (spx - SPIMAP_MAGIC1) % SPIMAP_MAGIC2;
1991  spx = (spx - SPIMAP_MAGIC1) / SPIMAP_MAGIC2;
1992  }
1993 
1994  CHECK(0 < spx && spx <= columnIdBySpi.size())
1995  << "spx = " << spx << ", size = " << columnIdBySpi.size();
1996  return columnIdBySpi[spx - 1] + phi;
1997 }
1998 
1999 const int Catalog::getColumnIdBySpi(const int table_id, const size_t spi) const {
2000  cat_read_lock read_lock(this);
2001  return getColumnIdBySpiUnlocked(table_id, spi);
2002 }
2003 
2004 const ColumnDescriptor* Catalog::getMetadataForColumnBySpi(const int tableId,
2005  const size_t spi) const {
2006  cat_read_lock read_lock(this);
2007 
2008  const auto columnId = getColumnIdBySpiUnlocked(tableId, spi);
2009  ColumnIdKey columnIdKey(tableId, columnId);
2010  const auto colDescIt = columnDescriptorMapById_.find(columnIdKey);
2011  return columnDescriptorMapById_.end() == colDescIt ? nullptr : colDescIt->second;
2012 }
2013 
2014 void Catalog::deleteMetadataForDashboards(const std::vector<int32_t> dashboard_ids,
2015  const UserMetadata& user) {
2016  std::stringstream invalid_ids, restricted_ids;
2017 
2018  for (int32_t dashboard_id : dashboard_ids) {
2019  if (!getMetadataForDashboard(dashboard_id)) {
2020  invalid_ids << (!invalid_ids.str().empty() ? ", " : "") << dashboard_id;
2021  continue;
2022  }
2023  DBObject object(dashboard_id, DashboardDBObjectType);
2024  object.loadKey(*this);
2025  object.setPrivileges(AccessPrivileges::DELETE_DASHBOARD);
2026  std::vector<DBObject> privs = {object};
2027  if (!SysCatalog::instance().checkPrivileges(user, privs)) {
2028  restricted_ids << (!restricted_ids.str().empty() ? ", " : "") << dashboard_id;
2029  }
2030  }
2031 
2032  if (invalid_ids.str().size() > 0 || restricted_ids.str().size() > 0) {
2033  std::stringstream error_message;
2034  error_message << "Delete dashboard(s) failed with error(s):";
2035  if (invalid_ids.str().size() > 0) {
2036  error_message << "\nDashboard id: " << invalid_ids.str()
2037  << " - Dashboard id does not exist";
2038  }
2039  if (restricted_ids.str().size() > 0) {
2040  error_message
2041  << "\nDashboard id: " << restricted_ids.str()
2042  << " - User should be either owner of dashboard or super user to delete it";
2043  }
2044  throw std::runtime_error(error_message.str());
2045  }
2046  std::vector<DBObject> dash_objs;
2047 
2048  for (int32_t dashboard_id : dashboard_ids) {
2049  dash_objs.emplace_back(dashboard_id, DashboardDBObjectType);
2050  }
2051  // BE-5245: Transactionally unsafe (like other combined Catalog/Syscatalog operations)
2052  SysCatalog::instance().revokeDBObjectPrivilegesFromAllBatch(dash_objs, this);
2053  {
2054  cat_write_lock write_lock(this);
2055  cat_sqlite_lock sqlite_lock(getObjForLock());
2056 
2057  sqliteConnector_.query("BEGIN TRANSACTION");
2058  try {
2059  for (int32_t dashboard_id : dashboard_ids) {
2060  auto dash = getMetadataForDashboard(dashboard_id);
2061  // Dash should still exist if revokeDBObjectPrivileges passed but throw and
2062  // rollback if already deleted
2063  if (!dash) {
2064  throw std::runtime_error(
2065  std::string("Delete dashboard(s) failed with error(s):\nDashboard id: ") +
2066  std::to_string(dashboard_id) + " - Dashboard id does not exist ");
2067  }
2068  std::string user_id = std::to_string(dash->userId);
2069  std::string dash_name = dash->dashboardName;
2070  auto viewDescIt = dashboardDescriptorMap_.find(user_id + ":" + dash_name);
2071  dashboardDescriptorMap_.erase(viewDescIt);
2072  sqliteConnector_.query_with_text_params(
2073  "DELETE FROM mapd_dashboards WHERE name = ? and userid = ?",
2074  std::vector<std::string>{dash_name, user_id});
2075  }
2076  } catch (std::exception& e) {
2077  sqliteConnector_.query("ROLLBACK TRANSACTION");
2078  throw;
2079  }
2080  sqliteConnector_.query("END TRANSACTION");
2081  }
2082 }
2083 
2084 const DashboardDescriptor* Catalog::getMetadataForDashboard(
2085  const string& userId,
2086  const string& dashName) const {
2087  cat_read_lock read_lock(this);
2088 
2089  auto viewDescIt = dashboardDescriptorMap_.find(userId + ":" + dashName);
2090  if (viewDescIt == dashboardDescriptorMap_.end()) { // check to make sure view exists
2091  return nullptr;
2092  }
2093  return viewDescIt->second.get(); // returns pointer to view descriptor
2094 }
2095 
2096 const DashboardDescriptor* Catalog::getMetadataForDashboard(const int32_t id) const {
2097  cat_read_lock read_lock(this);
2098  std::string userId;
2099  std::string name;
2100  bool found{false};
2101  {
2102  for (auto descp : dashboardDescriptorMap_) {
2103  auto dash = descp.second.get();
2104  if (dash->dashboardId == id) {
2105  userId = std::to_string(dash->userId);
2106  name = dash->dashboardName;
2107  found = true;
2108  break;
2109  }
2110  }
2111  }
2112  if (found) {
2113  return getMetadataForDashboard(userId, name);
2114  }
2115  return nullptr;
2116 }
2117 
2118 const LinkDescriptor* Catalog::getMetadataForLink(const string& link) const {
2119  cat_read_lock read_lock(this);
2120  auto linkDescIt = linkDescriptorMap_.find(link);
2121  if (linkDescIt == linkDescriptorMap_.end()) { // check to make sure view exists
2122  return nullptr;
2123  }
2124  return linkDescIt->second; // returns pointer to view descriptor
2125 }
2126 
2127 const LinkDescriptor* Catalog::getMetadataForLink(int linkId) const {
2128  cat_read_lock read_lock(this);
2129  auto linkDescIt = linkDescriptorMapById_.find(linkId);
2130  if (linkDescIt == linkDescriptorMapById_.end()) { // check to make sure view exists
2131  return nullptr;
2132  }
2133  return linkDescIt->second;
2134 }
2135 
2136 const foreign_storage::ForeignTable* Catalog::getForeignTable(int table_id) const {
2137  cat_read_lock read_lock(this);
2138  const auto table = getMutableMetadataForTableUnlocked(table_id);
2139  CHECK(table);
2140  auto foreign_table = dynamic_cast<const foreign_storage::ForeignTable*>(table);
2141  CHECK(foreign_table);
2142  return foreign_table;
2143 }
2144 
// Appends to `columnDescriptors` every column of table `td` found in
// columnDescriptorMapById_, in that map's iteration order, subject to the filters below.
// Takes no lock; getAllColumnMetadataForTable() calls it under the catalog read lock.
//
// td                   - table whose columns are collected. It is dereferenced
//                        unconditionally, so it must not be null.
// columnDescriptors    - output list; matching columns are appended.
// fetchSystemColumns   - include columns with isSystemCol set.
// fetchVirtualColumns  - include columns with isVirtualCol set.
// fetchPhysicalColumns - when false, after each column that is kept, the next
//                        columnType.get_physical_cols() map entries are skipped.
void Catalog::getAllColumnMetadataForTableImpl(
    const TableDescriptor* td,
    list<const ColumnDescriptor*>& columnDescriptors,
    const bool fetchSystemColumns,
    const bool fetchVirtualColumns,
    const bool fetchPhysicalColumns) const {
  // Number of upcoming map entries still to skip (only used when !fetchPhysicalColumns).
  int32_t skip_physical_cols = 0;
  for (const auto& columnDescriptor : columnDescriptorMapById_) {
    // The skip is applied before every other filter. This relies on the physical columns
    // (presumably geo sub-columns) immediately following their parent column in the
    // map's iteration order — TODO confirm the map is ordered by (tableId, columnId).
    if (!fetchPhysicalColumns && skip_physical_cols > 0) {
      --skip_physical_cols;
      continue;
    }
    auto cd = columnDescriptor.second;
    if (cd->tableId != td->tableId) {
      continue;
    }
    if (!fetchSystemColumns && cd->isSystemCol) {
      continue;
    }
    if (!fetchVirtualColumns && cd->isVirtualCol) {
      continue;
    }
    if (!fetchPhysicalColumns) {
      // Only columns that survived the filters above arm the skip counter.
      const auto& col_ti = cd->columnType;
      skip_physical_cols = col_ti.get_physical_cols();
    }
    columnDescriptors.push_back(cd);
  }
}
2174 
2175 std::list<const ColumnDescriptor*> Catalog::getAllColumnMetadataForTable(
2176  const int tableId,
2177  const bool fetchSystemColumns,
2178  const bool fetchVirtualColumns,
2179  const bool fetchPhysicalColumns) const {
2180  cat_read_lock read_lock(this);
2181  std::list<const ColumnDescriptor*> columnDescriptors;
2182  const TableDescriptor* td = getMutableMetadataForTableUnlocked(tableId);
2183  getAllColumnMetadataForTableImpl(td,
2184  columnDescriptors,
2185  fetchSystemColumns,
2186  fetchVirtualColumns,
2187  fetchPhysicalColumns);
2188  return columnDescriptors;
2189 }
2190 
2191 list<const TableDescriptor*> Catalog::getAllTableMetadata() const {
2192  cat_read_lock read_lock(this);
2193  list<const TableDescriptor*> table_list;
2194  for (auto p : tableDescriptorMapById_) {
2195  table_list.push_back(p.second);
2196  }
2197  return table_list;
2198 }
2199 
2200 std::vector<TableDescriptor> Catalog::getAllTableMetadataCopy() const {
2201  cat_read_lock read_lock(this);
2202  std::vector<TableDescriptor> tables;
2203  tables.reserve(tableDescriptorMapById_.size());
2204  for (auto table_entry : tableDescriptorMapById_) {
2205  tables.emplace_back(*table_entry.second);
2206  tables.back().fragmenter = nullptr;
2207  }
2208  return tables;
2209 }
2210 
2211 list<const DashboardDescriptor*> Catalog::getAllDashboardsMetadata() const {
2212  cat_read_lock read_lock(this);
2213  list<const DashboardDescriptor*> dashboards;
2214  for (auto dashboard_entry : dashboardDescriptorMap_) {
2215  dashboards.push_back(dashboard_entry.second.get());
2216  }
2217  return dashboards;
2218 }
2219 
2220 std::vector<DashboardDescriptor> Catalog::getAllDashboardsMetadataForSysTable() const {
2221  cat_read_lock read_lock(this);
2222  std::vector<DashboardDescriptor> dashboards;
2223  dashboards.reserve(dashboardDescriptorMap_.size());
2224  for (auto dashboard_entry : dashboardDescriptorMap_) {
2225  const auto& cat_dashboard = dashboard_entry.second;
2226  dashboards.emplace_back();
2227  auto& dashboard = dashboards.back();
2228  dashboard.dashboardId = cat_dashboard->dashboardId;
2229  dashboard.dashboardName = cat_dashboard->dashboardName;
2230  dashboard.userId = cat_dashboard->userId;
2231  dashboard.updateTime = cat_dashboard->updateTime;
2232  dashboard.dashboardMetadata = cat_dashboard->dashboardMetadata;
2233  }
2234  return dashboards;
2235 }
2236 
// Runs addDictionaryNontransactional(cd) inside one sqlite transaction, under the catalog
// write lock, and returns the new dictionary's DictRef. If a std::exception is thrown,
// the transaction is rolled back and the exception rethrown.
// NOTE(review): no cat_sqlite_lock is visible before BEGIN TRANSACTION in this listing
// (compare deleteMetadataForDashboards) — confirm whether one is taken.
DictRef Catalog::addDictionaryTransactional(ColumnDescriptor& cd) {
  cat_write_lock write_lock(this);
  DictRef ref{};
  sqliteConnector_.query("BEGIN TRANSACTION");
  try {
    ref = addDictionaryNontransactional(cd);
  } catch (std::exception& e) {
    sqliteConnector_.query("ROLLBACK TRANSACTION");
    throw;
  }
  sqliteConnector_.query("END TRANSACTION");
  return ref;
}
2251 
2252 DictRef Catalog::addDictionaryNontransactional(ColumnDescriptor& cd) {
2253  cat_write_lock write_lock(this);
2254  const auto& td = *tableDescriptorMapById_[cd.tableId];
2255  list<DictDescriptor> dds;
2256  setColumnDictionary(cd, dds, td, true);
2257  auto& dd = dds.back();
2258  CHECK(dd.dictRef.dictId);
2259 
2260  std::unique_ptr<StringDictionaryClient> client;
2261  if (!string_dict_hosts_.empty()) {
2262  client.reset(new StringDictionaryClient(
2263  string_dict_hosts_.front(), DictRef(currentDB_.dbId, -1), true));
2264  }
2265  if (client) {
2266  client->create(dd.dictRef, dd.dictIsTemp);
2267  }
2268 
2269  DictDescriptor* new_dd = new DictDescriptor(dd);
2270  dictDescriptorMapByRef_[dd.dictRef].reset(new_dd);
2271  if (!dd.dictIsTemp) {
2272  boost::filesystem::create_directory(new_dd->dictFolderPath);
2273  }
2274  return dd.dictRef;
2275 }
2276 
// Runs delDictionaryNontransactional(cd) inside one sqlite transaction, under the catalog
// write lock. If a std::exception is thrown, the transaction is rolled back and the
// exception rethrown.
// NOTE(review): no cat_sqlite_lock is visible before BEGIN TRANSACTION in this listing;
// delDictionaryNontransactional takes one itself — confirm the intended lock order.
void Catalog::delDictionaryTransactional(const ColumnDescriptor& cd) {
  cat_write_lock write_lock(this);
  sqliteConnector_.query("BEGIN TRANSACTION");
  try {
    delDictionaryNontransactional(cd);
  } catch (std::exception& e) {
    sqliteConnector_.query("ROLLBACK TRANSACTION");
    throw;
  }
  sqliteConnector_.query("END TRANSACTION");
}
// Releases column `cd`'s reference on its string dictionary. This is a no-op unless the
// column is a dictionary-encoded (kENCODING_DICT) string or string array.
//
// The dictionary's refcount in mapd_dictionaries is decremented and re-read. Once it is
// no longer positive:
//   - the mapd_dictionaries row is deleted;
//   - the dictionary is dropped on the first string-dictionary host, if any are
//     configured;
//   - its descriptor is erased from dictDescriptorMapByRef_.
// Does not open a sqlite transaction; delDictionaryTransactional wraps it in one.
// CHECK-fails if the comp_param (dictionary id) is not positive or the table is unknown.
void Catalog::delDictionaryNontransactional(const ColumnDescriptor& cd) {
  cat_write_lock write_lock(this);
  cat_sqlite_lock sqlite_lock(getObjForLock());
  if (!(cd.columnType.is_string() || cd.columnType.is_string_array())) {
    return;
  }
  if (!(cd.columnType.get_compression() == kENCODING_DICT)) {
    return;
  }
  const auto dictId = cd.columnType.get_comp_param();
  CHECK_GT(dictId, 0);
  // decrement and zero check dict ref count
  // (td is fetched only to CHECK that the owning table exists.)
  const auto td = getMetadataForTable(cd.tableId, false);
  CHECK(td);
  sqliteConnector_.query_with_text_param(
      "UPDATE mapd_dictionaries SET refcount = refcount - 1 WHERE dictid = ?",
      std::to_string(dictId));
  sqliteConnector_.query_with_text_param(
      "SELECT refcount FROM mapd_dictionaries WHERE dictid = ?", std::to_string(dictId));
  const auto refcount = sqliteConnector_.getData<int>(0, 0);
  // NOTE(review): the log text lacks a space before "from".
  VLOG(3) << "Dictionary " << dictId << "from dropped table has reference count "
          << refcount;
  if (refcount > 0) {
    return;
  }
  const DictRef dictRef(currentDB_.dbId, dictId);
  sqliteConnector_.query_with_text_param("DELETE FROM mapd_dictionaries WHERE dictid = ?",
                                         std::to_string(dictId));
  // NOTE(review): the statement consuming the path fragment below is missing from this
  // listing (presumably it renames the dictionary's on-disk directory for deletion) —
  // confirm against the real source.
                                         "/DB_" + std::to_string(currentDB_.dbId) + "_DICT_" +
                                         std::to_string(dictId));

  std::unique_ptr<StringDictionaryClient> client;
  if (!string_dict_hosts_.empty()) {
    client.reset(new StringDictionaryClient(string_dict_hosts_.front(), dictRef, true));
  }
  if (client) {
    client->drop(dictRef);
  }

  dictDescriptorMapByRef_.erase(dictRef);
}
2332 
2333 std::list<const DictDescriptor*> Catalog::getAllDictionariesWithColumnInName(
2334  const ColumnDescriptor* cd) {
2335  cat_read_lock read_lock(this);
2336  std::list<const DictDescriptor*> dds;
2337 
2338  auto table_name_opt = getTableName(cd->tableId);
2339  CHECK(table_name_opt.has_value());
2340  auto table_name = table_name_opt.value();
2341 
2342  for (const auto& [dkey, dd] : dictDescriptorMapByRef_) {
2343  if (dd->dictName.find(table_name + "_" + cd->columnName + "_dict") !=
2344  std::string::npos) {
2345  dds.push_back(dd.get());
2346  }
2347  }
2348 
2349  return dds;
2350 }
2351 
2352 void Catalog::getDictionary(const ColumnDescriptor& cd,
2353  std::map<int, StringDictionary*>& stringDicts) {
2354  // learn 'committed' ColumnDescriptor of this column
2355  auto cit = columnDescriptorMap_.find(ColumnKey(cd.tableId, to_upper(cd.columnName)));
2356  CHECK(cit != columnDescriptorMap_.end());
2357  auto& ccd = *cit->second;
2358 
2359  if (!(ccd.columnType.is_string() || ccd.columnType.is_string_array())) {
2360  return;
2361  }
2362  if (!(ccd.columnType.get_compression() == kENCODING_DICT)) {
2363  return;
2364  }
2365  if (!(ccd.columnType.get_comp_param() > 0)) {
2366  return;
2367  }
2368 
2369  auto dictId = ccd.columnType.get_comp_param();
2370  getMetadataForDict(dictId);
2371 
2372  const DictRef dictRef(currentDB_.dbId, dictId);
2373  auto dit = dictDescriptorMapByRef_.find(dictRef);
2374  CHECK(dit != dictDescriptorMapByRef_.end());
2375  CHECK(dit->second);
2376  CHECK(dit->second.get()->stringDict);
2377  stringDicts[ccd.columnId] = dit->second.get()->stringDict.get();
2378 }
2379 
2380 size_t Catalog::getTotalMemorySizeForDictionariesForDatabase() const {
2381  size_t ret{0};
2382  for (auto const& kv : dictDescriptorMapByRef_) {
2383  if (kv.first.dbId == currentDB_.dbId) {
2384  auto dictionary = kv.second.get()->stringDict.get();
2385  if (dictionary) {
2386  ret += dictionary->computeCacheSize();
2387  }
2388  }
2389  }
2390  return ret;
2391 }
2392 
// Persists the type-related attributes and default value of an existing column in one
// sqlite transaction. It then replaces the in-memory descriptor with a heap copy of `cd`
// via updateInColumnMap.
//
// CHECK-fails if (cd.tableId, cd.columnId) does not exist. The old in-memory descriptor
// is located by cd.columnName, so `cd` must carry the column's current name.
// If a std::exception is thrown, the transaction is rolled back and the exception
// rethrown.
// NOTE(review): several bind values of the UPDATE are missing from this listing.
void Catalog::alterColumnTypeTransactional(const ColumnDescriptor& cd) {
  cat_write_lock write_lock(this);

  sqliteConnector_.query("BEGIN TRANSACTION");
  try {
    const auto table_id = cd.tableId;

    auto catalog_cd = getMetadataForColumn(table_id, cd.columnId);

    CHECK(catalog_cd) << " can not alter non existing column";

    // 11 placeholders: 9 SET values + 2 WHERE keys. Index 8 is default_value, which is
    // bound as SQL NULL when cd has no default.
    using BindType = SqliteConnector::BindType;
    std::vector<BindType> types(11, BindType::TEXT);
    if (!cd.default_value.has_value()) {
      types[8] = BindType::NULL_TYPE;
    }
    sqliteConnector_.query_with_text_params(
        "UPDATE mapd_columns SET "
        "coltype = ?,"
        "colsubtype = ?,"
        "coldim = ?,"
        "colscale = ?,"
        "is_notnull = ?,"
        "compression = ?,"
        "comp_param = ?,"
        "size = ?,"
        "default_value = ? "
        "WHERE tableid = ? and columnid = ?",
        std::vector<std::string>{std::to_string(cd.columnType.get_type()),
                                 cd.default_value.value_or("NULL"),
                                 std::to_string(table_id),
                                 std::to_string(cd.columnId)},

        types);

    auto ncd = new ColumnDescriptor(cd);

    ColumnDescriptorMap::iterator columnDescIt =
        columnDescriptorMap_.find(ColumnKey(cd.tableId, to_upper(cd.columnName)));
    CHECK(columnDescIt != columnDescriptorMap_.end());
    auto ocd = columnDescIt->second;

    // Presumably swaps ncd in for ocd in the column maps — confirm who frees ocd.
    updateInColumnMap(ncd, ocd);
  } catch (std::exception& e) {
    sqliteConnector_.query("ROLLBACK TRANSACTION");
    throw;
  }
  sqliteConnector_.query("END TRANSACTION");
}
2450 
2451 int Catalog::getNextAddedColumnId(const TableDescriptor& td) {
2452  cat_read_lock read_lock(this);
2453  sqliteConnector_.query_with_text_params(
2454  "SELECT max(columnid) + 1 FROM mapd_columns WHERE tableid = ?",
2455  std::vector<std::string>{std::to_string(td.tableId)});
2456  return sqliteConnector_.getData<int>(0, 0);
2457 }
2458 
// Runs addColumnNontransactional(td, cd) inside one sqlite transaction, under the
// catalog write lock. If a std::exception is thrown, the transaction is rolled back and
// the exception rethrown.
// NOTE(review): no cat_sqlite_lock is visible before BEGIN TRANSACTION in this listing —
// confirm whether one is taken.
void Catalog::addColumnTransactional(const TableDescriptor& td, ColumnDescriptor& cd) {
  cat_write_lock write_lock(this);
  sqliteConnector_.query("BEGIN TRANSACTION");
  try {
    addColumnNontransactional(td, cd);
  } catch (std::exception& e) {
    sqliteConnector_.query("ROLLBACK TRANSACTION");
    throw;
  }
  sqliteConnector_.query("END TRANSACTION");
}
2471 
// Adds column `cd` to table `td`, both in sqlite and in memory. It does not open a
// transaction; addColumnTransactional wraps it in one.
//
// For a logical sharded table (nShards > 0, shard < 0), the column is first added to
// every physical shard, using a copy of `cd` for each. It may then create a dictionary
// for the column. After that:
//   - a mapd_columns row is inserted, with columnid = max(columnid) + 1 within the table;
//   - the table's ncolumns is bumped;
//   - the assigned id is read back into cd.columnId;
//   - a heap copy of `cd` is registered in the column maps;
//   - Calcite's metadata for the table is refreshed.
// NOTE(review): this listing is missing the condition that opens the block ending at the
// `}` after addDictionaryNontransactional(cd), and several INSERT bind values.
void Catalog::addColumnNontransactional(const TableDescriptor& td, ColumnDescriptor& cd) {
  cat_write_lock write_lock(this);
  cd.tableId = td.tableId;
  cd.db_id = getDatabaseId();
  if (td.nShards > 0 && td.shard < 0) {
    for (const auto shard : getPhysicalTablesDescriptors(&td)) {
      auto shard_cd = cd;
      addColumnNontransactional(*shard, shard_cd);
    }
  }
    addDictionaryNontransactional(cd);
  }

  // 17 bound values: one per listed column. The 2nd feeds the max(columnid) subquery.
  // Index 16 (default_value) is bound as SQL NULL when cd has no default.
  using BindType = SqliteConnector::BindType;
  std::vector<BindType> types(17, BindType::TEXT);
  if (!cd.default_value.has_value()) {
    types[16] = BindType::NULL_TYPE;
  }
  sqliteConnector_.query_with_text_params(
      "INSERT INTO mapd_columns (tableid, columnid, name, coltype, colsubtype, coldim, "
      "colscale, is_notnull, "
      "compression, comp_param, size, chunks, is_systemcol, is_virtualcol, virtual_expr, "
      "is_deletedcol, default_value) "
      "VALUES (?, "
      "(SELECT max(columnid) + 1 FROM mapd_columns WHERE tableid = ?), "
      "?, ?, ?, "
      "?, "
      "?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
      std::vector<std::string>{std::to_string(td.tableId),
                               std::to_string(td.tableId),
                               cd.columnName,
                               "",
                               cd.virtualExpr,
                               cd.default_value.value_or("NULL")},
      types);

  sqliteConnector_.query_with_text_params(
      "UPDATE mapd_tables SET ncolumns = ncolumns + 1 WHERE tableid = ?",
      std::vector<std::string>{std::to_string(td.tableId)});

  // Read back the id that the INSERT's subquery assigned.
  sqliteConnector_.query_with_text_params(
      "SELECT columnid FROM mapd_columns WHERE tableid = ? AND name = ?",
      std::vector<std::string>{std::to_string(td.tableId), cd.columnName});
  cd.columnId = sqliteConnector_.getData<int>(0, 0);

  ++tableDescriptorMapById_[td.tableId]->nColumns;
  auto ncd = new ColumnDescriptor(cd);
  addToColumnMap(ncd);
  addColumnDescriptor(ncd);
  calciteMgr_->updateMetadata(currentDB_.dbName, td.tableName);
}
2536 
// NOTE: this function is deprecated
// Legacy add-column path. It takes no catalog lock and opens no transaction.
//
// It performs the same sqlite work as addColumnNontransactional:
//   - shards first, then the optional dictionary;
//   - the mapd_columns INSERT, the ncolumns bump and the columnid read-back.
// It then registers a heap copy of `cd` in the column maps and queues
// (nullptr, new descriptor) in columnDescriptorsForRoll, so that rollLegacy() can either
// finalize it or undo it.
// NOTE(review): this listing is missing the condition that opens the block ending at the
// `}` after addDictionaryNontransactional(cd), and several INSERT bind values.
void Catalog::addColumn(const TableDescriptor& td, ColumnDescriptor& cd) {
  // caller must handle sqlite/chunk transaction TOGETHER
  cd.tableId = td.tableId;
  cd.db_id = getDatabaseId();
  if (td.nShards > 0 && td.shard < 0) {
    for (const auto shard : getPhysicalTablesDescriptors(&td)) {
      auto shard_cd = cd;
      addColumn(*shard, shard_cd);
    }
  }
    addDictionaryNontransactional(cd);
  }

  // 17 bound values; index 16 (default_value) is bound as SQL NULL when absent.
  using BindType = SqliteConnector::BindType;
  std::vector<BindType> types(17, BindType::TEXT);
  if (!cd.default_value.has_value()) {
    types[16] = BindType::NULL_TYPE;
  }
  sqliteConnector_.query_with_text_params(
      "INSERT INTO mapd_columns (tableid, columnid, name, coltype, colsubtype, coldim, "
      "colscale, is_notnull, "
      "compression, comp_param, size, chunks, is_systemcol, is_virtualcol, virtual_expr, "
      "is_deletedcol, default_value) "
      "VALUES (?, "
      "(SELECT max(columnid) + 1 FROM mapd_columns WHERE tableid = ?), "
      "?, ?, ?, "
      "?, "
      "?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
      std::vector<std::string>{std::to_string(td.tableId),
                               std::to_string(td.tableId),
                               cd.columnName,
                               "",
                               cd.virtualExpr,
                               cd.default_value.value_or("NULL")},
      types);

  sqliteConnector_.query_with_text_params(
      "UPDATE mapd_tables SET ncolumns = ncolumns + 1 WHERE tableid = ?",
      std::vector<std::string>{std::to_string(td.tableId)});

  sqliteConnector_.query_with_text_params(
      "SELECT columnid FROM mapd_columns WHERE tableid = ? AND name = ?",
      std::vector<std::string>{std::to_string(td.tableId), cd.columnName});
  cd.columnId = sqliteConnector_.getData<int>(0, 0);

  ++tableDescriptorMapById_[td.tableId]->nColumns;
  auto ncd = new ColumnDescriptor(cd);
  addToColumnMap(ncd);
  // Unlike addColumnNontransactional, columnIdBySpi_ and Calcite are updated later by
  // rollLegacy(true).
  columnDescriptorsForRoll.emplace_back(nullptr, ncd);
}
2600 
// Drops column `cd` from table `td` inside one sqlite transaction. The drop also
// reaches the table's physical shards, via dropColumnNontransactional.
// dropColumnPolicies() runs first, before the catalog write lock is taken and outside
// the transaction. If a std::exception is thrown, the transaction is rolled back and
// the exception rethrown.
// NOTE(review): no cat_sqlite_lock is visible before BEGIN TRANSACTION in this listing —
// confirm whether one is taken.
void Catalog::dropColumnTransactional(const TableDescriptor& td,
                                      const ColumnDescriptor& cd) {
  dropColumnPolicies(td, cd);

  cat_write_lock write_lock(this);
  sqliteConnector_.query("BEGIN TRANSACTION");
  try {
    dropColumnNontransactional(td, cd);
  } catch (std::exception& e) {
    sqliteConnector_.query("ROLLBACK TRANSACTION");
    throw;
  }
  sqliteConnector_.query("END TRANSACTION");
}
2616 
// Drops column `cd` from table `td`; it does not open a transaction.
//
// The steps are:
//   - delete the mapd_columns row and decrement ncolumns;
//   - remove the catalog descriptor (found by name) from the column maps;
//   - decrement nColumns in memory;
//   - free the descriptor via removeColumnDescriptor;
//   - refresh Calcite metadata.
// For a logical sharded table (nShards > 0, shard < 0), each physical shard's column
// with the same id is then dropped recursively.
// NOTE(review): if `cd` is the catalog-owned descriptor (as in the shard recursion below),
// removeColumnDescriptor has already deleted it before cd.columnId is read in the shard
// loop. This is only safe when callers pass a copy — confirm.
void Catalog::dropColumnNontransactional(const TableDescriptor& td,
                                         const ColumnDescriptor& cd) {
  cat_write_lock write_lock(this);

  sqliteConnector_.query_with_text_params(
      "DELETE FROM mapd_columns where tableid = ? and columnid = ?",
      std::vector<std::string>{std::to_string(td.tableId), std::to_string(cd.columnId)});

  sqliteConnector_.query_with_text_params(
      "UPDATE mapd_tables SET ncolumns = ncolumns - 1 WHERE tableid = ?",
      std::vector<std::string>{std::to_string(td.tableId)});

  ColumnDescriptorMap::iterator columnDescIt =
      columnDescriptorMap_.find(ColumnKey(cd.tableId, to_upper(cd.columnName)));
  CHECK(columnDescIt != columnDescriptorMap_.end());

  auto ocd = columnDescIt->second;
  removeFromColumnMap(ocd);
  --tableDescriptorMapById_[td.tableId]->nColumns;
  removeColumnDescriptor(ocd);  // frees ocd
  calciteMgr_->updateMetadata(currentDB_.dbName, td.tableName);

  // for each shard
  if (td.nShards > 0 && td.shard < 0) {
    for (const auto shard : getPhysicalTablesDescriptors(&td)) {
      const auto shard_cd = getMetadataForColumn(shard->tableId, cd.columnId);
      CHECK(shard_cd);
      dropColumnNontransactional(*shard, *shard_cd);
    }
  }
}
2649 
// For a logical sharded table (nShards > 0, shard < 0), recurses into each physical
// shard with that shard's descriptor for the same column id. It CHECK-fails if a shard
// lacks the column.
// NOTE(review): no per-table work is visible in this listing (the first body line is
// blank here), so as shown the function only recurses — confirm against the real source.
void Catalog::dropColumnPolicies(const TableDescriptor& td, const ColumnDescriptor& cd) {

  // for each shard
  if (td.nShards > 0 && td.shard < 0) {
    for (const auto shard : getPhysicalTablesDescriptors(&td)) {
      const auto shard_cd = getMetadataForColumn(shard->tableId, cd.columnId);
      CHECK(shard_cd);
      dropColumnPolicies(*shard, *shard_cd);
    }
  }
}
2661 
2662 // NOTE: this function is deprecated
2663 void Catalog::dropColumn(const TableDescriptor& td, const ColumnDescriptor& cd) {
2664  {
2665  cat_write_lock write_lock(this);
2666  cat_sqlite_lock sqlite_lock(getObjForLock());
2667  // caller must handle sqlite/chunk transaction TOGETHER
2668  sqliteConnector_.query_with_text_params(
2669  "DELETE FROM mapd_columns where tableid = ? and columnid = ?",
2670  std::vector<std::string>{std::to_string(td.tableId),
2671  std::to_string(cd.columnId)});
2672 
2673  sqliteConnector_.query_with_text_params(
2674  "UPDATE mapd_tables SET ncolumns = ncolumns - 1 WHERE tableid = ?",
2675  std::vector<std::string>{std::to_string(td.tableId)});
2676 
2677  ColumnDescriptorMap::iterator columnDescIt =
2678  columnDescriptorMap_.find(ColumnKey(cd.tableId, to_upper(cd.columnName)));
2679  CHECK(columnDescIt != columnDescriptorMap_.end());
2680 
2681  columnDescriptorsForRoll.emplace_back(columnDescIt->second, nullptr);
2682  removeFromColumnMap(columnDescIt->second);
2683  --tableDescriptorMapById_[td.tableId]->nColumns;
2684  }
2685 
2686  // for each shard
2687  if (td.nShards > 0 && td.shard < 0) {
2688  for (const auto shard : getPhysicalTablesDescriptors(&td)) {
2689  const auto shard_cd = getMetadataForColumn(shard->tableId, cd.columnId);
2690  CHECK(shard_cd);
2691  dropColumn(*shard, *shard_cd);
2692  }
2693  }
2694 }
2695 
2696 void Catalog::removeColumnDescriptor(const ColumnDescriptor* cd) {
2697  if (!cd) {
2698  return;
2699  }
2700 
2701  auto tabDescIt = tableDescriptorMapById_.find(cd->tableId);
2702  CHECK(tableDescriptorMapById_.end() != tabDescIt);
2703  auto td = tabDescIt->second;
2704  auto& cd_by_spi = td->columnIdBySpi_;
2705  cd_by_spi.erase(std::remove(cd_by_spi.begin(), cd_by_spi.end(), cd->columnId),
2706  cd_by_spi.end());
2707  delete cd;
2708  std::sort(cd_by_spi.begin(), cd_by_spi.end());
2709 }
2710 
2711 void Catalog::addColumnDescriptor(const ColumnDescriptor* cd) {
2712  if (!cd || cd->isGeoPhyCol) {
2713  return;
2714  }
2715 
2716  auto tabDescIt = tableDescriptorMapById_.find(cd->tableId);
2717  CHECK(tableDescriptorMapById_.end() != tabDescIt);
2718  auto td = tabDescIt->second;
2719  auto& cd_by_spi = td->columnIdBySpi_;
2720 
2721  if (cd_by_spi.end() == std::find(cd_by_spi.begin(), cd_by_spi.end(), cd->columnId)) {
2722  cd_by_spi.push_back(cd->columnId);
2723  }
2724  std::sort(cd_by_spi.begin(), cd_by_spi.end());
2725 }
2726 
2727 // NOTE: this function is deprecated
2728 void Catalog::rollLegacy(const bool forward) {
2729  cat_write_lock write_lock(this);
2730  std::set<const TableDescriptor*> tds;
2731 
2732  for (const auto& cdr : columnDescriptorsForRoll) {
2733  auto ocd = cdr.first;
2734  auto ncd = cdr.second;
2735  CHECK(ocd || ncd);
2736  auto tabDescIt = tableDescriptorMapById_.find((ncd ? ncd : ocd)->tableId);
2737  CHECK(tableDescriptorMapById_.end() != tabDescIt);
2738  auto td = tabDescIt->second;
2739  auto& vc = td->columnIdBySpi_;
2740  if (forward) {
2741  if (ocd) {
2742  if (nullptr == ncd ||
2743  ncd->columnType.get_comp_param() != ocd->columnType.get_comp_param()) {
2744  delDictionaryNontransactional(*ocd);
2745  }
2746 
2747  vc.erase(std::remove(vc.begin(), vc.end(), ocd->columnId), vc.end());
2748 
2749  delete ocd;
2750  }
2751  if (ncd) {
2752  // append columnId if its new and not phy geo
2753  if (vc.end() == std::find(vc.begin(), vc.end(), ncd->columnId)) {
2754  if (!ncd->isGeoPhyCol) {
2755  vc.push_back(ncd->columnId);
2756  }
2757  }
2758  }
2759  tds.insert(td);
2760  } else {
2761  if (ocd) {
2762  addToColumnMap(ocd);
2763  }
2764  // roll back the dict of new column
2765  if (ncd) {
2766  removeFromColumnMap(ncd);
2767  if (nullptr == ocd ||
2768  ocd->columnType.get_comp_param() != ncd->columnType.get_comp_param()) {
2769  delDictionaryNontransactional(*ncd);
2770  }
2771  delete ncd;
2772  }
2773  }
2774  }
2775  columnDescriptorsForRoll.clear();
2776 
2777  if (forward) {
2778  for (const auto td : tds) {
2779  calciteMgr_->updateMetadata(currentDB_.dbName, td->tableName);
2780  }
2781  }
2782 }
2783 
2784 void Catalog::expandGeoColumn(const ColumnDescriptor& cd,
2785  list<ColumnDescriptor>& columns) {
2786  const auto& col_ti = cd.columnType;
2787  if (IS_GEO(col_ti.get_type())) {
2788  switch (col_ti.get_type()) {
2789  case kPOINT: {
2790  ColumnDescriptor physical_cd_coords(true);
2791  physical_cd_coords.columnName = cd.columnName + "_coords";
2792  SQLTypeInfo coords_ti = SQLTypeInfo(kARRAY, col_ti.get_notnull());
2793  // Raw data: compressed/uncompressed coords
2794  coords_ti.set_subtype(kTINYINT);
2795  size_t unit_size;
2796  if (col_ti.get_compression() == kENCODING_GEOINT &&
2797  col_ti.get_comp_param() == 32) {
2798  unit_size = 4 * sizeof(int8_t);
2799  } else {
2800  CHECK(col_ti.get_compression() == kENCODING_NONE);
2801  unit_size = 8 * sizeof(int8_t);
2802  }
2803  coords_ti.set_size(2 * unit_size);
2804  physical_cd_coords.columnType = coords_ti;
2805  columns.push_back(physical_cd_coords);
2806 
2807  // If adding more physical columns - update SQLTypeInfo::get_physical_cols()
2808 
2809  break;
2810  }
2811  case kMULTIPOINT:
2812  case kLINESTRING: {
2813  ColumnDescriptor physical_cd_coords(true);
2814  physical_cd_coords.columnName = cd.columnName + "_coords";
2815  SQLTypeInfo coords_ti = SQLTypeInfo(kARRAY, col_ti.get_notnull());
2816  // Raw data: compressed/uncompressed coords
2817  coords_ti.set_subtype(kTINYINT);
2818  physical_cd_coords.columnType = coords_ti;
2819  columns.push_back(physical_cd_coords);
2820 
2821  ColumnDescriptor physical_cd_bounds(true);
2822  physical_cd_bounds.columnName = cd.columnName + "_bounds";
2823  SQLTypeInfo bounds_ti = SQLTypeInfo(kARRAY, col_ti.get_notnull());
2824  bounds_ti.set_subtype(kDOUBLE);
2825  bounds_ti.set_size(4 * sizeof(double));
2826  physical_cd_bounds.columnType = bounds_ti;
2827  columns.push_back(physical_cd_bounds);
2828 
2829  // If adding more physical columns - update SQLTypeInfo::get_physical_cols()
2830 
2831  break;
2832  }
2833  case kMULTILINESTRING: {
2834  ColumnDescriptor physical_cd_coords(true);
2835  physical_cd_coords.columnName = cd.columnName + "_coords";
2836  SQLTypeInfo coords_ti = SQLTypeInfo(kARRAY, col_ti.get_notnull());
2837  // Raw data: compressed/uncompressed coords
2838  coords_ti.set_subtype(kTINYINT);
2839  physical_cd_coords.columnType = coords_ti;
2840  columns.push_back(physical_cd_coords);
2841 
2842  ColumnDescriptor physical_cd_linestring_sizes(true);
2843  physical_cd_linestring_sizes.columnName = cd.columnName + "_linestring_sizes";
2844  SQLTypeInfo linestring_sizes_ti = SQLTypeInfo(kARRAY, col_ti.get_notnull());
2845  linestring_sizes_ti.set_subtype(kINT);
2846  physical_cd_linestring_sizes.columnType = linestring_sizes_ti;
2847  columns.push_back(physical_cd_linestring_sizes);
2848 
2849  ColumnDescriptor physical_cd_bounds(true);
2850  physical_cd_bounds.columnName = cd.columnName + "_bounds";
2851  SQLTypeInfo bounds_ti = SQLTypeInfo(kARRAY, col_ti.get_notnull());
2852  bounds_ti.set_subtype(kDOUBLE);
2853  bounds_ti.set_size(4 * sizeof(double));
2854  physical_cd_bounds.columnType = bounds_ti;
2855  columns.push_back(physical_cd_bounds);
2856 
2857  // If adding more physical columns - update SQLTypeInfo::get_physical_cols()
2858 
2859  break;
2860  }
2861  case kPOLYGON: {
2862  ColumnDescriptor physical_cd_coords(true);
2863  physical_cd_coords.columnName = cd.columnName + "_coords";
2864  SQLTypeInfo coords_ti = SQLTypeInfo(kARRAY, col_ti.get_notnull());
2865  // Raw data: compressed/uncompressed coords
2866  coords_ti.set_subtype(kTINYINT);
2867  physical_cd_coords.columnType = coords_ti;
2868  columns.push_back(physical_cd_coords);
2869 
2870  ColumnDescriptor physical_cd_ring_sizes(true);
2871  physical_cd_ring_sizes.columnName = cd.columnName + "_ring_sizes";
2872  SQLTypeInfo ring_sizes_ti = SQLTypeInfo(kARRAY, col_ti.get_notnull());
2873  ring_sizes_ti.set_subtype(kINT);
2874  physical_cd_ring_sizes.columnType = ring_sizes_ti;
2875  columns.push_back(physical_cd_ring_sizes);
2876 
2877  ColumnDescriptor physical_cd_bounds(true);
2878  physical_cd_bounds.columnName = cd.columnName + "_bounds";
2879  SQLTypeInfo bounds_ti = SQLTypeInfo(kARRAY, col_ti.get_notnull());
2880  bounds_ti.set_subtype(kDOUBLE);
2881  bounds_ti.set_size(4 * sizeof(double));
2882  physical_cd_bounds.columnType = bounds_ti;
2883  columns.push_back(physical_cd_bounds);
2884 
2885  // If adding more physical columns - update SQLTypeInfo::get_physical_cols()
2886 
2887  break;
2888  }
2889  case kMULTIPOLYGON: {
2890  ColumnDescriptor physical_cd_coords(true);
2891  physical_cd_coords.columnName = cd.columnName + "_coords";
2892  SQLTypeInfo coords_ti = SQLTypeInfo(kARRAY, col_ti.get_notnull());
2893  // Raw data: compressed/uncompressed coords
2894  coords_ti.set_subtype(kTINYINT);
2895  physical_cd_coords.columnType = coords_ti;
2896  columns.push_back(physical_cd_coords);
2897 
2898  ColumnDescriptor physical_cd_ring_sizes(true);
2899  physical_cd_ring_sizes.columnName = cd.columnName + "_ring_sizes";
2900  SQLTypeInfo ring_sizes_ti = SQLTypeInfo(kARRAY, col_ti.get_notnull());
2901  ring_sizes_ti.set_subtype(kINT);
2902  physical_cd_ring_sizes.columnType = ring_sizes_ti;
2903  columns.push_back(physical_cd_ring_sizes);
2904 
2905  ColumnDescriptor physical_cd_poly_rings(true);
2906  physical_cd_poly_rings.columnName = cd.columnName + "_poly_rings";
2907  SQLTypeInfo poly_rings_ti = SQLTypeInfo(kARRAY, col_ti.get_notnull());
2908  poly_rings_ti.set_subtype(kINT);
2909  physical_cd_poly_rings.columnType = poly_rings_ti;
2910  columns.push_back(physical_cd_poly_rings);
2911 
2912  ColumnDescriptor physical_cd_bounds(true);
2913  physical_cd_bounds.columnName = cd.columnName + "_bounds";
2914  SQLTypeInfo bounds_ti = SQLTypeInfo(kARRAY, col_ti.get_notnull());
2915  bounds_ti.set_subtype(kDOUBLE);
2916  bounds_ti.set_size(4 * sizeof(double));
2917  physical_cd_bounds.columnType = bounds_ti;
2918  columns.push_back(physical_cd_bounds);
2919 
2920  // If adding more physical columns - update SQLTypeInfo::get_physical_cols()
2921 
2922  break;
2923  }
2924  default:
2925  throw runtime_error("Unrecognized geometry type.");
2926  break;
2927  }
2928  }
2929 }
2930 
// File-local helpers for this translation unit.
2931 namespace {
// NOTE(review): this listing is missing several lines of the helper defined here,
// including its signature. Its caller in createTable does
// `foreign_table.next_refresh_time = get_next_refresh_time(foreign_table)`, so it
// presumably computes a foreign table's next refresh time. Confirm against the
// full source.
// Visible behavior: looks up a timing-type entry in foreign_table.options,
// CHECK-fails if that entry is absent, then branches on the entry's value.
2933  auto timing_type_entry =
2935  CHECK(timing_type_entry != foreign_table.options.end());
2936  if (timing_type_entry->second ==
2939  foreign_table.options);
2940  }
2942 }
2943 } // namespace
2944 
// Registers a new table `td` with columns `cols` in this catalog.
// Builds the full column list in this order:
//  - each user column, immediately followed by the physical columns that
//    expandGeoColumn produces for geometry columns;
//  - a system `rowid` column;
//  - a `$deleted$` column when td.hasDeletedCol.
// Then:
//  - assigns column ids sequentially from 1 and sets up dictionaries
//    (shared dictionaries via setColumnSharedDictionary);
//  - persists the table inside a SQLite transaction on the non-temporary path;
//  - adds the table to the in-memory maps and notifies Calcite.
// Throws std::runtime_error if a user column is named "rowid".
// NOTE(review): several lines of this function are missing from this listing,
// including the condition that selects the persistent vs. temporary path. The
// comments below describe only the visible code.
2945 void Catalog::createTable(
2946  TableDescriptor& td,
2947  const list<ColumnDescriptor>& cols,
2948  const std::vector<Parser::SharedDictionaryDef>& shared_dict_defs,
2949  bool isLogicalTable) {
2950  cat_write_lock write_lock(this);
2951  list<ColumnDescriptor> cds = cols;
2952  list<DictDescriptor> dds;
2953  std::set<std::string> toplevel_column_names;
2954  list<ColumnDescriptor> columns;
2955 
2956  if (!td.storageType.empty() &&
2959  throw std::runtime_error("Only temporary tables can be backed by foreign storage.");
2960  }
2961  dataMgr_->getForeignStorageInterface()->prepareTable(getCurrentDB().dbId, td, cds);
2962  }
2963 
// "rowid" is reserved for the system column appended below.
2964  for (auto cd : cds) {
2965  if (cd.columnName == "rowid") {
2966  throw std::runtime_error(
2967  "Cannot create column with name rowid. rowid is a system defined column.");
2968  }
2969  columns.push_back(cd);
2970  toplevel_column_names.insert(cd.columnName);
2971  if (cd.columnType.is_geometry()) {
2972  expandGeoColumn(cd, columns);
2973  }
2974  }
// cds is refilled below with the final, id-assigned descriptors.
2975  cds.clear();
2976 
2977  ColumnDescriptor cd;
2978  // add row_id column -- Must be last column in the table
// NOTE(review): when td.hasDeletedCol, `$deleted$` is appended after rowid, so
// rowid is then last only among the non-deleted columns.
2979  cd.columnName = "rowid";
2980  cd.isSystemCol = true;
2981  cd.columnType = SQLTypeInfo(kBIGINT, true);
2982 #ifdef MATERIALIZED_ROWID
2983  cd.isVirtualCol = false;
2984 #else
2985  cd.isVirtualCol = true;
2986  cd.virtualExpr = "MAPD_FRAG_ID * MAPD_ROWS_PER_FRAG + MAPD_FRAG_ROW_ID";
2987 #endif
2988  columns.push_back(cd);
2989  toplevel_column_names.insert(cd.columnName);
2990 
2991  if (td.hasDeletedCol) {
2992  ColumnDescriptor cd_del;
2993  cd_del.columnName = "$deleted$";
2994  cd_del.isSystemCol = true;
2995  cd_del.isVirtualCol = false;
2996  cd_del.columnType = SQLTypeInfo(kBOOLEAN, true);
2997  cd_del.isDeletedCol = true;
2998 
2999  columns.push_back(cd_del);
3000  }
3001 
3002  for (auto& column : columns) {
3003  column.db_id = getDatabaseId();
3004  }
3005 
// nColumns counts user, geo physical and system columns alike.
3006  td.nColumns = columns.size();
3007  // TODO(sy): don't take disk locks or touch sqlite connector for temporary tables
3008  cat_sqlite_lock sqlite_lock(getObjForLock());
// The transaction is committed by END TRANSACTION at the end of this function, or
// rolled back in one of the catch blocks.
3009  sqliteConnector_.query("BEGIN TRANSACTION");
3011  try {
3012  sqliteConnector_.query_with_text_params(
3013  R"(INSERT INTO mapd_tables (name, userid, ncolumns, isview, fragments, frag_type, max_frag_rows, max_chunk_size, frag_page_size, max_rows, partitions, shard_column_id, shard, num_shards, sort_column_id, storage_type, max_rollback_epochs, is_system_table, key_metainfo) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?))",
3014  std::vector<std::string>{td.tableName,
3015  std::to_string(td.userId),
3017  std::to_string(td.isView),
3018  "",
3023  std::to_string(td.maxRows),
3024  td.partitions,
3026  std::to_string(td.shard),
3027  std::to_string(td.nShards),
3029  td.storageType,
3032  td.keyMetainfo});
3033 
3034  // now get the auto generated tableid
3035  sqliteConnector_.query_with_text_param(
3036  "SELECT tableid FROM mapd_tables WHERE name = ?", td.tableName);
3037  td.tableId = sqliteConnector_.getData<int>(0, 0);
3038  int colId = 1;
// Iterates by value (this `cd` shadows the rowid descriptor above). The id and
// table assignments at the bottom of the loop update the copy appended to cds,
// not the entry in `columns`.
3039  for (auto cd : columns) {
3041  const bool is_foreign_col =
3042  setColumnSharedDictionary(cd, cds, dds, td, shared_dict_defs);
3043  if (!is_foreign_col) {
3044  // Ideally we would like to not persist string dictionaries for system tables,
3045  // since system table content can be highly dynamic and string dictionaries
3046  // are not currently vacuumed. However, in distributed this causes issues
3047  // when the cluster is out of sync (when the agg resets but leaves persist) so
3048  // for the sake of testing we need to leave this as normal dictionaries until
3049  // we solve the distributed issue.
3050  auto use_temp_dictionary = false; // td.is_system_table;
3051  setColumnDictionary(cd, dds, td, isLogicalTable, use_temp_dictionary);
3052  }
3053  }
3054 
// Only columns recorded in toplevel_column_names (user columns and rowid) that
// are not geo physical columns get a slot in columnIdBySpi_.
3055  if (toplevel_column_names.count(cd.columnName)) {
3056  if (!cd.isGeoPhyCol) {
3057  td.columnIdBySpi_.push_back(colId);
3058  }
3059  }
3060 
// 17 bind slots, all TEXT. The last one (default_value) is bound as NULL when the
// column has no default.
3061  using BindType = SqliteConnector::BindType;
3062  std::vector<BindType> types(17, BindType::TEXT);
3063  if (!cd.default_value.has_value()) {
3064  types[16] = BindType::NULL_TYPE;
3065  }
3066  sqliteConnector_.query_with_text_params(
3067  "INSERT INTO mapd_columns (tableid, columnid, name, coltype, colsubtype, "
3068  "coldim, colscale, is_notnull, "
3069  "compression, comp_param, size, chunks, is_systemcol, is_virtualcol, "
3070  "virtual_expr, is_deletedcol, default_value) "
3071  "VALUES (?, ?, ?, ?, ?, "
3072  "?, "
3073  "?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
3074  std::vector<std::string>{std::to_string(td.tableId),
3075  std::to_string(colId),
3076  cd.columnName,
3085  "",
3088  cd.virtualExpr,
3090  cd.default_value.value_or("NULL")},
3091  types);
3092  cd.tableId = td.tableId;
3093  cd.columnId = colId++;
3094  cds.push_back(cd);
3095  }
3096  if (td.isView) {
3097  sqliteConnector_.query_with_text_params(
3098  "INSERT INTO mapd_views (tableid, sql) VALUES (?,?)",
3099  std::vector<std::string>{std::to_string(td.tableId), td.viewSQL});
3100  }
3102  auto& foreign_table = dynamic_cast<foreign_storage::ForeignTable&>(td);
3103  foreign_table.next_refresh_time = get_next_refresh_time(foreign_table);
3104  sqliteConnector_.query_with_text_params(
3105  "INSERT INTO omnisci_foreign_tables (table_id, server_id, options, "
3106  "last_refresh_time, next_refresh_time) VALUES (?, ?, ?, ?, ?)",
3107  std::vector<std::string>{std::to_string(foreign_table.tableId),
3108  std::to_string(foreign_table.foreign_server->id),
3109  foreign_table.getOptionsAsJsonString(),
3110  std::to_string(foreign_table.last_refresh_time),
3111  std::to_string(foreign_table.next_refresh_time)});
3112  }
// Any failure while persisting undoes the SQLite transaction, then propagates.
3113  } catch (std::exception& e) {
3114  sqliteConnector_.query("ROLLBACK TRANSACTION");
3115  throw;
3116  }
// Temporary tables get an id from nextTempTableId_ and fresh temporary
// dictionaries; this branch issues no INSERTs into the SQLite tables.
3117  } else { // Temporary table
3118  td.tableId = nextTempTableId_++;
3119  int colId = 1;
3120  for (auto cd : columns) {
3122  const bool is_foreign_col =
3123  setColumnSharedDictionary(cd, cds, dds, td, shared_dict_defs);
3124 
3125  if (!is_foreign_col) {
3126  // Create a new temporary dictionary
3127  std::string fileName("");
3128  std::string folderPath("");
3129  DictRef dict_ref(currentDB_.dbId, nextTempDictId_);
3130  nextTempDictId_++;
3131  DictDescriptor dd(dict_ref,
3132  fileName,
3134  false,
3135  1,
3136  folderPath,
3137  true); // Is dictName (2nd argument) used?
3138  dds.push_back(dd);
3139  if (!cd.columnType.is_array()) {
3141  }
3142  cd.columnType.set_comp_param(dict_ref.dictId);
3143  set_dict_key(cd);
3144  }
3145  }
3146  if (toplevel_column_names.count(cd.columnName)) {
3147  if (!cd.isGeoPhyCol) {
3148  td.columnIdBySpi_.push_back(colId);
3149  }
3150  }
3151  cd.tableId = td.tableId;
3152  cd.columnId = colId++;
3153  cds.push_back(cd);
3154  }
3155 
// NOTE(review): the guard for this call is missing from the listing. The callee
// writes the temporary table's definition to a JSON file for Calcite.
3157  serializeTableJsonUnlocked(&td, cds);
3158  }
3159  }
3160 
// Register the table in memory. On failure, roll back the SQLite transaction and
// remove any partially added map entries before rethrowing.
3161  try {
3162  auto cache = dataMgr_->getPersistentStorageMgr()->getDiskCache();
3163  if (cache) {
3164  CHECK(!cache->hasCachedMetadataForKeyPrefix({getCurrentDB().dbId, td.tableId}))
3165  << "Disk cache at " + cache->getCacheDirectory()
3166  << " contains preexisting data for new table. Please "
3167  "delete or clear cache before continuing";
3168  }
3169 
3170  addTableToMap(&td, cds, dds);
3171  calciteMgr_->updateMetadata(currentDB_.dbName, td.tableName);
3172  if (!td.storageType.empty() && td.storageType != StorageType::FOREIGN_TABLE) {
3173  dataMgr_->getForeignStorageInterface()->registerTable(this, td, cds);
3174  }
3175  } catch (std::exception& e) {
3176  sqliteConnector_.query("ROLLBACK TRANSACTION");
3177  removeTableFromMap(td.tableName, td.tableId, true);
3178  throw;
3179  }
3180  sqliteConnector_.query("END TRANSACTION");
3181 
// Both locks are released before getMetadataForTable(..., true), which re-enters
// the catalog.
3182  if (td.storageType != StorageType::FOREIGN_TABLE) {
3183  write_lock.unlock();
3184  sqlite_lock.unlock();
3185  getMetadataForTable(td.tableName,
3186  true); // cause instantiateFragmenter() to be called
3187  }
3188 }
3189 
3190 void Catalog::serializeTableJsonUnlocked(const TableDescriptor* td,
3191  const std::list<ColumnDescriptor>& cds) const {
3192  // relies on the catalog write lock
3193  using namespace rapidjson;
3194 
3195  VLOG(1) << "Serializing temporary table " << td->tableName << " to JSON for Calcite.";
3196 
3197  const auto db_name = currentDB_.dbName;
3198  const auto file_path = table_json_filepath(basePath_, db_name);
3199 
3200  Document d;
3201  if (boost::filesystem::exists(file_path)) {
3202  // look for an existing file for this database
3203  std::ifstream reader(file_path.string());
3204  CHECK(reader.is_open());
3205  IStreamWrapper json_read_wrapper(reader);
3206  d.ParseStream(json_read_wrapper);
3207  } else {
3208  d.SetObject();
3209  }
3210  CHECK(d.IsObject());
3211  CHECK(!d.HasMember(StringRef(td->tableName.c_str())));
3212 
3213  Value table(kObjectType);
3214  table.AddMember(
3215  "name", Value().SetString(StringRef(td->tableName.c_str())), d.GetAllocator());
3216  table.AddMember("id", Value().SetInt(td->tableId), d.GetAllocator());
3217  table.AddMember("columns", Value(kArrayType), d.GetAllocator());
3218 
3219  for (const auto& cd : cds) {
3220  Value column(kObjectType);
3221  column.AddMember(
3222  "name", Value().SetString(StringRef(cd.columnName)), d.GetAllocator());
3223  column.AddMember("coltype",
3224  Value().SetInt(static_cast<int>(cd.columnType.get_type())),
3225  d.GetAllocator());
3226  column.AddMember("colsubtype",
3227  Value().SetInt(static_cast<int>(cd.columnType.get_subtype())),
3228  d.GetAllocator());
3229  column.AddMember("compression",
3230  Value().SetInt(static_cast<int>(cd.columnType.get_compression())),
3231  d.GetAllocator());
3232  column.AddMember("comp_param",
3233  Value().SetInt(static_cast<int>(cd.columnType.get_comp_param())),
3234  d.GetAllocator());
3235  column.AddMember("size",
3236  Value().SetInt(static_cast<int>(cd.columnType.get_size())),
3237  d.GetAllocator());
3238  column.AddMember(
3239  "coldim", Value().SetInt(cd.columnType.get_dimension()), d.GetAllocator());
3240  column.AddMember(
3241  "colscale", Value().SetInt(cd.columnType.get_scale()), d.GetAllocator());
3242  column.AddMember(
3243  "is_notnull", Value().SetBool(cd.columnType.get_notnull()), d.GetAllocator());
3244  column.AddMember("is_systemcol", Value().SetBool(cd.isSystemCol), d.GetAllocator());
3245  column.AddMember("is_virtualcol", Value().SetBool(cd.isVirtualCol), d.GetAllocator());
3246  column.AddMember("is_deletedcol", Value().SetBool(cd.isDeletedCol), d.GetAllocator());
3247  table["columns"].PushBack(column, d.GetAllocator());
3248  }
3249  d.AddMember(StringRef(td->tableName.c_str()), table, d.GetAllocator());
3250 
3251  // Overwrite the existing file
3252  std::ofstream writer(file_path.string(), std::ios::trunc | std::ios::out);
3253  CHECK(writer.is_open());
3254  OStreamWrapper json_wrapper(writer);
3255 
3256  Writer<OStreamWrapper> json_writer(json_wrapper);
3257  d.Accept(json_writer);
3258  writer.close();
3259 }
3260 
3261 void Catalog::dropTableFromJsonUnlocked(const std::string& table_name) const {
3262  // relies on the catalog write lock
3263  using namespace rapidjson;
3264 
3265  VLOG(1) << "Dropping temporary table " << table_name << " to JSON for Calcite.";
3266 
3267  const auto db_name = currentDB_.dbName;
3268  const auto file_path = table_json_filepath(basePath_, db_name);
3269 
3270  CHECK(boost::filesystem::exists(file_path));
3271  Document d;
3272 
3273  std::ifstream reader(file_path.string());
3274  CHECK(reader.is_open());
3275  IStreamWrapper json_read_wrapper(reader);
3276  d.ParseStream(json_read_wrapper);
3277 
3278  CHECK(d.IsObject());
3279  auto table_name_ref = StringRef(table_name.c_str());
3280  CHECK(d.HasMember(table_name_ref));
3281  CHECK(d.RemoveMember(table_name_ref));
3282 
3283  // Overwrite the existing file
3284  std::ofstream writer(file_path.string(), std::ios::trunc | std::ios::out);
3285  CHECK(writer.is_open());
3286  OStreamWrapper json_wrapper(writer);
3287 
3288  Writer<OStreamWrapper> json_writer(json_wrapper);
3289  d.Accept(json_writer);
3290  writer.close();
3291 }
3292 
3293 void Catalog::createForeignServer(
3294  std::unique_ptr<foreign_storage::ForeignServer> foreign_server,
3295  bool if_not_exists) {
3296  cat_write_lock write_lock(this);
3297  cat_sqlite_lock sqlite_lock(getObjForLock());
3298  createForeignServerNoLocks(std::move(foreign_server), if_not_exists);
3299 }
3300 
3301 void Catalog::createForeignServerNoLocks(
3302  std::unique_ptr<foreign_storage::ForeignServer> foreign_server,
3303  bool if_not_exists) {
3304  const auto& name = foreign_server->name;
3305 
3306  sqliteConnector_.query_with_text_params(
3307  "SELECT name from omnisci_foreign_servers where name = ?",
3308  std::vector<std::string>{name});
3309 
3310  if (sqliteConnector_.getNumRows() == 0) {
3311  foreign_server->creation_time = std::time(nullptr);
3312  sqliteConnector_.query_with_text_params(
3313  "INSERT INTO omnisci_foreign_servers (name, data_wrapper_type, owner_user_id, "
3314  "creation_time, "
3315  "options) "
3316  "VALUES (?, ?, ?, ?, ?)",
3317  std::vector<std::string>{name,
3318  foreign_server->data_wrapper_type,
3319  std::to_string(foreign_server->user_id),
3320  std::to_string(foreign_server->creation_time),
3321  foreign_server->getOptionsAsJsonString()});
3322  sqliteConnector_.query_with_text_params(
3323  "SELECT id from omnisci_foreign_servers where name = ?",
3324  std::vector<std::string>{name});
3325  CHECK_EQ(sqliteConnector_.getNumRows(), size_t(1));
3326  foreign_server->id = sqliteConnector_.getData<int32_t>(0, 0);
3327  std::shared_ptr<foreign_storage::ForeignServer> foreign_server_shared =
3328  std::move(foreign_server);
3329  CHECK(foreignServerMap_.find(name) == foreignServerMap_.end())
3330  << "Attempting to insert a foreign server into foreign server map that already "
3331  "exists.";
3332  foreignServerMap_[name] = foreign_server_shared;
3333  foreignServerMapById_[foreign_server_shared->id] = foreign_server_shared;
3334  } else if (!if_not_exists) {
3335  throw std::runtime_error{"A foreign server with name \"" + foreign_server->name +
3336  "\" already exists."};
3337  }
3338 
3339  const auto& server_it = foreignServerMap_.find(name);
3340  CHECK(server_it != foreignServerMap_.end());
3341  CHECK(foreignServerMapById_.find(server_it->second->id) != foreignServerMapById_.end());
3342 }
3343 
3344 const foreign_storage::ForeignServer* Catalog::getForeignServer(
3345  const std::string& server_name) const {
3346  foreign_storage::ForeignServer* foreign_server = nullptr;
3347  cat_read_lock read_lock(this);
3348 
3349  if (foreignServerMap_.find(server_name) != foreignServerMap_.end()) {
3350  foreign_server = foreignServerMap_.find(server_name)->second.get();
3351  }
3352  return foreign_server;
3353 }
3354 
3355 const std::unique_ptr<const foreign_storage::ForeignServer>
3356 Catalog::getForeignServerFromStorage(const std::string& server_name) {
3357  std::unique_ptr<foreign_storage::ForeignServer> foreign_server = nullptr;
3358  cat_sqlite_lock sqlite_lock(getObjForLock());
3359  sqliteConnector_.query_with_text_params(
3360  "SELECT id, name, data_wrapper_type, options, owner_user_id, creation_time "
3361  "FROM omnisci_foreign_servers WHERE name = ?",
3362  std::vector<std::string>{server_name});
3363  if (sqliteConnector_.getNumRows() > 0) {
3364  foreign_server = std::make_unique<foreign_storage::ForeignServer>(
3365  sqliteConnector_.getData<int>(0, 0),
3366  sqliteConnector_.getData<std::string>(0, 1),
3367  sqliteConnector_.getData<std::string>(0, 2),
3368  sqliteConnector_.getData<std::string>(0, 3),
3369  sqliteConnector_.getData<std::int32_t>(0, 4),
3370  sqliteConnector_.getData<std::int32_t>(0, 5));
3371  }
3372  return foreign_server;
3373 }
3374 
3375 const std::unique_ptr<const foreign_storage::ForeignTable>
3376 Catalog::getForeignTableFromStorage(int table_id) {
3377  std::unique_ptr<foreign_storage::ForeignTable> foreign_table = nullptr;
3378  cat_sqlite_lock sqlite_lock(getObjForLock());
3379  sqliteConnector_.query_with_text_params(
3380  "SELECT table_id, server_id, options, last_refresh_time, next_refresh_time from "
3381  "omnisci_foreign_tables WHERE table_id = ?",
3382  std::vector<std::string>{to_string(table_id)});
3383  auto num_rows = sqliteConnector_.getNumRows();
3384  if (num_rows > 0) {
3385  CHECK_EQ(size_t(1), num_rows);
3386  foreign_table = std::make_unique<foreign_storage::ForeignTable>(
3387  sqliteConnector_.getData<int>(0, 0),
3388  foreignServerMapById_[sqliteConnector_.getData<int32_t>(0, 1)].get(),
3389  sqliteConnector_.getData<std::string>(0, 2),
3390  sqliteConnector_.getData<int64_t>(0, 3),
3391  sqliteConnector_.getData<int64_t>(0, 4));
3392  }
3393  return foreign_table;
3394 }
3395 
3396 void Catalog::changeForeignServerOwner(const std::string& server_name,
3397  const int new_owner_id) {
3398  cat_write_lock write_lock(this);
3399  foreign_storage::ForeignServer* foreign_server =
3400  foreignServerMap_.find(server_name)->second.get();
3401  CHECK(foreign_server);
3402  setForeignServerProperty(server_name, "owner_user_id", std::to_string(new_owner_id));
3403  // update in-memory server
3404  foreign_server->user_id = new_owner_id;
3405 }
3406 
3407 void Catalog::setForeignServerDataWrapper(const std::string& server_name,
3408  const std::string& data_wrapper) {
3409  cat_write_lock write_lock(this);
3410  auto data_wrapper_type = to_upper(data_wrapper);
3411  // update in-memory server
3412  foreign_storage::ForeignServer* foreign_server =
3413  foreignServerMap_.find(server_name)->second.get();
3414  CHECK(foreign_server);
3415  std::string saved_data_wrapper_type = foreign_server->data_wrapper_type;
3416  foreign_server->data_wrapper_type = data_wrapper_type;
3417  try {
3418  foreign_server->validate();
3419  } catch (const std::exception& e) {
3420  // validation did not succeed:
3421  // revert to saved data_wrapper_type & throw exception
3422  foreign_server->data_wrapper_type = saved_data_wrapper_type;
3423  throw;
3424  }
3425  setForeignServerProperty(server_name, "data_wrapper_type", data_wrapper_type);
3426 }
3427 
3428 void Catalog::setForeignServerOptions(const std::string& server_name,
3429  const std::string& options) {
3430  cat_write_lock write_lock(this);
3431  // update in-memory server
3432  foreign_storage::ForeignServer* foreign_server =
3433  foreignServerMap_.find(server_name)->second.get();
3434  CHECK(foreign_server);
3435  auto saved_options = foreign_server->options;
3436  foreign_server->populateOptionsMap(options, true);
3437  try {
3438  foreign_server->validate();
3439  } catch (const std::exception& e) {
3440  // validation did not succeed:
3441  // revert to saved options & throw exception
3442  foreign_server->options = saved_options;
3443  throw;
3444  }
3445  setForeignServerProperty(server_name, "options", options);
3446 }
3447 
3448 void Catalog::renameForeignServer(const std::string& server_name,
3449  const std::string& name) {
3450  cat_write_lock write_lock(this);
3451  auto foreign_server_it = foreignServerMap_.find(server_name);
3452  CHECK(foreign_server_it != foreignServerMap_.end());
3453  setForeignServerProperty(server_name, "name", name);
3454  auto foreign_server_shared = foreign_server_it->second;
3455  foreign_server_shared->name = name;
3456  foreignServerMap_[name] = foreign_server_shared;
3457  foreignServerMap_.erase(foreign_server_it);
3458 }
3459 
3460 void Catalog::dropForeignServer(const std::string& server_name) {
3461  cat_write_lock write_lock(this);
3462  cat_sqlite_lock sqlite_lock(getObjForLock());
3463 
3464  sqliteConnector_.query_with_text_params(
3465  "SELECT id from omnisci_foreign_servers where name = ?",
3466  std::vector<std::string>{server_name});
3467  auto num_rows = sqliteConnector_.getNumRows();
3468  if (num_rows > 0) {
3469  CHECK_EQ(size_t(1), num_rows);
3470  auto server_id = sqliteConnector_.getData<int32_t>(0, 0);
3471  sqliteConnector_.query_with_text_param(
3472  "SELECT table_id from omnisci_foreign_tables where server_id = ?",
3473  std::to_string(server_id));
3474  if (sqliteConnector_.getNumRows() > 0) {
3475  throw std::runtime_error{"Foreign server \"" + server_name +
3476  "\" is referenced "
3477  "by existing foreign tables and cannot be dropped."};
3478  }
3479  sqliteConnector_.query("BEGIN TRANSACTION");
3480  try {
3481  sqliteConnector_.query_with_text_params(
3482  "DELETE FROM omnisci_foreign_servers WHERE name = ?",
3483  std::vector<std::string>{server_name});
3484  } catch (const std::exception& e) {
3485  sqliteConnector_.query("ROLLBACK TRANSACTION");
3486  throw;
3487  }
3488  sqliteConnector_.query("END TRANSACTION");
3489  foreignServerMap_.erase(server_name);
3490  foreignServerMapById_.erase(server_id);
3491  }
3492 }
3493 
3494 void Catalog::getForeignServersForUser(
3495  const rapidjson::Value* filters,
3496  const UserMetadata& user,
3497  std::vector<const foreign_storage::ForeignServer*>& results) {
3498  sys_read_lock syscat_read_lock(&SysCatalog::instance());
3499  cat_read_lock read_lock(this);
3500  cat_sqlite_lock sqlite_lock(getObjForLock());
3501  // Customer facing and internal SQlite names
3502  std::map<std::string, std::string> col_names{{"server_name", "name"},
3503  {"data_wrapper", "data_wrapper_type"},
3504  {"created_at", "creation_time"},
3505  {"options", "options"}};
3506 
3507  // TODO add "owner" when FSI privilege is implemented
3508  std::stringstream filter_string;
3509  std::vector<std::string> arguments;
3510 
3511  if (filters != nullptr) {
3512  // Create SQL WHERE clause for SQLite query
3513  int num_filters = 0;
3514  filter_string << " WHERE";
3515  for (auto& filter_def : filters->GetArray()) {
3516  if (num_filters > 0) {
3517  filter_string << " " << std::string(filter_def["chain"].GetString());
3518  ;
3519  }
3520 
3521  if (col_names.find(std::string(filter_def["attribute"].GetString())) ==
3522  col_names.end()) {
3523  throw std::runtime_error{"Attribute with name \"" +
3524  std::string(filter_def["attribute"].GetString()) +
3525  "\" does not exist."};
3526  }
3527 
3528  filter_string << " " << col_names[std::string(filter_def["attribute"].GetString())];
3529 
3530  bool equals_operator = false;
3531  if (std::strcmp(filter_def["operation"].GetString(), "EQUALS") == 0) {
3532  filter_string << " = ? ";
3533  equals_operator = true;
3534  } else {
3535  filter_string << " LIKE ? ";
3536  }
3537 
3538  bool timestamp_column =
3539  (std::strcmp(filter_def["attribute"].GetString(), "created_at") == 0);
3540 
3541  if (timestamp_column && !equals_operator) {
3542  throw std::runtime_error{"LIKE operator is incompatible with TIMESTAMP data"};
3543  }
3544 
3545  if (timestamp_column && equals_operator) {
3546  arguments.push_back(std::to_string(
3547  dateTimeParse<kTIMESTAMP>(filter_def["value"].GetString(), 0)));
3548  } else {
3549  arguments.emplace_back(filter_def["value"].GetString());
3550  }
3551 
3552  num_filters++;
3553  }
3554  }
3555  // Create select query for the omnisci_foreign_servers table
3556  std::string query = std::string("SELECT name from omnisci_foreign_servers ");
3557  query += filter_string.str();
3558 
3559  sqliteConnector_.query_with_text_params(query, arguments);
3560  auto num_rows = sqliteConnector_.getNumRows();
3561 
3562  if (sqliteConnector_.getNumRows() == 0) {
3563  return;
3564  }
3565 
3566  CHECK(sqliteConnector_.getNumCols() == 1);
3567  // Return pointers to objects
3568  results.reserve(num_rows);
3569  for (size_t row = 0; row < num_rows; ++row) {
3570  const auto& server_name = sqliteConnector_.getData<std::string>(row, 0);
3571  if (shared::contains(INTERNAL_SERVERS, server_name)) {
3572  continue;
3573  }
3574  const foreign_storage::ForeignServer* foreign_server = getForeignServer(server_name);
3575  CHECK(foreign_server != nullptr);
3576 
3577  DBObject dbObject(foreign_server->name, ServerDBObjectType);
3578  dbObject.loadKey(*this);
3579  std::vector<DBObject> privObjects = {dbObject};
3580  if (!SysCatalog::instance().hasAnyPrivileges(user, privObjects)) {
3581  // skip server, as there are no privileges to access it
3582  continue;
3583  }
3584  results.push_back(foreign_server);
3585  }
3586 }
3587 
3588 // returns the table epoch or -1 if there is something wrong with the shared epoch
3589 int32_t Catalog::getTableEpoch(const int32_t db_id, const int32_t table_id) const {
3590  cat_read_lock read_lock(this);
3591  const auto td = getMetadataForTable(table_id, false);
3592  if (!td) {
3593  std::stringstream table_not_found_error_message;
3594  table_not_found_error_message << "Table (" << db_id << "," << table_id
3595  << ") not found";
3596  throw std::runtime_error(table_not_found_error_message.str());
3597  }
3598  const auto physicalTableIt = logicalToPhysicalTableMapById_.find(table_id);
3599  if (physicalTableIt != logicalToPhysicalTableMapById_.end()) {
3600  // check all shards have same checkpoint
3601  const auto physicalTables = physicalTableIt->second;
3602  CHECK(!physicalTables.empty());
3603  size_t curr_epoch{0}, first_epoch{0};
3604  int32_t first_table_id{0};
3605  bool are_epochs_inconsistent{false};
3606  for (size_t i = 0; i < physicalTables.size(); i++) {
3607  int32_t physical_tb_id = physicalTables[i];
3608  const TableDescriptor* phys_td = getMetadataForTable(physical_tb_id, false);
3609  CHECK(phys_td);
3610 
3611  curr_epoch = dataMgr_->getTableEpoch(db_id, physical_tb_id);
3612  LOG(INFO) << "Got sharded table epoch for db id: " << db_id
3613  << ", table id: " << physical_tb_id << ", epoch: " << curr_epoch;
3614  if (i == 0) {
3615  first_epoch = curr_epoch;
3616  first_table_id = physical_tb_id;
3617  } else if (first_epoch != curr_epoch) {
3618  are_epochs_inconsistent = true;
3619  LOG(ERROR) << "Epochs on shards do not all agree on table id: " << table_id
3620  << ", db id: " << db_id
3621  << ". First table (table id: " << first_table_id
3622  << ") has epoch: " << first_epoch << ". Table id: " << physical_tb_id
3623  << ", has inconsistent epoch: " << curr_epoch
3624  << ". See previous INFO logs for all epochs and their table ids.";
3625  }
3626  }
3627  if (are_epochs_inconsistent) {
3628  // oh dear the shards do not agree on the epoch for this table
3629  return -1;
3630  }
3631  return curr_epoch;
3632  } else {
3633  auto epoch = dataMgr_->getTableEpoch(db_id, table_id);
3634  LOG(INFO) << "Got table epoch for db id: " << db_id << ", table id: " << table_id
3635  << ", epoch: " << epoch;
3636  return epoch;
3637  }
3638 }
3639 
3640 std::vector<const foreign_storage::ForeignTable*>
3641 Catalog::getAllForeignTablesForForeignServer(const int32_t foreign_server_id) {
3642  cat_read_lock read_lock(this);
3643  std::vector<const foreign_storage::ForeignTable*> foreign_tables;
3644  for (auto entry : tableDescriptorMapById_) {
3645  auto table_descriptor = entry.second;
3646  if (table_descriptor->storageType == StorageType::FOREIGN_TABLE) {
3647  auto foreign_table = dynamic_cast<foreign_storage::ForeignTable*>(table_descriptor);
3648  CHECK(foreign_table);
3649  if (foreign_table->foreign_server->id == foreign_server_id) {
3650  foreign_tables.emplace_back(foreign_table);
3651  }
3652  }
3653  }
3654  return foreign_tables;
3655 }
3656 
3657 void Catalog::setTableEpoch(const int db_id, const int table_id, int new_epoch) {
3658  LOG(INFO) << "Set table epoch db:" << db_id << " Table ID " << table_id
3659  << " back to new epoch " << new_epoch;
3660  const auto td = getMetadataForTable(table_id, false);
3661  if (!td) {
3662  std::stringstream table_not_found_error_message;
3663  table_not_found_error_message << "Table (" << db_id << "," << table_id
3664  << ") not found";
3665  throw std::runtime_error(table_not_found_error_message.str());
3666  }
3667  if (td->persistenceLevel != Data_Namespace::MemoryLevel::DISK_LEVEL) {
3668  std::stringstream is_temp_table_error_message;
3669  is_temp_table_error_message << "Cannot set epoch on temporary table";
3670  throw std::runtime_error(is_temp_table_error_message.str());
3671  }
3672 
3673  File_Namespace::FileMgrParams file_mgr_params;
3674  file_mgr_params.epoch = new_epoch;
3675  file_mgr_params.max_rollback_epochs = td->maxRollbackEpochs;
3676 
3677  const auto physical_tables = getPhysicalTablesDescriptors(td, false);
3678  CHECK(!physical_tables.empty());
3679  for (const auto table : physical_tables) {
3680  auto table_id = table->tableId;
3681  LOG(INFO) << "Set sharded table epoch db:" << db_id << " Table ID " << table_id
3682  << " back to new epoch " << new_epoch;
3683  // Should have table lock from caller so safe to do this after, avoids
3684  // having to repopulate data on error
3685  removeChunks(table_id);
3686  dataMgr_->getGlobalFileMgr()->setFileMgrParams(db_id, table_id, file_mgr_params);
3687  }
3688 }
3689 
3690 void Catalog::alterPhysicalTableMetadata(
3691  const TableDescriptor* td,
3692  const TableDescriptorUpdateParams& table_update_params) {
3693  // Only called from parent alterTableParamMetadata, expect already to have catalog and
3694  // sqlite write locks
3695 
3696  // Sqlite transaction should have already been begun in parent alterTableCatalogMetadata
3697 
3698  TableDescriptor* mutable_td = getMutableMetadataForTableUnlocked(td->tableId);
3699  CHECK(mutable_td);
3700  if (td->maxRollbackEpochs != table_update_params.max_rollback_epochs) {
3701  sqliteConnector_.query_with_text_params(
3702  "UPDATE mapd_tables SET max_rollback_epochs = ? WHERE tableid = ?",
3703  std::vector<std::string>{std::to_string(table_update_params.max_rollback_epochs),
3704  std::to_string(td->tableId)});
3705  mutable_td->maxRollbackEpochs = table_update_params.max_rollback_epochs;
3706  }
3707 
3708  if (td->maxRows != table_update_params.max_rows) {
3709  sqliteConnector_.query_with_text_params(
3710  "UPDATE mapd_tables SET max_rows = ? WHERE tableid = ?",
3711  std::vector<std::string>{std::to_string(table_update_params.max_rows),
3712  std::to_string(td->tableId)});
3713  mutable_td->maxRows = table_update_params.max_rows;
3714  }
3715 }
3716 
3717 void Catalog::alterTableMetadata(const TableDescriptor* td,
3718  const TableDescriptorUpdateParams& table_update_params) {
3719  cat_write_lock write_lock(this);
3720  cat_sqlite_lock sqlite_lock(getObjForLock());
3721  sqliteConnector_.query("BEGIN TRANSACTION");
3722  try {
3723  const auto physical_table_it = logicalToPhysicalTableMapById_.find(td->tableId);
3724  if (physical_table_it != logicalToPhysicalTableMapById_.end()) {
3725  const auto physical_tables = physical_table_it->second;
3726  CHECK(!physical_tables.empty());
3727  for (size_t i = 0; i < physical_tables.size(); i++) {
3728  int32_t physical_tb_id = physical_tables[i];
3729  const TableDescriptor* phys_td = getMetadataForTable(physical_tb_id, false);
3730  CHECK(phys_td);
3731  alterPhysicalTableMetadata(phys_td, table_update_params);
3732  }
3733  }
3734  alterPhysicalTableMetadata(td, table_update_params);
3735  } catch (std::exception& e) {
3736  sqliteConnector_.query("ROLLBACK TRANSACTION");
3737  LOG(FATAL) << "Table '" << td->tableName << "' catalog update failed";
3738  }
3739  sqliteConnector_.query("END TRANSACTION");
3740 }
3741 
3742 void Catalog::setMaxRollbackEpochs(const int32_t table_id,
3743  const int32_t max_rollback_epochs) {
3744  // Must be called from AlterTableParamStmt or other method that takes executor and
3745  // TableSchema locks
3746  if (max_rollback_epochs <= -1) {
3747  throw std::runtime_error("Cannot set max_rollback_epochs < 0.");
3748  }
3749  const auto td = getMetadataForTable(
3750  table_id, false); // Deep copy as there will be gap between read and write locks
3751  CHECK(td); // Existence should have already been checked in
3752  // ParserNode::AlterTableParmStmt
3753  TableDescriptorUpdateParams table_update_params(td);
3754  table_update_params.max_rollback_epochs = max_rollback_epochs;
3755  if (table_update_params == td) { // Operator is overloaded to test for equality
3756  LOG(INFO) << "Setting max_rollback_epochs for table " << table_id
3757  << " to existing value, skipping operation";
3758  return;
3759  }
3760  File_Namespace::FileMgrParams file_mgr_params;
3761  file_mgr_params.epoch = -1; // Use existing epoch
3762  file_mgr_params.max_rollback_epochs = max_rollback_epochs;
3763  setTableFileMgrParams(table_id, file_mgr_params);
3764  alterTableMetadata(td, table_update_params);
3765 }
3766 
3767 void Catalog::setMaxRows(const int32_t table_id, const int64_t max_rows) {
3768  if (max_rows < 0) {
3769  throw std::runtime_error("Max rows cannot be a negative number.");
3770  }
3771  const auto td = getMetadataForTable(table_id);
3772  CHECK(td);
3773  TableDescriptorUpdateParams table_update_params(td);
3774  table_update_params.max_rows = max_rows;
3775  if (table_update_params == td) {
3776  LOG(INFO) << "Max rows value of " << max_rows
3777  << " is the same as the existing value. Skipping update.";
3778  return;
3779  }
3780  alterTableMetadata(td, table_update_params);
3781  CHECK(td->fragmenter);
3782  td->fragmenter->dropFragmentsToSize(max_rows);
3783 }
3784 
3785 // For testing purposes only
3786 void Catalog::setUncappedTableEpoch(const std::string& table_name) {
3787  cat_write_lock write_lock(this);
3788  auto td_entry = tableDescriptorMap_.find(to_upper(table_name));
3789  CHECK(td_entry != tableDescriptorMap_.end());
3790  auto td = td_entry->second;
3791 
3792  Executor::clearExternalCaches(true, td, getDatabaseId());
3793 
3794  TableDescriptorUpdateParams table_update_params(td);
3795  table_update_params.max_rollback_epochs = -1;
3796  write_lock.unlock();
3797 
3798  alterTableMetadata(td, table_update_params);
3799  File_Namespace::FileMgrParams file_mgr_params;
3800  file_mgr_params.max_rollback_epochs = -1;
3801  setTableFileMgrParams(td->tableId, file_mgr_params);
3802 }
3803 
3804 void Catalog::setTableFileMgrParams(
3805  const int table_id,
3806  const File_Namespace::FileMgrParams& file_mgr_params) {
3807  // Expects parent to have write lock
3808  const auto td = getMetadataForTable(table_id, false);
3809  const auto db_id = this->getDatabaseId();
3810  if (!td) {
3811  std::stringstream table_not_found_error_message;
3812  table_not_found_error_message << "Table (" << db_id << "," << table_id
3813  << ") not found";
3814  throw std::runtime_error(table_not_found_error_message.str());
3815  }
3816  if (td->persistenceLevel != Data_Namespace::MemoryLevel::DISK_LEVEL) {
3817  std::stringstream is_temp_table_error_message;
3818  is_temp_table_error_message << "Cannot set storage params on temporary table";
3819  throw std::runtime_error(is_temp_table_error_message.str());
3820  }
3821 
3822  const auto physical_tables = getPhysicalTablesDescriptors(td, false);
3823  CHECK(!physical_tables.empty());
3824  for (const auto table : physical_tables) {
3825  auto table_id = table->tableId;
3826  removeChunks(table_id);
3827  dataMgr_->getGlobalFileMgr()->setFileMgrParams(db_id, table_id, file_mgr_params);
3828  }
3829 }
3830 
3831 std::vector<TableEpochInfo> Catalog::getTableEpochs(const int32_t db_id,
3832  const int32_t table_id) const {
3833  cat_read_lock read_lock(this);
3834  std::vector<TableEpochInfo> table_epochs;
3835  const auto physical_table_it = logicalToPhysicalTableMapById_.find(table_id);
3836  if (physical_table_it != logicalToPhysicalTableMapById_.end()) {
3837  const auto physical_tables = physical_table_it->second;
3838  CHECK(!physical_tables.empty());
3839 
3840  for (const auto physical_tb_id : physical_tables) {
3841  const auto phys_td = getMutableMetadataForTableUnlocked(physical_tb_id);
3842  CHECK(phys_td);
3843 
3844  auto table_id = phys_td->tableId;
3845  auto epoch = dataMgr_->getTableEpoch(db_id, phys_td->tableId);
3846  table_epochs.emplace_back(table_id, epoch);
3847  LOG(INFO) << "Got sharded table epoch for db id: " << db_id
3848  << ", table id: " << table_id << ", epoch: " << epoch;
3849  }
3850  } else {
3851  auto epoch = dataMgr_->getTableEpoch(db_id, table_id);
3852  LOG(INFO) << "Got table epoch for db id: " << db_id << ", table id: " << table_id
3853  << ", epoch: " << epoch;
3854  table_epochs.emplace_back(table_id, epoch);
3855  }
3856  return table_epochs;
3857 }
3858 
3859 void Catalog::setTableEpochs(const int32_t db_id,
3860  const std::vector<TableEpochInfo>& table_epochs) const {
3861  const auto td = getMetadataForTable(table_epochs[0].table_id, false);
3862  CHECK(td);
3863  File_Namespace::FileMgrParams file_mgr_params;
3864  file_mgr_params.max_rollback_epochs = td->maxRollbackEpochs;
3865 
3866  for (const auto& table_epoch_info : table_epochs) {
3867  removeChunks(table_epoch_info.table_id);
3868  file_mgr_params.epoch = table_epoch_info.table_epoch;
3869  dataMgr_->getGlobalFileMgr()->setFileMgrParams(
3870  db_id, table_epoch_info.table_id, file_mgr_params);
3871  LOG(INFO) << "Set table epoch for db id: " << db_id
3872  << ", table id: " << table_epoch_info.table_id
3873  << ", back to epoch: " << table_epoch_info.table_epoch;
3874  }
3875 }
3876 
3877 namespace {
3878 std::string table_epochs_to_string(const std::vector<TableEpochInfo>& table_epochs) {
3879  std::string table_epochs_str{"["};
3880  bool first_entry{true};
3881  for (const auto& table_epoch : table_epochs) {
3882  if (first_entry) {
3883  first_entry = false;
3884  } else {
3885  table_epochs_str += ", ";
3886  }
3887  table_epochs_str += "(table_id: " + std::to_string(table_epoch.table_id) +
3888  ", epoch: " + std::to_string(table_epoch.table_epoch) + ")";
3889  }
3890  table_epochs_str += "]";
3891  return table_epochs_str;
3892 }
3893 } // namespace
3894 
3895 void Catalog::setTableEpochsLogExceptions(
3896  const int32_t db_id,
3897  const std::vector<TableEpochInfo>& table_epochs) const {
3898  try {
3899  setTableEpochs(db_id, table_epochs);
3900  } catch (std::exception& e) {
3901  LOG(ERROR) << "An error occurred when attempting to set table epochs. DB id: "
3902  << db_id << ", Table epochs: " << table_epochs_to_string(table_epochs)
3903  << ", Error: " << e.what();
3904  }
3905 }
3906 
3907 const ColumnDescriptor* Catalog::getDeletedColumn(const TableDescriptor* td) const {
3908  cat_read_lock read_lock(this);
3909  const auto it = deletedColumnPerTable_.find(td);
3910  return it != deletedColumnPerTable_.end() ? it->second : nullptr;
3911 }
3912 
3913 const bool Catalog::checkMetadataForDeletedRecs(const TableDescriptor* td,
3914  int delete_column_id) const {
3915  // check if there are rows deleted by examining the deletedColumn metadata
3916  CHECK(td);
3917  auto fragmenter = td->fragmenter;
3918  if (fragmenter) {
3919  return fragmenter->hasDeletedRows(delete_column_id);
3920  } else {
3921  return false;
3922  }
3923 }
3924 
3925 const ColumnDescriptor* Catalog::getDeletedColumnIfRowsDeleted(
3926  const TableDescriptor* td) const {
3927  std::vector<const TableDescriptor*> tds;
3928  const ColumnDescriptor* cd;
3929  {
3930  cat_read_lock read_lock(this);
3931 
3932  const auto it = deletedColumnPerTable_.find(td);
3933  // if not a table that supports delete return nullptr, nothing more to do
3934  if (it == deletedColumnPerTable_.end()) {
3935  return nullptr;
3936  }
3937  cd = it->second;
3938  tds = getPhysicalTablesDescriptors(td, false);
3939  }
3940  // individual tables are still protected by higher level locks
3941  for (auto tdd : tds) {
3942  if (checkMetadataForDeletedRecs(tdd, cd->columnId)) {
3943  return cd;
3944  }
3945  }
3946  // no deletes so far recorded in metadata
3947  return nullptr;
3948 }
3949 
3950 void Catalog::setDeletedColumn(const TableDescriptor* td, const ColumnDescriptor* cd) {
3951  cat_write_lock write_lock(this);
3952  setDeletedColumnUnlocked(td, cd);
3953 }
3954 
3955 void Catalog::setDeletedColumnUnlocked(const TableDescriptor* td,
3956  const ColumnDescriptor* cd) {
3957  const auto it_ok = deletedColumnPerTable_.emplace(td, cd);
3958  CHECK(it_ok.second);
3959 }
3960 
3961 namespace {
3962 
// Helper used by addReferenceToForeignDict (get_foreign_col): resolves the column that a
// SHARED DICTIONARY definition points at. The foreign table is looked up by name
// (CHECK-fails if it does not exist), and cat.getMetadataForColumn(...) is returned for
// the foreign column name. The result is not checked here; addReferenceToForeignDict
// CHECKs it before use.
3964  const Catalog& cat,
3965  const Parser::SharedDictionaryDef& shared_dict_def) {
3966  const auto& table_name = shared_dict_def.get_foreign_table();
3967  const auto td = cat.getMetadataForTable(table_name, false);
3968  CHECK(td);
3969  const auto& foreign_col_name = shared_dict_def.get_foreign_column();
3970  return cat.getMetadataForColumn(td->tableId, foreign_col_name);
3971 }
3972 
3973 } // namespace
3974 
3975 void Catalog::addReferenceToForeignDict(ColumnDescriptor& referencing_column,
3976  Parser::SharedDictionaryDef shared_dict_def,
3977  const bool persist_reference) {
3978  cat_write_lock write_lock(this);
3979  const auto foreign_ref_col = get_foreign_col(*this, shared_dict_def);
3980  CHECK(foreign_ref_col);
3981  referencing_column.columnType = foreign_ref_col->columnType;
3982  const int dict_id = referencing_column.columnType.get_comp_param();
3983  const DictRef dict_ref(currentDB_.dbId, dict_id);
3984  const auto dictIt = dictDescriptorMapByRef_.find(dict_ref);
3985  CHECK(dictIt != dictDescriptorMapByRef_.end());
3986  const auto& dd = dictIt->second;
3987  CHECK_GE(dd->refcount, 1);
3988  ++dd->refcount;
3989  if (persist_reference) {
3990  cat_sqlite_lock sqlite_lock(getObjForLock());
3991  sqliteConnector_.query_with_text_params(
3992  "UPDATE mapd_dictionaries SET refcount = refcount + 1 WHERE dictid = ?",
3993  {std::to_string(dict_id)});
3994  }
3995 }
3996 
3997 bool Catalog::setColumnSharedDictionary(
3998  ColumnDescriptor& cd,
3999  std::list<ColumnDescriptor>& cdd,
4000  std::list<DictDescriptor>& dds,
4001  const TableDescriptor td,
4002  const std::vector<Parser::SharedDictionaryDef>& shared_dict_defs) {
4003  cat_write_lock write_lock(this);
4004  cat_sqlite_lock sqlite_lock(getObjForLock());
4005 
4006  if (shared_dict_defs.empty()) {
4007  return false;
4008  }
4009  for (const auto& shared_dict_def : shared_dict_defs) {
4010  // check if the current column is a referencing column
4011  const auto& column = shared_dict_def.get_column();
4012  if (cd.columnName == column) {
4013  if (!shared_dict_def.get_foreign_table().compare(td.tableName)) {
4014  // Dictionaries are being shared in table to be created
4015  const auto& ref_column = shared_dict_def.get_foreign_column();
4016  auto colIt =
4017  std::find_if(cdd.begin(), cdd.end(), [ref_column](const ColumnDescriptor it) {
4018  return !ref_column.compare(it.columnName);
4019  });
4020  CHECK(colIt != cdd.end());
4021  cd.columnType = colIt->columnType;
4022 
4023  const int dict_id = colIt->columnType.get_comp_param();
4024  CHECK_GE(dict_id, 1);
4025  auto dictIt = std::find_if(
4026  dds.begin(), dds.end(), [this, dict_id](const DictDescriptor it) {
4027  return it.dictRef.dbId == this->currentDB_.dbId &&
4028  it.dictRef.dictId == dict_id;
4029  });
4030  if (dictIt != dds.end()) {
4031  // There exists dictionary definition of a dictionary column
4032  CHECK_GE(dictIt->refcount, 1);
4033  ++dictIt->refcount;
4034  if (!table_is_temporary(&td)) {
4035  // Persist reference count
4036  sqliteConnector_.query_with_text_params(
4037  "UPDATE mapd_dictionaries SET refcount = refcount + 1 WHERE dictid = ?",
4038  {std::to_string(dict_id)});
4039  }
4040  } else {
4041  // The dictionary is referencing a column which is referencing a column in
4042  // diffrent table
4043  auto root_dict_def = compress_reference_path(shared_dict_def, shared_dict_defs);
4044  addReferenceToForeignDict(cd, root_dict_def, !table_is_temporary(&td));
4045  }
4046  } else {
4047  const auto& foreign_table_name = shared_dict_def.get_foreign_table();
4048  const auto foreign_td = getMetadataForTable(foreign_table_name, false);
4049  if (table_is_temporary(foreign_td)) {
4050  if (!table_is_temporary(&td)) {
4051  throw std::runtime_error(
4052  "Only temporary tables can share dictionaries with other temporary "
4053  "tables.");
4054  }
4055  addReferenceToForeignDict(cd, shared_dict_def, false);
4056  } else {
4057  addReferenceToForeignDict(cd, shared_dict_def, !table_is_temporary(&td));
4058  }
4059  }
4060  return true;
4061  }
4062  }
4063  return false;
4064 }
4065 
// Creates the dictionary descriptor for a new dictionary-encoded column, appends a copy
// to dds, then sets cd's comp_param to the dictionary id and calls set_dict_key(cd).
//
// When is_logical_table is true, a row is inserted into mapd_dictionaries. Its
// sqlite-assigned dictid is read back, the row is renamed to
// "<table>_<column>_dict<id>", and the on-disk folder path is derived from g_base_path.
// When it is false, dictId stays 0 and folderPath stays empty.
//
// @param cd                  column being created
// @param dds                 receives the new DictDescriptor
// @param td                  owning table; its name is part of the dictionary name
// @param is_logical_table    whether to persist a new dictionary row in sqlite
// @param use_temp_dictionary forwarded to the DictDescriptor constructor
4066 void Catalog::setColumnDictionary(ColumnDescriptor& cd,
4067  std::list<DictDescriptor>& dds,
4068  const TableDescriptor& td,
4069  bool is_logical_table,
4070  bool use_temp_dictionary) {
4071  cat_write_lock write_lock(this);
4072 
4073  std::string dictName{"Initial_key"};
4074  int dictId{0};
4075  std::string folderPath;
4076  if (is_logical_table) {
4077  cat_sqlite_lock sqlite_lock(getObjForLock());
4078 
// Insert under the placeholder name "Initial_key", then look the row up by that name
// to learn the assigned dictid. NOTE(review): this assumes no other "Initial_key" row
// exists; the sqlite lock held above presumably serializes concurrent callers.
4079  sqliteConnector_.query_with_text_params(
4080  "INSERT INTO mapd_dictionaries (name, nbits, is_shared, refcount) VALUES (?, ?, "
4081  "?, 1)",
4082  std::vector<std::string>{
4083  dictName, std::to_string(cd.columnType.get_comp_param()), "0"});
4084  sqliteConnector_.query_with_text_param(
4085  "SELECT dictid FROM mapd_dictionaries WHERE name = ?", dictName);
4086  dictId = sqliteConnector_.getData<int>(0, 0);
4087  dictName = td.tableName + "_" + cd.columnName + "_dict" + std::to_string(dictId);
4088  sqliteConnector_.query_with_text_param(
4089  "UPDATE mapd_dictionaries SET name = ? WHERE name = 'Initial_key'", dictName);
4090  folderPath = g_base_path + "/" + shared::kDataDirectoryName + "/DB_" +
4091  std::to_string(currentDB_.dbId) + "_DICT_" + std::to_string(dictId);
4092  }
// For non-logical tables, dictId is still 0 and folderPath is empty at this point.
4093  DictDescriptor dd(currentDB_.dbId,
4094  dictId,
4095  dictName,
4097  false,
4098  1,
4099  folderPath,
4100  use_temp_dictionary);
4101  dds.push_back(dd);
4102  if (!cd.columnType.is_array()) {
4104  }
4105  cd.columnType.set_comp_param(dictId);
4106  set_dict_key(cd);
4107 }
4108 
4109 void Catalog::createShardedTable(
4110  TableDescriptor& td,
4111  const list<ColumnDescriptor>& cols,
4112  const std::vector<Parser::SharedDictionaryDef>& shared_dict_defs) {
4113  /* create logical table */
4114  TableDescriptor* tdl = &td;
4115  createTable(*tdl, cols, shared_dict_defs, true); // create logical table
4116  int32_t logical_tb_id = tdl->tableId;
4117  std::string logical_table_name = tdl->tableName;
4118 
4119  /* create physical tables and link them to the logical table */
4120  std::vector<int32_t> physicalTables;
4121  for (int32_t i = 1; i <= td.nShards; i++) {
4122  TableDescriptor* tdp = &td;
4123  tdp->tableName = generatePhysicalTableName(logical_table_name, i);
4124  tdp->shard = i - 1;
4125  createTable(*tdp, cols, shared_dict_defs, false); // create physical table
4126  int32_t physical_tb_id = tdp->tableId;
4127 
4128  /* add physical table to the vector of physical tables */
4129  physicalTables.push_back(physical_tb_id);
4130  }
4131 
4132  if (!physicalTables.empty()) {
4133  cat_write_lock write_lock(this);
4134  /* add logical to physical tables correspondence to the map */
4135  const auto it_ok =
4136  logicalToPhysicalTableMapById_.emplace(logical_tb_id, physicalTables);
4137  CHECK(it_ok.second);
4138  /* update sqlite mapd_logical_to_physical in sqlite database */
4139  if (!table_is_temporary(&td)) {
4140  updateLogicalToPhysicalTableMap(logical_tb_id);
4141  }
4142  }
4143 }
4144 
4145 void Catalog::truncateTable(const TableDescriptor* td) {
4146  // truncate all corresponding physical tables
4147  const auto physical_tables = getPhysicalTablesDescriptors(td);
4148  for (const auto table : physical_tables) {
4149  doTruncateTable(table);
4150  }
4151 }
4152 
4153 void Catalog::doTruncateTable(const TableDescriptor* td) {
4154  // must destroy fragmenter before deleteChunks is called.
4155  removeFragmenterForTable(td->tableId);
4156 
4157  const int tableId = td->tableId;
4158  ChunkKey chunkKeyPrefix = {currentDB_.dbId, tableId};
4159  // assuming deleteChunksWithPrefix is atomic
4160  dataMgr_->deleteChunksWithPrefix(chunkKeyPrefix, MemoryLevel::CPU_LEVEL);
4161  dataMgr_->deleteChunksWithPrefix(chunkKeyPrefix, MemoryLevel::GPU_LEVEL);
4162 
4163  dataMgr_->removeTableRelatedDS(currentDB_.dbId, tableId);
4164 
4165  cat_write_lock write_lock(this);
4166  std::unique_ptr<StringDictionaryClient> client;
4167  if (SysCatalog::instance().isAggregator()) {
4168  CHECK(!string_dict_hosts_.empty());
4169  DictRef dict_ref(currentDB_.dbId, -1);
4170  client.reset(new StringDictionaryClient(string_dict_hosts_.front(), dict_ref, true));
4171  }
4172  // clean up any dictionaries
4173  // delete all column descriptors for the table
4174  for (const auto& columnDescriptor : columnDescriptorMapById_) {
4175  auto cd = columnDescriptor.second;
4176  if (cd->tableId != td->tableId) {
4177  continue;
4178  }
4179  const int dict_id = cd->columnType.get_comp_param();
4180  // Dummy dictionaries created for a shard of a logical table have the id set to zero.
4181  if (cd->columnType.get_compression() == kENCODING_DICT && dict_id) {
4182  const DictRef dict_ref(currentDB_.dbId, dict_id);
4183  const auto dictIt = dictDescriptorMapByRef_.find(dict_ref);
4184  CHECK(dictIt != dictDescriptorMapByRef_.end());
4185  const auto& dd = dictIt->second;
4186  CHECK_GE(dd->refcount, 1);
4187  // if this is the only table using this dict reset the dict
4188  if (dd->refcount == 1) {
4189  // close the dictionary
4190  dd->stringDict.reset();
4191  File_Namespace::renameForDelete(dd->dictFolderPath);
4192  if (client) {
4193  client->drop(dd->dictRef);
4194  }
4195  if (!dd->dictIsTemp) {
4196  boost::filesystem::create_directory(dd->dictFolderPath);
4197  }
4198  }
4199 
4200  DictDescriptor* new_dd = new DictDescriptor(dd->dictRef,
4201  dd->dictName,
4202  dd->dictNBits,
4203  dd->dictIsShared,
4204  dd->refcount,
4205  dd->dictFolderPath,
4206  dd->dictIsTemp);
4207  dictDescriptorMapByRef_.erase(dictIt);
4208  // now create new Dict -- need to figure out what to do here for temp tables
4209  if (client) {
4210  client->create(new_dd->dictRef, new_dd->dictIsTemp);
4211  }
4212  dictDescriptorMapByRef_[new_dd->dictRef].reset(new_dd);
4213  getMetadataForDict(new_dd->dictRef.dictId);
4214  }
4215  }
4216 }
4217 
4218 // NOTE(Misiu): Only used by --multi-instance clusters.
4219 void Catalog::refreshDictionaryCachesForTableUnlocked(const TableDescriptor& td) {
4220  for (auto col_id = 0; col_id < td.nColumns; ++col_id) {
4221  if (auto it = columnDescriptorMapById_.find({td.tableId, col_id});
4222  it != columnDescriptorMapById_.end()) {
4223  auto cd = it->second;
4224  auto dict_id = cd->columnType.get_comp_param();
4225  if (cd->columnType.get_compression() == kENCODING_DICT && dict_id) {
4226  DictRef dict_ref(currentDB_.dbId, dict_id);
4227  if (auto dict_it = dictDescriptorMapByRef_.find(dict_ref);
4228  dict_it != dictDescriptorMapByRef_.end()) {
4229  // getMetadataForDict() will only reload if the stringDict is null.
4230  dict_it->second->stringDict = nullptr;
4231  }
4232  getMetadataForDict(dict_id, true);
4233  }
4234  }
4235  }
4236 }
4237 
4238 // NOTE(sy): Only used by --multi-instance clusters.
// Discards cached state for table_id so it is reloaded from storage. In order, it:
// - evicts the table's chunks from the GPU and CPU buffer pools;
// - clears executor external caches and string dictionary caches;
// - drops the fragmenter and closes the table's file manager;
// - refreshes foreign tables via the foreign storage manager, or removes the
//   mutable-table disk cache data for other tables;
// - re-instantiates the fragmenter.
4239 void Catalog::invalidateCachesForTable(const int table_id) {
4240  // When called, exactly one thread has a LockMgr data or insert lock for the table.
4241  cat_read_lock read_lock(this);
4242  ChunkKey const table_key{getDatabaseId(), table_id};
4243  auto td = getMutableMetadataForTableUnlocked(table_id);
4244  CHECK(td);
4245  getDataMgr().deleteChunksWithPrefix(table_key, MemoryLevel::GPU_LEVEL);
4246  getDataMgr().deleteChunksWithPrefix(table_key, MemoryLevel::CPU_LEVEL);
4247  Executor::clearExternalCaches(false, td, getDatabaseId());
4248 
4249  refreshDictionaryCachesForTableUnlocked(*td);
4250 
4251  // TODO(sy): doTruncateTable() says "destroy fragmenter before deleteChunks is called"
4252  // removeFragmenterForTable(table_key[CHUNK_KEY_TABLE_IDX]);
// The fragmenter is reset inline rather than through removeFragmenterForTable(), which
// takes the catalog write lock while a read lock is already held here.
// NOTE(review): this mutates catalog state under a read lock; it presumably relies on
// the exclusive LockMgr table lock mentioned above.
4253  if (td->fragmenter != nullptr) {
4254  auto tableDescIt = tableDescriptorMapById_.find(table_id);
4255  CHECK(tableDescIt != tableDescriptorMapById_.end());
4256  tableDescIt->second->fragmenter = nullptr;
4257  CHECK(td->fragmenter == nullptr);
4258  }
4259  getDataMgr().getGlobalFileMgr()->closeFileMgr(table_key[CHUNK_KEY_DB_IDX],
4260  table_key[CHUNK_KEY_TABLE_IDX]);
4261  if (dynamic_cast<foreign_storage::ForeignTable*>(td)) {
4262  dataMgr_->getPersistentStorageMgr()->getForeignStorageMgr()->refreshTable(table_key,
4263  true);
4264  } else {
4265  dataMgr_->removeMutableTableDiskCacheData(currentDB_.dbId, table_id);
4266  }
4267  instantiateFragmenter(td);
4268 }
4269 
4270 void Catalog::removeFragmenterForTable(const int table_id) const {
4271  cat_write_lock write_lock(this);
4272  auto td = getMetadataForTable(table_id, false);
4273  if (td->fragmenter != nullptr) {
4274  auto tableDescIt = tableDescriptorMapById_.find(table_id);
4275  CHECK(tableDescIt != tableDescriptorMapById_.end());
4276  tableDescIt->second->fragmenter = nullptr;
4277  CHECK(td->fragmenter == nullptr);
4278  }
4279 }
4280 
4281 // used by rollback_table_epoch to clean up in memory artifacts after a rollback
4282 void Catalog::removeChunks(const int table_id) const {
4283  removeFragmenterForTable(table_id);
4284 
4285  // remove the chunks from in memory structures
4286  ChunkKey chunkKey = {currentDB_.dbId, table_id};
4287 
4288  dataMgr_->deleteChunksWithPrefix(chunkKey, MemoryLevel::CPU_LEVEL);
4289  dataMgr_->deleteChunksWithPrefix(chunkKey, MemoryLevel::GPU_LEVEL);
4290 }
4291 
// Drops td. Privileges on the DB object are revoked via
// SysCatalog::revokeDBObjectPrivilegesFromAll. The physical shard tables (for a sharded
// logical table) plus td itself are collected, with td last. Their physical data is
// erased first, and then all their catalog metadata is deleted in one call.
4292 void Catalog::dropTable(const TableDescriptor* td) {
4293  SysCatalog::instance().revokeDBObjectPrivilegesFromAll(
4295  std::vector<const TableDescriptor*> tables_to_drop;
4296  {
// Descriptors are collected under the read lock only; data and metadata are removed
// after it is released.
4297  cat_read_lock read_lock(this);
4298  const auto physicalTableIt = logicalToPhysicalTableMapById_.find(td->tableId);
4299  if (physicalTableIt != logicalToPhysicalTableMapById_.end()) {
4300  // remove all corresponding physical tables if this is a logical table
4301  const auto physicalTables = physicalTableIt->second;
4302  CHECK(!physicalTables.empty());
4303  for (size_t i = 0; i < physicalTables.size(); i++) {
4304  int32_t physical_tb_id = physicalTables[i];
4305  const TableDescriptor* phys_td =
4306  getMutableMetadataForTableUnlocked(physical_tb_id);
4307  CHECK(phys_td);
4308  tables_to_drop.emplace_back(phys_td);
4309  }
4310  }
4311  tables_to_drop.emplace_back(td);
4312  }
4313 
4314  for (auto table : tables_to_drop) {
4315  eraseTablePhysicalData(table);
4316  }
// deleteTableCatalogMetadata takes the catalog write and sqlite locks itself and runs
// all deletions inside one sqlite transaction.
4317  deleteTableCatalogMetadata(td, tables_to_drop);
4318 }
4319 
4320 void Catalog::deleteTableCatalogMetadata(
4321  const TableDescriptor* logical_table,
4322  const std::vector<const TableDescriptor*>& physical_tables) {
4323  cat_write_lock write_lock(this);
4324  cat_sqlite_lock sqlite_lock(getObjForLock());
4325  sqliteConnector_.query("BEGIN TRANSACTION");
4326  try {
4327  // remove corresponding record from the logicalToPhysicalTableMap in sqlite database
4328  sqliteConnector_.query_with_text_param(
4329  "DELETE FROM mapd_logical_to_physical WHERE logical_table_id = ?",
4330  std::to_string(logical_table->tableId));
4331  logicalToPhysicalTableMapById_.erase(logical_table->tableId);
4332  for (auto table : physical_tables) {
4333  eraseTableMetadata(table);
4334  }
4335  } catch (std::exception& e) {
4336  sqliteConnector_.query("ROLLBACK TRANSACTION");
4337  throw;
4338  }
4339  sqliteConnector_.query("END TRANSACTION");
4340 }
4341 
// Removes one table's catalog metadata. It deletes the sqlite rows (via
// executeDropTableSqliteQueries), notifies calciteMgr_ of the change, and removes the
// descriptor from the in-memory maps (removeTableFromMap). deleteTableCatalogMetadata
// calls this inside its sqlite transaction.
4342 void Catalog::eraseTableMetadata(const TableDescriptor* td) {
4343  executeDropTableSqliteQueries(td);
// NOTE(review): dropTableFromJsonUnlocked below runs only under a guarding condition;
// confirm which tables it applies to.
4345  dropTableFromJsonUnlocked(td->tableName);
4346  }
4347  calciteMgr_->updateMetadata(currentDB_.dbName, td->tableName);
4348  {
4349  INJECT_TIMER(removeTableFromMap_);
4350  removeTableFromMap(td->tableName, td->tableId);
4351  }
4352 }
4353 
// Issues the sqlite statements that remove a table's catalog rows:
// - the mapd_tables row;
// - one refcount decrement per dictionary id used by its dict-encoded columns, then
//   deletion of those dictionaries whose refcount reached 0;
// - its mapd_columns rows;
// - its mapd_views row for views;
// - under a guard condition, its omnisci_foreign_tables row.
// No transaction is opened here; callers such as deleteTableCatalogMetadata provide it.
4354 void Catalog::executeDropTableSqliteQueries(const TableDescriptor* td) {
4355  const int tableId = td->tableId;
4356  sqliteConnector_.query_with_text_param("DELETE FROM mapd_tables WHERE tableid = ?",
4357  std::to_string(tableId));
4358  sqliteConnector_.query_with_text_params(
4359  "select comp_param from mapd_columns where compression = ? and tableid = ?",
4360  std::vector<std::string>{std::to_string(kENCODING_DICT), std::to_string(tableId)});
4361  int numRows = sqliteConnector_.getNumRows();
// Copy the ids out before issuing further queries, which presumably replace the
// connector's current result set.
4362  std::vector<int> dict_id_list;
4363  for (int r = 0; r < numRows; ++r) {
4364  dict_id_list.push_back(sqliteConnector_.getData<int>(r, 0));
4365  }
4366  for (auto dict_id : dict_id_list) {
4367  sqliteConnector_.query_with_text_params(
4368  "UPDATE mapd_dictionaries SET refcount = refcount - 1 WHERE dictid = ?",
4369  std::vector<std::string>{std::to_string(dict_id)});
4370  }
// Must run before the mapd_columns rows are deleted: the subquery finds this table's
// dictionaries through mapd_columns.
4371  sqliteConnector_.query_with_text_params(
4372  "DELETE FROM mapd_dictionaries WHERE dictid in (select comp_param from "
4373  "mapd_columns where compression = ? "
4374  "and tableid = ?) and refcount = 0",
4375  std::vector<std::string>{std::to_string(kENCODING_DICT), std::to_string(tableId)});
4376  sqliteConnector_.query_with_text_param("DELETE FROM mapd_columns WHERE tableid = ?",
4377  std::to_string(tableId));
4378  if (td->isView) {
4379  sqliteConnector_.query_with_text_param("DELETE FROM mapd_views WHERE tableid = ?",
4380  std::to_string(tableId));
4381  }
4383  sqliteConnector_.query_with_text_param(
4384  "DELETE FROM omnisci_foreign_tables WHERE table_id = ?", std::to_string(tableId));
4385  }
4386 }
4387 
4388 void Catalog::renamePhysicalTable(const TableDescriptor* td, const string& newTableName) {
4389  cat_write_lock write_lock(this);
4390  cat_sqlite_lock sqlite_lock(getObjForLock());
4391 
4392  sqliteConnector_.query("BEGIN TRANSACTION");
4393  try {
4394  sqliteConnector_.query_with_text_params(
4395  "UPDATE mapd_tables SET name = ? WHERE tableid = ?",
4396  std::vector<std::string>{newTableName, std::to_string(td->tableId)});
4397  } catch (std::exception& e) {
4398  sqliteConnector_.query("ROLLBACK TRANSACTION");
4399  throw;
4400  }
4401  sqliteConnector_.query("END TRANSACTION");
4402  TableDescriptorMap::iterator tableDescIt =
4403  tableDescriptorMap_.find(to_upper(td->tableName));
4404  CHECK(tableDescIt != tableDescriptorMap_.end());
4405  calciteMgr_->updateMetadata(currentDB_.dbName, td->tableName);
4406  // Get table descriptor to change it
4407  TableDescriptor* changeTd = tableDescIt->second;
4408  changeTd->tableName = newTableName;
4409  tableDescriptorMap_.erase(tableDescIt); // erase entry under old name
4410  tableDescriptorMap_[to_upper(newTableName)] = changeTd;
4411  calciteMgr_->updateMetadata(currentDB_.dbName, td->tableName);
4412 }
4413 
4414 void Catalog::renameTable(const TableDescriptor* td, const string& newTableName) {
4415  {
4416  cat_write_lock write_lock(this);
4417  cat_sqlite_lock sqlite_lock(getObjForLock());
4418  // rename all corresponding physical tables if this is a logical table
4419  const auto physicalTableIt = logicalToPhysicalTableMapById_.find(td->tableId);
4420  if (physicalTableIt != logicalToPhysicalTableMapById_.end()) {
4421  const auto physicalTables = physicalTableIt->second;
4422  CHECK(!physicalTables.empty());
4423  for (size_t i = 0; i < physicalTables.size(); i++) {
4424  int32_t physical_tb_id = physicalTables[i];
4425  const TableDescriptor* phys_td = getMetadataForTable(physical_tb_id);
4426  CHECK(phys_td);
4427  std::string newPhysTableName =
4428  generatePhysicalTableName(newTableName, static_cast<int32_t>(i + 1));
4429  renamePhysicalTable(phys_td, newPhysTableName);
4430  }
4431  }
4432  renamePhysicalTable(td, newTableName);
4433  }
4434  {
4435  DBObject object(newTableName, TableDBObjectType);
4436  // update table name in direct and effective priv map
4437  DBObjectKey key;
4438  key.dbId = currentDB_.dbId;
4439  key.objectId = td->tableId;
4440  key.permissionType = static_cast<int>(DBObjectType::TableDBObjectType);
4441  object.setObjectKey(key);
4442  auto objdescs = SysCatalog::instance().getMetadataForObject(
4443  currentDB_.dbId, static_cast<int>(DBObjectType::TableDBObjectType), td->tableId);
4444  for (auto obj : objdescs) {
4445  Grantee* grnt = SysCatalog::instance().getGrantee(obj->roleName);
4446  if (grnt) {
4447  grnt->renameDbObject(object);
4448  }
4449  }
4450  SysCatalog::instance().renameObjectsInDescriptorMap(object, *this);
4451  }
4452 }
4453 
4454 void Catalog::renamePhysicalTables(
4455  std::vector<std::pair<std::string, std::string>>& names,
4456  std::vector<int>& tableIds) {
4457  cat_write_lock write_lock(this);
4458  cat_sqlite_lock sqlite_lock(getObjForLock());
4459 
4460  // execute the SQL query
4461  for (size_t i = 0; i < names.size(); i++) {
4462  int tableId = tableIds[i];
4463  const std::string& newTableName = names[i].second;
4464  sqliteConnector_.query_with_text_params(
4465  "UPDATE mapd_tables SET name = ? WHERE tableid = ?",
4466  std::vector<std::string>{newTableName, std::to_string(tableId)});
4467  }
4468 
4469  // reset the table descriptors, give Calcite a kick
4470  for (size_t i = 0; i < names.size(); i++) {
4471  const auto& [curTableName, newTableName] = names[i];
4472 
4473  TableDescriptorMap::iterator tableDescIt =
4474  tableDescriptorMap_.find(to_upper(curTableName));
4475  CHECK(tableDescIt != tableDescriptorMap_.end());
4476  calciteMgr_->updateMetadata(currentDB_.dbName, curTableName);
4477 
4478  // Get table descriptor to change it
4479  TableDescriptor* changeTd = tableDescIt->second;
4480  changeTd->tableName = newTableName;
4481  tableDescriptorMap_.erase(tableDescIt); // erase entry under old name
4482  tableDescriptorMap_[to_upper(newTableName)] = changeTd;
4483  calciteMgr_->updateMetadata(currentDB_.dbName, curTableName);
4484  }
4485 }
4486 
4487 // Collect an 'overlay' mapping of the tableNames->tableId
4488 // to account for possible chained renames
4489 // (for swap: a->b, b->c, c->d, d->a)
4490 const TableDescriptor* Catalog::getCachedTableDescriptor(
4491  const std::map<std::string, int>& cached_table_map,
4492  const std::string& cur_table_name) {
4493  if (auto it = cached_table_map.find(cur_table_name); it != cached_table_map.end()) {
4494  auto table_id = it->second;
4495  return (table_id == -1) ? NULL : getMetadataForTable(table_id);
4496  }
4497  return getMetadataForTable(cur_table_name);
4498 }
4499 
namespace {
// Records one pending rename in the overlay used by renameTables.
// The current name is tombstoned with -1 and the new name is bound to tableId.
// The tombstone is written first, so if both names are equal the entry ends up
// holding tableId.
void replace_cached_table_name(std::map<std::string, int>& cachedTableMap,
                               const std::string& curTableName,
                               const std::string& newTableName,
                               int tableId) {
  cachedTableMap.insert_or_assign(curTableName, -1);
  cachedTableMap.insert_or_assign(newTableName, tableId);
}
}  // namespace
4512 
// Renames several tables as one operation. `names` is a list of
// (current name, new name) pairs applied in order; chained renames are resolved
// through the cachedTableMap overlay.
// Phases:
//  1. Setup: resolve each current name to a table id. CHECK fails if a name does
//     not (or will not yet) exist.
//  2. Lock: take per-table locks in ascending table-id order (uniqueOrderedTableIds).
//  3. Rename: expand logical tables into their shards and rename everything in a
//     single sqlite transaction via execInTransaction(renamePhysicalTables).
//  4. Update the SysCatalog privilege maps with the new names.
// NOTE(review): doxygen lines 4561 and 4565-4566 (declaration of `tableLocks` and
// part of the lock construction) are not visible in this listing.
4513 void Catalog::renameTables(
4514  const std::vector<std::pair<std::string, std::string>>& names) {
4515  // tableId of all tables being renamed
4516  // ... in matching order to 'names'
4517  std::vector<int> tableIds;
4518 
4519  // (sorted & unique) list of tables ids for locking
4520  // (with names index of src in case of error)
4521  // <tableId, strIndex>
4522  // std::map is by definition/implementation sorted
4523  // std::map current usage below tests to avoid over-write
4524  std::map<int, size_t> uniqueOrderedTableIds;
4525 
4526  // mapping of modified tables names -> tableId
4527  std::map<std::string, int> cachedTableMap;
4528 
4529  // -------- Setup --------
4530 
4531  // gather tableIds pre-execute; build maps
4532  for (size_t i = 0; i < names.size(); i++) {
4533  const auto& [curTableName, newTableName] = names[i];
4534 
4535  // make sure the table being renamed exists,
4536  // or will exist when executed in 'name' order
4537  auto td = getCachedTableDescriptor(cachedTableMap, curTableName);
4538  CHECK(td);
4539 
4540  tableIds.push_back(td->tableId);
4541  if (uniqueOrderedTableIds.find(td->tableId) == uniqueOrderedTableIds.end()) {
4542  // don't overwrite as it should map to the first names index 'i'
4543  uniqueOrderedTableIds[td->tableId] = i;
4544  }
4545  replace_cached_table_name(cachedTableMap, curTableName, newTableName, td->tableId);
4546  }
4547 
4548  CHECK_EQ(tableIds.size(), names.size());
4549 
4550  // The outer Stmt created a write lock before calling the catalog rename table
4551  // -> TODO: might want to sort out which really should set the lock :
4552  // the comment in the outer scope indicates it should be in here
4553  // but it's not clear if the access done there *requires* it out there
4554  //
4555  // Lock tables pre-execute (may/will be in different order than rename occurs)
4556  // const auto execute_write_lock = heavyai::unique_lock<heavyai::shared_mutex>(
4557  // *legacylockmgr::LockMgr<heavyai::shared_mutex, bool>::getMutex(
4558  // legacylockmgr::ExecutorOuterLock, true));
4559 
4560  // acquire the locks for all tables being renamed
4562  for (auto& idPair : uniqueOrderedTableIds) {
4563  const std::string& tableName = names[idPair.second].first;
4564  tableLocks.emplace_back(
4567  *this, tableName, false)));
4568  }
4569 
4570  // -------- Rename --------
4571 
4572  {
4573  cat_write_lock write_lock(this);
4574  // collect all (tables + physical tables) into a single list
4575  std::vector<std::pair<std::string, std::string>> allNames;
4576  std::vector<int> allTableIds;
4577 
// For each rename, shard entries (if any) are appended before the entry of the
// logical table itself. Shard k (0-based) is named
// generatePhysicalTableName(newTableName, k + 1).
4578  for (size_t i = 0; i < names.size(); i++) {
4579  int tableId = tableIds[i];
4580  const auto& [curTableName, newTableName] = names[i];
4581 
4582  // rename all corresponding physical tables if this is a logical table
4583  const auto physicalTableIt = logicalToPhysicalTableMapById_.find(tableId);
4584  if (physicalTableIt != logicalToPhysicalTableMapById_.end()) {
4585  const auto physicalTables = physicalTableIt->second;
4586  CHECK(!physicalTables.empty());
4587  for (size_t k = 0; k < physicalTables.size(); k++) {
4588  int32_t physical_tb_id = physicalTables[k];
4589  const TableDescriptor* phys_td = getMetadataForTable(physical_tb_id);
4590  CHECK(phys_td);
4591  std::string newPhysTableName = generatePhysicalTableName(newTableName, (k + 1));
4592  allNames.emplace_back(phys_td->tableName, newPhysTableName);
4593  allTableIds.push_back(phys_td->tableId);
4594  }
4595  }
4596  allNames.emplace_back(curTableName, newTableName);
4597  allTableIds.push_back(tableId);
4598  }
4599 
4600  // rename all tables in one transaction
4601  execInTransaction(&Catalog::renamePhysicalTables, allNames, allTableIds);
4602  }
4603 
// Only logical-table ids (tableIds) are pushed to SysCatalog; shard tables are not.
4604  // now update the SysCatalog
4605  for (size_t i = 0; i < names.size(); i++) {
4606  int tableId = tableIds[i];
4607  const std::string& newTableName = names[i].second;
4608  {
4609  // update table name in direct and effective priv map
4610  DBObjectKey key;
4611  key.dbId = currentDB_.dbId;
4612  key.objectId = tableId;
4613  key.permissionType = static_cast<int>(DBObjectType::TableDBObjectType);
4614 
4615  DBObject object(newTableName, TableDBObjectType);
4616  object.setObjectKey(key);
4617 
4618  auto objdescs = SysCatalog::instance().getMetadataForObject(
4619  currentDB_.dbId, static_cast<int>(DBObjectType::TableDBObjectType), tableId);
4620  for (auto obj : objdescs) {
4621  Grantee* grnt = SysCatalog::instance().getGrantee(obj->roleName);
4622  if (grnt) {
4623  grnt->renameDbObject(object);
4624  }
4625  }
4626  SysCatalog::instance().renameObjectsInDescriptorMap(object, *this);
4627  }
4628  }
4629 }
4630 
4631 void Catalog::renameColumn(const TableDescriptor* td,
4632  const ColumnDescriptor* cd,
4633  const string& newColumnName) {
4634  cat_write_lock write_lock(this);
4635  cat_sqlite_lock sqlite_lock(getObjForLock());
4636  sqliteConnector_.query("BEGIN TRANSACTION");
4637  try {
4638  for (int i = 0; i <= cd->columnType.get_physical_cols(); ++i) {
4639  auto cdx = getMetadataForColumn(td->tableId, cd->columnId + i);
4640  CHECK(cdx);
4641  std::string new_column_name = cdx->columnName;
4642  new_column_name.replace(0, cd->columnName.size(), newColumnName);
4643  sqliteConnector_.query_with_text_params(
4644  "UPDATE mapd_columns SET name = ? WHERE tableid = ? AND columnid = ?",
4645  std::vector<std::string>{new_column_name,
4646  std::to_string(td->tableId),
4647  std::to_string(cdx->columnId)});
4648  }
4649  } catch (std::exception& e) {
4650  sqliteConnector_.query("ROLLBACK TRANSACTION");
4651  throw;
4652  }
4653  sqliteConnector_.query("END TRANSACTION");
4654  calciteMgr_->updateMetadata(currentDB_.dbName, td->tableName);
4655  for (int i = 0; i <= cd->columnType.get_physical_cols(); ++i) {
4656  auto cdx = getMetadataForColumn(td->tableId, cd->columnId + i);
4657  CHECK(cdx);
4658  ColumnDescriptorMap::iterator columnDescIt = columnDescriptorMap_.find(
4659  std::make_tuple(td->tableId, to_upper(cdx->columnName)));
4660  CHECK(columnDescIt != columnDescriptorMap_.end());
4661  ColumnDescriptor* changeCd = columnDescIt->second;
4662  changeCd->columnName.replace(0, cd->columnName.size(), newColumnName);
4663  columnDescriptorMap_.erase(columnDescIt); // erase entry under old name
4664  columnDescriptorMap_[std::make_tuple(td->tableId, to_upper(changeCd->columnName))] =
4665  changeCd;
4666  }
4667  calciteMgr_->updateMetadata(currentDB_.dbName, td->tableName);
4668 }
4669 
// Creates a dashboard, or updates it if one with the same (name, userid) already
// exists, and returns its id.
// vd.dashboardId and vd.updateTime are filled in from mapd_dashboards after the
// write. vd is then added to the in-memory map. Outside the info-schema db, a
// dashboard system role is created or updated after the locks are released.
// NOTE(review): doxygen lines 4721 and 4729 (the head of the statement ending at
// 4722 and the arguments of createOrUpdateDashboardSystemRole) are not visible in
// this listing.
4670 int32_t Catalog::createDashboard(DashboardDescriptor& vd) {
4671  cat_write_lock write_lock(this);
4672  cat_sqlite_lock sqlite_lock(getObjForLock());
4673  sqliteConnector_.query("BEGIN TRANSACTION");
4674  try {
4675  // TODO(andrew): this should be an upsert
4676  sqliteConnector_.query_with_text_params(
4677  "SELECT id FROM mapd_dashboards WHERE name = ? and userid = ?",
4678  std::vector<std::string>{vd.dashboardName, std::to_string(vd.userId)});
4679  if (sqliteConnector_.getNumRows() > 0) {
4680  sqliteConnector_.query_with_text_params(
4681  "UPDATE mapd_dashboards SET state = ?, image_hash = ?, metadata = ?, "
4682  "update_time = "
4683  "datetime('now') where name = ? "
4684  "and userid = ?",
4685  std::vector<std::string>{vd.dashboardState,
4686  vd.imageHash,
4687  vd.dashboardMetadata,
4688  vd.dashboardName,
4689  std::to_string(vd.userId)});
4690  } else {
4691  sqliteConnector_.query_with_text_params(
4692  "INSERT INTO mapd_dashboards (name, state, image_hash, metadata, "
4693  "update_time, "
4694  "userid) "
4695  "VALUES "
4696  "(?,?,?,?, "
4697  "datetime('now'), ?)",
4698  std::vector<std::string>{vd.dashboardName,
4699  vd.dashboardState,
4700  vd.imageHash,
4701  vd.dashboardMetadata,
4702  std::to_string(vd.userId)});
4703  }
// NOTE(review): only std::exception triggers the rollback; other exception types
// would leave the transaction open.
4704  } catch (std::exception& e) {
4705  sqliteConnector_.query("ROLLBACK TRANSACTION");
4706  throw;
4707  }
4708  sqliteConnector_.query("END TRANSACTION");
4709 
// This read runs after the transaction has ended, still under the sqlite lock.
// Its try/catch only rethrows, so it has no effect.
4710  // now get the auto generated dashboardId
4711  try {
4712  sqliteConnector_.query_with_text_params(
4713  "SELECT id, strftime('%Y-%m-%dT%H:%M:%SZ', update_time) FROM mapd_dashboards "
4714  "WHERE name = ? and userid = ?",
4715  std::vector<std::string>{vd.dashboardName, std::to_string(vd.userId)});
4716  vd.dashboardId = sqliteConnector_.getData<int>(0, 0);
4717  vd.updateTime = sqliteConnector_.getData<std::string>(0, 1);
4718  } catch (std::exception& e) {
4719  throw;
4720  }
4722  std::to_string(currentDB_.dbId), std::to_string(vd.dashboardId));
4723  addFrontendViewToMap(vd);
4724  sqlite_lock.unlock();
4725  write_lock.unlock();
4726  if (!isInfoSchemaDb()) {
4727  // NOTE(wamsi): Transactionally unsafe
4728  createOrUpdateDashboardSystemRole(
4730  }
4731  return vd.dashboardId;
4732 }
4733 
// Overwrites the existing dashboard with id vd.dashboardId using vd's name, state,
// image hash, metadata and owner.
// Throws runtime_error if the id is missing from sqlite (after rolling back) or from
// dashboardDescriptorMap_. After the update, the old map entry is removed,
// vd.updateTime is reloaded from sqlite, and vd is re-added to the map. Outside the
// info-schema db, the dashboard system role is then refreshed without the locks.
// NOTE(review): doxygen lines 4753, 4799 and 4807 are not visible in this listing.
// They are presumably the final id argument of the UPDATE, the head of the
// statement ending at 4800, and the role-update arguments.
4734 void Catalog::replaceDashboard(DashboardDescriptor& vd) {
4735  cat_write_lock write_lock(this);
4736  cat_sqlite_lock sqlite_lock(getObjForLock());
4737 
4738  CHECK(sqliteConnector_.getSqlitePtr());
4739  sqliteConnector_.query("BEGIN TRANSACTION");
4740  try {
4741  sqliteConnector_.query_with_text_params(
4742  "SELECT id FROM mapd_dashboards WHERE id = ?",
4743  std::vector<std::string>{std::to_string(vd.dashboardId)});
4744  if (sqliteConnector_.getNumRows() > 0) {
4745  sqliteConnector_.query_with_text_params(
4746  "UPDATE mapd_dashboards SET name = ?, state = ?, image_hash = ?, metadata = "
4747  "?, userid = ?, update_time = datetime('now') where id = ? ",
4748  std::vector<std::string>{vd.dashboardName,
4749  vd.dashboardState,
4750  vd.imageHash,
4751  vd.dashboardMetadata,
4752  std::to_string(vd.userId),
4754  } else {
4755  LOG(ERROR) << "Error replacing dashboard id " << vd.dashboardId
4756  << " does not exist in db";
4757  throw runtime_error("Error replacing dashboard id " +
4758  std::to_string(vd.dashboardId) + " does not exist in db");
4759  }
4760  } catch (std::exception& e) {
4761  sqliteConnector_.query("ROLLBACK TRANSACTION");
4762  throw;
4763  }
4764  sqliteConnector_.query("END TRANSACTION");
4765 
// Find the in-memory entry by id (the map is keyed by "<userId>:<dashboardName>").
// `descp` is a by-value copy of the map element. If .second is a shared_ptr (as
// .get() suggests), that copy keeps `dash` alive across the erase below; confirm.
4766  bool found{false};
4767  for (auto descp : dashboardDescriptorMap_) {
4768  auto dash = descp.second.get();
4769  if (dash->dashboardId == vd.dashboardId) {
4770  found = true;
4771  auto viewDescIt = dashboardDescriptorMap_.find(std::to_string(dash->userId) + ":" +
4772  dash->dashboardName);
4773  if (viewDescIt ==
4774  dashboardDescriptorMap_.end()) { // check to make sure view exists
4775  LOG(ERROR) << "No metadata for dashboard for user " << dash->userId
4776  << " dashboard " << dash->dashboardName << " does not exist in map";
4777  throw runtime_error("No metadata for dashboard for user " +
4778  std::to_string(dash->userId) + " dashboard " +
4779  dash->dashboardName + " does not exist in map");
4780  }
4781  dashboardDescriptorMap_.erase(viewDescIt);
4782  break;
4783  }
4784  }
4785  if (!found) {
4786  LOG(ERROR) << "Error replacing dashboard id " << vd.dashboardId
4787  << " does not exist in map";
4788  throw runtime_error("Error replacing dashboard id " + std::to_string(vd.dashboardId) +
4789  " does not exist in map");
4790  }
4791 
4792  // now reload the object
4793  sqliteConnector_.query_with_text_params(
4794  "SELECT id, strftime('%Y-%m-%dT%H:%M:%SZ', update_time) FROM "
4795  "mapd_dashboards "
4796  "WHERE id = ?",
4797  std::vector<std::string>{std::to_string(vd.dashboardId)});
4798  vd.updateTime = sqliteConnector_.getData<string>(0, 1);
4800  std::to_string(currentDB_.dbId), std::to_string(vd.dashboardId));
4801  addFrontendViewToMapNoLock(vd);
4802  sqlite_lock.unlock();
4803  write_lock.unlock();
4804  if (!isInfoSchemaDb()) {
4805  // NOTE(wamsi): Transactionally unsafe
4806  createOrUpdateDashboardSystemRole(
4808  }
4809 }
4810 
4811 std::string Catalog::calculateSHA1(const std::string& data) {
4812  boost::uuids::detail::sha1 sha1;
4813  unsigned int digest[5];
4814  sha1.process_bytes(data.c_str(), data.length());
4815  sha1.get_digest(digest);
4816  std::stringstream ss;
4817  for (size_t i = 0; i < 5; i++) {
4818  ss << std::hex << digest[i];
4819  }
4820  return ss.str();
4821 }
4822 
4823 std::string Catalog::createLink(LinkDescriptor& ld, size_t min_length) {
4824  cat_write_lock write_lock(this);
4825  cat_sqlite_lock sqlite_lock(getObjForLock());
4826  sqliteConnector_.query("BEGIN TRANSACTION");
4827  try {
4828  ld.link = calculateSHA1(ld.viewState + ld.viewMetadata + std::to_string(ld.userId))
4829  .substr(0, 8);
4830  sqliteConnector_.query_with_text_params(
4831  "SELECT linkid FROM mapd_links WHERE link = ? and userid = ?",
4832  std::vector<std::string>{ld.link, std::to_string(ld.userId)});
4833  if (sqliteConnector_.getNumRows() > 0) {
4834  sqliteConnector_.query_with_text_params(
4835  "UPDATE mapd_links SET update_time = datetime('now') WHERE userid = ? AND "
4836  "link = ?",
4837  std::vector<std::string>{std::to_string(ld.userId), ld.link});
4838  } else {
4839  sqliteConnector_.query_with_text_params(
4840  "INSERT INTO mapd_links (userid, link, view_state, view_metadata, "
4841  "update_time) VALUES (?,?,?,?, datetime('now'))",
4842  std::vector<std::string>{
4843  std::to_string(ld.userId), ld.link, ld.viewState, ld.viewMetadata});
4844  }
4845  // now get the auto generated dashid
4846  sqliteConnector_.query_with_text_param(
4847  "SELECT linkid, strftime('%Y-%m-%dT%H:%M:%SZ', update_time) FROM mapd_links "
4848  "WHERE link = ?",
4849  ld.link);
4850  ld.linkId = sqliteConnector_.getData<int>(0, 0);
4851  ld.updateTime = sqliteConnector_.getData<std::string>(0, 1);
4852  } catch (std::exception& e) {
4853  sqliteConnector_.query("ROLLBACK TRANSACTION");
4854  throw;
4855  }
4856  sqliteConnector_.query("END TRANSACTION");
4857  addLinkToMap(ld);
4858  return ld.link;
4859 }
4860 
4861 const ColumnDescriptor* Catalog::getShardColumnMetadataForTable(
4862  const TableDescriptor* td) const {
4863  cat_read_lock read_lock(this);
4864 
4865  const auto column_descriptors =
4866  getAllColumnMetadataForTable(td->tableId, false, true, true);
4867 
4868  const ColumnDescriptor* shard_cd{nullptr};
4869  int i = 1;
4870  for (auto cd_itr = column_descriptors.begin(); cd_itr != column_descriptors.end();
4871  ++cd_itr, ++i) {
4872  if (i == td->shardedColumnId) {
4873  shard_cd = *cd_itr;
4874  }
4875  }
4876  return shard_cd;
4877 }
4878 
4879 std::vector<const TableDescriptor*> Catalog::getPhysicalTablesDescriptors(
4880  const TableDescriptor* logical_table_desc,
4881  bool populate_fragmenter) const {
4882  cat_read_lock read_lock(this);
4883  const auto physicalTableIt =
4884  logicalToPhysicalTableMapById_.find(logical_table_desc->tableId);
4885  if (physicalTableIt == logicalToPhysicalTableMapById_.end()) {
4886  return {logical_table_desc};
4887  }
4888  const auto physicalTablesIds = physicalTableIt->second;
4889  CHECK(!physicalTablesIds.empty());
4890  read_lock.unlock();
4891  std::vector<const TableDescriptor*> physicalTables;
4892  for (size_t i = 0; i < physicalTablesIds.size(); i++) {
4893  physicalTables.push_back(
4894  getMetadataForTable(physicalTablesIds[i], populate_fragmenter));
4895  }
4896  return physicalTables;
4897 }
4898 
4899 std::vector<std::pair<int32_t, int32_t>> Catalog::getAllPersistedTableAndShardIds()
4900  const {
4901  cat_read_lock read_lock(this);
4902  std::vector<std::pair<int32_t, int32_t>> table_and_shard_ids;
4903  table_and_shard_ids.reserve(tableDescriptorMapById_.size());
4904  for (const auto [table_id, td] : tableDescriptorMapById_) {
4905  // Only include ids for physical persisted tables
4906  if (!td->isView && !td->isTemporaryTable() && !td->isForeignTable() &&
4907  logicalToPhysicalTableMapById_.find(table_id) ==
4908  logicalToPhysicalTableMapById_.end()) {
4909  table_and_shard_ids.emplace_back(table_id, td->shard);
4910  }
4911  }
4912  return table_and_shard_ids;
4913 }
4914 
4915 const std::map<int, const ColumnDescriptor*> Catalog::getDictionaryToColumnMapping() {
4916  cat_read_lock read_lock(this);
4917 
4918  std::map<int, const ColumnDescriptor*> mapping;
4919 
4920  const auto tables = getAllTableMetadata();
4921  for (const auto td : tables) {
4922  if (td->shard >= 0) {
4923  // skip shards, they're not standalone tables
4924  continue;
4925  }
4926 
4927  for (auto& cd : getAllColumnMetadataForTable(td->tableId, false, false, true)) {
4928  const auto& ti = cd->columnType;
4929  if (ti.is_string()) {
4930  if (ti.get_compression() == kENCODING_DICT) {
4931  // if foreign reference, get referenced tab.col
4932  const auto dict_id = ti.get_comp_param();
4933 
4934  // ignore temp (negative) dictionaries
4935  if (dict_id > 0 && mapping.end() == mapping.find(dict_id)) {
4936  mapping[dict_id] = cd;
4937  }
4938  }
4939  }
4940  }
4941  }
4942 
4943  return mapping;
4944 }
4945 
// Decides whether td should be listed for user_metadata under get_tables_type:
//  - shard tables (td->shard >= 0) are never listed;
//  - GET_PHYSICAL_TABLES excludes views, and GET_VIEWS excludes non-views;
//  - finally, the user must hold some privilege on the table's DBObject
//    (SysCatalog::hasAnyPrivileges).
// NOTE(review): doxygen line 4969 (the declaration of `dbObject`) is not visible in
// this listing.
4946 bool Catalog::filterTableByTypeAndUser(const TableDescriptor* td,
4947  const UserMetadata& user_metadata,
4948  const GetTablesType get_tables_type) const {
4949  if (td->shard >= 0) {
4950  // skip shards, they're not standalone tables
4951  return false;
4952  }
4953  switch (get_tables_type) {
4954  case GET_PHYSICAL_TABLES: {
4955  if (td->isView) {
4956  return false;
4957  }
4958  break;
4959  }
4960  case GET_VIEWS: {
4961  if (!td->isView) {
4962  return false;
4963  }
4964  break;
4965  }
4966  default:
4967  break;
4968  }
4970  dbObject.loadKey(*this);
4971  std::vector<DBObject> privObjects = {dbObject};
4972  if (!SysCatalog::instance().hasAnyPrivileges(user_metadata, privObjects)) {
4973  // skip table, as there are no privileges to access it
4974  return false;
4975  }
4976  return true;
4977 }
4978 
4979 std::vector<std::string> Catalog::getTableNamesForUser(
4980  const UserMetadata& user_metadata,
4981  const GetTablesType get_tables_type) const {
4982  sys_read_lock syscat_read_lock(&SysCatalog::instance());
4983  cat_read_lock read_lock(this);
4984  std::vector<std::string> table_names;
4985  const auto tables = getAllTableMetadata();
4986  for (const auto td : tables) {
4987  if (filterTableByTypeAndUser(td, user_metadata, get_tables_type)) {
4988  table_names.push_back(td->tableName);
4989  }
4990  }
4991  return table_names;
4992 }
4993 
4994 std::vector<TableMetadata> Catalog::getTablesMetadataForUser(
4995  const UserMetadata& user_metadata,
4996  const GetTablesType get_tables_type,
4997  const std::string& filter_table_name) const {
4998  sys_read_lock syscat_read_lock(&SysCatalog::instance());
4999  cat_read_lock read_lock(this);
5000 
5001  std::vector<TableMetadata> tables_metadata;
5002  const auto tables = getAllTableMetadata();
5003  for (const auto td : tables) {
5004  if (filterTableByTypeAndUser(td, user_metadata, get_tables_type)) {
5005  if (!filter_table_name.empty()) {
5006  if (td->tableName != filter_table_name) {
5007  continue;
5008  }
5009  }
5010  TableMetadata table_metadata(td); // Makes a copy, not safe to access raw table
5011  // descriptor outside catalog lock
5012  tables_metadata.emplace_back(table_metadata);
5013  }
5014  }
5015  return tables_metadata;
5016 }
5017 
5018 int Catalog::getLogicalTableId(const int physicalTableId) const {
5019  cat_read_lock read_lock(this);
5020  for (const auto& l : logicalToPhysicalTableMapById_) {
5021  if (l.second.end() != std::find_if(l.second.begin(),
5022  l.second.end(),
5023  [&](decltype(*l.second.begin()) tid) -> bool {
5024  return physicalTableId == tid;
5025  })) {
5026  return l.first;
5027  }
5028  }
5029  return physicalTableId;
5030 }
5031 
5032 void Catalog::checkpoint(const int logicalTableId) const {
5033  const auto td = getMetadataForTable(logicalTableId);
5034  const auto shards = getPhysicalTablesDescriptors(td);
5035  for (const auto shard : shards) {
5036  getDataMgr().checkpoint(getCurrentDB().dbId, shard->tableId);
5037  }
5038 }
5039 
5040 void Catalog::checkpointWithAutoRollback(const int logical_table_id) const {
5041  auto table_epochs = getTableEpochs(getDatabaseId(), logical_table_id);
5042  try {
5043  checkpoint(logical_table_id);
5044  } catch (...) {
5045  setTableEpochsLogExceptions(getDatabaseId(), table_epochs);
5046  throw;
5047  }
5048 }
5049 
5050 void Catalog::resetTableEpochFloor(const int logicalTableId) const {
5051  cat_read_lock read_lock(this);
5052  const auto td = getMetadataForTable(logicalTableId, false);
5053  const auto shards = getPhysicalTablesDescriptors(td, false);
5054  for (const auto shard : shards) {
5055  getDataMgr().resetTableEpochFloor(getCurrentDB().dbId, shard->tableId);
5056  }
5057 }
5058 
5059 void Catalog::eraseDbMetadata() {
5060  const auto tables = getAllTableMetadata();
5061  for (const auto table : tables) {
5062  eraseTableMetadata(table);
5063  }
5064  // Physically erase database metadata
5065  boost::filesystem::remove(basePath_ + "/" + shared::kCatalogDirectoryName + "/" +
5066  currentDB_.dbName);
5067  calciteMgr_->updateMetadata(currentDB_.dbName, "");
5068 }
5069 
5070 void Catalog::eraseDbPhysicalData() {
5071  const auto tables = getAllTableMetadata();
5072  for (const auto table : tables) {
5073  eraseTablePhysicalData(table);
5074  }
5075 }
5076 
5077 void Catalog::eraseTablePhysicalData(const TableDescriptor* td) {
5078  const int tableId = td->tableId;
5079  // must destroy fragmenter before deleteChunks is called.
5080  removeFragmenterForTable(tableId);
5081 
5082  ChunkKey chunkKeyPrefix = {currentDB_.dbId, tableId};
5083  {
5084  INJECT_TIMER(deleteChunksWithPrefix);
5085  // assuming deleteChunksWithPrefix is atomic
5086  dataMgr_->deleteChunksWithPrefix(chunkKeyPrefix, MemoryLevel::CPU_LEVEL);
5087  dataMgr_->deleteChunksWithPrefix(chunkKeyPrefix, MemoryLevel::GPU_LEVEL);
5088  }
5089  if (!td->isView) {
5090  INJECT_TIMER(Remove_Table);
5091  dataMgr_->removeTableRelatedDS(currentDB_.dbId, tableId);
5092  }
5093 }
5094 
5095 std::string Catalog::generatePhysicalTableName(const std::string& logicalTableName,
5096  const size_t shardNumber) {
5097  std::string physicalTableName =
5098  logicalTableName + physicalTableNameTag_ + std::to_string(shardNumber);
5099  return (physicalTableName);
5100 }
5101 
// Loads every row of omnisci_foreign_servers into foreignServerMap_ (keyed by name)
// and foreignServerMapById_ (keyed by id). Both maps share the same ForeignServer
// object. "Unlocked": this function takes no catalog lock itself.
// NOTE(review): doxygen line 5103 is not visible in this listing.
// NOTE(review): owner_user_id and creation_time are both read as int32. Confirm
// that creation_time cannot exceed that range.
5102 void Catalog::buildForeignServerMapUnlocked() {
5104  sqliteConnector_.query(
5105  "SELECT id, name, data_wrapper_type, options, owner_user_id, creation_time FROM "
5106  "omnisci_foreign_servers");
5107  auto num_rows = sqliteConnector_.getNumRows();
5108 
5109  for (size_t row = 0; row < num_rows; row++) {
5110  auto foreign_server = std::make_shared<foreign_storage::ForeignServer>(
5111  sqliteConnector_.getData<int>(row, 0),
5112  sqliteConnector_.getData<std::string>(row, 1),
5113  sqliteConnector_.getData<std::string>(row, 2),
5114  sqliteConnector_.getData<std::string>(row, 3),
5115  sqliteConnector_.getData<std::int32_t>(row, 4),
5116  sqliteConnector_.getData<std::int32_t>(row, 5));
5117  foreignServerMap_[foreign_server->name] = foreign_server;
5118  foreignServerMapById_[foreign_server->id] = foreign_server;
5119  }
5120 }
5121 
// For every row of omnisci_foreign_tables, fills the ForeignTable descriptor already
// in tableDescriptorMapById_ with:
//  - its server (looked up in foreignServerMapById_);
//  - its options;
//  - its last/next refresh times;
//  - for system tables, the in-memory flag.
// CHECK fails if the table is missing, is not a ForeignTable, or names an unknown
// server. foreignServerMapById_[server_id] inserts an empty entry for an unknown
// id before the CHECK fires.
// NOTE(review): doxygen lines 5123 and 5146 are not visible in this listing.
5122 void Catalog::updateForeignTablesInMapUnlocked() {
5124  sqliteConnector_.query(
5125  "SELECT table_id, server_id, options, last_refresh_time, next_refresh_time from "
5126  "omnisci_foreign_tables");
5127  auto num_rows = sqliteConnector_.getNumRows();
5128  for (size_t r = 0; r < num_rows; r++) {
5129  const auto table_id = sqliteConnector_.getData<int32_t>(r, 0);
5130  const auto server_id = sqliteConnector_.getData<int32_t>(r, 1);
5131  const auto& options = sqliteConnector_.getData<std::string>(r, 2);
5132  const auto last_refresh_time = sqliteConnector_.getData<int64_t>(r, 3);
5133  const auto next_refresh_time = sqliteConnector_.getData<int64_t>(r, 4);
5134 
5135  CHECK(tableDescriptorMapById_.find(table_id) != tableDescriptorMapById_.end());
5136  auto foreign_table =
5137  dynamic_cast<foreign_storage::ForeignTable*>(tableDescriptorMapById_[table_id]);
5138  CHECK(foreign_table);
5139  foreign_table->foreign_server = foreignServerMapById_[server_id].get();
5140  CHECK(foreign_table->foreign_server);
5141  foreign_table->populateOptionsMap(options);
5142  foreign_table->last_refresh_time = last_refresh_time;
5143  foreign_table->next_refresh_time = next_refresh_time;
5144  if (foreign_table->is_system_table) {
5145  foreign_table->is_in_memory_system_table =
5147  foreign_table->foreign_server->data_wrapper_type);
5148  }
5149  }
5150 }
5151 
// Re-reads the single omnisci_foreign_tables row for foreign_table.tableId and
// refreshes its server, options, refresh times and (for system tables) the
// in-memory flag.
// CHECK fails if tableId is 0, if the row count is not exactly 1, or if the server
// id is unknown. The table id is concatenated into the SQL text, which is safe here
// because it is an integer.
// NOTE(review): doxygen lines 5153 and 5175 are not visible in this listing.
5152 void Catalog::reloadForeignTableUnlocked(foreign_storage::ForeignTable& foreign_table) {
5154  CHECK_NE(foreign_table.tableId, 0)
5155  << "reloadForeignTable expects a table with valid id";
5156  sqliteConnector_.query(
5157  "SELECT server_id, options, last_refresh_time, next_refresh_time from "
5158  "omnisci_foreign_tables WHERE table_id == " +
5159  std::to_string(foreign_table.tableId));
5160  auto num_rows = sqliteConnector_.getNumRows();
5161  CHECK_EQ(num_rows, 1U) << "Expected single entry in omnisci_foreign_tables for table'"
5162  << foreign_table.tableName << "', instead got " << num_rows;
5163  const auto server_id = sqliteConnector_.getData<int32_t>(0, 0);
5164  const auto& options = sqliteConnector_.getData<std::string>(0, 1);
5165  const auto last_refresh_time = sqliteConnector_.getData<int64_t>(0, 2);
5166  const auto next_refresh_time = sqliteConnector_.getData<int64_t>(0, 3);
5167 
5168  foreign_table.foreign_server = foreignServerMapById_[server_id].get();
5169  CHECK(foreign_table.foreign_server);
5170  foreign_table.populateOptionsMap(options);
5171  foreign_table.last_refresh_time = last_refresh_time;
5172  foreign_table.next_refresh_time = next_refresh_time;
5173  if (foreign_table.is_system_table) {
5174  foreign_table.is_in_memory_system_table =
5176  foreign_table.foreign_server->data_wrapper_type);
5177  }
5178 }
5179 
5180 void Catalog::reloadDictionariesFromDiskUnlocked() {
5181  std::string dictQuery(
5182  "SELECT dictid, name, nbits, is_shared, refcount from mapd_dictionaries");
5183  sqliteConnector_.query(dictQuery);
5184  auto numRows = sqliteConnector_.getNumRows();
5185  for (size_t r = 0; r < numRows; ++r) {
5186  auto dictId = sqliteConnector_.getData<int>(r, 0);
5187  auto dictName = sqliteConnector_.getData<string>(r, 1);
5188  auto dictNBits = sqliteConnector_.getData<int>(r, 2);
5189  auto is_shared = sqliteConnector_.getData<bool>(r, 3);
5190  auto refcount = sqliteConnector_.getData<int>(r, 4);
5191  auto fname = g_base_path + "/" + shared::kDataDirectoryName + "/DB_" +
5192  std::to_string(currentDB_.dbId) + "_DICT_" + std::to_string(dictId);
5193  DictRef dict_ref(currentDB_.dbId, dictId);
5194  DictDescriptor dd(dict_ref, dictName, dictNBits, is_shared, refcount, fname, false);
5195  if (auto it = dictDescriptorMapByRef_.find(dict_ref);
5196  it == dictDescriptorMapByRef_.end()) {
5197  dictDescriptorMapByRef_[dict_ref] = std::make_unique<DictDescriptor>(dd);
5198  } else {
5199  *it->second = dd;
5200  }
5201  }
5202 }
5203 
5204 std::list<ColumnDescriptor*> Catalog::sqliteGetColumnsForTableUnlocked(int32_t table_id) {
5205  std::list<ColumnDescriptor*> cds;
5206  // TODO(Misiu): Change ColumnDescriptorMap_ to use smartpointers. Right now we use
5207  // temporary smartpointers that release their memory once initialized.
5208  std::list<std::unique_ptr<ColumnDescriptor>> smart_cds;
5209  std::string columnQuery(
5210  "SELECT tableid, columnid, name, coltype, colsubtype, coldim, colscale, "
5211  "is_notnull, compression, comp_param, size, chunks, is_systemcol, is_virtualcol, "
5212  "virtual_expr, is_deletedcol, default_value from mapd_columns WHERE tableid = " +
5213  std::to_string(table_id) + " ORDER BY tableid, columnid");
5214  sqliteConnector_.query(columnQuery);
5215  auto numRows = sqliteConnector_.getNumRows();
5216  int32_t skip_physical_cols = 0;
5217  for (size_t r = 0; r < numRows; ++r) {
5218  std::unique_ptr<ColumnDescriptor> cd = std::make_unique<ColumnDescriptor>();
5219  cd->tableId = sqliteConnector_.getData<int>(r, 0);
5220  cd->columnId = sqliteConnector_.getData<int>(r, 1);
5221  cd->columnName = sqliteConnector_.getData<string>(r, 2);
5222  cd->columnType.set_type((SQLTypes)sqliteConnector_.getData<int>(r, 3));
5223  cd->columnType.set_subtype((SQLTypes)sqliteConnector_.getData<int>(r, 4));
5224  cd->columnType.set_dimension(sqliteConnector_.getData<int>(r, 5));
5225  cd->columnType.set_scale(sqliteConnector_.getData<int>(r, 6));
5226  cd->columnType.set_notnull(sqliteConnector_.getData<bool>(r, 7));
5227  cd->columnType.set_compression((EncodingType)sqliteConnector_.getData<int>(r, 8));
5228  cd->columnType.set_comp_param(sqliteConnector_.getData<int>(r, 9));
5229  cd->columnType.set_size(sqliteConnector_.getData<int>(r, 10));
5230  cd->chunks = sqliteConnector_.getData<string>(r, 11);
5231  cd->isSystemCol = sqliteConnector_.getData<bool>(r, 12);
5232  cd->isVirtualCol = sqliteConnector_.getData<bool>(r, 13);
5233  cd->virtualExpr = sqliteConnector_.getData<string>(r, 14);
5234  cd->isDeletedCol = sqliteConnector_.getData<bool>(r, 15);
5235  if (sqliteConnector_.isNull(r, 16)) {
5236  cd->default_value = std::nullopt;
5237  } else {
5238  cd->default_value = std::make_optional(sqliteConnector_.getData<string>(r, 16));
5239  }
5240  cd->isGeoPhyCol = skip_physical_cols-- > 0;
5241  cd->db_id = getDatabaseId();
5242  set_dict_key(*cd);
5243  smart_cds.emplace_back(std::move(cd));
5244  }
5245  // Once we have correctly initialized all columns, release their ownership as we
5246  // currently handle them as free pointers.
5247  for (auto& cd : smart_cds) {
5248  cds.emplace_back(cd.release());
5249  }
5250  return cds;
5251 }
5252 
5253 TableDescriptor* Catalog::createTableFromDiskUnlocked(int32_t table_id) {
5254  std::string query(
5255  "SELECT tableid, name, ncolumns, isview, fragments, frag_type, max_frag_rows, "
5256  "max_chunk_size, frag_page_size, max_rows, partitions, shard_column_id, shard, "
5257  "num_shards, key_metainfo, userid, sort_column_id, storage_type, "
5258  "max_rollback_epochs, is_system_table from mapd_tables WHERE tableid = " +
5259  std::to_string(table_id));
5260  sqliteConnector_.query(query);
5261  auto numRows = sqliteConnector_.getNumRows();
5262  if (!numRows) {
5263  throw NoTableFoundException(table_id);
5264  }
5265 
5266  const auto& storage_type = sqliteConnector_.getData<string>(0, 17);
5267  if (!storage_type.empty() && storage_type != StorageType::FOREIGN_TABLE) {
5268  const auto& table_name = sqliteConnector_.getData<string>(0, 1);
5269  LOG(FATAL) << "Unable to read Catalog metadata: storage type is currently not a "
5270  "supported table option (table "
5271  << table_name << " [" << table_id << "] in database " << currentDB_.dbName
5272  << ").";
5273  }
5274 
5275  // TODO(Misiu): Get rid of manual memory allocation and use smart pointers for
5276  // TableDecriptorMap_. Currently we use a smartpointer to cleanup if we catch
5277  // exceptions during initialization and then release ownership into the existing system.
5278  std::unique_ptr<TableDescriptor> td;
5279  td = (storage_type == StorageType::FOREIGN_TABLE)
5280  ? std::make_unique<foreign_storage::ForeignTable>()
5281  : std::make_unique<TableDescriptor>();
5282 
5283  td->tableId = sqliteConnector_.getData<int>(0, 0);
5284  td->tableName = sqliteConnector_.getData<string>(0, 1);
5285  td->nColumns = sqliteConnector_.getData<int>(0, 2);
5286  td->isView = sqliteConnector_.getData<bool>(0, 3);
5287  td->fragments = sqliteConnector_.getData<string>(0, 4);
5288  td->fragType = static_cast<Fragmenter_Namespace::FragmenterType>(
5289  sqliteConnector_.getData<int>(0, 5));
5290  td->maxFragRows = sqliteConnector_.getData<int>(0, 6);
5291  td->maxChunkSize = sqliteConnector_.getData<int64_t>(0, 7);
5292  td->fragPageSize = sqliteConnector_.getData<int>(0, 8);
5293  td->maxRows = sqliteConnector_.getData<int64_t>(0, 9);
5294  td->partitions = sqliteConnector_.getData<string>(0, 10);
5295  td->shardedColumnId = sqliteConnector_.getData<int>(0, 11);
5296  td->shard = sqliteConnector_.getData<int>(0, 12);
5297  td->nShards = sqliteConnector_.getData<int>(0, 13);
5298  td->keyMetainfo = sqliteConnector_.getData<string>(0, 14);
5299  td->userId = sqliteConnector_.getData<int>(0, 15);
5300  td->sortedColumnId =
5301  sqliteConnector_.isNull(0, 16) ? 0 : sqliteConnector_.getData<int>(0, 16);
5302  td->storageType = storage_type;
5303  td->maxRollbackEpochs = sqliteConnector_.getData<int>(0, 18);
5304  td->is_system_table = sqliteConnector_.getData<bool>(0, 19);
5305  td->hasDeletedCol = false;
5306 
5307  if (td->isView) {
5308  updateViewUnlocked(*td);
5309  } else {
5310  td->fragmenter = nullptr;
5311  }
5312 
5313  if (auto ftd = dynamic_cast<foreign_storage::ForeignTable*>(td.get())) {
5314  reloadForeignTableUnlocked(*ftd);
5315  }
5316 
5317  return td.release();
5318 }
5319 
5320 void Catalog::setForeignServerProperty(const std::string& server_name,
5321  const std::string& property,
5322  const std::string& value) {
5323  cat_sqlite_lock sqlite_lock(getObjForLock());
5324  sqliteConnector_.query_with_text_params(
5325  "SELECT id from omnisci_foreign_servers where name = ?",
5326  std::vector<std::string>{server_name});
5327  auto num_rows = sqliteConnector_.getNumRows();
5328  if (num_rows > 0) {
5329  CHECK_EQ(size_t(1), num_rows);
5330  auto server_id = sqliteConnector_.getData<int32_t>(0, 0);
5331  sqliteConnector_.query_with_text_params(
5332  "UPDATE omnisci_foreign_servers SET " + property + " = ? WHERE id = ?",
5333  std::vector<std::string>{value, std::to_string(server_id)});
5334  } else {
5335  throw std::runtime_error{"Can not change property \"" + property +
5336  "\" for foreign server." + " Foreign server \"" +
5337  server_name + "\" is not found."};
5338  }
5339 }
5340 
// Registers the built-in local-file foreign servers:
//   - "default_local_delimited"
//   - "default_local_parquet"   (only in builds with ENABLE_IMPORT_PARQUET)
//   - "default_local_regex_parsed"
// All three are constructed from the same `options` object. Each one is
// validate()d before being handed to createForeignServerNoLocks.
// NOTE(review): the trailing `true` passed to createForeignServerNoLocks is
// presumably an "if not exists" flag, matching this function's name — confirm
// against that function's signature.
5341 void Catalog::createDefaultServersIfNotExists() {
5346 
5347  auto local_csv_server = std::make_unique<foreign_storage::ForeignServer>(
5348  "default_local_delimited",
5350  options,
5352  local_csv_server->validate();
5353  createForeignServerNoLocks(std::move(local_csv_server), true);
5354 
// The Parquet server exists only when Parquet import support is compiled in.
5355 #ifdef ENABLE_IMPORT_PARQUET
5356  auto local_parquet_server = std::make_unique<foreign_storage::ForeignServer>(
5357  "default_local_parquet",
5359  options,
5361  local_parquet_server->validate();
5362  createForeignServerNoLocks(std::move(local_parquet_server), true);
5363 #endif
5364 
5365  auto local_regex_parser_server = std::make_unique<foreign_storage::ForeignServer>(
5366  "default_local_regex_parsed",
5368  options,
5370  local_regex_parser_server->validate();
5371  createForeignServerNoLocks(std::move(local_regex_parser_server), true);
5372 }
5373 
5374 // prepare a fresh file reload on next table access
5375 void Catalog::setForReload(const int32_t tableId) {
5376  const auto td = getMetadataForTable(tableId);
5377  for (const auto shard : getPhysicalTablesDescriptors(td)) {
5378  const auto tableEpoch = getTableEpoch(currentDB_.dbId, shard->tableId);
5379  setTableEpoch(currentDB_.dbId, shard->tableId, tableEpoch);
5380  }
5381 }
5382 
5383 // get a table's data dirs
5384 std::vector<std::string> Catalog::getTableDataDirectories(
5385  const TableDescriptor* td) const {
5386  const auto global_file_mgr = getDataMgr().getGlobalFileMgr();
5387  std::vector<std::string> file_paths;
5388  for (auto shard : getPhysicalTablesDescriptors(td)) {
5389  const auto file_mgr = dynamic_cast<File_Namespace::FileMgr*>(
5390  global_file_mgr->getFileMgr(currentDB_.dbId, shard->tableId));
5391  boost::filesystem::path file_path(file_mgr->getFileMgrBasePath());
5392  file_paths.push_back(file_path.filename().string());
5393  }
5394  return file_paths;
5395 }
5396 
5397 // get a column's dict dir basename
5398 std::string Catalog::getColumnDictDirectory(const ColumnDescriptor* cd,
5399  bool file_name_only) const {
5400  if ((cd->columnType.is_string() || cd->columnType.is_string_array()) &&
5402  cd->columnType.get_comp_param() > 0) {
5403  const auto dictId = cd->columnType.get_comp_param();
5404  const