31 std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
33 const size_t thread_idx,
34 const size_t executor_id,
39 std::vector<SQLTypeInfo> col_types;
40 for (size_t i = 0; i < result->colCount(); ++i) {
41 const auto& src_ti = result->getColType(i);
42 CHECK_EQ(result->checkSlotUsesFlatBufferFormat(i), src_ti.usesFlatBuffer());
44 ti.setUsesFlatBuffer(src_ti.supportsFlatBuffer());
45 col_types.push_back(ti);
48 row_set_mem_owner, *result, result->colCount(), col_types, executor_id, thread_idx);
52 switch (memoryLevel) {
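// ColumnFetcher constructor: stores the executor pointer and a reference to the shared column cache.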
66 : executor_(executor), columnarized_table_cache_(column_cache) {}
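// getOneColumnFragment(): fetches one fragment of a single column and returns its buffer pointer and
// element count; physical columns are read through Chunk_NS::Chunk, while columns backed by a result set
// are columnarized and cached in column_cache.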
78 const size_t thread_idx,
79 std::vector<std::shared_ptr<Chunk_NS::Chunk>>& chunks_owner,
81 static std::mutex columnar_conversion_mutex;
88 CHECK(!cd || !(cd->isVirtualCol));
89 const int8_t* col_buff = nullptr;
100 column_key.column_id,
104 executor->getDataMgr(),
108 chunk_meta_it->second->numBytes,
109 chunk_meta_it->second->numElements);
110 chunks_owner.push_back(chunk);
112 auto ab = chunk->getBuffer();
113 CHECK(ab->getMemoryPtr());
114 col_buff = reinterpret_cast<int8_t*>(ab->getMemoryPtr());
118 std::lock_guard<std::mutex> columnar_conversion_guard(columnar_conversion_mutex);
121 if (column_cache.empty() || !column_cache.count(table_key)) {
122 column_cache.insert(std::make_pair(
124 std::unordered_map<int, std::shared_ptr<const ColumnarResults>>()));
126 auto& frag_id_to_result = column_cache[table_key];
127 if (frag_id_to_result.empty() || !frag_id_to_result.count(frag_id)) {
128 frag_id_to_result.insert(std::make_pair(
131 executor->row_set_mem_owner_,
134 executor->executor_id_,
137 col_frag = column_cache[table_key][frag_id].get();
142 executor->getDataMgr(),
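// makeJoinColumn(): builds a JoinColumn for a hash-join key column by collecting one JoinChunk per
// fragment into a single malloc'ed array (owned via malloc_owner).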
162 const std::vector<Fragmenter_Namespace::FragmentInfo>& fragments,
166 const size_t thread_idx,
167 std::vector<std::shared_ptr<Chunk_NS::Chunk>>& chunks_owner,
168 std::vector<std::shared_ptr<void>>& malloc_owner,
170 CHECK(!fragments.empty());
172 size_t col_chunks_buff_sz = sizeof(struct JoinChunk) * fragments.size();
174 auto col_chunks_buff = reinterpret_cast<int8_t*>(
175 malloc_owner.emplace_back(checked_malloc(col_chunks_buff_sz), free).get());
176 auto join_chunk_array = reinterpret_cast<struct JoinChunk*>(col_chunks_buff);
179 size_t num_chunks = 0;
180 for (auto& frag : fragments) {
182 executor->checkNonKernelTimeInterrupted()) {
196 num_elems += elem_count;
207 return {col_chunks_buff,
211 static_cast<size_t>(elem_sz)};
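// getOneTableColumnFragment(): returns a device-usable pointer for one column fragment; varlen columns
// are exposed through a ChunkIter (copied to the GPU when needed), fixed-width columns return the raw
// chunk buffer pointer.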
218 const std::map<shared::TableKey, const TableFragments*>& all_tables_fragments,
219 std::list<std::shared_ptr<Chunk_NS::Chunk>>& chunk_holder,
220 std::list<ChunkIter>& chunk_iter_holder,
224 const auto fragments_it = all_tables_fragments.find(table_key);
225 CHECK(fragments_it != all_tables_fragments.end());
226 const auto fragments = fragments_it->second;
227 const auto& fragment = (*fragments)[frag_id];
228 if (fragment.isEmptyPhysicalFragment()) {
231 std::shared_ptr<Chunk_NS::Chunk> chunk;
232 auto chunk_meta_it = fragment.getChunkMetadataMap().find(col_id);
233 CHECK(chunk_meta_it != fragment.getChunkMetadataMap().end());
237 const auto col_type =
239 const bool is_real_string =
240 col_type.is_string() && col_type.get_compression() == kENCODING_NONE;
241 const bool is_varlen =
246 table_key.db_id, fragment.physicalTableId, col_id, fragment.fragmentId};
247 std::unique_ptr<std::lock_guard<std::mutex>> varlen_chunk_lock;
257 chunk_meta_it->second->numBytes,
258 chunk_meta_it->second->numElements);
260 chunk_holder.push_back(chunk);
264 CHECK(chunk_meta_it != fragment.getChunkMetadataMap().end());
265 chunk_iter_holder.push_back(chunk->begin_iterator(chunk_meta_it->second));
266 auto& chunk_iter = chunk_iter_holder.back();
268 return reinterpret_cast<int8_t*>(&chunk_iter);
270 auto ab = chunk->getBuffer();
272 auto& row_set_mem_owner = executor_->getRowSetMemoryOwner();
273 row_set_mem_owner->addVarlenInputBuffer(ab);
278 chunk_iter_gpu, reinterpret_cast<int8_t*>(&chunk_iter), sizeof(ChunkIter));
279 return chunk_iter_gpu;
282 auto ab = chunk->getBuffer();
283 CHECK(ab->getMemoryPtr());
284 return ab->getMemoryPtr();
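// getAllTableColumnFragments(): columnarizes every fragment of the table for the requested column,
// merges the per-fragment ColumnarResults, and caches the merged result for reuse.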
291 const std::map<shared::TableKey, const TableFragments*>& all_tables_fragments,
295 const size_t thread_idx) const {
296 const auto fragments_it = all_tables_fragments.find(table_key);
297 CHECK(fragments_it != all_tables_fragments.end());
298 const auto fragments = fragments_it->second;
299 const auto frag_count = fragments->size();
300 std::vector<std::unique_ptr<ColumnarResults>> column_frags;
308 for (size_t frag_id = 0; frag_id < frag_count; ++frag_id) {
310 executor_->checkNonKernelTimeInterrupted()) {
313 std::list<std::shared_ptr<Chunk_NS::Chunk>> chunk_holder;
314 std::list<ChunkIter> chunk_iter_holder;
315 const auto& fragment = (*fragments)[frag_id];
316 if (fragment.isEmptyPhysicalFragment()) {
319 auto chunk_meta_it = fragment.getChunkMetadataMap().find(col_id);
320 CHECK(chunk_meta_it != fragment.getChunkMetadataMap().end());
322 static_cast<int>(frag_id),
324 all_tables_fragments,
330 column_frags.push_back(
331 std::make_unique<ColumnarResults>(executor_->row_set_mem_owner_,
333 fragment.getNumTuples(),
334 chunk_meta_it->second->sqlType,
338 auto merged_results =
340 table_column = merged_results.get();
343 table_column = column_it->second.get();
359 const size_t thread_idx) const {
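// linearizeColumnFragments(): merges the chunks of a multi-fragment varlen column into one contiguous
// data buffer (plus index buffer) per device so kernels can treat it as a single chunk; merged buffers
// and iterators are cached per device.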
375 const std::map<shared::TableKey, const TableFragments*>& all_tables_fragments,
376 std::list<std::shared_ptr<Chunk_NS::Chunk>>& chunk_holder,
377 std::list<ChunkIter>& chunk_iter_holder,
381 const size_t thread_idx) const {
383 const auto fragments_it = all_tables_fragments.find(table_key);
384 CHECK(fragments_it != all_tables_fragments.end());
385 const auto fragments = fragments_it->second;
386 const auto frag_count = fragments->size();
392 bool is_varlen_chunk = cd->columnType.is_varlen() && !cd->columnType.is_fixlen_array();
393 size_t total_num_tuples = 0;
394 size_t total_data_buf_size = 0;
395 size_t total_idx_buf_size = 0;
409 if (linearized_iter_it->second.find(device_id) !=
410 linearized_iter_it->second.end()) {
417 return chunk_iter_gpu;
426 std::shared_ptr<Chunk_NS::Chunk> chunk;
427 std::list<std::shared_ptr<Chunk_NS::Chunk>> local_chunk_holder;
428 std::list<ChunkIter> local_chunk_iter_holder;
429 std::list<size_t> local_chunk_num_tuples;
432 for (size_t frag_id = 0; frag_id < frag_count; ++frag_id) {
433 const auto& fragment = (*fragments)[frag_id];
434 if (fragment.isEmptyPhysicalFragment()) {
437 auto chunk_meta_it = fragment.getChunkMetadataMap().find(col_id);
438 CHECK(chunk_meta_it != fragment.getChunkMetadataMap().end());
440 table_key.db_id, fragment.physicalTableId, col_id, fragment.fragmentId};
446 chunk_meta_it->second->numBytes,
447 chunk_meta_it->second->numElements);
448 local_chunk_holder.push_back(chunk);
449 auto chunk_iter = chunk->begin_iterator(chunk_meta_it->second);
450 local_chunk_iter_holder.push_back(chunk_iter);
451 local_chunk_num_tuples.push_back(fragment.getNumTuples());
452 total_num_tuples += fragment.getNumTuples();
453 total_data_buf_size += chunk->getBuffer()->size();
454 std::ostringstream oss;
455 oss << "Load chunk for col_name: " << chunk->getColumnDesc()->columnName
456 << ", col_id: " << chunk->getColumnDesc()->columnId << ", Frag-" << frag_id
457 << ", numTuples: " << fragment.getNumTuples()
458 << ", data_size: " << chunk->getBuffer()->size();
459 if (chunk->getIndexBuf()) {
460 auto idx_buf_size = chunk->getIndexBuf()->size() - sizeof(ArrayOffsetT);
461 oss << ", index_size: " << idx_buf_size;
462 total_idx_buf_size += idx_buf_size;
464 VLOG(2) << oss.str();
468 auto& col_ti = cd->columnType;
478 if (col_ti.is_array()) {
479 if (col_ti.is_fixlen_array()) {
480 VLOG(2) << "Linearize fixed-length multi-frag array column (col_id: "
481 << cd->columnId << ", col_name: " << cd->columnName
483 << ", device_id: " << device_id << "): " << cd->columnType.to_string();
488 local_chunk_iter_holder,
489 local_chunk_num_tuples,
499 CHECK(col_ti.is_varlen_array());
500 VLOG(2) << "Linearize variable-length multi-frag array column (col_id: "
501 << cd->columnId << ", col_name: " << cd->columnName
503 << ", device_id: " << device_id << "): " << cd->columnType.to_string();
508 local_chunk_iter_holder,
509 local_chunk_num_tuples,
520 if (col_ti.is_string() && !col_ti.is_dict_encoded_string()) {
521 VLOG(2) << "Linearize variable-length multi-frag non-encoded text column (col_id: "
522 << cd->columnId << ", col_name: " << cd->columnName
524 << ", device_id: " << device_id << "): " << cd->columnType.to_string();
529 local_chunk_iter_holder,
530 local_chunk_num_tuples,
542 if (!col_ti.is_fixlen_array()) {
545 auto merged_data_buffer = res.first;
546 auto merged_index_buffer = res.second;
549 auto merged_chunk = std::make_shared<Chunk_NS::Chunk>(
550 merged_data_buffer, merged_index_buffer, cd, false);
555 *(local_chunk_iter_holder.rbegin()),
560 chunk_holder.push_back(merged_chunk);
561 chunk_iter_holder.push_back(merged_chunk_iter);
564 auto merged_chunk_iter_ptr = reinterpret_cast<int8_t*>(&(chunk_iter_holder.back()));
567 return merged_chunk_iter_ptr;
570 CHECK(device_allocator);
577 chunk_iter_gpu, merged_chunk_iter_ptr, sizeof(ChunkIter));
578 return chunk_iter_gpu;
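// linearizeVarLenArrayColFrags(): appends each fragment's data and index buffers into merged buffers and
// rewrites the array offsets; negative offsets mark NULL arrays, so offsets are patched (in parallel when
// beneficial) and NULL padding at chunk boundaries is fixed up afterwards.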
584 std::list<std::shared_ptr<Chunk_NS::Chunk>>& chunk_holder,
585 std::list<ChunkIter>& chunk_iter_holder,
586 std::list<std::shared_ptr<Chunk_NS::Chunk>>& local_chunk_holder,
587 std::list<ChunkIter>& local_chunk_iter_holder,
588 std::list<size_t>& local_chunk_num_tuples,
592 const size_t total_data_buf_size,
593 const size_t total_idx_buf_size,
594 const size_t total_num_tuples,
596 const size_t thread_idx) const {
608 bool has_cached_merged_idx_buf = false;
609 bool has_cached_merged_data_buf = false;
613 int64_t linearization_time_ms = 0;
619 auto& cd_cache = cached_data_buf_cache_it->second;
620 auto cached_data_buf_it = cd_cache.find(device_id);
621 if (cached_data_buf_it != cd_cache.end()) {
622 has_cached_merged_data_buf = true;
623 merged_data_buffer = cached_data_buf_it->second;
624 VLOG(2) << "Recycle merged data buffer for linearized chunks (memory_level: "
629 executor_->getDataMgr()->alloc(memory_level, device_id, total_data_buf_size);
630 VLOG(2) << "Allocate " << total_data_buf_size
631 << " bytes of data buffer space for linearized chunks (memory_level: "
634 cd_cache.insert(std::make_pair(device_id, merged_data_buffer));
639 executor_->getDataMgr()->alloc(memory_level, device_id, total_data_buf_size);
640 VLOG(2) << "Allocate " << total_data_buf_size
641 << " bytes of data buffer space for linearized chunks (memory_level: "
644 m.insert(std::make_pair(device_id, merged_data_buffer));
648 auto cached_index_buf_it =
651 has_cached_merged_idx_buf = true;
652 merged_index_buffer_in_cpu = cached_index_buf_it->second;
654 << "Recycle merged temporary idx buffer for linearized chunks (memory_level: "
657 auto idx_buf_size = total_idx_buf_size + sizeof(ArrayOffsetT);
658 merged_index_buffer_in_cpu =
660 VLOG(2) << "Allocate " << idx_buf_size
661 << " bytes of temporary idx buffer space on CPU for linearized chunks";
664 std::make_pair(cd->columnId, merged_index_buffer_in_cpu));
669 size_t sum_data_buf_size = 0;
670 size_t cur_sum_num_tuples = 0;
671 size_t total_idx_size_modifier = 0;
672 auto chunk_holder_it = local_chunk_holder.begin();
673 auto chunk_iter_holder_it = local_chunk_iter_holder.begin();
674 auto chunk_num_tuple_it = local_chunk_num_tuples.begin();
675 bool null_padded_first_elem = false;
676 bool null_padded_last_val = false;
683 for (; chunk_holder_it != local_chunk_holder.end();
684 chunk_holder_it++, chunk_num_tuple_it++) {
686 auto target_chunk = chunk_holder_it->get();
687 auto target_chunk_data_buffer = target_chunk->getBuffer();
688 auto target_chunk_idx_buffer = target_chunk->getIndexBuf();
689 auto target_idx_buf_ptr =
690 reinterpret_cast<ArrayOffsetT*>(target_chunk_idx_buffer->getMemoryPtr());
691 auto cur_chunk_num_tuples = *chunk_num_tuple_it;
693 size_t cur_idx = cur_chunk_num_tuples;
695 while (original_offset < 0) {
696 original_offset = target_idx_buf_ptr[--cur_idx];
698 ArrayOffsetT new_offset = original_offset + sum_data_buf_size;
699 if (new_offset < 0) {
700 throw std::runtime_error(
701 "Linearization of a variable-length column having chunk size larger than 2GB "
702 "not supported yet");
704 sum_data_buf_size += target_chunk_data_buffer->size();
706 chunk_holder_it = local_chunk_holder.begin();
707 chunk_num_tuple_it = local_chunk_num_tuples.begin();
708 sum_data_buf_size = 0;
710 for (; chunk_holder_it != local_chunk_holder.end();
711 chunk_holder_it++, chunk_iter_holder_it++, chunk_num_tuple_it++) {
713 executor_->checkNonKernelTimeInterrupted()) {
716 auto target_chunk = chunk_holder_it->get();
717 auto target_chunk_data_buffer = target_chunk->getBuffer();
718 auto cur_chunk_num_tuples = *chunk_num_tuple_it;
719 auto target_chunk_idx_buffer = target_chunk->getIndexBuf();
720 auto target_idx_buf_ptr =
721 reinterpret_cast<ArrayOffsetT*>(target_chunk_idx_buffer->getMemoryPtr());
722 auto idx_buf_size = target_chunk_idx_buffer->size() - sizeof(ArrayOffsetT);
723 auto target_data_buffer_start_ptr = target_chunk_data_buffer->getMemoryPtr();
724 auto target_data_buffer_size = target_chunk_data_buffer->size();
731 if (cur_sum_num_tuples > 0 && target_idx_buf_ptr[0] > 0) {
732 null_padded_first_elem = true;
738 if (!has_cached_merged_data_buf) {
739 merged_data_buffer->append(target_data_buffer_start_ptr,
740 target_data_buffer_size,
745 if (!has_cached_merged_idx_buf) {
747 merged_index_buffer_in_cpu->append(target_chunk_idx_buffer->getMemoryPtr(),
755 if (cur_sum_num_tuples > 0) {
756 if (null_padded_last_val) {
759 idx_buf_ptr[cur_sum_num_tuples] = -sum_data_buf_size;
762 std::vector<std::future<void>> conversion_threads;
763 std::vector<std::vector<size_t>> null_padded_row_idx_vecs(worker_count,
764 std::vector<size_t>());
765 bool is_parallel_modification = false;
766 std::vector<size_t> null_padded_row_idx_vec;
767 const auto do_work = [&cur_sum_num_tuples,
769 &null_padded_first_elem,
773 const bool is_parallel_modification,
774 std::vector<size_t>* null_padded_row_idx_vec) {
775 for (size_t i = start; i < end; i++) {
776 if (LIKELY(idx_buf_ptr[cur_sum_num_tuples + i] >= 0)) {
777 if (null_padded_first_elem) {
779 idx_buf_ptr[cur_sum_num_tuples + i] -=
782 idx_buf_ptr[cur_sum_num_tuples + i] += sum_data_buf_size;
789 null_padded_row_idx_vec->push_back(cur_sum_num_tuples + i);
794 is_parallel_modification = true;
796 makeIntervals(size_t(0), cur_chunk_num_tuples, worker_count)) {
797 conversion_threads.push_back(
802 is_parallel_modification,
803 &null_padded_row_idx_vecs[interval.index]));
805 for (auto& child : conversion_threads) {
808 for (auto& v : null_padded_row_idx_vecs) {
809 std::copy(v.begin(), v.end(), std::back_inserter(null_padded_row_idx_vec));
813 cur_chunk_num_tuples,
814 is_parallel_modification,
815 &null_padded_row_idx_vec);
817 if (!null_padded_row_idx_vec.empty()) {
820 std::sort(null_padded_row_idx_vec.begin(), null_padded_row_idx_vec.end());
821 for (auto& padded_null_row_idx : null_padded_row_idx_vec) {
822 if (idx_buf_ptr[padded_null_row_idx - 1] > 0) {
823 idx_buf_ptr[padded_null_row_idx] = -idx_buf_ptr[padded_null_row_idx - 1];
825 idx_buf_ptr[padded_null_row_idx] = idx_buf_ptr[padded_null_row_idx - 1];
831 cur_sum_num_tuples += cur_chunk_num_tuples;
832 sum_data_buf_size += target_chunk_data_buffer->size();
833 if (target_idx_buf_ptr[*chunk_num_tuple_it] < 0) {
834 null_padded_last_val = true;
836 null_padded_last_val = false;
838 if (null_padded_first_elem) {
840 null_padded_first_elem = false;
842 if (!has_cached_merged_idx_buf && cur_sum_num_tuples == total_num_tuples) {
843 auto merged_index_buffer_ptr =
845 merged_index_buffer_ptr[total_num_tuples] =
846 total_data_buf_size -
847 total_idx_size_modifier;
853 size_t buf_size = total_idx_buf_size + sizeof(ArrayOffsetT);
856 int8_t* src, int8_t* dest, size_t buf_size, MemoryLevel memory_level) {
858 memcpy((void*)dest, src, buf_size);
871 auto& merged_idx_buf_cache = merged_idx_buf_cache_it->second;
872 auto merged_idx_buf_it = merged_idx_buf_cache.find(device_id);
873 if (merged_idx_buf_it != merged_idx_buf_cache.end()) {
874 merged_index_buffer = merged_idx_buf_it->second;
876 merged_index_buffer =
877 executor_->getDataMgr()->alloc(memory_level, device_id, buf_size);
882 merged_idx_buf_cache.insert(std::make_pair(device_id, merged_index_buffer));
885 merged_index_buffer =
886 executor_->getDataMgr()->alloc(memory_level, device_id, buf_size);
892 m.insert(std::make_pair(device_id, merged_index_buffer));
897 merged_index_buffer = merged_index_buffer_in_cpu;
900 CHECK(merged_index_buffer);
901 linearization_time_ms += timer_stop(clock_begin);
902 VLOG(2) << "Linearization has been successfully done, elapsed time: "
903 << linearization_time_ms << " ms.";
904 return {merged_data_buffer, merged_index_buffer};
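// linearizeFixedLenArrayColFrags(): fixed-length arrays need no index buffer, so linearization simply
// appends each fragment's data buffer; the second member of the returned MergedChunk is nullptr.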
909 std::list<std::shared_ptr<Chunk_NS::Chunk>>& chunk_holder,
910 std::list<ChunkIter>& chunk_iter_holder,
911 std::list<std::shared_ptr<Chunk_NS::Chunk>>& local_chunk_holder,
912 std::list<ChunkIter>& local_chunk_iter_holder,
913 std::list<size_t>& local_chunk_num_tuples,
917 const size_t total_data_buf_size,
918 const size_t total_idx_buf_size,
919 const size_t total_num_tuples,
921 const size_t thread_idx) const {
922 int64_t linearization_time_ms = 0;
926 bool has_cached_merged_data_buf = false;
932 auto& cd_cache = cached_data_buf_cache_it->second;
933 auto cached_data_buf_it = cd_cache.find(device_id);
934 if (cached_data_buf_it != cd_cache.end()) {
935 has_cached_merged_data_buf = true;
936 merged_data_buffer = cached_data_buf_it->second;
937 VLOG(2) << "Recycle merged data buffer for linearized chunks (memory_level: "
942 executor_->getDataMgr()->alloc(memory_level, device_id, total_data_buf_size);
943 VLOG(2) << "Allocate " << total_data_buf_size
944 << " bytes of data buffer space for linearized chunks (memory_level: "
947 cd_cache.insert(std::make_pair(device_id, merged_data_buffer));
952 executor_->getDataMgr()->alloc(memory_level, device_id, total_data_buf_size);
953 VLOG(2) << "Allocate " << total_data_buf_size
954 << " bytes of data buffer space for linearized chunks (memory_level: "
957 m.insert(std::make_pair(device_id, merged_data_buffer));
961 if (!has_cached_merged_data_buf) {
962 size_t sum_data_buf_size = 0;
963 auto chunk_holder_it = local_chunk_holder.begin();
964 auto chunk_iter_holder_it = local_chunk_iter_holder.begin();
965 for (; chunk_holder_it != local_chunk_holder.end();
966 chunk_holder_it++, chunk_iter_holder_it++) {
970 auto target_chunk = chunk_holder_it->get();
971 auto target_chunk_data_buffer = target_chunk->getBuffer();
972 merged_data_buffer->append(target_chunk_data_buffer->getMemoryPtr(),
973 target_chunk_data_buffer->size(),
976 sum_data_buf_size += target_chunk_data_buffer->size();
979 CHECK_EQ(total_data_buf_size, sum_data_buf_size);
981 linearization_time_ms += timer_stop(clock_begin);
982 VLOG(2) << "Linearization has been successfully done, elapsed time: "
983 << linearization_time_ms << " ms.";
984 return {merged_data_buffer, nullptr};
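// transferColumnIfNeeded(): returns the host column buffer as-is for CPU execution, or allocates device
// memory and copies the buffer over when a GPU memory level is requested.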
994 if (!columnar_results) {
998 CHECK_LT(static_cast<size_t>(col_id), col_buffers.size());
1000 const auto& col_ti = columnar_results->getColumnType(col_id);
1002 if (col_ti.usesFlatBuffer()) {
1006 num_bytes = columnar_results->size() * col_ti.get_size();
1008 CHECK(device_allocator);
1009 auto gpu_col_buffer = device_allocator->alloc(num_bytes);
1010 device_allocator->copyToDevice(gpu_col_buffer, col_buffers[col_id], num_bytes);
1011 return gpu_col_buffer;
1013 return col_buffers[col_id];
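// addMergedChunkIter(): records a merged ChunkIter pointer per (column descriptor, device) so a later
// call can reuse it instead of re-linearizing.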
1017 const int device_id,
1018 int8_t* chunk_iter_ptr) const {
1022 auto iter_device_it = chunk_iter_it->second.find(device_id);
1023 if (iter_device_it == chunk_iter_it->second.end()) {
1024 VLOG(2) << "Additional merged chunk_iter for col_desc (tbl: "
1026 << "), device_id: " << device_id;
1027 chunk_iter_it->second.emplace(device_id, chunk_iter_ptr);
1031 iter_m.emplace(device_id, chunk_iter_ptr);
1032 VLOG(2) << "New merged chunk_iter for col_desc (tbl: "
1034 << "), device_id: " << device_id;
1040 const int device_id) const {
1043 auto dev_iter_map_it = linearized_chunk_iter_it->second.find(device_id);
1044 if (dev_iter_map_it != linearized_chunk_iter_it->second.end()) {
1045 VLOG(2) << "Recycle merged chunk_iter for col_desc (tbl: "
1047 << "), device_id: " << device_id;
1048 return dev_iter_map_it->second;
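// prepareChunkIter(): initializes a ChunkIter over the merged data (and, for true varlen types, index)
// buffer, copying the element count and skip settings from the last per-fragment iterator.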
1057 bool is_true_varlen_type,
1058 const size_t total_num_tuples) const {
1060 if (is_true_varlen_type) {
1072 merged_chunk_iter.num_elems = total_num_tuples;
1073 merged_chunk_iter.skip = chunk_iter.skip;
1076 return merged_chunk_iter;
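// freeLinearizedBuf(): releases the cached merged data and index buffers back to DataMgr.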
1082 const auto data_mgr = executor_->getDataMgr();
1086 for (auto& kv2 : kv.second) {
1087 data_mgr->free(kv2.second);
1094 for (auto& kv2 : kv.second) {
1095 data_mgr->free(kv2.second);
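// freeTemporaryCpuLinearizedIdxBuf(): frees the temporary CPU-side index buffers used while linearizing
// varlen columns.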
1104 const auto data_mgr = executor_->getDataMgr();
1107 data_mgr->free(kv.second);
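// Result-set path: columnarize the temporary table's result set (cached per fragment in the column
// cache) and hand the requested column to transferColumnIfNeeded() for the target device.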
1117 const int device_id,
1119 const size_t thread_idx) const {
1126 table_key, std::unordered_map<int, std::shared_ptr<const ColumnarResults>>()));
1130 if (frag_id_to_result.empty() || !frag_id_to_result.count(frag_id)) {
1131 frag_id_to_result.insert(
1132 std::make_pair(frag_id,
1133 std::shared_ptr<const ColumnarResults>(
1145 result, col_id, executor_->getDataMgr(), memory_level, device_id, device_allocator);