33 int8_t* literal_buffer,
38 std::memcpy(literal_buffer + offset, &d.
floatval,
sizeof(
float));
41 std::memcpy(literal_buffer + offset, &d.
doubleval,
sizeof(
double));
49 std::memcpy(literal_buffer + offset, &d.
tinyintval,
sizeof(int8_t));
52 std::memcpy(literal_buffer + offset, &d.
smallintval,
sizeof(int16_t));
55 std::memcpy(literal_buffer + offset, &d.
intval,
sizeof(int32_t));
58 std::memcpy(literal_buffer + offset, &d.
bigintval,
sizeof(int64_t));
64 std::memcpy(literal_buffer + offset, &d.
boolval,
sizeof(int8_t));
67 std::memcpy(literal_buffer + offset, &string_size,
sizeof(int64_t));
69 literal_buffer + offset +
sizeof(int64_t), d.
stringval->data(), string_size);
72 " is not yet supported.");
77 size_t input_element_count) {
78 size_t allocated_output_row_count = 0;
87 allocated_output_row_count =
92 allocated_output_row_count = input_element_count;
99 return allocated_output_row_count;
106 const std::vector<InputTableInfo>& table_infos,
107 const std::shared_ptr<CompilationContext>& compilation_context,
111 bool is_pre_launch_udtf) {
113 CHECK(compilation_context);
114 std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks_owner;
115 std::vector<std::unique_ptr<char[]>> literals_owner;
117 const int device_id = 0;
118 std::unique_ptr<CudaAllocator> device_allocator;
120 auto data_mgr = executor->getDataMgr();
124 std::vector<const int8_t*> col_buf_ptrs;
125 std::vector<int64_t> col_sizes;
126 std::vector<const int8_t*> input_str_dict_proxy_ptrs;
127 std::optional<size_t> input_num_rows;
132 std::vector<std::vector<const int8_t*>> col_list_bufs;
133 std::vector<std::vector<const int8_t*>> input_col_list_str_dict_proxy_ptrs;
135 for (
const auto& input_expr : exe_unit.
input_exprs) {
136 auto ti = input_expr->get_type_info();
137 if (!ti.is_column_list()) {
140 if (
auto col_var = dynamic_cast<Analyzer::ColumnVar*>(input_expr)) {
141 CHECK(ti.is_column_list() || ti.is_column()) <<
"ti=" << ti;
142 const auto& table_key = col_var->getTableKey();
143 auto table_info_it = std::find_if(
144 table_infos.begin(), table_infos.end(), [&table_key](
const auto& table_info) {
145 return table_info.table_key == table_key;
147 CHECK(table_info_it != table_infos.end());
151 table_info_it->info.fragments.front(),
155 device_allocator.get(),
161 if (!input_num_rows) {
162 input_num_rows = (buf_elem_count > 0 ? buf_elem_count : 1);
165 int8_t* input_str_dict_proxy_ptr =
nullptr;
166 if (ti.is_subtype_dict_encoded_string()) {
167 const auto input_string_dictionary_proxy = executor->getStringDictionaryProxy(
168 ti.getStringDictKey(), executor->getRowSetMemoryOwner(),
true);
169 input_str_dict_proxy_ptr =
170 reinterpret_cast<int8_t*
>(input_string_dictionary_proxy);
172 if (ti.is_column_list()) {
173 if (col_index == -1) {
174 col_list_bufs.emplace_back();
175 input_col_list_str_dict_proxy_ptrs.emplace_back();
176 col_list_bufs.back().reserve(ti.get_dimension());
177 input_col_list_str_dict_proxy_ptrs.back().reserve(ti.get_dimension());
179 CHECK_EQ(col_sizes.back(), buf_elem_count);
182 col_list_bufs.back().push_back(col_buf);
183 input_col_list_str_dict_proxy_ptrs.back().push_back(input_str_dict_proxy_ptr);
185 if (col_index + 1 == ti.get_dimension()) {
189 col_buf_ptrs.push_back((
const int8_t*)col_list_bufs.back().data());
190 input_str_dict_proxy_ptrs.push_back(
191 (
const int8_t*)input_col_list_str_dict_proxy_ptrs.back().data());
193 col_buf_ptrs.push_back(col_buf);
194 input_str_dict_proxy_ptrs.push_back(input_str_dict_proxy_ptr);
196 col_sizes.push_back(buf_elem_count);
199 col_sizes.push_back(0);
200 input_str_dict_proxy_ptrs.push_back(
nullptr);
201 size_t literal_buffer_size = 0;
202 int8_t* cpu_literal_buf_ptr =
nullptr;
204 if (
const auto& constant_val = dynamic_cast<Analyzer::Constant*>(input_expr)) {
207 const auto const_val_datum = constant_val->get_constval();
208 const auto& ti = constant_val->get_type_info();
209 if (ti.is_text_encoding_none()) {
219 literal_buffer_size =
220 sizeof(int64_t) + ((const_val_datum.stringval->size() + 7) / 8) * 8;
225 literals_owner.emplace_back(std::make_unique<
char[]>(literal_buffer_size));
226 cpu_literal_buf_ptr =
reinterpret_cast<int8_t*
>(literals_owner.back().get());
228 }
else if (
const auto& array_expr =
229 dynamic_cast<Analyzer::ArrayExpr*>(input_expr)) {
230 const auto& ti = input_expr->get_type_info().get_elem_type();
240 int64_t size = array_expr->getElementCount();
241 int64_t
is_null = (array_expr->isNull() ? 0xffffffffffffffff : 0);
244 literal_buffer_size = 2 *
sizeof(int64_t) + (((size + 7) / 8) * 8) * elem_size;
245 literals_owner.emplace_back(std::make_unique<
char[]>(literal_buffer_size));
246 cpu_literal_buf_ptr =
reinterpret_cast<int8_t*
>(literals_owner.back().get());
247 std::memcpy(cpu_literal_buf_ptr, &size,
sizeof(int64_t));
248 std::memcpy(cpu_literal_buf_ptr +
sizeof(int64_t), &is_null,
sizeof(int64_t));
249 for (int64_t i = 0; i < size; i++) {
250 if (
const auto& constant_val =
251 dynamic_cast<const Analyzer::Constant*>(array_expr->getElement(i))) {
252 const auto const_val_datum = constant_val->get_constval();
256 sizeof(int64_t) * 2 + i * elem_size);
263 input_expr->toString() +
264 "\n Only literal constants and columns are supported!");
267 auto* gpu_allocator = device_allocator.get();
268 const auto gpu_literal_buf_ptr = gpu_allocator->alloc(literal_buffer_size);
269 gpu_allocator->copyToDevice(
270 gpu_literal_buf_ptr, cpu_literal_buf_ptr, literal_buffer_size);
271 col_buf_ptrs.push_back(gpu_literal_buf_ptr);
274 col_buf_ptrs.push_back(cpu_literal_buf_ptr);
288 CHECK(input_num_rows);
290 std::vector<int8_t*> output_str_dict_proxy_ptrs;
292 int8_t* output_str_dict_proxy_ptr =
nullptr;
293 auto ti = output_expr->get_type_info();
294 if (ti.is_dict_encoded_string()) {
295 const auto output_string_dictionary_proxy = executor->getStringDictionaryProxy(
296 ti.getStringDictKey(), executor->getRowSetMemoryOwner(),
true);
297 output_str_dict_proxy_ptr =
298 reinterpret_cast<int8_t*
>(output_string_dictionary_proxy);
300 output_str_dict_proxy_ptrs.emplace_back(output_str_dict_proxy_ptr);
303 if (is_pre_launch_udtf) {
307 std::dynamic_pointer_cast<CpuCompilationContext>(compilation_context),
310 input_str_dict_proxy_ptrs,
315 switch (device_type) {
319 std::dynamic_pointer_cast<CpuCompilationContext>(compilation_context),
322 input_str_dict_proxy_ptrs,
324 output_str_dict_proxy_ptrs,
329 std::dynamic_pointer_cast<GpuCompilationContext>(compilation_context),
332 input_str_dict_proxy_ptrs,
334 output_str_dict_proxy_ptrs,
347 const std::shared_ptr<CpuCompilationContext>& compilation_context,
348 std::vector<const int8_t*>& col_buf_ptrs,
349 std::vector<int64_t>& col_sizes,
350 std::vector<const int8_t*>& input_str_dict_proxy_ptrs,
351 const size_t elem_count,
352 Executor* executor) {
354 int64_t output_row_count = 0;
360 auto mgr = std::make_unique<TableFunctionManager>(
365 !exe_unit.table_func.usesManager());
369 const auto byte_stream_ptr = !col_buf_ptrs.empty()
370 ?
reinterpret_cast<const int8_t**
>(col_buf_ptrs.data())
372 if (!col_buf_ptrs.empty()) {
373 CHECK(byte_stream_ptr);
375 const auto col_sizes_ptr = !col_sizes.empty() ? col_sizes.data() :
nullptr;
376 if (!col_sizes.empty()) {
377 CHECK(col_sizes_ptr);
379 const auto input_str_dict_proxy_byte_stream_ptr =
380 !input_str_dict_proxy_ptrs.empty()
381 ?
reinterpret_cast<const int8_t**
>(input_str_dict_proxy_ptrs.data())
385 const auto err = compilation_context->table_function_entry_point()(
386 reinterpret_cast<const int8_t*
>(mgr.get()),
389 input_str_dict_proxy_byte_stream_ptr,
396 output_row_count = mgr->get_nrows();
398 if (exe_unit.table_func.hasPreFlightOutputSizer()) {
399 exe_unit.output_buffer_size_param = output_row_count;
405 std::string(mgr->get_error_message()));
497 const std::shared_ptr<CpuCompilationContext>& compilation_context,
498 std::vector<const int8_t*>& col_buf_ptrs,
499 std::vector<int64_t>& col_sizes,
500 std::vector<const int8_t*>& input_str_dict_proxy_ptrs,
501 const size_t elem_count,
502 std::vector<int8_t*>& output_str_dict_proxy_ptrs,
503 Executor* executor) {
505 int64_t output_row_count = 0;
511 auto mgr = std::make_unique<TableFunctionManager>(
516 !exe_unit.table_func.usesManager());
518 if (exe_unit.table_func.hasOutputSizeKnownPreLaunch()) {
523 }
else if (exe_unit.table_func.hasPreFlightOutputSizer()) {
524 output_row_count = exe_unit.output_buffer_size_param;
529 const auto byte_stream_ptr = !col_buf_ptrs.empty()
530 ?
reinterpret_cast<const int8_t**
>(col_buf_ptrs.data())
532 if (!col_buf_ptrs.empty()) {
533 CHECK(byte_stream_ptr);
535 const auto col_sizes_ptr = !col_sizes.empty() ? col_sizes.data() :
nullptr;
536 if (!col_sizes.empty()) {
537 CHECK(col_sizes_ptr);
539 const auto input_str_dict_proxy_byte_stream_ptr =
540 !input_str_dict_proxy_ptrs.empty()
541 ?
reinterpret_cast<const int8_t**
>(input_str_dict_proxy_ptrs.data())
544 const auto output_str_dict_proxy_byte_stream_ptr =
545 !output_str_dict_proxy_ptrs.empty()
546 ?
reinterpret_cast<int8_t**
>(output_str_dict_proxy_ptrs.data())
552 err = compilation_context->table_function_entry_point()(
553 reinterpret_cast<const int8_t*
>(mgr.get()),
556 input_str_dict_proxy_byte_stream_ptr,
558 output_str_dict_proxy_byte_stream_ptr,
560 }
catch (std::exception
const& e) {
562 std::string(e.what()));
568 output_row_count = mgr->get_nrows();
571 std::string(mgr->get_error_message()));
579 if (exe_unit.table_func.hasCompileTimeOutputSizeConstant()) {
580 if (static_cast<size_t>(output_row_count) != mgr->get_nrows()) {
582 "Table function with constant sizing parameter must return " +
587 if (output_row_count < 0 || (
size_t)output_row_count > mgr->get_nrows()) {
588 output_row_count = mgr->get_nrows();
592 if (exe_unit.table_func.hasTableFunctionSpecifiedParameter() && !mgr->query_buffers) {
594 if (output_row_count == 0) {
596 mgr->allocate_output_buffers(0);
602 mgr->query_buffers->getResultSet(0)->updateStorageEntryCount(output_row_count);
604 auto group_by_buffers_ptr = mgr->query_buffers->getGroupByBuffersPtr();
605 CHECK(group_by_buffers_ptr);
606 auto output_buffers_ptr =
reinterpret_cast<int64_t*
>(group_by_buffers_ptr[0]);
608 auto num_out_columns = exe_unit.target_exprs.size();
609 int8_t* src =
reinterpret_cast<int8_t*
>(output_buffers_ptr);
610 int8_t* dst =
reinterpret_cast<int8_t*
>(output_buffers_ptr);
614 for (
size_t col_idx = 0; col_idx < num_out_columns; col_idx++) {
615 auto ti = exe_unit.target_exprs[col_idx]->get_type_info();
616 if (ti.usesFlatBuffer()) {
621 CHECK_EQ(mgr->get_nrows(), output_row_count);
624 const size_t actual_column_size = allocated_column_size;
627 if (ti.is_text_encoding_dict_array()) {
628 const auto* ti_lite =
634 const size_t target_width = ti.get_size();
635 const size_t allocated_column_size = target_width * mgr->get_nrows();
636 const size_t actual_column_size = target_width * output_row_count;
638 auto t = memmove(dst, src, actual_column_size);
645 return mgr->query_buffers->getResultSetOwned(0);
664 const std::shared_ptr<GpuCompilationContext>& compilation_context,
665 std::vector<const int8_t*>& col_buf_ptrs,
666 std::vector<int64_t>& col_sizes,
667 std::vector<const int8_t*>& input_str_dict_proxy_ptrs,
668 const size_t elem_count,
669 std::vector<int8_t*>& output_str_dict_proxy_ptrs,
671 Executor* executor) {
679 auto data_mgr = executor->getDataMgr();
680 auto gpu_allocator = std::make_unique<CudaAllocator>(
682 CHECK(gpu_allocator);
690 reinterpret_cast<CUdeviceptr>(gpu_allocator->alloc(
sizeof(int8_t*)));
693 auto byte_stream_ptr = !(col_buf_ptrs.empty())
694 ? gpu_allocator->alloc(col_buf_ptrs.size() *
sizeof(int64_t))
696 if (byte_stream_ptr) {
697 gpu_allocator->copyToDevice(byte_stream_ptr,
698 reinterpret_cast<int8_t*>(col_buf_ptrs.data()),
699 col_buf_ptrs.size() *
sizeof(int64_t));
703 auto col_sizes_ptr = !(col_sizes.empty())
704 ? gpu_allocator->alloc(col_sizes.size() *
sizeof(int64_t))
707 gpu_allocator->copyToDevice(col_sizes_ptr,
708 reinterpret_cast<int8_t*>(col_sizes.data()),
709 col_sizes.size() *
sizeof(int64_t));
714 reinterpret_cast<CUdeviceptr>(gpu_allocator->alloc(
sizeof(int32_t)));
719 for (
size_t i = 0; i < num_out_columns; i++) {
720 const size_t col_width = exe_unit.
target_exprs[i]->get_type_info().get_size();
721 query_mem_desc.
addColSlotInfo({std::make_tuple(col_width, col_width)});
724 auto query_buffers = std::make_unique<QueryMemoryInitializer>(
729 (allocated_output_row_count == 0 ? 1 : allocated_output_row_count),
730 std::vector<std::vector<const int8_t*>>{col_buf_ptrs},
731 std::vector<std::vector<uint64_t>>{{0}},
737 int64_t output_row_count = allocated_output_row_count;
740 reinterpret_cast<CUdeviceptr>(gpu_allocator->alloc(
sizeof(int64_t*)));
741 gpu_allocator->copyToDevice(reinterpret_cast<int8_t*>(kernel_params[
OUTPUT_ROW_COUNT]),
742 reinterpret_cast<int8_t*>(&output_row_count),
743 sizeof(output_row_count));
752 const unsigned block_size_x =
753 (exe_unit.table_func.isRuntime() ? 1 : executor->blockSize());
754 const unsigned block_size_y = 1;
755 const unsigned block_size_z = 1;
756 const unsigned grid_size_x =
757 (exe_unit.table_func.isRuntime() ? 1 : executor->gridSize());
758 const unsigned grid_size_y = 1;
759 const unsigned grid_size_z = 1;
761 auto gpu_output_buffers =
762 query_buffers->setupTableFunctionGpuBuffers(query_mem_desc,
773 std::vector<void*> param_ptrs;
774 for (
auto& param : kernel_params) {
775 param_ptrs.push_back(¶m);
780 CHECK(compilation_context);
781 const auto native_code = compilation_context->getNativeCode(device_id);
782 auto cu_func =
static_cast<CUfunction>(native_code.first);
784 VLOG(1) <<
"Launch GPU table function kernel compiled with the following block and "
786 << block_size_x <<
" and " << grid_size_x;
801 gpu_allocator->copyFromDevice(
802 reinterpret_cast<int8_t*>(&output_row_count),
803 reinterpret_cast<int8_t*>(kernel_params[OUTPUT_ROW_COUNT]),
805 if (exe_unit.table_func.hasNonUserSpecifiedOutputSize()) {
806 if (static_cast<size_t>(output_row_count) != allocated_output_row_count) {
808 "Table function with constant sizing parameter must return " +
813 if (output_row_count < 0 || (
size_t)output_row_count > allocated_output_row_count) {
814 output_row_count = allocated_output_row_count;
819 query_buffers->getResultSet(0)->updateStorageEntryCount(output_row_count);
822 query_buffers->copyFromTableFunctionGpuBuffers(data_mgr,
830 return query_buffers->getResultSetOwned(0);
Defines data structures for the semantic analysis phase of query processing.
size_t get_output_row_count(const TableFunctionExecutionUnit &exe_unit, size_t input_element_count)
std::string DatumToString(Datum d, const SQLTypeInfo &ti)
bool is_timestamp() const
std::vector< Analyzer::Expr * > input_exprs
const table_functions::TableFunction table_func
void checkCudaErrors(CUresult err)
unsigned long long CUdeviceptr
std::shared_ptr< ResultSet > ResultSetPtr
size_t output_buffer_size_param
bool containsPreFlightFn() const
size_t get_bit_width(const SQLTypeInfo &ti)
CONSTEXPR DEVICE bool is_null(const T &value)
bool is_timeinterval() const
std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner_
std::mutex TableFunctionManager_singleton_mutex
ResultSetPtr launchCpuCode(const TableFunctionExecutionUnit &exe_unit, const std::shared_ptr< CpuCompilationContext > &compilation_context, std::vector< const int8_t * > &col_buf_ptrs, std::vector< int64_t > &col_sizes, std::vector< const int8_t * > &input_str_dict_proxy_ptrs, const size_t elem_count, std::vector< int8_t * > &output_str_dict_proxy_ptrs, Executor *executor)
ResultSetPtr execute(const TableFunctionExecutionUnit &exe_unit, const std::vector< InputTableInfo > &table_infos, const std::shared_ptr< CompilationContext > &compilation_context, const ColumnFetcher &column_fetcher, const ExecutorDeviceType device_type, Executor *executor, bool is_pre_launch_udtf)
static std::pair< const int8_t *, size_t > getOneColumnFragment(Executor *executor, const Analyzer::ColumnVar &hash_col, const Fragmenter_Namespace::FragmentInfo &fragment, const Data_Namespace::MemoryLevel effective_mem_lvl, const int device_id, DeviceAllocator *device_allocator, const size_t thread_idx, std::vector< std::shared_ptr< Chunk_NS::Chunk >> &chunks_owner, ColumnCacheMap &column_cache)
Gets one chunk's pointer and element count on either CPU or GPU.
void launchPreCodeOnCpu(const TableFunctionExecutionUnit &exe_unit, const std::shared_ptr< CpuCompilationContext > &compilation_context, std::vector< const int8_t * > &col_buf_ptrs, std::vector< int64_t > &col_sizes, std::vector< const int8_t * > &input_str_dict_proxy_ptrs, const size_t elem_count, Executor *executor)
CUstream getQueryEngineCudaStreamForDevice(int device_num)
bool hasTableFunctionSpecifiedParameter() const
#define DEBUG_TIMER(name)
std::vector< Analyzer::Expr * > target_exprs
void addColSlotInfo(const std::vector< std::tuple< int8_t, int8_t >> &slots_for_col)
bool hasOutputSizeIndependentOfInputSize() const
bool is_text_encoding_none() const
OutputBufferSizeType getOutputRowSizeType() const
void append_literal_buffer(const Datum &d, const SQLTypeInfo &ti, int8_t *literal_buffer, int64_t offset)
ResultSetPtr launchGpuCode(const TableFunctionExecutionUnit &exe_unit, const std::shared_ptr< GpuCompilationContext > &compilation_context, std::vector< const int8_t * > &col_buf_ptrs, std::vector< int64_t > &col_sizes, std::vector< const int8_t * > &input_str_dict_proxy_ptrs, const size_t elem_count, std::vector< int8_t * > &output_str_dict_proxy_ptrs, const int device_id, Executor *executor)
ColumnCacheMap & columnarized_table_cache_
FORCE_INLINE HOST DEVICE T align_to_int64(T addr)
static int64_t getBufferSize(const void *buffer)