OmniSciDB  a5dc49c757
ColumnarResults Class Reference

#include <ColumnarResults.h>

Public Types

using ReadFunction = std::function< int64_t(const ResultSet &, const size_t, const size_t, const size_t)>
 
using WriteFunction = std::function< void(const ResultSet &, const size_t, const size_t, const size_t, const size_t, const ReadFunction &)>
 

Public Member Functions

 ColumnarResults (const std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner, const ResultSet &rows, const size_t num_columns, const std::vector< SQLTypeInfo > &target_types, const size_t executor_id, const size_t thread_idx, const bool is_parallel_execution_enforced=false)
 
 ColumnarResults (const std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner, const int8_t *one_col_buffer, const size_t num_rows, const SQLTypeInfo &target_type, const size_t executor_id, const size_t thread_idx)
 
const std::vector< int8_t * > & getColumnBuffers () const
 
const size_t size () const
 
const SQLTypeInfo & getColumnType (const int col_id) const
 
bool isParallelConversion () const
 
bool isDirectColumnarConversionPossible () const
 

Static Public Member Functions

static std::unique_ptr< ColumnarResults > mergeResults (const std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner, const std::vector< std::unique_ptr< ColumnarResults >> &sub_results)
 

Protected Attributes

std::vector< int8_t * > column_buffers_
 
size_t num_rows_
 

Private Member Functions

 ColumnarResults (const size_t num_rows, const std::vector< SQLTypeInfo > &target_types, const std::vector< size_t > &padded_target_sizes)
 
void writeBackCell (const TargetValue &col_val, const size_t row_idx, const SQLTypeInfo &type_info, int8_t *column_buf, std::mutex *write_mutex=nullptr)
 
void materializeAllColumnsDirectly (const ResultSet &rows, const size_t num_columns)
 
void materializeAllColumnsThroughIteration (const ResultSet &rows, const size_t num_columns)
 
void materializeAllColumnsGroupBy (const ResultSet &rows, const size_t num_columns)
 
void materializeAllColumnsProjection (const ResultSet &rows, const size_t num_columns)
 
void materializeAllColumnsTableFunction (const ResultSet &rows, const size_t num_columns)
 
void copyAllNonLazyColumns (const std::vector< ColumnLazyFetchInfo > &lazy_fetch_info, const ResultSet &rows, const size_t num_columns)
 
void materializeAllLazyColumns (const std::vector< ColumnLazyFetchInfo > &lazy_fetch_info, const ResultSet &rows, const size_t num_columns)
 
void locateAndCountEntries (const ResultSet &rows, ColumnBitmap &bitmap, std::vector< size_t > &non_empty_per_thread, const size_t entry_count, const size_t num_threads, const size_t size_per_thread) const
 
void compactAndCopyEntries (const ResultSet &rows, const ColumnBitmap &bitmap, const std::vector< size_t > &non_empty_per_thread, const size_t num_columns, const size_t entry_count, const size_t num_threads, const size_t size_per_thread)
 
void compactAndCopyEntriesWithTargetSkipping (const ResultSet &rows, const ColumnBitmap &bitmap, const std::vector< size_t > &non_empty_per_thread, const std::vector< size_t > &global_offsets, const std::vector< bool > &targets_to_skip, const std::vector< size_t > &slot_idx_per_target_idx, const size_t num_columns, const size_t entry_count, const size_t num_threads, const size_t size_per_thread)
 
void compactAndCopyEntriesWithoutTargetSkipping (const ResultSet &rows, const ColumnBitmap &bitmap, const std::vector< size_t > &non_empty_per_thread, const std::vector< size_t > &global_offsets, const std::vector< size_t > &slot_idx_per_target_idx, const size_t num_columns, const size_t entry_count, const size_t num_threads, const size_t size_per_thread)
 
template<typename DATA_TYPE >
void writeBackCellDirect (const ResultSet &rows, const size_t input_buffer_entry_idx, const size_t output_buffer_entry_idx, const size_t target_idx, const size_t slot_idx, const ReadFunction &read_function)
 
std::vector< WriteFunction > initWriteFunctions (const ResultSet &rows, const std::vector< bool > &targets_to_skip={})
 
template<QueryDescriptionType QUERY_TYPE, bool COLUMNAR_OUTPUT>
std::vector< ReadFunction > initReadFunctions (const ResultSet &rows, const std::vector< size_t > &slot_idx_per_target_idx, const std::vector< bool > &targets_to_skip={})
 
std::tuple< std::vector< WriteFunction >, std::vector< ReadFunction > > initAllConversionFunctions (const ResultSet &rows, const std::vector< size_t > &slot_idx_per_target_idx, const std::vector< bool > &targets_to_skip={})
 
template<>
void writeBackCellDirect (const ResultSet &rows, const size_t input_buffer_entry_idx, const size_t output_buffer_entry_idx, const size_t target_idx, const size_t slot_idx, const ReadFunction &read_from_function)
 
template<>
void writeBackCellDirect (const ResultSet &rows, const size_t input_buffer_entry_idx, const size_t output_buffer_entry_idx, const size_t target_idx, const size_t slot_idx, const ReadFunction &read_from_function)
 

Private Attributes

const std::vector< SQLTypeInfo > target_types_
 
bool parallel_conversion_
 
bool direct_columnar_conversion_
 
size_t thread_idx_
 
std::shared_ptr< Executor > executor_
 
std::vector< size_t > padded_target_sizes_
 

Detailed Description

Definition at line 61 of file ColumnarResults.h.
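ColumnarResults materializes a row-wise ResultSet into one contiguous buffer per output column, which callers such as ColumnFetcher::transferColumnIfNeeded() can then hand off directly. As a usage illustration only (a minimal sketch, not taken from the source; rows, row_set_mem_owner, types, num_columns, and executor_id are assumed to be in scope):

auto columnar = std::make_unique<ColumnarResults>(row_set_mem_owner,
                                                  rows,
                                                  num_columns,
                                                  types,  // one SQLTypeInfo per column
                                                  executor_id,
                                                  /*thread_idx=*/0);
// Each entry is a contiguous buffer holding one column's values.
const std::vector<int8_t*>& buffers = columnar->getColumnBuffers();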

Member Typedef Documentation

using ColumnarResults::ReadFunction = std::function<int64_t(const ResultSet&, const size_t, const size_t, const size_t)>

Definition at line 98 of file ColumnarResults.h.

using ColumnarResults::WriteFunction = std::function<void(const ResultSet&, const size_t, const size_t, const size_t, const size_t, const ReadFunction&)>

Definition at line 107 of file ColumnarResults.h.
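Together these typedefs define the direct-conversion protocol: a ReadFunction decodes one cell (the slot of a target at a given entry index) from the result set's storage and returns it as an int64_t, while a WriteFunction stores that value at a compacted output index. A minimal sketch of the contract (illustration only; the real pairs are built by initReadFunctions() and initWriteFunctions(), and the output vector here is hypothetical):

std::vector<int64_t> output(output_row_count);  // hypothetical output buffer
ColumnarResults::WriteFunction write_func =
    [&output](const ResultSet& rows,
              const size_t input_entry_idx,
              const size_t output_entry_idx,
              const size_t target_idx,
              const size_t slot_idx,
              const ColumnarResults::ReadFunction& read) {
      // read one cell from the result set storage, then store it compacted
      output[output_entry_idx] = read(rows, input_entry_idx, target_idx, slot_idx);
    };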

Constructor & Destructor Documentation

ColumnarResults::ColumnarResults ( const std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner,
const ResultSet &  rows,
const size_t  num_columns,
const std::vector< SQLTypeInfo > &  target_types,
const size_t  executor_id,
const size_t  thread_idx,
const bool  is_parallel_execution_enforced = false 
)

Definition at line 256 of file ColumnarResults.cpp.

References CHECK, CHECK_EQ, column_buffers_, anonymous_namespace{ColumnarResults.cpp}::computeTotalNofValuesForColumnArray(), anonymous_namespace{ColumnarResults.cpp}::computeTotalNofValuesForColumnGeoType(), anonymous_namespace{ColumnarResults.cpp}::computeTotalNofValuesForColumnTextEncodingNone(), DEBUG_TIMER, executor_, Executor::getExecutor(), getFlatBufferSize(), initializeFlatBuffer(), isDirectColumnarConversionPossible(), kARRAY, kENCODING_DICT, kENCODING_NONE, kLINESTRING, kMULTILINESTRING, kMULTIPOINT, kMULTIPOLYGON, kPOINT, kPOLYGON, kTEXT, materializeAllColumnsDirectly(), materializeAllColumnsThroughIteration(), num_rows_, padded_target_sizes_, report::rows, target_types_, thread_idx_, and UNREACHABLE.

Referenced by mergeResults().

263  : column_buffers_(num_columns)
264  , num_rows_(result_set::use_parallel_algorithms(rows) ||
265  rows.isDirectColumnarConversionPossible()
266  ? rows.entryCount()
267  : rows.rowCount())
268  , target_types_(target_types)
269  , parallel_conversion_(is_parallel_execution_enforced
270  ? true
271  : result_set::use_parallel_algorithms(rows))
272  , direct_columnar_conversion_(rows.isDirectColumnarConversionPossible())
273  , thread_idx_(thread_idx)
274  , padded_target_sizes_(get_padded_target_sizes(rows, target_types)) {
275  auto timer = DEBUG_TIMER(__func__);
276  column_buffers_.resize(num_columns);
277  executor_ = Executor::getExecutor(executor_id);
278  CHECK(executor_);
279  CHECK_EQ(padded_target_sizes_.size(), target_types.size());
280 
281  for (size_t i = 0; i < num_columns; ++i) {
282  const auto& src_ti = rows.getColType(i);
283  // ti is initialized in columnarize_result() function in
284  // ColumnFetcher.cpp and it may differ from src_ti with respect to
285  // uses_flatbuffer attribute
286  const auto& ti = target_types_[i];
287 
288  if (rows.isZeroCopyColumnarConversionPossible(i)) {
289  CHECK_EQ(ti.usesFlatBuffer(), src_ti.usesFlatBuffer());
290  // The column buffer will be assigned in
291  // ColumnarResults::copyAllNonLazyColumns.
292  column_buffers_[i] = nullptr;
293  continue;
294  }
295  CHECK(!(src_ti.usesFlatBuffer() && ti.usesFlatBuffer()));
296  // When the source result set uses FlatBuffer layout, it must
297  // support zero-copy columnar conversion. Otherwise, the source
298  // result will be columnarized according to ti.usesFlatBuffer()
299  // state that is set in columnarize_result function in
300  // ColumnFetcher.cpp.
301  if (src_ti.usesFlatBuffer() && ti.usesFlatBuffer()) {
302  // If both source and target result sets use FlatBuffer layout,
303  // creating a columnar result should be using zero-copy columnar
304  // conversion.
305  UNREACHABLE();
306  } else if (ti.usesFlatBuffer()) {
307  int64_t values_count = -1;
308  switch (ti.get_type()) {
309  case kARRAY:
310  if (ti.get_subtype() == kTEXT && ti.get_compression() == kENCODING_NONE) {
311  throw std::runtime_error(
312  "Column<Array<TextEncodedNone>> support not implemented yet "
313  "(ColumnarResults)");
314  } else {
315  values_count = computeTotalNofValuesForColumnArray(rows, i);
316  }
317  break;
318  case kPOINT:
319  values_count = num_rows_;
320  break;
321  case kLINESTRING:
322  values_count =
323  computeTotalNofValuesForColumnGeoType<GeoLineStringTargetValue,
324  GeoLineStringTargetValuePtr>(
325  rows, ti, i);
326  break;
327  case kPOLYGON:
328  values_count =
329  computeTotalNofValuesForColumnGeoType<GeoPolyTargetValue,
330  GeoPolyTargetValuePtr>(rows, ti, i);
331  break;
332  case kMULTIPOINT:
333  values_count =
334  computeTotalNofValuesForColumnGeoType<GeoMultiPointTargetValue,
335  GeoMultiPointTargetValuePtr>(
336  rows, ti, i);
337  break;
338  case kMULTILINESTRING:
339  values_count =
340  computeTotalNofValuesForColumnGeoType<GeoMultiLineStringTargetValue,
341  GeoMultiLineStringTargetValuePtr>(
342  rows, ti, i);
343  break;
344  case kMULTIPOLYGON:
345  values_count =
346  computeTotalNofValuesForColumnGeoType<GeoMultiPolyTargetValue,
347  GeoMultiPolyTargetValuePtr>(
348  rows, ti, i);
349  break;
350  case kTEXT:
351  if (ti.get_compression() == kENCODING_NONE) {
352  values_count = computeTotalNofValuesForColumnTextEncodingNone(rows, i);
353  break;
354  }
355  if (ti.get_compression() == kENCODING_DICT) {
356  values_count = num_rows_;
357  break;
358  }
359  default:
360  UNREACHABLE() << "computing number of values not implemented for "
361  << ti.toString();
362  }
363  // TODO: include sizes count to optimize flatbuffer size
364  const int64_t flatbuffer_size = getFlatBufferSize(num_rows_, values_count, ti);
365  column_buffers_[i] = row_set_mem_owner->allocate(flatbuffer_size, thread_idx_);
366  FlatBufferManager m{column_buffers_[i]};
367  initializeFlatBuffer(m, num_rows_, values_count, ti);
368  // The column buffer will be initialized either directly or
369  // through iteration.
370  // TODO: implement QE-808 resolution here.
371  } else {
372  if (ti.is_varlen()) {
373  throw ColumnarConversionNotSupported();
374  }
375  // The column buffer will be initialized either directly or
376  // through iteration.
377  column_buffers_[i] =
378  row_set_mem_owner->allocate(num_rows_ * padded_target_sizes_[i], thread_idx_);
379  }
380  }
381 
382  if (isDirectColumnarConversionPossible() && rows.entryCount() > 0) {
383  materializeAllColumnsDirectly(rows, num_columns);
384  } else {
385  materializeAllColumnsThroughIteration(rows, num_columns);
386  }
387 }


ColumnarResults::ColumnarResults ( const std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner,
const int8_t *  one_col_buffer,
const size_t  num_rows,
const SQLTypeInfo &  target_type,
const size_t  executor_id,
const size_t  thread_idx 
)

Definition at line 389 of file ColumnarResults.cpp.

395  : column_buffers_(1)
396  , num_rows_(num_rows)
397  , target_types_{target_type}
398  , parallel_conversion_(false)
399  , direct_columnar_conversion_(false)
400  , thread_idx_(thread_idx) {
401  auto timer = DEBUG_TIMER(__func__);
402  const bool is_varlen =
403  target_type.is_array() ||
404  (target_type.is_string() && target_type.get_compression() == kENCODING_NONE) ||
405  target_type.is_geometry();
406  if (is_varlen) {
407  throw ColumnarConversionNotSupported();
408  }
409  executor_ = Executor::getExecutor(executor_id);
410  padded_target_sizes_.emplace_back(target_type.get_size());
411  CHECK(executor_);
412  const auto buf_size = num_rows * target_type.get_size();
413  column_buffers_[0] =
414  reinterpret_cast<int8_t*>(row_set_mem_owner->allocate(buf_size, thread_idx_));
415  memcpy(((void*)column_buffers_[0]), one_col_buffer, buf_size);
416 }
ColumnarResults::ColumnarResults ( const size_t  num_rows,
const std::vector< SQLTypeInfo > &  target_types,
const std::vector< size_t > &  padded_target_sizes 
)
inline private

Definition at line 114 of file ColumnarResults.h.

117  : num_rows_(num_rows)
118  , target_types_(target_types)
119  , padded_target_sizes_(padded_target_sizes) {}

Member Function Documentation

void ColumnarResults::compactAndCopyEntries ( const ResultSet &  rows,
const ColumnBitmap &  bitmap,
const std::vector< size_t > &  non_empty_per_thread,
const size_t  num_columns,
const size_t  entry_count,
const size_t  num_threads,
const size_t  size_per_thread 
)
private

This function goes through all non-empty elements marked in the bitmap data structure and stores them back into the output column buffers. The output column buffers are compacted, without any holes.

TODO(Saman): if necessary, we can look into the distribution of non-empty entries and choose a different, load-balanced strategy (assigning an equal number of non-empty entries to each thread) as opposed to equal partitioning of the bitmap.
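The write offsets come from an exclusive scan over the per-thread non-empty counts produced by locateAndCountEntries(), exactly as in the listing below. A self-contained sketch of that step (the counts are made-up example values):

#include <numeric>
#include <vector>

std::vector<size_t> non_empty_per_thread{3, 0, 5, 2};  // example counts
std::vector<size_t> global_offsets(non_empty_per_thread.size() + 1, 0);
std::partial_sum(non_empty_per_thread.begin(),
                 non_empty_per_thread.end(),
                 std::next(global_offsets.begin()));
// global_offsets == {0, 3, 3, 8, 10}: thread t starts writing at
// global_offsets[t], and global_offsets.back() is the compacted row count.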

Definition at line 1351 of file ColumnarResults.cpp.

References CHECK, CHECK_EQ, compactAndCopyEntriesWithoutTargetSkipping(), compactAndCopyEntriesWithTargetSkipping(), heavyai::GroupByBaselineHash, heavyai::GroupByPerfectHash, isDirectColumnarConversionPossible(), and gpu_enabled::partial_sum().

Referenced by materializeAllColumnsGroupBy().

1358  {
1359  CHECK(isDirectColumnarConversionPossible());
1360  CHECK(rows.getQueryDescriptionType() == QueryDescriptionType::GroupByPerfectHash ||
1361  rows.getQueryDescriptionType() == QueryDescriptionType::GroupByBaselineHash);
1362  CHECK_EQ(num_threads, non_empty_per_thread.size());
1363 
1364  // compute the exclusive scan over all non-empty totals
1365  std::vector<size_t> global_offsets(num_threads + 1, 0);
1366  std::partial_sum(non_empty_per_thread.begin(),
1367  non_empty_per_thread.end(),
1368  std::next(global_offsets.begin()));
1369 
1370  const auto slot_idx_per_target_idx = rows.getSlotIndicesForTargetIndices();
1371  const auto [single_slot_targets_to_skip, num_single_slot_targets] =
1372  rows.getSupportedSingleSlotTargetBitmap();
1373 
1374  // We skip multi-slot targets (e.g., AVG). These skipped targets are treated
1375  // differently and accessed through result set's iterator
1376  if (num_single_slot_targets < num_columns) {
1377  compactAndCopyEntriesWithTargetSkipping(rows,
1378  bitmap,
1379  non_empty_per_thread,
1380  global_offsets,
1381  single_slot_targets_to_skip,
1382  slot_idx_per_target_idx,
1383  num_columns,
1384  entry_count,
1385  num_threads,
1386  size_per_thread);
1387  } else {
1388  compactAndCopyEntriesWithoutTargetSkipping(rows,
1389  bitmap,
1390  non_empty_per_thread,
1391  global_offsets,
1392  slot_idx_per_target_idx,
1393  num_columns,
1394  entry_count,
1395  num_threads,
1396  size_per_thread);
1397  }
1398 }


void ColumnarResults::compactAndCopyEntriesWithoutTargetSkipping ( const ResultSet &  rows,
const ColumnBitmap &  bitmap,
const std::vector< size_t > &  non_empty_per_thread,
const std::vector< size_t > &  global_offsets,
const std::vector< size_t > &  slot_idx_per_target_idx,
const size_t  num_columns,
const size_t  entry_count,
const size_t  num_threads,
const size_t  size_per_thread 
)
private

This function takes a bitmap of non-empty entries within the result set's storage and compacts and copies those contents back into the output column_buffers_. In this variation, all targets are assumed to be single-slot and thus can be directly columnarized.

Definition at line 1530 of file ColumnarResults.cpp.

References threading_serial::async(), CHECK, CHECK_EQ, executor_, g_enable_non_kernel_time_query_interrupt, heavyai::GroupByBaselineHash, heavyai::GroupByPerfectHash, QueryExecutionError::hasErrorCode(), initAllConversionFunctions(), isDirectColumnarConversionPossible(), report::rows, and UNLIKELY.

Referenced by compactAndCopyEntries().

1539  {
1540  CHECK(isDirectColumnarConversionPossible());
1541  CHECK(rows.getQueryDescriptionType() == QueryDescriptionType::GroupByPerfectHash ||
1542  rows.getQueryDescriptionType() == QueryDescriptionType::GroupByBaselineHash);
1543 
1544  const auto [write_functions, read_functions] =
1545  initAllConversionFunctions(rows, slot_idx_per_target_idx);
1546  CHECK_EQ(write_functions.size(), num_columns);
1547  CHECK_EQ(read_functions.size(), num_columns);
1548  auto do_work = [&rows,
1549  &bitmap,
1550  &global_offsets,
1551  &num_columns,
1552  &slot_idx_per_target_idx,
1553  &write_functions = write_functions,
1554  &read_functions = read_functions](size_t& entry_idx,
1555  size_t& non_empty_idx,
1556  const size_t total_non_empty,
1557  const size_t local_idx,
1558  const size_t thread_idx,
1559  const size_t end_idx) {
1560  if (non_empty_idx >= total_non_empty) {
1561  // all non-empty entries have been written back
1562  entry_idx = end_idx;
1563  return;
1564  }
1565  const size_t output_buffer_row_idx = global_offsets[thread_idx] + non_empty_idx;
1566  if (bitmap.get(local_idx, thread_idx)) {
1567  for (size_t column_idx = 0; column_idx < num_columns; column_idx++) {
1568  write_functions[column_idx](rows,
1569  entry_idx,
1570  output_buffer_row_idx,
1571  column_idx,
1572  slot_idx_per_target_idx[column_idx],
1573  read_functions[column_idx]);
1574  }
1575  non_empty_idx++;
1576  }
1577  };
1578  auto compact_buffer_func = [&non_empty_per_thread, &do_work, this](
1579  const size_t start_index,
1580  const size_t end_index,
1581  const size_t thread_idx) {
1582  const size_t total_non_empty = non_empty_per_thread[thread_idx];
1583  size_t non_empty_idx = 0;
1584  size_t local_idx = 0;
1585  if (g_enable_non_kernel_time_query_interrupt) {
1586  for (size_t entry_idx = start_index; entry_idx < end_index;
1587  entry_idx++, local_idx++) {
1588  if (UNLIKELY((local_idx & 0xFFFF) == 0 &&
1589  executor_->checkNonKernelTimeInterrupted())) {
1590  throw QueryExecutionError(ErrorCode::INTERRUPTED);
1591  }
1592  do_work(
1593  entry_idx, non_empty_idx, total_non_empty, local_idx, thread_idx, end_index);
1594  }
1595  } else {
1596  for (size_t entry_idx = start_index; entry_idx < end_index;
1597  entry_idx++, local_idx++) {
1598  do_work(
1599  entry_idx, non_empty_idx, total_non_empty, local_idx, thread_idx, end_index);
1600  }
1601  }
1602  };
1603 
1604  std::vector<std::future<void>> compaction_threads;
1605  for (size_t thread_idx = 0; thread_idx < num_threads; thread_idx++) {
1606  const size_t start_entry = thread_idx * size_per_thread;
1607  const size_t end_entry = std::min(start_entry + size_per_thread, entry_count);
1608  compaction_threads.push_back(std::async(
1609  std::launch::async, compact_buffer_func, start_entry, end_entry, thread_idx));
1610  }
1611 
1612  try {
1613  for (auto& child : compaction_threads) {
1614  child.wait();
1615  }
1616  } catch (QueryExecutionError& e) {
1617  if (e.hasErrorCode(ErrorCode::INTERRUPTED)) {
1618  throw QueryExecutionError(ErrorCode::INTERRUPTED);
1619  }
1620  throw e;
1621  } catch (...) {
1622  throw;
1623  }
1624 }


void ColumnarResults::compactAndCopyEntriesWithTargetSkipping ( const ResultSet &  rows,
const ColumnBitmap &  bitmap,
const std::vector< size_t > &  non_empty_per_thread,
const std::vector< size_t > &  global_offsets,
const std::vector< bool > &  targets_to_skip,
const std::vector< size_t > &  slot_idx_per_target_idx,
const size_t  num_columns,
const size_t  entry_count,
const size_t  num_threads,
const size_t  size_per_thread 
)
private

This function takes a bitmap of non-empty entries within the result set's storage and compacts and copies those contents back into the output column_buffers_. In this variation, multi-slot targets (e.g., AVG, which occupies more than one slot) are handled through the existing result set iteration, while everything else is directly columnarized.

Definition at line 1406 of file ColumnarResults.cpp.

References threading_serial::async(), CHECK, CHECK_EQ, column_buffers_, executor_, g_enable_non_kernel_time_query_interrupt, ColumnBitmap::get(), heavyai::GroupByBaselineHash, heavyai::GroupByPerfectHash, QueryExecutionError::hasErrorCode(), initAllConversionFunctions(), isDirectColumnarConversionPossible(), report::rows, target_types_, UNLIKELY, and writeBackCell().

Referenced by compactAndCopyEntries().

1416  {
1417  CHECK(isDirectColumnarConversionPossible());
1418  CHECK(rows.getQueryDescriptionType() == QueryDescriptionType::GroupByPerfectHash ||
1419  rows.getQueryDescriptionType() == QueryDescriptionType::GroupByBaselineHash);
1420 
1421  const auto [write_functions, read_functions] =
1422  initAllConversionFunctions(rows, slot_idx_per_target_idx, targets_to_skip);
1423  CHECK_EQ(write_functions.size(), num_columns);
1424  CHECK_EQ(read_functions.size(), num_columns);
1425  std::mutex write_mutex;
1426  auto do_work = [this,
1427  &bitmap,
1428  &rows,
1429  &slot_idx_per_target_idx,
1430  &global_offsets,
1431  &targets_to_skip,
1432  &num_columns,
1433  &write_mutex,
1434  &write_functions = write_functions,
1435  &read_functions = read_functions](size_t& non_empty_idx,
1436  const size_t total_non_empty,
1437  const size_t local_idx,
1438  size_t& entry_idx,
1439  const size_t thread_idx,
1440  const size_t end_idx) {
1441  if (non_empty_idx >= total_non_empty) {
1442  // all non-empty entries have been written back
1443  entry_idx = end_idx;
1444  }
1445  const size_t output_buffer_row_idx = global_offsets[thread_idx] + non_empty_idx;
1446  if (bitmap.get(local_idx, thread_idx)) {
1447  // targets that are recovered from the result set iterators:
1448  const auto crt_row = rows.getRowAtNoTranslations(entry_idx, targets_to_skip);
1449  for (size_t column_idx = 0; column_idx < num_columns; ++column_idx) {
1450  if (!targets_to_skip.empty() && !targets_to_skip[column_idx]) {
1451  auto& type_info = target_types_[column_idx];
1452  writeBackCell(crt_row[column_idx],
1453  output_buffer_row_idx,
1454  type_info,
1455  column_buffers_[column_idx],
1456  &write_mutex);
1457  }
1458  }
1459  // targets that are copied directly without any translation/decoding from
1460  // result set
1461  for (size_t column_idx = 0; column_idx < num_columns; column_idx++) {
1462  if (!targets_to_skip.empty() && !targets_to_skip[column_idx]) {
1463  continue;
1464  }
1465  write_functions[column_idx](rows,
1466  entry_idx,
1467  output_buffer_row_idx,
1468  column_idx,
1469  slot_idx_per_target_idx[column_idx],
1470  read_functions[column_idx]);
1471  }
1472  non_empty_idx++;
1473  }
1474  };
1475 
1476  auto compact_buffer_func = [&non_empty_per_thread, &do_work, this](
1477  const size_t start_index,
1478  const size_t end_index,
1479  const size_t thread_idx) {
1480  const size_t total_non_empty = non_empty_per_thread[thread_idx];
1481  size_t non_empty_idx = 0;
1482  size_t local_idx = 0;
1483  if (g_enable_non_kernel_time_query_interrupt) {
1484  for (size_t entry_idx = start_index; entry_idx < end_index;
1485  entry_idx++, local_idx++) {
1486  if (UNLIKELY((local_idx & 0xFFFF) == 0 &&
1487  executor_->checkNonKernelTimeInterrupted())) {
1488  throw QueryExecutionError(ErrorCode::INTERRUPTED);
1489  }
1490  do_work(
1491  non_empty_idx, total_non_empty, local_idx, entry_idx, thread_idx, end_index);
1492  }
1493  } else {
1494  for (size_t entry_idx = start_index; entry_idx < end_index;
1495  entry_idx++, local_idx++) {
1496  do_work(
1497  non_empty_idx, total_non_empty, local_idx, entry_idx, thread_idx, end_index);
1498  }
1499  }
1500  };
1501 
1502  std::vector<std::future<void>> compaction_threads;
1503  for (size_t thread_idx = 0; thread_idx < num_threads; thread_idx++) {
1504  const size_t start_entry = thread_idx * size_per_thread;
1505  const size_t end_entry = std::min(start_entry + size_per_thread, entry_count);
1506  compaction_threads.push_back(std::async(
1507  std::launch::async, compact_buffer_func, start_entry, end_entry, thread_idx));
1508  }
1509 
1510  try {
1511  for (auto& child : compaction_threads) {
1512  child.wait();
1513  }
1514  } catch (QueryExecutionError& e) {
1515  if (e.hasErrorCode(ErrorCode::INTERRUPTED)) {
1516  throw QueryExecutionError(ErrorCode::INTERRUPTED);
1517  }
1518  throw e;
1519  } catch (...) {
1520  throw;
1521  }
1522 }


void ColumnarResults::copyAllNonLazyColumns ( const std::vector< ColumnLazyFetchInfo > &  lazy_fetch_info,
const ResultSet &  rows,
const size_t  num_columns 
)
private

Definition at line 1097 of file ColumnarResults.cpp.

References threading_serial::async(), CHECK, column_buffers_, isDirectColumnarConversionPossible(), heavyai::TableFunction, target_types_, and UNREACHABLE.

Referenced by materializeAllColumnsProjection(), and materializeAllColumnsTableFunction().

1100  {
1101  CHECK(isDirectColumnarConversionPossible());
1102  const auto is_column_non_lazily_fetched = [&lazy_fetch_info](const size_t col_idx) {
1103  // Saman: make sure when this lazy_fetch_info is empty
1104  if (lazy_fetch_info.empty()) {
1105  return true;
1106  } else {
1107  return !lazy_fetch_info[col_idx].is_lazily_fetched;
1108  }
1109  };
1110 
1111  // parallelized by assigning each column to a thread
1112  std::vector<std::future<void>> direct_copy_threads;
1113  for (size_t col_idx = 0; col_idx < num_columns; col_idx++) {
1114  if (rows.isZeroCopyColumnarConversionPossible(col_idx)) {
1115  CHECK(!column_buffers_[col_idx]);
1116  // The name of the method implies a copy but this is not a copy!!
1117  column_buffers_[col_idx] = const_cast<int8_t*>(rows.getColumnarBuffer(col_idx));
1118  } else if (is_column_non_lazily_fetched(col_idx)) {
1119  CHECK(!(rows.query_mem_desc_.getQueryDescriptionType() ==
1120  QueryDescriptionType::TableFunction));
1121  if (rows.getColType(col_idx).usesFlatBuffer() &&
1122  target_types_[col_idx].usesFlatBuffer()) {
1123  // If both source and target result sets use FlatBuffer
1124  // layout, creating a columnar result should be using
1125  // zero-copy columnar conversion.
1126  UNREACHABLE();
1127  }
1128  direct_copy_threads.push_back(std::async(
1129  std::launch::async,
1130  [&rows, this](const size_t column_index) {
1131  size_t column_size = rows.getColumnarBufferSize(column_index);
1132  rows.copyColumnIntoBuffer(
1133  column_index, column_buffers_[column_index], column_size);
1134  },
1135  col_idx));
1136  }
1137  }
1138 
1139  for (auto& child : direct_copy_threads) {
1140  child.wait();
1141  }
1142 }


const std::vector<int8_t*>& ColumnarResults::getColumnBuffers ( ) const
inline

Definition at line 82 of file ColumnarResults.h.

References column_buffers_.

Referenced by ColumnFetcher::transferColumnIfNeeded().

82 { return column_buffers_; }


const SQLTypeInfo& ColumnarResults::getColumnType ( const int  col_id) const
inline

Definition at line 86 of file ColumnarResults.h.

References CHECK_GE, CHECK_LT, and target_types_.

Referenced by ColumnFetcher::transferColumnIfNeeded().

86  {
87  CHECK_GE(col_id, 0);
88  CHECK_LT(static_cast<size_t>(col_id), target_types_.size());
89  return target_types_[col_id];
90  }


std::tuple< std::vector< ColumnarResults::WriteFunction >, std::vector< ColumnarResults::ReadFunction > > ColumnarResults::initAllConversionFunctions ( const ResultSet &  rows,
const std::vector< size_t > &  slot_idx_per_target_idx,
const std::vector< bool > &  targets_to_skip = {} 
)
private

This function goes through all target types in the output and chooses the appropriate write and read functions per target; these are then applied to each row, per target. Read functions are used to read each cell's data content (a particular target in a row), and write functions are used to properly write back the cell's content into the output column buffers.

Definition at line 1917 of file ColumnarResults.cpp.

References CHECK, heavyai::GroupByBaselineHash, heavyai::GroupByPerfectHash, initWriteFunctions(), and isDirectColumnarConversionPossible().

Referenced by compactAndCopyEntriesWithoutTargetSkipping(), and compactAndCopyEntriesWithTargetSkipping().

1920  {
1921  CHECK(isDirectColumnarConversionPossible() &&
1922  (rows.getQueryDescriptionType() == QueryDescriptionType::GroupByPerfectHash ||
1923  rows.getQueryDescriptionType() == QueryDescriptionType::GroupByBaselineHash));
1924 
1925  const auto write_functions = initWriteFunctions(rows, targets_to_skip);
1926  if (rows.getQueryDescriptionType() == QueryDescriptionType::GroupByPerfectHash) {
1927  if (rows.didOutputColumnar()) {
1928  return std::make_tuple(
1929  std::move(write_functions),
1930  initReadFunctions<QueryDescriptionType::GroupByPerfectHash, true>(
1931  rows, slot_idx_per_target_idx, targets_to_skip));
1932  } else {
1933  return std::make_tuple(
1934  std::move(write_functions),
1935  initReadFunctions<QueryDescriptionType::GroupByPerfectHash, false>(
1936  rows, slot_idx_per_target_idx, targets_to_skip));
1937  }
1938  } else {
1939  if (rows.didOutputColumnar()) {
1940  return std::make_tuple(
1941  std::move(write_functions),
1942  initReadFunctions<QueryDescriptionType::GroupByBaselineHash, true>(
1943  rows, slot_idx_per_target_idx, targets_to_skip));
1944  } else {
1945  return std::make_tuple(
1946  std::move(write_functions),
1947  initReadFunctions<QueryDescriptionType::GroupByBaselineHash, false>(
1948  rows, slot_idx_per_target_idx, targets_to_skip));
1949  }
1950  }
1951 }


template<QueryDescriptionType QUERY_TYPE, bool COLUMNAR_OUTPUT>
std::vector< ColumnarResults::ReadFunction > ColumnarResults::initReadFunctions ( const ResultSet &  rows,
const std::vector< size_t > &  slot_idx_per_target_idx,
const std::vector< bool > &  targets_to_skip = {} 
)
private

Initializes a set of read functions to properly access the contents of the result set's storage buffer. Each read function is chosen based on the data type and data size used to store that target in the result set's storage buffer. These functions are then used for each row in the result set.

Definition at line 1819 of file ColumnarResults.cpp.

References CHECK, CHECK_EQ, heavyai::GroupByBaselineHash, anonymous_namespace{ColumnarResults.cpp}::invalid_read_func(), isDirectColumnarConversionPossible(), kDOUBLE, kFLOAT, target_types_, and UNREACHABLE.

1822  {
1823  CHECK(isDirectColumnarConversionPossible());
1824  CHECK(COLUMNAR_OUTPUT == rows.didOutputColumnar());
1825  CHECK(QUERY_TYPE == rows.getQueryDescriptionType());
1826 
1827  std::vector<ReadFunction> read_functions;
1828  read_functions.reserve(target_types_.size());
1829 
1830  for (size_t target_idx = 0; target_idx < target_types_.size(); target_idx++) {
1831  if (!targets_to_skip.empty() && !targets_to_skip[target_idx]) {
1832  // for targets that should be skipped, we use a placeholder function that should
1833  // never be called. The CHECKs inside it make sure that never happens.
1834  read_functions.emplace_back(invalid_read_func);
1835  continue;
1836  }
1837 
1838  if (QUERY_TYPE == QueryDescriptionType::GroupByBaselineHash) {
1839  if (rows.getPaddedSlotWidthBytes(slot_idx_per_target_idx[target_idx]) == 0) {
1840  // for key columns only
1841  CHECK(rows.query_mem_desc_.getTargetGroupbyIndex(target_idx) >= 0);
1842  if (target_types_[target_idx].is_fp()) {
1843  CHECK_EQ(size_t(8), rows.query_mem_desc_.getEffectiveKeyWidth());
1844  switch (target_types_[target_idx].get_type()) {
1845  case kFLOAT:
1846  read_functions.emplace_back(
1847  read_float_key_baseline<QUERY_TYPE, COLUMNAR_OUTPUT>);
1848  break;
1849  case kDOUBLE:
1850  read_functions.emplace_back(read_double_func<QUERY_TYPE, COLUMNAR_OUTPUT>);
1851  break;
1852  default:
1853  UNREACHABLE()
1854  << "Invalid data type encountered (BaselineHash, floating point key).";
1855  break;
1856  }
1857  } else {
1858  switch (rows.query_mem_desc_.getEffectiveKeyWidth()) {
1859  case 8:
1860  read_functions.emplace_back(read_int64_func<QUERY_TYPE, COLUMNAR_OUTPUT>);
1861  break;
1862  case 4:
1863  read_functions.emplace_back(read_int32_func<QUERY_TYPE, COLUMNAR_OUTPUT>);
1864  break;
1865  default:
1866  UNREACHABLE()
1867  << "Invalid data type encountered (BaselineHash, integer key).";
1868  }
1869  }
1870  continue;
1871  }
1872  }
1873  if (target_types_[target_idx].is_fp()) {
1874  switch (rows.getPaddedSlotWidthBytes(slot_idx_per_target_idx[target_idx])) {
1875  case 8:
1876  read_functions.emplace_back(read_double_func<QUERY_TYPE, COLUMNAR_OUTPUT>);
1877  break;
1878  case 4:
1879  read_functions.emplace_back(read_float_func<QUERY_TYPE, COLUMNAR_OUTPUT>);
1880  break;
1881  default:
1882  UNREACHABLE() << "Invalid data type encountered (floating point agg column).";
1883  break;
1884  }
1885  } else {
1886  switch (rows.getPaddedSlotWidthBytes(slot_idx_per_target_idx[target_idx])) {
1887  case 8:
1888  read_functions.emplace_back(read_int64_func<QUERY_TYPE, COLUMNAR_OUTPUT>);
1889  break;
1890  case 4:
1891  read_functions.emplace_back(read_int32_func<QUERY_TYPE, COLUMNAR_OUTPUT>);
1892  break;
1893  case 2:
1894  read_functions.emplace_back(read_int16_func<QUERY_TYPE, COLUMNAR_OUTPUT>);
1895  break;
1896  case 1:
1897  read_functions.emplace_back(read_int8_func<QUERY_TYPE, COLUMNAR_OUTPUT>);
1898  break;
1899  default:
1900  UNREACHABLE() << "Invalid data type encountered (integer agg column).";
1901  break;
1902  }
1903  }
1904  }
1905  return read_functions;
1906 }


std::vector< ColumnarResults::WriteFunction > ColumnarResults::initWriteFunctions ( const ResultSet &  rows,
const std::vector< bool > &  targets_to_skip = {} 
)
private

Initializes a set of write functions, one per target (i.e., column). Each target type's logical size is used to select the correct write function for that target. These functions are then used for every row in the result set.

Definition at line 1631 of file ColumnarResults.cpp.

References CHECK, heavyai::GroupByBaselineHash, heavyai::GroupByPerfectHash, isDirectColumnarConversionPossible(), run_benchmark_import::result, target_types_, and UNREACHABLE.

Referenced by initAllConversionFunctions().

1633  {
1634  CHECK(isDirectColumnarConversionPossible());
1635  CHECK(rows.getQueryDescriptionType() == QueryDescriptionType::GroupByPerfectHash ||
1636  rows.getQueryDescriptionType() == QueryDescriptionType::GroupByBaselineHash);
1637 
1638  std::vector<WriteFunction> result;
1639  result.reserve(target_types_.size());
1640 
1641  for (size_t target_idx = 0; target_idx < target_types_.size(); target_idx++) {
1642  if (!targets_to_skip.empty() && !targets_to_skip[target_idx]) {
1643  result.emplace_back([](const ResultSet& rows,
1644  const size_t input_buffer_entry_idx,
1645  const size_t output_buffer_entry_idx,
1646  const size_t target_idx,
1647  const size_t slot_idx,
1648  const ReadFunction& read_function) {
1649  UNREACHABLE() << "Invalid write back function used.";
1650  });
1651  continue;
1652  }
1653 
1654  if (target_types_[target_idx].is_fp()) {
1655  switch (target_types_[target_idx].get_size()) {
1656  case 8:
1657  result.emplace_back(std::bind(&ColumnarResults::writeBackCellDirect<double>,
1658  this,
1659  std::placeholders::_1,
1660  std::placeholders::_2,
1661  std::placeholders::_3,
1662  std::placeholders::_4,
1663  std::placeholders::_5,
1664  std::placeholders::_6));
1665  break;
1666  case 4:
1667  result.emplace_back(std::bind(&ColumnarResults::writeBackCellDirect<float>,
1668  this,
1669  std::placeholders::_1,
1670  std::placeholders::_2,
1671  std::placeholders::_3,
1672  std::placeholders::_4,
1673  std::placeholders::_5,
1674  std::placeholders::_6));
1675  break;
1676  default:
1677  UNREACHABLE() << "Invalid target type encountered.";
1678  break;
1679  }
1680  } else {
1681  switch (target_types_[target_idx].get_size()) {
1682  case 8:
1683  result.emplace_back(std::bind(&ColumnarResults::writeBackCellDirect<int64_t>,
1684  this,
1685  std::placeholders::_1,
1686  std::placeholders::_2,
1687  std::placeholders::_3,
1688  std::placeholders::_4,
1689  std::placeholders::_5,
1690  std::placeholders::_6));
1691  break;
1692  case 4:
1693  result.emplace_back(std::bind(&ColumnarResults::writeBackCellDirect<int32_t>,
1694  this,
1695  std::placeholders::_1,
1696  std::placeholders::_2,
1697  std::placeholders::_3,
1698  std::placeholders::_4,
1699  std::placeholders::_5,
1700  std::placeholders::_6));
1701  break;
1702  case 2:
1703  result.emplace_back(std::bind(&ColumnarResults::writeBackCellDirect<int16_t>,
1704  this,
1705  std::placeholders::_1,
1706  std::placeholders::_2,
1707  std::placeholders::_3,
1708  std::placeholders::_4,
1709  std::placeholders::_5,
1710  std::placeholders::_6));
1711  break;
1712  case 1:
1713  result.emplace_back(std::bind(&ColumnarResults::writeBackCellDirect<int8_t>,
1714  this,
1715  std::placeholders::_1,
1716  std::placeholders::_2,
1717  std::placeholders::_3,
1718  std::placeholders::_4,
1719  std::placeholders::_5,
1720  std::placeholders::_6));
1721  break;
1722  default:
1723  UNREACHABLE() << "Invalid target type encountered.";
1724  break;
1725  }
1726  }
1727  }
1728  return result;
1729 }


bool ColumnarResults::isDirectColumnarConversionPossible ( ) const
inline

References direct_columnar_conversion_.

{ return direct_columnar_conversion_; }

bool ColumnarResults::isParallelConversion ( ) const
inline

Definition at line 92 of file ColumnarResults.h.

References parallel_conversion_.

Referenced by materializeAllColumnsGroupBy(), and materializeAllColumnsThroughIteration().

92 { return parallel_conversion_; }

+ Here is the caller graph for this function:

void ColumnarResults::locateAndCountEntries ( const ResultSet &  rows,
ColumnBitmap &  bitmap,
std::vector< size_t > &  non_empty_per_thread,
const size_t  entry_count,
const size_t  num_threads,
const size_t  size_per_thread 
) const
private

This function goes through all the keys in the result set and counts the total number of non-empty keys. It also stores the locations of the non-empty keys in a bitmap data structure for faster access later.
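A simplified, single-threaded sketch of the pass each worker performs (illustration only; is_row_empty is a hypothetical stand-in for the result set's per-row emptiness test, and ColumnBitmap is the bitmap type used in the listing below):

size_t count_and_mark(const std::function<bool(size_t)>& is_row_empty,
                      ColumnBitmap& bitmap,
                      const size_t start_index,
                      const size_t end_index,
                      const size_t thread_idx) {
  size_t total_non_empty = 0;
  size_t local_idx = 0;
  for (size_t entry_idx = start_index; entry_idx < end_index;
       ++entry_idx, ++local_idx) {
    if (!is_row_empty(entry_idx)) {
      ++total_non_empty;
      bitmap.set(local_idx, thread_idx, true);  // remember for the copy pass
    }
  }
  return total_non_empty;
}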

Definition at line 1278 of file ColumnarResults.cpp.

References threading_serial::async(), CHECK, CHECK_EQ, executor_, g_enable_non_kernel_time_query_interrupt, heavyai::GroupByBaselineHash, heavyai::GroupByPerfectHash, QueryExecutionError::hasErrorCode(), isDirectColumnarConversionPossible(), report::rows, ColumnBitmap::set(), and UNLIKELY.

Referenced by materializeAllColumnsGroupBy().

1283  {
1284  CHECK(isDirectColumnarConversionPossible());
1285  CHECK(rows.getQueryDescriptionType() == QueryDescriptionType::GroupByPerfectHash ||
1286  rows.getQueryDescriptionType() == QueryDescriptionType::GroupByBaselineHash);
1287  CHECK_EQ(num_threads, non_empty_per_thread.size());
1288  auto do_work = [&rows, &bitmap](size_t& total_non_empty,
1289  const size_t local_idx,
1290  const size_t entry_idx,
1291  const size_t thread_idx) {
1292  if (!rows.isRowAtEmpty(entry_idx)) {
1293  total_non_empty++;
1294  bitmap.set(local_idx, thread_idx, true);
1295  }
1296  };
1297  auto locate_and_count_func =
1298  [&do_work, &non_empty_per_thread, this](
1299  size_t start_index, size_t end_index, size_t thread_idx) {
1300  size_t total_non_empty = 0;
1301  size_t local_idx = 0;
1302  if (g_enable_non_kernel_time_query_interrupt) {
1303  for (size_t entry_idx = start_index; entry_idx < end_index;
1304  entry_idx++, local_idx++) {
1305  if (UNLIKELY((local_idx & 0xFFFF) == 0 &&
1306  executor_->checkNonKernelTimeInterrupted())) {
1307  throw QueryExecutionError(ErrorCode::INTERRUPTED);
1308  }
1309  do_work(total_non_empty, local_idx, entry_idx, thread_idx);
1310  }
1311  } else {
1312  for (size_t entry_idx = start_index; entry_idx < end_index;
1313  entry_idx++, local_idx++) {
1314  do_work(total_non_empty, local_idx, entry_idx, thread_idx);
1315  }
1316  }
1317  non_empty_per_thread[thread_idx] = total_non_empty;
1318  };
1319 
1320  std::vector<std::future<void>> conversion_threads;
1321  for (size_t thread_idx = 0; thread_idx < num_threads; thread_idx++) {
1322  const size_t start_entry = thread_idx * size_per_thread;
1323  const size_t end_entry = std::min(start_entry + size_per_thread, entry_count);
1324  conversion_threads.push_back(std::async(
1325  std::launch::async, locate_and_count_func, start_entry, end_entry, thread_idx));
1326  }
1327 
1328  try {
1329  for (auto& child : conversion_threads) {
1330  child.wait();
1331  }
1332  } catch (QueryExecutionError& e) {
1333  if (e.hasErrorCode(ErrorCode::INTERRUPTED)) {
1334  throw QueryExecutionError(ErrorCode::INTERRUPTED);
1335  }
1336  throw e;
1337  } catch (...) {
1338  throw;
1339  }
1340 }


void ColumnarResults::materializeAllColumnsDirectly ( const ResultSet &  rows,
const size_t  num_columns 
)
private

This function materializes all columns from the main storage and all appended storages, and forms a single contiguous column for each output column. Columns are treated differently depending on whether or not they are lazily fetched.

NOTE: this function should only be used when the result set is columnar and completely compacted (e.g., in columnar projections).

Definition at line 1029 of file ColumnarResults.cpp.

References CHECK, heavyai::GroupByBaselineHash, heavyai::GroupByPerfectHash, isDirectColumnarConversionPossible(), materializeAllColumnsGroupBy(), materializeAllColumnsProjection(), materializeAllColumnsTableFunction(), heavyai::Projection, heavyai::TableFunction, and UNREACHABLE.

Referenced by ColumnarResults().

1030  {
1031  CHECK(isDirectColumnarConversionPossible());
1032  switch (rows.getQueryDescriptionType()) {
1033  case QueryDescriptionType::Projection: {
1034  materializeAllColumnsProjection(rows, num_columns);
1035  break;
1036  }
1037  case QueryDescriptionType::TableFunction: {
1038  materializeAllColumnsTableFunction(rows, num_columns);
1039  break;
1040  }
1041  case QueryDescriptionType::GroupByPerfectHash:
1042  case QueryDescriptionType::GroupByBaselineHash: {
1043  materializeAllColumnsGroupBy(rows, num_columns);
1044  break;
1045  }
1046  default:
1047  UNREACHABLE()
1048  << "Direct columnar conversion for this query type is not supported yet.";
1049  }
1050 }


void ColumnarResults::materializeAllColumnsGroupBy ( const ResultSet &  rows,
const size_t  num_columns 
)
private

This function directly columnarizes a result set for group-by queries. Its main difference from the traditional alternative is that it reads non-empty entries straight out of the result set and then writes them into the output column buffers, rather than using the result set's iterators.

Definition at line 1243 of file ColumnarResults.cpp.

References CHECK, compactAndCopyEntries(), cpu_threads(), heavyai::GroupByBaselineHash, heavyai::GroupByPerfectHash, isDirectColumnarConversionPossible(), isParallelConversion(), and locateAndCountEntries().

Referenced by materializeAllColumnsDirectly().

1244  {
1245  CHECK(isDirectColumnarConversionPossible());
1246  CHECK(rows.getQueryDescriptionType() == QueryDescriptionType::GroupByPerfectHash ||
1247  rows.getQueryDescriptionType() == QueryDescriptionType::GroupByBaselineHash);
1248 
1249  const size_t num_threads = isParallelConversion() ? cpu_threads() : 1;
1250  const size_t entry_count = rows.entryCount();
1251  const size_t size_per_thread = (entry_count + num_threads - 1) / num_threads;
1252 
1253  // step 1: compute total non-empty elements and store a bitmap per thread
1254  std::vector<size_t> non_empty_per_thread(num_threads,
1255  0); // number of non-empty entries per thread
1256 
1257  ColumnBitmap bitmap(size_per_thread, num_threads);
1258 
1260  rows, bitmap, non_empty_per_thread, entry_count, num_threads, size_per_thread);
1261 
1262  // step 2: go through the generated bitmap and copy/decode corresponding entries
1263  // into the output buffer
1264  compactAndCopyEntries(rows,
1265  bitmap,
1266  non_empty_per_thread,
1267  num_columns,
1268  entry_count,
1269  num_threads,
1270  size_per_thread);
1271 }


void ColumnarResults::materializeAllColumnsProjection ( const ResultSet &  rows,
const size_t  num_columns 
)
private

This function handles materialization for two types of columns in columnar projections:

  1. for all non-lazy columns, it directly copies the results from the result set's storage into the output column buffers
  2. for all lazily fetched columns, it uses the result set's iterators to decode the proper values before storing them into the output column buffers (see the sketch below)
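The split can be pictured as follows (schematic sketch, not the source; lazy_fetch_info comes from rows.getLazyFetchInfo() as in the listing below):

for (size_t col_idx = 0; col_idx < num_columns; ++col_idx) {
  const bool is_lazy =
      !lazy_fetch_info.empty() && lazy_fetch_info[col_idx].is_lazily_fetched;
  if (!is_lazy) {
    // copyAllNonLazyColumns(): direct buffer copy, or zero-copy hand-off
  } else {
    // materializeAllLazyColumns(): decode via result set iteration
  }
}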

Definition at line 1059 of file ColumnarResults.cpp.

References CHECK, copyAllNonLazyColumns(), isDirectColumnarConversionPossible(), materializeAllLazyColumns(), and heavyai::Projection.

Referenced by materializeAllColumnsDirectly().

1060  {
1061  CHECK(rows.query_mem_desc_.didOutputColumnar());
1062  CHECK(isDirectColumnarConversionPossible() &&
1063  (rows.query_mem_desc_.getQueryDescriptionType() ==
1064  QueryDescriptionType::Projection));
1065 
1066  const auto& lazy_fetch_info = rows.getLazyFetchInfo();
1067 
1068  // We can directly copy each non-lazy column's content
1069  copyAllNonLazyColumns(lazy_fetch_info, rows, num_columns);
1070 
1071  // Only lazy columns are iterated through first and then materialized
1072  materializeAllLazyColumns(lazy_fetch_info, rows, num_columns);
1073 }


void ColumnarResults::materializeAllColumnsTableFunction ( const ResultSet &  rows,
const size_t  num_columns 
)
private

Definition at line 1075 of file ColumnarResults.cpp.

References CHECK, copyAllNonLazyColumns(), isDirectColumnarConversionPossible(), and heavyai::TableFunction.

Referenced by materializeAllColumnsDirectly().

1076  {
1077  CHECK(rows.query_mem_desc_.didOutputColumnar());
1078  CHECK(isDirectColumnarConversionPossible() &&
1079  (rows.query_mem_desc_.getQueryDescriptionType() ==
1080  QueryDescriptionType::TableFunction));
1081 
1082  const auto& lazy_fetch_info = rows.getLazyFetchInfo();
1083  // Lazy fetching is not currently allowed for table function outputs
1084  for (const auto& col_lazy_fetch_info : lazy_fetch_info) {
1085  CHECK(!col_lazy_fetch_info.is_lazily_fetched);
1086  }
1087  // We can directly copy each non-lazy column's content
1088  copyAllNonLazyColumns(lazy_fetch_info, rows, num_columns);
1089 }


void ColumnarResults::materializeAllColumnsThroughIteration ( const ResultSet &  rows,
const size_t  num_columns 
)
private

This function iterates through the result set (using the getRowAtNoTranslations and getNextRow family of functions) and writes the results back into the output column buffers.
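In its serial form, the iteration reduces to the following loop (condensed from the definition below; getNextRow(false, false) yields an empty row once the result set is exhausted):

size_t row_idx = 0;
while (true) {
  const auto crt_row = rows.getNextRow(/*translate_strings=*/false,
                                       /*decimal_to_double=*/false);
  if (crt_row.empty()) {
    break;  // all rows consumed
  }
  for (size_t i = 0; i < num_columns; ++i) {
    writeBackCell(crt_row[i], row_idx, target_types_[i], column_buffers_[i]);
  }
  ++row_idx;
}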

Definition at line 466 of file ColumnarResults.cpp.

References threading_serial::async(), column_buffers_, cpu_threads(), executor_, g_enable_non_kernel_time_query_interrupt, QueryExecutionError::hasErrorCode(), isParallelConversion(), makeIntervals(), num_rows_, report::rows, target_types_, UNLIKELY, and writeBackCell().

Referenced by ColumnarResults().

void ColumnarResults::materializeAllColumnsThroughIteration(const ResultSet& rows,
                                                            const size_t num_columns) {
  if (isParallelConversion()) {
    std::atomic<size_t> row_idx{0};
    const size_t worker_count = cpu_threads();
    std::vector<std::future<void>> conversion_threads;
    std::mutex write_mutex;
    const auto do_work =
        [num_columns, &rows, &row_idx, &write_mutex, this](const size_t i) {
          const auto crt_row = rows.getRowAtNoTranslations(i);
          if (!crt_row.empty()) {
            auto cur_row_idx = row_idx.fetch_add(1);
            for (size_t col_idx = 0; col_idx < num_columns; ++col_idx) {
              auto& type_info = target_types_[col_idx];
              writeBackCell(crt_row[col_idx],
                            cur_row_idx,
                            type_info,
                            column_buffers_[col_idx],
                            &write_mutex);
            }
          }
        };
    for (auto interval : makeIntervals(size_t(0), rows.entryCount(), worker_count)) {
      conversion_threads.push_back(std::async(
          std::launch::async,
          [&do_work, this](const size_t start, const size_t end) {
            if (g_enable_non_kernel_time_query_interrupt) {
              size_t local_idx = 0;
              for (size_t i = start; i < end; ++i, ++local_idx) {
                if (UNLIKELY((local_idx & 0xFFFF) == 0 &&
                             executor_->checkNonKernelTimeInterrupted())) {
                  throw QueryExecutionError(ErrorCode::INTERRUPTED);
                }
                do_work(i);
              }
            } else {
              for (size_t i = start; i < end; ++i) {
                do_work(i);
              }
            }
          },
          interval.begin,
          interval.end));
    }

    try {
      for (auto& child : conversion_threads) {
        child.wait();
      }
    } catch (QueryExecutionError& e) {
      if (e.hasErrorCode(ErrorCode::INTERRUPTED)) {
        throw QueryExecutionError(ErrorCode::INTERRUPTED);
      }
      throw e;
    } catch (...) {
      throw;
    }

    num_rows_ = row_idx;
    rows.setCachedRowCount(num_rows_);
    return;
  }
  bool done = false;
  size_t row_idx = 0;
  const auto do_work = [num_columns, &row_idx, &rows, &done, this]() {
    const auto crt_row = rows.getNextRow(false, false);
    if (crt_row.empty()) {
      done = true;
      return;
    }
    for (size_t i = 0; i < num_columns; ++i) {
      auto& type_info = target_types_[i];
      writeBackCell(crt_row[i], row_idx, type_info, column_buffers_[i]);
    }
    ++row_idx;
  };
  if (g_enable_non_kernel_time_query_interrupt) {
    while (!done) {
      if (UNLIKELY((row_idx & 0xFFFF) == 0 &&
                   executor_->checkNonKernelTimeInterrupted())) {
        throw QueryExecutionError(ErrorCode::INTERRUPTED);
      }
      do_work();
    }
  } else {
    while (!done) {
      do_work();
    }
  }

  rows.moveToBegin();
}
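The parallel branch above follows a reusable pattern: partition the entry range into per-worker intervals, run each chunk under std::async, and poll for interrupts only once per 64K iterations (the 0xFFFF mask) so the check stays off the hot path. A minimal standalone sketch of that partitioning, with a hypothetical process() standing in for do_work() and the interrupt machinery omitted:

  #include <algorithm>
  #include <future>
  #include <vector>

  void process(size_t i) { /* per-entry work, e.g. the equivalent of writeBackCell */ }

  // Assumes worker_count >= 1.
  void run_in_chunks(const size_t entry_count, const size_t worker_count) {
    std::vector<std::future<void>> workers;
    const size_t chunk = (entry_count + worker_count - 1) / worker_count;
    for (size_t start = 0; start < entry_count; start += chunk) {
      const size_t end = std::min(start + chunk, entry_count);
      workers.push_back(std::async(std::launch::async, [start, end] {
        for (size_t i = start; i < end; ++i) {
          process(i);
        }
      }));
    }
    for (auto& w : workers) {
      w.get();  // get() rethrows any exception raised inside the worker
    }
  }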

void ColumnarResults::materializeAllLazyColumns ( const std::vector< ColumnLazyFetchInfo > &  lazy_fetch_info,
const ResultSet &  rows,
const size_t  num_columns 
)
private

For all lazy-fetched columns, we iterate through the columns' contents and materialize them properly.

This function is parallelized by dividing the total rows among all available threads. Since there are no invalid entries in the result set (e.g., for columnar projections), the output buffer has exactly as many rows as the result set, which removes the need to atomically increment the output buffer position.

Definition at line 1153 of file ColumnarResults.cpp.

References threading_serial::async(), CHECK, CHECK_EQ, column_buffers_, cpu_threads(), executor_, g_enable_non_kernel_time_query_interrupt, QueryExecutionError::hasErrorCode(), isDirectColumnarConversionPossible(), makeIntervals(), report::rows, heavyai::TableFunction, target_types_, UNLIKELY, result_set::use_parallel_algorithms(), and writeBackCell().

Referenced by materializeAllColumnsProjection().

void ColumnarResults::materializeAllLazyColumns(
    const std::vector<ColumnLazyFetchInfo>& lazy_fetch_info,
    const ResultSet& rows,
    const size_t num_columns) {
  CHECK(isDirectColumnarConversionPossible());
  CHECK(!(rows.query_mem_desc_.getQueryDescriptionType() ==
          heavyai::QueryDescriptionType::TableFunction));
  std::mutex write_mutex;
  const auto do_work_just_lazy_columns = [num_columns, &rows, &write_mutex, this](
                                             const size_t row_idx,
                                             const std::vector<bool>& targets_to_skip) {
    const auto crt_row = rows.getRowAtNoTranslations(row_idx, targets_to_skip);
    for (size_t i = 0; i < num_columns; ++i) {
      if (!targets_to_skip.empty() && !targets_to_skip[i]) {
        auto& type_info = target_types_[i];
        writeBackCell(crt_row[i], row_idx, type_info, column_buffers_[i], &write_mutex);
      }
    }
  };

  const auto contains_lazy_fetched_column =
      [](const std::vector<ColumnLazyFetchInfo>& lazy_fetch_info) {
        for (auto& col_info : lazy_fetch_info) {
          if (col_info.is_lazily_fetched) {
            return true;
          }
        }
        return false;
      };

  // parallelized by assigning a chunk of rows to each thread
  const bool skip_non_lazy_columns = rows.isPermutationBufferEmpty();
  if (contains_lazy_fetched_column(lazy_fetch_info)) {
    const size_t worker_count =
        result_set::use_parallel_algorithms(rows) ? cpu_threads() : 1;
    std::vector<std::future<void>> conversion_threads;
    std::vector<bool> targets_to_skip;
    if (skip_non_lazy_columns) {
      CHECK_EQ(lazy_fetch_info.size(), size_t(num_columns));
      targets_to_skip.reserve(num_columns);
      for (size_t i = 0; i < num_columns; i++) {
        // we process lazy columns (i.e., skip non-lazy columns)
        targets_to_skip.push_back(!lazy_fetch_info[i].is_lazily_fetched);
      }
    }
    for (auto interval : makeIntervals(size_t(0), rows.entryCount(), worker_count)) {
      conversion_threads.push_back(std::async(
          std::launch::async,
          [&do_work_just_lazy_columns, &targets_to_skip, this](const size_t start,
                                                               const size_t end) {
            if (g_enable_non_kernel_time_query_interrupt) {
              size_t local_idx = 0;
              for (size_t i = start; i < end; ++i, ++local_idx) {
                if (UNLIKELY((local_idx & 0xFFFF) == 0 &&
                             executor_->checkNonKernelTimeInterrupted())) {
                  throw QueryExecutionError(ErrorCode::INTERRUPTED);
                }
                do_work_just_lazy_columns(i, targets_to_skip);
              }
            } else {
              for (size_t i = start; i < end; ++i) {
                do_work_just_lazy_columns(i, targets_to_skip);
              }
            }
          },
          interval.begin,
          interval.end));
    }

    try {
      for (auto& child : conversion_threads) {
        child.wait();
      }
    } catch (QueryExecutionError& e) {
      if (e.hasErrorCode(ErrorCode::INTERRUPTED)) {
        throw QueryExecutionError(ErrorCode::INTERRUPTED);
      }
      throw e;
    } catch (...) {
      throw;
    }
  }
}
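Note how targets_to_skip inverts the lazy-fetch flags: columns already filled by the direct copy pass are marked skippable, so getRowAtNoTranslations() only decodes the lazy targets. A standalone sketch of building such a mask, with FetchInfo as a simplified stand-in for ColumnLazyFetchInfo:

  #include <vector>

  struct FetchInfo {  // simplified stand-in for ColumnLazyFetchInfo
    bool is_lazily_fetched;
  };

  // A target is skipped when it is NOT lazily fetched: those columns were
  // already filled by copyAllNonLazyColumns().
  std::vector<bool> make_skip_mask(const std::vector<FetchInfo>& lazy_fetch_info) {
    std::vector<bool> targets_to_skip;
    targets_to_skip.reserve(lazy_fetch_info.size());
    for (const auto& col : lazy_fetch_info) {
      targets_to_skip.push_back(!col.is_lazily_fetched);
    }
    return targets_to_skip;
  }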

std::unique_ptr< ColumnarResults > ColumnarResults::mergeResults ( const std::shared_ptr< RowSetMemoryOwner row_set_mem_owner,
const std::vector< std::unique_ptr< ColumnarResults >> &  sub_results 
)
static

Definition at line 418 of file ColumnarResults.cpp.

References gpu_enabled::accumulate(), CHECK_EQ, ColumnarResults(), logger::init(), padded_target_sizes_, run_benchmark_import::result, and target_types_.

Referenced by ColumnFetcher::getAllTableColumnFragments().

std::unique_ptr<ColumnarResults> ColumnarResults::mergeResults(
    const std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
    const std::vector<std::unique_ptr<ColumnarResults>>& sub_results) {
  // TODO: this method requires a safe guard when trying to merge
  // columns using FlatBuffer layout.
  if (sub_results.empty()) {
    return nullptr;
  }
  const auto total_row_count = std::accumulate(
      sub_results.begin(),
      sub_results.end(),
      size_t(0),
      [](const size_t init, const std::unique_ptr<ColumnarResults>& result) {
        return init + result->size();
      });
  std::unique_ptr<ColumnarResults> merged_results(
      new ColumnarResults(total_row_count,
                          sub_results[0]->target_types_,
                          sub_results[0]->padded_target_sizes_));
  const auto col_count = sub_results[0]->column_buffers_.size();
  const auto nonempty_it = std::find_if(
      sub_results.begin(),
      sub_results.end(),
      [](const std::unique_ptr<ColumnarResults>& needle) { return needle->size(); });
  if (nonempty_it == sub_results.end()) {
    return nullptr;
  }
  for (size_t col_idx = 0; col_idx < col_count; ++col_idx) {
    const auto byte_width = merged_results->padded_target_sizes_[col_idx];
    auto write_ptr = row_set_mem_owner->allocate(byte_width * total_row_count);
    merged_results->column_buffers_.push_back(write_ptr);
    for (auto& rs : sub_results) {
      CHECK_EQ(col_count, rs->column_buffers_.size());
      if (!rs->size()) {
        continue;
      }
      CHECK_EQ(byte_width, rs->padded_target_sizes_[col_idx]);
      memcpy(write_ptr, rs->column_buffers_[col_idx], rs->size() * byte_width);
      write_ptr += rs->size() * byte_width;
    }
  }
  return merged_results;
}
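The merge is a per-column concatenation: one contiguous buffer of total_row_count × byte_width bytes is allocated per column, and each sub-result's rows are appended in order by advancing write_ptr. A standalone sketch of the same pattern over plain vectors, assuming for brevity a single shared byte width instead of the per-column padded_target_sizes_:

  #include <cstdint>
  #include <cstring>
  #include <vector>

  // Each sub-result holds `rows` rows of `byte_width` bytes per column.
  struct Sub {
    size_t rows;
    std::vector<std::vector<int8_t>> cols;  // cols[c].size() == rows * byte_width
  };

  std::vector<std::vector<int8_t>> merge(const std::vector<Sub>& subs,
                                         const size_t col_count,
                                         const size_t byte_width) {
    size_t total_rows = 0;
    for (const auto& s : subs) {
      total_rows += s.rows;
    }
    std::vector<std::vector<int8_t>> merged(col_count);
    for (size_t c = 0; c < col_count; ++c) {
      merged[c].resize(total_rows * byte_width);
      int8_t* write_ptr = merged[c].data();
      for (const auto& s : subs) {
        std::memcpy(write_ptr, s.cols[c].data(), s.rows * byte_width);
        write_ptr += s.rows * byte_width;  // advance past this sub-result's rows
      }
    }
    return merged;
  }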

const size_t ColumnarResults::size ( ) const
inline

Definition at line 84 of file ColumnarResults.h.

References num_rows_.

Referenced by ColumnFetcher::transferColumnIfNeeded().

{ return num_rows_; }


void ColumnarResults::writeBackCell ( const TargetValue &  col_val,
const size_t  row_idx,
const SQLTypeInfo &  type_info,
int8_t *  column_buf,
std::mutex *  write_mutex = nullptr 
)
inlineprivate

Definition at line 869 of file ColumnarResults.cpp.

References SQLTypeInfoLite::BIGINT, SQLTypeInfoLite::BOOLEAN, CHECK, SQLTypeInfoLite::DOUBLE, SQLTypeInfoLite::FLOAT, SQLTypeInfo::get_compression(), SQLTypeInfo::get_size(), SQLTypeInfo::get_subtype(), SQLTypeInfo::get_type(), SQLTypeInfoLite::INT, SQLTypeInfo::is_array(), SQLTypeInfo::is_geometry(), SQLTypeInfo::is_text_encoding_none(), FlatBufferManager::isFlatBuffer(), kENCODING_NONE, kLINESTRING, kMULTILINESTRING, kMULTIPOINT, kMULTIPOLYGON, kPOINT, kPOLYGON, kTEXT, SQLTypeInfoLite::SMALLINT, SQLTypeInfoLite::subtype, SQLTypeInfoLite::TEXT, SQLTypeInfoLite::TINYINT, anonymous_namespace{ColumnarResults.cpp}::toBuffer(), SQLTypeInfo::toString(), UNREACHABLE, SQLTypeInfo::usesFlatBuffer(), writeBackCellGeoNestedArray(), writeBackCellGeoPoint(), and writeBackCellTextEncodingNone().

Referenced by compactAndCopyEntriesWithTargetSkipping(), materializeAllColumnsThroughIteration(), and materializeAllLazyColumns().

inline void ColumnarResults::writeBackCell(const TargetValue& col_val,
                                           const size_t row_idx,
                                           const SQLTypeInfo& type_info,
                                           int8_t* column_buf,
                                           std::mutex* write_mutex) {
  if (!type_info.usesFlatBuffer()) {
    toBuffer(col_val, type_info, column_buf + type_info.get_size() * row_idx);
    return;
  }
  CHECK(FlatBufferManager::isFlatBuffer(column_buf));
  FlatBufferManager m{column_buf};
  if (type_info.is_geometry() && type_info.get_type() == kPOINT) {
    writeBackCellGeoPoint(m, row_idx, type_info, col_val, write_mutex);
    return;
  }
  const SQLTypeInfoLite* ti_lite =
      reinterpret_cast<const SQLTypeInfoLite*>(m.get_user_data_buffer());
  CHECK(ti_lite);
  if (type_info.is_array()) {
    if (type_info.get_subtype() == kTEXT &&
        type_info.get_compression() == kENCODING_NONE) {
      throw std::runtime_error(
          "Column<Array<TextEncodedNone>> support not implemented yet (writeBackCell)");
    }
    switch (ti_lite->subtype) {
      case SQLTypeInfoLite::DOUBLE:
        writeBackCellArrayScalar<double, double>(m, row_idx, col_val, write_mutex);
        break;
      case SQLTypeInfoLite::FLOAT:
        writeBackCellArrayScalar<float, float>(m, row_idx, col_val, write_mutex);
        break;
      case SQLTypeInfoLite::BOOLEAN:
      case SQLTypeInfoLite::TINYINT:
        writeBackCellArrayScalar<int8_t, int64_t>(m, row_idx, col_val, write_mutex);
        break;
      case SQLTypeInfoLite::SMALLINT:
        writeBackCellArrayScalar<int16_t, int64_t>(m, row_idx, col_val, write_mutex);
        break;
      case SQLTypeInfoLite::INT:
      case SQLTypeInfoLite::TEXT:
        writeBackCellArrayScalar<int32_t, int64_t>(m, row_idx, col_val, write_mutex);
        break;
      case SQLTypeInfoLite::BIGINT:
        writeBackCellArrayScalar<int64_t, int64_t>(m, row_idx, col_val, write_mutex);
        break;
      default:
        UNREACHABLE();
    }
  } else if (type_info.is_text_encoding_none()) {
    writeBackCellTextEncodingNone(m, row_idx, col_val, write_mutex);
  } else if (type_info.is_geometry()) {
    switch (type_info.get_type()) {
      case kLINESTRING: {
        writeBackCellGeoNestedArray<…,  // template arguments lost in extraction
                                    /*is_multi=*/false>(
            m, row_idx, type_info, col_val, write_mutex);
        break;
      }
      case kPOLYGON: {
        writeBackCellGeoNestedArray<…,
                                    /*is_multi=*/false>(
            m, row_idx, type_info, col_val, write_mutex);
        break;
      }
      case kMULTIPOINT: {
        writeBackCellGeoNestedArray<…,
                                    /*is_multi=*/true>(
            m, row_idx, type_info, col_val, write_mutex);
        break;
      }
      case kMULTILINESTRING: {
        writeBackCellGeoNestedArray<…,
                                    /*is_multi=*/true>(
            m, row_idx, type_info, col_val, write_mutex);
        break;
      }
      case kMULTIPOLYGON: {
        writeBackCellGeoNestedArray<…,
                                    /*is_true=*/false>(
            m, row_idx, type_info, col_val, write_mutex);
        break;
      }
      default:
        UNREACHABLE() << "writeBackCell not implemented for " << type_info.toString();
    }
  } else {
    UNREACHABLE();
  }
}
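For types that do not use a FlatBuffer layout, the write is just a fixed-stride store: the cell for row_idx lands at column_buf + get_size() * row_idx. A minimal sketch of that fast path, with the value pointer standing in for whatever toBuffer() extracts from the TargetValue:

  #include <cstdint>
  #include <cstring>

  // Fixed-width columnar store: cell i occupies bytes [i*cell_size, (i+1)*cell_size).
  void write_fixed_width_cell(int8_t* column_buf,
                              const size_t row_idx,
                              const size_t cell_size,
                              const void* value) {
    std::memcpy(column_buf + cell_size * row_idx, value, cell_size);
  }

For a 4-byte INT column, for example, cell_size is 4 and row 10 starts at byte offset 40.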

template<typename DATA_TYPE >
void ColumnarResults::writeBackCellDirect ( const ResultSet &  rows,
const size_t  input_buffer_entry_idx,
const size_t  output_buffer_entry_idx,
const size_t  target_idx,
const size_t  slot_idx,
const ReadFunction &  read_from_function 
)
private

A set of write functions used to write directly into the final column_buffers_. The read_from_function is used to read from the input result set's storage. NOTE: currently only used for direct columnarization.

Definition at line 980 of file ColumnarResults.cpp.

References column_buffers_, anonymous_namespace{ColumnarResults.cpp}::fixed_encoding_nullable_val(), and target_types_.

template <typename DATA_TYPE>
void ColumnarResults::writeBackCellDirect(const ResultSet& rows,
                                          const size_t input_buffer_entry_idx,
                                          const size_t output_buffer_entry_idx,
                                          const size_t target_idx,
                                          const size_t slot_idx,
                                          const ReadFunction& read_from_function) {
  const auto val = static_cast<DATA_TYPE>(fixed_encoding_nullable_val(
      read_from_function(rows, input_buffer_entry_idx, target_idx, slot_idx),
      target_types_[target_idx]));
  reinterpret_cast<DATA_TYPE*>(column_buffers_[target_idx])[output_buffer_entry_idx] =
      val;
}
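The ReadFunction indirection lets one write routine serve many input layouts: the reader knows where a slot lives in the result set's storage, while the writer only narrows and stores. A simplified sketch of the same shape; the Reader alias below is hypothetical and drops the ResultSet parameter for brevity:

  #include <cstdint>
  #include <functional>
  #include <vector>

  using Reader = std::function<int64_t(size_t entry_idx)>;  // simplified ReadFunction

  template <typename DATA_TYPE>
  void write_cell_direct(std::vector<int8_t>& column_buf,
                         const size_t out_idx,
                         const size_t in_idx,
                         const Reader& read) {
    // Read through the indirection, narrow to the column's physical type,
    // and store at the output slot (null handling omitted).
    const auto val = static_cast<DATA_TYPE>(read(in_idx));
    reinterpret_cast<DATA_TYPE*>(column_buf.data())[out_idx] = val;
  }

  // Usage: write_cell_direct<int32_t>(buf, /*out=*/0, /*in=*/5,
  //            [&](size_t i) { return raw_storage[i]; });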

template<>
void ColumnarResults::writeBackCellDirect ( const ResultSet &  rows,
const size_t  input_buffer_entry_idx,
const size_t  output_buffer_entry_idx,
const size_t  target_idx,
const size_t  slot_idx,
const ReadFunction &  read_from_function 
)
private

Definition at line 994 of file ColumnarResults.cpp.

template <>
void ColumnarResults::writeBackCellDirect<float>(const ResultSet& rows,
                                                 const size_t input_buffer_entry_idx,
                                                 const size_t output_buffer_entry_idx,
                                                 const size_t target_idx,
                                                 const size_t slot_idx,
                                                 const ReadFunction& read_from_function) {
  const int32_t ival =
      read_from_function(rows, input_buffer_entry_idx, target_idx, slot_idx);
  const float fval = *reinterpret_cast<const float*>(may_alias_ptr(&ival));
  reinterpret_cast<float*>(column_buffers_[target_idx])[output_buffer_entry_idx] = fval;
}
template<>
void ColumnarResults::writeBackCellDirect ( const ResultSet &  rows,
const size_t  input_buffer_entry_idx,
const size_t  output_buffer_entry_idx,
const size_t  target_idx,
const size_t  slot_idx,
const ReadFunction &  read_from_function 
)
private

Definition at line 1007 of file ColumnarResults.cpp.

template <>
void ColumnarResults::writeBackCellDirect<double>(const ResultSet& rows,
                                                  const size_t input_buffer_entry_idx,
                                                  const size_t output_buffer_entry_idx,
                                                  const size_t target_idx,
                                                  const size_t slot_idx,
                                                  const ReadFunction& read_from_function) {
  const int64_t ival =
      read_from_function(rows, input_buffer_entry_idx, target_idx, slot_idx);
  const double dval = *reinterpret_cast<const double*>(may_alias_ptr(&ival));
  reinterpret_cast<double*>(column_buffers_[target_idx])[output_buffer_entry_idx] = dval;
}
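Both specializations exist because the read functions return the floating-point value's raw bit pattern widened into an integer; a plain static_cast would convert the number instead of reinterpreting the bits. A sketch of the float case using memcpy, which is the portably well-defined way to type-pun (may_alias_ptr is the engine's own aliasing helper):

  #include <cstdint>
  #include <cstring>

  // The reader hands back the float's bit pattern in the low 32 bits of an
  // int64_t; recover the value by reinterpreting those bits, not converting.
  float bits_to_float(const int64_t ival) {
    const auto bits = static_cast<int32_t>(ival);
    float fval;
    std::memcpy(&fval, &bits, sizeof(fval));
    return fval;
  }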

Member Data Documentation

bool ColumnarResults::direct_columnar_conversion_
private

Definition at line 205 of file ColumnarResults.h.

Referenced by isDirectColumnarConversionPossible().

size_t ColumnarResults::num_rows_
protected

Definition at line 111 of file ColumnarResults.h.

Referenced by ColumnarResults(), materializeAllColumnsThroughIteration(), and size().

std::vector<size_t> ColumnarResults::padded_target_sizes_
private

Definition at line 209 of file ColumnarResults.h.

Referenced by ColumnarResults(), and mergeResults().

bool ColumnarResults::parallel_conversion_
private

Definition at line 204 of file ColumnarResults.h.

Referenced by isParallelConversion().

size_t ColumnarResults::thread_idx_
private

Definition at line 207 of file ColumnarResults.h.

Referenced by ColumnarResults().


The documentation for this class was generated from the following files: