48 const char*
const* argv,
60 std::vector<std::string> col_defs;
62 p_vt->external_query_table->schema.end(),
63 std::back_inserter(col_defs),
65 return target_metainfo.get_resname() +
" " +
66 target_metainfo.get_type_info().get_type_name();
69 const auto create_statement =
70 "create table vtable (" + (col_defs_str.empty() ?
"dummy int" : col_defs_str) +
")";
73 int rc = sqlite3_declare_vtab(db, create_statement.c_str());
75 if (rc != SQLITE_OK) {
89 const char*
const* argv,
92 return vt_create(db, p_aux, argc, argv, pp_vt, pzErr);
102 if (rc == SQLITE_OK) {
109 int vt_open(sqlite3_vtab* pVTab, sqlite3_vtab_cursor** pp_cursor) {
111 *pp_cursor =
reinterpret_cast<sqlite3_vtab_cursor*
>(p_cur);
113 return (p_cur ? SQLITE_OK : SQLITE_NOMEM);
129 CHECK_EQ(p->external_query_table->fetch_result.num_rows.size(), size_t(1));
130 CHECK_EQ(p->external_query_table->fetch_result.num_rows.front().size(), size_t(1));
131 return p->external_query_table->fetch_result.num_rows.front().front();
138 if (p_cur->count == num_rows) {
157 const auto ids_column =
reinterpret_cast<const T*
>(column);
158 const auto val = ids_column[cursor];
160 if (val == inline_int_null_value<T>()) {
168 int vt_column(sqlite3_vtab_cursor*
cur, sqlite3_context* ctx,
int col_idx) {
172 auto p =
reinterpret_cast<OmniSciVtab*
>(p_cur->base.pVtab);
173 const auto& external_query_table = *(p->external_query_table);
174 CHECK_LT(static_cast<size_t>(col_idx),
175 external_query_table.fetch_result.col_buffers[0].size());
176 const auto column = external_query_table.fetch_result.col_buffers[0][col_idx];
177 const auto& col_ti = external_query_table.schema[col_idx].get_type_info();
178 switch (col_ti.get_type()) {
180 const auto val = column[p_cur->count - 1];
181 if (val == inline_int_null_value<int8_t>()) {
182 sqlite3_result_null(ctx);
184 sqlite3_result_int(ctx, val);
189 const auto int_column =
reinterpret_cast<const int16_t*
>(column);
190 const auto val = int_column[p_cur->count - 1];
191 if (val == inline_int_null_value<int16_t>()) {
192 sqlite3_result_null(ctx);
194 sqlite3_result_int(ctx, val);
199 const auto int_column =
reinterpret_cast<const int32_t*
>(column);
200 const auto val = int_column[p_cur->count - 1];
201 if (val == inline_int_null_value<int32_t>()) {
202 sqlite3_result_null(ctx);
204 sqlite3_result_int(ctx, val);
209 const auto int_column =
reinterpret_cast<const int64_t*
>(column);
210 const auto val = int_column[p_cur->count - 1];
211 if (val == inline_int_null_value<int64_t>()) {
212 sqlite3_result_null(ctx);
214 sqlite3_result_int(ctx, val);
219 const auto float_column =
reinterpret_cast<const float*
>(column);
220 const auto val = float_column[p_cur->count - 1];
222 sqlite3_result_null(ctx);
224 sqlite3_result_double(ctx, val);
229 const auto double_column =
reinterpret_cast<const double*
>(column);
230 const auto val = double_column[p_cur->count - 1];
232 sqlite3_result_null(ctx);
234 sqlite3_result_double(ctx, val);
240 const auto executor = external_query_table.executor;
241 const auto sdp = executor->getStringDictionaryProxy(
242 col_ti.getStringDictKey(), executor->getRowSetMemoryOwner(),
true);
245 switch (col_ti.get_size()) {
247 decoded_string = decode_string<uint8_t>(column, p_cur->count - 1, sdp);
251 decoded_string = decode_string<uint16_t>(column, p_cur->count - 1, sdp);
255 decoded_string = decode_string<int32_t>(column, p_cur->count - 1, sdp);
260 LOG(
FATAL) <<
"Invalid encoding size: " << col_ti.get_size();
264 sqlite3_result_null(ctx);
267 ctx, decoded_string.
payload.first, decoded_string.
payload.second,
nullptr);
271 const auto chunk_iter =
277 sqlite3_result_null(ctx);
280 ctx, reinterpret_cast<const char*>(vd.
pointer), vd.
length,
nullptr);
286 LOG(
FATAL) <<
"Unexpected type: " << col_ti.get_type_name();
298 *p_rowid = p_cur->
count;
306 sqlite3_value** argv) {
349 std::map<size_t, TargetMetaInfo> schema_map;
351 const auto& table_key = kv.first.getScanDesc().getTableKey();
354 const int column_id = kv.first.getColId();
356 if (table_key.table_id < 0) {
358 plan_state->
executor_->getTemporaryTables(), table_key.table_id);
359 column_type = result_set->getColType(column_id);
362 const auto cd = catalog->getMetadataForColumn(table_key.table_id, column_id);
363 column_type = cd->columnType;
366 throw std::runtime_error(
"Type not supported yet for extern execution: " +
369 const auto column_ref =
372 schema_map.emplace(kv.second,
TargetMetaInfo(column_ref, column_type));
375 std::vector<TargetMetaInfo> schema;
376 for (
const auto& kv : schema_map) {
377 schema.push_back(kv.second);
385 : external_query_table_(external_query_table) {
386 int status = sqlite3_open(
":memory:", &
db_);
394 int status = sqlite3_close(
db_);
401 int status = sqlite3_exec(
db_, sql.c_str(),
nullptr,
nullptr, &msg);
408 const size_t output_buffer_entry_count,
410 const size_t row_size_quad) {
411 const auto off = pos * row_size_quad;
412 CHECK_LT(pos, output_buffer_entry_count);
413 output_buffer[off] = off;
414 return output_buffer + off + 1;
420 const std::string& sql,
423 connector.
query(sql);
427 auto rs = std::make_unique<ResultSet>(output_spec.
target_infos,
430 output_spec.
executor->getRowSetMemoryOwner(),
433 const auto storage = rs->allocateStorage();
434 auto output_buffer = storage->getUnderlyingBuffer();
435 CHECK(!num_rows || output_buffer);
436 for (
size_t row_idx = 0; row_idx < num_rows; ++row_idx) {
440 query_mem_desc.getRowSize() /
sizeof(int64_t));
443 for (
size_t col_idx = 0; col_idx < connector.
getNumCols(); ++col_idx, ++slot_idx) {
444 const auto& col_type = output_spec.
target_infos[col_idx].sql_type;
445 const int sqlite_col_type = connector.
columnTypes[col_idx];
446 switch (col_type.get_type()) {
452 static const std::string overflow_message{
"Overflow or underflow"};
453 if (sqlite_col_type != SQLITE_INTEGER && sqlite_col_type != SQLITE_NULL) {
454 throw std::runtime_error(overflow_message);
456 if (!connector.
isNull(row_idx, col_idx)) {
458 const auto val = connector.
getData<int64_t>(row_idx, col_idx);
459 if (val > limits.first || val < limits.second) {
460 throw std::runtime_error(overflow_message);
469 CHECK(sqlite_col_type == SQLITE_FLOAT || sqlite_col_type == SQLITE_NULL);
470 if (!connector.
isNull(row_idx, col_idx)) {
471 reinterpret_cast<double*
>(row)[slot_idx] =
472 connector.
getData<
double>(row_idx, col_idx);
479 CHECK(sqlite_col_type == SQLITE_FLOAT || sqlite_col_type == SQLITE_NULL);
480 if (!connector.
isNull(row_idx, col_idx)) {
481 reinterpret_cast<double*
>(row)[slot_idx] =
482 connector.
getData<
double>(row_idx, col_idx);
491 CHECK(sqlite_col_type == SQLITE_TEXT || sqlite_col_type == SQLITE_NULL);
492 if (!connector.
isNull(row_idx, col_idx)) {
493 const auto str = connector.
getData<std::string>(row_idx, col_idx);
494 const auto owned_str =
495 output_spec.
executor->getRowSetMemoryOwner()->addString(str);
496 row[slot_idx] =
reinterpret_cast<int64_t
>(owned_str->c_str());
497 row[++slot_idx] = str.size();
505 LOG(
FATAL) <<
"Unexpected type: " << col_type.get_type_name();
std::lock_guard< T > lock_guard
bool is_supported_type_for_extern_execution(const SQLTypeInfo &ti)
std::pair< const char *, size_t > getStringBytes(int32_t string_id) const noexcept
int vt_connect(sqlite3 *db, void *p_aux, int argc, const char *const *argv, sqlite3_vtab **pp_vt, char **pzErr)
T getData(const int row, const int col)
int vt_filter(sqlite3_vtab_cursor *p_vtc, int idxNum, const char *idxStr, int argc, sqlite3_value **argv)
int vt_rowid(sqlite3_vtab_cursor *cur, sqlite_int64 *p_rowid)
int vt_close(sqlite3_vtab_cursor *cur)
std::unordered_map< InputColDescriptor, size_t > global_to_local_col_ids_
RUNTIME_EXPORT ALWAYS_INLINE DEVICE int64_t * get_scan_output_slot(int64_t *output_buffer, const uint32_t output_buffer_entry_count, const uint32_t pos, const int64_t offset_in_fragment, const uint32_t row_size_quad)
void run(const std::string &sql)
DecodedString decode_string(const int8_t *column, const size_t cursor, StringDictionaryProxy *sdp)
virtual void query(const std::string &queryString)
int vt_next(sqlite3_vtab_cursor *cur)
int64_t get_num_rows(OmniSciCursor *p_cur)
std::unique_ptr< ResultSet > run_query_external(const ExecutionUnitSql &sql, const FetchResult &fetch_result, const PlanState *plan_state, const ExternalQueryOutputSpec &output_spec)
sqlite3_module omnisci_module
DEVICE void ChunkIter_get_nth(ChunkIter *it, int n, bool uncompress, VarlenDatum *result, bool *is_end)
const Executor * executor_
std::string serialize_column_ref(const int table_id, const int column_id, const Catalog_Namespace::Catalog *catalog)
const Executor * executor
std::unique_ptr< ResultSet > runSelect(const std::string &sql, const ExternalQueryOutputSpec &output_spec)
const ResultSetPtr & get_temporary_table(const TemporaryTables *temporary_tables, const int table_id)
static SysCatalog & instance()
std::vector< TargetMetaInfo > create_table_schema(const PlanState *plan_state)
int vt_open(sqlite3_vtab *pVTab, sqlite3_vtab_cursor **pp_cursor)
std::pair< const char *, size_t > payload
int vt_column(sqlite3_vtab_cursor *cur, sqlite3_context *ctx, int col_idx)
int vt_disconnect(sqlite3_vtab *pVtab)
OUTPUT transform(INPUT const &input, FUNC const &func)
std::vector< int > columnTypes
std::vector< TargetInfo > target_infos
std::shared_ptr< Catalog > getCatalog(const std::string &dbName)
QueryMemoryDescriptor query_mem_desc
bool isNull(const int row, const int col) const
int vt_destructor(sqlite3_vtab *pVtab)
constexpr float inline_fp_null_value< float >()
ExternalQueryTable external_query_table_
constexpr double inline_fp_null_value< double >()
std::string get_type_name() const
static std::mutex session_mutex_
virtual size_t getNumCols() const
int64_t inline_int_null_val(const SQL_TYPE_INFO &ti)
std::pair< int64_t, int64_t > inline_int_max_min(const size_t byte_width)
int vt_destroy(sqlite3_vtab *pVtab)
int vt_create(sqlite3 *db, void *p_aux, int argc, const char *const *argv, sqlite3_vtab **pp_vt, char **pzErr)
SqliteMemDatabase(const ExternalQueryTable &external_query_table)
int vt_best_index(sqlite3_vtab *tab, sqlite3_index_info *pIdxInfo)
virtual size_t getNumRows() const
const ExternalQueryTable * external_query_table
int vt_eof(sqlite3_vtab_cursor *cur)