OmniSciDB  a5dc49c757
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
migrations::MigrationMgr Class Reference

#include <MigrationMgr.h>

Static Public Member Functions

static void migrateDateInDaysMetadata (const Catalog_Namespace::TableDescriptorMapById &table_descriptors_by_id, const int database_id, Catalog_Namespace::Catalog *cat, SqliteConnector &sqlite)
 
static bool dropRenderGroupColumns (const Catalog_Namespace::TableDescriptorMapById &table_descriptors_by_id, Catalog_Namespace::Catalog *cat)
 
static void executeRebrandMigration (const std::string &base_path)
 
static void takeMigrationLock (const std::string &base_path)
 
static void relaxMigrationLock ()
 
static bool migrationEnabled ()
 
static void destroy ()
 

Static Private Attributes

static std::unique_ptr
< heavyai::DistributedSharedMutex
migration_mutex_
 
static bool migration_enabled_ {false}
 

Detailed Description

Definition at line 31 of file MigrationMgr.h.

Member Function Documentation

static void migrations::MigrationMgr::destroy ( )
inlinestatic

Definition at line 49 of file MigrationMgr.h.

References migration_mutex_.

Referenced by Catalog_Namespace::SysCatalog::destroy().

49  {
50  if (migration_mutex_) {
51  migration_mutex_->unlock();
52  migration_mutex_.reset();
53  }
54  }
static std::unique_ptr< heavyai::DistributedSharedMutex > migration_mutex_
Definition: MigrationMgr.h:57

+ Here is the caller graph for this function:

bool migrations::MigrationMgr::dropRenderGroupColumns ( const Catalog_Namespace::TableDescriptorMapById table_descriptors_by_id,
Catalog_Namespace::Catalog cat 
)
static

Definition at line 206 of file MigrationMgr.cpp.

References cat(), CHECK, Data_Namespace::DISK_LEVEL, logger::ERROR, logger::INFO, kINT, kMULTIPOLYGON, kPOLYGON, LOG, table_is_temporary(), and to_string().

Referenced by Catalog_Namespace::Catalog::checkDropRenderGroupColumnsMigration().

208  {
209  // for this catalog...
210  CHECK(cat);
211  auto& catalog = *cat;
212 
213  // skip info schema catalog
214  if (catalog.isInfoSchemaDb()) {
215  return true;
216  }
217 
218  // report catalog
219  LOG(INFO) << "MigrationMgr: dropRenderGroupColumns: Processing catalog '"
220  << catalog.name() << "'";
221 
222  // HeavyConnect cache
223  auto* heavyconnect_cache =
224  catalog.getDataMgr().getPersistentStorageMgr()->getDiskCache();
225 
226  bool all_tables_migrated = true;
227 
228  // all tables...
229  for (auto td_itr = table_descriptors_by_id.begin();
230  td_itr != table_descriptors_by_id.end();
231  td_itr++) {
232  // the table...
233  auto const* td = td_itr->second;
234  CHECK(td);
235 
236  // skip views and temps
237  if (td->isView) {
238  LOG(INFO) << "MigrationMgr: dropRenderGroupColumns: Skipping view '"
239  << td->tableName << "'";
240  continue;
241  }
242  if (table_is_temporary(td)) {
243  LOG(INFO) << "MigrationMgr: dropRenderGroupColumns: Skipping temporary table '"
244  << td->tableName << "'";
245  continue;
246  }
247 
248  LOG(INFO) << "MigrationMgr: dropRenderGroupColumns: Examining table '"
249  << td->tableName << "'";
250 
251  // find render group columns
252  auto logical_cds =
253  catalog.getAllColumnMetadataForTable(td->tableId, false, false, false);
254  // prepare to capture names
255  std::vector<std::string> columns_to_drop;
256  // iterate all columns
257  for (auto cd_itr = logical_cds.begin(); cd_itr != logical_cds.end(); cd_itr++) {
258  auto const* cd = *cd_itr;
259  CHECK(cd);
260  // poly or multipoly column?
261  auto const cd_type = cd->columnType.get_type();
262  if (cd_type == kPOLYGON || cd_type == kMULTIPOLYGON) {
263  // next logical column
264  auto const next_itr = std::next(cd_itr);
265  if (next_itr == logical_cds.end()) {
266  // no next column, perhaps table already migrated
267  break;
268  }
269  // the next column
270  auto const* next_cd = *next_itr;
271  CHECK(next_cd);
272  // expected name?
273  auto const next_name = next_cd->columnName;
274  auto const expected_name = cd->columnName + "_render_group";
275  if (next_name == expected_name) {
276  // expected type?
277  auto const next_type = next_cd->columnType.get_type();
278  if (next_type == kINT) {
279  // expected ID increment
280  auto const next_id = next_cd->columnId;
281  auto const expected_next_id =
282  cd->columnId + cd->columnType.get_physical_cols() + 1;
283  if (next_id == expected_next_id) {
284  // report
285  LOG(INFO) << "MigrationMgr: dropRenderGroupColumns: Found render "
286  "group column '"
287  << next_name << "'";
288  // capture name
289  columns_to_drop.emplace_back(next_name);
290  // restart with the column after the one we identified
291  cd_itr++;
292  } else {
293  LOG(ERROR) << "MigrationMgr: dropRenderGroupColumns: Expected render "
294  "group column '"
295  << next_name << "' has wrong ID (" << next_id << "/"
296  << expected_next_id << "), skipping...";
297  }
298  } else {
299  LOG(ERROR) << "MigrationMgr: dropRenderGroupColumns: Expected render "
300  "group column '"
301  << next_name << "' has wrong type (" << to_string(next_type)
302  << "), skipping...";
303  }
304  }
305  }
306  }
307 
308  // any to drop?
309  if (columns_to_drop.size() == 0) {
310  LOG(INFO)
311  << "MigrationMgr: dropRenderGroupColumns: No render group columns found";
312  continue;
313  }
314 
315  // drop the columns
316  catalog.getSqliteConnector().query("BEGIN TRANSACTION");
317  try {
318  std::vector<int> column_ids;
319  for (auto const& column : columns_to_drop) {
320  auto const* cd = catalog.getMetadataForColumn(td->tableId, column);
321  CHECK(cd);
322  LOG(INFO) << "MigrationMgr: dropRenderGroupColumns: Removing render "
323  "group column '"
324  << cd->columnName << "'";
325  catalog.dropColumn(*td, *cd);
326  column_ids.push_back(cd->columnId);
327  }
328  if (!td->isForeignTable()) {
329  for (auto const* physical_td : catalog.getPhysicalTablesDescriptors(td)) {
330  CHECK(physical_td);
331  // getMetadataForTable() may not have been called on this table, so
332  // the table may not yet have a fragmenter (which that function lazy
333  // creates by default) so call it manually here to force creation of
334  // the fragmenter so we can use it to actually drop the columns
335  if (physical_td->fragmenter == nullptr) {
336  catalog.getMetadataForTable(physical_td->tableId, true);
337  CHECK(physical_td->fragmenter);
338  }
339  physical_td->fragmenter->dropColumns(column_ids);
340  }
341  }
342  catalog.rollLegacy(true);
343  if (!td->isForeignTable()) {
344  if (td->persistenceLevel == Data_Namespace::MemoryLevel::DISK_LEVEL) {
345  catalog.resetTableEpochFloor(td->tableId);
346  catalog.checkpoint(td->tableId);
347  }
348  }
349  catalog.getSqliteConnector().query("END TRANSACTION");
350  } catch (std::exception& e) {
351  if (!td->isForeignTable()) {
352  catalog.setForReload(td->tableId);
353  }
354  catalog.rollLegacy(false);
355  catalog.getSqliteConnector().query("ROLLBACK TRANSACTION");
356  LOG(ERROR) << "MigrationMgr: dropRenderGroupColumns: Failed to drop render group "
357  "columns for Table '"
358  << td->tableName << "' in Database '" << catalog.name() << "' ("
359  << e.what() << ")";
360 
361  // trigger overall failure
362  all_tables_migrated = false;
363 
364  // don't do anything else for this table
365  continue;
366  }
367 
368  // flush any HeavyConnect foreign table cache
369  if (heavyconnect_cache && td->isForeignTable()) {
370  for (auto const* physical_td : catalog.getPhysicalTablesDescriptors(td)) {
371  CHECK(physical_td);
372  LOG(INFO) << "MigrationMgr: dropRenderGroupColumns: Flushing HeavyConnect "
373  "cache for table '"
374  << physical_td->tableName << "'";
375  heavyconnect_cache->clearForTablePrefix(
376  {catalog.getCurrentDB().dbId, physical_td->tableId});
377  }
378  }
379  }
380 
381  return all_tables_migrated;
382 }
std::string cat(Ts &&...args)
#define LOG(tag)
Definition: Logger.h:285
std::string to_string(char const *&&v)
bool table_is_temporary(const TableDescriptor *const td)
#define CHECK(condition)
Definition: Logger.h:291
Definition: sqltypes.h:72

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void migrations::MigrationMgr::executeRebrandMigration ( const std::string &  base_path)
static

Definition at line 467 of file MigrationMgr.cpp.

References shared::kCatalogDirectoryName, shared::kDataDirectoryName, shared::kDefaultExportDirName, shared::kDefaultImportDirName, shared::kDefaultKeyFileName, shared::kDefaultKeyStoreDirName, shared::kDefaultLicenseFileName, shared::kDefaultLogDirName, shared::kSystemCatalogName, migrations::anonymous_namespace{MigrationMgr.cpp}::rename_and_symlink_file(), and migrations::anonymous_namespace{MigrationMgr.cpp}::rename_and_symlink_path().

Referenced by CommandLineOptions::parse_command_line().

467  {
468  bool migration_occurred{false};
469 
470  // clang-format off
471  const std::map<std::string, std::string> old_to_new_dir_names {
472  {"mapd_catalogs", shared::kCatalogDirectoryName},
473  {"mapd_data", shared::kDataDirectoryName},
474  {"mapd_log", shared::kDefaultLogDirName},
475  {"mapd_export", shared::kDefaultExportDirName},
476  {"mapd_import", shared::kDefaultImportDirName},
477  {"omnisci_key_store", shared::kDefaultKeyStoreDirName}
478  };
479  // clang-format on
480 
481  const auto storage_base_path = std::filesystem::canonical(base_path);
482  // Rename legacy directories (if they exist), and create symlinks from legacy directory
483  // names to the new names (if they don't already exist).
484  for (const auto& [old_dir_name, new_dir_name] : old_to_new_dir_names) {
485  auto old_path = storage_base_path / old_dir_name;
486  auto new_path = storage_base_path / new_dir_name;
487  if (rename_and_symlink_path(old_path, new_path)) {
488  migration_occurred = true;
489  }
490  }
491 
492  // Rename legacy files and create symlinks to them.
493  const auto license_updated = rename_and_symlink_file(
494  storage_base_path, "", "omnisci.license", shared::kDefaultLicenseFileName);
495  const auto key_updated = rename_and_symlink_file(storage_base_path,
497  "omnisci.pem",
499  const auto sys_catalog_updated = rename_and_symlink_file(storage_base_path,
501  "omnisci_system_catalog",
503  if (license_updated || key_updated || sys_catalog_updated) {
504  migration_occurred = true;
505  }
506 
507  // Delete the disk cache directory and legacy files that will no longer be used.
508  const std::array<std::filesystem::path, 9> files_to_delete{
509  storage_base_path / "omnisci_disk_cache",
510  storage_base_path / "omnisci_server_pid.lck",
511  storage_base_path / "mapd_server_pid.lck",
512  storage_base_path / shared::kDefaultLogDirName / "omnisci_server.FATAL",
513  storage_base_path / shared::kDefaultLogDirName / "omnisci_server.ERROR",
514  storage_base_path / shared::kDefaultLogDirName / "omnisci_server.WARNING",
515  storage_base_path / shared::kDefaultLogDirName / "omnisci_server.INFO",
516  storage_base_path / shared::kDefaultLogDirName / "omnisci_web_server.ALL",
517  storage_base_path / shared::kDefaultLogDirName / "omnisci_web_server.ACCESS"};
518 
519  for (const auto& file_path : files_to_delete) {
520  if (std::filesystem::exists(file_path)) {
521  std::filesystem::remove_all(file_path);
522  std::cout << "Rebrand migration: Deleted file " << file_path << std::endl;
523  migration_occurred = true;
524  }
525  }
526  if (migration_occurred) {
527  std::cout << "Rebrand migration completed" << std::endl;
528  }
529 }
const std::string kDataDirectoryName
const std::string kDefaultLogDirName
const std::string kSystemCatalogName
const std::string kDefaultExportDirName
bool rename_and_symlink_path(const std::filesystem::path &old_path, const std::filesystem::path &new_path)
const std::string kDefaultImportDirName
const std::string kDefaultKeyFileName
const std::string kDefaultKeyStoreDirName
bool rename_and_symlink_file(const std::filesystem::path &base_path, const std::string &dir_name, const std::string &old_file_name, const std::string &new_file_name)
const std::string kCatalogDirectoryName
const std::string kDefaultLicenseFileName

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void migrations::MigrationMgr::migrateDateInDaysMetadata ( const Catalog_Namespace::TableDescriptorMapById table_descriptors_by_id,
const int  database_id,
Catalog_Namespace::Catalog cat,
SqliteConnector sqlite 
)
static

Definition at line 91 of file MigrationMgr.cpp.

References cat(), logger::ERROR, SqliteConnector::getData(), Executor::getExecutor(), SqliteConnector::getNumRows(), logger::INFO, kDATE, kENCODING_DATE_IN_DAYS, LOG, MAPD_VERSION, SqliteConnector::query(), SqliteConnector::query_with_text_params(), TableOptimizer::recomputeMetadata(), to_string(), and Executor::UNITARY_EXECUTOR_ID.

Referenced by Catalog_Namespace::Catalog::checkDateInDaysColumnMigration().

95  {
96  std::vector<int> tables_migrated = {};
97  std::unordered_map<int, std::vector<std::string>> tables_to_migrate;
98  sqlite.query("BEGIN TRANSACTION");
99  try {
100  sqlite.query(
101  "select name from sqlite_master WHERE type='table' AND "
102  "name='mapd_version_history'");
103  if (sqlite.getNumRows() == 0) {
104  sqlite.query(
105  "CREATE TABLE mapd_version_history(version integer, migration_history text "
106  "unique)");
107  sqlite.query(
108  "CREATE TABLE mapd_date_in_days_column_migration_tmp(table_id integer primary "
109  "key)");
110  } else {
111  sqlite.query(
112  "select * from mapd_version_history where migration_history = "
113  "'date_in_days_column'");
114  if (sqlite.getNumRows() != 0) {
115  // no need for further execution
116  sqlite.query("END TRANSACTION");
117  return;
118  }
119  LOG(INFO) << "Checking for date columns requiring metadata migration.";
120  sqlite.query(
121  "select name from sqlite_master where type='table' AND "
122  "name='mapd_date_in_days_column_migration_tmp'");
123  if (sqlite.getNumRows() != 0) {
124  sqlite.query("select table_id from mapd_date_in_days_column_migration_tmp");
125  if (sqlite.getNumRows() != 0) {
126  for (size_t i = 0; i < sqlite.getNumRows(); i++) {
127  tables_migrated.push_back(sqlite.getData<int>(i, 0));
128  }
129  }
130  } else {
131  sqlite.query(
132  "CREATE TABLE mapd_date_in_days_column_migration_tmp(table_id integer "
133  "primary key)");
134  }
135  }
136  sqlite.query_with_text_params(
137  "SELECT tables.tableid, tables.name, columns.name FROM mapd_tables tables, "
138  "mapd_columns columns where tables.tableid = columns.tableid AND "
139  "columns.coltype = ?1 AND columns.compression = ?2",
140  std::vector<std::string>{
141  std::to_string(static_cast<int>(SQLTypes::kDATE)),
143  if (sqlite.getNumRows() != 0) {
144  for (size_t i = 0; i < sqlite.getNumRows(); i++) {
145  tables_to_migrate[sqlite.getData<int>(i, 0)] = {
146  sqlite.getData<std::string>(i, 1), sqlite.getData<std::string>(i, 2)};
147  }
148  }
149  } catch (const std::exception& e) {
150  LOG(ERROR) << "Failed to complete migration on date in days column metadata: "
151  << e.what();
152  sqlite.query("ROLLBACK");
153  throw;
154  }
155  sqlite.query("END TRANSACTION");
156 
157  for (auto& id_names : tables_to_migrate) {
158  if (std::find(tables_migrated.begin(), tables_migrated.end(), id_names.first) ==
159  tables_migrated.end()) {
160  sqlite.query("BEGIN TRANSACTION");
161  try {
162  LOG(INFO) << "Table: " << id_names.second[0]
163  << " may suffer from issues with DATE column: " << id_names.second[1]
164  << ". Running an OPTIMIZE command to solve any issues with metadata.";
165 
166  // TODO(adb): Could have the TableOptimizer get the Executor and avoid including
167  // Execute.h
169  auto table_desc_itr = table_descriptors_by_id.find(id_names.first);
170  if (table_desc_itr == table_descriptors_by_id.end()) {
171  throw std::runtime_error("Table descriptor does not exist for table " +
172  id_names.second[0] + " does not exist.");
173  }
174  auto td = table_desc_itr->second;
175  TableOptimizer optimizer(td, executor.get(), *cat);
176  optimizer.recomputeMetadata();
177 
178  sqlite.query_with_text_params(
179  "INSERT INTO mapd_date_in_days_column_migration_tmp VALUES(?)",
180  std::vector<std::string>{std::to_string(id_names.first)});
181  } catch (const std::exception& e) {
182  LOG(ERROR) << "Failed to complete metadata migration on date in days column: "
183  << e.what();
184  sqlite.query("ROLLBACK");
185  throw;
186  }
187  sqlite.query("COMMIT");
188  }
189  }
190 
191  sqlite.query("BEGIN TRANSACTION");
192  try {
193  sqlite.query("DROP TABLE mapd_date_in_days_column_migration_tmp");
194  sqlite.query_with_text_params(
195  "INSERT INTO mapd_version_history(version, migration_history) values(?,?)",
196  std::vector<std::string>{std::to_string(MAPD_VERSION), "date_in_days_column"});
197  } catch (const std::exception& e) {
198  LOG(ERROR) << "Failed to complete migraion on date in days column: " << e.what();
199  sqlite.query("ROLLBACK");
200  throw;
201  }
202  sqlite.query("END TRANSACTION");
203  LOG(INFO) << "Successfully migrated all date in days column metadata.";
204 }
std::string cat(Ts &&...args)
T getData(const int row, const int col)
virtual void query_with_text_params(std::string const &query_only)
#define LOG(tag)
Definition: Logger.h:285
virtual void query(const std::string &queryString)
Driver for running cleanup processes on a table. TableOptimizer provides functions for various cleanu...
std::string to_string(char const *&&v)
static std::shared_ptr< Executor > getExecutor(const ExecutorId id, const std::string &debug_dir="", const std::string &debug_file="", const SystemParameters &system_parameters=SystemParameters())
Definition: Execute.cpp:513
static const int32_t MAPD_VERSION
Definition: release.h:32
Definition: sqltypes.h:80
static constexpr ExecutorId UNITARY_EXECUTOR_ID
Definition: Execute.h:423
virtual size_t getNumRows() const
void recomputeMetadata() const
Recomputes per-chunk metadata for each fragment in the table. Updates and deletes can cause chunk met...

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

static bool migrations::MigrationMgr::migrationEnabled ( )
inlinestatic

Definition at line 47 of file MigrationMgr.h.

References migration_enabled_.

Referenced by Catalog_Namespace::SysCatalog::init(), and CommandLineOptions::parse_command_line().

47 { return migration_enabled_; }
static bool migration_enabled_
Definition: MigrationMgr.h:58

+ Here is the caller graph for this function:

void migrations::MigrationMgr::relaxMigrationLock ( )
static

Definition at line 76 of file MigrationMgr.cpp.

References g_multi_instance, migration_enabled_, and migration_mutex_.

Referenced by Catalog_Namespace::SysCatalog::init().

76  {
77 // TODO: support lock on Windows
78 #ifndef _WIN32
79  // Only used for --multi-instance clusters.
80  if (!g_multi_instance) {
81  return;
82  }
83 
84  // If we ran migrations, now relax the exclusive lock to a shared lock.
86  migration_mutex_->convert_lock_shared();
87  }
88 #endif // _WIN32
89 }
bool g_multi_instance
Definition: heavyai_locks.h:22
static bool migration_enabled_
Definition: MigrationMgr.h:58
static std::unique_ptr< heavyai::DistributedSharedMutex > migration_mutex_
Definition: MigrationMgr.h:57

+ Here is the caller graph for this function:

void migrations::MigrationMgr::takeMigrationLock ( const std::string &  base_path)
static

Definition at line 39 of file MigrationMgr.cpp.

References g_multi_instance, shared::kLockfilesDirectoryName, migration_enabled_, and migration_mutex_.

Referenced by Catalog_Namespace::SysCatalog::init(), and CommandLineOptions::parse_command_line().

39  {
40 // TODO: support lock on Windows
41 #ifndef _WIN32
42  // Only used for --multi-instance clusters.
43  if (!g_multi_instance) {
44  migration_enabled_ = true;
45  return;
46  }
47 
48  // If we already have the migration lock then do nothing.
49  if (migration_mutex_) {
50  return;
51  }
52 
53  // Initialize the migration mutex. Will be locked until process exit.
54  migration_mutex_ = std::make_unique<heavyai::DistributedSharedMutex>(
55  std::filesystem::path(base_path) / shared::kLockfilesDirectoryName /
56  "migration.lockfile");
57 
58  // Take an exclusive lock if we can. If we get the exclusive lock, then later it will be
59  // relaxed to a shared lock, after we run migrations.
61  if (!g_multi_instance && !migration_enabled_) {
62  throw std::runtime_error(
63  "another HeavyDB server instance is already using data directory: " + base_path);
64  }
65 
66  // If we didn't get the exclusive lock, we'll wait for a shared lock instead, and we
67  // won't run migrations.
68  if (!migration_enabled_) {
69  migration_mutex_->lock_shared();
70  }
71 #else
72  migration_enabled_ = true;
73 #endif // _WIN32
74 }
bool g_multi_instance
Definition: heavyai_locks.h:22
static bool migration_enabled_
Definition: MigrationMgr.h:58
static std::unique_ptr< heavyai::DistributedSharedMutex > migration_mutex_
Definition: MigrationMgr.h:57
const std::string kLockfilesDirectoryName

+ Here is the caller graph for this function:

Member Data Documentation

bool migrations::MigrationMgr::migration_enabled_ {false}
inlinestaticprivate

Definition at line 58 of file MigrationMgr.h.

Referenced by migrationEnabled(), relaxMigrationLock(), and takeMigrationLock().

std::unique_ptr<heavyai::DistributedSharedMutex> migrations::MigrationMgr::migration_mutex_
inlinestaticprivate

Definition at line 57 of file MigrationMgr.h.

Referenced by destroy(), relaxMigrationLock(), and takeMigrationLock().


The documentation for this class was generated from the following files: