29 : catalog_(catalog) {}
38 std::find_if(cds.begin(), cds.end(), [&added_column](
const ColumnDescriptor* cd) {
39 return added_column.columnId == cd->columnId;
52 auto cd_it = std::find_if(
54 return renamed_column.columnId == cd->columnId;
56 if (cd_it != cds.end()) {
58 auto old_name = renamed_column.columnName;
59 if (cd->columnName != old_name) {
68 std::find_if(cds.begin(), cds.end(), [&added_dict](
const ColumnDescriptor* cd) {
69 return added_dict.columnId == cd->columnId;
71 if (cd_it != cds.end()) {
76 for (
const auto& dd : dds) {
77 if (!added_dict.columnType.is_dict_encoded_type() ||
78 dd->dictRef.dictId != added_dict.columnType.get_comp_param()) {
80 temp_cd.columnType.set_comp_param(dd->dictRef.dictId);
81 temp_cd.columnType.setStringDictKey(
91 auto cd_it = std::find_if(
93 return altered_column.new_cd.columnId == cd->columnId;
95 if (cd_it != cds.end()) {
104 for (
auto& [src_cd, dst_cd] : src_dst_cds) {
105 if (!src_cd->columnType.is_dict_encoded_type()) {
119 if (mem_level >= data_mgr.levelSizes_.size()) {
122 for (
int device = 0; device < data_mgr.levelSizes_[mem_level]; ++device) {
123 if (data_mgr.isBufferOnDevice(key, mem_level, device)) {
140 for (
auto& [src_cd, dst_cd] : src_dst_cds) {
141 if (src_cd->columnType.is_varlen_indeed() != dst_cd->columnType.is_varlen_indeed()) {
145 std::set<int> fragment_ids;
146 for (
const auto& [key, _] : chunk_metadata) {
149 for (
const auto& frag_id : fragment_ids) {
151 if (src_cd->columnType.is_varlen_indeed()) {
153 data_key.push_back(1);
155 auto index_key = key;
156 index_key.push_back(2);
169 for (
auto& [src_cd, dst_cd] : src_dst_cds) {
170 if (!dst_cd->columnType.is_geometry()) {
183 std::list<AlterTableAlterColumnCommandRecoveryMgr::ColumnAltered>
185 const std::list<std::pair<ColumnDescriptor, ColumnDescriptor>>& altered_columns) {
186 std::list<ColumnAltered> retval;
187 for (
const auto& [old_cd, new_cd] : altered_columns) {
188 retval.emplace_back(old_cd, new_cd);
193 std::list<std::pair<ColumnDescriptor, ColumnDescriptor>>
195 const std::list<ColumnAltered>& altered_columns) {
196 std::list<std::pair<ColumnDescriptor, ColumnDescriptor>> retval;
197 for (
const auto& [old_cd, new_cd] : altered_columns) {
198 retval.emplace_back(old_cd, new_cd);
212 std::list<std::pair<ColumnDescriptor, ColumnDescriptor>> altered_column_pairs;
229 const std::string& base_path) {
238 return prefix.string();
242 std::filesystem::path(
"alter_column_recovery_db_" + filepath_info.
db_name +
243 "_table_" + filepath_info.
table_name +
".json"))
263 throw std::runtime_error{
"Error trying to read file \"" +
filename +
264 "\". The error was: " + std::strerror(errno)};
266 std::string json_string;
273 rapidjson::Document d;
299 if (!std::filesystem::exists(prefix)) {
300 if (!std::filesystem::create_directory(prefix)) {
301 throw std::runtime_error{
"Error trying to create crash recovery directory \"" +
302 prefix +
"\". The error was: " + std::strerror(errno)};
307 std::ofstream ofs(
filename +
".tmp");
309 throw std::runtime_error{
"Error trying to create file \"" +
filename +
310 "\". The error was: " + std::strerror(errno)};
321 }
catch (std::exception& except) {
322 LOG(
WARNING) <<
"Alter column type: failed to clear source dictionaries: "
329 }
catch (std::exception& except) {
330 LOG(
WARNING) <<
"Alter column type: failed to clear remaining chunks: "
337 }
catch (std::exception& except) {
338 LOG(
WARNING) <<
"Alter column type: failed to remove geo's source column : "
350 for (
auto& [src_cd, dst_cd] : src_dst_cds) {
351 if (!dst_cd->columnType.is_dict_encoded_type()) {
358 auto string_dictionary =
361 if (!string_dictionary->checkpoint()) {
362 throw std::runtime_error(
"Failed to checkpoint dictionary while altering column " +
363 dst_cd->columnName +
".");
369 std::list<std::filesystem::path>
371 std::list<std::filesystem::path>
result;
374 if (!std::filesystem::exists(path)) {
378 for (
const auto& entry : std::filesystem::directory_iterator(path)) {
379 auto entry_path = entry.path().string();
380 if (entry_path.find(
"alter_column_recovery_db_" +
catalog_.
name() +
"_table_") !=
382 entry_path.find(
".json") != std::string::npos) {
383 if (entry_path.find(
".tmp") != std::string::npos &&
384 entry_path.find(
".tmp") == entry_path.size() - std::string{
".tmp"}.size()) {
385 if (!std::filesystem::remove(entry_path)) {
386 throw std::runtime_error(
"Failed to remove incomplete recovery file: " +
389 LOG(
INFO) <<
"Removing incomplete ALTER COLUMN recovery file: " + entry_path;
392 result.emplace_back(entry.path());
402 std::list<std::pair<ColumnDescriptor, ColumnDescriptor>>& pairs_list) {
404 std::list<std::pair<const ColumnDescriptor*, ColumnDescriptor*>>
result;
405 for (
auto& [src, dst] : pairs_list) {
413 result.emplace_back(catalog_cd, &dst);
426 auto table_id = recovery_param.
src_dst_cds.begin()->first.tableId;
428 CHECK(table_name_opt.has_value());
429 auto table_name = table_name_opt.value();
431 const auto td_with_lock =
434 const auto td = td_with_lock();
437 LOG(
INFO) <<
"Starting crash recovery for table: " << td->tableName;
441 auto current_first_epoch = table_epochs[0].table_epoch;
457 int alter_catalog_checkpoint_epoch = recovery_param.
table_epoch;
460 alter_catalog_checkpoint_epoch++;
462 int alter_data_checkpoint_epoch = alter_catalog_checkpoint_epoch + 1;
463 int completed_alter_column_checkpoint_epoch = alter_data_checkpoint_epoch + 1;
465 if (current_first_epoch == alter_catalog_checkpoint_epoch) {
473 }
else if (current_first_epoch == alter_data_checkpoint_epoch) {
477 }
catch (std::exception& except) {
478 throw std::runtime_error(
"Alter column recovery error during cleanup: " +
479 std::string(except.what()));
488 CHECK_EQ(current_first_epoch, completed_alter_column_checkpoint_epoch);
491 LOG(
INFO) <<
"Completed crash recovery for table: " << td->tableName;
496 if (recovery_files.empty()) {
501 for (
const auto& filepath : recovery_files) {
503 std::filesystem::remove(filepath);
508 std::map<std::string, AlterTableAlterColumnCommandRecoveryMgr>
510 std::map<std::string, AlterTableAlterColumnCommandRecoveryMgr>
result;
513 auto base_path = syscat.getCatalogBasePath();
515 if (!std::filesystem::exists(prefix)) {
519 auto catalog_metadata = syscat.getAllDBMetadata();
521 for (
const auto& entry : std::filesystem::directory_iterator(prefix)) {
522 auto entry_path = entry.path().string();
524 for (
const auto& db_metadata : catalog_metadata) {
525 if (result.count(db_metadata.dbName)) {
528 auto match_db = entry_path.find(
"alter_column_recovery_db_" + db_metadata.dbName);
529 if (match_db == std::string::npos) {
532 auto catalog = syscat.getCatalog(db_metadata.dbName);
533 CHECK(catalog.get());
534 result.emplace(db_metadata.dbName, *catalog);
545 for (
auto& [dbname, recovery_mgr] : recovery_mgrs) {
546 recovery_mgr.resolveIncompleteAlterColumnCommands();
550 namespace json_utils {
554 rapidjson::Document::AllocatorType& allocator) {
555 json_val.SetObject();
557 if (default_value.has_value()) {
560 json_val, default_value.value(),
"default_value_literal", allocator);
579 CHECK(json_val.IsObject());
581 bool has_default_value;
583 if (has_default_value) {
584 std::string default_value;
std::list< ColumnAltered > altered_columns
static void invalidateCachesByTable(size_t table_key)
alter_column_shared::TypePairs TypePairs
std::vector< int > ChunkKey
std::list< ColumnDescriptor > renamed_columns
void cleanupDeleteDictionaries(const TableDescriptor *td, const TypePairs &src_dst_cds)
std::list< std::pair< ColumnDescriptor, ColumnDescriptor > > toPairedCds(const std::list< ColumnAltered > &altered_columns)
void deleteChunk(const ChunkKey &key, const MemoryLevel mem_level, const int device_id)
class for a per-database catalog. also includes metadata for the current database and the current use...
void cleanupClearChunk(const ChunkKey &key, const MemoryLevel mem_level)
void set_value(rapidjson::Value &json_val, const ColumnDescriptor &column_desc, rapidjson::Document::AllocatorType &allocator)
CompareResult compare_column_descriptors(const ColumnDescriptor *lhs, const ColumnDescriptor *rhs)
RecoveryParamFilepathInfo getRecoveryFilepathInfo(const int32_t table_id=-1)
Data_Namespace::DataMgr & getDataMgr() const
void delDictionaryTransactional(const ColumnDescriptor &cd)
#define CHUNK_KEY_FRAGMENT_IDX
void get_value_from_object(const rapidjson::Value &object, T &value, const std::string &name)
void recoverAlterTableAlterColumnFromFile(const std::string &filename)
RecoveryInfo deserializeRecoveryInformation(const std::string &filename)
std::list< ColumnDescriptor > updated_dict_cds
void renameColumn(const TableDescriptor *td, const ColumnDescriptor *cd, const std::string &newColumnName)
void resetTableEpochFloor(const int logicalTableId) const
rapidjson::Document read_from_file(const std::string &file_path)
void cleanupDropSourceGeoColumns(const TableDescriptor *td, const TypePairs &src_dst_cds)
static SysCatalog & instance()
This file contains the class specification and related data structures for SysCatalog.
Catalog_Namespace::Catalog & catalog_
std::list< std::pair< ColumnDescriptor, ColumnDescriptor > > src_dst_cds
void checkpoint(const TableDescriptor *td, const TypePairs &src_dst_cds)
const DBMetadata & getCurrentDB() const
std::string serializeRecoveryInformation(const RecoveryInfo &param)
std::list< const DictDescriptor * > getAllDictionariesWithColumnInName(const ColumnDescriptor *cd)
void getChunkMetadataVecForKeyPrefix(ChunkMetadataVector &chunkMetadataVec, const ChunkKey &keyPrefix)
static const std::string kRecoveryDirectoryName
void cleanupClearRemainingChunks(const TableDescriptor *td, const TypePairs &src_dst_cds)
void resolveIncompleteAlterColumnCommands()
const ColumnDescriptor * getMetadataForColumn(int tableId, const std::string &colName) const
int getDatabaseId() const
std::string write_to_string(const rapidjson::Document &document)
std::string recoveryFilepath(const RecoveryParamFilepathInfo &filepath_info)
const DictDescriptor * getMetadataForDict(int dict_ref, bool loadDict=true) const
specifies the content in-memory of a row in the column metadata table
TypePairs getSrcDstCds(int table_id, std::list< std::pair< ColumnDescriptor, ColumnDescriptor >> &pairs_list)
void checkpointWithAutoRollback(const int logical_table_id) const
static void resolveIncompleteAlterColumnCommandsForAllCatalogs()
std::optional< std::string > default_value
void deleteChunksWithPrefix(const ChunkKey &keyPrefix)
static std::filesystem::path getRecoveryPrefix(const std::string &base_path)
std::list< std::filesystem::path > getRecoveryFiles()
void setTableEpoch(const int db_id, const int table_id, const int new_epoch)
void get_value(const rapidjson::Value &json_val, ColumnDescriptor &column_desc)
void add_value_to_object(rapidjson::Value &object, const T &value, const std::string &name, rapidjson::Document::AllocatorType &allocator)
std::list< const ColumnDescriptor * > getAllColumnMetadataForTable(const int tableId, const bool fetchSystemColumns, const bool fetchVirtualColumns, const bool fetchPhysicalColumns) const
Returns a list of pointers to constant ColumnDescriptor structs for all the columns from a particular...
void dropColumnTransactional(const TableDescriptor &td, const ColumnDescriptor &cd)
void readSerializedRecoveryInformation(RecoveryInfo &param, const RecoveryParamFilepathInfo &filepath_info)
static std::map< std::string, AlterTableAlterColumnCommandRecoveryMgr > createRecoveryManagersForCatalogs()
std::size_t hash_value(RexAbstractInput const &rex_ab_input)
std::string filename(char const *path)
void cleanup(const TableDescriptor *td, const TypePairs &src_dst_cds)
std::optional< std::string > getTableName(int32_t table_id) const
void alterColumnTypeTransactional(const ColumnDescriptor &cd)
static std::list< ColumnAltered > fromPairedCds(const std::list< std::pair< ColumnDescriptor, ColumnDescriptor >> &altered_columns)
std::list< ColumnDescriptor > added_columns
const std::string & getCatalogBasePath() const
void rollback(const TableDescriptor *td, const RecoveryInfo &param)
void writeSerializedRecoveryInformation(const RecoveryInfo &param, const RecoveryParamFilepathInfo &filepath_info)
std::vector< TableEpochInfo > getTableEpochs(const int32_t db_id, const int32_t table_id) const
AlterTableAlterColumnCommandRecoveryMgr(Catalog_Namespace::Catalog &catalog)