72 const std::optional<Fragmenter_Namespace::ChunkUpdateStats>&
update_stats) {
77 CHECK(update_stats->chunk);
78 CHECK(update_stats->chunk->getBuffer());
79 CHECK(update_stats->chunk->getBuffer()->getEncoder());
81 auto chunk_metadata = std::make_shared<ChunkMetadata>();
82 update_stats->chunk->getBuffer()->getEncoder()->getMetadata(chunk_metadata);
83 auto cd = update_stats.value().chunk->getColumnDesc();
84 if (cd->columnType.is_fp()) {
86 if (cd->columnType.get_type() ==
kDOUBLE) {
87 min = chunk_metadata->chunkStats.min.doubleval;
88 max = chunk_metadata->chunkStats.max.doubleval;
89 }
else if (cd->columnType.get_type() ==
kFLOAT) {
90 min = chunk_metadata->chunkStats.min.floatval;
91 max = chunk_metadata->chunkStats.max.floatval;
182 bool varlen_update_required)
208 using OffsetVector = std::vector<uint64_t>;
209 using ScalarTargetValueVector = std::vector<ScalarTargetValue>;
210 using RowProcessingFuturesVector = std::vector<std::future<uint64_t>>;
213 auto callback = [
this, &update_parameters](
216 std::vector<const ColumnDescriptor*> columnDescriptors;
217 std::vector<TargetMetaInfo> sourceMetaInfos;
219 const auto& catalog = update_parameters.
getCatalog();
225 columnDescriptors.push_back(target_column);
230 auto* fragmenter = td->fragmenter.get();
233 fragmenter->updateColumns(
244 table_update_metadata.fragments_with_deleted_rows[td->tableId].emplace(
245 update_log.getFragmentId());
252 CHECK(rs->didOutputColumnar());
253 CHECK(rs->isDirectColumnarConversionPossible());
255 CHECK_EQ(rs->colCount(), size_t(1));
260 const auto& catalog = update_parameters.
getCatalog();
263 const auto table_lock =
269 const auto cd = catalog.getMetadataForColumn(
272 auto chunk_metadata =
273 fragment_info.getChunkMetadataMapPhysical().find(cd->columnId);
274 CHECK(chunk_metadata != fragment_info.getChunkMetadataMapPhysical().end());
275 ChunkKey chunk_key{catalog.getCurrentDB().dbId,
278 fragment_info.fragmentId};
280 &catalog.getDataMgr(),
284 chunk_metadata->second->numBytes,
285 chunk_metadata->second->numElements);
287 auto chunk_buffer = chunk->getBuffer();
290 auto encoder = chunk_buffer->getEncoder();
294 rs.get(), 0, cd->columnType, rs->rowCount());
295 auto buffer =
reinterpret_cast<int8_t*
>(owned_buffer.get());
297 const auto new_chunk_metadata =
298 encoder->appendData(buffer, rs->rowCount(), cd->columnType,
false, 0);
299 CHECK(new_chunk_metadata);
301 auto fragmenter = td->fragmenter.get();
306 auto fragment = fragmenter->getFragmentInfo(fragment_info.fragmentId);
311 fragment->setChunkMetadata(cd->columnId, new_chunk_metadata);
312 fragment->shadowChunkMetadataMap =
313 fragment->getChunkMetadataMapPhysicalCopy();
315 auto& data_mgr = catalog.getDataMgr();
316 if (data_mgr.gpusPresent()) {
318 data_mgr.deleteChunksWithPrefix(chunk_key,
324 auto callback = [
this, &update_parameters](
329 if (rows_per_column == 0) {
333 OffsetVector column_offsets(rows_per_column);
334 ScalarTargetValueVector scalar_target_values(rows_per_column);
340 complete_entry_block_size = entries_per_column;
341 partial_row_block_size = 0;
345 std::atomic<size_t> row_idx{0};
348 [&update_parameters, &column_offsets, &scalar_target_values, &row_idx](
349 auto get_entry_at_func,
350 uint64_t column_index,
351 uint64_t entry_start,
352 uint64_t entry_count) -> uint64_t {
353 uint64_t entries_processed = 0;
354 for (uint64_t entry_index = entry_start;
355 entry_index < (entry_start + entry_count);
357 const auto& row = get_entry_at_func(entry_index);
363 size_t row_index = row_idx.fetch_add(1);
367 auto terminal_column_iter = std::prev(row.end());
368 const auto frag_offset_scalar_tv =
369 boost::get<ScalarTargetValue>(&*terminal_column_iter);
370 CHECK(frag_offset_scalar_tv);
372 column_offsets[row_index] =
373 static_cast<uint64_t
>(*(boost::get<int64_t>(frag_offset_scalar_tv)));
374 scalar_target_values[row_index] =
375 boost::get<ScalarTargetValue>(row[column_index]);
377 return entries_processed;
381 [complete_entry_block_size](uint64_t thread_index) -> uint64_t {
382 return (thread_index * complete_entry_block_size);
385 const auto& catalog = update_parameters.
getCatalog();
386 auto const* table_descriptor =
390 if (!table_descriptor) {
392 if (
auto proj_node = dynamic_cast<const RelProject*>(input_source_node)) {
393 if (proj_node->hasPushedDownWindowExpr() ||
394 proj_node->hasWindowFunctionExpr()) {
395 table_id = proj_node->getModifiedTableDescriptor()->tableId;
396 table_descriptor = catalog.getMetadataForTable(table_id);
400 CHECK(table_descriptor);
407 RowProcessingFuturesVector entry_processing_futures;
408 entry_processing_futures.reserve(usable_threads);
410 auto get_entry_at_func = [&update_log,
411 &column_index](
const size_t entry_index) {
419 for (
unsigned i = 0; i < static_cast<unsigned>(usable_threads); i++) {
420 entry_processing_futures.emplace_back(
422 std::forward<decltype(process_rows)>(process_rows),
426 complete_entry_block_size));
428 if (partial_row_block_size) {
429 entry_processing_futures.emplace_back(
431 std::forward<decltype(process_rows)>(process_rows),
434 get_row_index(usable_threads),
435 partial_row_block_size));
438 for (
auto& t : entry_processing_futures) {
442 CHECK(row_idx == rows_per_column);
443 const auto fragmenter = table_descriptor->fragmenter;
445 auto const* target_column = catalog.getMetadataForColumn(
447 CHECK(target_column);
449 fragmenter->updateColumn(&catalog,
454 scalar_target_values,
459 table_update_metadata.columns_for_metadata_update[target_column].emplace(
470 using RowProcessingFuturesVector = std::vector<std::future<uint64_t>>;
474 const auto& catalog = delete_parameters.
getCatalog();
477 auto rs = update_log.getResultSet();
478 CHECK(rs->didOutputColumnar());
479 CHECK(rs->isDirectColumnarConversionPossible());
480 CHECK_EQ(rs->colCount(), size_t(1));
483 CHECK_EQ(rs->rowCount(), update_log.getRowCount());
485 const ChunkKey lock_chunk_key{catalog.getCurrentDB().dbId, logical_table_id};
486 const auto table_lock =
489 auto& fragment_info = update_log.getFragmentInfo();
490 const auto td = catalog.getMetadataForTable(update_log.getPhysicalTableId());
492 const auto cd = catalog.getDeletedColumn(td);
495 auto chunk_metadata =
496 fragment_info.getChunkMetadataMapPhysical().find(cd->columnId);
497 CHECK(chunk_metadata != fragment_info.getChunkMetadataMapPhysical().end());
498 ChunkKey chunk_key{catalog.getCurrentDB().dbId,
501 fragment_info.fragmentId};
503 &catalog.getDataMgr(),
507 chunk_metadata->second->numBytes,
508 chunk_metadata->second->numElements);
510 auto chunk_buffer = chunk->getBuffer();
513 auto encoder = chunk_buffer->getEncoder();
517 rs.get(), 0, cd->columnType, rs->rowCount());
518 auto buffer =
reinterpret_cast<int8_t*
>(owned_buffer.get());
520 const auto new_chunk_metadata =
521 encoder->appendData(buffer, rs->rowCount(), cd->columnType,
false, 0);
523 auto fragmenter = td->fragmenter.get();
528 auto fragment = fragmenter->getFragmentInfo(fragment_info.fragmentId);
533 fragment->setChunkMetadata(cd->columnId, new_chunk_metadata);
534 fragment->shadowChunkMetadataMap =
535 fragment->getChunkMetadataMapPhysicalCopy();
537 auto& data_mgr = catalog.getDataMgr();
538 if (data_mgr.gpusPresent()) {
540 data_mgr.deleteChunksWithPrefix(chunk_key,
546 auto callback = [
this, &delete_parameters](
551 if (rows_per_column == 0) {
561 complete_row_block_size = rows_per_column;
562 partial_row_block_size = 0;
566 std::atomic<size_t> row_idx{0};
568 auto process_rows = [&update_log, &victim_offsets, &row_idx](
569 uint64_t entry_start, uint64_t entry_count) -> uint64_t {
570 uint64_t entries_processed = 0;
572 for (uint64_t entry_index = entry_start;
573 entry_index < (entry_start + entry_count);
575 auto const row(update_log.
getEntryAt(entry_index));
582 size_t row_index = row_idx.fetch_add(1);
584 auto terminal_column_iter = std::prev(row.end());
585 const auto scalar_tv = boost::get<ScalarTargetValue>(&*terminal_column_iter);
588 uint64_t fragment_offset =
589 static_cast<uint64_t
>(*(boost::get<int64_t>(scalar_tv)));
590 victim_offsets[row_index] = fragment_offset;
592 return entries_processed;
596 [complete_row_block_size](uint64_t thread_index) -> uint64_t {
597 return thread_index * complete_row_block_size;
600 RowProcessingFuturesVector row_processing_futures;
601 row_processing_futures.reserve(usable_threads);
603 for (
unsigned i = 0; i < (unsigned)usable_threads; i++) {
604 row_processing_futures.emplace_back(
606 std::forward<decltype(process_rows)>(process_rows),
608 complete_row_block_size));
610 if (partial_row_block_size) {
611 row_processing_futures.emplace_back(
613 std::forward<decltype(process_rows)>(process_rows),
614 get_row_index(usable_threads),
615 partial_row_block_size));
618 for (
auto& t : row_processing_futures) {
622 const auto& catalog = delete_parameters.
getCatalog();
623 auto const* table_descriptor =
625 CHECK(table_descriptor);
627 auto* fragmenter = table_descriptor->fragmenter.get();
630 auto const* deleted_column_desc = catalog.getDeletedColumn(table_descriptor);
631 CHECK(deleted_column_desc);
632 fragmenter->updateColumn(&catalog,
641 table_update_metadata.fragments_with_deleted_rows[table_descriptor->tableId]
655 const auto padded_size = rs->getPaddedSlotWidthBytes(col_idx);
660 auto rs_buffer_size = padded_size * row_count;
661 auto rs_buffer = std::make_unique<int8_t[]>(rs_buffer_size);
662 rs->copyColumnIntoBuffer(col_idx, rs_buffer.get(), rs_buffer_size);
664 if (type_size < padded_size) {
668 auto src_ptr = rs_buffer.get();
669 auto dst_ptr = rs_buffer.get();
670 if (column_type.
is_fp()) {
672 CHECK(padded_size ==
sizeof(
double));
673 for (
size_t i = 0; i < row_count; i++) {
674 const auto old_val = *
reinterpret_cast<double*
>(may_alias_ptr(src_ptr));
675 auto new_val =
static_cast<float>(old_val);
676 std::memcpy(dst_ptr, &new_val, type_size);
677 dst_ptr += type_size;
678 src_ptr += padded_size;
682 for (
size_t i = 0; i < row_count; i++) {
683 std::memcpy(dst_ptr, src_ptr, type_size);
684 dst_ptr += type_size;
685 src_ptr += padded_size;
SQLTypeInfo getColumnType(const size_t col_idx) const
UpdateValuesStats new_values_stats
auto isVarlenUpdateRequired() const
std::vector< int > ChunkKey
StorageIOFacility::TransactionLog transaction_tracker_
HOST DEVICE int get_size() const
Class for a per-database catalog. Also includes metadata for the current database and the current use...
void update_stats(Encoder *encoder, const SQLTypeInfo &column_type, DataBlockPtr data_block, const size_t row_count)
const Catalog_Namespace::Catalog & getCatalog() const
std::vector< TargetMetaInfo > UpdateTargetTypeList
const Catalog_Namespace::Catalog & catalog_
StorageIOFacility::UpdateCallback yieldDeleteCallback(DeleteTransactionParameters &delete_parameters)
UpdateTargetColumnNamesList update_column_names_
bool should_recompute_metadata(const std::optional< Fragmenter_Namespace::ChunkUpdateStats > &update_stats)
bool is_chunk_min_max_updated(const Fragmenter_Namespace::ChunkUpdateStats &update_stats, int64_t min, int64_t max)
static std::unique_ptr< int8_t[]> getRsBufferNoPadding(const ResultSet *rs, size_t col_idx, const SQLTypeInfo &column_type, size_t row_count)
virtual ~TransactionParameters()=default
bool g_enable_auto_metadata_update
int normalized_cpu_threads() const
HOST DEVICE SQLTypes get_type() const
UpdateTransactionParameters & operator=(UpdateTransactionParameters const &other)=delete
std::function< void(const UpdateLogForFragment &, TableUpdateMetadata &)> Callback
std::vector< std::string > UpdateTargetColumnNamesList
std::vector< uint64_t > UpdateTargetOffsetList
StorageIOFacility::UpdateCallback yieldUpdateCallback(UpdateTransactionParameters &update_parameters)
future< Result > async(Fn &&fn, Args &&...args)
std::vector< TargetValue > getEntryAt(const size_t index) const override
int get_logical_size() const
decltype(FragmentInfoType::fragmentId) const getFragmentId() const
const DBMetadata & getCurrentDB() const
auto getUpdateColumnCount() const
decltype(FragmentInfoType::physicalTableId) const getPhysicalTableId() const
FragmentInfoType const & getFragmentInfo() const
DeleteTransactionParameters & operator=(DeleteTransactionParameters const &other)=delete
DeleteTransactionParameters(const TableDescriptorType *table_descriptor, const Catalog_Namespace::Catalog &catalog)
UpdateTargetTypeList const & targets_meta_
std::vector< TargetValue > getTranslatedEntryAt(const size_t index) const override
const RelAlgNode * input_source_node_
void checkpointWithAutoRollback(const int logical_table_id) const
std::unique_ptr< TransactionLog > TransactionLogPtr
std::function< bool(std::string const &)> ColumnValidationFunction
auto const & getUpdateColumnNames() const
bool table_is_temporary(const TableDescriptor *const td)
static WriteLock getWriteLockForTable(const Catalog_Namespace::Catalog &cat, const std::string &table_name)
auto getTargetsMetaInfoSize() const
Data_Namespace::MemoryLevel persistenceLevel
void setInputSourceNode(const RelAlgNode *input_source_node)
auto const * getTableDescriptor() const
TableDescriptorType const * table_descriptor_
UpdateLogForFragment::Callback UpdateCallback
UpdelRoll ModifyTransactionTracker
StorageIOFacility::TransactionLog & getTransactionTracker()
std::vector< uint64_t > DeleteVictimOffsetList
auto tableIsTemporary() const
bool is_dict_encoded_string() const
const TableDescriptor * getMetadataForTable(const std::string &tableName, const bool populateFragmenter=true) const
Returns a pointer to a const TableDescriptor struct matching the provided tableName.
static std::shared_ptr< Chunk > getChunk(const ColumnDescriptor *cd, DataMgr *data_mgr, const ChunkKey &key, const MemoryLevel mem_level, const int deviceId, const size_t num_bytes, const size_t num_elems, const bool pinnable=true)
const RelAlgNode * getInputSourceNode() const
TransactionParameters(const TableDescriptorType *table_descriptor, const Catalog_Namespace::Catalog &catalog)
auto getResultSet() const
auto const & getTargetsMetaInfo() const
void finalizeTransaction(const Catalog_Namespace::Catalog &catalog)
size_t const getRowCount() const override
bool varlen_update_required_
StorageIOFacility(Executor *executor)
UpdateValuesStats old_values_stats
UpdateTransactionParameters(TableDescriptorType const *table_descriptor, const Catalog_Namespace::Catalog &catalog, UpdateTargetColumnNamesList const &update_column_names, UpdateTargetTypeList const &target_types, bool varlen_update_required)
size_t const getEntryCount() const override
boost::variant< int64_t, double, float, NullableString > ScalarTargetValue