OmniSciDB  a5dc49c757
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
TableFunctionExecutionContext.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
18 
19 #include "Analyzer/Analyzer.h"
20 #include "Logger/Logger.h"
27 #include "Shared/funcannotations.h"
28 
29 namespace {
30 
// NOTE(review): this listing omits source line 31, which carries the function
// name and the first parameter. The cross-reference index at the end of the
// page gives the full signature as
//   void append_literal_buffer(const Datum& d, const SQLTypeInfo& ti,
//                              int8_t* literal_buffer, int64_t offset)
//
// Copies the literal held in Datum `d` into `literal_buffer` at byte `offset`,
// choosing the Datum member and the byte count from the type `ti`:
//   - floating point: floatval / doubleval for 32 / 64-bit widths
//   - integer, timestamp, time interval: tinyintval / smallintval / intval /
//     bigintval for 8 / 16 / 32 / 64-bit widths
//   - boolean: boolval, copied as a single int8_t
//   - none-encoded text: the string size (sizeof(int64_t) bytes) followed by
//     the character data
// Any other type throws TableFunctionError. Nothing here checks the write
// against the buffer's capacity; the buffer is whatever the caller passed in.
32  const SQLTypeInfo& ti,
33  int8_t* literal_buffer,
34  int64_t offset) {
35  if (ti.is_fp()) {
    // Only 32- and 64-bit floating point widths are handled.
36  switch (get_bit_width(ti)) {
37  case 32:
38  std::memcpy(literal_buffer + offset, &d.floatval, sizeof(float));
39  break;
40  case 64:
41  std::memcpy(literal_buffer + offset, &d.doubleval, sizeof(double));
42  break;
43  default:
44  UNREACHABLE();
45  }
46  } else if (ti.is_integer() || ti.is_timestamp() || ti.is_timeinterval()) {
    // Timestamps and time intervals are copied through the same Datum
    // members as plain integers of the same bit width.
47  switch (get_bit_width(ti)) {
48  case 8:
49  std::memcpy(literal_buffer + offset, &d.tinyintval, sizeof(int8_t));
50  break;
51  case 16:
52  std::memcpy(literal_buffer + offset, &d.smallintval, sizeof(int16_t));
53  break;
54  case 32:
55  std::memcpy(literal_buffer + offset, &d.intval, sizeof(int32_t));
56  break;
57  case 64:
58  std::memcpy(literal_buffer + offset, &d.bigintval, sizeof(int64_t));
59  break;
60  default:
61  UNREACHABLE();
62  }
63  } else if (ti.is_boolean()) {
64  std::memcpy(literal_buffer + offset, &d.boolval, sizeof(int8_t));
65  } else if (ti.is_text_encoding_none()) {
    // Layout written here: size first, then the raw characters.
    // NOTE(review): sizeof(int64_t) bytes are read out of string_size, whose
    // type is whatever stringval->size() returns; this assumes that type is
    // 8 bytes wide - confirm for all supported targets.
66  auto string_size = d.stringval->size();
67  std::memcpy(literal_buffer + offset, &string_size, sizeof(int64_t));
68  std::memcpy(
69  literal_buffer + offset + sizeof(int64_t), d.stringval->data(), string_size);
70  } else {
71  throw TableFunctionError("Literal value " + DatumToString(d, ti) +
72  " is not yet supported.");
73  }
74 }
75 
// NOTE(review): this listing omits source line 76 (function name and first
// parameter) and the `case` labels at source lines 80-82, 86 and 91. The
// cross-reference index at the end of the page gives the signature as
//   size_t get_output_row_count(const TableFunctionExecutionUnit& exe_unit,
//                               size_t input_element_count)
//
// Returns the number of output rows to allocate, selected by
// exe_unit.table_func.getOutputRowSizeType(). The three visible branches
// yield, in order:
//   - exe_unit.output_buffer_size_param
//   - exe_unit.output_buffer_size_param * input_element_count
//   - input_element_count
// Which enumerator selects which branch cannot be read from this listing.
// Any other sizing type reaches UNREACHABLE().
77  size_t input_element_count) {
78  size_t allocated_output_row_count = 0;
79  switch (exe_unit.table_func.getOutputRowSizeType()) {
83  allocated_output_row_count = exe_unit.output_buffer_size_param;
84  break;
85  }
87  allocated_output_row_count =
88  exe_unit.output_buffer_size_param * input_element_count;
89  break;
90  }
92  allocated_output_row_count = input_element_count;
93  break;
94  }
95  default: {
96  UNREACHABLE();
97  }
98  }
99  return allocated_output_row_count;
100 }
101 
102 } // namespace
103 
// NOTE(review): this listing omits source line 104 (function name), lines
// 152-153 (two arguments of the getOneColumnFragment call), lines 304-305
// (start of the pre-launch call) and the `case` labels at lines 316 and 326.
// The cross-reference index at the end of the page lists the function as
//   ResultSetPtr execute(const TableFunctionExecutionUnit& exe_unit, ...,
//                        bool is_pre_launch_udtf)
//
// Prepares the inputs of a table function and launches it:
//   1. For every input expression, collects a buffer pointer, an element
//      count and a string-dictionary-proxy pointer. ColumnVar inputs are
//      fetched from the first fragment of their table; Constant and ArrayExpr
//      literals are serialized into a host buffer owned by literals_owner
//      (and copied to the device when device_type is GPU). Any other
//      expression throws TableFunctionError.
//   2. Collects one string-dictionary-proxy pointer (or nullptr) per target
//      expression.
//   3. If is_pre_launch_udtf is set, invokes the pre-launch call with a
//      CpuCompilationContext and returns nullptr; otherwise returns the
//      result of launchCpuCode or launchGpuCode, selected by device_type.
105  const TableFunctionExecutionUnit& exe_unit,
106  const std::vector<InputTableInfo>& table_infos,
107  const std::shared_ptr<CompilationContext>& compilation_context,
108  const ColumnFetcher& column_fetcher,
109  const ExecutorDeviceType device_type,
110  Executor* executor,
111  bool is_pre_launch_udtf) {
112  auto timer = DEBUG_TIMER(__func__);
113  CHECK(compilation_context);
    // These two vectors keep fetched chunks and serialized literals alive
    // until this function returns; the raw pointers collected below point
    // into them.
114  std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks_owner;
115  std::vector<std::unique_ptr<char[]>> literals_owner;
116 
117  const int device_id = 0; // TODO(adb): support multi-gpu table functions
    // Only created for GPU execution; stays null on CPU.
118  std::unique_ptr<CudaAllocator> device_allocator;
119  if (device_type == ExecutorDeviceType::GPU) {
120  auto data_mgr = executor->getDataMgr();
121  device_allocator.reset(new CudaAllocator(
122  data_mgr, device_id, getQueryEngineCudaStreamForDevice(device_id)));
123  }
    // Parallel vectors, one entry per input expression (verified by the
    // CHECK_EQs after the loop).
124  std::vector<const int8_t*> col_buf_ptrs;
125  std::vector<int64_t> col_sizes;
126  std::vector<const int8_t*> input_str_dict_proxy_ptrs;
127  std::optional<size_t> input_num_rows;
128 
    // Position inside the column_list currently being filled; -1 means no
    // column_list is in progress.
129  int col_index = -1;
130  // TODO: col_list_bufs are allocated on CPU memory, so UDTFs with column_list
131  // arguments are not supported on GPU atm.
132  std::vector<std::vector<const int8_t*>> col_list_bufs;
133  std::vector<std::vector<const int8_t*>> input_col_list_str_dict_proxy_ptrs;
134 
135  for (const auto& input_expr : exe_unit.input_exprs) {
136  auto ti = input_expr->get_type_info();
137  if (!ti.is_column_list()) {
    // A non-column_list input must not appear while a column_list is
    // still being filled.
138  CHECK_EQ(col_index, -1);
139  }
140  if (auto col_var = dynamic_cast<Analyzer::ColumnVar*>(input_expr)) {
141  CHECK(ti.is_column_list() || ti.is_column()) << "ti=" << ti;
142  const auto& table_key = col_var->getTableKey();
143  auto table_info_it = std::find_if(
144  table_infos.begin(), table_infos.end(), [&table_key](const auto& table_info) {
145  return table_info.table_key == table_key;
146  });
147  CHECK(table_info_it != table_infos.end());
    // Only the first fragment of the table is fetched.
148  auto [col_buf, buf_elem_count] = ColumnFetcher::getOneColumnFragment(
149  executor,
150  *col_var,
151  table_info_it->info.fragments.front(),
154  device_id,
155  device_allocator.get(),
156  /*thread_idx=*/0,
157  chunks_owner,
158  column_fetcher.columnarized_table_cache_);
159  // We use the number of entries in the first column to be the number of rows to base
160  // the output off of (optionally depending on the sizing parameter)
    // An empty first column is recorded as 1 row rather than 0.
161  if (!input_num_rows) {
162  input_num_rows = (buf_elem_count > 0 ? buf_elem_count : 1);
163  }
164 
165  int8_t* input_str_dict_proxy_ptr = nullptr;
166  if (ti.is_subtype_dict_encoded_string()) {
167  const auto input_string_dictionary_proxy = executor->getStringDictionaryProxy(
168  ti.getStringDictKey(), executor->getRowSetMemoryOwner(), true);
169  input_str_dict_proxy_ptr =
170  reinterpret_cast<int8_t*>(input_string_dictionary_proxy);
171  }
172  if (ti.is_column_list()) {
173  if (col_index == -1) {
    // First member of a new column_list: open a fresh inner vector.
    // The reserve presumably keeps data() stable while the remaining
    // members are appended, since that address is handed out below.
174  col_list_bufs.emplace_back();
175  input_col_list_str_dict_proxy_ptrs.emplace_back();
176  col_list_bufs.back().reserve(ti.get_dimension());
177  input_col_list_str_dict_proxy_ptrs.back().reserve(ti.get_dimension());
178  } else {
    // Later members must have the same element count as the previous
    // input.
179  CHECK_EQ(col_sizes.back(), buf_elem_count);
180  }
181  col_index++;
182  col_list_bufs.back().push_back(col_buf);
183  input_col_list_str_dict_proxy_ptrs.back().push_back(input_str_dict_proxy_ptr);
184  // append col_buf to column_list col_buf
    // ti.get_dimension() is used as the number of members in the list;
    // once the last one is appended the list is closed.
185  if (col_index + 1 == ti.get_dimension()) {
186  col_index = -1;
187  }
188  // columns in the same column_list point to column_list data
189  col_buf_ptrs.push_back((const int8_t*)col_list_bufs.back().data());
190  input_str_dict_proxy_ptrs.push_back(
191  (const int8_t*)input_col_list_str_dict_proxy_ptrs.back().data());
192  } else {
193  col_buf_ptrs.push_back(col_buf);
194  input_str_dict_proxy_ptrs.push_back(input_str_dict_proxy_ptr);
195  }
196  col_sizes.push_back(buf_elem_count);
197  } else {
198  // literals
    // Literals are recorded with size 0 and no dictionary proxy.
199  col_sizes.push_back(0);
200  input_str_dict_proxy_ptrs.push_back(nullptr);
201  size_t literal_buffer_size = 0;
202  int8_t* cpu_literal_buf_ptr = nullptr;
203 
204  if (const auto& constant_val = dynamic_cast<Analyzer::Constant*>(input_expr)) {
205  // TODO(adb): Unify literal handling with rest of system, either in Codegen or as
206  // a separate serialization component
207  const auto const_val_datum = constant_val->get_constval();
208  const auto& ti = constant_val->get_type_info();
209  if (ti.is_text_encoding_none()) {
210  // clang-format off
211  /*
212  Literal string is encoded in a contiguous buffer with the
213  following memory layout:
214 
215  | <string size> | <string data> |
216  |<-- 8 bytes -->|<-- <string size> -->|
217  */
218  // clang-format on
219  literal_buffer_size =
220  sizeof(int64_t) + ((const_val_datum.stringval->size() + 7) / 8) * 8;
221  } else {
    // Byte width of the scalar, rounded up to a multiple of 8.
222  literal_buffer_size = ((get_bit_width(ti) / 8 + 7) / 8) * 8;
223  }
224  // literal_buffer_size is rounded up to the next multiple of 8
225  literals_owner.emplace_back(std::make_unique<char[]>(literal_buffer_size));
226  cpu_literal_buf_ptr = reinterpret_cast<int8_t*>(literals_owner.back().get());
227  append_literal_buffer(const_val_datum, ti, cpu_literal_buf_ptr, 0);
228  } else if (const auto& array_expr =
229  dynamic_cast<Analyzer::ArrayExpr*>(input_expr)) {
230  const auto& ti = input_expr->get_type_info().get_elem_type();
231  // clang-format off
232  /*
233  Literal array expression is encoded in a contiguous buffer
234  with the following memory layout:
235 
236  | <array size> | <array is_null> | <array data> |
237  |<-- 8 bytes ->|<-- 8 bytes ---->|<-- <array size> * <array element size> -->|
238  */
239  // clang-format on
240  int64_t size = array_expr->getElementCount();
    // All bits set marks a null array; zero marks a non-null one.
241  int64_t is_null = (array_expr->isNull() ? 0xffffffffffffffff : 0);
242  const auto elem_size = get_bit_width(ti) / 8;
243  // literal_buffer_size is rounded up to the next multiple of 8
244  literal_buffer_size = 2 * sizeof(int64_t) + (((size + 7) / 8) * 8) * elem_size;
245  literals_owner.emplace_back(std::make_unique<char[]>(literal_buffer_size));
246  cpu_literal_buf_ptr = reinterpret_cast<int8_t*>(literals_owner.back().get());
247  std::memcpy(cpu_literal_buf_ptr, &size, sizeof(int64_t));
248  std::memcpy(cpu_literal_buf_ptr + sizeof(int64_t), &is_null, sizeof(int64_t));
    // Every array element must itself be a Constant; each is written
    // after the 16-byte header at i * elem_size.
249  for (int64_t i = 0; i < size; i++) {
250  if (const auto& constant_val =
251  dynamic_cast<const Analyzer::Constant*>(array_expr->getElement(i))) {
252  const auto const_val_datum = constant_val->get_constval();
253  append_literal_buffer(const_val_datum,
254  ti,
255  cpu_literal_buf_ptr,
256  sizeof(int64_t) * 2 + i * elem_size);
257  } else {
258  UNREACHABLE();
259  }
260  }
261  } else {
262  throw TableFunctionError("Unsupported expression as input to table function: " +
263  input_expr->toString() +
264  "\n Only literal constants and columns are supported!");
265  }
    // On GPU the serialized literal is copied to a device allocation and
    // the device pointer is recorded; on CPU the host pointer is used.
266  if (device_type == ExecutorDeviceType::GPU) {
267  auto* gpu_allocator = device_allocator.get();
268  const auto gpu_literal_buf_ptr = gpu_allocator->alloc(literal_buffer_size);
269  gpu_allocator->copyToDevice(
270  gpu_literal_buf_ptr, cpu_literal_buf_ptr, literal_buffer_size);
271  col_buf_ptrs.push_back(gpu_literal_buf_ptr);
272  } else {
273  CHECK_EQ(device_type, ExecutorDeviceType::CPU);
274  col_buf_ptrs.push_back(cpu_literal_buf_ptr);
275  }
276  }
277  }
278  CHECK_EQ(col_buf_ptrs.size(), exe_unit.input_exprs.size());
279  CHECK_EQ(col_sizes.size(), exe_unit.input_exprs.size());
280  if (!exe_unit.table_func
281  .hasOutputSizeIndependentOfInputSize()) { // includes compile-time constants,
282  // user-specified constants,
283  // and runtime table function
284  // specified sizing, only
285  // user-specified row-multipliers
286  // currently take into account input
287  // row size
288  CHECK(input_num_rows);
289  }
    // One entry per target expression; nullptr unless the target is a
    // dictionary-encoded string.
290  std::vector<int8_t*> output_str_dict_proxy_ptrs;
291  for (const auto& output_expr : exe_unit.target_exprs) {
292  int8_t* output_str_dict_proxy_ptr = nullptr;
293  auto ti = output_expr->get_type_info();
294  if (ti.is_dict_encoded_string()) {
295  const auto output_string_dictionary_proxy = executor->getStringDictionaryProxy(
296  ti.getStringDictKey(), executor->getRowSetMemoryOwner(), true);
297  output_str_dict_proxy_ptr =
298  reinterpret_cast<int8_t*>(output_string_dictionary_proxy);
299  }
300  output_str_dict_proxy_ptrs.emplace_back(output_str_dict_proxy_ptr);
301  }
302 
    // NOTE(review): input_num_rows is only CHECKed above when the output size
    // depends on the input size, and it is only assigned in the ColumnVar
    // branch of the loop. Below it is dereferenced unconditionally; with no
    // column input it would still be empty at that point. Confirm that the
    // callees ignore elem_count in that case or that a value is guaranteed.
303  if (is_pre_launch_udtf) {
306  exe_unit,
307  std::dynamic_pointer_cast<CpuCompilationContext>(compilation_context),
308  col_buf_ptrs,
309  col_sizes,
310  input_str_dict_proxy_ptrs,
311  *input_num_rows,
312  executor);
    // The pre-launch path produces no result set.
313  return nullptr;
314  } else {
315  switch (device_type) {
317  return launchCpuCode(
318  exe_unit,
319  std::dynamic_pointer_cast<CpuCompilationContext>(compilation_context),
320  col_buf_ptrs,
321  col_sizes,
322  input_str_dict_proxy_ptrs,
323  *input_num_rows,
324  output_str_dict_proxy_ptrs,
325  executor);
327  return launchGpuCode(
328  exe_unit,
329  std::dynamic_pointer_cast<GpuCompilationContext>(compilation_context),
330  col_buf_ptrs,
331  col_sizes,
332  input_str_dict_proxy_ptrs,
333  *input_num_rows,
334  output_str_dict_proxy_ptrs,
335  /*device_id=*/0,
336  executor);
337  }
338  }
339  UNREACHABLE();
340  return nullptr;
341 }
342 
344 
// NOTE(review): this listing omits source line 345 (function name), line 364
// (one TableFunctionManager constructor argument), line 393 (the condition
// opening the block that ends at line 397) and line 402 (the condition that
// starts the if/else-if chain below). The cross-reference index at the end of
// the page lists the function as
//   void launchPreCodeOnCpu(const TableFunctionExecutionUnit& exe_unit, ...,
//                           const size_t elem_count, Executor* executor)
//
// Runs the compiled table-function entry point on the CPU as a pre-flight
// step. No output string dictionary proxies are passed. When
// exe_unit.table_func.hasPreFlightOutputSizer() is true, the resulting row
// count is stored in exe_unit.output_buffer_size_param. Throws
// UserTableFunctionError when the entry point reports an error.
// elem_count is not referenced in the lines shown.
346  const TableFunctionExecutionUnit& exe_unit,
347  const std::shared_ptr<CpuCompilationContext>& compilation_context,
348  std::vector<const int8_t*>& col_buf_ptrs,
349  std::vector<int64_t>& col_sizes,
350  std::vector<const int8_t*>& input_str_dict_proxy_ptrs,
351  const size_t elem_count, // taken from first source only currently
352  Executor* executor) {
353  auto timer = DEBUG_TIMER(__func__);
354  int64_t output_row_count = 0;
355 
356  // If TableFunctionManager must be a singleton but it has been
357  // initialized from another thread, TableFunctionManager constructor
358  // blocks via TableFunctionManager_singleton_mutex until the
359  // existing singleton is destroyed.
360  auto mgr = std::make_unique<TableFunctionManager>(
361  exe_unit,
362  executor,
363  col_buf_ptrs,
365  /*is_singleton=*/!exe_unit.table_func.usesManager());
366 
367  // setup the inputs
368  // We can have an empty col_buf_ptrs vector if there are no arguments to the function
369  const auto byte_stream_ptr = !col_buf_ptrs.empty()
370  ? reinterpret_cast<const int8_t**>(col_buf_ptrs.data())
371  : nullptr;
372  if (!col_buf_ptrs.empty()) {
373  CHECK(byte_stream_ptr);
374  }
375  const auto col_sizes_ptr = !col_sizes.empty() ? col_sizes.data() : nullptr;
376  if (!col_sizes.empty()) {
377  CHECK(col_sizes_ptr);
378  }
379  const auto input_str_dict_proxy_byte_stream_ptr =
380  !input_str_dict_proxy_ptrs.empty()
381  ? reinterpret_cast<const int8_t**>(input_str_dict_proxy_ptrs.data())
382  : nullptr;
383 
384  // execute
    // The manager pointer is passed as an opaque int8_t* first argument.
385  const auto err = compilation_context->table_function_entry_point()(
386  reinterpret_cast<const int8_t*>(mgr.get()),
387  byte_stream_ptr, // input columns buffer
388  col_sizes_ptr, // input column sizes
389  input_str_dict_proxy_byte_stream_ptr, // input string dictionary proxy ptrs
390  nullptr,
391  nullptr, // output string dictionary proxy ptrs - not supported for pre-flights yet
392  &output_row_count);
394  // table_function_entry_point does not initialize output_row_count
395  // when a UDTF returns NotAnError, so we'll set it here.
396  output_row_count = mgr->get_nrows();
397  }
    // NOTE(review): exe_unit is a const reference yet a member is assigned
    // here; output_buffer_size_param is presumably declared mutable - confirm.
398  if (exe_unit.table_func.hasPreFlightOutputSizer()) {
399  exe_unit.output_buffer_size_param = output_row_count;
400  }
401 
    // GenericError carries a message stored on the manager; any other
    // non-zero code is reported numerically.
403  } else if (err == TableFunctionErrorCode::GenericError) {
404  throw UserTableFunctionError("Error executing table function pre flight check: " +
405  std::string(mgr->get_error_message()));
406  } else if (err) {
407  throw UserTableFunctionError("Error executing table function pre flight check: " +
408  std::to_string(err));
409  }
410 }
411 
412 // clang-format off
413 /*
414  Managing the output buffers from table functions
415  ------------------------------------------------
416 
417  In general, the results of a query (a list of columns) are held by a
418  ResultSet instance. While ResultSet is a rather complicated
419  structure, its most important members are
420 
421  std::vector<TargetInfo> targets_ that holds the type of output
422  columns (recall: `struct TargetInfo {..., SQLTypeInfo sql_type,
423  ...};`)
424 
425  std::unique_ptr<ResultSetStorage> storage_ that stores the
426  underlying buffer for a result set (recall: `struct
427  ResultSetStorage {..., int8_t* buff_, ...};`)
428 
429  QueryMemoryDescriptor query_mem_desc_ that describes the format of
430  the storage for a result set.
431 
432  QueryMemoryDescriptor structure contains the following relevant
433  members:
434 
435  QueryDescriptionType query_desc_type_ is equal to one of
436  GroupByPerfectHash, GroupByBaselineHash, Projection,
437  TableFunction, NonGroupedAggregate, Estimator. In the following,
438  we assume query_desc_type_ == TableFunction.
439 
440  bool output_columnar_ is always true for table function result
441  sets.
442 
443  size_t entry_count_ is the number of entries in the storage
444  buffer. This typically corresponds to the number of output rows.
445 
446  ColSlotContext col_slot_context_ describes the internal structure
447  of the storage buffer using the following members:
448 
449  std::vector<SlotSize> slot_sizes_ where we have `struct SlotSize
450  { int8_t padded_size; int8_t logical_size; };`
451 
452  std::vector<std::vector<size_t>> col_to_slot_map_ describes the
453  mapping of a column to possibly multiple slots.
454 
455  std::unordered_map<SlotIndex, ArraySize> varlen_output_slot_map_
456 
457  In the case of table function result sets, the QueryMemoryDescriptor
458  instance is created in TableFunctionManager::allocate_output_buffers
459  method and we have query_desc_type_ == TableFunction.
460 
461  Depending on the target info of an output column, the internal
462  structure of the storage buffer has two variants:
463 
464  - traditional where the buffer size of a particular column is
465  described by entry_count_ and
466  col_slot_context_.slot_sizes_. This variant is used for output
467  columns of fixed-width scalar types such as integers, floats,
468  boolean, text encoded dicts, etc. For the corresponding column
469  with col_idx, we have
470 
471  col_to_slot_map_[col_idx] == {slot_idx}
472  slot_sizes_[slot_idx] == {column_width, column_width}
473 
474  where column_width is targets_[col_idx].sql_type.get_size().
475 
476  - flatbuffer where the buffer size of a particular column is
477  described by varlen_output_slot_map_. This variant is used for
478  output columns of variable length composite types such as arrays
479  of ints, floats, etc. For the corresponding column with col_idx,
480  we have
481 
482  col_to_slot_map_[col_idx] == {slot_idx}
483  slot_sizes_[slot_idx] == {0, 0}
484  varlen_output_slot_map_ contains an item col_idx:flatbuffer_size
485 
486  Only table functions produce result sets that may contain both
487  variants. The variants can be distinguished via
488  `getPaddedSlotWidthBytes(slot_idx) == 0` test.
489 
490  In the case of table function result sets, col_idx == slot_idx holds.
491 
492 */
493 // clang-format on
494 
// NOTE(review): this listing omits source line 495 (function name), line 515
// (one TableFunctionManager constructor argument) and line 565 (the condition
// opening the block that ends at line 569). The cross-reference index at the
// end of the page lists the function as
//   ResultSetPtr launchCpuCode(const TableFunctionExecutionUnit& exe_unit,
//                              ..., Executor* executor)
//
// Runs the compiled table-function entry point on the CPU and returns the
// result set owned by the manager's query buffers. Steps:
//   1. Pick an initial output_row_count from the sizing mode.
//   2. Call the entry point; std::exception and non-zero error codes are
//      rethrown as UserTableFunctionError.
//   3. Validate or clamp output_row_count against mgr->get_nrows().
//   4. Update the result set's entry count and compact fixed-width output
//      columns in place so that they are laid out for output_row_count rows
//      rather than for the allocated mgr->get_nrows() rows.
// Throws TableFunctionError when a compile-time constant sized function
// returns a different row count, or when output buffers were never allocated
// and the row count is non-zero.
496  const TableFunctionExecutionUnit& exe_unit,
497  const std::shared_ptr<CpuCompilationContext>& compilation_context,
498  std::vector<const int8_t*>& col_buf_ptrs,
499  std::vector<int64_t>& col_sizes,
500  std::vector<const int8_t*>& input_str_dict_proxy_ptrs,
501  const size_t elem_count, // taken from first source only currently
502  std::vector<int8_t*>& output_str_dict_proxy_ptrs,
503  Executor* executor) {
504  auto timer = DEBUG_TIMER(__func__);
505  int64_t output_row_count = 0;
506 
507  // If TableFunctionManager must be a singleton but it has been
508  // initialized from another thread, TableFunctionManager constructor
509  // blocks via TableFunctionManager_singleton_mutex until the
510  // existing singleton is destroyed.
511  auto mgr = std::make_unique<TableFunctionManager>(
512  exe_unit,
513  executor,
514  col_buf_ptrs,
516  /*is_singleton=*/!exe_unit.table_func.usesManager());
517 
518  if (exe_unit.table_func.hasOutputSizeKnownPreLaunch()) {
519  // allocate output buffers because the size is known up front, from
520  // user specified parameters (and table size in the case of a user
521  // specified row multiplier)
522  output_row_count = get_output_row_count(exe_unit, elem_count);
523  } else if (exe_unit.table_func.hasPreFlightOutputSizer()) {
    // Reuse the value stored in the execution unit (launchPreCodeOnCpu
    // writes this field when a pre-flight sizer is present).
524  output_row_count = exe_unit.output_buffer_size_param;
525  }
526 
527  // setup the inputs
528  // We can have an empty col_buf_ptrs vector if there are no arguments to the function
529  const auto byte_stream_ptr = !col_buf_ptrs.empty()
530  ? reinterpret_cast<const int8_t**>(col_buf_ptrs.data())
531  : nullptr;
532  if (!col_buf_ptrs.empty()) {
533  CHECK(byte_stream_ptr);
534  }
535  const auto col_sizes_ptr = !col_sizes.empty() ? col_sizes.data() : nullptr;
536  if (!col_sizes.empty()) {
537  CHECK(col_sizes_ptr);
538  }
539  const auto input_str_dict_proxy_byte_stream_ptr =
540  !input_str_dict_proxy_ptrs.empty()
541  ? reinterpret_cast<const int8_t**>(input_str_dict_proxy_ptrs.data())
542  : nullptr;
543 
544  const auto output_str_dict_proxy_byte_stream_ptr =
545  !output_str_dict_proxy_ptrs.empty()
546  ? reinterpret_cast<int8_t**>(output_str_dict_proxy_ptrs.data())
547  : nullptr;
548 
549  // execute
    // Exceptions escaping the entry point are converted into
    // UserTableFunctionError carrying the original what() text.
550  int32_t err;
551  try {
552  err = compilation_context->table_function_entry_point()(
553  reinterpret_cast<const int8_t*>(mgr.get()),
554  byte_stream_ptr, // input columns buffer
555  col_sizes_ptr, // input column sizes
556  input_str_dict_proxy_byte_stream_ptr, // input str dictionary proxies
557  nullptr,
558  output_str_dict_proxy_byte_stream_ptr,
559  &output_row_count);
560  } catch (std::exception const& e) {
561  throw UserTableFunctionError("Error executing table function: " +
562  std::string(e.what()));
563  }
564 
566  // table_function_entry_point does not initialize output_row_count
567  // when a UDTF returns NotAnError, so we'll set it here.
568  output_row_count = mgr->get_nrows();
569  } else if (err == TableFunctionErrorCode::GenericError) {
570  throw UserTableFunctionError("Error executing table function: " +
571  std::string(mgr->get_error_message()));
572  }
573 
574  else if (err) {
575  throw UserTableFunctionError("Error executing table function: " +
576  std::to_string(err));
577  }
578 
    // Compile-time constant sizing requires an exact match; otherwise an
    // out-of-range count is clamped to the allocated row count.
579  if (exe_unit.table_func.hasCompileTimeOutputSizeConstant()) {
580  if (static_cast<size_t>(output_row_count) != mgr->get_nrows()) {
581  throw TableFunctionError(
582  "Table function with constant sizing parameter must return " +
583  std::to_string(mgr->get_nrows()) + " (got " + std::to_string(output_row_count) +
584  ")");
585  }
586  } else {
587  if (output_row_count < 0 || (size_t)output_row_count > mgr->get_nrows()) {
588  output_row_count = mgr->get_nrows();
589  }
590  }
591  // Update entry count, it may differ from allocated mem size
592  if (exe_unit.table_func.hasTableFunctionSpecifiedParameter() && !mgr->query_buffers) {
593  // set_output_row_size has not been called
594  if (output_row_count == 0) {
595  // allocate for empty output columns
596  mgr->allocate_output_buffers(0);
597  } else {
598  throw TableFunctionError("Table function must call set_output_row_size");
599  }
600  }
601 
602  mgr->query_buffers->getResultSet(0)->updateStorageEntryCount(output_row_count);
603 
604  auto group_by_buffers_ptr = mgr->query_buffers->getGroupByBuffersPtr();
605  CHECK(group_by_buffers_ptr);
606  auto output_buffers_ptr = reinterpret_cast<int64_t*>(group_by_buffers_ptr[0]);
607 
608  auto num_out_columns = exe_unit.target_exprs.size();
    // src walks the columns as allocated (mgr->get_nrows() rows each);
    // dst walks the compacted layout (output_row_count rows each).
    // Both start at the same address, so the compaction is in place.
609  int8_t* src = reinterpret_cast<int8_t*>(output_buffers_ptr);
610  int8_t* dst = reinterpret_cast<int8_t*>(output_buffers_ptr);
611  // Todo (todd): Consolidate this column byte offset logic that occurs in at least 4
612  // places
613 
614  for (size_t col_idx = 0; col_idx < num_out_columns; col_idx++) {
615  auto ti = exe_unit.target_exprs[col_idx]->get_type_info();
616  if (ti.usesFlatBuffer()) {
617  // TODO: implement FlatBuffer normalization when the
618  // max_nof_values is larger than the nof specified values.
619  //
620  // TODO: implement flatbuffer resize when output_row_count < mgr->get_nrows()
    // FlatBuffer columns are not moved or shrunk here: actual size equals
    // allocated size, and the row counts are required to match.
621  CHECK_EQ(mgr->get_nrows(), output_row_count);
622  FlatBufferManager m{src};
623  const size_t allocated_column_size = m.getBufferSize();
624  const size_t actual_column_size = allocated_column_size;
625  src = align_to_int64(src + allocated_column_size);
626  dst = align_to_int64(dst + actual_column_size);
627  if (ti.is_text_encoding_dict_array()) {
628  const auto* ti_lite =
629  reinterpret_cast<const SQLTypeInfoLite*>(m.get_user_data_buffer());
630  CHECK(ti_lite);
631  CHECK_EQ(*ti_lite, ti.toLite()); // ensure dict/db_id are preserved
632  }
633  } else {
634  const size_t target_width = ti.get_size();
635  const size_t allocated_column_size = target_width * mgr->get_nrows();
636  const size_t actual_column_size = target_width * output_row_count;
    // memmove rather than memcpy: source and destination are ranges of the
    // same buffer.
637  if (src != dst) {
638  auto t = memmove(dst, src, actual_column_size);
639  CHECK_EQ(dst, t);
640  }
641  src = align_to_int64(src + allocated_column_size);
642  dst = align_to_int64(dst + actual_column_size);
643  }
644  }
645  return mgr->query_buffers->getResultSetOwned(0);
646 }
647 
// File-local indices into the kernel parameter vector built by launchGpuCode.
// NOTE(review): the enumerators (source lines 650-658) are missing from this
// listing. launchGpuCode refers to MANAGER, COL_BUFFERS, COL_SIZES,
// ERROR_BUFFER, OUTPUT_ROW_COUNT, OUTPUT_BUFFERS and KERNEL_PARAM_COUNT, the
// last of which is used as the vector's size; any further enumerators and
// their order cannot be read from here.
648 namespace {
649 enum {
659 };
660 }
661 
// NOTE(review): this listing omits source line 662 (function name), line 674
// (the condition guarding the QueryMustRunOnCpu throw), line 716 (start of
// the query_mem_desc declaration) and lines 726, 728 and 732 (three
// QueryMemoryInitializer constructor arguments). The cross-reference index at
// the end of the page lists the function as
//   ResultSetPtr launchGpuCode(const TableFunctionExecutionUnit& exe_unit,
//                              ..., const int device_id, Executor* executor)
//
// Launches the compiled table-function kernel on a CUDA device and returns
// the result set after copying the output back to CPU storage. Steps:
//   1. Copy the input buffer pointers and column sizes to the device.
//   2. Allocate output buffers for get_output_row_count(exe_unit, elem_count)
//      rows (at least 1).
//   3. Launch the kernel via cuLaunchKernel and synchronize the stream.
//   4. Read the output row count back, validate or clamp it, update the
//      result set entry count, and copy the output buffers to the host.
// Without HAVE_CUDA the body is UNREACHABLE().
// input_str_dict_proxy_ptrs and output_str_dict_proxy_ptrs are not referenced
// in the lines shown.
663  const TableFunctionExecutionUnit& exe_unit,
664  const std::shared_ptr<GpuCompilationContext>& compilation_context,
665  std::vector<const int8_t*>& col_buf_ptrs,
666  std::vector<int64_t>& col_sizes,
667  std::vector<const int8_t*>& input_str_dict_proxy_ptrs,
668  const size_t elem_count,
669  std::vector<int8_t*>& output_str_dict_proxy_ptrs,
670  const int device_id,
671  Executor* executor) {
672 #ifdef HAVE_CUDA
673  auto timer = DEBUG_TIMER(__func__);
675  throw QueryMustRunOnCpu();
676  }
677 
678  auto num_out_columns = exe_unit.target_exprs.size();
679  auto data_mgr = executor->getDataMgr();
680  auto gpu_allocator = std::make_unique<CudaAllocator>(
681  data_mgr, device_id, getQueryEngineCudaStreamForDevice(device_id));
682  CHECK(gpu_allocator);
    // One slot per kernel argument, indexed by the anonymous enum declared
    // just before this function; every slot starts as 0.
683  std::vector<CUdeviceptr> kernel_params(KERNEL_PARAM_COUNT, 0);
684 
685  // TODO: implement table function manager for CUDA
686  // kernels. kernel_params[MANAGER] ought to contain a device pointer
687  // to a struct that a table function kernel with a
688  // TableFunctionManager argument can access from the device.
689  kernel_params[MANAGER] =
690  reinterpret_cast<CUdeviceptr>(gpu_allocator->alloc(sizeof(int8_t*)));
691 
692  // setup the inputs
    // NOTE(review): the device copy of the pointer array is sized with
    // sizeof(int64_t) per entry, which assumes 8-byte host pointers - confirm.
693  auto byte_stream_ptr = !(col_buf_ptrs.empty())
694  ? gpu_allocator->alloc(col_buf_ptrs.size() * sizeof(int64_t))
695  : nullptr;
696  if (byte_stream_ptr) {
697  gpu_allocator->copyToDevice(byte_stream_ptr,
698  reinterpret_cast<int8_t*>(col_buf_ptrs.data()),
699  col_buf_ptrs.size() * sizeof(int64_t));
700  }
701  kernel_params[COL_BUFFERS] = reinterpret_cast<CUdeviceptr>(byte_stream_ptr);
702 
703  auto col_sizes_ptr = !(col_sizes.empty())
704  ? gpu_allocator->alloc(col_sizes.size() * sizeof(int64_t))
705  : nullptr;
706  if (col_sizes_ptr) {
707  gpu_allocator->copyToDevice(col_sizes_ptr,
708  reinterpret_cast<int8_t*>(col_sizes.data()),
709  col_sizes.size() * sizeof(int64_t));
710  }
711  kernel_params[COL_SIZES] = reinterpret_cast<CUdeviceptr>(col_sizes_ptr);
712 
    // NOTE(review): the error buffer is allocated but, in the lines shown,
    // neither initialized nor read back after the launch - confirm whether
    // kernel-side errors are meant to be surfaced.
713  kernel_params[ERROR_BUFFER] =
714  reinterpret_cast<CUdeviceptr>(gpu_allocator->alloc(sizeof(int32_t)));
715  // initialize output memory
717  executor, elem_count, QueryDescriptionType::TableFunction);
718 
    // Each output column gets one slot whose padded and logical widths are
    // both the column type's get_size().
719  for (size_t i = 0; i < num_out_columns; i++) {
720  const size_t col_width = exe_unit.target_exprs[i]->get_type_info().get_size();
721  query_mem_desc.addColSlotInfo({std::make_tuple(col_width, col_width)});
722  }
723  const auto allocated_output_row_count = get_output_row_count(exe_unit, elem_count);
    // A zero row count is bumped to 1 for the buffer allocation below.
724  auto query_buffers = std::make_unique<QueryMemoryInitializer>(
725  exe_unit,
727  device_id,
729  (allocated_output_row_count == 0 ? 1 : allocated_output_row_count),
730  std::vector<std::vector<const int8_t*>>{col_buf_ptrs},
731  std::vector<std::vector<uint64_t>>{{0}}, // frag offsets
733  gpu_allocator.get(),
734  executor);
735 
736  // setup the output
737  int64_t output_row_count = allocated_output_row_count;
738 
    // The device-side row count is seeded with the allocated count.
    // NOTE(review): the allocation uses sizeof(int64_t*) while the copy uses
    // sizeof(output_row_count); these agree only when pointers are 8 bytes.
739  kernel_params[OUTPUT_ROW_COUNT] =
740  reinterpret_cast<CUdeviceptr>(gpu_allocator->alloc(sizeof(int64_t*)));
741  gpu_allocator->copyToDevice(reinterpret_cast<int8_t*>(kernel_params[OUTPUT_ROW_COUNT]),
742  reinterpret_cast<int8_t*>(&output_row_count),
743  sizeof(output_row_count));
744  /*
745   TODO: RBC generated runtime table functions do not support
746  concurrent execution on a CUDA device. Hence, we'll force 1 as
747  block/grid size in the case of runtime table functions. To support
748  this, in RBC, we'll need to expose threadIdx/blockIdx/blockDim to
749  runtime table functions and these must do something sensible with
750  this information.
751  */
752  const unsigned block_size_x =
753  (exe_unit.table_func.isRuntime() ? 1 : executor->blockSize());
754  const unsigned block_size_y = 1;
755  const unsigned block_size_z = 1;
756  const unsigned grid_size_x =
757  (exe_unit.table_func.isRuntime() ? 1 : executor->gridSize());
758  const unsigned grid_size_y = 1;
759  const unsigned grid_size_z = 1;
760 
761  auto gpu_output_buffers =
762  query_buffers->setupTableFunctionGpuBuffers(query_mem_desc,
763  device_id,
764  block_size_x,
765  grid_size_x,
766  true /* zero_initialize_buffers */);
767 
768  kernel_params[OUTPUT_BUFFERS] = reinterpret_cast<CUdeviceptr>(gpu_output_buffers.ptrs);
769 
770  // execute
771  CHECK_EQ(static_cast<size_t>(KERNEL_PARAM_COUNT), kernel_params.size());
772 
    // The launch call receives the address of each kernel_params entry, not
    // the entry's value.
773  std::vector<void*> param_ptrs;
774  for (auto& param : kernel_params) {
775  param_ptrs.push_back(&param);
776  }
777 
778  // Get cu func
779 
780  CHECK(compilation_context);
781  const auto native_code = compilation_context->getNativeCode(device_id);
782  auto cu_func = static_cast<CUfunction>(native_code.first);
783  auto qe_cuda_stream = getQueryEngineCudaStreamForDevice(device_id);
784  VLOG(1) << "Launch GPU table function kernel compiled with the following block and "
785  "grid sizes: "
786  << block_size_x << " and " << grid_size_x;
787  checkCudaErrors(cuLaunchKernel(cu_func,
788  grid_size_x,
789  grid_size_y,
790  grid_size_z,
791  block_size_x,
792  block_size_y,
793  block_size_z,
794  0, // shared mem bytes
795  qe_cuda_stream,
796  &param_ptrs[0],
797  nullptr));
    // Wait for the kernel before reading anything back.
798  checkCudaErrors(cuStreamSynchronize(qe_cuda_stream));
799 
800  // read output row count from GPU
801  gpu_allocator->copyFromDevice(
802  reinterpret_cast<int8_t*>(&output_row_count),
803  reinterpret_cast<int8_t*>(kernel_params[OUTPUT_ROW_COUNT]),
804  sizeof(int64_t));
    // Non-user-specified sizing requires an exact match; otherwise an
    // out-of-range count is clamped to the allocated row count.
805  if (exe_unit.table_func.hasNonUserSpecifiedOutputSize()) {
806  if (static_cast<size_t>(output_row_count) != allocated_output_row_count) {
807  throw TableFunctionError(
808  "Table function with constant sizing parameter must return " +
809  std::to_string(allocated_output_row_count) + " (got " +
810  std::to_string(output_row_count) + ")");
811  }
812  } else {
813  if (output_row_count < 0 || (size_t)output_row_count > allocated_output_row_count) {
814  output_row_count = allocated_output_row_count;
815  }
816  }
817 
818  // Update entry count, it may differ from allocated mem size
819  query_buffers->getResultSet(0)->updateStorageEntryCount(output_row_count);
820 
821  // Copy back to CPU storage
822  query_buffers->copyFromTableFunctionGpuBuffers(data_mgr,
823  query_mem_desc,
824  output_row_count,
825  gpu_output_buffers,
826  device_id,
827  block_size_x,
828  grid_size_x);
829 
830  return query_buffers->getResultSetOwned(0);
831 #else
832  UNREACHABLE();
833  return nullptr;
834 #endif
835 }
int8_t tinyintval
Definition: Datum.h:73
Defines data structures for the semantic analysis phase of query processing.
#define CHECK_EQ(x, y)
Definition: Logger.h:301
size_t get_output_row_count(const TableFunctionExecutionUnit &exe_unit, size_t input_element_count)
std::string DatumToString(Datum d, const SQLTypeInfo &ti)
Definition: Datum.cpp:460
bool is_timestamp() const
Definition: sqltypes.h:1046
std::vector< Analyzer::Expr * > input_exprs
const table_functions::TableFunction table_func
void checkCudaErrors(CUresult err)
Definition: sample.cpp:38
bool is_fp() const
Definition: sqltypes.h:573
int8_t boolval
Definition: Datum.h:72
unsigned long long CUdeviceptr
Definition: nocuda.h:28
#define UNREACHABLE()
Definition: Logger.h:338
std::shared_ptr< ResultSet > ResultSetPtr
int32_t intval
Definition: Datum.h:75
ExecutorDeviceType
std::string to_string(char const *&&v)
TableFunction
Definition: enums.h:58
float floatval
Definition: Datum.h:77
size_t get_bit_width(const SQLTypeInfo &ti)
CONSTEXPR DEVICE bool is_null(const T &value)
bool is_integer() const
Definition: sqltypes.h:567
int64_t bigintval
Definition: Datum.h:76
bool is_timeinterval() const
Definition: sqltypes.h:594
int16_t smallintval
Definition: Datum.h:74
void * CUfunction
Definition: nocuda.h:25
bool is_boolean() const
Definition: sqltypes.h:582
std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner_
std::mutex TableFunctionManager_singleton_mutex
std::string * stringval
Definition: Datum.h:81
ResultSetPtr launchCpuCode(const TableFunctionExecutionUnit &exe_unit, const std::shared_ptr< CpuCompilationContext > &compilation_context, std::vector< const int8_t * > &col_buf_ptrs, std::vector< int64_t > &col_sizes, std::vector< const int8_t * > &input_str_dict_proxy_ptrs, const size_t elem_count, std::vector< int8_t * > &output_str_dict_proxy_ptrs, Executor *executor)
ResultSetPtr execute(const TableFunctionExecutionUnit &exe_unit, const std::vector< InputTableInfo > &table_infos, const std::shared_ptr< CompilationContext > &compilation_context, const ColumnFetcher &column_fetcher, const ExecutorDeviceType device_type, Executor *executor, bool is_pre_launch_udtf)
static std::pair< const int8_t *, size_t > getOneColumnFragment(Executor *executor, const Analyzer::ColumnVar &hash_col, const Fragmenter_Namespace::FragmentInfo &fragment, const Data_Namespace::MemoryLevel effective_mem_lvl, const int device_id, DeviceAllocator *device_allocator, const size_t thread_idx, std::vector< std::shared_ptr< Chunk_NS::Chunk >> &chunks_owner, ColumnCacheMap &column_cache)
Gets one chunk's pointer and element count on either CPU or GPU.
void launchPreCodeOnCpu(const TableFunctionExecutionUnit &exe_unit, const std::shared_ptr< CpuCompilationContext > &compilation_context, std::vector< const int8_t * > &col_buf_ptrs, std::vector< int64_t > &col_sizes, std::vector< const int8_t * > &input_str_dict_proxy_ptrs, const size_t elem_count, Executor *executor)
CUstream getQueryEngineCudaStreamForDevice(int device_num)
Definition: QueryEngine.cpp:7
#define CHECK(condition)
Definition: Logger.h:291
#define DEBUG_TIMER(name)
Definition: Logger.h:412
std::vector< Analyzer::Expr * > target_exprs
void addColSlotInfo(const std::vector< std::tuple< int8_t, int8_t >> &slots_for_col)
bool is_text_encoding_none() const
Definition: sqltypes.h:614
Definition: Datum.h:71
OutputBufferSizeType getOutputRowSizeType() const
void append_literal_buffer(const Datum &d, const SQLTypeInfo &ti, int8_t *literal_buffer, int64_t offset)
ResultSetPtr launchGpuCode(const TableFunctionExecutionUnit &exe_unit, const std::shared_ptr< GpuCompilationContext > &compilation_context, std::vector< const int8_t * > &col_buf_ptrs, std::vector< int64_t > &col_sizes, std::vector< const int8_t * > &input_str_dict_proxy_ptrs, const size_t elem_count, std::vector< int8_t * > &output_str_dict_proxy_ptrs, const int device_id, Executor *executor)
ColumnCacheMap & columnarized_table_cache_
FORCE_INLINE HOST DEVICE T align_to_int64(T addr)
#define VLOG(n)
Definition: Logger.h:388
double doubleval
Definition: Datum.h:78
static int64_t getBufferSize(const void *buffer)
Definition: FlatBuffer.h:553