20 #include <boost/lexical_cast.hpp>
27 #include <type_traits>
// Fill ratio used when trimming a table back under its row limit. As the
// trailing note says, dropping to 97% of the max (rather than exactly the max)
// leaves headroom so the next inserts don't immediately trigger another drop.
41 #define DROP_FRAGMENT_FACTOR \
42 0.97 // drop to 97% of max so we don't keep adding and dropping fragments
52 namespace Fragmenter_Namespace {
// Builds the insert-order fragmenter for one physical table.
//   chunkKeyPrefix       - key prefix shared by all of the table's chunks; stored
//                          as chunkKeyPrefix_.
//   chunkVec             - one Chunk per column; each is copied into columnMap_
//                          keyed by its column id.
//   physicalTableId      - stored as physicalTableId_.
//   maxFragmentRows      - requested rows per fragment; clamped to maxRows.
//   maxChunkSize         - stored as maxChunkSize_ (per-chunk byte budget used by
//                          the insert paths).
//   uses_foreign_storage - stored; consulted by
//                          conditionallyInstantiateFileMgrWithParams().
// NOTE(review): some parameters/initializers (e.g. maxRows, defaultInsertLevel)
// are not visible here; this summary covers only the ones shown.
54 InsertOrderFragmenter::InsertOrderFragmenter(
55 const vector<int> chunkKeyPrefix,
56 vector<Chunk>& chunkVec,
59 const int physicalTableId,
61 const size_t maxFragmentRows,
62 const size_t maxChunkSize,
63 const size_t pageSize,
66 const bool uses_foreign_storage)
67 : chunkKeyPrefix_(chunkKeyPrefix)
70 , physicalTableId_(physicalTableId)
72 , maxFragmentRows_(std::min<size_t>(maxFragmentRows, maxRows))
76 , maxChunkSize_(maxChunkSize)
78 , fragmenterType_(
"insert_order")
79 , defaultInsertLevel_(defaultInsertLevel)
80 , uses_foreign_storage_(uses_foreign_storage)
81 , hasMaterializedRowId_(
false)
82 , mutex_access_inmem_states(new std::mutex) {
// Index the supplied chunks by column id.
86 for (
auto colIt = chunkVec.begin(); colIt != chunkVec.end(); ++colIt) {
87 int columnId = colIt->getColumnDesc()->columnId;
88 columnMap_[columnId] = *colIt;
// A column literally named "rowid" is a materialized row-id column; remember
// its id so the insert paths can fill it with generated ids.
89 if (colIt->getColumnDesc()->columnName ==
"rowid") {
90 hasMaterializedRowId_ =
true;
91 rowIdColId_ = columnId;
94 conditionallyInstantiateFileMgrWithParams();
// Nothing is released explicitly here.
// NOTE(review): the constructor initializes mutex_access_inmem_states with
// `new std::mutex`; confirm its declared type owns the mutex (e.g. a smart
// pointer), otherwise it leaks when the fragmenter is destroyed.
98 InsertOrderFragmenter::~InsertOrderFragmenter() {}
104 key.push_back(column_id);
105 key.push_back(fragment_id);
116 array_chunk.
temp_cd = *array_cd;
128 const size_t num_elements,
131 std::map<int, Chunk_NS::Chunk>& column_map)
132 : device_id_(device_id)
133 , chunk_key_prefix_(chunk_key_prefix)
134 , fragment_info_(fragment_info)
137 , num_elements_(num_elements)
140 , column_map_(column_map)
142 , index_buffer_(nullptr)
143 , disk_level_src_chunk_{src_cd}
144 , mem_level_src_chunk_{src_cd} {
145 key_ =
get_chunk_key(chunk_key_prefix, src_cd->columnId, fragment_info->fragmentId);
157 index_buffer->
unPin();
163 disk_level_src_chunk_.getChunkBuffer(
167 mem_level_src_chunk_.getChunkBuffer(
data_mgr_,
171 disk_level_src_chunk_.getBuffer()->size(),
174 mem_level_src_chunk_.getBuffer()->getEncoder()->getNumElems());
176 auto db_id = catalog_->getDatabaseId();
180 std::tie(src_data_, std::ignore) = source->getSourceData();
181 }
catch (std::exception& except) {
183 throw std::runtime_error(
"Column " + src_cd_->columnName +
": " + except.what());
215 std::unique_ptr<data_conversion::BaseSource>
source;
232 const size_t num_elements,
235 std::map<int, Chunk_NS::Chunk>& column_map,
236 const std::list<const ColumnDescriptor*>& columns)
246 , dst_columns_(columns) {}
249 std::list<Chunk_NS::Chunk>& geo_chunks = param_.geo_chunks;
250 std::list<std::unique_ptr<ChunkMetadata>>& chunk_metadata = param_.geo_chunk_metadata;
252 for (
auto dst_cd : dst_columns_) {
253 geo_chunks.emplace_back(dst_cd,
true);
254 auto& dst_chunk = geo_chunks.back();
256 createChunkScratchBuffer(dst_chunk);
257 dst_chunk.initEncoder();
259 chunk_metadata.push_back(std::make_unique<ChunkMetadata>());
264 for (
auto& dst_chunk : param_.geo_chunks) {
265 freeChunkScratchBuffer(dst_chunk);
270 auto convert_encoder =
273 convert_encoder->encodeAndAppendData(src_data_, num_elements_);
274 convert_encoder->finalize(num_elements_);
275 }
catch (std::exception& except) {
276 throw std::runtime_error(
"Column " + (*dst_columns_.begin())->columnName +
": " +
282 auto metadata_it = param_.geo_chunk_metadata.begin();
283 auto chunk_it = param_.geo_chunks.begin();
284 for (
auto dst_cd : dst_columns_) {
285 auto& chunk = *chunk_it;
286 auto& metadata = *metadata_it;
288 auto encoder = chunk.getBuffer()->getEncoder();
290 encoder->resetChunkStats(metadata->chunkStats);
291 encoder->setNumElems(num_elements_);
294 get_chunk_key(chunk_key_prefix_, dst_cd->columnId, fragment_info_->fragmentId);
296 if (dst_cd->columnType.is_varlen_indeed()) {
297 auto data_key = dst_key;
298 data_key.push_back(1);
299 auto index_key = dst_key;
300 index_key.push_back(2);
302 chunk.getBuffer()->setUpdated();
303 chunk.getIndexBuf()->setUpdated();
305 Chunk fragmenter_chunk{dst_cd,
false};
307 data_mgr_->getGlobalFileMgr()->putBuffer(data_key, chunk.getBuffer()));
308 fragmenter_chunk.setIndexBuffer(
309 data_mgr_->getGlobalFileMgr()->putBuffer(index_key, chunk.getIndexBuf()));
310 column_map_[src_cd_->columnId] = fragmenter_chunk;
313 chunk.getBuffer()->setUpdated();
315 Chunk fragmenter_chunk{dst_cd,
false};
317 data_mgr_->getGlobalFileMgr()->putBuffer(dst_key, chunk.getBuffer()));
318 column_map_[src_cd_->columnId] = fragmenter_chunk;
337 const size_t num_elements,
340 std::map<int, Chunk_NS::Chunk>& column_map)
352 auto db_id = catalog_->getDatabaseId();
353 param_.db_id = db_id;
355 if (dst_cd_->columnType.is_array()) {
357 param_.scalar_temp_chunk = scalar_temp_chunk_.chunk;
360 auto& dst_chunk = param_.dst_chunk;
362 createChunkScratchBuffer(dst_chunk);
364 if (dst_cd_->columnType.is_array()) {
365 createChunkScratchBuffer(param_.scalar_temp_chunk);
368 buffer_ = dst_chunk.getBuffer();
369 index_buffer_ = dst_chunk.getIndexBuf();
373 freeChunkScratchBuffer(param_.dst_chunk);
374 if (dst_cd_->columnType.is_array()) {
375 freeChunkScratchBuffer(param_.scalar_temp_chunk);
380 auto& dst_chunk = param_.dst_chunk;
381 disk_level_src_chunk_.getBuffer()->syncEncoder(dst_chunk.getBuffer());
382 if (disk_level_src_chunk_.getIndexBuf() && dst_chunk.getIndexBuf()) {
383 disk_level_src_chunk_.getIndexBuf()->syncEncoder(dst_chunk.getIndexBuf());
386 dst_chunk.initEncoder();
388 auto convert_encoder =
392 convert_encoder->encodeAndAppendData(src_data_, num_elements_);
393 convert_encoder->finalize(num_elements_);
394 }
catch (std::exception& except) {
395 throw std::runtime_error(
"Column " + src_cd_->columnName +
": " + except.what());
398 auto metadata = convert_encoder->getMetadata(
399 dst_cd_->columnType.is_array() ? param_.scalar_temp_chunk : dst_chunk);
401 buffer_->getEncoder()->resetChunkStats(metadata->chunkStats);
402 buffer_->getEncoder()->setNumElems(num_elements_);
404 buffer_->setUpdated();
406 index_buffer_->setUpdated();
411 if (dst_cd_->columnType.is_varlen_indeed()) {
412 auto data_key = key_;
413 data_key.push_back(1);
414 auto index_key = key_;
415 index_key.push_back(2);
417 Chunk fragmenter_chunk{dst_cd_,
false};
419 data_mgr_->getGlobalFileMgr()->putBuffer(data_key, buffer_));
420 fragmenter_chunk.setIndexBuffer(
421 data_mgr_->getGlobalFileMgr()->putBuffer(index_key, index_buffer_));
422 column_map_[src_cd_->columnId] = fragmenter_chunk;
425 Chunk fragmenter_chunk{dst_cd_,
false};
427 column_map_[src_cd_->columnId] = fragmenter_chunk;
437 const int fragment_id,
438 const int num_devices) {
440 return (table_id + fragment_id) % num_devices;
442 return fragment_id % num_devices;
447 const size_t num_rows_left,
448 const size_t num_rows_inserted,
449 const std::unordered_map<int, size_t>& var_len_col_info,
450 const size_t max_chunk_size,
452 std::map<int, Chunk_NS::Chunk>& column_map,
453 const std::vector<size_t>& valid_row_indices) {
454 size_t num_rows_to_insert = min(rows_left_in_current_fragment, num_rows_left);
455 if (rows_left_in_current_fragment != 0) {
456 for (
const auto& var_len_col_info_it : var_len_col_info) {
457 CHECK_LE(var_len_col_info_it.second, max_chunk_size);
458 size_t bytes_left = max_chunk_size - var_len_col_info_it.second;
459 auto find_it = insert_chunks.
chunks.find(var_len_col_info_it.first);
460 if (find_it == insert_chunks.
chunks.end()) {
463 const auto& chunk = find_it->second;
464 auto column_type = chunk->getColumnDesc()->columnType;
465 const int8_t* index_buffer_ptr =
466 column_type.is_varlen_indeed() ? chunk->getIndexBuf()->getMemoryPtr() :
nullptr;
467 CHECK(column_type.is_varlen());
469 auto col_map_it = column_map.find(var_len_col_info_it.first);
471 std::min(num_rows_to_insert,
472 col_map_it->second.getNumElemsForBytesEncodedDataAtIndices(
473 index_buffer_ptr, valid_row_indices, bytes_left));
476 return num_rows_to_insert;
// Registers per-table file-manager parameters with the global file manager,
// addressed by chunkKeyPrefix_[0] and chunkKeyPrefix_[1] (presumably database
// id and table id). Skipped for tables that use foreign storage. The rest of
// the condition and how fileMgrParams is built are not visible here.
481 void InsertOrderFragmenter::conditionallyInstantiateFileMgrWithParams() {
485 if (!uses_foreign_storage_ &&
488 catalog_->getMetadataForTable(physicalTableId_,
false );
491 dataMgr_->getGlobalFileMgr()->setFileMgrParams(
492 chunkKeyPrefix_[0], chunkKeyPrefix_[1], fileMgrParams);
// Rebuilds the in-memory fragment list from the chunk metadata stored under
// chunkKeyPrefix_, then derives per-column sizing state.
// Chunk keys are read as prefix + [column id at index 2, fragment id at index 3].
// Metadata is sorted by fragment id so all chunks of a fragment are adjacent.
// A new FragmentInfo is opened whenever the fragment id changes. Every chunk
// in a fragment must report the same element count (LOG(FATAL) otherwise).
// Afterwards, columns are registered in varLenColInfo_ (presumably the
// variable-size ones; the condition is not visible). maxFragmentRows_ is then
// capped so the widest fixed-size column fits within maxChunkSize_.
496 void InsertOrderFragmenter::getChunkMetadata() {
497 if (uses_foreign_storage_ ||
501 dataMgr_->getChunkMetadataVecForKeyPrefix(chunk_metadata, chunkKeyPrefix_);
// NOTE(review): the sort comparator below hard-codes index 3 rather than
// using fragment_subkey_index; keep the two in sync.
506 int fragment_subkey_index = 3;
508 chunk_metadata.end(),
509 [&](
const auto& pair1,
const auto& pair2) {
510 return pair1.first[3] < pair2.first[3];
513 for (
auto chunk_itr = chunk_metadata.begin(); chunk_itr != chunk_metadata.end();
515 int cur_column_id = chunk_itr->first[2];
516 int cur_fragment_id = chunk_itr->first[fragment_subkey_index];
// First chunk of a new fragment: open a FragmentInfo for it. Because input is
// sorted by fragment id, maxFragmentId_ ends up as the largest id seen.
518 if (fragmentInfoVec_.empty() ||
519 cur_fragment_id != fragmentInfoVec_.back()->fragmentId) {
520 auto new_fragment_info = std::make_unique<Fragmenter_Namespace::FragmentInfo>();
521 CHECK(new_fragment_info);
522 maxFragmentId_ = cur_fragment_id;
523 new_fragment_info->fragmentId = cur_fragment_id;
524 new_fragment_info->setPhysicalNumTuples(chunk_itr->second->numElements);
525 numTuples_ += new_fragment_info->getPhysicalNumTuples();
// One device id is recorded per entry in dataMgr_->levelSizes_.
526 for (
const auto level_size : dataMgr_->levelSizes_) {
527 new_fragment_info->deviceIds.push_back(
530 new_fragment_info->shadowNumTuples = new_fragment_info->getPhysicalNumTuples();
531 new_fragment_info->physicalTableId = physicalTableId_;
532 new_fragment_info->shard = shard_;
533 fragmentInfoVec_.emplace_back(std::move(new_fragment_info));
// Every column chunk of a fragment must hold the same number of rows.
535 if (chunk_itr->second->numElements !=
536 fragmentInfoVec_.back()->getPhysicalNumTuples()) {
537 LOG(
FATAL) <<
"Inconsistency in num tuples within fragment for table " +
541 fragmentInfoVec_.back()->getPhysicalNumTuples()) +
546 CHECK(fragmentInfoVec_.back().get());
547 fragmentInfoVec_.back().get()->setChunkMetadata(cur_column_id, chunk_itr->second);
551 size_t maxFixedColSize = 0;
// Register (presumably variable-size) columns in varLenColInfo_ with a zero
// byte count, and track the widest fixed-size column.
553 for (
auto colIt = columnMap_.begin(); colIt != columnMap_.end(); ++colIt) {
554 auto size = colIt->second.getColumnDesc()->columnType.get_size();
556 varLenColInfo_.insert(std::make_pair(colIt->first, 0));
561 maxFixedColSize = std::max(maxFixedColSize, static_cast<size_t>(size));
// Cap rows per fragment so the widest fixed-size column fits in one chunk.
// NOTE(review): this divides by maxFixedColSize; confirm that a table with no
// fixed-size columns cannot reach this line with maxFixedColSize == 0.
565 maxFragmentRows_ = std::min(maxFragmentRows_, maxChunkSize_ / maxFixedColSize);
566 setLastFragmentVarLenColumnSizes();
// Entry point for trimming the table so it holds at most max_rows rows;
// delegates to dropFragmentsToSizeNoInsertLock().
// NOTE(review): the line between the signature and the call is not visible;
// given the callee's name it presumably acquires the insert lock.
569 void InsertOrderFragmenter::dropFragmentsToSize(
const size_t max_rows) {
571 dropFragmentsToSizeNoInsertLock(max_rows);
// Trims the table when numTuples_ exceeds max_rows. It repeatedly pops the
// front fragment of fragmentInfoVec_ while numTuples_ > targetRows. The front
// fragment is the oldest, because new fragments are appended at the back.
// It then subtracts the dropped fragments' tuples from numTuples_, deletes
// their chunks via deleteFragments(), and logs the before/after counts.
// Presumably the caller already holds the insert lock (per the function name).
574 void InsertOrderFragmenter::dropFragmentsToSizeNoInsertLock(
const size_t max_rows) {
// No fragments, or every tuple lives in the last fragment: presumably an
// early return (the branch body is not visible here).
579 if (fragmentInfoVec_.empty() ||
580 numTuples_ == fragmentInfoVec_.back()->getPhysicalNumTuples()) {
584 if (numTuples_ > max_rows) {
585 size_t preNumTuples = numTuples_;
586 vector<int> dropFragIds;
// targetRows is defined on a line not visible here (presumably derived from
// max_rows and DROP_FRAGMENT_FACTOR).
588 while (numTuples_ > targetRows) {
589 CHECK_GT(fragmentInfoVec_.size(), size_t(0));
590 size_t numFragTuples = fragmentInfoVec_[0]->getPhysicalNumTuples();
591 dropFragIds.push_back(fragmentInfoVec_[0]->fragmentId);
592 fragmentInfoVec_.pop_front();
593 CHECK_GE(numTuples_, numFragTuples);
594 numTuples_ -= numFragTuples;
596 deleteFragments(dropFragIds);
597 LOG(
INFO) <<
"dropFragmentsToSize, numTuples pre: " << preNumTuples
598 <<
" post: " << numTuples_ <<
" maxRows: " << max_rows;
// Deletes every column chunk of each fragment id in dropFragIds from the data
// manager, with one deleteChunksWithPrefix() call per (fragment, column) pair.
// A local copy of the prefix with the logical table id substituted at index 1
// is built first. It is presumably used for the lock taken below; its use is
// not visible. The deletion keys themselves are built from chunkKeyPrefix_.
602 void InsertOrderFragmenter::deleteFragments(
const vector<int>& dropFragIds) {
612 auto chunkKeyPrefix = chunkKeyPrefix_;
614 chunkKeyPrefix[1] = catalog_->getLogicalTableId(chunkKeyPrefix[1]);
619 const auto delete_lock =
// Chunk key = table prefix + column id + fragment id.
624 for (
const auto fragId : dropFragIds) {
625 for (
const auto& col : columnMap_) {
626 int colId = col.first;
627 vector<int> fragPrefix = chunkKeyPrefix_;
628 fragPrefix.push_back(colId);
629 fragPrefix.push_back(fragId);
630 dataMgr_->deleteChunksWithPrefix(fragPrefix);
// Replaces the stored chunk metadata for column cd in fragment fragment_id.
// metadata must be non-null (CHECK). The fragment must exist, because
// getFragmentInfo() CHECKs for it.
635 void InsertOrderFragmenter::updateColumnChunkMetadata(
637 const int fragment_id,
638 const std::shared_ptr<ChunkMetadata> metadata) {
642 CHECK(metadata.get());
643 auto fragment_info = getFragmentInfo(fragment_id);
644 CHECK(fragment_info);
645 fragment_info->setChunkMetadata(cd->
columnId, metadata);
// Applies externally computed chunk stats for column cd. For every fragment
// that has an entry in stats_map (keyed by fragment id), it:
//   - fetches the column's chunk at memory_level (default: defaultInsertLevel_);
//   - resets the encoder's stats from the entry;
//   - refreshes the fragment's chunk metadata from the encoder, both the
//     physical map and the shadow copy.
// Throws std::runtime_error if the chunk has no encoder. Fragments without an
// entry are reported with a warning.
648 void InsertOrderFragmenter::updateChunkStats(
650 std::unordered_map</*fragment_id*/ int, ChunkStats>& stats_map,
651 std::optional<Data_Namespace::MemoryLevel> memory_level) {
// Warning path for logical tables (its guarding condition is not visible here).
660 LOG(
WARNING) <<
"Skipping chunk stats update for logical table " << physicalTableId_;
664 const auto column_id = cd->
columnId;
665 const auto col_itr = columnMap_.find(column_id);
666 CHECK(col_itr != columnMap_.end());
668 for (
auto const& fragment : fragmentInfoVec_) {
669 auto stats_itr = stats_map.find(fragment->fragmentId);
670 if (stats_itr != stats_map.end()) {
671 auto chunk_meta_it = fragment->getChunkMetadataMapPhysical().find(column_id);
672 CHECK(chunk_meta_it != fragment->getChunkMetadataMapPhysical().end());
673 ChunkKey chunk_key{catalog_->getCurrentDB().dbId,
676 fragment->fragmentId};
678 &catalog_->getDataMgr(),
680 memory_level.value_or(defaultInsertLevel_),
682 chunk_meta_it->second->numBytes,
683 chunk_meta_it->second->numElements);
684 auto buf = chunk->getBuffer();
686 if (!buf->hasEncoder()) {
687 throw std::runtime_error(
"No encoder for chunk " +
show_chunk(chunk_key));
689 auto encoder = buf->getEncoder();
691 auto chunk_stats = stats_itr->second;
// Snapshot the pre-reset metadata so old and new stats can be compared/logged.
693 auto old_chunk_metadata = std::make_shared<ChunkMetadata>();
694 encoder->getMetadata(old_chunk_metadata);
695 auto& old_chunk_stats = old_chunk_metadata->chunkStats;
697 const bool didResetStats = encoder->resetChunkStats(chunk_stats);
// The encoder may decline the reset; that case is logged at VLOG(3).
702 if (!didResetStats) {
703 VLOG(3) <<
"Skipping chunk stats reset for " <<
show_chunk(chunk_key);
708 VLOG(3) <<
"Nulls: " << (chunk_stats.has_nulls ?
"True" :
"False");
717 VLOG(2) <<
"Nulls: " << (chunk_stats.has_nulls ?
"True" :
"False");
// Re-read metadata from the encoder and publish it to both the physical map
// and the shadow copy so they stay in sync.
720 auto new_metadata = std::make_shared<ChunkMetadata>();
723 encoder->getMetadata(new_metadata);
725 fragment->setChunkMetadata(column_id, new_metadata);
726 fragment->shadowChunkMetadataMap =
727 fragment->getChunkMetadataMapPhysicalCopy();
732 LOG(
WARNING) <<
"No chunk stats update found for fragment " << fragment->fragmentId
733 <<
", table " << physicalTableId_
734 <<
", column " << column_id;
// Returns the fragment whose fragmentId equals fragment_id, found by a linear
// scan of fragmentInfoVec_. The fragment must exist (CHECK). The returned
// pointer is non-owning: the object stays owned by its smart pointer in
// fragmentInfoVec_.
739 FragmentInfo* InsertOrderFragmenter::getFragmentInfo(
const int fragment_id)
const {
740 auto fragment_it = std::find_if(fragmentInfoVec_.begin(),
741 fragmentInfoVec_.end(),
742 [fragment_id](
const auto& fragment) ->
bool {
743 return fragment->fragmentId == fragment_id;
745 CHECK(fragment_it != fragmentInfoVec_.end());
746 return fragment_it->get();
// Reports whether insert_data targets columns that are not yet in columnMap_.
// insertData() then routes the call to addColumns(). Mixing existing and new
// columns is rejected by CHECK: either every column id is new or every one
// already exists.
// NOTE(review): with an empty insert_data.columnIds both flags stay true, the
// XOR below is false, and the CHECK fires.
749 bool InsertOrderFragmenter::isAddingNewColumns(
const InsertData& insert_data)
const {
750 bool all_columns_already_exist =
true, all_columns_are_new =
true;
751 for (
const auto column_id : insert_data.
columnIds) {
752 if (columnMap_.find(column_id) == columnMap_.end()) {
753 all_columns_already_exist =
false;
755 all_columns_are_new =
false;
759 bool either_all_exist_or_all_new = all_columns_already_exist ^ all_columns_are_new;
760 CHECK(either_all_exist_or_all_new);
761 return all_columns_are_new;
// Inserts pre-encoded chunks via insertChunksImpl(). When defaultInsertLevel_
// equals the level compared on the next line (presumably disk), it then
// checkpoints the table through dataMgr_.
// The epoch read/restore below presumably runs on an error path; the
// surrounding lines are not visible. It re-applies the catalog's table epochs
// for insert_chunk.table_id.
764 void InsertOrderFragmenter::insertChunks(
const InsertChunks& insert_chunk) {
768 insertChunksImpl(insert_chunk);
769 if (defaultInsertLevel_ ==
771 dataMgr_->checkpoint(
776 auto db_id = insert_chunk.
db_id;
777 auto table_epochs = catalog_->getTableEpochs(db_id, insert_chunk.
table_id);
781 catalog_->setTableEpochs(db_id, table_epochs);
// Inserts row data. It routes to addColumns() when every column in
// insert_data_struct is new, and otherwise to insertDataImpl(). It then
// checkpoints when defaultInsertLevel_ matches the compared level. The
// table-epoch read/restore presumably runs on an error path (guarding lines
// not visible).
786 void InsertOrderFragmenter::insertData(
InsertData& insert_data_struct) {
790 if (!isAddingNewColumns(insert_data_struct)) {
791 insertDataImpl(insert_data_struct);
793 addColumns(insert_data_struct);
795 if (defaultInsertLevel_ ==
797 dataMgr_->checkpoint(
802 auto table_epochs = catalog_->getTableEpochs(insert_data_struct.
databaseId,
807 catalog_->setTableEpochs(insert_data_struct.
databaseId, table_epochs);
// Inserts pre-encoded chunks via insertChunksImpl() without the checkpoint
// step that insertChunks() performs.
812 void InsertOrderFragmenter::insertChunksNoCheckpoint(
const InsertChunks& insert_chunk) {
817 insertChunksImpl(insert_chunk);
// Same dispatch as insertData(): insertDataImpl() for existing columns,
// addColumns() when all columns are new. Unlike insertData(), it does not
// checkpoint.
820 void InsertOrderFragmenter::insertDataNoCheckpoint(
InsertData& insert_data_struct) {
825 if (!isAddingNewColumns(insert_data_struct)) {
826 insertDataImpl(insert_data_struct);
828 addColumns(insert_data_struct);
// Adds brand-new columns to an existing table by back-filling every existing
// fragment with the supplied values. For each fragment:
//   - its chunk metadata is snapshotted into shadowChunkMetadataMap;
//   - for each new column, any chunk already on the device for the key is
//     deleted and a fresh chunk buffer is created;
//   - the fragment's row count must fit within maxChunkSize_; otherwise
//     std::runtime_error ("... wider than existing columns is not supported")
//     is thrown;
//   - the data is appended and its metadata is recorded in the shadow map.
// The fragments' physical tuple counts must add up to insertDataStruct.numRows
// (CHECK). If a std::exception is caught, the new columns are erased from
// columnMap_. The final loop copies each fragment's shadow map back into its
// metadata.
// NOTE(review): how the new columns enter columnMap_, how dataCopy is filled,
// and whether the handler rethrows all happen on lines not visible here.
832 void InsertOrderFragmenter::addColumns(
const InsertData& insertDataStruct) {
835 size_t numRowsLeft = insertDataStruct.
numRows;
836 for (
const auto columnId : insertDataStruct.
columnIds) {
837 CHECK(columnMap_.end() == columnMap_.find(columnId));
838 const auto columnDesc = catalog_->getMetadataForColumn(physicalTableId_, columnId);
843 for (
auto const& fragmentInfo : fragmentInfoVec_) {
844 fragmentInfo->shadowChunkMetadataMap =
845 fragmentInfo->getChunkMetadataMapPhysicalCopy();
846 auto numRowsToInsert = fragmentInfo->getPhysicalNumTuples();
847 size_t numRowsCanBeInserted;
848 for (
size_t i = 0; i < insertDataStruct.
columnIds.size(); i++) {
849 auto columnId = insertDataStruct.
columnIds[i];
850 auto colDesc = catalog_->getMetadataForColumn(physicalTableId_, columnId);
852 CHECK(columnMap_.find(columnId) != columnMap_.end());
854 ChunkKey chunkKey = chunkKeyPrefix_;
855 chunkKey.push_back(columnId);
856 chunkKey.push_back(fragmentInfo->fragmentId);
858 auto colMapIt = columnMap_.find(columnId);
859 auto& chunk = colMapIt->second;
860 if (chunk.isChunkOnDevice(
864 fragmentInfo->deviceIds[static_cast<int>(defaultInsertLevel_)])) {
865 dataMgr_->deleteChunksWithPrefix(chunkKey);
867 chunk.createChunkBuffer(
871 fragmentInfo->deviceIds[static_cast<int>(defaultInsertLevel_)]);
876 auto size = colDesc->columnType.get_size();
// Presumably the variable-size branch (condition not visible): reset the
// column's running byte count and ask how many rows fit in maxChunkSize_.
878 std::unique_lock<std::mutex> lck(*mutex_access_inmem_states);
879 varLenColInfo_[columnId] = 0;
880 numRowsCanBeInserted = chunk.getNumElemsForBytesInsertData(
881 dataCopy, numRowsToInsert, 0, maxChunkSize_,
true);
883 numRowsCanBeInserted = maxChunkSize_ / size;
// The new column must hold all of the fragment's existing rows in one chunk.
887 if (numRowsCanBeInserted < numRowsToInsert) {
888 throw std::runtime_error(
"new column '" + colDesc->columnName +
889 "' wider than existing columns is not supported");
892 auto chunkMetadata = chunk.appendData(dataCopy, numRowsToInsert, 0,
true);
893 fragmentInfo->shadowChunkMetadataMap[columnId] = chunkMetadata;
897 std::unique_lock<std::mutex> lck(*mutex_access_inmem_states);
898 varLenColInfo_[columnId] = chunk.getBuffer()->size();
901 dataMgr_->deleteChunksWithPrefix(chunkKey);
905 numRowsLeft -= numRowsToInsert;
907 CHECK(0 == numRowsLeft);
908 }
catch (
const std::exception& e) {
909 for (
const auto columnId : insertDataStruct.
columnIds) {
910 columnMap_.erase(columnId);
// Publish the staged (shadow) metadata to every fragment.
915 for (
auto const& fragmentInfo : fragmentInfoVec_) {
916 fragmentInfo->setChunkMetadataMap(fragmentInfo->shadowChunkMetadataMap);
// Removes the given columns from the table. For each column it:
//   - erases it from columnMap_ when present;
//   - deletes its chunks from the data manager (key prefix + column id);
//   - drops its entry from each fragment's chunk metadata.
// Metadata edits are staged in shadowChunkMetadataMap and then applied to
// every fragment.
920 void InsertOrderFragmenter::dropColumns(
const std::vector<int>& columnIds) {
// Stage: start each fragment's shadow map from its current metadata.
925 for (
auto const& fragmentInfo : fragmentInfoVec_) {
926 fragmentInfo->shadowChunkMetadataMap =
927 fragmentInfo->getChunkMetadataMapPhysicalCopy();
930 for (
const auto columnId : columnIds) {
931 auto cit = columnMap_.find(columnId);
932 if (columnMap_.end() != cit) {
933 columnMap_.erase(cit);
// A prefix without a fragment id presumably matches this column's chunks in
// every fragment.
936 vector<int> fragPrefix = chunkKeyPrefix_;
937 fragPrefix.push_back(columnId);
938 dataMgr_->deleteChunksWithPrefix(fragPrefix);
940 for (
const auto& fragmentInfo : fragmentInfoVec_) {
941 auto cmdit = fragmentInfo->shadowChunkMetadataMap.find(columnId);
942 if (fragmentInfo->shadowChunkMetadataMap.end() != cmdit) {
943 fragmentInfo->shadowChunkMetadataMap.erase(cmdit);
// Commit the staged metadata.
947 for (
const auto& fragmentInfo : fragmentInfoVec_) {
948 fragmentInfo->setChunkMetadataMap(fragmentInfo->shadowChunkMetadataMap);
// Checks each fragment's chunk metadata for column delete_column_id. That
// column must have metadata in every fragment (CHECK). The function looks for
// a fragment whose max stat is 1, i.e. at least one row flagged as deleted.
// NOTE(review): the return statements are not visible here; presumably it
// returns true at the first such fragment and false otherwise.
952 bool InsertOrderFragmenter::hasDeletedRows(
const int delete_column_id) {
955 for (
auto const& fragment : fragmentInfoVec_) {
956 auto chunk_meta_it = fragment->getChunkMetadataMapPhysical().find(delete_column_id);
957 CHECK(chunk_meta_it != fragment->getChunkMetadataMapPhysical().end());
958 const auto& chunk_stats = chunk_meta_it->second->chunkStats;
959 if (chunk_stats.max.tinyintval == 1) {
// Writes one batch of rows into the column chunks held in columnMap_. The
// batch is the first num_rows_to_insert entries of valid_row_indices, taken
// from every chunk in insert_chunks. It also:
//   - fills the materialized rowid column (if any) with consecutive ids;
//   - fills the delete column (if delete_column_id is set) with zeros;
//   - advances num_rows_left and num_rows_inserted;
//   - publishes shadow tuple counts and shadow metadata for fragments from
//     start_fragment onward;
//   - removes the consumed indices from the front of valid_row_indices.
966 void InsertOrderFragmenter::insertChunksIntoFragment(
968 const std::optional<int> delete_column_id,
970 const size_t num_rows_to_insert,
971 size_t& num_rows_inserted,
972 size_t& num_rows_left,
973 std::vector<size_t>& valid_row_indices,
974 const size_t start_fragment) {
// Work on a copy truncated to exactly the rows going into this batch.
977 auto insert_row_indices = valid_row_indices;
978 CHECK_GE(insert_row_indices.size(), num_rows_to_insert);
979 insert_row_indices.erase(insert_row_indices.begin() + num_rows_to_insert,
980 insert_row_indices.end());
981 CHECK_EQ(insert_row_indices.size(), num_rows_to_insert);
982 for (
auto& [column_id, chunk] : insert_chunks.
chunks) {
983 auto col_map_it = columnMap_.find(column_id);
984 CHECK(col_map_it != columnMap_.end());
986 col_map_it->second.appendEncodedDataAtIndices(*chunk, insert_row_indices);
// Track the new byte size of variable-size columns; it must stay within
// maxChunkSize_.
987 auto var_len_col_info_it = varLenColInfo_.find(column_id);
988 if (var_len_col_info_it != varLenColInfo_.end()) {
989 var_len_col_info_it->second = col_map_it->second.getBuffer()->size();
990 CHECK_LE(var_len_col_info_it->second, maxChunkSize_);
// Materialized row ids are consecutive, starting at
// maxFragmentRows_ * fragmentId + (rest of the expression not visible).
993 if (hasMaterializedRowId_) {
994 size_t start_id = maxFragmentRows_ * current_fragment->
fragmentId +
996 std::vector<int64_t> row_id_data(num_rows_to_insert);
997 for (
size_t i = 0; i < num_rows_to_insert; ++i) {
998 row_id_data[i] = i + start_id;
1001 row_id_block.
numbersPtr =
reinterpret_cast<int8_t*
>(row_id_data.data());
1002 auto col_map_it = columnMap_.find(rowIdColId_);
1003 CHECK(col_map_it != columnMap_.end());
1005 row_id_block, num_rows_to_insert, num_rows_inserted);
// New rows get a 0 delete flag. This is consistent with hasDeletedRows()
// treating a max of 1 as "has deleted rows".
1008 if (delete_column_id) {
1009 std::vector<int8_t> delete_data(num_rows_to_insert,
false);
1011 delete_block.
numbersPtr =
reinterpret_cast<int8_t*
>(delete_data.data());
1012 auto col_map_it = columnMap_.find(*delete_column_id);
1013 CHECK(col_map_it != columnMap_.end());
1015 col_map_it->second.appendData(
1016 delete_block, num_rows_to_insert, num_rows_inserted);
1020 fragmentInfoVec_.back()->getPhysicalNumTuples() + num_rows_to_insert;
// Account for the rows just written.
1021 num_rows_left -= num_rows_to_insert;
1022 num_rows_inserted += num_rows_to_insert;
// Publish staged (shadow) state as the fragments' physical state.
1023 for (
auto part_it = fragmentInfoVec_.begin() + start_fragment;
1024 part_it != fragmentInfoVec_.end();
1026 auto fragment_ptr = part_it->get();
1027 fragment_ptr->setPhysicalNumTuples(fragment_ptr->shadowNumTuples);
1028 fragment_ptr->setChunkMetadataMap(fragment_ptr->shadowChunkMetadataMap);
// Drop the indices consumed by this batch.
1032 valid_row_indices.erase(valid_row_indices.begin(),
1033 valid_row_indices.begin() + num_rows_to_insert);
// Inserts pre-encoded chunks into the table. All chunks must have encoders and
// the same element count (CHECK). Rows fill the last fragment (or a new one if
// none exists). A new fragment is opened when the current one has no room
// (rows_left_in_current_fragment == 0) or the computed batch size is 0. Each
// batch is written by insertChunksIntoFragment(). Afterwards numTuples_ is
// increased and the table is trimmed to maxRows_.
// NOTE(review): numTuples_ grows by *num_rows (elements per chunk), but the
// loop inserts valid_row_indices.size() rows; if those can differ, the tuple
// count drifts. Also, *num_rows is unset when insert_chunks.chunks is empty;
// confirm an earlier return covers that case.
1036 void InsertOrderFragmenter::insertChunksImpl(
const InsertChunks& insert_chunks) {
// Locate the table's deleted-flag column, if it has one.
1037 std::optional<int> delete_column_id{std::nullopt};
1038 for (
const auto& cit : columnMap_) {
1039 if (cit.second.getColumnDesc()->isDeletedCol) {
1040 delete_column_id = cit.second.getColumnDesc()->columnId;
// Every incoming chunk must report the same number of elements.
1046 std::optional<size_t> num_rows{std::nullopt};
1047 for (
const auto& [column_id, chunk] : insert_chunks.
chunks) {
1048 auto buffer = chunk->getBuffer();
1050 CHECK(buffer->hasEncoder());
1051 if (!num_rows.has_value()) {
1052 num_rows = buffer->getEncoder()->getNumElems();
1054 CHECK_EQ(num_rows.value(), buffer->getEncoder()->getNumElems());
1059 size_t num_rows_left = valid_row_indices.size();
1060 size_t num_rows_inserted = 0;
1062 if (num_rows_left == 0) {
1070 if (fragmentInfoVec_.empty()) {
1071 current_fragment = createNewFragment(defaultInsertLevel_);
1073 current_fragment = fragmentInfoVec_.back().get();
1075 CHECK(current_fragment);
// Fragments from here on may receive rows and have their shadow state published.
1077 size_t start_fragment = fragmentInfoVec_.size() - 1;
1079 while (num_rows_left > 0) {
1081 CHECK_LE(current_fragment->shadowNumTuples, maxFragmentRows_);
1082 size_t rows_left_in_current_fragment =
1083 maxFragmentRows_ - current_fragment->shadowNumTuples;
// Current fragment is full or cannot take another row: open a new one and
// reset the per-column var-len byte counts.
1093 if (rows_left_in_current_fragment == 0 || num_rows_to_insert == 0) {
1094 current_fragment = createNewFragment(defaultInsertLevel_);
1095 if (num_rows_inserted == 0) {
1098 rows_left_in_current_fragment = maxFragmentRows_;
1099 for (
auto& varLenColInfoIt : varLenColInfo_) {
1100 varLenColInfoIt.second = 0;
1112 CHECK_GT(num_rows_to_insert,
size_t(0));
1115 insertChunksIntoFragment(insert_chunks,
1124 numTuples_ += *num_rows;
1125 dropFragmentsToSizeNoInsertLock(maxRows_);
// Appends insert_data's rows to the table in insertion order.
//   - If the table has a deleted-flag column, a zero-filled block for it is
//     appended to insert_data, so insert_data is modified in place.
//   - Rows go into the last fragment until either limit is hit: the fragment
//     reaches maxFragmentRows_, or a variable-size column's remaining byte
//     budget runs out. That budget is maxChunkSize_ minus the current size
//     tracked in varLenColInfo_.
//   - A new fragment is then created and the budgets are reset.
//   - Each batch's metadata is staged in shadowChunkMetadataMap /
//     shadowNumTuples and published for fragments from startFragment onward.
//   - Finally numTuples_ grows by insert_data.numRows and the table is trimmed
//     to maxRows_.
// Since numRowsLeft is size_t, the `numRowsLeft <= 0` check means "== 0".
// NOTE(review): the deleted-column DataBlockPtr points into
// data_for_deleted_column, which is freed when this function returns, but the
// block stays in insert_data.data. A caller that reuses insert_data afterwards
// would read freed memory.
// NOTE(review): the two var-len capacity loops below are identical and could
// share a helper.
1128 void InsertOrderFragmenter::insertDataImpl(
InsertData& insert_data) {
1130 std::unique_ptr<int8_t[]> data_for_deleted_column;
1131 for (
const auto& cit : columnMap_) {
1132 if (cit.second.getColumnDesc()->isDeletedCol) {
1133 data_for_deleted_column.reset(
new int8_t[insert_data.
numRows]);
1134 memset(data_for_deleted_column.get(), 0, insert_data.
numRows);
1135 insert_data.
data.emplace_back(
DataBlockPtr{data_for_deleted_column.get()});
1136 insert_data.
columnIds.push_back(cit.second.getColumnDesc()->columnId);
// Map each column id to its position in insert_data.columnIds / data.
1142 std::unordered_map<int, int> inverseInsertDataColIdMap;
1143 for (
size_t insertId = 0; insertId < insert_data.
columnIds.size(); ++insertId) {
1144 inverseInsertDataColIdMap.insert(
1145 std::make_pair(insert_data.
columnIds[insertId], insertId));
1148 size_t numRowsLeft = insert_data.
numRows;
1149 size_t numRowsInserted = 0;
1150 vector<DataBlockPtr> dataCopy =
1153 if (numRowsLeft <= 0) {
1161 if (fragmentInfoVec_.empty()) {
1162 currentFragment = createNewFragment(defaultInsertLevel_);
1164 currentFragment = fragmentInfoVec_.back().get();
1166 CHECK(currentFragment);
// Fragments from here on may receive rows and have their shadow state published.
1168 size_t startFragment = fragmentInfoVec_.size() - 1;
1170 while (numRowsLeft > 0) {
1172 CHECK_LE(currentFragment->shadowNumTuples, maxFragmentRows_);
1173 size_t rowsLeftInCurrentFragment =
1174 maxFragmentRows_ - currentFragment->shadowNumTuples;
1175 size_t numRowsToInsert = min(rowsLeftInCurrentFragment, numRowsLeft);
// Shrink the batch so every var-len column fits its remaining byte budget.
1176 if (rowsLeftInCurrentFragment != 0) {
1177 for (
auto& varLenColInfoIt : varLenColInfo_) {
1178 CHECK_LE(varLenColInfoIt.second, maxChunkSize_);
1179 size_t bytesLeft = maxChunkSize_ - varLenColInfoIt.second;
1180 auto insertIdIt = inverseInsertDataColIdMap.find(varLenColInfoIt.first);
1181 if (insertIdIt != inverseInsertDataColIdMap.end()) {
1182 auto colMapIt = columnMap_.find(varLenColInfoIt.first);
1183 numRowsToInsert = std::min(numRowsToInsert,
1184 colMapIt->second.getNumElemsForBytesInsertData(
1185 dataCopy[insertIdIt->second],
1189 insert_data.
is_default[insertIdIt->second]));
// Current fragment is full or cannot take another row: open a new one, reset
// the var-len byte counts, and recompute the batch size against a fresh
// fragment.
1194 if (rowsLeftInCurrentFragment == 0 || numRowsToInsert == 0) {
1195 currentFragment = createNewFragment(defaultInsertLevel_);
1196 if (numRowsInserted == 0) {
1199 rowsLeftInCurrentFragment = maxFragmentRows_;
1200 for (
auto& varLenColInfoIt : varLenColInfo_) {
1201 varLenColInfoIt.second = 0;
1203 numRowsToInsert = min(rowsLeftInCurrentFragment, numRowsLeft);
1204 for (
auto& varLenColInfoIt : varLenColInfo_) {
1205 CHECK_LE(varLenColInfoIt.second, maxChunkSize_);
1206 size_t bytesLeft = maxChunkSize_ - varLenColInfoIt.second;
1207 auto insertIdIt = inverseInsertDataColIdMap.find(varLenColInfoIt.first);
1208 if (insertIdIt != inverseInsertDataColIdMap.end()) {
1209 auto colMapIt = columnMap_.find(varLenColInfoIt.first);
1210 numRowsToInsert = std::min(numRowsToInsert,
1211 colMapIt->second.getNumElemsForBytesInsertData(
1212 dataCopy[insertIdIt->second],
1216 insert_data.
is_default[insertIdIt->second]));
1221 CHECK_GT(numRowsToInsert,
size_t(0));
// Append this batch for every column, staging metadata in the shadow map.
1227 for (
size_t i = 0; i < insert_data.
columnIds.size(); ++i) {
1228 int columnId = insert_data.
columnIds[i];
1229 auto colMapIt = columnMap_.find(columnId);
1230 CHECK(colMapIt != columnMap_.end());
1231 currentFragment->shadowChunkMetadataMap[columnId] = colMapIt->second.appendData(
1232 dataCopy[i], numRowsToInsert, numRowsInserted, insert_data.
is_default[i]);
1233 auto varLenColInfoIt = varLenColInfo_.find(columnId);
1234 if (varLenColInfoIt != varLenColInfo_.end()) {
1235 varLenColInfoIt->second = colMapIt->second.getBuffer()->size();
// Materialized row ids: consecutive, starting at
// maxFragmentRows_ * fragmentId + rows already staged in this fragment.
1238 if (hasMaterializedRowId_) {
1239 size_t startId = maxFragmentRows_ * currentFragment->fragmentId +
1240 currentFragment->shadowNumTuples;
1241 auto row_id_data = std::make_unique<int64_t[]>(numRowsToInsert);
1242 for (
size_t i = 0; i < numRowsToInsert; ++i) {
1243 row_id_data[i] = i + startId;
1246 rowIdBlock.
numbersPtr =
reinterpret_cast<int8_t*
>(row_id_data.get());
1247 auto colMapIt = columnMap_.find(rowIdColId_);
1248 currentFragment->shadowChunkMetadataMap[rowIdColId_] =
1249 colMapIt->second.appendData(rowIdBlock, numRowsToInsert, numRowsInserted);
1252 currentFragment->shadowNumTuples =
1253 fragmentInfoVec_.back()->getPhysicalNumTuples() + numRowsToInsert;
1254 numRowsLeft -= numRowsToInsert;
1255 numRowsInserted += numRowsToInsert;
// Publish staged (shadow) state as the fragments' physical state.
1256 for (
auto partIt = fragmentInfoVec_.begin() + startFragment;
1257 partIt != fragmentInfoVec_.end();
1259 auto fragment_ptr = partIt->get();
1260 fragment_ptr->setPhysicalNumTuples(fragment_ptr->shadowNumTuples);
1261 fragment_ptr->setChunkMetadataMap(fragment_ptr->shadowChunkMetadataMap);
1265 numTuples_ += insert_data.
numRows;
1266 dropFragmentsToSizeNoInsertLock(maxRows_);
1274 auto newFragmentInfo = std::make_unique<FragmentInfo>();
1275 newFragmentInfo->
fragmentId = maxFragmentId_;
1276 newFragmentInfo->shadowNumTuples = 0;
1277 newFragmentInfo->setPhysicalNumTuples(0);
1278 for (
const auto levelSize : dataMgr_->levelSizes_) {
1280 physicalTableId_, newFragmentInfo->fragmentId, levelSize));
1282 newFragmentInfo->physicalTableId = physicalTableId_;
1283 newFragmentInfo->shard = shard_;
1285 for (map<int, Chunk>::iterator colMapIt = columnMap_.begin();
1286 colMapIt != columnMap_.end();
1288 auto& chunk = colMapIt->second;
1295 tracked_in_memory_chunks_.emplace_back(std::make_unique<Chunk_NS::Chunk>(chunk));
1297 ChunkKey chunkKey = chunkKeyPrefix_;
1298 chunkKey.push_back(chunk.getColumnDesc()->columnId);
1299 chunkKey.push_back(maxFragmentId_);
1300 chunk.createChunkBuffer(dataMgr_,
1303 newFragmentInfo->deviceIds[static_cast<int>(memoryLevel)],
1305 chunk.initEncoder();
1309 fragmentInfoVec_.push_back(std::move(newFragmentInfo));
1310 return fragmentInfoVec_.back().get();
// Returns the number of fragments currently tracked in fragmentInfoVec_.
1313 size_t InsertOrderFragmenter::getNumFragments() {
1315 return fragmentInfoVec_.size();
1323 bool fragmentsExist =
false;
1324 if (fragmentInfoVec_.empty()) {
1327 int maxFragmentId = 0;
1329 emptyFragmentInfo.
fragmentId = maxFragmentId;
1332 emptyFragmentInfo.
deviceIds.resize(dataMgr_->levelSizes_.size());
1334 emptyFragmentInfo.
shard = shard_;
1335 queryInfo.
fragments.push_back(emptyFragmentInfo);
1337 fragmentsExist =
true;
1339 fragmentInfoVec_.begin(),
1340 fragmentInfoVec_.end(),
1341 [&queryInfo](
const auto& fragment_owned_ptr) {
1342 queryInfo.
fragments.emplace_back(*fragment_owned_ptr);
1347 auto partIt = queryInfo.
fragments.begin();
1348 if (fragmentsExist) {
1349 while (partIt != queryInfo.
fragments.end()) {
1350 if (partIt->getPhysicalNumTuples() == 0) {
1355 partIt = queryInfo.
fragments.erase(partIt);
1358 partIt->getPhysicalNumTuples());
// Recomputes numTuples_ by summing every fragment's physical tuple count, then
// refreshes the last fragment's var-len column sizes.
// NOTE(review): this assumes numTuples_ is reset to 0 on the lines not visible
// between the signature and the loop. Otherwise the sum accumulates onto the
// old value.
1369 void InsertOrderFragmenter::resetSizesFromFragments() {
1372 for (
const auto& fragment_info : fragmentInfoVec_) {
1373 numTuples_ += fragment_info->getPhysicalNumTuples();
1375 setLastFragmentVarLenColumnSizes();
// Rewrites columns that were converted to a geo type. Each pair maps a source
// column to its new destination columns; the first destination must be a
// geometry column. For each pair and each fragment it:
//   - reads the source chunk;
//   - encodes it into scratch buffers for the destination columns, with
//     geometry validation disabled;
//   - writes the results to disk.
// Scratch buffers are released through a ScopeGuard, presumably also when a
// step throws. Regular (non-foreign-storage) tables only (CHECK).
// NOTE(review): chunkMetadataMap[src_cd->columnId] uses operator[]. If the map
// is std::map-like, a fragment without metadata for the source column would
// get a null entry inserted and then dereferenced; find() + CHECK would fail
// more clearly.
1378 void InsertOrderFragmenter::alterColumnGeoType(
1381 src_dst_column_pairs) {
1383 !uses_foreign_storage_)
1384 <<
"`alterColumnTypeTransactional` only supported for regular tables";
1387 for (
const auto& [src_cd, dst_columns] : src_dst_column_pairs) {
1388 auto logical_geo_column = *dst_columns.begin();
1389 CHECK(logical_geo_column->columnType.is_geometry());
1395 for (
const auto& fragment_info : fragmentInfoVec_) {
1396 int device_id = fragment_info->deviceIds[
static_cast<int>(defaultInsertLevel_)];
1397 auto num_elements = fragment_info->chunkMetadataMap[src_cd->columnId]->numElements;
1401 std::list<const ColumnDescriptor*> columns = dst_columns;
1402 GeoAlterColumnContext alter_column_context{device_id,
1404 fragment_info.get(),
1406 *dst_columns.begin(),
1413 alter_column_context.readSourceData();
1415 alter_column_context.createScratchBuffers();
1417 ScopeGuard delete_temp_chunk = [&] { alter_column_context.deleteScratchBuffers(); };
1419 const bool geo_validate_geometry =
false;
1420 alter_column_context.encodeData(geo_validate_geometry);
1422 alter_column_context.putBuffersToDisk();
// Re-encodes columns whose (non-geo) type changed, in place under the same
// column id. The source descriptor is the one currently stored in columnMap_
// for dst_cd's id. For each column and each fragment it reads the source
// chunk, re-encodes it into scratch buffers, and writes the result to disk.
// Scratch buffers are released through a ScopeGuard. Regular
// (non-foreign-storage) tables only (CHECK).
// NOTE(review): like the geo path, chunkMetadataMap[src_cd->columnId] uses
// operator[]; prefer find() + CHECK if the key can be missing.
1427 void InsertOrderFragmenter::alterNonGeoColumnType(
1428 const std::list<const ColumnDescriptor*>& columns) {
1430 !uses_foreign_storage_)
1431 <<
"`alterColumnTypeTransactional` only supported for regular tables";
1435 for (
const auto dst_cd : columns) {
1436 auto col_it = columnMap_.find(dst_cd->columnId);
1437 CHECK(col_it != columnMap_.end());
1439 auto src_cd = col_it->second.getColumnDesc();
1440 CHECK_EQ(col_it->first, src_cd->columnId);
1447 for (
const auto& fragment_info : fragmentInfoVec_) {
1448 int device_id = fragment_info->deviceIds[
static_cast<int>(defaultInsertLevel_)];
1449 auto num_elements = fragment_info->chunkMetadataMap[src_cd->columnId]->numElements;
1451 NonGeoAlterColumnContext alter_column_context{device_id,
1453 fragment_info.get(),
1461 alter_column_context.readSourceData();
1463 alter_column_context.createScratchBuffers();
1465 ScopeGuard delete_temp_chunk = [&] { alter_column_context.deleteScratchBuffers(); };
1467 alter_column_context.reencodeData();
1469 alter_column_context.putBuffersToDisk();
1474 void InsertOrderFragmenter::setLastFragmentVarLenColumnSizes() {
1475 if (!uses_foreign_storage_ && fragmentInfoVec_.size() > 0) {
1478 int lastFragmentId = fragmentInfoVec_.back()->fragmentId;
1481 fragmentInfoVec_.back()->deviceIds[
static_cast<int>(defaultInsertLevel_)];
1482 for (
auto colIt = columnMap_.begin(); colIt != columnMap_.end(); ++colIt) {
1483 ChunkKey insertKey = chunkKeyPrefix_;
1484 insertKey.push_back(colIt->first);
1485 insertKey.push_back(lastFragmentId);
1486 colIt->second.getChunkBuffer(dataMgr_, insertKey, defaultInsertLevel_, deviceId);
1487 auto varLenColInfoIt = varLenColInfo_.find(colIt->first);
1488 if (varLenColInfoIt != varLenColInfo_.end()) {
1489 varLenColInfoIt->second = colIt->second.getBuffer()->size();
std::lock_guard< T > lock_guard
int32_t maxRollbackEpochs
bool g_use_table_device_offset
AbstractBuffer * getIndexBuf() const
std::vector< int > ChunkKey
ArrayElemTypeChunk scalar_temp_chunk_
void createChunkScratchBuffer(Chunk_NS::Chunk &chunk)
static void unpinChunk(Chunk &chunk)
std::string DatumToString(Datum d, const SQLTypeInfo &ti)
const size_t num_elements_
class for a per-database catalog. also includes metadata for the current database and the current use...
CompareResult compare_column_descriptors(const ColumnDescriptor *lhs, const ColumnDescriptor *rhs)
This file includes the class specification for the FILE manager (FileMgr), and related data structure...
std::vector< int > deviceIds
Catalog_Namespace::Catalog * catalog_
void setIndexBuffer(AbstractBuffer *ib)
Fragmenter_Namespace::FragmentInfo * fragment_info_
const ChunkKey & chunk_key_prefix_
Chunk_NS::Chunk disk_level_src_chunk_
DEVICE void sort(ARGS &&...args)
SQLTypeInfo get_logical_type_info(const SQLTypeInfo &type_info)
std::vector< bool > is_default
std::unique_ptr< BaseConvertEncoder > create_string_view_encoder(ConversionFactoryParam ¶m, const bool error_tracking_enabled, const bool geo_validate_geometry)
std::vector< FragmentInfo > fragments
std::vector< int > chunkKeyPrefix
const ColumnDescriptor * src_cd_
void createScratchBuffers()
const std::list< const ColumnDescriptor * > & dst_columns_
std::string show_chunk(const ChunkKey &key)
void setBuffer(AbstractBuffer *b)
int tableId
identifies the database into which the data is being inserted
std::shared_lock< T > shared_lock
size_t getPhysicalNumTuples() const
size_t numRows
a vector of column ids for the row(s) being inserted
const ColumnDescriptor * getColumnDesc() const
std::unique_ptr< data_conversion::BaseSource > source
const ColumnDescriptor * dst_cd_
AbstractBuffer * index_buffer_
Data_Namespace::DataMgr * data_mgr_
Used by Fragmenter classes to store info about each fragment - the fragment id and number of tuples(r...
std::unique_lock< T > unique_lock
void createScratchBuffers()
data_conversion::ConversionFactoryParam param_
An AbstractBuffer is a unit of data management for a data manager.
specifies the content in-memory of a row in the column metadata table
void deleteScratchBuffers()
void freeChunkScratchBuffer(Chunk_NS::Chunk &chunk)
NonGeoAlterColumnContext(int device_id, const ChunkKey &chunk_key_prefix, Fragmenter_Namespace::FragmentInfo *fragment_info, const ColumnDescriptor *src_cd, const ColumnDescriptor *dst_cd, const size_t num_elements, Data_Namespace::DataMgr *data_mgr, Catalog_Namespace::Catalog *catalog_, std::map< int, Chunk_NS::Chunk > &column_map)
std::map< int, Chunk_NS::Chunk > & column_map_
std::vector< DataBlockPtr > data
the number of rows being inserted
static WriteLock getWriteLockForTable(const Catalog_Namespace::Catalog &cat, const std::string &table_name)
AbstractBuffer * getBuffer() const
ChunkKey get_chunk_key(const ChunkKey &prefix, int column_id, int fragment_id)
std::map< int, std::shared_ptr< Chunk_NS::Chunk > > chunks
Chunk_NS::Chunk mem_level_src_chunk_
std::vector< size_t > valid_row_indices
bool g_enable_watchdog false
void setPhysicalNumTuples(const size_t physNumTuples)
#define DROP_FRAGMENT_FACTOR
void deleteScratchBuffers()
void setPhysicalNumTuples(const size_t physNumTuples)
FileBuffer Chunk
A Chunk is the fundamental unit of execution in Map-D.
The data to be inserted using the fragment manager.
void create_array_elem_type_chunk(ArrayElemTypeChunk &array_chunk, const ColumnDescriptor *array_cd)
bool is_dict_encoded_string() const
bool is_varlen_indeed() const
static std::shared_ptr< Chunk > getChunk(const ColumnDescriptor *cd, DataMgr *data_mgr, const ChunkKey &key, const MemoryLevel mem_level, const int deviceId, const size_t num_bytes, const size_t num_elems, const bool pinnable=true)
std::unique_ptr< BaseSource > create_source(const Chunk_NS::Chunk &input, const int db_id)
void encodeData(const bool geo_validate_geometry)
BaseAlterColumnContext(int device_id, const ChunkKey &chunk_key_prefix, Fragmenter_Namespace::FragmentInfo *fragment_info, const ColumnDescriptor *src_cd, const ColumnDescriptor *dst_cd, const size_t num_elements, Data_Namespace::DataMgr *data_mgr, Catalog_Namespace::Catalog *catalog, std::map< int, Chunk_NS::Chunk > &column_map)
SQLTypeInfo get_elem_type() const
int32_t max_rollback_epochs
std::vector< int > columnIds
identifies the table into which the data is being inserted
ChunkMetadataMap shadowChunkMetadataMap
size_t get_num_rows_to_insert(const size_t rows_left_in_current_fragment, const size_t num_rows_left, const size_t num_rows_inserted, const std::unordered_map< int, size_t > &var_len_col_info, const size_t max_chunk_size, const InsertChunks &insert_chunks, std::map< int, Chunk_NS::Chunk > &column_map, const std::vector< size_t > &valid_row_indices)
GeoAlterColumnContext(int device_id, const ChunkKey &chunk_key_prefix, Fragmenter_Namespace::FragmentInfo *fragment_info, const ColumnDescriptor *src_cd, const ColumnDescriptor *dst_cd, const size_t num_elements, Data_Namespace::DataMgr *data_mgr, Catalog_Namespace::Catalog *catalog, std::map< int, Chunk_NS::Chunk > &column_map, const std::list< const ColumnDescriptor * > &columns)
int compute_device_for_fragment(const int table_id, const int fragment_id, const int num_devices)