64 std::vector<const int8_t*>& col_buf_ptrs,
65 std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
81 for (
size_t i = 0; i < num_out_columns; i++) {
95 if (std::this_thread::get_id() !=
thread_id_) {
96 throw std::runtime_error(
97 "TableFunctionManager instance accessed from an alien thread!");
114 int64_t output_item_values_total_number) {
128 int64_t output_array_values_total_number) {
143 for (
size_t i = 0; i < num_out_columns; i++) {
146 if (ti.usesFlatBuffer()) {
147 int64_t total_number = -1;
148 switch (ti.get_type()) {
151 UNREACHABLE() <<
"allocate_output_buffers not implemented for "
161 throw std::runtime_error(
"set_output_item_values_total_number(" +
163 ", <total_number>) must be called before "
164 "set_output_row_size(<size>) in " +
173 UNREACHABLE() <<
"allocate_output_buffers not implemented for "
186 const size_t col_width = ti.get_size();
187 query_mem_desc.
addColSlotInfo({std::make_tuple(col_width, col_width)});
210 std::vector<std::vector<uint64_t>>{{0}},
214 if (output_num_rows_ != 0) {
215 auto group_by_buffers_ptr =
query_buffers->getGroupByBuffersPtr();
216 CHECK(group_by_buffers_ptr);
217 auto output_buffers_ptr =
reinterpret_cast<int8_t*
>(group_by_buffers_ptr[0]);
218 for (
size_t i = 0; i < num_out_columns; i++) {
223 col->ptr = output_buffers_ptr;
226 auto ti = exe_unit_.target_exprs[i]->get_type_info();
227 if (ti.usesFlatBuffer()) {
229 int64_t total_number = -1;
230 switch (ti.get_type()) {
233 UNREACHABLE() <<
"allocate_output_buffers not implemented for "
248 UNREACHABLE() <<
"allocate_output_buffers not implemented for "
255 CHECK_EQ(m.getBufferSize(), query_mem_desc.getFlatBufferSize(i));
256 output_buffers_ptr =
align_to_int64(output_buffers_ptr + m.getBufferSize());
258 const size_t col_width = ti.get_size();
260 align_to_int64(output_buffers_ptr + col_width * output_num_rows_);
277 const uint8_t* raw_bytes,
278 const size_t num_bytes,
285 const uint8_t*& raw_bytes,
293 const auto proxy =
executor_->getStringDictionaryProxy(
295 return proxy->getDictKey().db_id;
299 const auto proxy =
executor_->getStringDictionaryProxy(
301 return proxy->getDictKey().dict_id;
305 return reinterpret_cast<int8_t*
>(
309 inline std::string
getString(int32_t db_id, int32_t dict_id, int32_t string_id) {
312 return proxy->getString(string_id);
317 const std::string& str) {
320 return proxy->getOrAddTransient(str);
323 inline int8_t*
makeBuffer(int64_t element_count, int64_t element_size) {
325 reinterpret_cast<int8_t*
>(
checked_malloc((element_count + 1) * element_size));
354 CHECK(instance_ ==
nullptr);
356 CHECK(instance_ !=
nullptr);
std::vector< int64_t > output_item_values_total_number_
std::unique_ptr< QueryMemoryInitializer > query_buffers
void set_output_column(int32_t index, int8_t *ptr)
void addColSlotInfoFlatBuffer(const int64_t flatbuffer_size)
std::string getString(int32_t db_id, int32_t dict_id, int32_t string_id)
std::vector< const int8_t * > & col_buf_ptrs_
void set_output_array_values_total_number(int32_t index, int64_t output_array_values_total_number)
TableFunctionManager(const TableFunctionExecutionUnit &exe_unit, Executor *executor, std::vector< const int8_t * > &col_buf_ptrs, std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner, bool is_singleton)
const TableFunctionExecutionUnit & exe_unit_
void check_thread_id() const
void get_metadata(const char *key, const uint8_t *&raw_bytes, size_t &num_bytes, TableFunctionMetadataType &value_type) const
const table_functions::TableFunction table_func
void set_error_message(const char *msg)
DEVICE int64_t size() const
std::vector< int8_t * > output_column_ptrs
void initializeFlatBuffer(FlatBufferManager &m, int64_t items_count, int64_t max_nof_values, const SQLTypeInfo &ti)
Constants for Builtin SQL Types supported by HEAVY.AI.
int8_t * getStringDictionaryProxy(int32_t db_id, int32_t dict_id)
#define TRANSIENT_DICT_ID
std::vector< int64_t * > output_col_buf_ptrs
static TableFunctionManager *& get_singleton_internal()
std::string error_message_
void allocate_output_buffers(int64_t output_num_rows)
void * checked_malloc(const size_t size)
int64_t getFlatBufferSize(int64_t items_count, int64_t max_nof_values, const SQLTypeInfo &ti)
std::mutex TableFunctionManager_singleton_mutex
std::string getName(const bool drop_suffix=false, const bool lower=false) const
int8_t * makeBuffer(int64_t element_count, int64_t element_size)
UserTableFunctionError(const std::string &message)
std::thread::id thread_id_
void set_metadata(const char *key, const uint8_t *raw_bytes, const size_t num_bytes, const TableFunctionMetadataType value_type) const
std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner_
void set_output_item_values_total_number(int32_t index, int64_t output_item_values_total_number)
TableFunctionError(const std::string &message)
std::vector< Analyzer::Expr * > target_exprs
void addColSlotInfo(const std::vector< std::tuple< int8_t, int8_t >> &slots_for_col)
HOST static DEVICE bool isFlatBuffer(const void *buffer)
const int32_t getOrAddTransient(int32_t db_id, int32_t dict_id, const std::string &str)
const char * get_error_message() const
std::string toString() const
FORCE_INLINE HOST DEVICE T align_to_int64(T addr)
static void set_singleton(TableFunctionManager *instance)