OmniSciDB  a5dc49c757
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
ColumnarResults.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "ColumnarResults.h"
19 #include "ErrorHandling.h"
20 #include "Execute.h"
21 #include "Geospatial/Compression.h"
22 #include "Geospatial/Types.h"
23 #include "Shared/Intervals.h"
24 #include "Shared/likely.h"
25 #include "Shared/sqltypes.h"
26 #include "Shared/thread_count.h"
27 
28 #include <tbb/parallel_reduce.h>
29 #include <atomic>
30 #include <future>
31 #include <numeric>
32 
33 namespace {
34 
35 inline int64_t fixed_encoding_nullable_val(const int64_t val,
36  const SQLTypeInfo& type_info) {
37  if (type_info.get_compression() != kENCODING_NONE) {
38  CHECK(type_info.get_compression() == kENCODING_FIXED ||
39  type_info.get_compression() == kENCODING_DICT);
40  auto logical_ti = get_logical_type_info(type_info);
41  if (val == inline_int_null_val(logical_ti)) {
42  return inline_fixed_encoding_null_val(type_info);
43  }
44  }
45  return val;
46 }
47 
48 std::vector<size_t> get_padded_target_sizes(
49  const ResultSet& rows,
50  const std::vector<SQLTypeInfo>& target_types) {
51  std::vector<size_t> padded_target_sizes;
52  // We have to check that the result set is valid as one entry point
53  // to columnar results constructs effectively a fake result set.
54  // In these cases it should be safe to assume that we can use the type
55  // target widths
56  if (!rows.hasValidBuffer() ||
57  rows.getQueryMemDesc().getColCount() < target_types.size()) {
58  for (const auto& target_type : target_types) {
59  padded_target_sizes.emplace_back(target_type.get_size());
60  }
61  return padded_target_sizes;
62  }
63 
64  // If here we have a valid result set, so use it's QMD padded widths
65  const auto col_context = rows.getQueryMemDesc().getColSlotContext();
66  for (size_t col_idx = 0; col_idx < target_types.size(); col_idx++) {
67  // Lazy fetch columns will have 0 as a padded with, so use the type's
68  // logical width for those
69  const auto idx = col_context.getSlotsForCol(col_idx).front();
70  const size_t padded_slot_width =
71  static_cast<size_t>(rows.getPaddedSlotWidthBytes(idx));
72  padded_target_sizes.emplace_back(
73  padded_slot_width == 0UL ? target_types[col_idx].get_size() : padded_slot_width);
74  }
75  return padded_target_sizes;
76 }
77 
78 int64_t toBuffer(const TargetValue& col_val, const SQLTypeInfo& type_info, int8_t* buf) {
79  CHECK(!type_info.is_geometry());
80  if (type_info.is_array()) {
81  const auto array_col_val = boost::get<ArrayTargetValue>(&col_val);
82  CHECK(array_col_val);
83  const auto& vec = array_col_val->get();
84  int64_t offset = 0;
85  const auto elem_type_info = type_info.get_elem_type();
86  for (const auto& item : vec) {
87  offset += toBuffer(item, elem_type_info, buf + offset);
88  }
89  return offset;
90  } else if (type_info.is_fp()) {
91  const auto scalar_col_val = boost::get<ScalarTargetValue>(&col_val);
92  switch (type_info.get_type()) {
93  case kFLOAT: {
94  auto float_p = boost::get<float>(scalar_col_val);
95  *((float*)buf) = static_cast<float>(*float_p);
96  return 4;
97  }
98  case kDOUBLE: {
99  auto double_p = boost::get<double>(scalar_col_val);
100  *((double*)buf) = static_cast<double>(*double_p);
101  return 8;
102  }
103  default:
104  UNREACHABLE();
105  }
106  } else {
107  const auto scalar_col_val = boost::get<ScalarTargetValue>(&col_val);
108  CHECK(scalar_col_val);
109  auto i64_p = boost::get<int64_t>(scalar_col_val);
110  const auto val = fixed_encoding_nullable_val(*i64_p, type_info);
111  switch (type_info.get_size()) {
112  case 1:
113  *buf = static_cast<int8_t>(val);
114  return 1;
115  case 2:
116  *((int16_t*)buf) = static_cast<int16_t>(val);
117  return 2;
118  case 4:
119  *((int32_t*)buf) = static_cast<int32_t>(val);
120  return 4;
121  case 8:
122  *((int64_t*)buf) = static_cast<int64_t>(val);
123  return 8;
124  default:
125  UNREACHABLE();
126  }
127  }
128  return 0;
129 }
130 
131 /*
132  computeTotalNofValuesForColumn<Type> functions compute the total
133  number of values that exist in a result set. The total number of
134  values defines the maximal number of values that a FlatBuffer
135  storage will be able to hold.
136 
137  A "value" is defined as the largest fixed-size element in a column
138  structure.
139 
140  For instance, for a column of scalars or a column of
141  (varlen) arrays of scalars, the "value" is a scalar value. For a
142  column of geo-types (points, multipoints, etc), the "value" is a
143  Point (a Point is a two-tuple of coordinate values). For a column
144  of TextEncodingNone and column of arrays of TextEncodingNone, the
145  "value" is a byte value.
146  */
147 
148 int64_t computeTotalNofValuesForColumnArray(const ResultSet& rows,
149  const size_t column_idx) {
150  return tbb::parallel_reduce(
151  tbb::blocked_range<int64_t>(0, rows.entryCount()),
152  static_cast<int64_t>(0),
153  [&](tbb::blocked_range<int64_t> r, int64_t running_count) {
154  for (int i = r.begin(); i < r.end(); ++i) {
155  const auto crt_row = rows.getRowAtNoTranslations(i);
156  if (crt_row.empty()) {
157  continue;
158  }
159  const auto arr_tv = boost::get<ArrayTargetValue>(&crt_row[column_idx]);
160  CHECK(arr_tv);
161  if (arr_tv->is_initialized()) {
162  const auto& vec = arr_tv->get();
163  running_count += vec.size();
164  }
165  }
166  return running_count;
167  },
168  std::plus<int64_t>());
169 }
170 
171 template <typename TargetValue, typename TargetValuePtr>
173  const SQLTypeInfo& ti,
174  const size_t column_idx) {
175  return tbb::parallel_reduce(
176  tbb::blocked_range<int64_t>(0, rows.entryCount()),
177  static_cast<int64_t>(0),
178  [&](tbb::blocked_range<int64_t> r, int64_t running_count) {
179  for (int i = r.begin(); i < r.end(); ++i) {
180  const auto crt_row = rows.getRowAtNoTranslations(i);
181  if (crt_row.empty()) {
182  continue;
183  }
184  if (const auto tv = boost::get<ScalarTargetValue>(&crt_row[column_idx])) {
185  const auto ns = boost::get<NullableString>(tv);
186  CHECK(ns);
187  const auto s_ptr = boost::get<std::string>(ns);
188  if (s_ptr) {
189  // We count the number of commas in WKT representation
190  // (e.g. POLYGON ((0 0,4 0,4 4,0 4,0 0),(1 1,1 2,2 2,2
191  // 1,1 1))) to get the number of points it contains.
192  // This method is usable for any geo type.
193  running_count += std::count(s_ptr->begin(), s_ptr->end(), ',') + 1;
194  }
195  } else if (const auto tv =
196  boost::get<GeoTargetValuePtr>(&crt_row[column_idx])) {
197  const auto s = boost::get<TargetValuePtr>(tv);
198  CHECK(s);
199  VarlenDatum* d = s->coords_data.get();
200  if (d != nullptr) {
201  running_count +=
202  d->length /
203  (ti.get_compression() == kENCODING_GEOINT ? sizeof(int32_t)
204  : sizeof(double)) /
205  2;
206  } // else s is NULL
207  } else if (const auto tv = boost::get<GeoTargetValue>(&crt_row[column_idx])) {
208  if (tv->get_ptr() != nullptr) {
209  const auto s = boost::get<TargetValue>(tv->get());
210  std::vector<double>* d = s.coords.get();
211  CHECK(d);
212  running_count += d->size();
213  } // else s is NULL
214  } else {
215  UNREACHABLE();
216  }
217  }
218  return running_count;
219  },
220  std::plus<int64_t>());
221 }
222 
224  const size_t column_idx) {
225  return tbb::parallel_reduce(
226  tbb::blocked_range<int64_t>(0, rows.entryCount()),
227  static_cast<int64_t>(0),
228  [&](tbb::blocked_range<int64_t> r, int64_t running_count) {
229  for (int i = r.begin(); i < r.end(); ++i) {
230  // Apparently, ResultSet permutation vector may be sparse
231  // (len(permutation) > entryCount), so we cannot ignore the
232  // permutation vector when iterating over all entries.
233  const auto crt_row = rows.getRowAtNoTranslations(i);
234  if (crt_row.empty()) {
235  continue;
236  }
237  const auto col_val = crt_row[column_idx];
238  if (const auto tv = boost::get<ScalarTargetValue>(&col_val)) {
239  const auto ns = boost::get<NullableString>(tv);
240  CHECK(ns);
241  const auto s_ptr = boost::get<std::string>(ns);
242  if (s_ptr) {
243  running_count += s_ptr->size();
244  }
245  } else {
246  UNREACHABLE();
247  }
248  }
249  return running_count;
250  },
251  std::plus<int64_t>());
252 }
253 
254 } // namespace
255 
256 ColumnarResults::ColumnarResults(std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
257  const ResultSet& rows,
258  const size_t num_columns,
259  const std::vector<SQLTypeInfo>& target_types,
260  const size_t executor_id,
261  const size_t thread_idx,
262  const bool is_parallel_execution_enforced)
263  : column_buffers_(num_columns)
264  , num_rows_(result_set::use_parallel_algorithms(rows) ||
265  rows.isDirectColumnarConversionPossible()
266  ? rows.entryCount()
267  : rows.rowCount())
268  , target_types_(target_types)
269  , parallel_conversion_(is_parallel_execution_enforced
270  ? true
271  : result_set::use_parallel_algorithms(rows))
272  , direct_columnar_conversion_(rows.isDirectColumnarConversionPossible())
273  , thread_idx_(thread_idx)
274  , padded_target_sizes_(get_padded_target_sizes(rows, target_types)) {
275  auto timer = DEBUG_TIMER(__func__);
276  column_buffers_.resize(num_columns);
277  executor_ = Executor::getExecutor(executor_id);
278  CHECK(executor_);
279  CHECK_EQ(padded_target_sizes_.size(), target_types.size());
280 
281  for (size_t i = 0; i < num_columns; ++i) {
282  const auto& src_ti = rows.getColType(i);
283  // ti is initialized in columnarize_result() function in
284  // ColumnFetcher.cpp and it may differ from src_ti with respect to
285  // uses_flatbuffer attribute
286  const auto& ti = target_types_[i];
287 
288  if (rows.isZeroCopyColumnarConversionPossible(i)) {
289  CHECK_EQ(ti.usesFlatBuffer(), src_ti.usesFlatBuffer());
290  // The column buffer will be assigned in
291  // ColumnarResults::copyAllNonLazyColumns.
292  column_buffers_[i] = nullptr;
293  continue;
294  }
295  CHECK(!(src_ti.usesFlatBuffer() && ti.usesFlatBuffer()));
296  // When the source result set uses FlatBuffer layout, it must
297  // support zero-copy columnar conversion. Otherwise, the source
298  // result will be columnarized according to ti.usesFlatBuffer()
299  // state that is set in columnarize_result function in
300  // ColumnFetcher.cpp.
301  if (src_ti.usesFlatBuffer() && ti.usesFlatBuffer()) {
302  // If both source and target result sets use FlatBuffer layout,
303  // creating a columnar result should be using zero-copy columnar
304  // conversion.
305  UNREACHABLE();
306  } else if (ti.usesFlatBuffer()) {
307  int64_t values_count = -1;
308  switch (ti.get_type()) {
309  case kARRAY:
310  if (ti.get_subtype() == kTEXT && ti.get_compression() == kENCODING_NONE) {
311  throw std::runtime_error(
312  "Column<Array<TextEncodedNone>> support not implemented yet "
313  "(ColumnarResults)");
314  } else {
315  values_count = computeTotalNofValuesForColumnArray(rows, i);
316  }
317  break;
318  case kPOINT:
319  values_count = num_rows_;
320  break;
321  case kLINESTRING:
322  values_count =
325  rows, ti, i);
326  break;
327  case kPOLYGON:
328  values_count =
330  GeoPolyTargetValuePtr>(rows, ti, i);
331  break;
332  case kMULTIPOINT:
333  values_count =
336  rows, ti, i);
337  break;
338  case kMULTILINESTRING:
339  values_count =
342  rows, ti, i);
343  break;
344  case kMULTIPOLYGON:
345  values_count =
348  rows, ti, i);
349  break;
350  case kTEXT:
351  if (ti.get_compression() == kENCODING_NONE) {
352  values_count = computeTotalNofValuesForColumnTextEncodingNone(rows, i);
353  break;
354  }
355  if (ti.get_compression() == kENCODING_DICT) {
356  values_count = num_rows_;
357  break;
358  }
359  default:
360  UNREACHABLE() << "computing number of values not implemented for "
361  << ti.toString();
362  }
363  // TODO: include sizes count to optimize flatbuffer size
364  const int64_t flatbuffer_size = getFlatBufferSize(num_rows_, values_count, ti);
365  column_buffers_[i] = row_set_mem_owner->allocate(flatbuffer_size, thread_idx_);
367  initializeFlatBuffer(m, num_rows_, values_count, ti);
368  // The column buffer will be initialized either directly or
369  // through iteration.
370  // TODO: implement QE-808 resolution here.
371  } else {
372  if (ti.is_varlen()) {
374  }
375  // The column buffer will be initialized either directly or
376  // through iteration.
377  column_buffers_[i] =
378  row_set_mem_owner->allocate(num_rows_ * padded_target_sizes_[i], thread_idx_);
379  }
380  }
381 
382  if (isDirectColumnarConversionPossible() && rows.entryCount() > 0) {
383  materializeAllColumnsDirectly(rows, num_columns);
384  } else {
385  materializeAllColumnsThroughIteration(rows, num_columns);
386  }
387 }
388 
389 ColumnarResults::ColumnarResults(std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
390  const int8_t* one_col_buffer,
391  const size_t num_rows,
392  const SQLTypeInfo& target_type,
393  const size_t executor_id,
394  const size_t thread_idx)
395  : column_buffers_(1)
396  , num_rows_(num_rows)
397  , target_types_{target_type}
398  , parallel_conversion_(false)
399  , direct_columnar_conversion_(false)
400  , thread_idx_(thread_idx) {
401  auto timer = DEBUG_TIMER(__func__);
402  const bool is_varlen =
403  target_type.is_array() ||
404  (target_type.is_string() && target_type.get_compression() == kENCODING_NONE) ||
405  target_type.is_geometry();
406  if (is_varlen) {
408  }
409  executor_ = Executor::getExecutor(executor_id);
410  padded_target_sizes_.emplace_back(target_type.get_size());
411  CHECK(executor_);
412  const auto buf_size = num_rows * target_type.get_size();
413  column_buffers_[0] =
414  reinterpret_cast<int8_t*>(row_set_mem_owner->allocate(buf_size, thread_idx_));
415  memcpy(((void*)column_buffers_[0]), one_col_buffer, buf_size);
416 }
417 
418 std::unique_ptr<ColumnarResults> ColumnarResults::mergeResults(
419  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
420  const std::vector<std::unique_ptr<ColumnarResults>>& sub_results) {
421  // TODO: this method requires a safe guard when trying to merge
422  // columns using FlatBuffer layout.
423  if (sub_results.empty()) {
424  return nullptr;
425  }
426  const auto total_row_count = std::accumulate(
427  sub_results.begin(),
428  sub_results.end(),
429  size_t(0),
430  [](const size_t init, const std::unique_ptr<ColumnarResults>& result) {
431  return init + result->size();
432  });
433  std::unique_ptr<ColumnarResults> merged_results(
434  new ColumnarResults(total_row_count,
435  sub_results[0]->target_types_,
436  sub_results[0]->padded_target_sizes_));
437  const auto col_count = sub_results[0]->column_buffers_.size();
438  const auto nonempty_it = std::find_if(
439  sub_results.begin(),
440  sub_results.end(),
441  [](const std::unique_ptr<ColumnarResults>& needle) { return needle->size(); });
442  if (nonempty_it == sub_results.end()) {
443  return nullptr;
444  }
445  for (size_t col_idx = 0; col_idx < col_count; ++col_idx) {
446  const auto byte_width = merged_results->padded_target_sizes_[col_idx];
447  auto write_ptr = row_set_mem_owner->allocate(byte_width * total_row_count);
448  merged_results->column_buffers_.push_back(write_ptr);
449  for (auto& rs : sub_results) {
450  CHECK_EQ(col_count, rs->column_buffers_.size());
451  if (!rs->size()) {
452  continue;
453  }
454  CHECK_EQ(byte_width, rs->padded_target_sizes_[col_idx]);
455  memcpy(write_ptr, rs->column_buffers_[col_idx], rs->size() * byte_width);
456  write_ptr += rs->size() * byte_width;
457  }
458  }
459  return merged_results;
460 }
461 
467  const size_t num_columns) {
468  if (isParallelConversion()) {
469  std::atomic<size_t> row_idx{0};
470  const size_t worker_count = cpu_threads();
471  std::vector<std::future<void>> conversion_threads;
472  std::mutex write_mutex;
473  const auto do_work =
474  [num_columns, &rows, &row_idx, &write_mutex, this](const size_t i) {
475  const auto crt_row = rows.getRowAtNoTranslations(i);
476  if (!crt_row.empty()) {
477  auto cur_row_idx = row_idx.fetch_add(1);
478  for (size_t col_idx = 0; col_idx < num_columns; ++col_idx) {
479  auto& type_info = target_types_[col_idx];
480  writeBackCell(crt_row[col_idx],
481  cur_row_idx,
482  type_info,
483  column_buffers_[col_idx],
484  &write_mutex);
485  }
486  }
487  };
488  for (auto interval : makeIntervals(size_t(0), rows.entryCount(), worker_count)) {
489  conversion_threads.push_back(std::async(
491  [&do_work, this](const size_t start, const size_t end) {
493  size_t local_idx = 0;
494  for (size_t i = start; i < end; ++i, ++local_idx) {
495  if (UNLIKELY((local_idx & 0xFFFF) == 0 &&
496  executor_->checkNonKernelTimeInterrupted())) {
497  throw QueryExecutionError(ErrorCode::INTERRUPTED);
498  }
499  do_work(i);
500  }
501  } else {
502  for (size_t i = start; i < end; ++i) {
503  do_work(i);
504  }
505  }
506  },
507  interval.begin,
508  interval.end));
509  }
510 
511  try {
512  for (auto& child : conversion_threads) {
513  child.wait();
514  }
515  } catch (QueryExecutionError& e) {
516  if (e.hasErrorCode(ErrorCode::INTERRUPTED)) {
517  throw QueryExecutionError(ErrorCode::INTERRUPTED);
518  }
519  throw e;
520  } catch (...) {
521  throw;
522  }
523 
524  num_rows_ = row_idx;
525  rows.setCachedRowCount(num_rows_);
526  return;
527  }
528  bool done = false;
529  size_t row_idx = 0;
530  const auto do_work = [num_columns, &row_idx, &rows, &done, this]() {
531  const auto crt_row = rows.getNextRow(false, false);
532  if (crt_row.empty()) {
533  done = true;
534  return;
535  }
536  for (size_t i = 0; i < num_columns; ++i) {
537  auto& type_info = target_types_[i];
538  writeBackCell(crt_row[i], row_idx, type_info, column_buffers_[i]);
539  }
540  ++row_idx;
541  };
543  while (!done) {
544  if (UNLIKELY((row_idx & 0xFFFF) == 0 &&
545  executor_->checkNonKernelTimeInterrupted())) {
546  throw QueryExecutionError(ErrorCode::INTERRUPTED);
547  }
548  do_work();
549  }
550  } else {
551  while (!done) {
552  do_work();
553  }
554  }
555 
556  rows.moveToBegin();
557 }
558 
559 template <size_t NDIM,
560  typename GeospatialGeoType,
561  typename GeoTypeTargetValue,
562  typename GeoTypeTargetValuePtr,
563  bool is_multi>
565  const int64_t index,
566  const SQLTypeInfo& ti,
567  const TargetValue& col_val,
568  std::mutex* write_mutex) {
569  const SQLTypeInfoLite* ti_lite =
570  reinterpret_cast<const SQLTypeInfoLite*>(m.get_user_data_buffer());
571  CHECK(ti_lite);
572  if (ti_lite->is_geoint()) {
574  } else {
576  }
577  FlatBufferManager::Status status{};
578  if (const auto tv = boost::get<ScalarTargetValue>(&col_val)) {
579  const auto ns = boost::get<NullableString>(tv);
580  CHECK(ns);
581  const auto s_ptr = boost::get<std::string>(ns);
582  if (s_ptr == nullptr || *s_ptr == "NULL") {
583  auto lock_scope =
584  (write_mutex == nullptr ? std::unique_lock<std::mutex>()
585  : std::unique_lock<std::mutex>(*write_mutex));
586  status = m.setNull(index);
587  } else {
588  std::vector<double> coords;
589  std::vector<double> bounds;
590  std::vector<int32_t> ring_sizes;
591  std::vector<int32_t> poly_rings;
592  int64_t approx_nof_coords = 2 * std::count(s_ptr->begin(), s_ptr->end(), ',');
593  coords.reserve(approx_nof_coords);
594  bounds.reserve(4);
595  const auto gdal_wkt_ls = GeospatialGeoType(*s_ptr);
596  if constexpr (NDIM == 1) {
597  gdal_wkt_ls.getColumns(coords, bounds);
598  } else if constexpr (NDIM == 2) {
599  int64_t approx_nof_rings = std::count(s_ptr->begin(), s_ptr->end(), '(') - 1;
600  ring_sizes.reserve(approx_nof_rings);
601  gdal_wkt_ls.getColumns(coords, ring_sizes, bounds);
602  } else if constexpr (NDIM == 3) {
603  int64_t approx_nof_rings = std::count(s_ptr->begin(), s_ptr->end(), '(') - 1;
604  ring_sizes.reserve(approx_nof_rings);
605  poly_rings.reserve(approx_nof_rings);
606  gdal_wkt_ls.getColumns(coords, ring_sizes, poly_rings, bounds);
607  } else {
608  UNREACHABLE();
609  }
610  const std::vector<uint8_t> compressed_coords =
611  Geospatial::compress_coords(coords, ti);
612  {
613  auto lock_scope =
614  (write_mutex == nullptr ? std::unique_lock<std::mutex>()
615  : std::unique_lock<std::mutex>(*write_mutex));
616  if constexpr (NDIM == 1) {
617  status = m.setItem(index, compressed_coords);
618  } else if constexpr (NDIM == 2) {
619  status = m.setItem(index, compressed_coords, ring_sizes);
620  } else if constexpr (NDIM == 3) {
621  status = m.setItem(index, compressed_coords, ring_sizes, poly_rings);
622  } else {
623  UNREACHABLE();
624  }
625  }
626  }
627  } else if (const auto tv = boost::get<GeoTargetValuePtr>(&col_val)) {
628  const auto s = boost::get<GeoTypeTargetValuePtr>(tv);
629  CHECK(s);
630  if (s->coords_data == nullptr || s->coords_data->pointer == nullptr) {
631  status = m.setNull(index);
632  } else {
633  const VarlenDatum* d = s->coords_data.get();
634  CHECK(d);
635  CHECK(d->pointer);
636 
637  int32_t nof_values =
638  d->length / (ti_lite->is_geoint() ? 2 * sizeof(int32_t) : 2 * sizeof(double));
639  {
640  auto lock_scope =
641  (write_mutex == nullptr ? std::unique_lock<std::mutex>()
642  : std::unique_lock<std::mutex>(*write_mutex));
643  if constexpr (NDIM == 1) {
644  status = m.setItem<0, false>(index, d->pointer, nof_values);
645  } else if constexpr (NDIM == 2) {
646  VarlenDatum* r = nullptr;
647  if constexpr (is_multi) {
648  r = s->linestring_sizes_data.get();
649  } else {
650  r = s->ring_sizes_data.get();
651  }
652  status = m.setItem<1, /*check_sizes=*/false>(
653  index,
654  d->pointer,
655  nof_values,
656  reinterpret_cast<const int32_t*>(r->pointer),
657  r->length / sizeof(int32_t));
658  } else if constexpr (NDIM == 3) {
659  const VarlenDatum* r = s->ring_sizes_data.get();
660  const VarlenDatum* p = s->poly_rings_data.get();
661  status = m.setItem<2, /*check_sizes=*/false>(
662  index,
663  d->pointer,
664  nof_values,
665  reinterpret_cast<const int32_t*>(r->pointer),
666  r->length / sizeof(int32_t),
667  reinterpret_cast<const int32_t*>(p->pointer),
668  p->length / sizeof(int32_t));
669  } else {
670  UNREACHABLE();
671  }
672  }
673  }
674  } else if (const auto tv = boost::get<GeoTargetValue>(&col_val)) {
675  if (tv->get_ptr() == nullptr) {
676  auto lock_scope =
677  (write_mutex == nullptr ? std::unique_lock<std::mutex>()
678  : std::unique_lock<std::mutex>(*write_mutex));
679  status = m.setNull(index);
680  } else {
681  const auto s = boost::get<GeoTypeTargetValue>(tv->get());
682  const std::vector<double>* d = s.coords.get();
683  const std::vector<int32_t>* r = nullptr;
684  const std::vector<int32_t>* p = nullptr;
685  if constexpr (NDIM == 1) {
686  CHECK(r == nullptr);
687  CHECK(p == nullptr);
688  } else if constexpr (NDIM == 2) {
689  if constexpr (is_multi) {
690  r = s.linestring_sizes.get();
691  } else {
692  r = s.ring_sizes.get();
693  }
694  CHECK(p == nullptr);
695  } else if constexpr (NDIM == 3) {
696  r = s.ring_sizes.get();
697  p = s.poly_rings.get();
698  } else {
699  UNREACHABLE();
700  }
701  CHECK(d);
702  CHECK_NE(d->size(), 0);
703  std::vector<uint8_t> compressed_coords = Geospatial::compress_coords(*d, ti);
704  {
705  auto lock_scope =
706  (write_mutex == nullptr ? std::unique_lock<std::mutex>()
707  : std::unique_lock<std::mutex>(*write_mutex));
708  if constexpr (NDIM == 1) {
709  status = m.setItem(index, compressed_coords);
710  } else if constexpr (NDIM == 2) {
711  status = m.setItem(index, compressed_coords, *r);
712  } else if constexpr (NDIM == 3) {
713  status = m.setItem(index, compressed_coords, *r, *p);
714  } else {
715  UNREACHABLE();
716  }
717  }
718  }
719  } else {
720  UNREACHABLE();
721  }
722  CHECK_EQ(status, FlatBufferManager::Status::Success);
723 }
724 
725 template <typename scalar_type, typename value_type>
727  const size_t row_idx,
728  const TargetValue& col_val,
729  std::mutex* write_mutex) {
730  FlatBufferManager::Status status{};
731  const auto arr_tv = boost::get<ArrayTargetValue>(&col_val);
732  if (arr_tv->is_initialized()) {
733  const auto& vec = arr_tv->get();
734  // add a new item to flatbuffer, no initialization
735  {
736  auto lock_scope =
737  (write_mutex == nullptr ? std::unique_lock<std::mutex>()
738  : std::unique_lock<std::mutex>(*write_mutex));
739  status = m.setItem<1, false>(row_idx, nullptr, vec.size());
740  }
741  CHECK_EQ(status, FlatBufferManager::Status::Success);
743  // retrieve the item
744  status = m.getItem(row_idx, item);
745  CHECK_EQ(status, FlatBufferManager::Status::Success);
746  CHECK_EQ(item.nof_sizes, 0); // for sanity
747  CHECK_EQ(item.nof_values, vec.size()); // for sanity
748  // initialize the item's buffer
749  scalar_type* values = reinterpret_cast<scalar_type*>(item.values);
750  size_t index = 0;
751  for (const TargetValue val : vec) {
752  const auto& scalar_val = boost::get<ScalarTargetValue>(&val);
753  values[index++] = static_cast<scalar_type>(*boost::get<value_type>(scalar_val));
754  }
755  } else {
756  // add a new NULL item to flatbuffer
757  {
758  auto lock_scope =
759  (write_mutex == nullptr ? std::unique_lock<std::mutex>()
760  : std::unique_lock<std::mutex>(*write_mutex));
761  status = m.setNull(row_idx);
762  }
763  CHECK_EQ(status, FlatBufferManager::Status::Success);
764  }
765 }
766 
768  const size_t row_idx,
769  const TargetValue& col_val,
770  std::mutex* write_mutex) {
771  FlatBufferManager::Status status{};
772  if (const auto tv = boost::get<ScalarTargetValue>(&col_val)) {
773  const auto ns = boost::get<NullableString>(tv);
774  CHECK(ns);
775  const auto s_ptr = boost::get<std::string>(ns);
776  {
777  auto lock_scope =
778  (write_mutex == nullptr ? std::unique_lock<std::mutex>()
779  : std::unique_lock<std::mutex>(*write_mutex));
780  if (s_ptr) {
781  status = m.setItem(row_idx, *s_ptr);
782  } else {
783  status = m.setNull(row_idx);
784  }
785  }
786  CHECK_EQ(status, FlatBufferManager::Status::Success);
787  } else {
788  UNREACHABLE();
789  }
790 }
791 
793  const size_t row_idx,
794  const SQLTypeInfo& type_info,
795  const TargetValue& col_val,
796  std::mutex* write_mutex) {
797  FlatBufferManager::Status status{};
798  // to be deprecated, this function uses old FlatBuffer API
799  if (const auto tv = boost::get<ScalarTargetValue>(&col_val)) {
800  const auto ns = boost::get<NullableString>(tv);
801  CHECK(ns);
802  const auto s_ptr = boost::get<std::string>(ns);
803  std::vector<double> coords;
804  coords.reserve(2);
805  if (s_ptr == nullptr) {
806  coords.push_back(NULL_ARRAY_DOUBLE);
807  coords.push_back(NULL_ARRAY_DOUBLE);
808  } else {
809  const auto gdal_wkt_pt = Geospatial::GeoPoint(*s_ptr);
810  gdal_wkt_pt.getColumns(coords);
811  CHECK_EQ(coords.size(), 2);
812  }
813  std::vector<std::uint8_t> data = Geospatial::compress_coords(coords, type_info);
814  {
815  auto lock_scope =
816  (write_mutex == nullptr ? std::unique_lock<std::mutex>()
817  : std::unique_lock<std::mutex>(*write_mutex));
818  status = m.setItemOld(
819  row_idx, reinterpret_cast<const int8_t*>(data.data()), data.size());
820  }
821  CHECK_EQ(status, FlatBufferManager::Status::Success);
822  } else if (const auto tv = boost::get<GeoTargetValuePtr>(&col_val)) {
823  const auto s = boost::get<GeoPointTargetValuePtr>(tv);
824  CHECK(s);
825  VarlenDatum* d = s->coords_data.get();
826  CHECK(d);
828  m.getGeoPointMetadata()->is_geoint);
829  {
830  auto lock_scope =
831  (write_mutex == nullptr ? std::unique_lock<std::mutex>()
832  : std::unique_lock<std::mutex>(*write_mutex));
833  status =
834  m.setItemOld(row_idx, reinterpret_cast<const int8_t*>(d->pointer), d->length);
835  }
836  CHECK_EQ(status, FlatBufferManager::Status::Success);
837  } else if (const auto tv = boost::get<GeoTargetValue>(&col_val)) {
838  /*
839  Warning: the following code fails for NULL row values
840  because of the failure to detect the nullness correctly.
841  */
842  const auto s = boost::get<GeoPointTargetValue>(tv->get());
843  const std::vector<double>* d = s.coords.get();
844  CHECK_EQ(d->size(), 2);
845  {
846  auto lock_scope =
847  (write_mutex == nullptr ? std::unique_lock<std::mutex>()
848  : std::unique_lock<std::mutex>(*write_mutex));
849  status = m.setItemOld(
850  row_idx, reinterpret_cast<const int8_t*>(d->data()), m.dtypeSize());
851  }
852  CHECK_EQ(d->size(), 2);
853  CHECK_EQ(status, FlatBufferManager::Status::Success);
854  } else {
855  UNREACHABLE();
856  }
857 }
858 
859 /*
860  * This function processes and decodes its input TargetValue
861  * and writes it into its corresponding column buffer's cell (with corresponding
862  * row and column indices)
863  *
864  * NOTE: this is not supposed to be processing varlen types (except
865  * FlatBuffer supported types such as Array, GeoPoint, etc), and they
866  * should be handled differently outside this function. TODO: QE-808.
867  */
868 
869 inline void ColumnarResults::writeBackCell(const TargetValue& col_val,
870  const size_t row_idx,
871  const SQLTypeInfo& type_info,
872  int8_t* column_buf,
873  std::mutex* write_mutex) {
874  if (!type_info.usesFlatBuffer()) {
875  toBuffer(col_val, type_info, column_buf + type_info.get_size() * row_idx);
876  return;
877  }
879  FlatBufferManager m{column_buf};
880  if (type_info.is_geometry() && type_info.get_type() == kPOINT) {
881  writeBackCellGeoPoint(m, row_idx, type_info, col_val, write_mutex);
882  return;
883  }
884  const SQLTypeInfoLite* ti_lite =
885  reinterpret_cast<const SQLTypeInfoLite*>(m.get_user_data_buffer());
886  CHECK(ti_lite);
887  if (type_info.is_array()) {
888  if (type_info.get_subtype() == kTEXT &&
889  type_info.get_compression() == kENCODING_NONE) {
890  throw std::runtime_error(
891  "Column<Array<TextEncodedNone>> support not implemented yet (writeBackCell)");
892  }
893  switch (ti_lite->subtype) {
895  writeBackCellArrayScalar<double, double>(m, row_idx, col_val, write_mutex);
896  break;
898  writeBackCellArrayScalar<float, float>(m, row_idx, col_val, write_mutex);
899  break;
902  writeBackCellArrayScalar<int8_t, int64_t>(m, row_idx, col_val, write_mutex);
903  break;
905  writeBackCellArrayScalar<int16_t, int64_t>(m, row_idx, col_val, write_mutex);
906  break;
909  writeBackCellArrayScalar<int32_t, int64_t>(m, row_idx, col_val, write_mutex);
910  break;
912  writeBackCellArrayScalar<int64_t, int64_t>(m, row_idx, col_val, write_mutex);
913  break;
914  default:
915  UNREACHABLE();
916  }
917  } else if (type_info.is_text_encoding_none()) {
918  writeBackCellTextEncodingNone(m, row_idx, col_val, write_mutex);
919  } else if (type_info.is_geometry()) {
920  switch (type_info.get_type()) {
921  case kLINESTRING: {
926  /*is_multi=*/false>(
927  m, row_idx, type_info, col_val, write_mutex);
928  break;
929  }
930  case kPOLYGON: {
935  /*is_multi=*/false>(
936  m, row_idx, type_info, col_val, write_mutex);
937  break;
938  }
939  case kMULTIPOINT: {
944  /*is_multi=*/true>(
945  m, row_idx, type_info, col_val, write_mutex);
946  break;
947  }
948  case kMULTILINESTRING: {
953  /*is_multi=*/true>(
954  m, row_idx, type_info, col_val, write_mutex);
955  break;
956  }
957  case kMULTIPOLYGON: {
962  /*is_true=*/false>(
963  m, row_idx, type_info, col_val, write_mutex);
964  break;
965  }
966  default:
967  UNREACHABLE() << "writeBackCell not implemented for " << type_info.toString();
968  }
969  } else {
970  UNREACHABLE();
971  }
972 }
973 
// Generic direct write-back of a single cell, templated on the output element type.
// NOTE(review): the line carrying the function name and its first parameter is missing
// from this listing (gap in the embedded numbering); the explicit specializations that
// follow suggest it is ColumnarResults::writeBackCellDirect — confirm against the repo.
979 template <typename DATA_TYPE>
981  const size_t input_buffer_entry_idx,
982  const size_t output_buffer_entry_idx,
983  const size_t target_idx,
984  const size_t slot_idx,
985  const ReadFunction& read_from_function) {
// Read the raw value, let fixed_encoding_nullable_val() remap it according to the
// target's type info, then narrow the result to DATA_TYPE.
986  const auto val = static_cast<DATA_TYPE>(fixed_encoding_nullable_val(
987  read_from_function(rows, input_buffer_entry_idx, target_idx, slot_idx),
988  target_types_[target_idx]));
// The target's column buffer is indexed here as an array of DATA_TYPE.
989  reinterpret_cast<DATA_TYPE*>(column_buffers_[target_idx])[output_buffer_entry_idx] =
990  val;
991 }
992 
993 template <>
994 void ColumnarResults::writeBackCellDirect<float>(const ResultSet& rows,
995  const size_t input_buffer_entry_idx,
996  const size_t output_buffer_entry_idx,
997  const size_t target_idx,
998  const size_t slot_idx,
999  const ReadFunction& read_from_function) {
1000  const int32_t ival =
1001  read_from_function(rows, input_buffer_entry_idx, target_idx, slot_idx);
1002  const float fval = *reinterpret_cast<const float*>(may_alias_ptr(&ival));
1003  reinterpret_cast<float*>(column_buffers_[target_idx])[output_buffer_entry_idx] = fval;
1004 }
1005 
1006 template <>
1007 void ColumnarResults::writeBackCellDirect<double>(
1008  const ResultSet& rows,
1009  const size_t input_buffer_entry_idx,
1010  const size_t output_buffer_entry_idx,
1011  const size_t target_idx,
1012  const size_t slot_idx,
1013  const ReadFunction& read_from_function) {
1014  const int64_t ival =
1015  read_from_function(rows, input_buffer_entry_idx, target_idx, slot_idx);
1016  const double dval = *reinterpret_cast<const double*>(may_alias_ptr(&ival));
1017  reinterpret_cast<double*>(column_buffers_[target_idx])[output_buffer_entry_idx] = dval;
1018 }
1019 
// Dispatches on rows.getQueryDescriptionType() to one of three materialization
// routines (projection, table function, group-by); any other query type hits
// UNREACHABLE().
// NOTE(review): the signature line and the `case` labels are missing from this listing
// (gaps in the embedded numbering), so the function name and which label selects which
// routine cannot be read off here — confirm against the repository.
1030  const size_t num_columns) {
1032  switch (rows.getQueryDescriptionType()) {
1034  materializeAllColumnsProjection(rows, num_columns);
1035  break;
1036  }
1038  materializeAllColumnsTableFunction(rows, num_columns);
1039  break;
1040  }
1043  materializeAllColumnsGroupBy(rows, num_columns);
1044  break;
1045  }
1046  default:
1047  UNREACHABLE()
1048  << "Direct columnar conversion for this query type is not supported yet.";
1049  }
1050 }
1051 
// Materializes all columns of a result set that was output in columnar form, in two
// passes: a bulk copy of the non-lazy columns, then materialization of the lazily
// fetched ones.
// NOTE(review): the signature line and parts of the second CHECK are missing from this
// listing; going by the dispatcher earlier in this file this is presumably
// materializeAllColumnsProjection — confirm.
1060  const size_t num_columns) {
// Precondition: the result set's memory layout must be columnar.
1061  CHECK(rows.query_mem_desc_.didOutputColumnar());
1063  (rows.query_mem_desc_.getQueryDescriptionType() ==
1065 
1066  const auto& lazy_fetch_info = rows.getLazyFetchInfo();
1067 
1068  // We can directly copy each non-lazy column's content
1069  copyAllNonLazyColumns(lazy_fetch_info, rows, num_columns);
1070 
1071  // Only lazy columns are iterated through first and then materialized
1072  materializeAllLazyColumns(lazy_fetch_info, rows, num_columns);
1073 }
1074 
// Materializes all columns of a columnar-output result set for which no column may be
// lazily fetched: after asserting that, every column is handled by
// copyAllNonLazyColumns().
// NOTE(review): the signature line and parts of the second CHECK are missing from this
// listing; the in-body comment about table function outputs suggests this is
// materializeAllColumnsTableFunction — confirm.
1076  const size_t num_columns) {
// Precondition: the result set's memory layout must be columnar.
1077  CHECK(rows.query_mem_desc_.didOutputColumnar());
1079  (rows.query_mem_desc_.getQueryDescriptionType() ==
1081 
1082  const auto& lazy_fetch_info = rows.getLazyFetchInfo();
1083  // Lazy fetching is not currently allowed for table function outputs
1084  for (const auto& col_lazy_fetch_info : lazy_fetch_info) {
1085  CHECK(!col_lazy_fetch_info.is_lazily_fetched);
1086  }
1087  // We can directly copy each non-lazy column's content
1088  copyAllNonLazyColumns(lazy_fetch_info, rows, num_columns);
1089 }
1090 
1091 /*
1092  * For all non-lazy columns, we can directly copy back the results of each column's
1093  * contents from different storages and put them into the corresponding output buffer.
1094  *
1095  * This function is parallelized through assigning each column to a CPU thread.
1096  */
// Parameters:
//   lazy_fetch_info  per-column lazy-fetch flags; may be empty, in which case every
//                    column is treated as non-lazy
//   rows             source result set
//   num_columns      number of columns to visit
// NOTE(review): the line carrying the function name is missing from this listing (gap
// in the embedded numbering); callers earlier in this file invoke
// copyAllNonLazyColumns with this argument list — confirm.
1098  const std::vector<ColumnLazyFetchInfo>& lazy_fetch_info,
1099  const ResultSet& rows,
1100  const size_t num_columns) {
// Predicate: true when col_idx is not flagged as lazily fetched (or no flags exist).
1102  const auto is_column_non_lazily_fetched = [&lazy_fetch_info](const size_t col_idx) {
1103  // Saman: make sure when this lazy_fetch_info is empty
1104  if (lazy_fetch_info.empty()) {
1105  return true;
1106  } else {
1107  return !lazy_fetch_info[col_idx].is_lazily_fetched;
1108  }
1109  };
1110 
1111  // parallelized by assigning each column to a thread
1112  std::vector<std::future<void>> direct_copy_threads;
1113  for (size_t col_idx = 0; col_idx < num_columns; col_idx++) {
// Zero-copy path: the output slot must still be null; it is then pointed directly at
// the result set's own columnar buffer (constness cast away), with no data movement.
1114  if (rows.isZeroCopyColumnarConversionPossible(col_idx)) {
1115  CHECK(!column_buffers_[col_idx]);
1116  // The name of the method implies a copy but this is not a copy!!
1117  column_buffers_[col_idx] = const_cast<int8_t*>(rows.getColumnarBuffer(col_idx));
1118  } else if (is_column_non_lazily_fetched(col_idx)) {
1119  CHECK(!(rows.query_mem_desc_.getQueryDescriptionType() ==
1121  if (rows.getColType(col_idx).usesFlatBuffer() &&
1122  target_types_[col_idx].usesFlatBuffer()) {
1123  // If both source and target result sets use FlatBuffer
1124  // layout, creating a columnar result should be using
1125  // zero-copy columnar conversion.
1126  UNREACHABLE();
1127  }
// Copy path: one std::async task per column copies getColumnarBufferSize() bytes into
// column_buffers_[column_index]. The task captures `rows` by reference and `this`.
// NOTE(review): the line between std::async( and the lambda (presumably the launch
// policy) is missing from this listing.
1128  direct_copy_threads.push_back(std::async(
1130  [&rows, this](const size_t column_index) {
1131  size_t column_size = rows.getColumnarBufferSize(column_index);
1132  rows.copyColumnIntoBuffer(
1133  column_index, column_buffers_[column_index], column_size);
1134  },
1135  col_idx));
1136  }
1137  }
1138 
// Block until every copy task has finished.
// NOTE(review): std::future::wait() does not rethrow an exception stored by a task
// (only get() does), so a failure inside a copy task is not propagated from here.
1139  for (auto& child : direct_copy_threads) {
1140  child.wait();
1141  }
1142 }
1143 
// Materializes the lazily fetched columns by fetching rows through
// rows.getRowAtNoTranslations() and writing the selected cells with writeBackCell().
// Does nothing when no column in lazy_fetch_info is flagged as lazily fetched.
// NOTE(review): several lines are missing from this listing (function-name line, tail
// of the CHECK, worker-count expression, the std::async launch argument and the `if`
// that pairs with the `} else {` inside the worker). Callers earlier in this file
// invoke materializeAllLazyColumns with this argument list — confirm.
1154  const std::vector<ColumnLazyFetchInfo>& lazy_fetch_info,
1155  const ResultSet& rows,
1156  const size_t num_columns) {
1158  CHECK(!(rows.query_mem_desc_.getQueryDescriptionType() ==
// A single mutex is handed to every writeBackCell() call issued by the workers.
1160  std::mutex write_mutex;
// Per-row worker body: fetch the row, then write back every column i whose
// targets_to_skip[i] is false.
// NOTE(review): when targets_to_skip is empty the condition below is false for every
// column, so nothing is written for that row — confirm this is intended.
1161  const auto do_work_just_lazy_columns = [num_columns, &rows, &write_mutex, this](
1162  const size_t row_idx,
1163  const std::vector<bool>& targets_to_skip) {
1164  const auto crt_row = rows.getRowAtNoTranslations(row_idx, targets_to_skip);
1165  for (size_t i = 0; i < num_columns; ++i) {
1166  if (!targets_to_skip.empty() && !targets_to_skip[i]) {
1167  auto& type_info = target_types_[i];
1168  writeBackCell(crt_row[i], row_idx, type_info, column_buffers_[i], &write_mutex);
1169  }
1170  }
1171  };
1172 
// True if at least one column is flagged as lazily fetched.
1173  const auto contains_lazy_fetched_column =
1174  [](const std::vector<ColumnLazyFetchInfo>& lazy_fetch_info) {
1175  for (auto& col_info : lazy_fetch_info) {
1176  if (col_info.is_lazily_fetched) {
1177  return true;
1178  }
1179  }
1180  return false;
1181  };
1182 
1183  // parallelized by assigning a chunk of rows to each thread)
// The skip mask is only built when the result set's permutation buffer is empty;
// otherwise targets_to_skip stays empty.
1184  const bool skip_non_lazy_columns = rows.isPermutationBufferEmpty();
1185  if (contains_lazy_fetched_column(lazy_fetch_info)) {
1186  const size_t worker_count =
1188  std::vector<std::future<void>> conversion_threads;
1189  std::vector<bool> targets_to_skip;
1190  if (skip_non_lazy_columns) {
1191  CHECK_EQ(lazy_fetch_info.size(), size_t(num_columns));
1192  targets_to_skip.reserve(num_columns);
1193  for (size_t i = 0; i < num_columns; i++) {
1194  // we process lazy columns (i.e., skip non-lazy columns)
1195  targets_to_skip.push_back(!lazy_fetch_info[i].is_lazily_fetched);
1196  }
1197  }
// One async task per row interval [start, end) over rows.entryCount() entries.
1198  for (auto interval : makeIntervals(size_t(0), rows.entryCount(), worker_count)) {
1199  conversion_threads.push_back(std::async(
1201  [&do_work_just_lazy_columns, &targets_to_skip, this](const size_t start,
1202  const size_t end) {
// In the first branch the interrupt flag is polled once every 0x10000 rows (whenever
// the low 16 bits of local_idx are zero); the else branch runs without polling.
1204  size_t local_idx = 0;
1205  for (size_t i = start; i < end; ++i, ++local_idx) {
1206  if (UNLIKELY((local_idx & 0xFFFF) == 0 &&
1207  executor_->checkNonKernelTimeInterrupted())) {
1208  throw QueryExecutionError(ErrorCode::INTERRUPTED);
1209  }
1210  do_work_just_lazy_columns(i, targets_to_skip);
1211  }
1212  } else {
1213  for (size_t i = start; i < end; ++i) {
1214  do_work_just_lazy_columns(i, targets_to_skip);
1215  }
1216  }
1217  },
1218  interval.begin,
1219  interval.end));
1220  }
1221 
// NOTE(review): std::future::wait() only blocks; it never rethrows an exception stored
// by a task (only get() does). As written, the catch clauses below cannot be reached
// by a worker's INTERRUPTED error — confirm whether get() was intended.
// NOTE(review): `throw e;` throws a copy of e; a bare `throw;` would rethrow the
// original exception object.
1222  try {
1223  for (auto& child : conversion_threads) {
1224  child.wait();
1225  }
1226  } catch (QueryExecutionError& e) {
1227  if (e.hasErrorCode(ErrorCode::INTERRUPTED)) {
1228  throw QueryExecutionError(ErrorCode::INTERRUPTED);
1229  }
1230  throw e;
1231  } catch (...) {
1232  throw;
1233  }
1234  }
1235 }
1236 
// Direct columnar conversion for group-by result sets (perfect-hash or baseline-hash
// layouts only, as asserted below). Works in two steps over `entry_count` entries
// split evenly across `num_threads` threads.
// NOTE(review): the signature line, one line before the CHECK, and the line that names
// the step-1 callee are missing from this listing; the dispatcher earlier in this file
// calls materializeAllColumnsGroupBy(rows, num_columns) — confirm.
1244  const size_t num_columns) {
1246  CHECK(rows.getQueryDescriptionType() == QueryDescriptionType::GroupByPerfectHash ||
1247  rows.getQueryDescriptionType() == QueryDescriptionType::GroupByBaselineHash);
1248 
// cpu_threads() workers when parallel conversion is enabled, otherwise one.
1249  const size_t num_threads = isParallelConversion() ? cpu_threads() : 1;
1250  const size_t entry_count = rows.entryCount();
// Ceiling division: every thread gets at most size_per_thread entries.
1251  const size_t size_per_thread = (entry_count + num_threads - 1) / num_threads;
1252 
1253  // step 1: compute total non-empty elements and store a bitmap per thread
1254  std::vector<size_t> non_empty_per_thread(num_threads,
1255  0); // number of non-empty entries per thread
1256 
1257  ColumnBitmap bitmap(size_per_thread, num_threads);
1258 
1260  rows, bitmap, non_empty_per_thread, entry_count, num_threads, size_per_thread);
1261 
1262  // step 2: go through the generated bitmap and copy/decode corresponding entries
1263  // into the output buffer
1264  compactAndCopyEntries(rows,
1265  bitmap,
1266  non_empty_per_thread,
1267  num_columns,
1268  entry_count,
1269  num_threads,
1270  size_per_thread);
1271 }
1272 
// Scans all entries of `rows` in parallel and records which ones are non-empty.
//   rows                  result set being scanned (group-by layouts only)
//   bitmap                output: bit (local_idx, thread_idx) is set for each non-empty
//                         entry, where local_idx is the offset inside that thread's range
//   non_empty_per_thread  output: non-empty count per thread; size must equal num_threads
//   entry_count           total number of entries in `rows`
//   num_threads           number of async tasks launched
//   size_per_thread       number of entries assigned to each task (last one clamped)
// NOTE(review): two lines are missing from this listing (one before the first CHECK and
// the `if` that pairs with the `} else {` inside locate_and_count_func).
1278 void ColumnarResults::locateAndCountEntries(const ResultSet& rows,
1279  ColumnBitmap& bitmap,
1280  std::vector<size_t>& non_empty_per_thread,
1281  const size_t entry_count,
1282  const size_t num_threads,
1283  const size_t size_per_thread) const {
1285  CHECK(rows.getQueryDescriptionType() == QueryDescriptionType::GroupByPerfectHash ||
1286  rows.getQueryDescriptionType() == QueryDescriptionType::GroupByBaselineHash);
1287  CHECK_EQ(num_threads, non_empty_per_thread.size());
// Per-entry step: count the entry and set its bit if the row is not empty.
1288  auto do_work = [&rows, &bitmap](size_t& total_non_empty,
1289  const size_t local_idx,
1290  const size_t entry_idx,
1291  const size_t thread_idx) {
1292  if (!rows.isRowAtEmpty(entry_idx)) {
1293  total_non_empty++;
1294  bitmap.set(local_idx, thread_idx, true);
1295  }
1296  };
// Per-thread body over [start_index, end_index). In the first branch the interrupt
// flag is polled once every 0x10000 entries; the else branch runs without polling.
// Each task writes only its own slot of non_empty_per_thread.
1297  auto locate_and_count_func =
1298  [&do_work, &non_empty_per_thread, this](
1299  size_t start_index, size_t end_index, size_t thread_idx) {
1300  size_t total_non_empty = 0;
1301  size_t local_idx = 0;
1303  for (size_t entry_idx = start_index; entry_idx < end_index;
1304  entry_idx++, local_idx++) {
1305  if (UNLIKELY((local_idx & 0xFFFF) == 0 &&
1306  executor_->checkNonKernelTimeInterrupted())) {
1307  throw QueryExecutionError(ErrorCode::INTERRUPTED);
1308  }
1309  do_work(total_non_empty, local_idx, entry_idx, thread_idx);
1310  }
1311  } else {
1312  for (size_t entry_idx = start_index; entry_idx < end_index;
1313  entry_idx++, local_idx++) {
1314  do_work(total_non_empty, local_idx, entry_idx, thread_idx);
1315  }
1316  }
1317  non_empty_per_thread[thread_idx] = total_non_empty;
1318  };
1319 
// Launch one task per thread over consecutive ranges of size_per_thread entries.
1320  std::vector<std::future<void>> conversion_threads;
1321  for (size_t thread_idx = 0; thread_idx < num_threads; thread_idx++) {
1322  const size_t start_entry = thread_idx * size_per_thread;
1323  const size_t end_entry = std::min(start_entry + size_per_thread, entry_count);
1324  conversion_threads.push_back(std::async(
1325  std::launch::async, locate_and_count_func, start_entry, end_entry, thread_idx));
1326  }
1327 
// NOTE(review): std::future::wait() only blocks; it never rethrows an exception stored
// by a task (only get() does). As written, the catch clauses below cannot be reached
// by a worker's INTERRUPTED error — confirm whether get() was intended.
// NOTE(review): `throw e;` throws a copy of e; a bare `throw;` would rethrow the
// original exception object.
1328  try {
1329  for (auto& child : conversion_threads) {
1330  child.wait();
1331  }
1332  } catch (QueryExecutionError& e) {
1333  if (e.hasErrorCode(ErrorCode::INTERRUPTED)) {
1334  throw QueryExecutionError(ErrorCode::INTERRUPTED);
1335  }
1336  throw e;
1337  } catch (...) {
1338  throw;
1339  }
1340 }
1341 
// Second step of the group-by conversion: turns the per-thread non-empty counts into
// output offsets and forwards to one of two compaction routines, depending on whether
// every target is a supported single-slot target.
// NOTE(review): the function-name line, one line before the CHECK, and the two lines
// naming the callees are missing from this listing; a caller earlier in this file
// invokes compactAndCopyEntries with this argument list — confirm.
1352  const ResultSet& rows,
1353  const ColumnBitmap& bitmap,
1354  const std::vector<size_t>& non_empty_per_thread,
1355  const size_t num_columns,
1356  const size_t entry_count,
1357  const size_t num_threads,
1358  const size_t size_per_thread) {
1360  CHECK(rows.getQueryDescriptionType() == QueryDescriptionType::GroupByPerfectHash ||
1361  rows.getQueryDescriptionType() == QueryDescriptionType::GroupByBaselineHash);
1362  CHECK_EQ(num_threads, non_empty_per_thread.size());
1363 
// global_offsets has num_threads + 1 slots and the running sums are written starting at
// slot 1, so global_offsets[t] is the number of non-empty entries owned by threads < t
// (slot 0 stays 0) and the last slot holds the grand total.
1364  // compute the exclusive scan over all non-empty totals
1365  std::vector<size_t> global_offsets(num_threads + 1, 0);
1366  std::partial_sum(non_empty_per_thread.begin(),
1367  non_empty_per_thread.end(),
1368  std::next(global_offsets.begin()));
1369 
1370  const auto slot_idx_per_target_idx = rows.getSlotIndicesForTargetIndices();
1371  const auto [single_slot_targets_to_skip, num_single_slot_targets] =
1372  rows.getSupportedSingleSlotTargetBitmap();
1373 
1374  // We skip multi-slot targets (e.g., AVG). These skipped targets are treated
1375  // differently and accessed through result set's iterator
// The first branch additionally passes the single-slot bitmap to its callee.
1376  if (num_single_slot_targets < num_columns) {
1378  bitmap,
1379  non_empty_per_thread,
1380  global_offsets,
1381  single_slot_targets_to_skip,
1382  slot_idx_per_target_idx,
1383  num_columns,
1384  entry_count,
1385  num_threads,
1386  size_per_thread);
1387  } else {
1389  bitmap,
1390  non_empty_per_thread,
1391  global_offsets,
1392  slot_idx_per_target_idx,
1393  num_columns,
1394  entry_count,
1395  num_threads,
1396  size_per_thread);
1397  }
1398 }
1399 
// Compacts the non-empty entries of a group-by result set into the output column
// buffers when some targets cannot be copied directly. Columns whose targets_to_skip
// flag is false are recovered via rows.getRowAtNoTranslations() + writeBackCell(); all
// other columns go through the direct read/write function pair.
// NOTE(review): the function-name line, one line before the CHECK, and the `if` that
// pairs with the `} else {` in compact_buffer_func are missing from this listing. The
// name is presumably compactAndCopyEntriesWithTargetSkipping — confirm.
1407  const ResultSet& rows,
1408  const ColumnBitmap& bitmap,
1409  const std::vector<size_t>& non_empty_per_thread,
1410  const std::vector<size_t>& global_offsets,
1411  const std::vector<bool>& targets_to_skip,
1412  const std::vector<size_t>& slot_idx_per_target_idx,
1413  const size_t num_columns,
1414  const size_t entry_count,
1415  const size_t num_threads,
1416  const size_t size_per_thread) {
1418  CHECK(rows.getQueryDescriptionType() == QueryDescriptionType::GroupByPerfectHash ||
1419  rows.getQueryDescriptionType() == QueryDescriptionType::GroupByBaselineHash);
1420 
// One write function and one read function per column.
1421  const auto [write_functions, read_functions] =
1422  initAllConversionFunctions(rows, slot_idx_per_target_idx, targets_to_skip);
1423  CHECK_EQ(write_functions.size(), num_columns);
1424  CHECK_EQ(read_functions.size(), num_columns);
// A single mutex is handed to every writeBackCell() call issued by the workers.
1425  std::mutex write_mutex;
// Per-entry step. non_empty_idx counts entries this thread has already emitted;
// entry_idx is taken by reference so the step can fast-forward the caller's loop.
1426  auto do_work = [this,
1427  &bitmap,
1428  &rows,
1429  &slot_idx_per_target_idx,
1430  &global_offsets,
1431  &targets_to_skip,
1432  &num_columns,
1433  &write_mutex,
1434  &write_functions = write_functions,
1435  &read_functions = read_functions](size_t& non_empty_idx,
1436  const size_t total_non_empty,
1437  const size_t local_idx,
1438  size_t& entry_idx,
1439  const size_t thread_idx,
1440  const size_t end_idx) {
// NOTE(review): unlike the variant without target skipping, there is no `return` after
// entry_idx is moved to end_idx, so execution falls through to the bitmap test below
// with the updated entry_idx — confirm this is harmless/intended.
1441  if (non_empty_idx >= total_non_empty) {
1442  // all non-empty entries has been written back
1443  entry_idx = end_idx;
1444  }
// Output row = this thread's starting offset + number of rows it has emitted so far.
1445  const size_t output_buffer_row_idx = global_offsets[thread_idx] + non_empty_idx;
1446  if (bitmap.get(local_idx, thread_idx)) {
1447  // targets that are recovered from the result set iterators:
1448  const auto crt_row = rows.getRowAtNoTranslations(entry_idx, targets_to_skip);
1449  for (size_t column_idx = 0; column_idx < num_columns; ++column_idx) {
1450  if (!targets_to_skip.empty() && !targets_to_skip[column_idx]) {
1451  auto& type_info = target_types_[column_idx];
1452  writeBackCell(crt_row[column_idx],
1453  output_buffer_row_idx,
1454  type_info,
1455  column_buffers_[column_idx],
1456  &write_mutex);
1457  }
1458  }
1459  // targets that are copied directly without any translation/decoding from
1460  // result set
1461  for (size_t column_idx = 0; column_idx < num_columns; column_idx++) {
1462  if (!targets_to_skip.empty() && !targets_to_skip[column_idx]) {
1463  continue;
1464  }
1465  write_functions[column_idx](rows,
1466  entry_idx,
1467  output_buffer_row_idx,
1468  column_idx,
1469  slot_idx_per_target_idx[column_idx],
1470  read_functions[column_idx]);
1471  }
1472  non_empty_idx++;
1473  }
1474  };
1475 
// Per-thread body over [start_index, end_index). In the first branch the interrupt
// flag is polled once every 0x10000 entries; the else branch runs without polling.
1476  auto compact_buffer_func = [&non_empty_per_thread, &do_work, this](
1477  const size_t start_index,
1478  const size_t end_index,
1479  const size_t thread_idx) {
1480  const size_t total_non_empty = non_empty_per_thread[thread_idx];
1481  size_t non_empty_idx = 0;
1482  size_t local_idx = 0;
1484  for (size_t entry_idx = start_index; entry_idx < end_index;
1485  entry_idx++, local_idx++) {
1486  if (UNLIKELY((local_idx & 0xFFFF) == 0 &&
1487  executor_->checkNonKernelTimeInterrupted())) {
1488  throw QueryExecutionError(ErrorCode::INTERRUPTED);
1489  }
1490  do_work(
1491  non_empty_idx, total_non_empty, local_idx, entry_idx, thread_idx, end_index);
1492  }
1493  } else {
1494  for (size_t entry_idx = start_index; entry_idx < end_index;
1495  entry_idx++, local_idx++) {
1496  do_work(
1497  non_empty_idx, total_non_empty, local_idx, entry_idx, thread_idx, end_index);
1498  }
1499  }
1500  };
1501 
// Launch one task per thread over consecutive ranges of size_per_thread entries.
1502  std::vector<std::future<void>> compaction_threads;
1503  for (size_t thread_idx = 0; thread_idx < num_threads; thread_idx++) {
1504  const size_t start_entry = thread_idx * size_per_thread;
1505  const size_t end_entry = std::min(start_entry + size_per_thread, entry_count);
1506  compaction_threads.push_back(std::async(
1507  std::launch::async, compact_buffer_func, start_entry, end_entry, thread_idx));
1508  }
1509 
// NOTE(review): std::future::wait() only blocks; it never rethrows an exception stored
// by a task (only get() does). As written, the catch clauses below cannot be reached
// by a worker's INTERRUPTED error — confirm whether get() was intended.
// NOTE(review): `throw e;` throws a copy of e; a bare `throw;` would rethrow the
// original exception object.
1510  try {
1511  for (auto& child : compaction_threads) {
1512  child.wait();
1513  }
1514  } catch (QueryExecutionError& e) {
1515  if (e.hasErrorCode(ErrorCode::INTERRUPTED)) {
1516  throw QueryExecutionError(ErrorCode::INTERRUPTED);
1517  }
1518  throw e;
1519  } catch (...) {
1520  throw;
1521  }
1522 }
1523 
// Compacts the non-empty entries of a group-by result set into the output column
// buffers when every column can be copied directly: each non-empty entry is written
// column by column through its read/write function pair.
// NOTE(review): the function-name line, one line before the CHECK, and the `if` that
// pairs with the `} else {` in compact_buffer_func are missing from this listing. The
// name is presumably compactAndCopyEntriesWithoutTargetSkipping — confirm.
1531  const ResultSet& rows,
1532  const ColumnBitmap& bitmap,
1533  const std::vector<size_t>& non_empty_per_thread,
1534  const std::vector<size_t>& global_offsets,
1535  const std::vector<size_t>& slot_idx_per_target_idx,
1536  const size_t num_columns,
1537  const size_t entry_count,
1538  const size_t num_threads,
1539  const size_t size_per_thread) {
1541  CHECK(rows.getQueryDescriptionType() == QueryDescriptionType::GroupByPerfectHash ||
1542  rows.getQueryDescriptionType() == QueryDescriptionType::GroupByBaselineHash);
1543 
// One write function and one read function per column.
1544  const auto [write_functions, read_functions] =
1545  initAllConversionFunctions(rows, slot_idx_per_target_idx);
1546  CHECK_EQ(write_functions.size(), num_columns);
1547  CHECK_EQ(read_functions.size(), num_columns);
// Per-entry step. non_empty_idx counts entries this thread has already emitted;
// entry_idx is taken by reference so the step can fast-forward the caller's loop.
1548  auto do_work = [&rows,
1549  &bitmap,
1550  &global_offsets,
1551  &num_columns,
1552  &slot_idx_per_target_idx,
1553  &write_functions = write_functions,
1554  &read_functions = read_functions](size_t& entry_idx,
1555  size_t& non_empty_idx,
1556  const size_t total_non_empty,
1557  const size_t local_idx,
1558  const size_t thread_idx,
1559  const size_t end_idx) {
// Early exit: once this thread has emitted all of its non-empty entries, jump the
// caller's loop variable to the end of the range and stop.
1560  if (non_empty_idx >= total_non_empty) {
1561  // all non-empty entries has been written back
1562  entry_idx = end_idx;
1563  return;
1564  }
// Output row = this thread's starting offset + number of rows it has emitted so far.
1565  const size_t output_buffer_row_idx = global_offsets[thread_idx] + non_empty_idx;
1566  if (bitmap.get(local_idx, thread_idx)) {
1567  for (size_t column_idx = 0; column_idx < num_columns; column_idx++) {
1568  write_functions[column_idx](rows,
1569  entry_idx,
1570  output_buffer_row_idx,
1571  column_idx,
1572  slot_idx_per_target_idx[column_idx],
1573  read_functions[column_idx]);
1574  }
1575  non_empty_idx++;
1576  }
1577  };
// Per-thread body over [start_index, end_index). In the first branch the interrupt
// flag is polled once every 0x10000 entries; the else branch runs without polling.
1578  auto compact_buffer_func = [&non_empty_per_thread, &do_work, this](
1579  const size_t start_index,
1580  const size_t end_index,
1581  const size_t thread_idx) {
1582  const size_t total_non_empty = non_empty_per_thread[thread_idx];
1583  size_t non_empty_idx = 0;
1584  size_t local_idx = 0;
1586  for (size_t entry_idx = start_index; entry_idx < end_index;
1587  entry_idx++, local_idx++) {
1588  if (UNLIKELY((local_idx & 0xFFFF) == 0 &&
1589  executor_->checkNonKernelTimeInterrupted())) {
1590  throw QueryExecutionError(ErrorCode::INTERRUPTED);
1591  }
1592  do_work(
1593  entry_idx, non_empty_idx, total_non_empty, local_idx, thread_idx, end_index);
1594  }
1595  } else {
1596  for (size_t entry_idx = start_index; entry_idx < end_index;
1597  entry_idx++, local_idx++) {
1598  do_work(
1599  entry_idx, non_empty_idx, total_non_empty, local_idx, thread_idx, end_index);
1600  }
1601  }
1602  };
1603 
// Launch one task per thread over consecutive ranges of size_per_thread entries.
1604  std::vector<std::future<void>> compaction_threads;
1605  for (size_t thread_idx = 0; thread_idx < num_threads; thread_idx++) {
1606  const size_t start_entry = thread_idx * size_per_thread;
1607  const size_t end_entry = std::min(start_entry + size_per_thread, entry_count);
1608  compaction_threads.push_back(std::async(
1609  std::launch::async, compact_buffer_func, start_entry, end_entry, thread_idx));
1610  }
1611 
// NOTE(review): std::future::wait() only blocks; it never rethrows an exception stored
// by a task (only get() does). As written, the catch clauses below cannot be reached
// by a worker's INTERRUPTED error — confirm whether get() was intended.
// NOTE(review): `throw e;` throws a copy of e; a bare `throw;` would rethrow the
// original exception object.
1612  try {
1613  for (auto& child : compaction_threads) {
1614  child.wait();
1615  }
1616  } catch (QueryExecutionError& e) {
1617  if (e.hasErrorCode(ErrorCode::INTERRUPTED)) {
1618  throw QueryExecutionError(ErrorCode::INTERRUPTED);
1619  }
1620  throw e;
1621  } catch (...) {
1622  throw;
1623  }
1624 }
1625 
// Builds one WriteFunction per entry of target_types_, in target order.
//   rows             result set being converted (group-by layouts only, see CHECK)
//   targets_to_skip  optional mask; for a target whose flag is false (and the mask is
//                    non-empty) a placeholder that trips UNREACHABLE() is installed
// Returns the vector of write functions. Each real entry is the writeBackCellDirect
// instantiation chosen by the target type's is_fp() and get_size(), bound to `this`.
// NOTE(review): one line before the CHECK is missing from this listing.
1631 std::vector<ColumnarResults::WriteFunction> ColumnarResults::initWriteFunctions(
1632  const ResultSet& rows,
1633  const std::vector<bool>& targets_to_skip) {
1635  CHECK(rows.getQueryDescriptionType() == QueryDescriptionType::GroupByPerfectHash ||
1636  rows.getQueryDescriptionType() == QueryDescriptionType::GroupByBaselineHash);
1637 
1638  std::vector<WriteFunction> result;
1639  result.reserve(target_types_.size());
1640 
1641  for (size_t target_idx = 0; target_idx < target_types_.size(); target_idx++) {
// Placeholder keeps the vector index-aligned with target_types_.
1642  if (!targets_to_skip.empty() && !targets_to_skip[target_idx]) {
1643  result.emplace_back([](const ResultSet& rows,
1644  const size_t input_buffer_entry_idx,
1645  const size_t output_buffer_entry_idx,
1646  const size_t target_idx,
1647  const size_t slot_idx,
1648  const ReadFunction& read_function) {
1649  UNREACHABLE() << "Invalid write back function used.";
1650  });
1651  continue;
1652  }
1653 
// Floating-point targets: 8 bytes -> double, 4 bytes -> float.
1654  if (target_types_[target_idx].is_fp()) {
1655  switch (target_types_[target_idx].get_size()) {
1656  case 8:
1657  result.emplace_back(std::bind(&ColumnarResults::writeBackCellDirect<double>,
1658  this,
1659  std::placeholders::_1,
1660  std::placeholders::_2,
1661  std::placeholders::_3,
1662  std::placeholders::_4,
1663  std::placeholders::_5,
1664  std::placeholders::_6));
1665  break;
1666  case 4:
1667  result.emplace_back(std::bind(&ColumnarResults::writeBackCellDirect<float>,
1668  this,
1669  std::placeholders::_1,
1670  std::placeholders::_2,
1671  std::placeholders::_3,
1672  std::placeholders::_4,
1673  std::placeholders::_5,
1674  std::placeholders::_6));
1675  break;
1676  default:
1677  UNREACHABLE() << "Invalid target type encountered.";
1678  break;
1679  }
// All other targets: pick the integer width matching get_size() (8, 4, 2 or 1 bytes).
1680  } else {
1681  switch (target_types_[target_idx].get_size()) {
1682  case 8:
1683  result.emplace_back(std::bind(&ColumnarResults::writeBackCellDirect<int64_t>,
1684  this,
1685  std::placeholders::_1,
1686  std::placeholders::_2,
1687  std::placeholders::_3,
1688  std::placeholders::_4,
1689  std::placeholders::_5,
1690  std::placeholders::_6));
1691  break;
1692  case 4:
1693  result.emplace_back(std::bind(&ColumnarResults::writeBackCellDirect<int32_t>,
1694  this,
1695  std::placeholders::_1,
1696  std::placeholders::_2,
1697  std::placeholders::_3,
1698  std::placeholders::_4,
1699  std::placeholders::_5,
1700  std::placeholders::_6));
1701  break;
1702  case 2:
1703  result.emplace_back(std::bind(&ColumnarResults::writeBackCellDirect<int16_t>,
1704  this,
1705  std::placeholders::_1,
1706  std::placeholders::_2,
1707  std::placeholders::_3,
1708  std::placeholders::_4,
1709  std::placeholders::_5,
1710  std::placeholders::_6));
1711  break;
1712  case 1:
1713  result.emplace_back(std::bind(&ColumnarResults::writeBackCellDirect<int8_t>,
1714  this,
1715  std::placeholders::_1,
1716  std::placeholders::_2,
1717  std::placeholders::_3,
1718  std::placeholders::_4,
1719  std::placeholders::_5,
1720  std::placeholders::_6));
1721  break;
1722  default:
1723  UNREACHABLE() << "Invalid target type encountered.";
1724  break;
1725  }
1726  }
1727  }
1728  return result;
1729 }
1730 
1731 namespace {
1732 
1733 int64_t invalid_read_func(const ResultSet& rows,
1734  const size_t input_buffer_entry_idx,
1735  const size_t target_idx,
1736  const size_t slot_idx) {
1737  UNREACHABLE() << "Invalid read function used, target should have been skipped.";
1738  return static_cast<int64_t>(0);
1739 }
1740 
1741 template <QueryDescriptionType QUERY_TYPE, bool COLUMNAR_OUTPUT>
1742 int64_t read_float_key_baseline(const ResultSet& rows,
1743  const size_t input_buffer_entry_idx,
1744  const size_t target_idx,
1745  const size_t slot_idx) {
1746  // float keys in baseline hash are written as doubles in the buffer, so
1747  // the result should properly be casted before being written in the output
1748  // columns
1749  auto fval = static_cast<float>(rows.getEntryAt<double, QUERY_TYPE, COLUMNAR_OUTPUT>(
1750  input_buffer_entry_idx, target_idx, slot_idx));
1751  return *reinterpret_cast<int32_t*>(may_alias_ptr(&fval));
1752 }
1753 
1754 template <QueryDescriptionType QUERY_TYPE, bool COLUMNAR_OUTPUT>
1755 int64_t read_int64_func(const ResultSet& rows,
1756  const size_t input_buffer_entry_idx,
1757  const size_t target_idx,
1758  const size_t slot_idx) {
1759  return rows.getEntryAt<int64_t, QUERY_TYPE, COLUMNAR_OUTPUT>(
1760  input_buffer_entry_idx, target_idx, slot_idx);
1761 }
1762 
1763 template <QueryDescriptionType QUERY_TYPE, bool COLUMNAR_OUTPUT>
1764 int64_t read_int32_func(const ResultSet& rows,
1765  const size_t input_buffer_entry_idx,
1766  const size_t target_idx,
1767  const size_t slot_idx) {
1768  return rows.getEntryAt<int32_t, QUERY_TYPE, COLUMNAR_OUTPUT>(
1769  input_buffer_entry_idx, target_idx, slot_idx);
1770 }
1771 
1772 template <QueryDescriptionType QUERY_TYPE, bool COLUMNAR_OUTPUT>
1773 int64_t read_int16_func(const ResultSet& rows,
1774  const size_t input_buffer_entry_idx,
1775  const size_t target_idx,
1776  const size_t slot_idx) {
1777  return rows.getEntryAt<int16_t, QUERY_TYPE, COLUMNAR_OUTPUT>(
1778  input_buffer_entry_idx, target_idx, slot_idx);
1779 }
1780 
1781 template <QueryDescriptionType QUERY_TYPE, bool COLUMNAR_OUTPUT>
1782 int64_t read_int8_func(const ResultSet& rows,
1783  const size_t input_buffer_entry_idx,
1784  const size_t target_idx,
1785  const size_t slot_idx) {
1786  return rows.getEntryAt<int8_t, QUERY_TYPE, COLUMNAR_OUTPUT>(
1787  input_buffer_entry_idx, target_idx, slot_idx);
1788 }
1789 
1790 template <QueryDescriptionType QUERY_TYPE, bool COLUMNAR_OUTPUT>
1791 int64_t read_float_func(const ResultSet& rows,
1792  const size_t input_buffer_entry_idx,
1793  const size_t target_idx,
1794  const size_t slot_idx) {
1795  auto fval = rows.getEntryAt<float, QUERY_TYPE, COLUMNAR_OUTPUT>(
1796  input_buffer_entry_idx, target_idx, slot_idx);
1797  return *reinterpret_cast<int32_t*>(may_alias_ptr(&fval));
1798 }
1799 
1800 template <QueryDescriptionType QUERY_TYPE, bool COLUMNAR_OUTPUT>
1801 int64_t read_double_func(const ResultSet& rows,
1802  const size_t input_buffer_entry_idx,
1803  const size_t target_idx,
1804  const size_t slot_idx) {
1805  auto dval = rows.getEntryAt<double, QUERY_TYPE, COLUMNAR_OUTPUT>(
1806  input_buffer_entry_idx, target_idx, slot_idx);
1807  return *reinterpret_cast<int64_t*>(may_alias_ptr(&dval));
1808 }
1809 
1810 } // namespace
1811 
// Builds one ReadFunction per entry of target_types_, in target order, specialized on
// the query description type and on whether the result set is columnar.
//   rows                     result set being converted; its layout must match the
//                            template arguments (see CHECKs)
//   slot_idx_per_target_idx  slot index to use for each target
//   targets_to_skip          optional mask; for a target whose flag is false (and the
//                            mask is non-empty) invalid_read_func is installed
// Returns the vector of read functions.
// NOTE(review): one line before the first CHECK is missing from this listing.
1818 template <QueryDescriptionType QUERY_TYPE, bool COLUMNAR_OUTPUT>
1819 std::vector<ColumnarResults::ReadFunction> ColumnarResults::initReadFunctions(
1820  const ResultSet& rows,
1821  const std::vector<size_t>& slot_idx_per_target_idx,
1822  const std::vector<bool>& targets_to_skip) {
1824  CHECK(COLUMNAR_OUTPUT == rows.didOutputColumnar());
1825  CHECK(QUERY_TYPE == rows.getQueryDescriptionType());
1826 
1827  std::vector<ReadFunction> read_functions;
1828  read_functions.reserve(target_types_.size());
1829 
1830  for (size_t target_idx = 0; target_idx < target_types_.size(); target_idx++) {
1831  if (!targets_to_skip.empty() && !targets_to_skip[target_idx]) {
1832  // for targets that should be skipped, we use a placeholder function that should
1833  // never be called. The CHECKs inside it make sure that never happens.
1834  read_functions.emplace_back(invalid_read_func);
1835  continue;
1836  }
1837 
// Baseline hash: a padded slot width of 0 marks a key column, whose reader is chosen
// from the effective key width rather than from the slot width.
1838  if (QUERY_TYPE == QueryDescriptionType::GroupByBaselineHash) {
1839  if (rows.getPaddedSlotWidthBytes(slot_idx_per_target_idx[target_idx]) == 0) {
1840  // for key columns only
1841  CHECK(rows.query_mem_desc_.getTargetGroupbyIndex(target_idx) >= 0);
// Floating-point keys require an 8-byte effective key width.
1842  if (target_types_[target_idx].is_fp()) {
1843  CHECK_EQ(size_t(8), rows.query_mem_desc_.getEffectiveKeyWidth());
1844  switch (target_types_[target_idx].get_type()) {
1845  case kFLOAT:
1846  read_functions.emplace_back(
1847  read_float_key_baseline<QUERY_TYPE, COLUMNAR_OUTPUT>);
1848  break;
1849  case kDOUBLE:
1850  read_functions.emplace_back(read_double_func<QUERY_TYPE, COLUMNAR_OUTPUT>);
1851  break;
1852  default:
1853  UNREACHABLE()
1854  << "Invalid data type encountered (BaselineHash, floating point key).";
1855  break;
1856  }
1857  } else {
1858  switch (rows.query_mem_desc_.getEffectiveKeyWidth()) {
1859  case 8:
1860  read_functions.emplace_back(read_int64_func<QUERY_TYPE, COLUMNAR_OUTPUT>);
1861  break;
1862  case 4:
1863  read_functions.emplace_back(read_int32_func<QUERY_TYPE, COLUMNAR_OUTPUT>);
1864  break;
1865  default:
1866  UNREACHABLE()
1867  << "Invalid data type encountered (BaselineHash, integer key).";
1868  }
1869  }
1870  continue;
1871  }
1872  }
// Remaining columns: the reader is chosen by the padded slot width in bytes.
1873  if (target_types_[target_idx].is_fp()) {
1874  switch (rows.getPaddedSlotWidthBytes(slot_idx_per_target_idx[target_idx])) {
1875  case 8:
1876  read_functions.emplace_back(read_double_func<QUERY_TYPE, COLUMNAR_OUTPUT>);
1877  break;
1878  case 4:
1879  read_functions.emplace_back(read_float_func<QUERY_TYPE, COLUMNAR_OUTPUT>);
1880  break;
1881  default:
1882  UNREACHABLE() << "Invalid data type encountered (floating point agg column).";
1883  break;
1884  }
1885  } else {
1886  switch (rows.getPaddedSlotWidthBytes(slot_idx_per_target_idx[target_idx])) {
1887  case 8:
1888  read_functions.emplace_back(read_int64_func<QUERY_TYPE, COLUMNAR_OUTPUT>);
1889  break;
1890  case 4:
1891  read_functions.emplace_back(read_int32_func<QUERY_TYPE, COLUMNAR_OUTPUT>);
1892  break;
1893  case 2:
1894  read_functions.emplace_back(read_int16_func<QUERY_TYPE, COLUMNAR_OUTPUT>);
1895  break;
1896  case 1:
1897  read_functions.emplace_back(read_int8_func<QUERY_TYPE, COLUMNAR_OUTPUT>);
1898  break;
1899  default:
1900  UNREACHABLE() << "Invalid data type encountered (integer agg column).";
1901  break;
1902  }
1903  }
1904  }
1905  return read_functions;
1906 }
1907 
// Returns the (write functions, read functions) pair for a group-by result set. The
// read functions are instantiated for the runtime combination of query description
// type (perfect hash vs. baseline hash) and columnar vs. non-columnar output.
// NOTE(review): the line carrying the function name and the opening of the CHECK are
// missing from this listing; callers elsewhere in this file invoke
// initAllConversionFunctions with this argument list — confirm.
1915 std::tuple<std::vector<ColumnarResults::WriteFunction>,
1916  std::vector<ColumnarResults::ReadFunction>>
1918  const ResultSet& rows,
1919  const std::vector<size_t>& slot_idx_per_target_idx,
1920  const std::vector<bool>& targets_to_skip) {
1922  (rows.getQueryDescriptionType() == QueryDescriptionType::GroupByPerfectHash ||
1923  rows.getQueryDescriptionType() == QueryDescriptionType::GroupByBaselineHash));
1924 
// NOTE(review): write_functions is declared const, so the std::move() calls below
// yield a const rvalue and the vector is copied rather than moved into the tuple.
1925  const auto write_functions = initWriteFunctions(rows, targets_to_skip);
1926  if (rows.getQueryDescriptionType() == QueryDescriptionType::GroupByPerfectHash) {
1927  if (rows.didOutputColumnar()) {
1928  return std::make_tuple(
1929  std::move(write_functions),
1930  initReadFunctions<QueryDescriptionType::GroupByPerfectHash, true>(
1931  rows, slot_idx_per_target_idx, targets_to_skip));
1932  } else {
1933  return std::make_tuple(
1934  std::move(write_functions),
1935  initReadFunctions<QueryDescriptionType::GroupByPerfectHash, false>(
1936  rows, slot_idx_per_target_idx, targets_to_skip));
1937  }
// Otherwise the type is GroupByBaselineHash (the only other value the CHECK admits).
1938  } else {
1939  if (rows.didOutputColumnar()) {
1940  return std::make_tuple(
1941  std::move(write_functions),
1942  initReadFunctions<QueryDescriptionType::GroupByBaselineHash, true>(
1943  rows, slot_idx_per_target_idx, targets_to_skip));
1944  } else {
1945  return std::make_tuple(
1946  std::move(write_functions),
1947  initReadFunctions<QueryDescriptionType::GroupByBaselineHash, false>(
1948  rows, slot_idx_per_target_idx, targets_to_skip));
1949  }
1950  }
1951 }
bool is_geoint() const
Definition: sqltypes_lite.h:61
GroupByPerfectHash
Definition: enums.h:58
bool isParallelConversion() const
HOST DEVICE SQLTypes get_subtype() const
Definition: sqltypes.h:392
int64_t read_int16_func(const ResultSet &rows, const size_t input_buffer_entry_idx, const size_t target_idx, const size_t slot_idx)
#define CHECK_EQ(x, y)
Definition: Logger.h:301
std::vector< int8_t * > column_buffers_
HOST DEVICE int get_size() const
Definition: sqltypes.h:403
HOST DEVICE int64_t dtypeSize() const
Definition: FlatBuffer.h:628
void writeBackCellGeoPoint(FlatBufferManager &m, const size_t row_idx, const SQLTypeInfo &type_info, const TargetValue &col_val, std::mutex *write_mutex)
void materializeAllColumnsTableFunction(const ResultSet &rows, const size_t num_columns)
static std::unique_ptr< ColumnarResults > mergeResults(const std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner, const std::vector< std::unique_ptr< ColumnarResults >> &sub_results)
void writeBackCellTextEncodingNone(FlatBufferManager &m, const size_t row_idx, const TargetValue &col_val, std::mutex *write_mutex)
std::vector< ReadFunction > initReadFunctions(const ResultSet &rows, const std::vector< size_t > &slot_idx_per_target_idx, const std::vector< bool > &targets_to_skip={})
SQLTypes subtype
Definition: sqltypes_lite.h:54
void writeBackCellGeoNestedArray(FlatBufferManager &m, const int64_t index, const SQLTypeInfo &ti, const TargetValue &col_val, std::mutex *write_mutex)
void locateAndCountEntries(const ResultSet &rows, ColumnBitmap &bitmap, std::vector< size_t > &non_empty_per_thread, const size_t entry_count, const size_t num_threads, const size_t size_per_thread) const
bool is_fp() const
Definition: sqltypes.h:573
std::vector< size_t > get_padded_target_sizes(const ResultSet &rows, const std::vector< SQLTypeInfo > &target_types)
int64_t read_double_func(const ResultSet &rows, const size_t input_buffer_entry_idx, const size_t target_idx, const size_t slot_idx)
#define UNREACHABLE()
Definition: Logger.h:338
SQLTypeInfo get_logical_type_info(const SQLTypeInfo &type_info)
Definition: sqltypes.h:1472
int64_t computeTotalNofValuesForColumnTextEncodingNone(const ResultSet &rows, const size_t column_idx)
void initializeFlatBuffer(FlatBufferManager &m, int64_t items_count, int64_t max_nof_values, const SQLTypeInfo &ti)
Definition: sqltypes.h:1993
Projection
Definition: enums.h:58
Constants for Builtin SQL Types supported by HEAVY.AI.
void set(const size_t index, const size_t bank_index, const bool val)
Intervals< T > makeIntervals(T begin, T end, std::size_t n_workers)
Definition: Intervals.h:122
bool g_enable_non_kernel_time_query_interrupt
Definition: Execute.cpp:138
HOST DEVICE SQLTypes get_type() const
Definition: sqltypes.h:391
void writeBackCellArrayScalar(FlatBufferManager &m, const size_t row_idx, const TargetValue &col_val, std::mutex *write_mutex)
void compactAndCopyEntries(const ResultSet &rows, const ColumnBitmap &bitmap, const std::vector< size_t > &non_empty_per_thread, const size_t num_columns, const size_t entry_count, const size_t num_threads, const size_t size_per_thread)
int8_t * pointer
Definition: Datum.h:58
HOST DEVICE Status getItem(const int64_t index, NestedArrayItem< NDIM > &result)
Definition: FlatBuffer.h:1349
static std::shared_ptr< Executor > getExecutor(const ExecutorId id, const std::string &debug_dir="", const std::string &debug_file="", const SystemParameters &system_parameters=SystemParameters())
Definition: Execute.cpp:513
bool use_parallel_algorithms(const ResultSet &rows)
Definition: ResultSet.cpp:1600
TableFunction
Definition: enums.h:58
tuple rows
Definition: report.py:114
std::vector< uint8_t > compress_coords(const std::vector< double > &coords, const SQLTypeInfo &ti)
Definition: Compression.cpp:52
future< Result > async(Fn &&fn, Args &&...args)
std::function< int64_t(const ResultSet &, const size_t, const size_t, const size_t)> ReadFunction
void materializeAllColumnsThroughIteration(const ResultSet &rows, const size_t num_columns)
void init(LogOptions const &log_opts)
Definition: Logger.cpp:364
#define CHECK_NE(x, y)
Definition: Logger.h:302
int64_t fixed_encoding_nullable_val(const int64_t val, const SQLTypeInfo &type_info)
std::vector< WriteFunction > initWriteFunctions(const ResultSet &rows, const std::vector< bool > &targets_to_skip={})
DEVICE void partial_sum(ARGS &&...args)
Definition: gpu_enabled.h:87
executor_(executor)
bool hasErrorCode(ErrorCode const ec) const
Definition: ErrorHandling.h:65
void materializeAllColumnsGroupBy(const ResultSet &rows, const size_t num_columns)
int64_t getFlatBufferSize(int64_t items_count, int64_t max_nof_values, const SQLTypeInfo &ti)
Definition: sqltypes.h:1841
DEVICE auto accumulate(ARGS &&...args)
Definition: gpu_enabled.h:42
HOST DEVICE Status setItem(const int64_t index, const int8_t *values_buf, const int32_t nof_values)
Definition: FlatBuffer.h:1593
bool usesFlatBuffer() const
Definition: sqltypes.h:1083
std::string toString() const
Definition: sqltypes.h:525
Status setItemOld(const int64_t index, const int8_t *src, const int64_t size, int8_t **dest=nullptr)
Definition: FlatBuffer.h:1963
int64_t read_int32_func(const ResultSet &rows, const size_t input_buffer_entry_idx, const size_t target_idx, const size_t slot_idx)
std::tuple< std::vector< WriteFunction >, std::vector< ReadFunction > > initAllConversionFunctions(const ResultSet &rows, const std::vector< size_t > &slot_idx_per_target_idx, const std::vector< bool > &targets_to_skip={})
bool g_enable_smem_group_by true
int64_t computeTotalNofValuesForColumnArray(const ResultSet &rows, const size_t column_idx)
Value parallel_reduce(const blocked_range< Int > &range, const Value &identity, const RealBody &real_body, const Reduction &reduction, const Partitioner &p=Partitioner())
Parallel iteration with reduction.
int64_t computeTotalNofValuesForColumnGeoType(const ResultSet &rows, const SQLTypeInfo &ti, const size_t column_idx)
bool isDirectColumnarConversionPossible() const
#define UNLIKELY(x)
Definition: likely.h:25
Definition: sqltypes.h:79
int64_t read_int8_func(const ResultSet &rows, const size_t input_buffer_entry_idx, const size_t target_idx, const size_t slot_idx)
void materializeAllColumnsDirectly(const ResultSet &rows, const size_t num_columns)
HOST DEVICE EncodingType get_compression() const
Definition: sqltypes.h:399
void writeBackCellDirect(const ResultSet &rows, const size_t input_buffer_entry_idx, const size_t output_buffer_entry_idx, const size_t target_idx, const size_t slot_idx, const ReadFunction &read_function)
bool get(const size_t index, const size_t bank_index) const
void writeBackCell(const TargetValue &col_val, const size_t row_idx, const SQLTypeInfo &type_info, int8_t *column_buf, std::mutex *write_mutex=nullptr)
int64_t toBuffer(const TargetValue &col_val, const SQLTypeInfo &type_info, int8_t *buf)
std::shared_ptr< Executor > executor_
GroupByBaselineHash
Definition: enums.h:58
#define NULL_ARRAY_DOUBLE
std::vector< size_t > padded_target_sizes_
void copyAllNonLazyColumns(const std::vector< ColumnLazyFetchInfo > &lazy_fetch_info, const ResultSet &rows, const size_t num_columns)
#define CHECK(condition)
Definition: Logger.h:291
bool is_geometry() const
Definition: sqltypes.h:597
#define DEBUG_TIMER(name)
Definition: Logger.h:412
int64_t read_float_key_baseline(const ResultSet &rows, const size_t input_buffer_entry_idx, const size_t target_idx, const size_t slot_idx)
void materializeAllColumnsProjection(const ResultSet &rows, const size_t num_columns)
int64_t inline_int_null_val(const SQL_TYPE_INFO &ti)
int64_t inline_fixed_encoding_null_val(const SQL_TYPE_INFO &ti)
int64_t read_float_func(const ResultSet &rows, const size_t input_buffer_entry_idx, const size_t target_idx, const size_t slot_idx)
boost::variant< ScalarTargetValue, ArrayTargetValue, GeoTargetValue, GeoTargetValuePtr > TargetValue
Definition: TargetValue.h:195
int64_t invalid_read_func(const ResultSet &rows, const size_t input_buffer_entry_idx, const size_t target_idx, const size_t slot_idx)
int64_t read_int64_func(const ResultSet &rows, const size_t input_buffer_entry_idx, const size_t target_idx, const size_t slot_idx)
void compactAndCopyEntriesWithTargetSkipping(const ResultSet &rows, const ColumnBitmap &bitmap, const std::vector< size_t > &non_empty_per_thread, const std::vector< size_t > &global_offsets, const std::vector< bool > &targets_to_skip, const std::vector< size_t > &slot_idx_per_target_idx, const size_t num_columns, const size_t entry_count, const size_t num_threads, const size_t size_per_thread)
void compactAndCopyEntriesWithoutTargetSkipping(const ResultSet &rows, const ColumnBitmap &bitmap, const std::vector< size_t > &non_empty_per_thread, const std::vector< size_t > &global_offsets, const std::vector< size_t > &slot_idx_per_target_idx, const size_t num_columns, const size_t entry_count, const size_t num_threads, const size_t size_per_thread)
bool is_text_encoding_none() const
Definition: sqltypes.h:614
HOST static DEVICE bool isFlatBuffer(const void *buffer)
Definition: FlatBuffer.h:528
int cpu_threads()
Definition: thread_count.h:25
const std::vector< SQLTypeInfo > target_types_
SQLTypeInfo get_elem_type() const
Definition: sqltypes.h:977
Divide up indexes (A, A+1, A+2, ..., B-2, B-1) among N workers as evenly as possible in a range-based...
void materializeAllLazyColumns(const std::vector< ColumnLazyFetchInfo > &lazy_fetch_info, const ResultSet &rows, const size_t num_columns)
bool is_array() const
Definition: sqltypes.h:585
ColumnarResults(const std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner, const ResultSet &rows, const size_t num_columns, const std::vector< SQLTypeInfo > &target_types, const size_t executor_id, const size_t thread_idx, const bool is_parallel_execution_enforced=false)
HOST DEVICE Status setNull(int64_t index)
Definition: FlatBuffer.h:2029
size_t length
Definition: Datum.h:57