17 #include "../Shared/DateConverters.h"
20 #include "arrow/ipc/dictionary.h"
21 #include "arrow/ipc/options.h"
26 #include <sys/types.h>
40 #include "arrow/api.h"
41 #include "arrow/io/memory.h"
42 #include "arrow/ipc/api.h"
47 #include <arrow/gpu/cuda_api.h>
51 #define ARROW_RECORDBATCH_MAKE arrow::RecordBatch::Make
53 #define ARROW_CONVERTER_DEBUG true
55 #define ARROW_LOG(category) \
56 VLOG(1) << "[Arrow]" \
57 << "[" << category "] "
67 : arrow::Buffer(buf, size), _rs(rs) {}
// Appends one scalar value to a column's value vector.
// NOTE(review): the signature line is not visible in this excerpt; call sites
// further down use create_or_append_value<TYPE, VALUE_ARRAY_TYPE>(...), which
// presumably resolves to this template -- confirm.
//   TYPE             element type of the std::vector stored inside *values.
//   VALUE_ARRAY_TYPE alternative extracted from the incoming variant (val_cty).
//   values           in/out holder of the column's ValueArray.
//   max_size         capacity passed to reserve() when the vector is created.
109 template <
typename TYPE,
typename VALUE_ARRAY_TYPE>
111 std::shared_ptr<ValueArray>& values,
112 const size_t max_size) {
// Create the ValueArray holding an empty std::vector<TYPE> and reserve room
// for max_size elements (the guard around this is outside the visible lines).
114 values = std::make_shared<ValueArray>(std::vector<TYPE>());
115 boost::get<std::vector<TYPE>>(*values).reserve(max_size);
// Pointer overloads of boost::get: the results are pointers, used via -> / *.
118 auto values_ty = boost::get<std::vector<TYPE>>(values.get());
121 auto pval_cty = boost::get<VALUE_ARRAY_TYPE>(&val_cty);
// NullableString input: push the contained std::string when there is one,
// otherwise push an empty string as a placeholder.
123 if constexpr (std::is_same_v<VALUE_ARRAY_TYPE, NullableString>) {
124 if (
auto str = boost::get<std::string>(pval_cty)) {
125 values_ty->push_back(*str);
127 values_ty->push_back(
"");
// All other inputs: convert to the storage type with static_cast and append.
130 auto val_ty =
static_cast<TYPE
>(*pval_cty);
131 values_ty->push_back(val_ty);
// Array flavour of the value appender: the column is stored as Vec2<TYPE>
// (a vector of per-row vectors) inside *values.
// NOTE(review): the signature line is not visible in this excerpt.
//   TYPE             element type of the inner vectors.
//   VALUE_ARRAY_TYPE alternative extracted from each array element.
//   max_size         capacity passed to reserve() on the outer vector.
135 template <
typename TYPE,
typename VALUE_ARRAY_TYPE>
137 std::shared_ptr<ValueArray>& values,
138 const size_t max_size) {
// Create the outer Vec2<TYPE> and reserve max_size rows (the guard around
// this is outside the visible lines).
140 values = std::make_shared<ValueArray>(
Vec2<TYPE>());
141 boost::get<Vec2<TYPE>>(*values).reserve(max_size);
145 Vec2<TYPE>* values_ty = boost::get<Vec2<TYPE>>(values.get());
// Start a new, initially empty row for this array value.
148 values_ty->emplace_back(std::vector<TYPE>{});
// Convert every element of the incoming array and append it to the row just
// added. NOTE(review): the loop variable is taken by value, so each element is
// copied per iteration; `const auto&` may be sufficient -- confirm.
151 for (
auto val_cty : val_ctys.value()) {
152 auto pval_cty = boost::get<VALUE_ARRAY_TYPE>(&val_cty);
154 values_ty->back().emplace_back(static_cast<TYPE>(*pval_cty));
// Appends one validity flag to a column's null bitmap.
// NOTE(review): the name and leading parameters are not visible in this
// excerpt; presumably this is the non-template create_or_append_validity
// overload -- confirm.
//   null_bitmap in/out vector of per-row validity flags.
//   max_size    capacity passed to reserve() when the bitmap is created.
161 std::shared_ptr<std::vector<bool>>& null_bitmap,
162 const size_t max_size) {
// Create the bitmap and reserve max_size entries (guard not visible here).
169 null_bitmap = std::make_shared<std::vector<bool>>();
170 null_bitmap->reserve(max_size);
// The row is recorded as valid exactly when `value` converts to true.
173 null_bitmap->push_back(value ?
true :
false);
// Appends one validity flag for a scalar value to a column's null bitmap.
// NOTE(review): the signature line is not visible in this excerpt; call sites
// further down use create_or_append_validity<TYPE>(...).
//   TYPE        alternative extracted from the incoming variant (`value`).
//   null_bitmap in/out vector of per-row validity flags.
//   max_size    capacity passed to reserve() when the bitmap is created.
176 template <
typename TYPE>
179 std::shared_ptr<std::vector<bool>>& null_bitmap,
180 const size_t max_size) {
185 auto pvalue = boost::get<TYPE>(&value);
// is_valid starts out false and is set by the type-specific branches below.
187 bool is_valid =
false;
// Strings are valid when the NullableString actually holds a std::string.
188 if constexpr (std::is_same_v<TYPE, NullableString>) {
189 is_valid = boost::get<std::string>(pvalue) !=
nullptr;
// Floating-point columns get their own check (body not visible in this
// excerpt, nor are the branches between the string case and this one).
197 }
else if (col_type.
is_fp()) {
// Create the bitmap and reserve max_size entries (guard not visible here).
205 null_bitmap = std::make_shared<std::vector<bool>>();
206 null_bitmap->reserve(max_size);
209 null_bitmap->push_back(is_valid);
// Trait mapping a C++ type to the in-band sentinel that represents NULL.
// Primary template header; the declaration it introduces is not visible in
// this excerpt. The second parameter exists for enable_if-based selection of
// the specializations below.
212 template <
typename TYPE,
typename enable =
void>
// Integral types: the sentinel comes from inline_int_null_value<type>().
// (`type` is a member alias whose definition is not visible here.)
215 template <
typename TYPE>
216 struct null_type<TYPE, std::enable_if_t<std::is_integral<TYPE>::value>> {
218 static constexpr
type value = inline_int_null_value<type>();
// Floating-point types: the sentinel comes from inline_fp_null_value<type>().
221 template <
typename TYPE>
222 struct null_type<TYPE, std::enable_if_t<std::is_floating_point<TYPE>::value>> {
224 static constexpr
type value = inline_fp_null_value<type>();
// The declaration introduced by this template header is not visible in this
// excerpt.
227 template <
typename TYPE>
// Converts one fixed-width result column into an Arrow NumericArray.
// NOTE(review): the name/leading parameters are not visible in this excerpt;
// call sites further down use convert_column<C_TYPE>(results_, col,
// entry_count, result[col]).
//   C_TYPE     C++ element type of the column.
//   ARROW_TYPE Arrow type, defaulted from arrow::CTypeTraits<C_TYPE>.
//   out        receives the newly built array.
230 template <
typename C_TYPE,
231 typename ARROW_TYPE =
typename arrow::CTypeTraits<C_TYPE>::ArrowType>
235 std::shared_ptr<arrow::Array>& out) {
// The element width must match the width the result set reports for the column.
236 CHECK(
sizeof(C_TYPE) == result->getColType(col).get_size());
238 std::shared_ptr<arrow::Buffer> values;
239 std::shared_ptr<arrow::Buffer> is_valid;
240 const int64_t buf_size = entry_count *
sizeof(C_TYPE);
// Zero-copy path: reuse the result set's columnar buffer (the object built
// around this pointer is outside the visible lines).
241 if (result->isZeroCopyColumnarConversionPossible(col)) {
243 reinterpret_cast<const uint8_t*>(result->getColumnarBuffer(col)),
// Copy path: allocate buf_size bytes and copy the column into them.
247 auto res = arrow::AllocateBuffer(buf_size);
249 values = std::move(
res).ValueOrDie();
250 result->copyColumnIntoBuffer(
251 col, reinterpret_cast<int8_t*>(values->mutable_data()), buf_size);
// Validity bitmap: one bit per entry, rounded up to whole bytes.
254 int64_t null_count = 0;
255 auto res = arrow::AllocateBuffer((entry_count + 7) / 8);
257 is_valid = std::move(
res).ValueOrDie();
259 auto is_valid_data = is_valid->mutable_data();
// entry_count rounded down to a multiple of 8 (the mask clears the low 3 bits).
265 size_t unroll_count = entry_count & 0xFFFFFFFFFFFFFFF8ULL;
// Manually unrolled main loop: eight entries are folded into one bitmap byte
// per iteration. Bit k of the byte is set when vals[i + k] differs from
// null_val; each entry equal to null_val bumps null_count.
266 for (
size_t i = 0; i < unroll_count; i += 8) {
267 uint8_t valid_byte = 0;
269 valid = vals[i + 0] != null_val;
270 valid_byte |= valid << 0;
271 null_count += !valid;
272 valid = vals[i + 1] != null_val;
273 valid_byte |= valid << 1;
274 null_count += !valid;
275 valid = vals[i + 2] != null_val;
276 valid_byte |= valid << 2;
277 null_count += !valid;
278 valid = vals[i + 3] != null_val;
279 valid_byte |= valid << 3;
280 null_count += !valid;
281 valid = vals[i + 4] != null_val;
282 valid_byte |= valid << 4;
283 null_count += !valid;
284 valid = vals[i + 5] != null_val;
285 valid_byte |= valid << 5;
286 null_count += !valid;
287 valid = vals[i + 6] != null_val;
288 valid_byte |= valid << 6;
289 null_count += !valid;
290 valid = vals[i + 7] != null_val;
291 valid_byte |= valid << 7;
292 null_count += !valid;
// i is a multiple of 8 here, so i >> 3 is the index of the byte just built.
293 is_valid_data[i >> 3] = valid_byte;
// Tail: the remaining (< 8) entries go into one final byte; bits past
// entry_count stay 0.
295 if (unroll_count != entry_count) {
296 uint8_t valid_byte = 0;
297 for (
size_t i = unroll_count; i < entry_count; ++i) {
298 bool valid = vals[i] != null_val;
299 valid_byte |= valid << (i & 7);
300 null_count += !valid;
302 is_valid_data[unroll_count >> 3] = valid_byte;
// Two ways of building the result: with the validity bitmap and null count, or
// from the values alone. The condition choosing between them is not visible in
// this excerpt.
313 new arrow::NumericArray<ARROW_TYPE>(entry_count, values, is_valid, null_count));
315 out.reset(
new arrow::NumericArray<ARROW_TYPE>(entry_count, values));
// Creates a System V shared-memory segment of shmsz bytes and attaches it.
// Returns the segment key together with the attached address.
// Throws std::runtime_error when the segment cannot be created or attached.
320 std::pair<key_t, void*>
get_shm(
size_t shmsz) {
// Early exit returning IPC_PRIVATE and no mapping (its condition is not
// visible in this excerpt).
322 return std::make_pair(IPC_PRIVATE,
nullptr);
// Keys are drawn from rand(). NOTE(review): no seeding is visible in this
// excerpt, and rand() is a weak source for keys shared between processes --
// confirm this is acceptable.
330 auto key =
static_cast<key_t
>(rand());
// IPC_CREAT | IPC_EXCL makes shmget fail if the key is already in use, so the
// loop retries with a fresh random key until a segment is created.
334 while ((shmid = shmget(key, shmsz, IPC_CREAT | IPC_EXCL | 0666)) < 0) {
// NOTE(review): errno is tested with a bitwise AND against OR-ed error
// constants. errno codes are plain integers, not bit flags, so this does not
// test "errno is one of EEXIST/EACCES/EINVAL/ENOENT"; explicit == comparisons
// look like the intent -- confirm and fix.
343 if (!(errno & (EEXIST | EACCES | EINVAL | ENOENT))) {
344 throw std::runtime_error(
"failed to create a shared memory");
346 key =
static_cast<key_t
>(rand());
// shmat signals failure by returning (void*)-1, hence the comparison with -1.
349 auto ipc_ptr = shmat(shmid, NULL, 0);
350 if (reinterpret_cast<int64_t>(ipc_ptr) == -1) {
351 throw std::runtime_error(
"failed to attach a shared memory");
354 return std::make_pair(key, ipc_ptr);
// Windows branch (per the message): always throws; the return after the throw
// is unreachable.
360 throw std::runtime_error(
"Arrow IPC not yet supported on Windows.");
361 return std::make_pair(0,
nullptr);
// Obtains a shared-memory segment of `size` bytes via get_shm() and wraps the
// attached address in an arrow::MutableBuffer; the key and the buffer are
// returned together as a pair.
// NOTE(review): the signature is not visible in this excerpt; the name is
// presumably get_shm_buffer (called further down) -- confirm. MutableBuffer is
// constructed from a raw pointer here, and nothing visible detaches or removes
// the segment when the buffer is destroyed.
363 auto [key, ipc_ptr] =
get_shm(size);
364 std::shared_ptr<arrow::Buffer> buffer(
365 new arrow::MutableBuffer(static_cast<uint8_t*>(ipc_ptr), size));
366 return std::make_pair<key_t, std::shared_ptr<arrow::Buffer>>(std::move(key),
// Rewrites the ids in vec1d in place using data from column_builder.
// NOTE(review): the enclosing function's name and first parameter are not
// visible in this excerpt, and neither are most of the lambda bodies.
//   bitmap per-element validity flags; may be empty.
//   vec1d  ids to rewrite (modified in place).
372 const std::vector<uint8_t>& bitmap,
373 std::vector<int64_t>& vec1d) {
// Four variants of the same per-element loop over vec1d follow. The "_bitmap"
// variants additionally capture `bitmap`.
379 auto all_strings_remapped_bitmap = [&column_builder, &vec1d, &bitmap]() {
380 for (
size_t i = 0; i < vec1d.size(); i++) {
387 auto all_strings_remapped = [&column_builder, &vec1d]() {
388 for (
size_t i = 0; i < vec1d.size(); i++) {
393 auto only_transient_strings_remapped = [&column_builder, &vec1d]() {
394 for (
size_t i = 0; i < vec1d.size(); i++) {
401 auto only_transient_strings_remapped_bitmap = [&column_builder, &vec1d, &bitmap]() {
402 for (
size_t i = 0; i < vec1d.size(); i++) {
// Only elements flagged in the bitmap whose id is negative are handled here.
403 if (bitmap[i] && vec1d[i] < 0) {
// Dispatch: an empty bitmap selects the variant that ignores it. The condition
// choosing between the "all" and "only transient" pairs is not visible here.
411 bitmap.empty() ? all_strings_remapped() : all_strings_remapped_bitmap();
414 bitmap.empty() ? only_transient_strings_remapped()
415 : only_transient_strings_remapped_bitmap();
428 throw std::runtime_error(
"Arrow IPC not yet supported on Windows.");
430 auto [key, ipc_ptr] =
get_shm(data->size());
434 memcpy(ipc_ptr, data->data(), data->size());
448 std::shared_ptr<arrow::RecordBatch> record_batch =
convertToArrow();
450 struct BuildResultParams {
451 int64_t schemaSize()
const {
452 return serialized_schema ? serialized_schema->size() : 0;
454 int64_t dictSize()
const {
return serialized_dict ? serialized_dict->size() : 0; };
455 int64_t totalSize()
const {
return schemaSize() + records_size + dictSize(); }
456 bool hasRecordBatch()
const {
return records_size > 0; }
457 bool hasDict()
const {
return dictSize() > 0; }
459 int64_t records_size{0};
460 std::shared_ptr<arrow::Buffer> serialized_schema{
nullptr};
461 std::shared_ptr<arrow::Buffer> serialized_dict{
nullptr};
467 auto timer =
DEBUG_TIMER(
"serialize batch to wire");
468 const auto total_size = result_params.totalSize();
469 std::vector<char> record_handle_data(total_size);
470 auto serialized_records =
471 arrow::MutableBuffer::Wrap(record_handle_data.data(), total_size);
476 reinterpret_cast<const uint8_t*>(result_params.serialized_schema->data()),
477 result_params.schemaSize()));
479 if (result_params.hasDict()) {
481 reinterpret_cast<const uint8_t*>(result_params.serialized_dict->data()),
482 result_params.dictSize()));
485 arrow::io::FixedSizeBufferWriter stream(SliceMutableBuffer(
486 serialized_records, result_params.schemaSize() + result_params.dictSize()));
488 if (result_params.hasRecordBatch()) {
490 *record_batch, arrow::ipc::IpcWriteOptions::Defaults(), &stream));
493 return {std::vector<char>(0),
495 std::vector<char>(0),
496 serialized_records->size(),
498 std::move(record_handle_data)};
502 auto timer =
DEBUG_TIMER(
"serialize batch to shared memory");
503 std::shared_ptr<arrow::Buffer> serialized_records;
504 std::vector<char> schema_handle_buffer;
505 std::vector<char> record_handle_buffer(
sizeof(key_t), 0);
506 key_t records_shm_key = IPC_PRIVATE;
507 const int64_t total_size = result_params.totalSize();
509 std::tie(records_shm_key, serialized_records) =
get_shm_buffer(total_size);
511 memcpy(serialized_records->mutable_data(),
512 result_params.serialized_schema->data(),
513 (size_t)result_params.schemaSize());
515 if (result_params.hasDict()) {
516 memcpy(serialized_records->mutable_data() + result_params.schemaSize(),
517 result_params.serialized_dict->data(),
518 (size_t)result_params.dictSize());
521 arrow::io::FixedSizeBufferWriter stream(SliceMutableBuffer(
522 serialized_records, result_params.schemaSize() + result_params.dictSize()));
524 if (result_params.hasRecordBatch()) {
526 *record_batch, arrow::ipc::IpcWriteOptions::Defaults(), &stream));
529 memcpy(&record_handle_buffer[0],
530 reinterpret_cast<const unsigned char*>(&records_shm_key),
533 return {schema_handle_buffer,
535 record_handle_buffer,
536 serialized_records->size(),
540 arrow::ipc::DictionaryFieldMapper mapper(*record_batch->schema());
541 auto options = arrow::ipc::IpcWriteOptions::Defaults();
542 auto dict_stream = arrow::io::BufferOutputStream::Create(1024).ValueOrDie();
546 if (!record_batch->num_rows()) {
548 arrow::ipc::SerializeSchema(*record_batch->schema(),
549 arrow::default_memory_pool()));
553 return getWireResult();
555 return getShmResult();
563 ARROW_LOG(
"CPU") <<
"found " << dictionaries.size() <<
" dictionaries";
565 for (
auto& pair : dictionaries) {
566 arrow::ipc::IpcPayload payload;
567 int64_t dictionary_id = pair.first;
568 const auto& dictionary = pair.second;
571 GetDictionaryPayload(dictionary_id, dictionary, options, &payload));
572 int32_t metadata_length = 0;
574 WriteIpcPayload(payload, options, dict_stream.get(), &metadata_length));
576 result_params.serialized_dict = dict_stream->Finish().ValueOrDie();
579 arrow::ipc::SerializeSchema(*record_batch->schema(),
580 arrow::default_memory_pool()));
583 arrow::ipc::GetRecordBatchSize(*record_batch, &result_params.records_size));
587 return getWireResult();
589 return getShmResult();
// GPU result path: writes schema and dictionaries into an in-memory IPC
// stream, serializes the record batch to the CUDA device, and returns the
// handles for both.
// NOTE(review): the enclosing function's signature is not visible in this
// excerpt, and several statements are cut off mid-call.
// In-memory output stream with an initial capacity of 1024 bytes.
598 auto out_stream_result = arrow::io::BufferOutputStream::Create(1024);
600 auto out_stream = std::move(out_stream_result).ValueOrDie();
602 arrow::ipc::DictionaryFieldMapper mapper(*record_batch->schema());
603 arrow::ipc::DictionaryMemo current_memo;
604 arrow::ipc::DictionaryMemo serialized_memo;
// Schema payload, written with default IPC write options.
606 arrow::ipc::IpcPayload schema_payload;
608 arrow::ipc::IpcWriteOptions::Defaults(),
611 int32_t schema_payload_length = 0;
613 arrow::ipc::IpcWriteOptions::Defaults(),
615 &schema_payload_length));
618 <<
"found dicts: " << dictionaries.size();
// NOTE(review): "¤t_memo" looks like an encoding casualty of "&current_memo"
// (the text "&curren" decoded as an HTML entity); current_memo is declared
// above. As written this line cannot compile -- restore the original text.
621 arrow::ipc::internal::CollectDictionaries(*record_batch, ¤t_memo));
624 std::shared_ptr<arrow::Schema> dummy_schema;
625 std::vector<std::shared_ptr<arrow::RecordBatch>> dict_batches;
// Wrap each dictionary in a one-column record batch whose single field has an
// empty name and the dictionary's type.
627 for (
const auto& pair : dictionaries) {
628 arrow::ipc::IpcPayload payload;
629 const auto& dict_id = pair.first;
632 <<
"dict_id: " << dict_id;
633 const auto& dict = pair.second;
637 auto dummy_field = std::make_shared<arrow::Field>(
"", dict->type());
638 dummy_schema = std::make_shared<arrow::Schema>(
639 std::vector<std::shared_ptr<arrow::Field>>{dummy_field});
641 dict_batches.emplace_back(
642 arrow::RecordBatch::Make(dummy_schema, dict->length(), {dict}));
// The dictionary batches are only written when at least one exists.
645 if (!dict_batches.empty()) {
647 dict_batches, arrow::ipc::IpcWriteOptions::Defaults(), out_stream.get()));
650 auto complete_ipc_stream = out_stream->Finish();
652 auto serialized_records = std::move(complete_ipc_stream).ValueOrDie();
// The key (record_key, obtained on lines not visible here) is copied
// byte-for-byte into a char buffer of sizeof(key_t).
655 std::vector<char> schema_record_key_buffer(
sizeof(key_t), 0);
656 memcpy(&schema_record_key_buffer[0],
657 reinterpret_cast<const unsigned char*>(&record_key),
// CUDA side: the initialization of manager/context/device buffer/handle
// happens on lines that are not visible in this excerpt.
660 arrow::cuda::CudaDeviceManager* manager;
662 std::shared_ptr<arrow::cuda::CudaContext> context;
665 std::shared_ptr<arrow::cuda::CudaBuffer> device_serialized;
667 SerializeRecordBatch(*record_batch, context.get()));
669 std::shared_ptr<arrow::cuda::CudaIpcMemHandle> cuda_handle;
672 std::shared_ptr<arrow::Buffer> serialized_cuda_handle;
674 cuda_handle->Serialize(arrow::default_memory_pool()));
// Copy the serialized CUDA IPC handle into a plain char vector for the result.
676 std::vector<char> record_handle_buffer(serialized_cuda_handle->size(), 0);
677 memcpy(&record_handle_buffer[0],
678 serialized_cuda_handle->data(),
679 serialized_cuda_handle->size());
681 return {schema_record_key_buffer,
682 serialized_records->size(),
683 record_handle_buffer,
684 serialized_cuda_handle->size(),
685 serialized_cuda_handle->ToString()};
// Alternative return with empty buffers and zero sizes; the condition (or
// preprocessor branch) selecting it is not visible in this excerpt.
688 return {std::vector<char>{}, 0, std::vector<char>{}, 0,
""};
694 arrow::ipc::DictionaryFieldMapper* mapper)
const {
696 std::shared_ptr<arrow::RecordBatch> arrow_copy =
convertToArrow();
697 std::shared_ptr<arrow::Buffer> serialized_records, serialized_schema;
701 arrow::ipc::SerializeSchema(*arrow_copy->schema(), arrow::default_memory_pool()));
703 if (arrow_copy->num_rows()) {
707 arrow::ipc::SerializeRecordBatch(
708 *arrow_copy, arrow::ipc::IpcWriteOptions::Defaults()));
712 return {serialized_schema, serialized_records};
717 const auto col_count =
results_->colCount();
718 std::vector<std::shared_ptr<arrow::Field>> fields;
720 for (
size_t i = 0; i < col_count; ++i) {
721 const auto ti =
results_->getColType(i);
724 #if ARROW_CONVERTER_DEBUG
725 VLOG(1) <<
"Arrow fields: ";
726 for (
const auto&
f : fields) {
727 VLOG(1) <<
"\t" <<
f->ToString(
true);
734 const std::shared_ptr<arrow::Schema>& schema)
const {
735 std::vector<std::shared_ptr<arrow::Array>> result_columns;
744 const size_t entry_count =
top_n_ < 0
748 const auto col_count =
results_->colCount();
749 size_t row_count = 0;
751 result_columns.resize(col_count);
752 std::vector<ColumnBuilder> builders(col_count);
755 for (
size_t i = 0; i < col_count; ++i) {
760 auto fetch = [&](std::vector<std::shared_ptr<ValueArray>>& value_seg,
761 std::vector<std::shared_ptr<std::vector<bool>>>& null_bitmap_seg,
762 const std::vector<bool>& non_lazy_cols,
763 const size_t start_entry,
764 const size_t end_entry) ->
size_t {
765 CHECK_EQ(value_seg.size(), col_count);
766 CHECK_EQ(null_bitmap_seg.size(), col_count);
767 const auto local_entry_count = end_entry - start_entry;
768 size_t seg_row_count = 0;
769 for (
size_t i = start_entry; i < end_entry; ++i) {
770 auto row =
results_->getRowAtNoTranslations(i, non_lazy_cols);
775 for (
size_t j = 0; j < col_count; ++j) {
776 if (!non_lazy_cols.empty() && non_lazy_cols[j]) {
780 if (
auto scalar_value = boost::get<ScalarTargetValue>(&row[j])) {
783 const auto& column = builders[j];
784 switch (column.physical_type) {
786 create_or_append_value<bool, int64_t>(
787 *scalar_value, value_seg[j], local_entry_count);
788 create_or_append_validity<int64_t>(
789 *scalar_value, column.col_type, null_bitmap_seg[j], local_entry_count);
792 create_or_append_value<int8_t, int64_t>(
793 *scalar_value, value_seg[j], local_entry_count);
794 create_or_append_validity<int64_t>(
795 *scalar_value, column.col_type, null_bitmap_seg[j], local_entry_count);
798 create_or_append_value<int16_t, int64_t>(
799 *scalar_value, value_seg[j], local_entry_count);
800 create_or_append_validity<int64_t>(
801 *scalar_value, column.col_type, null_bitmap_seg[j], local_entry_count);
804 create_or_append_value<int32_t, int64_t>(
805 *scalar_value, value_seg[j], local_entry_count);
806 create_or_append_validity<int64_t>(
807 *scalar_value, column.col_type, null_bitmap_seg[j], local_entry_count);
810 create_or_append_value<int64_t, int64_t>(
811 *scalar_value, value_seg[j], local_entry_count);
812 create_or_append_validity<int64_t>(
813 *scalar_value, column.col_type, null_bitmap_seg[j], local_entry_count);
816 create_or_append_value<int64_t, int64_t>(
817 *scalar_value, value_seg[j], local_entry_count);
818 create_or_append_validity<int64_t>(
819 *scalar_value, column.col_type, null_bitmap_seg[j], local_entry_count);
822 create_or_append_value<float, float>(
823 *scalar_value, value_seg[j], local_entry_count);
824 create_or_append_validity<float>(
825 *scalar_value, column.col_type, null_bitmap_seg[j], local_entry_count);
828 create_or_append_value<double, double>(
829 *scalar_value, value_seg[j], local_entry_count);
830 create_or_append_validity<double>(
831 *scalar_value, column.col_type, null_bitmap_seg[j], local_entry_count);
834 create_or_append_value<int32_t, int64_t>(
835 *scalar_value, value_seg[j], local_entry_count);
836 create_or_append_validity<int64_t>(
837 *scalar_value, column.col_type, null_bitmap_seg[j], local_entry_count);
841 ? create_or_append_value<int64_t, int64_t>(
842 *scalar_value, value_seg[j], local_entry_count)
843 : create_or_append_value<int32_t, int64_t>(
844 *scalar_value, value_seg[j], local_entry_count);
845 create_or_append_validity<int64_t>(
846 *scalar_value, column.col_type, null_bitmap_seg[j], local_entry_count);
849 create_or_append_value<int64_t, int64_t>(
850 *scalar_value, value_seg[j], local_entry_count);
851 create_or_append_validity<int64_t>(
852 *scalar_value, column.col_type, null_bitmap_seg[j], local_entry_count);
855 create_or_append_value<std::string, NullableString>(
856 *scalar_value, value_seg[j], local_entry_count);
857 create_or_append_validity<NullableString>(
858 *scalar_value, column.col_type, null_bitmap_seg[j], local_entry_count);
862 throw std::runtime_error(column.col_type.get_type_name() +
863 " is not supported in Arrow result sets.");
865 }
else if (
auto array = boost::get<ArrayTargetValue>(&row[j])) {
867 const auto& column = builders[j];
868 switch (column.col_type.get_subtype()) {
870 create_or_append_value<int8_t, int64_t>(
871 *array, value_seg[j], local_entry_count);
873 *array, column.col_type, null_bitmap_seg[j], local_entry_count);
876 create_or_append_value<int8_t, int64_t>(
877 *array, value_seg[j], local_entry_count);
879 *array, column.col_type, null_bitmap_seg[j], local_entry_count);
882 create_or_append_value<int16_t, int64_t>(
883 *array, value_seg[j], local_entry_count);
885 *array, column.col_type, null_bitmap_seg[j], local_entry_count);
888 create_or_append_value<int32_t, int64_t>(
889 *array, value_seg[j], local_entry_count);
891 *array, column.col_type, null_bitmap_seg[j], local_entry_count);
894 create_or_append_value<int64_t, int64_t>(
895 *array, value_seg[j], local_entry_count);
897 *array, column.col_type, null_bitmap_seg[j], local_entry_count);
900 create_or_append_value<float, float>(
901 *array, value_seg[j], local_entry_count);
903 *array, column.col_type, null_bitmap_seg[j], local_entry_count);
906 create_or_append_value<double, double>(
907 *array, value_seg[j], local_entry_count);
909 *array, column.col_type, null_bitmap_seg[j], local_entry_count);
912 if (column.col_type.is_dict_encoded_type()) {
913 create_or_append_value<int64_t, int64_t>(
914 *array, value_seg[j], local_entry_count);
916 *array, column.col_type, null_bitmap_seg[j], local_entry_count);
920 throw std::runtime_error(column.col_type.get_type_name() +
921 " is not supported in Arrow result sets.");
926 return seg_row_count;
929 auto convert_columns = [&](std::vector<std::shared_ptr<arrow::Array>>&
result,
930 const std::vector<bool>& non_lazy_cols,
931 const size_t start_col,
932 const size_t end_col) {
933 for (
size_t col = start_col; col < end_col; ++col) {
934 if (!non_lazy_cols.empty() && !non_lazy_cols[col]) {
938 const auto& column = builders[col];
939 switch (column.physical_type) {
941 convert_column<int8_t>(
results_, col, entry_count, result[col]);
944 convert_column<int16_t>(
results_, col, entry_count, result[col]);
947 convert_column<int32_t>(
results_, col, entry_count, result[col]);
950 convert_column<int64_t>(
results_, col, entry_count, result[col]);
953 convert_column<float>(
results_, col, entry_count, result[col]);
956 convert_column<double>(
results_, col, entry_count, result[col]);
959 throw std::runtime_error(column.col_type.get_type_name() +
960 " is not supported in Arrow column converter.");
965 std::vector<std::shared_ptr<ValueArray>> column_values(col_count,
nullptr);
966 std::vector<std::shared_ptr<std::vector<bool>>> null_bitmaps(col_count,
nullptr);
967 const bool multithreaded = entry_count > 10000 && !
results_->isTruncated();
970 bool use_columnar_converter =
results_->isDirectColumnarConversionPossible() &&
971 (
results_->getQueryMemDesc().getQueryDescriptionType() ==
973 results_->getQueryMemDesc().getQueryDescriptionType() ==
975 entry_count ==
results_->entryCount();
976 std::vector<bool> non_lazy_cols;
977 if (use_columnar_converter) {
979 std::vector<size_t> non_lazy_col_pos;
980 size_t non_lazy_col_count = 0;
981 const auto& lazy_fetch_info =
results_->getLazyFetchInfo();
983 non_lazy_cols.reserve(col_count);
984 non_lazy_col_pos.reserve(col_count);
985 for (
size_t i = 0; i < col_count; ++i) {
987 lazy_fetch_info.empty() ?
false : lazy_fetch_info[i].is_lazily_fetched;
990 switch (builders[i].physical_type) {
1000 if (builders[i].
field->type()->id() == arrow::Type::DICTIONARY) {
1003 non_lazy_cols.emplace_back(!is_lazy);
1005 ++non_lazy_col_count;
1006 non_lazy_col_pos.emplace_back(i);
1010 if (non_lazy_col_count == col_count) {
1011 non_lazy_cols.clear();
1012 non_lazy_col_pos.clear();
1014 non_lazy_col_pos.emplace_back(col_count);
1017 std::vector<std::future<void>> child_threads;
1018 size_t num_threads =
1019 std::min(multithreaded ? (
size_t)
cpu_threads() : (
size_t)1, non_lazy_col_count);
1021 size_t start_col = 0;
1023 for (
size_t i = 0; i < num_threads; ++i) {
1024 start_col = end_col;
1025 end_col = (i + 1) * non_lazy_col_count / num_threads;
1026 size_t phys_start_col =
1027 non_lazy_col_pos.empty() ? start_col : non_lazy_col_pos[start_col];
1028 size_t phys_end_col =
1029 non_lazy_col_pos.empty() ? end_col : non_lazy_col_pos[end_col];
1032 std::ref(result_columns),
1037 for (
auto& child : child_threads) {
1040 row_count = entry_count;
1042 if (!use_columnar_converter || !non_lazy_cols.empty()) {
1045 if (multithreaded) {
1047 std::vector<std::future<size_t>> child_threads;
1048 std::vector<std::vector<std::shared_ptr<ValueArray>>> column_value_segs(
1049 cpu_count, std::vector<std::shared_ptr<ValueArray>>(col_count,
nullptr));
1050 std::vector<std::vector<std::shared_ptr<std::vector<bool>>>> null_bitmap_segs(
1051 cpu_count, std::vector<std::shared_ptr<std::vector<bool>>>(col_count,
nullptr));
1052 const auto stride = (entry_count + cpu_count - 1) / cpu_count;
1053 for (
size_t i = 0, start_entry = 0; start_entry < entry_count;
1054 ++i, start_entry += stride) {
1055 const auto end_entry = std::min(entry_count, start_entry + stride);
1058 std::ref(column_value_segs[i]),
1059 std::ref(null_bitmap_segs[i]),
1064 for (
auto& child : child_threads) {
1065 row_count += child.get();
1069 for (
int i = 0; i < schema->num_fields(); ++i) {
1070 if (!non_lazy_cols.empty() && non_lazy_cols[i]) {
1074 for (
size_t j = 0; j < cpu_count; ++j) {
1075 if (!column_value_segs[j][i]) {
1078 append(builders[i], *column_value_segs[j][i], null_bitmap_segs[j][i]);
1084 fetch(column_values, null_bitmaps, non_lazy_cols,
size_t(0), entry_count);
1086 auto timer =
DEBUG_TIMER(
"append rows to arrow single thread");
1087 for (
int i = 0; i < schema->num_fields(); ++i) {
1088 if (!non_lazy_cols.empty() && non_lazy_cols[i]) {
1092 append(builders[i], *column_values[i], null_bitmaps[i]);
1099 for (
size_t i = 0; i < col_count; ++i) {
1100 if (!non_lazy_cols.empty() && non_lazy_cols[i]) {
1118 return arrow::boolean();
1120 return arrow::int8();
1122 return arrow::int16();
1124 return arrow::int32();
1126 return arrow::int64();
1128 return arrow::float32();
1130 return arrow::float64();
1135 auto value_type = std::make_shared<arrow::StringType>();
1136 return arrow::dictionary(arrow::int32(), value_type,
false);
1138 return arrow::utf8();
1143 return time32(arrow::TimeUnit::SECOND);
1149 return arrow::date64();
1151 return arrow::date32();
1157 return timestamp(arrow::TimeUnit::SECOND);
1159 return timestamp(arrow::TimeUnit::MILLI);
1161 return timestamp(arrow::TimeUnit::MICRO);
1163 return timestamp(arrow::TimeUnit::NANO);
1165 throw std::runtime_error(
1166 "Unsupported timestamp precision for Arrow result sets: " +
1172 return arrow::list(arrow::boolean());
1174 return arrow::list(arrow::int8());
1176 return arrow::list(arrow::int16());
1178 return arrow::list(arrow::int32());
1180 return arrow::list(arrow::int64());
1182 return arrow::list(arrow::float32());
1184 return arrow::list(arrow::float64());
1187 auto value_type = std::make_shared<arrow::StringType>();
1188 return arrow::list(arrow::dictionary(arrow::int32(), value_type,
false));
1191 throw std::runtime_error(
"Unsupported array type for Arrow result sets: " +
1198 " is not supported in Arrow result sets.");
1206 const std::string
name,
// Releases the shared-memory segments referenced by an Arrow result: for both
// the schema handle (sm_*) and the data handle (df_*) the key is read from the
// handle bytes, resolved with shmget, and the segment is marked for removal
// with shmctl(IPC_RMID). Failures are reported as std::runtime_error.
// NOTE(review): the function name and leading parameters are not visible in
// this excerpt, nor are the conditions guarding each section.
1215 const size_t device_id,
1216 std::shared_ptr<Data_Namespace::DataMgr>& data_mgr) {
// NOTE(review): the key is read by casting the handle's first byte address to
// key_t* (C-style cast). Elsewhere in this file keys are written into such
// buffers with memcpy; reading them back the same way would avoid relying on
// the buffer's alignment -- confirm.
1222 const key_t& schema_key = *(key_t*)(&result.
sm_handle[0]);
1223 auto shm_id = shmget(schema_key, result.
sm_size, 0666);
1225 throw std::runtime_error(
1226 "failed to get an valid shm ID w/ given shm key of the schema");
// IPC_RMID marks the segment for destruction.
1228 if (-1 == shmctl(shm_id, IPC_RMID, 0)) {
1229 throw std::runtime_error(
"failed to deallocate Arrow schema on errorno(" +
// Same sequence for the data-frame segment.
1236 const key_t& df_key = *(key_t*)(&result.
df_handle[0]);
1237 auto shm_id = shmget(df_key, result.
df_size, 0666);
1239 throw std::runtime_error(
1240 "failed to get an valid shm ID w/ given shm key of the data");
1242 if (-1 == shmctl(shm_id, IPC_RMID, 0)) {
1243 throw std::runtime_error(
"failed to deallocate Arrow data frame");
1255 const size_t results_col_slot_idx,
1256 const std::shared_ptr<arrow::Field>&
field)
const {
1258 column_builder.
col_type = col_type;
1263 auto value_type = field->type();
1265 auto timer =
DEBUG_TIMER(
"Translate string dictionary to Arrow dictionary");
1267 column_builder.
builder.reset(
new arrow::StringDictionary32Builder());
1275 const size_t result_set_rows =
results_->rowCount();
1279 const auto sdp =
results_->getStringDictionaryProxy(dict_key);
1280 const size_t dictionary_proxy_entries = sdp->entryCount();
1281 const double dictionary_to_result_size_ratio =
1282 static_cast<double>(dictionary_proxy_entries) / result_set_rows;
1298 const bool do_dictionary_bulk_fetch =
1300 dictionary_to_result_size_ratio <=
1303 arrow::StringBuilder str_array_builder;
1305 if (do_dictionary_bulk_fetch) {
1306 VLOG(1) <<
"Arrow dictionary creation: bulk copying all dictionary "
1307 <<
" entries for column at offset " << results_col_slot_idx <<
". "
1308 <<
"Column has " << dictionary_proxy_entries <<
" string entries"
1309 <<
" for a result set with " << result_set_rows <<
" rows.";
1312 const auto str_list =
results_->getStringDictionaryPayloadCopy(dict_key);
1322 int32_t crt_transient_id =
static_cast<int32_t
>(str_list.size());
1323 auto const& transient_vecmap = sdp->getTransientVector();
1324 for (
unsigned index = 0; index < transient_vecmap.size(); ++index) {
1328 .insert(std::make_pair(old_id, crt_transient_id++))
1333 VLOG(1) <<
"Arrow dictionary creation: serializing unique result set dictionary "
1334 <<
" entries for column at offset " << results_col_slot_idx <<
". "
1335 <<
"Column has " << dictionary_proxy_entries <<
" string entries"
1336 <<
" for a result set with " << result_set_rows <<
" rows.";
1345 auto unique_ids_and_strings =
1346 results_->getUniqueStringsForDictEncodedTargetCol(results_col_slot_idx);
1347 const auto& unique_ids = unique_ids_and_strings.first;
1348 const auto& unique_strings = unique_ids_and_strings.second;
1350 const int32_t num_unique_strings = unique_strings.size();
1351 CHECK_EQ(num_unique_strings, unique_ids.size());
1355 for (int32_t unique_string_idx = 0; unique_string_idx < num_unique_strings;
1356 ++unique_string_idx) {
1359 .insert(std::make_pair(unique_ids[unique_string_idx], unique_string_idx))
1366 std::shared_ptr<arrow::StringArray> string_array;
1372 arrow::default_memory_pool(), value_type, &column_builder.
builder));
1375 dynamic_cast<arrow::StringDictionary32Builder*
>(column_builder.
builder.get());
1376 CHECK(dict_builder);
1382 arrow::default_memory_pool(), value_type, &column_builder.
builder));
1388 std::shared_ptr<arrow::Array> values;
// Generic appender: moves a column's collected values into its Arrow builder.
// NOTE(review): the name and leading parameters are not visible in this
// excerpt; the explicit specializations further down are named
// appendToColumnBuilder<...>. The final append call is also not visible.
//   BUILDER_TYPE     concrete Arrow builder expected in column_builder.builder.
//   VALUE_ARRAY_TYPE element type of the vector stored in `values`.
//   is_valid         per-row validity flags; must be set for nullable fields.
1395 template <
typename BUILDER_TYPE,
typename VALUE_ARRAY_TYPE>
1398 const std::shared_ptr<std::vector<bool>>& is_valid) {
// Dictionary-encoded strings need different handling and must go through
// their own specialization.
1399 static_assert(!std::is_same<BUILDER_TYPE, arrow::StringDictionary32Builder>::value,
1400 "Dictionary encoded string builder requires function specialization.");
// Copies the vector out of the variant; the copy is what gets rescaled below.
1402 std::vector<VALUE_ARRAY_TYPE> vals = boost::get<std::vector<VALUE_ARRAY_TYPE>>(values);
// Builders for which scale_epoch_values<BUILDER_TYPE>() is true get their
// values rescaled in place.
1404 if (scale_epoch_values<BUILDER_TYPE>()) {
1405 auto scale_sec_to_millisec = [](
auto seconds) {
return seconds *
kMilliSecsPerSec; };
// Date32Builder takes a different conversion (that branch of the conditional
// is not visible in this excerpt); every other builder gets seconds
// multiplied by kMilliSecsPerSec.
1406 auto scale_values = [&](
auto epoch) {
1407 return std::is_same<BUILDER_TYPE, arrow::Date32Builder>::value
1409 : scale_sec_to_millisec(epoch);
1411 std::transform(vals.begin(), vals.end(), vals.begin(), scale_values);
// The stored builder must actually be a BUILDER_TYPE.
1414 auto typed_builder =
dynamic_cast<BUILDER_TYPE*
>(column_builder.
builder.get());
1415 CHECK(typed_builder);
// A validity vector is required only when the field is nullable.
1416 if (column_builder.
field->nullable()) {
1417 CHECK(is_valid.get());
// Specialization for decimal columns: int64_t values are appended to an
// arrow::Decimal128Builder one at a time (the append calls themselves are not
// visible in this excerpt).
1425 void appendToColumnBuilder<arrow::Decimal128Builder, int64_t>(
1428 const std::shared_ptr<std::vector<bool>>& is_valid) {
// Copies the vector out of the variant.
1429 std::vector<int64_t> vals = boost::get<std::vector<int64_t>>(values);
1430 auto typed_builder =
1431 dynamic_cast<arrow::Decimal128Builder*
>(column_builder.builder.get());
1432 CHECK(typed_builder);
// NOTE(review): is_valid is dereferenced here, before the null check two lines
// below and regardless of nullability. The generic appender only requires
// is_valid for nullable fields; if callers may pass a null is_valid for
// non-nullable columns this dereference is unsafe -- confirm and reorder.
1433 CHECK_EQ(is_valid->size(), vals.size());
1434 if (column_builder.field->nullable()) {
1435 CHECK(is_valid.get());
// Nullable field: walk values and validity flags in lockstep.
1436 for (
size_t i = 0; i < vals.size(); i++) {
1437 const auto v = vals[i];
1438 const auto valid = (*is_valid)[i];
// Second loop visits the values only, without consulting is_valid.
1446 for (
const auto& v : vals) {
// Specialization for plain (non-dictionary) string columns backed by an
// arrow::StringBuilder. The append calls are not visible in this excerpt.
1453 void appendToColumnBuilder<arrow::StringBuilder, std::string>(
1456 const std::shared_ptr<std::vector<bool>>& is_valid) {
// Copies the vector of strings out of the variant.
1457 std::vector<std::string> vals = boost::get<std::vector<std::string>>(values);
1458 auto typed_builder =
dynamic_cast<arrow::StringBuilder*
>(column_builder.builder.get());
1459 CHECK(typed_builder);
// NOTE(review): is_valid is dereferenced here, before the null check below and
// regardless of nullability; see whether callers can pass a null is_valid for
// non-nullable columns -- confirm and reorder if so.
1460 CHECK_EQ(is_valid->size(), vals.size());
1462 if (column_builder.field->nullable()) {
1463 CHECK(is_valid.get());
// Convert the std::vector<bool> flags into one uint8_t (1 or 0) per row.
// Note the lambda parameter shadows the function parameter `is_valid`.
1466 std::vector<uint8_t> transformed_bitmap;
1467 transformed_bitmap.reserve(is_valid->size());
1469 is_valid->begin(), is_valid->end(), [&transformed_bitmap](
const bool is_valid) {
1470 transformed_bitmap.push_back(is_valid ? 1 : 0);
// Specialization for dictionary-encoded string columns: int32_t ids are
// translated through column_builder.string_remapping and appended as indices
// to an arrow::StringDictionary32Builder.
1479 void appendToColumnBuilder<arrow::StringDictionary32Builder, int32_t>(
1482 const std::shared_ptr<std::vector<bool>>& is_valid) {
1483 auto typed_builder =
1484 dynamic_cast<arrow::StringDictionary32Builder*
>(column_builder.builder.get());
1485 CHECK(typed_builder);
// Copies the ids out of the variant so they can be rewritten in place.
1487 std::vector<int32_t> vals = boost::get<std::vector<int32_t>>(values);
// Replace ids with their remapped counterparts. Conditions deciding which
// entries are remapped are not visible in this excerpt.
// NOTE(review): .at() presumably throws if an id has no remapping entry
// (string_remapping's type is not visible here) -- confirm.
1491 for (
size_t i = 0; i < vals.size(); i++) {
1492 auto& val = vals[i];
1496 vals[i] = column_builder.string_remapping.at(val);
1500 if (column_builder.field->nullable()) {
1501 CHECK(is_valid.get());
// Convert the std::vector<bool> flags into one uint8_t (1 or 0) per row, the
// form passed alongside vals.data() below.
1503 std::vector<uint8_t> transformed_bitmap;
1504 transformed_bitmap.reserve(is_valid->size());
1506 is_valid->begin(), is_valid->end(), [&transformed_bitmap](
const bool is_valid) {
1507 transformed_bitmap.push_back(is_valid ? 1 : 0);
// Nullable: ids are passed together with the byte-per-row validity array (the
// callee's name is on a line not visible here).
1511 vals.data(),
static_cast<int64_t
>(vals.size()), transformed_bitmap.data()));
// Non-nullable: ids are appended without validity information.
1514 typed_builder->AppendIndices(vals.data(),
static_cast<int64_t
>(vals.size())));
// Generic appender for list (array) columns. BUILDER_TYPE is the Arrow
// builder type of the list's elements and VALUE_TYPE is the element type of
// the incoming nested vectors.
// NOTE(review): the function name/leading parameters, the declaration of
// `vals`, the lambda bodies and the appends of the second loop are on lines
// not visible in this listing.
1518 template <
typename BUILDER_TYPE,
typename VALUE_TYPE>
1521 const std::shared_ptr<std::vector<bool>>& is_valid) {
// The column's builder must be an arrow::ListBuilder (CHECKed after the cast).
1523 auto list_builder =
dynamic_cast<arrow::ListBuilder*
>(column_builder.
builder.get());
1524 CHECK(list_builder);
// The element builder is obtained with static_cast, i.e. without a runtime
// type check: BUILDER_TYPE must match what the list builder actually holds.
1526 auto value_builder =
static_cast<BUILDER_TYPE*
>(list_builder->value_builder());
1528 if (column_builder.
field->nullable()) {
1529 for (
size_t i = 0; i < vals.size(); i++) {
// Only rows flagged valid have their elements appended in the lines shown.
1530 if ((*is_valid)[i]) {
1531 const auto& val = vals[i];
// Per-element validity bytes for this row, filled by the transform below
// (the lambda body that decides each byte is not visible here).
1532 std::vector<uint8_t> bitmap(val.size());
1533 std::transform(val.begin(), val.end(), bitmap.begin(), [](VALUE_TYPE pvalue) {
// For BooleanBuilder the elements are first copied into a uint8_t buffer
// and that buffer is handed to AppendValues; other builders receive the
// row's storage directly.
1537 if constexpr (std::is_same_v<BUILDER_TYPE, arrow::BooleanBuilder>) {
1538 std::vector<uint8_t> bval(val.size());
1539 std::copy(val.begin(), val.end(), bval.begin());
1541 value_builder->AppendValues(bval.data(), bval.size(), bitmap.data()));
1544 value_builder->AppendValues(val.data(), val.size(), bitmap.data()));
// Second loop over the rows (presumably the non-nullable branch -- the
// enclosing else is not visible). No per-element bitmap is built in the
// lines shown.
// NOTE(review): is_valid is still dereferenced in this loop.
1551 for (
size_t i = 0; i < vals.size(); i++) {
1552 if ((*is_valid)[i]) {
1553 const auto& val = vals[i];
1555 if constexpr (std::is_same_v<BUILDER_TYPE, arrow::BooleanBuilder>) {
1556 std::vector<uint8_t> bval(val.size());
1557 std::copy(val.begin(), val.end(), bval.begin());
// Specialization of appendToListColumnBuilder for list columns whose elements
// go through an arrow::StringDictionaryBuilder and arrive as int64_t.
// NOTE(review): the remaining parameters, the declaration of `vec2d`, the
// lambda body and the callee of the append are on lines not visible in this
// listing.
1570 void appendToListColumnBuilder<arrow::StringDictionaryBuilder, int64_t>(
1573 const std::shared_ptr<std::vector<bool>>& is_valid) {
// The column's builder must be an arrow::ListBuilder (CHECKed after the cast).
1576 auto* list_builder =
dynamic_cast<arrow::ListBuilder*
>(column_builder.builder.get());
1577 CHECK(list_builder);
// Here the element builder is obtained with a checked dynamic_cast.
1581 auto* value_builder =
1582 dynamic_cast<arrow::StringDictionaryBuilder*
>(list_builder->value_builder());
1583 CHECK(value_builder);
1585 if (column_builder.field->nullable()) {
1586 for (
size_t i = 0; i < vec2d.size(); i++) {
// Only rows flagged valid are processed in the lines shown.
1587 if ((*is_valid)[i]) {
// Bound by non-const reference.
1588 auto& vec1d = vec2d[i];
// Per-element validity bytes for this row (lambda body not visible here).
1589 std::vector<uint8_t> bitmap(vec1d.size());
1590 std::transform(vec1d.begin(), vec1d.end(), bitmap.begin(), [](int64_t pvalue) {
// The row's values, its length and the validity bytes are passed to an
// append call whose name is on a line not shown here.
1597 vec1d.data(),
static_cast<int64_t
>(vec1d.size()), bitmap.data()));
// Second loop over the rows (presumably the non-nullable branch -- the
// enclosing else is not visible).
// NOTE(review): is_valid is still dereferenced in this loop.
1603 for (
size_t i = 0; i < vec2d.size(); i++) {
1604 if ((*is_valid)[i]) {
1605 auto& vec1d = vec2d[i];
// Tail of a const member function that forwards (column_builder, values,
// is_valid) to the appendToColumnBuilder / appendToListColumnBuilder
// instantiation for the column's type.
// NOTE(review): the beginning of the signature and the labels/conditions that
// select each call below are on lines not visible in this listing, so the
// mapping from column type to call cannot be confirmed from here.
1621 const std::shared_ptr<std::vector<bool>>& is_valid)
const {
// Dictionary-encoded strings: int32_t indices.
1625 appendToColumnBuilder<arrow::StringDictionary32Builder, int32_t>(
1626 column_builder, values, is_valid);
// Scalar types: one call per Arrow builder / C++ value type pair.
1631 appendToColumnBuilder<arrow::BooleanBuilder, bool>(
1632 column_builder, values, is_valid);
1635 appendToColumnBuilder<arrow::Int8Builder, int8_t>(column_builder, values, is_valid);
1638 appendToColumnBuilder<arrow::Int16Builder, int16_t>(
1639 column_builder, values, is_valid);
1642 appendToColumnBuilder<arrow::Int32Builder, int32_t>(
1643 column_builder, values, is_valid);
1646 appendToColumnBuilder<arrow::Int64Builder, int64_t>(
1647 column_builder, values, is_valid);
// Decimal values are carried as int64_t and go through the dedicated
// Decimal128Builder specialization.
1650 appendToColumnBuilder<arrow::Decimal128Builder, int64_t>(
1651 column_builder, values, is_valid);
1654 appendToColumnBuilder<arrow::FloatBuilder, float>(column_builder, values, is_valid);
1657 appendToColumnBuilder<arrow::DoubleBuilder, double>(
1658 column_builder, values, is_valid);
1661 appendToColumnBuilder<arrow::Time32Builder, int32_t>(
1662 column_builder, values, is_valid);
1665 appendToColumnBuilder<arrow::TimestampBuilder, int64_t>(
1666 column_builder, values, is_valid);
// Conditional choice between Date64Builder/int64_t and Date32Builder/int32_t;
// the condition itself is on a line not shown here.
1670 ? appendToColumnBuilder<arrow::Date64Builder, int64_t>(
1671 column_builder, values, is_valid)
1672 : appendToColumnBuilder<arrow::Date32Builder, int32_t>(
1673 column_builder, values, is_valid);
// List columns: pick the element builder from col_type.get_subtype().
// Boolean elements are passed with VALUE_TYPE int8_t.
1676 if (column_builder.col_type.get_subtype() ==
kBOOLEAN) {
1677 appendToListColumnBuilder<arrow::BooleanBuilder, int8_t>(
1678 column_builder, values, is_valid);
1680 }
else if (column_builder.col_type.get_subtype() ==
kTINYINT) {
1681 appendToListColumnBuilder<arrow::Int8Builder, int8_t>(
1682 column_builder, values, is_valid);
1684 }
else if (column_builder.col_type.get_subtype() ==
kSMALLINT) {
1685 appendToListColumnBuilder<arrow::Int16Builder, int16_t>(
1686 column_builder, values, is_valid);
1688 }
else if (column_builder.col_type.get_subtype() ==
kINT) {
1689 appendToListColumnBuilder<arrow::Int32Builder, int32_t>(
1690 column_builder, values, is_valid);
1692 }
else if (column_builder.col_type.get_subtype() ==
kBIGINT) {
1693 appendToListColumnBuilder<arrow::Int64Builder, int64_t>(
1694 column_builder, values, is_valid);
1696 }
else if (column_builder.col_type.get_subtype() ==
kFLOAT) {
1697 appendToListColumnBuilder<arrow::FloatBuilder, float>(
1698 column_builder, values, is_valid);
1700 }
else if (column_builder.col_type.get_subtype() ==
kDOUBLE) {
1701 appendToListColumnBuilder<arrow::DoubleBuilder, double>(
1702 column_builder, values, is_valid);
1704 }
// Dictionary-encoded element types use the StringDictionaryBuilder
// specialization with int64_t values.
else if (column_builder.col_type.is_dict_encoded_type()) {
1705 appendToListColumnBuilder<arrow::StringDictionaryBuilder, int64_t>(
1706 column_builder, values, is_valid);
// Raises std::runtime_error naming the column type (presumably the final
// else of the subtype chain above -- the else line is not visible).
1709 throw std::runtime_error(column_builder.col_type.get_type_name() +
1710 " is not supported in Arrow result sets.");
// Strings that are not dictionary-encoded: std::string values.
1715 appendToColumnBuilder<arrow::StringBuilder, std::string>(
1716 column_builder, values, is_valid);
// Raises std::runtime_error naming the column type (presumably the default
// case for any remaining type -- the label is not visible).
1720 throw std::runtime_error(column_builder.col_type.get_type_name() +
1721 " is not supported in Arrow result sets.");
void create_or_append_value(const ScalarTargetValue &val_cty, std::shared_ptr< ValueArray > &values, const size_t max_size)
HOST DEVICE SQLTypes get_subtype() const
ArrowStringRemapMode string_remap_mode
std::vector< std::vector< T >> Vec2
const size_t min_result_size_for_bulk_dictionary_fetch_
std::unique_ptr< arrow::ArrayBuilder > builder
HOST DEVICE int get_size() const
#define ARROW_THROW_NOT_OK(s)
SQLTypes get_dict_index_type(const SQLTypeInfo &ti)
std::shared_ptr< arrow::StringArray > string_array
void append(ColumnBuilder &column_builder, const ValueArray &values, const std::shared_ptr< std::vector< bool >> &is_valid) const
void appendToListColumnBuilder(ArrowResultSetConverter::ColumnBuilder &column_builder, const ValueArray &values, const std::shared_ptr< std::vector< bool >> &is_valid)
void convert_column(ResultSetPtr result, size_t col, size_t entry_count, std::shared_ptr< arrow::Array > &out)
ArrowResult getArrowResult() const
std::vector< char > sm_handle
void initializeColumnBuilder(ColumnBuilder &column_builder, const SQLTypeInfo &col_type, const size_t result_col_idx, const std::shared_ptr< arrow::Field > &field) const
ResultSetBuffer(const uint8_t *buf, size_t size, ResultSetPtr rs)
const double max_dictionary_to_result_size_ratio_for_bulk_dictionary_fetch_
void appendToColumnBuilder(ArrowResultSetConverter::ColumnBuilder &column_builder, const ValueArray &values, const std::shared_ptr< std::vector< bool >> &is_valid)
#define ARROW_ASSIGN_OR_THROW(lhs, rexpr)
#define ARROW_LOG(category)
HOST DEVICE int get_scale() const
typename std::make_signed< TYPE >::type type
std::shared_ptr< arrow::Array > finishColumnBuilder(ColumnBuilder &column_builder) const
parquet::Type::type get_physical_type(ReaderPtr &reader, const int logical_column_index)
boost::variant< std::vector< bool >, std::vector< int8_t >, std::vector< int16_t >, std::vector< int32_t >, std::vector< int64_t >, std::vector< arrow::Decimal128 >, std::vector< float >, std::vector< double >, std::vector< std::string >, std::vector< std::vector< int8_t >>, std::vector< std::vector< int16_t >>, std::vector< std::vector< int32_t >>, std::vector< std::vector< int64_t >>, std::vector< std::vector< float >>, std::vector< std::vector< double >>, std::vector< std::vector< std::string >>> ValueArray
std::shared_ptr< arrow::Field > field
std::shared_ptr< ResultSet > ResultSetPtr
std::pair< key_t, std::shared_ptr< arrow::Buffer > > get_shm_buffer(size_t size)
HOST DEVICE SQLTypes get_type() const
double inline_fp_null_val(const SQL_TYPE_INFO &ti)
std::pair< key_t, void * > get_shm(size_t shmsz)
ArrowTransport transport_method_
const rapidjson::Value & field(const rapidjson::Value &obj, const char field[]) noexcept
std::shared_ptr< arrow::RecordBatch > getArrowBatch(const std::shared_ptr< arrow::Schema > &schema) const
future< Result > async(Fn &&fn, Args &&...args)
#define ARROW_RECORDBATCH_MAKE
std::vector< std::string > col_names_
DEVICE auto copy(ARGS &&...args)
ExecutorDeviceType device_type_
boost::optional< std::vector< ScalarTargetValue >> ArrayTargetValue
std::vector< char > df_handle
void remap_string_values(const ArrowResultSetConverter::ColumnBuilder &column_builder, const std::vector< uint8_t > &bitmap, std::vector< int64_t > &vec1d)
std::shared_ptr< arrow::Field > makeField(const std::string name, const SQLTypeInfo &target_type) const
static int32_t transientIndexToId(unsigned const index)
bool is_dict_encoded_type() const
SerializedArrowOutput getSerializedArrowOutput(arrow::ipc::DictionaryFieldMapper *mapper) const
OUTPUT transform(INPUT const &input, FUNC const &func)
int get_precision() const
key_t get_and_copy_to_shm(const std::shared_ptr< Buffer > &data)
static void deallocateArrowResultBuffer(const ArrowResult &result, const ExecutorDeviceType device_type, const size_t device_id, std::shared_ptr< Data_Namespace::DataMgr > &data_mgr)
std::shared_ptr< arrow::DataType > get_arrow_type(const SQLTypeInfo &sql_type, const ExecutorDeviceType device_type)
std::unordered_map< StrId, ArrowStrId > string_remapping
std::shared_ptr< ResultSet > results_
std::string get_type_name() const
typename null_type< TYPE >::type null_type_t
torch::Tensor f(torch::Tensor x, torch::Tensor W_target, torch::Tensor b_target)
void create_or_append_validity(const ArrayTargetValue &value, const SQLTypeInfo &col_type, std::shared_ptr< std::vector< bool >> &null_bitmap, const size_t max_size)
#define DEBUG_TIMER(name)
int64_t inline_int_null_val(const SQL_TYPE_INFO &ti)
int64_t get_epoch_days_from_seconds(const int64_t seconds)
bool is_dict_encoded_string() const
HOST DEVICE bool get_notnull() const
const shared::StringDictKey & getStringDictKey() const
boost::variant< int64_t, double, float, NullableString > ScalarTargetValue
std::shared_ptr< arrow::RecordBatch > convertToArrow() const