19 #include <arrow/api.h>
20 #include <arrow/io/memory.h>
21 #include <arrow/ipc/api.h>
29 switch (field.type()->id()) {
30 case arrow::Type::INT8:
32 case arrow::Type::INT16:
34 case arrow::Type::INT32:
36 case arrow::Type::INT64:
38 case arrow::Type::FLOAT:
40 case arrow::Type::DOUBLE:
42 case arrow::Type::DICTIONARY:
44 case arrow::Type::TIMESTAMP: {
47 if (get_precision(arrow::timestamp(arrow::TimeUnit::SECOND))) {
49 }
else if (get_precision(arrow::timestamp(arrow::TimeUnit::MILLI))) {
51 }
else if (get_precision(arrow::timestamp(arrow::TimeUnit::MICRO))) {
53 }
else if (get_precision(arrow::timestamp(arrow::TimeUnit::NANO))) {
59 case arrow::Type::DATE32:
61 case arrow::Type::DATE64:
63 case arrow::Type::TIME32:
75 const std::vector<TargetMetaInfo>& targets_meta,
77 : rows_(rows), targets_meta_(targets_meta), crt_row_idx_(0) {
80 for (
int i = 0; i < schema->num_fields(); ++i) {
81 std::shared_ptr<arrow::Field>
field = schema->field(i);
89 const std::shared_ptr<ResultSet>&
rows,
90 const std::vector<TargetMetaInfo>& targets_meta,
92 const size_t min_result_size_for_bulk_dictionary_fetch,
93 const double max_dictionary_to_result_size_ratio_for_bulk_dictionary_fetch)
94 : rows_(rows), targets_meta_(targets_meta), crt_row_idx_(0) {
96 min_result_size_for_bulk_dictionary_fetch,
97 max_dictionary_to_result_size_ratio_for_bulk_dictionary_fetch);
99 for (
int i = 0; i < schema->num_fields(); ++i) {
100 std::shared_ptr<arrow::Field>
field = schema->field(i);
107 template <
typename Type,
typename ArrayType>
109 const arrow::Array& column,
111 const size_t idx)
const {
112 const auto& col =
static_cast<const ArrayType&
>(column);
113 row.emplace_back(col.IsNull(idx) ? null_val :
static_cast<Type>(col.Value(idx)));
116 std::vector<std::string> ArrowResultSet::getDictionaryStrings(
117 const size_t col_idx)
const {
119 throw std::runtime_error(
"ArrowResultSet::getDictionaryStrings: col_idx is invalid.");
121 const auto& column_typeinfo =
getColType(col_idx);
122 if (column_typeinfo.get_type() !=
kTEXT) {
123 throw std::runtime_error(
124 "ArrowResultSet::getDictionaryStrings: col_idx does not refer to column of type "
128 const auto& column = *
columns_[col_idx];
129 CHECK_EQ(arrow::Type::DICTIONARY, column.type_id());
130 const auto& dict_column =
static_cast<const arrow::DictionaryArray&
>(column);
131 const auto& dictionary =
132 static_cast<const arrow::StringArray&
>(*dict_column.dictionary());
133 const size_t dictionary_size = dictionary.length();
134 std::vector<std::string> dictionary_strings;
135 dictionary_strings.reserve(dictionary_size);
136 for (
size_t d = 0; d < dictionary_size; ++d) {
137 dictionary_strings.emplace_back(dictionary.GetString(d));
139 return dictionary_strings;
148 std::vector<TargetValue> row;
152 switch (column_typeinfo.get_type()) {
154 CHECK_EQ(arrow::Type::INT8, column.type_id());
155 appendValue<int64_t, arrow::Int8Array>(
160 CHECK_EQ(arrow::Type::INT16, column.type_id());
161 appendValue<int64_t, arrow::Int16Array>(
166 CHECK_EQ(arrow::Type::INT32, column.type_id());
167 appendValue<int64_t, arrow::Int32Array>(
172 CHECK_EQ(arrow::Type::INT64, column.type_id());
173 appendValue<int64_t, arrow::Int64Array>(
178 CHECK_EQ(arrow::Type::FLOAT, column.type_id());
179 appendValue<float, arrow::FloatArray>(
184 CHECK_EQ(arrow::Type::DOUBLE, column.type_id());
185 appendValue<double, arrow::DoubleArray>(
191 CHECK_EQ(arrow::Type::DICTIONARY, column.type_id());
192 const auto& dict_column =
static_cast<const arrow::DictionaryArray&
>(column);
193 if (dict_column.IsNull(index)) {
196 const auto& indices =
197 static_cast<const arrow::Int32Array&
>(*dict_column.indices());
198 const auto& dictionary =
199 static_cast<const arrow::StringArray&
>(*dict_column.dictionary());
200 row.emplace_back(dictionary.GetString(indices.Value(index)));
205 CHECK_EQ(arrow::Type::TIMESTAMP, column.type_id());
206 appendValue<int64_t, arrow::TimestampArray>(
212 CHECK(arrow::Type::DATE32 == column.type_id() ||
213 arrow::Type::DATE64 == column.type_id());
214 column_typeinfo.is_date_in_days()
215 ? appendValue<int64_t, arrow::Date32Array>(
217 : appendValue<int64_t, arrow::Date64Array>(
222 CHECK_EQ(arrow::Type::TIME32, column.type_id());
223 appendValue<int64_t, arrow::Time32Array>(
271 return rowCount() ==
static_cast<size_t>(0);
279 default_max_dictionary_to_result_size_ratio_for_bulk_dictionary_fetch);
284 const size_t min_result_size_for_bulk_dictionary_fetch,
285 const double max_dictionary_to_result_size_ratio_for_bulk_dictionary_fetch) {
286 std::vector<std::string> col_names;
290 col_names.push_back(meta.get_resname());
293 for (
unsigned int i = 0; i <
rows_->colCount(); i++) {
304 min_result_size_for_bulk_dictionary_fetch,
305 max_dictionary_to_result_size_ratio_for_bulk_dictionary_fetch);
310 results_ = std::make_shared<ArrowResult>(converter.getArrowResult());
313 arrow::io::BufferReader reader(
314 reinterpret_cast<const uint8_t*>(
results_->df_buffer.data()),
results_->df_size);
317 arrow::ipc::RecordBatchStreamReader::Open(&reader));
342 const std::shared_ptr<ResultSet>&
rows,
344 return results ? std::make_unique<ArrowResultSet>(
346 : std::make_unique<ArrowResultSet>(rows, device_type);
351 const std::shared_ptr<ResultSet>&
rows,
353 const size_t min_result_size_for_bulk_dictionary_fetch,
354 const double max_dictionary_to_result_size_ratio_for_bulk_dictionary_fetch) {
355 std::vector<TargetMetaInfo> dummy_targets_meta;
356 return results ? std::make_unique<ArrowResultSet>(
360 min_result_size_for_bulk_dictionary_fetch,
361 max_dictionary_to_result_size_ratio_for_bulk_dictionary_fetch)
362 : std::make_unique<ArrowResultSet>(
366 min_result_size_for_bulk_dictionary_fetch,
367 max_dictionary_to_result_size_ratio_for_bulk_dictionary_fetch);
#define ARROW_THROW_NOT_OK(s)
double decimal_to_double(const SQLTypeInfo &otype, int64_t oval)
std::shared_ptr< ArrowResult > results_
std::shared_ptr< ResultSet > rows_
SQLTypeInfo getColType(const size_t col_idx) const
#define ARROW_ASSIGN_OR_THROW(lhs, rexpr)
arrow::ipc::DictionaryMemo dictionary_memo_
SQLTypeInfo type_from_arrow_field(const arrow::Field &field)
ArrowResultSet(const std::shared_ptr< ResultSet > &rows, const std::vector< TargetMetaInfo > &targets_meta, const ExecutorDeviceType device_type=ExecutorDeviceType::CPU)
const rapidjson::Value & field(const rapidjson::Value &obj, const char field[]) noexcept
size_t entryCount() const
const std::vector< TargetMetaInfo > & getTargetsMeta() const
const std::shared_ptr< ResultSet > & getRows() const
std::vector< TargetValue > getNextRow(const bool translate_strings, const bool decimal_to_double) const
std::unique_ptr< ArrowResultSet > result_set_arrow_loopback(const ExecutionResult &results)
constexpr float inline_fp_null_value< float >()
std::vector< TargetMetaInfo > column_metainfo_
constexpr double inline_fp_null_value< double >()
boost::variant< std::string, void * > NullableString
bool definitelyHasNoRows() const
static constexpr size_t default_min_result_size_for_bulk_dictionary_fetch
void appendValue(std::vector< TargetValue > &row, const arrow::Array &column, const Type null_val, const size_t idx) const
int64_t inline_int_null_val(const SQL_TYPE_INFO &ti)
void resultSetArrowLoopback(const ExecutorDeviceType device_type=ExecutorDeviceType::CPU)
std::vector< TargetValue > getRowAt(const size_t index) const
std::shared_ptr< arrow::RecordBatch > record_batch_
std::vector< TargetMetaInfo > targets_meta_
std::vector< std::shared_ptr< arrow::Array > > columns_