#include "../Fragmenter/Fragmenter.h"

#include <tbb/parallel_for.h>
#include <tbb/task_arena.h>
  return table_info_copy;
}
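// Aggregate fragment metadata across the physical shards of a sharded table:
// each shard's fragments are appended to a single TableInfo and the shards'
// physical tuple counts are summed.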
    const std::vector<const TableDescriptor*>& shard_tables) {
  size_t total_number_of_tuples{0};
  Fragmenter_Namespace::TableInfo table_info_all_shards;
  for (const TableDescriptor* shard_table : shard_tables) {
    CHECK(shard_table->fragmenter);
    const auto& shard_metainfo = shard_table->fragmenter->getFragmentsForQuery();
    total_number_of_tuples += shard_metainfo.getPhysicalNumTuples();
    table_info_all_shards.fragments.reserve(table_info_all_shards.fragments.size() +
                                            shard_metainfo.fragments.size());
    table_info_all_shards.fragments.insert(table_info_all_shards.fragments.end(),
                                           shard_metainfo.fragments.begin(),
                                           shard_metainfo.fragments.end());
  }
  table_info_all_shards.setPhysicalNumTuples(total_number_of_tuples);
  return table_info_all_shards;
}
  const auto it = cache_.find(table_key);
  if (it != cache_.end()) {
    const auto& table_info = it->second;
    return copy_table_info(table_info);
  }
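  // Cache miss: resolve the table descriptor through the catalog, expand it
  // to its physical shard descriptors, and build the aggregated TableInfo
  // for them.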
  const auto td = cat->getMetadataForTable(table_key.table_id);
  const auto shard_tables = cat->getPhysicalTablesDescriptors(td);
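// Wrap an in-memory ResultSet (a temporary table) in a TableInfo with a
// single synthetic fragment; the fragment points at the ResultSet and
// carries its own mutex for lazy row-count computation.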
  std::vector<Fragmenter_Namespace::FragmentInfo> result;
  if (rows) {
    result.resize(1);
    auto& fragment = result.front();
    fragment.fragmentId = 0;
    fragment.deviceIds.resize(3);
    fragment.resultSet = rows.get();
    fragment.resultSetMutex.reset(new std::mutex());
  }
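// Collect one InputTableInfo per input descriptor. Repeated occurrences of
// the same table key reuse a deep copy of the already-collected info,
// negative table ids resolve to temporary tables (previous query results),
// and everything else is fetched through the executor.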
                         const std::vector<InputDescriptor>& input_descs,
                         Executor* executor) {
  const auto temporary_tables = executor->getTemporaryTables();
  std::unordered_map<shared::TableKey, size_t> info_cache;
  for (const auto& input_desc : input_descs) {
    const auto& table_key = input_desc.getTableKey();
    const auto cached_index_it = info_cache.find(table_key);
    if (cached_index_it != info_cache.end()) {
      CHECK_LT(cached_index_it->second, table_infos.size());
      table_infos.push_back(
          {table_key, copy_table_info(table_infos[cached_index_it->second].info)});
      continue;
    }
    auto table_id = table_key.table_id;
    if (table_id < 0) {
      CHECK(temporary_tables);
      const auto it = temporary_tables->find(table_id);
      LOG_IF(FATAL, it == temporary_tables->end())
          << "Failed to find previous query result for node " << -table_id;
      table_infos.push_back({table_key, synthesize_table_info(it->second)});
    } else {
      table_infos.push_back({table_key, executor->getTableInfo(table_key)});
    }
    CHECK(!table_infos.empty());
    info_cache.insert(std::make_pair(table_key, table_infos.size() - 1));
  }
}
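// Chunk-stats helper for table function output columns: scan a buffer of
// values and record min, max, and null presence. Buffers below the 20,000
// value threshold are scanned serially; larger buffers are split across a
// bounded TBB task arena with per-thread partial results merged afterwards.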
template <typename T>
    std::shared_ptr<ChunkMetadata>& chunk_metadata,
    const T* values_buffer,
    const size_t values_count,
    const T null_val) {
  T min_val{std::numeric_limits<T>::max()};
  T max_val{std::numeric_limits<T>::lowest()};
  bool has_nulls{false};
  constexpr size_t parallel_stats_compute_threshold = 20000UL;
  if (values_count < parallel_stats_compute_threshold) {
    for (size_t row_idx = 0; row_idx < values_count; ++row_idx) {
      const T cell_val = values_buffer[row_idx];
      if (cell_val == null_val) {
        has_nulls = true;
        continue;
      }
      if (cell_val < min_val) {
        min_val = cell_val;
      }
      if (cell_val > max_val) {
        max_val = cell_val;
      }
    }
  } else {
    const size_t max_thread_count = std::thread::hardware_concurrency();
    const size_t min_grain_size = max_inputs_per_thread / 2;
    const size_t num_threads =
        std::min(max_thread_count,
                 ((values_count + max_inputs_per_thread - 1) / max_inputs_per_thread));

    std::vector<T> threads_local_mins(num_threads, std::numeric_limits<T>::max());
    std::vector<T> threads_local_maxes(num_threads, std::numeric_limits<T>::lowest());
    std::vector<bool> threads_local_has_nulls(num_threads, false);
    tbb::task_arena limited_arena(num_threads);
    limited_arena.execute([&] {
      tbb::parallel_for(
          tbb::blocked_range<size_t>(0, values_count, min_grain_size),
          [&](const tbb::blocked_range<size_t>& r) {
            const size_t start_idx = r.begin();
            const size_t end_idx = r.end();
            T local_min_val = std::numeric_limits<T>::max();
            T local_max_val = std::numeric_limits<T>::lowest();
            bool local_has_nulls = false;
            for (size_t row_idx = start_idx; row_idx < end_idx; ++row_idx) {
              const T cell_val = values_buffer[row_idx];
              if (cell_val == null_val) {
                local_has_nulls = true;
                continue;
              }
              if (cell_val < local_min_val) {
                local_min_val = cell_val;
              }
              if (cell_val > local_max_val) {
                local_max_val = cell_val;
              }
            }
            size_t thread_idx = tbb::this_task_arena::current_thread_index();
            if (local_min_val < threads_local_mins[thread_idx]) {
              threads_local_mins[thread_idx] = local_min_val;
            }
            if (local_max_val > threads_local_maxes[thread_idx]) {
              threads_local_maxes[thread_idx] = local_max_val;
            }
            if (local_has_nulls) {
              threads_local_has_nulls[thread_idx] = true;
            }
          },
          tbb::simple_partitioner());
    });
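    // Each range folds into a per-thread slot; indexing by
    // tbb::this_task_arena::current_thread_index() stays within the
    // pre-sized vectors because the arena caps concurrency at num_threads.
    // The loop below then merges the per-thread partials serially.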
    for (size_t thread_idx = 0; thread_idx < num_threads; ++thread_idx) {
      if (threads_local_mins[thread_idx] < min_val) {
        min_val = threads_local_mins[thread_idx];
      }
      if (threads_local_maxes[thread_idx] > max_val) {
        max_val = threads_local_maxes[thread_idx];
      }
      has_nulls |= threads_local_has_nulls[thread_idx];
    }
  }
  chunk_metadata->fillChunkStats(min_val, max_val, has_nulls);
}
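// Metadata synthesis for table function results: the output must be columnar
// and fully materialized (nothing lazily fetched), so per-column chunk
// metadata (type, element count, byte size, min/max/null stats) can be read
// straight out of the columnar buffers.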
  CHECK(rows->getQueryMemDesc().getQueryDescriptionType() ==
        QueryDescriptionType::TableFunction);
  CHECK(rows->didOutputColumnar());
  CHECK(!(rows->areAnyColumnsLazyFetched()));
  const size_t col_count = rows->colCount();
  const auto row_count = rows->entryCount();

  ChunkMetadataMap chunk_metadata_map;

  for (size_t col_idx = 0; col_idx < col_count; ++col_idx) {
    std::shared_ptr<ChunkMetadata> chunk_metadata = std::make_shared<ChunkMetadata>();
    const int8_t* columnar_buffer =
        const_cast<int8_t*>(rows->getColumnarBuffer(col_idx));
    const auto col_sql_type_info = rows->getColType(col_idx);

    bool is_array = col_sql_type_info.is_array();
    bool is_geometry = col_sql_type_info.is_geometry();
    const auto col_type =
        (is_array ? col_sql_type_info.get_subtype()
                  : (is_geometry ? col_sql_type_info.get_elem_type().get_type()
                                 : col_sql_type_info.get_type()));
    const auto col_type_info =
        ((is_array || is_geometry) ? col_sql_type_info.get_elem_type()
                                   : col_sql_type_info);

    chunk_metadata->sqlType = col_type_info;
    chunk_metadata->numElements = row_count;

    const int8_t* values_buffer{nullptr};
    size_t values_count{0};
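    // Select the values to scan for stats: variable-length columns (arrays,
    // geo) read a flattened value buffer and count from the column's
    // FlatBuffer layout, while fixed-width scalar columns use the columnar
    // buffer directly, one value per row. Dictionary-encoded text is treated
    // as 32-bit string ids; other supported types are dispatched below with
    // the buffer reinterpreted as the matching element type.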
    switch (col_sql_type_info.get_type()) {
        values_count = row_count * 2;
        values_buffer = m.get_values();

        values_count = m.getValuesCount();
        values_buffer = m.getValuesBuffer();

        CHECK(m.isNestedArray());
        values_count = m.getValuesCount();
        values_buffer = m.getValuesBuffer();

        chunk_metadata->numBytes = row_count * col_type_info.get_size();
        values_count = row_count;
        values_buffer = columnar_buffer;
    }

    if (col_type != kTEXT) {
      CHECK_EQ(col_sql_type_info.get_compression(), kENCODING_NONE);
    } else {
      CHECK_EQ(col_sql_type_info.get_compression(), kENCODING_DICT);
      CHECK_EQ(col_type_info.get_size(), sizeof(int32_t));
    }

        reinterpret_cast<const int16_t*>(values_buffer),
        reinterpret_cast<const int32_t*>(values_buffer),
        reinterpret_cast<const int64_t*>(values_buffer),
        reinterpret_cast<const float*>(values_buffer),
        reinterpret_cast<const double*>(values_buffer),

    chunk_metadata_map.emplace(col_idx, chunk_metadata);
  }

  return chunk_metadata_map;
}
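// General metadata synthesis for a ResultSet-backed temporary table: an
// empty result set gets dummy per-column metadata, otherwise per-worker
// "dummy" encoders accumulate min/max/null stats row by row and are reduced
// into a single per-column metadata map.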
  ChunkMetadataMap metadata_map;

  if (rows->definitelyHasNoRows()) {
    // The result set has no valid storage; fill dummy metadata and return early.
    std::vector<std::unique_ptr<Encoder>> decoders;
    for (size_t i = 0; i < rows->colCount(); ++i) {
      decoders.emplace_back(Encoder::Create(nullptr, rows->getColType(i)));
      metadata_map.emplace(i, decoders.back()->getMetadata(rows->getColType(i)));
    }
    return metadata_map;
  }

  std::vector<std::vector<std::unique_ptr<Encoder>>> dummy_encoders;
  const size_t worker_count = use_parallel_algorithms(*rows) ? cpu_threads() : 1;
  for (size_t worker_idx = 0; worker_idx < worker_count; ++worker_idx) {
    dummy_encoders.emplace_back();
    for (size_t i = 0; i < rows->colCount(); ++i) {
      const auto& col_ti = rows->getColType(i);
      dummy_encoders.back().emplace_back(Encoder::Create(nullptr, col_ti));
    }
  }

  if (rows->getQueryMemDesc().getQueryDescriptionType() ==
      QueryDescriptionType::TableFunction) {
    return synthesize_metadata_table_function(rows);
  }
  std::vector<SQLTypeInfo> row_col_ti;
  std::vector<Number64> col_null_vals(rows->colCount());
  for (size_t i = 0; i < rows->colCount(); i++) {
    auto const col_ti = rows->getColType(i);
    row_col_ti.push_back(col_ti);
    if (uses_int_meta(col_ti)) {
      col_null_vals[i].as_int64 = inline_int_null_val(col_ti);
    } else if (col_ti.is_fp()) {
      col_null_vals[i].as_double = inline_fp_null_val(col_ti);
    } else {
      throw std::runtime_error(col_ti.get_type_name() +
                               " is not supported in temporary table.");
    }
  }
  const auto do_work = [rows, &row_col_ti, &col_null_vals](
                           const std::vector<TargetValue>& crt_row,
                           std::vector<std::unique_ptr<Encoder>>& dummy_encoders) {
    for (size_t i = 0; i < rows->colCount(); ++i) {
      const auto& col_ti = row_col_ti[i];
      const auto& col_val = crt_row[i];
      const auto scalar_col_val = boost::get<ScalarTargetValue>(&col_val);
      CHECK(scalar_col_val);
      if (uses_int_meta(col_ti)) {
        const auto i64_p = boost::get<int64_t>(scalar_col_val);
        CHECK(i64_p);
        dummy_encoders[i]->updateStats(*i64_p, *i64_p == col_null_vals[i].as_int64);
      } else {
        CHECK(col_ti.is_fp());
        switch (col_ti.get_type()) {
          case kFLOAT: {
            const auto float_p = boost::get<float>(scalar_col_val);
            CHECK(float_p);
            dummy_encoders[i]->updateStats(*float_p,
                                           *float_p == col_null_vals[i].as_double);
            break;
          }
          case kDOUBLE: {
            const auto double_p = boost::get<double>(scalar_col_val);
            CHECK(double_p);
            dummy_encoders[i]->updateStats(*double_p,
                                           *double_p == col_null_vals[i].as_double);
            break;
          }
          default:
            CHECK(false);
        }
      }
    }
  };
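  // Row scan: when parallel algorithms are enabled for this result set, TBB
  // workers each fold rows into their own encoder set (indexed by the
  // worker's arena thread index); otherwise rows are drained serially
  // through worker slot 0.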
  if (use_parallel_algorithms(*rows)) {
    const size_t entry_count = rows->entryCount();
    tbb::parallel_for(
        tbb::blocked_range<size_t>(0, entry_count),
        [&do_work, &rows, &dummy_encoders](const tbb::blocked_range<size_t>& range) {
          const size_t worker_idx = tbb::this_task_arena::current_thread_index();
          for (size_t i = range.begin(); i < range.end(); ++i) {
            const auto crt_row = rows->getRowAtNoTranslations(i);
            if (!crt_row.empty()) {
              do_work(crt_row, dummy_encoders[worker_idx]);
            }
          }
        });
  } else {
    while (true) {
      auto crt_row = rows->getNextRow(false, false);
      if (crt_row.empty()) {
        break;
      }
      do_work(crt_row, dummy_encoders[0]);
    }
  }
  for (size_t worker_idx = 1; worker_idx < worker_count; ++worker_idx) {
    CHECK_LT(worker_idx, dummy_encoders.size());
    const auto& worker_encoders = dummy_encoders[worker_idx];
    for (size_t i = 0; i < rows->colCount(); ++i) {
      dummy_encoders[0][i]->reduceStats(*worker_encoders[i]);
    }
  }
  for (size_t i = 0; i < rows->colCount(); ++i) {
    metadata_map.emplace(i, dummy_encoders[0][i]->getMetadata(rows->getColType(i)));
  }
  return metadata_map;
}
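// Fragment count of an input table: a temporary table (a previous query
// result) always counts as a single fragment, while a physical table reports
// the fragment count from its TableInfo.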
  const auto temporary_tables = executor->getTemporaryTables();
  CHECK(temporary_tables);
  auto it = temporary_tables->find(table_key.table_id);
  if (it != temporary_tables->end()) {
    // A temporary table (previous query result) is always a single fragment.
    return size_t(1);
  } else {
    const auto table_info = executor->getTableInfo(table_key);
    return table_info.fragments.size();
  }
}
    const std::vector<InputDescriptor>& input_descs,
    Executor* executor) {
  std::vector<InputTableInfo> table_infos;
  collect_table_infos(table_infos, input_descs, executor);
  return table_infos;
}

    Executor* executor) {
  std::vector<InputTableInfo> table_infos;
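// Chunk metadata for a ResultSet-backed fragment is synthesized lazily and
// can be recycled: with the chunk-metadata cache enabled, a previously
// computed map is looked up in the executor's ResultSetRecyclerHolder, and
// freshly computed metadata is put back into the cache so later queries over
// the same temporary table skip the per-row scan.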
  bool need_to_compute_metadata = true;
  if (enable_chunk_metadata_cache) {
    std::optional<ChunkMetadataMap> cached =
        executor->getResultSetRecyclerHolder().getCachedChunkMetadata(
    if (cached) {
      chunkMetadataMap = *cached;
      need_to_compute_metadata = false;
    }
  }
  if (need_to_compute_metadata) {
    chunkMetadataMap = synthesize_metadata(resultSet);
    executor->getResultSetRecyclerHolder().putChunkMetadataToCache(
  for (const auto& [column_id, chunk_metadata] : chunkMetadataMap) {
    metadata_map[column_id] = std::make_shared<ChunkMetadata>(*chunk_metadata);
  }
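// FragmentInfo::getNumTuples for a ResultSet-backed fragment computes the
// row count lazily (rowCount() can be expensive), guarded by the fragment's
// own mutex, and caches it in numTuples.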
  std::unique_ptr<std::lock_guard<std::mutex>> lock;
  if (resultSetMutex) {
    lock.reset(new std::lock_guard<std::mutex>(*resultSetMutex));
  }
  CHECK_EQ(!!resultSet, !!resultSetMutex);
  if (resultSet && !synthesizedNumTuplesIsValid) {
    numTuples = resultSet->rowCount();
    synthesizedNumTuplesIsValid = true;
  }
  return numTuples;
}
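// Table-level tuple counts: a table backed by a single ResultSet fragment
// answers from that fragment (exact count, or entryCount() as an upper
// bound); otherwise the upper bound falls back to the largest recorded
// per-fragment tuple count.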
  if (!fragments.empty() && fragments.front().resultSet) {
    return fragments.front().getNumTuples();
  }
  if (!fragments.empty() && fragments.front().resultSet) {
    return fragments.front().resultSet->entryCount();
  }
  if (!fragments.empty() && fragments.front().resultSet) {
    return fragments.front().resultSet->entryCount();
  }
  size_t fragment_num_tuples_upper_bound = 0;
  for (const auto& fragment : fragments) {
    fragment_num_tuples_upper_bound =
        std::max(fragment.getNumTuples(), fragment_num_tuples_upper_bound);
  }
  return fragment_num_tuples_upper_bound;
}