#include <boost/variant.hpp>
#include <boost/variant/get.hpp>
// ...
namespace Fragmenter_Namespace {
// ...
void wait_cleanup_threads(std::vector<std::future<void>>& threads) {
  for (auto& t : threads) {
    t.get();
  }
}
// ...
    const int fragment_id,
    const std::vector<uint64_t>& frag_offsets,
// ...
                      std::vector<ScalarTargetValue>(1, rhs_value),
// ...
static int get_chunks(const Catalog_Namespace::Catalog* catalog,
                      const TableDescriptor* td,
                      const FragmentInfo& fragment,
                      const Data_Namespace::MemoryLevel memory_level,
                      std::vector<std::shared_ptr<Chunk_NS::Chunk>>& chunks) {
  for (int cid = 1, nc = 0; nc < td->nColumns; ++cid) {
// ...
    if (!cd->isVirtualCol) {
// ...
                                              chunk_meta_it->second->numBytes,
                                              chunk_meta_it->second->numElements);
      chunks.push_back(chunk);
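// The ChunkToInsertDataConverter family below (scalar, fixed-length array,
// array, string, date) reads values back out of a chunk buffer and repacks
// them into the DataBlockPtr vectors of an InsertData struct. Only fragments
// of the definitions survive in this listing; judging from what is visible,
// each converter implements convertToColumnarFormat(row, indexInFragment) to
// copy one value and addDataBlocksToInsertData() to hand the column over.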
// ...
template <typename BUFFER_DATA_TYPE, typename INSERT_DATA_TYPE>
// ...
  using ColumnDataPtr =
      std::unique_ptr<INSERT_DATA_TYPE, CheckedMallocDeleter<INSERT_DATA_TYPE>>;
// ...
    auto insert_value = static_cast<INSERT_DATA_TYPE>(buffer_value);
// ...
    insertData.data.push_back(dataBlock);
// ...
    column_data_ = std::make_unique<std::vector<ArrayDatum>>(num_rows);
// ...
    insertData.data.push_back(dataBlock);
// ...
    size_t src_value_size = std::abs(endIndex) - std::abs(startIndex);
// ...
    column_data_ = std::make_unique<std::vector<std::string>>(num_rows);
// ...
    size_t src_value_size =
        index_buffer_addr_[indexInFragment + 1] - index_buffer_addr_[indexInFragment];
// ...
    (*column_data_)[row] = std::string((const char*)src_value_ptr, src_value_size);
// ...
    insertData.data.push_back(dataBlock);
// ...
template <typename BUFFER_DATA_TYPE>
// ...
  using ColumnDataPtr = std::unique_ptr<int64_t, CheckedMallocDeleter<int64_t>>;
// ...
    column_data_ = ColumnDataPtr(
        reinterpret_cast<int64_t*>(checked_malloc(num_rows * sizeof(int64_t))));
// ...
    auto insert_value = static_cast<int64_t>(buffer_value);
// ...
    insertData.data.push_back(dataBlock);
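// updateColumns(): rewrite a set of columns within one fragment. The visible
// flow: fetch every chunk of the fragment, build a TargetValueConverter for
// each updated column and a ChunkToInsertDataConverter for each untouched
// column, convert all source rows into columnar InsertData, hand it to the
// fragmenter for insertion (apparently via insertDataNoCheckpoint()), and
// finally mark the original rows as deleted via the hidden delete column.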
// ...
void InsertOrderFragmenter::updateColumns(
    const Catalog_Namespace::Catalog* catalog,
    const TableDescriptor* td,
    const int fragmentId,
    const std::vector<TargetMetaInfo> sourceMetaInfo,
    const std::vector<const ColumnDescriptor*> columnDescriptors,
    const RowDataProvider& sourceDataProvider,
    const size_t indexOffFragmentOffsetColumn,
    const Data_Namespace::MemoryLevel memoryLevel,
    UpdelRoll& updelRoll,
    Executor* executor) {
// ...
  size_t num_rows = sourceDataProvider.getRowCount();
// ...
  auto& fragment = *fragment_ptr;
  std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks;
  get_chunks(catalog, td, fragment, memoryLevel, chunks);
  std::vector<std::unique_ptr<TargetValueConverter>> sourceDataConverters(
      columnDescriptors.size());
  std::vector<std::unique_ptr<ChunkToInsertDataConverter>> chunkConverters;
  size_t indexOfDeletedColumn{0};
  std::shared_ptr<Chunk_NS::Chunk> deletedChunk;
  for (size_t indexOfChunk = 0; indexOfChunk < chunks.size(); indexOfChunk++) {
    auto chunk = chunks[indexOfChunk];
    const auto chunk_cd = chunk->getColumnDesc();
// ...
    if (chunk_cd->isDeletedCol) {
      indexOfDeletedColumn = chunk_cd->columnId;
      deletedChunk = chunk;
// ...
    auto targetColumnIt = std::find_if(columnDescriptors.begin(),
                                       columnDescriptors.end(),
                                       [=](const ColumnDescriptor* cd) -> bool {
                                         return cd->columnId == chunk_cd->columnId;
                                       });
// ...
    if (targetColumnIt != columnDescriptors.end()) {
      auto indexOfTargetColumn = std::distance(columnDescriptors.begin(), targetColumnIt);
// ...
      auto sourceDataMetaInfo = sourceMetaInfo[indexOfTargetColumn];
      auto targetDescriptor = columnDescriptors[indexOfTargetColumn];
// ...
          targetDescriptor->columnType,
          !targetDescriptor->columnType.get_notnull(),
// ...
          sourceDataMetaInfo.get_type_info().is_dict_encoded_string()
              ? executor->getStringDictionaryProxy(
                    sourceDataMetaInfo.get_type_info().getStringDictKey(),
                    executor->getRowSetMemoryOwner(),
// ...
      auto converter = factory.create(param);
      sourceDataConverters[indexOfTargetColumn] = std::move(converter);
// ...
      if (targetDescriptor->columnType.is_geometry()) {
// ...
        switch (targetDescriptor->columnType.get_type()) {
// ...
      if (chunk_cd->columnType.is_varlen() || chunk_cd->columnType.is_fixlen_array()) {
        std::unique_ptr<ChunkToInsertDataConverter> converter;
// ...
        if (chunk_cd->columnType.is_fixlen_array()) {
          converter =
              std::make_unique<FixedLenArrayChunkConverter>(num_rows, chunk.get());
        } else if (chunk_cd->columnType.is_string()) {
          converter = std::make_unique<StringChunkConverter>(num_rows, chunk.get());
        } else if (chunk_cd->columnType.is_geometry()) {
          converter = std::make_unique<StringChunkConverter>(num_rows, chunk.get());
        } else {
          converter = std::make_unique<ArrayChunkConverter>(num_rows, chunk.get());
        }
        chunkConverters.push_back(std::move(converter));
      } else if (chunk_cd->columnType.is_date_in_days()) {
// ...
        std::unique_ptr<ChunkToInsertDataConverter> converter;
        const size_t physical_size = chunk_cd->columnType.get_size();
        if (physical_size == 2) {
          converter =
              std::make_unique<DateChunkConverter<int16_t>>(num_rows, chunk.get());
        } else if (physical_size == 4) {
          converter =
              std::make_unique<DateChunkConverter<int32_t>>(num_rows, chunk.get());
// ...
        chunkConverters.push_back(std::move(converter));
      } else {
        std::unique_ptr<ChunkToInsertDataConverter> converter;
// ...
        int logical_size = logical_type.get_size();
        int physical_size = chunk_cd->columnType.get_size();
// ...
          logical_size = physical_size;
// ...
        if (8 == physical_size) {
          converter = std::make_unique<ScalarChunkConverter<int64_t, int64_t>>(
              num_rows, chunk.get());
        } else if (4 == physical_size) {
          if (8 == logical_size) {
            converter = std::make_unique<ScalarChunkConverter<int32_t, int64_t>>(
                num_rows, chunk.get());
          } else {
            converter = std::make_unique<ScalarChunkConverter<int32_t, int32_t>>(
                num_rows, chunk.get());
          }
        } else if (2 == chunk_cd->columnType.get_size()) {
          if (8 == logical_size) {
            converter = std::make_unique<ScalarChunkConverter<int16_t, int64_t>>(
                num_rows, chunk.get());
          } else if (4 == logical_size) {
            converter = std::make_unique<ScalarChunkConverter<int16_t, int32_t>>(
                num_rows, chunk.get());
          } else {
            converter = std::make_unique<ScalarChunkConverter<int16_t, int16_t>>(
                num_rows, chunk.get());
          }
        } else if (1 == chunk_cd->columnType.get_size()) {
          if (8 == logical_size) {
            converter = std::make_unique<ScalarChunkConverter<int8_t, int64_t>>(
                num_rows, chunk.get());
          } else if (4 == logical_size) {
            converter = std::make_unique<ScalarChunkConverter<int8_t, int32_t>>(
                num_rows, chunk.get());
          } else if (2 == logical_size) {
            converter = std::make_unique<ScalarChunkConverter<int8_t, int16_t>>(
                num_rows, chunk.get());
          } else {
            converter = std::make_unique<ScalarChunkConverter<int8_t, int8_t>>(
                num_rows, chunk.get());
          }
        }
        chunkConverters.push_back(std::move(converter));
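// The ladder above picks ScalarChunkConverter<BUFFER, INSERT> from the
// (physical size, logical size) pair. For example, an integer column stored
// with a 2-byte fixed encoding but a 4-byte logical type gets
// ScalarChunkConverter<int16_t, int32_t>. When the logical type is a string,
// logical_size is clamped to physical_size (the assignment above), so a
// dictionary index is copied through unchanged.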
// ...
  bool* deletedChunkBuffer =
      reinterpret_cast<bool*>(deletedChunk->getBuffer()->getMemoryPtr());
// ...
  std::atomic<size_t> row_idx{0};
// ...
  auto row_converter = [&sourceDataProvider,
                        &sourceDataConverters,
                        &indexOffFragmentOffsetColumn,
                        // ...
                        &row_idx](size_t indexOfEntry) -> void {
// ...
    const auto row = sourceDataProvider.getEntryAt(indexOfEntry);
// ...
    size_t indexOfRow = row_idx.fetch_add(1);
// ...
    for (size_t col = 0; col < sourceDataConverters.size(); col++) {
      if (sourceDataConverters[col]) {
        const auto& mapd_variant = row[col];
        sourceDataConverters[col]->convertToColumnarFormat(indexOfRow, &mapd_variant);
// ...
    auto scalar = checked_get(
        indexOfRow, &row[indexOffFragmentOffsetColumn], SCALAR_TARGET_VALUE_ACCESSOR);
    auto indexInChunkBuffer = *checked_get(indexOfRow, scalar, OFFSET_VALUE__ACCESSOR);
// ...
    for (size_t idx = 0; idx < chunkConverters.size(); idx++) {
      chunkConverters[idx]->convertToColumnarFormat(indexOfRow, indexInChunkBuffer);
// ...
    deletedChunkBuffer[indexInChunkBuffer] = true;
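// row_converter runs once per source row: updated columns go through their
// TargetValueConverters, every other column is copied by a chunk converter,
// and the row's original offset (carried in a dedicated fragment-offset
// column) is used to flag the old row in deletedChunkBuffer.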
// ...
  bool can_go_parallel = num_rows > 20000;
// ...
  if (can_go_parallel) {
// ...
    std::vector<std::future<void>> worker_threads;
    for (size_t i = 0,
                start_entry = 0,
                stride = (num_entries + num_worker_threads - 1) / num_worker_threads;
         i < num_worker_threads && start_entry < num_entries;
         ++i, start_entry += stride) {
      const auto end_entry = std::min(start_entry + stride, num_rows);
// ...
          [&row_converter](const size_t start, const size_t end) {
            for (size_t indexOfRow = start; indexOfRow < end; ++indexOfRow) {
              row_converter(indexOfRow);
// ...
    for (auto& child : worker_threads) {
// ...
    for (size_t entryIdx = 0; entryIdx < num_entries; entryIdx++) {
      row_converter(entryIdx);
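// The stride above is a plain ceiling division. A sketch with assumed
// numbers: num_entries = 100000 and num_worker_threads = 8 give
// stride = (100000 + 8 - 1) / 8 = 12500, so workers cover the half-open
// ranges [0, 12500), [12500, 25000), ..., the last one clipped by
// std::min(start_entry + stride, num_rows).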
// ...
  for (size_t i = 0; i < chunkConverters.size(); i++) {
    chunkConverters[i]->addDataBlocksToInsertData(insert_data);
// ...
  for (size_t i = 0; i < sourceDataConverters.size(); i++) {
    if (sourceDataConverters[i]) {
      sourceDataConverters[i]->addDataBlocksToInsertData(insert_data);
// ...
  insert_data.numRows = num_rows;
// ...
  auto chunkMetadata =
      updelRoll.getChunkMetadata({td, &fragment}, indexOfDeletedColumn, fragment);
  chunkMetadata->chunkStats.max.boolval = 1;
// ...
  if (!deletedChunk->getBuffer()->hasEncoder()) {
    deletedChunk->initEncoder();
  }
// ...
  deletedChunk->getBuffer()->getEncoder()->updateStats(static_cast<int64_t>(true), false);
// ...
  if (fragment.shadowNumTuples > deletedChunk->getBuffer()->getEncoder()->getNumElems()) {
// ...
    deletedChunk->getBuffer()->getEncoder()->setNumElems(fragment.shadowNumTuples);
  }
  deletedChunk->getBuffer()->setUpdated();
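// The delete column gets special handling: its metadata max is forced to
// 1 (true) because rows were just marked deleted, and the encoder's element
// count is raised to shadowNumTuples so rows appended to the fragment in the
// meantime are not mistaken for deleted ones.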
// ...
void update_metadata(SQLTypeInfo const& ti,
                     ChunkUpdateStats& update_stats,
                     int64_t const updated_val,
                     int64_t const old_val,
                     NullSentinelSupplier s = NullSentinelSupplier()) {
// ...
void update_metadata(SQLTypeInfo const& ti,
                     ChunkUpdateStats& update_stats,
                     double const updated_val,
                     double const old_val,
                     NullSentinelSupplier s = NullSentinelSupplier()) {
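// updateColumn(): scalar in-place update of a single column. The row offsets
// are split into ncore segments of segsz offsets each, one future per
// segment, and every worker writes to its own update_stats_per_thread[c]
// slot so the scan needs no locking. A hypothetical call (argument values
// assumed, not from this file):
//   fragmenter->updateColumn(catalog, td, cd, /*fragment_id=*/0, frag_offsets,
//                            {ScalarTargetValue(int64_t(42))}, rhs_type,
//                            Data_Namespace::MemoryLevel::CPU_LEVEL, updel_roll);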
// ...
std::optional<ChunkUpdateStats> InsertOrderFragmenter::updateColumn(
    const Catalog_Namespace::Catalog* catalog,
    const TableDescriptor* td,
    const ColumnDescriptor* cd,
    const int fragment_id,
    const std::vector<uint64_t>& frag_offsets,
    const std::vector<ScalarTargetValue>& rhs_values,
    const SQLTypeInfo& rhs_type,
    const Data_Namespace::MemoryLevel memory_level,
    UpdelRoll& updel_roll) {
// ...
  const auto nrow = frag_offsets.size();
  const auto n_rhs_values = rhs_values.size();
// ...
  CHECK(nrow == n_rhs_values || 1 == n_rhs_values);
// ...
  auto& fragment = *fragment_ptr;
  auto chunk_meta_it = fragment.getChunkMetadataMapPhysical().find(cd->columnId);
  CHECK(chunk_meta_it != fragment.getChunkMetadataMapPhysical().end());
// ...
                                         chunk_meta_it->second->numBytes,
                                         chunk_meta_it->second->numElements);
// ...
  std::vector<ChunkUpdateStats> update_stats_per_thread(ncore);
// ...
  std::vector<std::future<void>> threads;
// ...
  const auto segsz = (nrow + ncore - 1) / ncore;
  auto dbuf = chunk->getBuffer();
  auto dbuf_addr = dbuf->getMemoryPtr();
// ...
  for (size_t rbegin = 0, c = 0; rbegin < nrow; ++c, rbegin += segsz) {
// ...
              lhs_type, &decimalOverflowValidator);
// ...
              lhs_type, &dateDaysOverflowValidator);
// ...
            stringDict = dictDesc->stringDict.get();
// ...
          for (size_t r = rbegin; r < std::min(rbegin + segsz, nrow); r++) {
            const auto roffs = frag_offsets[r];
// ...
            auto sv = &rhs_values[1 == n_rhs_values ? 0 : r];
// ...
            if (const auto vp = boost::get<int64_t>(sv)) {
// ...
              if (nullptr == dictDesc) {
                throw std::runtime_error(
                    "UPDATE does not support cast from string literal to string "
// ...
              auto stringDict = dictDesc->stringDict.get();
// ...
            if (const auto vp = boost::get<int64_t>(sv)) {
// ...
                throw std::runtime_error("UPDATE does not support cast to string.");
// ...
              get_scalar<int64_t>(data_ptr, lhs_type, old_val);
// ...
              if (lhs_type.is_date_in_days()) {
// ...
              put_scalar<int64_t>(data_ptr, lhs_type, v, cd->columnName, &rhs_type);
              if (lhs_type.is_decimal()) {
                nullAwareDecimalOverflowValidator.validate<int64_t>(v);
// ...
                get_scalar<int64_t>(data_ptr, lhs_type, decimal_val);
                int64_t target_value = (v == inline_int_null_value<int64_t>() &&
                                        lhs_type.get_notnull() == false)
// ...
                    lhs_type, update_stats_per_thread[c], target_value, old_val);
                auto const positive_v_and_negative_d = (v >= 0) && (decimal_val < 0);
                auto const negative_v_and_positive_d = (v < 0) && (decimal_val >= 0);
                if (positive_v_and_negative_d || negative_v_and_positive_d) {
                  throw std::runtime_error(
// ...
                if (lhs_type.is_date_in_days()) {
// ...
                  if (lhs_type.get_size() == 2) {
                    nullAwareDateOverflowValidator.validate<int16_t>(v);
// ...
                    nullAwareDateOverflowValidator.validate<int32_t>(v);
// ...
                  get_scalar<int64_t>(data_ptr, lhs_type, days);
// ...
                  int64_t target_value = (v == inline_int_null_value<int64_t>() &&
                                          lhs_type.get_notnull() == false)
                                             ? NullSentinelSupplier()(lhs_type, v)
// ...
                  update_metadata(
                      lhs_type, update_stats_per_thread[c], target_value, old_val);
// ...
                  int64_t target_value;
// ...
                  update_metadata(
                      lhs_type, update_stats_per_thread[c], target_value, old_val);
// ...
                                  update_stats_per_thread[c],
// ...
            } else if (const auto vp = boost::get<double>(sv)) {
// ...
                throw std::runtime_error("UPDATE does not support cast to string.");
// ...
              get_scalar<double>(data_ptr, lhs_type, old_val);
              put_scalar<double>(data_ptr, lhs_type, v, cd->columnName);
              if (lhs_type.is_integer()) {
// ...
                    lhs_type, update_stats_per_thread[c], int64_t(v), int64_t(old_val));
              } else if (lhs_type.is_fp()) {
// ...
                    lhs_type, update_stats_per_thread[c], double(v), double(old_val));
// ...
                UNREACHABLE() << "Unexpected combination of a non-floating or integer "
                                 "LHS with a floating RHS.";
// ...
            } else if (const auto vp = boost::get<float>(sv)) {
// ...
                throw std::runtime_error("UPDATE does not support cast to string.");
// ...
              get_scalar<float>(data_ptr, lhs_type, old_val);
              put_scalar<float>(data_ptr, lhs_type, v, cd->columnName);
              if (lhs_type.is_integer()) {
// ...
                    lhs_type, update_stats_per_thread[c], int64_t(v), int64_t(old_val));
// ...
                update_metadata(lhs_type, update_stats_per_thread[c], double(v), old_val);
            } else if (const auto vp = boost::get<NullableString>(sv)) {
              const auto s = boost::get<std::string>(vp);
              const auto sval = s ? *s : std::string("");
// ...
                decltype(stringDict->getOrAdd(sval)) sidx;
// ...
                  sidx = stringDict->getOrAdd(sval);
// ...
                get_scalar<int64_t>(data_ptr, lhs_type, old_val);
                put_scalar<int64_t>(data_ptr, lhs_type, sidx, cd->columnName);
// ...
                    lhs_type, update_stats_per_thread[c], int64_t(sidx), old_val);
              } else if (sval.size() > 0) {
                auto dval = std::atof(sval.data());
                if (lhs_type.is_boolean()) {
                  dval = sval == "t" || sval == "true" || sval == "T" || sval == "True";
                } else if (lhs_type.is_time()) {
                  throw std::runtime_error(
                      "Date/Time/Timestamp update not supported through translated "
// ...
                  get_scalar<double>(data_ptr, lhs_type, old_val);
                  put_scalar<double>(data_ptr, lhs_type, dval, cd->columnName);
// ...
                      lhs_type, update_stats_per_thread[c], double(dval), old_val);
// ...
                  get_scalar<int64_t>(data_ptr, lhs_type, old_val);
                  put_scalar<int64_t>(data_ptr, lhs_type, dval, cd->columnName);
// ...
                      lhs_type, update_stats_per_thread[c], int64_t(dval), old_val);
// ...
                update_stats_per_thread[c].new_values_stats.has_null = true;
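// String RHS handling: for a dict-encoded target the literal is interned via
// stringDict->getOrAdd() (apparently serialized by a lock, since the workers
// share the dictionary) and the new index is written; for numeric targets the
// literal is parsed with std::atof, with a special case mapping
// "t"/"true"/"T"/"True" to boolean true and a hard error for date/time
// targets; an empty string is stored as NULL.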
// ...
  if (deleted_offsets.size() > 0) {
    compactRows(catalog, td, fragment_id, deleted_offsets, memory_level, updel_roll);
// ...
  for (size_t c = 0; c < ncore; ++c) {
// ...
                    update_stats_per_thread[c].new_values_stats);
// ...
                    update_stats_per_thread[c].old_values_stats);
// ...
  CHECK_GT(fragment.shadowNumTuples, size_t(0));
// ...
  update_stats.chunk = chunk;
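// Wrap-up: rows deleted by the update are vacuumed first via compactRows();
// the per-thread ChunkUpdateStats are then folded into a single result and
// the chunk pointer is attached so the caller can refresh chunk metadata.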
// ...
void InsertOrderFragmenter::updateColumnMetadata(
    const ColumnDescriptor* cd,
    FragmentInfo& fragment,
    std::shared_ptr<Chunk_NS::Chunk> chunk,
    const UpdateValuesStats& new_values_stats,
    const SQLTypeInfo& rhs_type,
    UpdelRoll& updel_roll) {
// ...
  auto buffer = chunk->getBuffer();
// ...
  auto encoder = buffer->getEncoder();
  auto update_stats = [&encoder](auto min, auto max, auto has_null) {
    static_assert(std::is_same<decltype(min), decltype(max)>::value,
                  "Type mismatch on min/max");
// ...
      encoder->updateStats(decltype(min)(), true);
// ...
    encoder->updateStats(min, false);
    encoder->updateStats(max, false);
// ...
  } else if (lhs_type.is_fp()) {
// ...
  } else if (lhs_type.is_decimal()) {
// ...
                 (int64_t)(new_values_stats.max_double * pow(10, lhs_type.get_scale())),
// ...
  } else if (!lhs_type.is_array() && !lhs_type.is_geometry() &&
             !(lhs_type.is_string() && kENCODING_DICT != lhs_type.get_compression())) {
// ...
  auto chunk_metadata =
      updel_roll.getChunkMetadata({td, &fragment}, cd->columnId, fragment);
  buffer->getEncoder()->getMetadata(chunk_metadata);
// ...
  auto& fragmentInfo = *key.second;
  fragmentInfo.setChunkMetadataMap(chunk_metadata_map);
  fragmentInfo.shadowChunkMetadataMap = fragmentInfo.getChunkMetadataMapPhysicalCopy();
  fragmentInfo.shadowNumTuples = updel_roll.getNumTuple(key);
  fragmentInfo.setPhysicalNumTuples(fragmentInfo.shadowNumTuples);
// ...
auto getChunksForAllColumns(const TableDescriptor* td,
                            const FragmentInfo& fragment,
                            const Data_Namespace::MemoryLevel memory_level) {
  std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks;
// ...
  for (int col_id = 1, ncol = 0; ncol < td->nColumns; ++col_id) {
// ...
      if (!cd->isVirtualCol) {
// ...
                                                chunk_meta_it->second->numBytes,
                                                chunk_meta_it->second->numElements);
        chunks.push_back(chunk);
// ...
const std::vector<uint64_t> InsertOrderFragmenter::getVacuumOffsets(
    const std::shared_ptr<Chunk_NS::Chunk>& chunk) {
  const auto data_buffer = chunk->getBuffer();
  const auto data_addr = data_buffer->getMemoryPtr();
  const size_t nrows_in_chunk = data_buffer->size();
// ...
  const size_t segsz = (nrows_in_chunk + ncore - 1) / ncore;
  std::vector<std::vector<uint64_t>> deleted_offsets;
  deleted_offsets.resize(ncore);
  std::vector<std::future<void>> threads;
  for (size_t rbegin = 0; rbegin < nrows_in_chunk; rbegin += segsz) {
// ...
      const auto rend = std::min<size_t>(rbegin + segsz, nrows_in_chunk);
      const auto ithread = rbegin / segsz;
      CHECK(ithread < deleted_offsets.size());
      deleted_offsets[ithread].reserve(segsz);
      for (size_t r = rbegin; r < rend; ++r) {
// ...
          deleted_offsets[ithread].push_back(r);
// ...
  std::vector<uint64_t> all_deleted_offsets;
  for (size_t i = 0; i < ncore; ++i) {
    all_deleted_offsets.insert(
        all_deleted_offsets.end(), deleted_offsets[i].begin(), deleted_offsets[i].end());
// ...
  return all_deleted_offsets;
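// getVacuumOffsets() scans the boolean delete column in parallel, one segment
// per core, collecting row offsets whose flag is set. The per-thread vectors
// are concatenated in segment order, so the combined list stays sorted, which
// the vacuum_* routines below rely on.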
// ...
template <typename T>
static void set_chunk_stats(const SQLTypeInfo& col_type,
                            int8_t* data_addr,
                            bool& has_null,
                            T& min,
                            T& max) {
// ...
  const auto is_null = get_scalar<T>(data_addr, col_type, v);
// ...
  has_null = has_null || (can_be_null && is_null);
// ...
static void set_chunk_metadata(const Catalog_Namespace::Catalog* catalog,
                               FragmentInfo& fragment,
                               const std::shared_ptr<Chunk_NS::Chunk>& chunk,
                               const size_t nrows_to_keep,
                               UpdelRoll& updel_roll) {
  auto cd = chunk->getColumnDesc();
// ...
  auto data_buffer = chunk->getBuffer();
  auto chunkMetadata =
// ...
  chunkMetadata->numElements = nrows_to_keep;
  chunkMetadata->numBytes = data_buffer->size();
// ...
auto vacuum_fixlen_rows(const FragmentInfo& fragment,
                        const std::shared_ptr<Chunk_NS::Chunk>& chunk,
                        const std::vector<uint64_t>& frag_offsets) {
  const auto cd = chunk->getColumnDesc();
  const auto& col_type = cd->columnType;
  auto data_buffer = chunk->getBuffer();
  auto data_addr = data_buffer->getMemoryPtr();
  const auto element_size =
      col_type.is_fixlen_array() ? col_type.get_size() : get_element_size(col_type);
  int64_t irow_of_blk_to_keep = 0;
  int64_t irow_of_blk_to_fill = 0;
  size_t nbytes_fix_data_to_keep = 0;
  auto nrows_to_vacuum = frag_offsets.size();
// ...
  for (size_t irow = 0; irow <= nrows_to_vacuum; irow++) {
    auto is_last_one = irow == nrows_to_vacuum;
    auto irow_to_vacuum = is_last_one ? nrows_in_fragment : frag_offsets[irow];
    auto maddr_to_vacuum = data_addr;
    int64_t nrows_to_keep = irow_to_vacuum - irow_of_blk_to_keep;
    if (nrows_to_keep > 0) {
      auto nbytes_to_keep = nrows_to_keep * element_size;
      if (irow_of_blk_to_fill != irow_of_blk_to_keep) {
// ...
        memmove(maddr_to_vacuum + irow_of_blk_to_fill * element_size,
                maddr_to_vacuum + irow_of_blk_to_keep * element_size,
// ...
      irow_of_blk_to_fill += nrows_to_keep;
      nbytes_fix_data_to_keep += nbytes_to_keep;
// ...
    irow_of_blk_to_keep = irow_to_vacuum + 1;
// ...
  return nbytes_fix_data_to_keep;
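// vacuum_fixlen_rows() compacts survivors with block memmoves. A worked
// example: 10 rows with frag_offsets = {3, 7}. Block [0, 3) is already in
// place, block [4, 7) moves down to row 3, block [8, 10) moves down to row
// 6, and 8 * element_size bytes are reported as kept.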
// ...
size_t get_null_padding(bool is_varlen_array,
                        const std::vector<uint64_t>& frag_offsets,
                        const StringOffsetT* index_array,
                        size_t fragment_row_count) {
  if (is_varlen_array) {
    size_t first_non_deleted_row_index{0};
    for (auto deleted_offset : frag_offsets) {
      if (first_non_deleted_row_index < deleted_offset) {
// ...
      first_non_deleted_row_index++;
// ...
    CHECK_LT(first_non_deleted_row_index, fragment_row_count);
    if (first_non_deleted_row_index == 0) {
// ...
      return index_array[0];
// ...
      if (index_array[first_non_deleted_row_index + 1] < 0) {
        size_t first_non_zero_offset{0};
        for (size_t i = 0; i <= first_non_deleted_row_index; i++) {
          if (index_array[i] != 0) {
            first_non_zero_offset = index_array[i];
// ...
        CHECK_GT(first_non_zero_offset, static_cast<size_t>(0));
// ...
                        first_non_zero_offset);
// ...
std::set<size_t> get_var_len_null_array_indexes(const SQLTypeInfo sql_type_info,
                                                const std::vector<uint64_t>& frag_offsets,
                                                const StringOffsetT* index_array,
                                                size_t fragment_row_count) {
  std::set<size_t> null_array_indexes;
// ...
  size_t frag_offset_index{0};
  size_t vacuum_offset{0};
  for (size_t i = 0; i < fragment_row_count; i++) {
    if (frag_offset_index < frag_offsets.size() &&
        i == frag_offsets[frag_offset_index]) {
      frag_offset_index++;
// ...
    } else if (index_array[i + 1] < 0) {
      null_array_indexes.emplace(i - vacuum_offset);
// ...
  return null_array_indexes;
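// Variable-length arrays mark NULL with a negative entry at offset i + 1 in
// the index array. get_var_len_null_array_indexes() records which surviving
// rows are NULL, shifted down by the number of vacuumed rows preceding them,
// so vacuum_varlen_rows() can restore the negative sign after rewriting the
// offsets.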
// ...
  auto offset = index_array[index];
// ...
    CHECK(is_varlen_array);
// ...
auto vacuum_varlen_rows(const FragmentInfo& fragment,
                        const std::shared_ptr<Chunk_NS::Chunk>& chunk,
                        const std::vector<uint64_t>& frag_offsets) {
  auto is_varlen_array = chunk->getColumnDesc()->columnType.is_varlen_array();
  auto data_buffer = chunk->getBuffer();
// ...
  auto index_buffer = chunk->getIndexBuf();
  CHECK(index_buffer);
  auto data_addr = data_buffer->getMemoryPtr();
  auto indices_addr = index_buffer->getMemoryPtr();
  CHECK(indices_addr);
// ...
  int64_t irow_of_blk_to_keep = 0;
  int64_t irow_of_blk_to_fill = 0;
// ...
  size_t null_padding =
      get_null_padding(is_varlen_array, frag_offsets, index_array, nrows_in_fragment);
  size_t nbytes_var_data_to_keep = null_padding;
  auto null_array_indexes = get_var_len_null_array_indexes(
      chunk->getColumnDesc()->columnType, frag_offsets, index_array, nrows_in_fragment);
  auto nrows_to_vacuum = frag_offsets.size();
  for (size_t irow = 0; irow <= nrows_to_vacuum; irow++) {
    auto is_last_one = irow == nrows_to_vacuum;
    auto irow_to_vacuum = is_last_one ? nrows_in_fragment : frag_offsets[irow];
    auto maddr_to_vacuum = data_addr;
    int64_t nrows_to_keep = irow_to_vacuum - irow_of_blk_to_keep;
    if (nrows_to_keep > 0) {
      auto ibyte_var_data_to_keep = nbytes_var_data_to_keep;
      auto deleted_row_start_offset =
          get_buffer_offset(is_varlen_array, index_array, irow_to_vacuum);
      auto kept_row_start_offset =
          get_buffer_offset(is_varlen_array, index_array, irow_of_blk_to_keep);
      auto nbytes_to_keep =
          (is_last_one ? data_buffer->size() : deleted_row_start_offset) -
          kept_row_start_offset;
      if (irow_of_blk_to_fill != irow_of_blk_to_keep) {
        if (nbytes_to_keep > 0) {
// ...
          memmove(data_addr + ibyte_var_data_to_keep,
                  data_addr + kept_row_start_offset,
// ...
        const auto base_offset = kept_row_start_offset;
        for (int64_t i = 0; i < nrows_to_keep; ++i) {
          auto update_index = irow_of_blk_to_keep + i;
// ...
          index_array[update_index] = ibyte_var_data_to_keep + (offset - base_offset);
// ...
      nbytes_var_data_to_keep += nbytes_to_keep;
      maddr_to_vacuum = indices_addr;
// ...
      constexpr static auto index_element_size = sizeof(StringOffsetT);
      nbytes_to_keep = nrows_to_keep * index_element_size;
      if (irow_of_blk_to_fill != irow_of_blk_to_keep) {
// ...
        memmove(maddr_to_vacuum + irow_of_blk_to_fill * index_element_size,
                maddr_to_vacuum + irow_of_blk_to_keep * index_element_size,
// ...
      irow_of_blk_to_fill += nrows_to_keep;
// ...
    irow_of_blk_to_keep = irow_to_vacuum + 1;
// ...
  index_array[0] = null_padding;
  auto post_vacuum_row_count = nrows_in_fragment - nrows_to_vacuum;
  index_array[post_vacuum_row_count] = nbytes_var_data_to_keep;
  if (!is_varlen_array) {
    CHECK(null_array_indexes.empty());
// ...
  for (auto index : null_array_indexes) {
    index_array[index + 1] = -1 * std::abs(index_array[index + 1]);
// ...
  return nbytes_var_data_to_keep;
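// vacuum_varlen_rows() moves two things per kept block: the payload bytes
// (memmove on data_addr) and the offset entries (memmove on indices_addr),
// rebasing each kept offset by (offset - base_offset) to its new location.
// The sentinels are then patched: index_array[0] takes the null padding and
// index_array[post_vacuum_row_count] the total number of kept payload bytes.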
// ...
void InsertOrderFragmenter::compactRows(const Catalog_Namespace::Catalog* catalog,
                                        const TableDescriptor* td,
                                        const int fragment_id,
                                        const std::vector<uint64_t>& frag_offsets,
                                        const Data_Namespace::MemoryLevel memory_level,
                                        UpdelRoll& updel_roll) {
// ...
  auto& fragment = *fragment_ptr;
// ...
  const auto ncol = chunks.size();
// ...
  std::vector<ChunkUpdateStats> update_stats_per_thread(ncol);
// ...
  std::vector<std::future<void>> threads;
  auto nrows_to_vacuum = frag_offsets.size();
  auto nrows_in_fragment = fragment.getPhysicalNumTuples();
  auto nrows_to_keep = nrows_in_fragment - nrows_to_vacuum;
// ...
  for (size_t ci = 0; ci < chunks.size(); ++ci) {
    auto chunk = chunks[ci];
    const auto cd = chunk->getColumnDesc();
    const auto& col_type = cd->columnType;
    auto data_buffer = chunk->getBuffer();
    auto index_buffer = chunk->getIndexBuf();
    auto data_addr = data_buffer->getMemoryPtr();
    auto indices_addr = index_buffer ? index_buffer->getMemoryPtr() : nullptr;
// ...
    bool is_varlen = col_type.is_varlen_indeed();
// ...
    auto fixlen_vacuum =
        [=, &update_stats_per_thread, &updel_roll, &frag_offsets, &fragment] {
          size_t nbytes_fix_data_to_keep;
          if (nrows_to_keep == 0) {
            nbytes_fix_data_to_keep = 0;
// ...
          data_buffer->getEncoder()->setNumElems(nrows_to_keep);
          data_buffer->setSize(nbytes_fix_data_to_keep);
          data_buffer->setUpdated();
// ...
          auto daddr = data_addr;
          auto element_size = col_type.is_fixlen_array() ? col_type.get_size()
                                                         : get_element_size(col_type);
          data_buffer->getEncoder()->resetChunkStats();
          for (size_t irow = 0; irow < nrows_to_keep; ++irow, daddr += element_size) {
            if (col_type.is_fixlen_array()) {
// ...
              encoder->updateMetadata((int8_t*)daddr);
            } else if (col_type.is_fp()) {
// ...
                              update_stats_per_thread[ci].new_values_stats.has_null,
                              update_stats_per_thread[ci].new_values_stats.min_double,
                              update_stats_per_thread[ci].new_values_stats.max_double);
// ...
                              update_stats_per_thread[ci].new_values_stats.has_null,
                              update_stats_per_thread[ci].new_values_stats.min_int64t,
                              update_stats_per_thread[ci].new_values_stats.max_int64t);
// ...
    auto varlen_vacuum = [=, &updel_roll, &frag_offsets, &fragment] {
      size_t nbytes_var_data_to_keep;
      if (nrows_to_keep == 0) {
        nbytes_var_data_to_keep = 0;
// ...
      data_buffer->getEncoder()->setNumElems(nrows_to_keep);
      data_buffer->setSize(nbytes_var_data_to_keep);
      data_buffer->setUpdated();
// ...
      index_buffer->setSize(sizeof(*index_array) *
                            (nrows_to_keep ? 1 + nrows_to_keep : 0));
      index_buffer->setUpdated();
// ...
  updel_roll.setNumTuple({td, &fragment}, nrows_to_keep);
  for (size_t ci = 0; ci < chunks.size(); ++ci) {
    auto chunk = chunks[ci];
    auto cd = chunk->getColumnDesc();
    if (!cd->columnType.is_fixlen_array()) {
// ...
      if (cd->columnType.is_date_in_days()) {
        auto& stats = update_stats_per_thread[ci].new_values_stats;
// ...
                           update_stats_per_thread[ci].new_values_stats,
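// compactRows() picks one vacuum lambda per column based on
// col_type.is_varlen_indeed(): fixlen_vacuum recomputes chunk stats inline
// while rescanning the kept rows, while varlen_vacuum also shrinks the index
// buffer to 1 + nrows_to_keep entries (zero when the fragment empties).
// The lambdas appear to be dispatched through the same std::future worker
// pattern used elsewhere in this file; those lines are elided here.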
// ...
  if (nullptr == catalog) {
// ...
  dirty_chunks.clear();
// ...
  updateFragmenterAndCleanupChunks();
// ...
  CHECK(table_descriptor);
  auto table_id = table_descriptor->tableId;
// ...
  dirty_chunks.clear();
// ...
  updateFragmenterAndCleanupChunks();
// ...
  for (auto& cm : chunk_metadata_map_per_fragment) {
    cm.first.first->fragmenter->updateMetadata(catalog, cm.first, *this);
// ...
  for (const auto& [chunk_key, chunk] : dirty_chunks) {
// ...
  dirty_chunks.clear();
// ...
  if (nullptr == catalog) {
// ...
  if (is_varlen_update) {
// ...
    auto table_epochs = catalog->getTableEpochs(databaseId, logicalTableId);
// ...
    dirty_chunks.clear();
// ...
  for (const auto& [chunk_key, chunk] : dirty_chunks) {
// ...
    chunk->setBuffer(nullptr);
// ...
void UpdelRoll::addDirtyChunk(std::shared_ptr<Chunk_NS::Chunk> chunk,
                              int32_t fragment_id) {
// ...
                     chunk->getColumnDesc()->tableId,
                     chunk->getColumnDesc()->columnId,
// ...
  dirty_chunks[chunk_key] = chunk;
// ...
  if (chunk_metadata_map_per_fragment.count(key) == 0) {
    chunk_metadata_map_per_fragment[key] =
        fragment_info.getChunkMetadataMapPhysicalCopy();
// ...
  if (num_tuples.count(key) == 0) {
// ...
  initializeUnsetMetadata(key.first, fragment_info);
// ...
  auto metadata_map_it = chunk_metadata_map_per_fragment.find(key);
  CHECK(metadata_map_it != chunk_metadata_map_per_fragment.end());
  auto chunk_metadata_it = metadata_map_it->second.find(column_id);
  CHECK(chunk_metadata_it != metadata_map_it->second.end());
  return chunk_metadata_it->second;
// ...
  auto metadata_map_it = chunk_metadata_map_per_fragment.find(key);
  CHECK(metadata_map_it != chunk_metadata_map_per_fragment.end());
  return metadata_map_it->second;
// ...
  auto it = num_tuples.find(key);
  CHECK(it != num_tuples.end());
// ...
  num_tuples[key] = num_tuple;
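// UpdelRoll is the staging journal for these operations: commitUpdate()
// checkpoints through the catalog and folds the shadow metadata back into
// each fragment via updateMetadata(), while cancelUpdate() restores the
// captured table epochs (varlen path) or frees the dirty chunks' buffers,
// detaching them with chunk->setBuffer(nullptr). Both paths end by clearing
// dirty_chunks.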