OmniSciDB  a5dc49c757
ArrowResultSetConverter Class Reference

#include <ArrowResultSet.h>

Classes

struct  ColumnBuilder
 
struct  SerializedArrowOutput
 

Public Member Functions

 ArrowResultSetConverter (const std::shared_ptr< ResultSet > &results, const std::shared_ptr< Data_Namespace::DataMgr > data_mgr, const ExecutorDeviceType device_type, const int32_t device_id, const std::vector< std::string > &col_names, const int32_t first_n, const ArrowTransport transport_method)
 
 ArrowResultSetConverter (const std::shared_ptr< ResultSet > &results, const std::shared_ptr< Data_Namespace::DataMgr > data_mgr, const ExecutorDeviceType device_type, const int32_t device_id, const std::vector< std::string > &col_names, const int32_t first_n, const ArrowTransport transport_method, const size_t min_result_size_for_bulk_dictionary_fetch, const double max_dictionary_to_result_size_ratio_for_bulk_dictionary_fetch)
 
ArrowResult getArrowResult () const
 
 ArrowResultSetConverter (const std::shared_ptr< ResultSet > &results, const std::vector< std::string > &col_names, const int32_t first_n)
 
 ArrowResultSetConverter (const std::shared_ptr< ResultSet > &results, const std::vector< std::string > &col_names, const int32_t first_n, const size_t min_result_size_for_bulk_dictionary_fetch, const double max_dictionary_to_result_size_ratio_for_bulk_dictionary_fetch)
 
std::shared_ptr< arrow::RecordBatch > convertToArrow () const
 

Static Public Attributes

static constexpr size_t default_min_result_size_for_bulk_dictionary_fetch {10000UL}
 
static constexpr double default_max_dictionary_to_result_size_ratio_for_bulk_dictionary_fetch {0.1}
 

Private Member Functions

std::shared_ptr< arrow::RecordBatch > getArrowBatch (const std::shared_ptr< arrow::Schema > &schema) const
 
std::shared_ptr< arrow::Field > makeField (const std::string name, const SQLTypeInfo &target_type) const
 
SerializedArrowOutput getSerializedArrowOutput (arrow::ipc::DictionaryFieldMapper *mapper) const
 
void initializeColumnBuilder (ColumnBuilder &column_builder, const SQLTypeInfo &col_type, const size_t result_col_idx, const std::shared_ptr< arrow::Field > &field) const
 
void append (ColumnBuilder &column_builder, const ValueArray &values, const std::shared_ptr< std::vector< bool >> &is_valid) const
 
std::shared_ptr< arrow::Array > finishColumnBuilder (ColumnBuilder &column_builder) const
 

Private Attributes

std::shared_ptr< ResultSet > results_
 
std::shared_ptr< Data_Namespace::DataMgr > data_mgr_ = nullptr
 
ExecutorDeviceType device_type_ = ExecutorDeviceType::GPU
 
int32_t device_id_ = 0
 
std::vector< std::string > col_names_
 
int32_t top_n_
 
ArrowTransport transport_method_
 
const size_t min_result_size_for_bulk_dictionary_fetch_
 
const double max_dictionary_to_result_size_ratio_for_bulk_dictionary_fetch_
 

Friends

class ArrowResultSet
 

Detailed Description

Definition at line 228 of file ArrowResultSet.h.
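
A minimal usage sketch (illustrative only; the ResultSet, the column labels, and the query execution that produced them are assumed): the CPU-only constructor plus convertToArrow() turns a result set into a single arrow::RecordBatch.

 #include "ArrowResultSet.h"

 // `results` is assumed to be a std::shared_ptr<ResultSet> produced by query execution.
 std::vector<std::string> col_names{"col_0", "col_1"};  // assumed column labels
 // A negative first_n keeps every row (see the top_n_ handling in getArrowBatch()).
 ArrowResultSetConverter converter(results, col_names, /*first_n=*/-1);
 // One record batch whose schema is built by makeField() for each result column.
 std::shared_ptr<arrow::RecordBatch> batch = converter.convertToArrow();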

Constructor & Destructor Documentation

ArrowResultSetConverter::ArrowResultSetConverter ( const std::shared_ptr< ResultSet > &  results,
const std::shared_ptr< Data_Namespace::DataMgr >  data_mgr,
const ExecutorDeviceType  device_type,
const int32_t  device_id,
const std::vector< std::string > &  col_names,
const int32_t  first_n,
const ArrowTransport  transport_method 
)
inline

Definition at line 234 of file ArrowResultSet.h.

241  : results_(results)
242  , data_mgr_(data_mgr)
243  , device_type_(device_type)
244  , device_id_(device_id)
245  , col_names_(col_names)
246  , top_n_(first_n)
247  , transport_method_(transport_method)
248  , min_result_size_for_bulk_dictionary_fetch_(
249        default_min_result_size_for_bulk_dictionary_fetch)
250  , max_dictionary_to_result_size_ratio_for_bulk_dictionary_fetch_(
251        default_max_dictionary_to_result_size_ratio_for_bulk_dictionary_fetch) {}
ArrowResultSetConverter::ArrowResultSetConverter ( const std::shared_ptr< ResultSet > &  results,
const std::shared_ptr< Data_Namespace::DataMgr >  data_mgr,
const ExecutorDeviceType  device_type,
const int32_t  device_id,
const std::vector< std::string > &  col_names,
const int32_t  first_n,
const ArrowTransport  transport_method,
const size_t  min_result_size_for_bulk_dictionary_fetch,
const double  max_dictionary_to_result_size_ratio_for_bulk_dictionary_fetch 
)
inline

Definition at line 254 of file ArrowResultSet.h.

264  : results_(results)
265  , data_mgr_(data_mgr)
266  , device_type_(device_type)
267  , device_id_(device_id)
268  , col_names_(col_names)
269  , top_n_(first_n)
270  , transport_method_(transport_method)
271  , min_result_size_for_bulk_dictionary_fetch_(
272        min_result_size_for_bulk_dictionary_fetch)
273  , max_dictionary_to_result_size_ratio_for_bulk_dictionary_fetch_(
274        max_dictionary_to_result_size_ratio_for_bulk_dictionary_fetch) {}
ArrowResultSetConverter::ArrowResultSetConverter ( const std::shared_ptr< ResultSet > &  results,
const std::vector< std::string > &  col_names,
const int32_t  first_n 
)
inline

Definition at line 293 of file ArrowResultSet.h.

ArrowResultSetConverter::ArrowResultSetConverter ( const std::shared_ptr< ResultSet > &  results,
const std::vector< std::string > &  col_names,
const int32_t  first_n,
const size_t  min_result_size_for_bulk_dictionary_fetch,
const double  max_dictionary_to_result_size_ratio_for_bulk_dictionary_fetch 
)
inline

Definition at line 305 of file ArrowResultSet.h.

311  : results_(results)
312  , col_names_(col_names)
313  , top_n_(first_n)
314  , min_result_size_for_bulk_dictionary_fetch_(
315        min_result_size_for_bulk_dictionary_fetch)
316  , max_dictionary_to_result_size_ratio_for_bulk_dictionary_fetch_(
317        max_dictionary_to_result_size_ratio_for_bulk_dictionary_fetch) {}

Member Function Documentation

void ArrowResultSetConverter::append ( ColumnBuilder &  column_builder,
const ValueArray &  values,
const std::shared_ptr< std::vector< bool >> &  is_valid 
) const
private

Definition at line 1618 of file ArrowResultSetConverter.cpp.

References CHECK_EQ, ArrowResultSetConverter::ColumnBuilder::col_type, device_type_, GPU, SQLTypeInfo::is_dict_encoded_string(), kARRAY, kBIGINT, kBOOLEAN, kCHAR, kDATE, kDECIMAL, kDOUBLE, kFLOAT, kINT, kSMALLINT, kTEXT, kTIME, kTIMESTAMP, kTINYINT, kVARCHAR, and ArrowResultSetConverter::ColumnBuilder::physical_type.

Referenced by getArrowBatch().

1621  {
1622  if (column_builder.col_type.is_dict_encoded_string()) {
1623  CHECK_EQ(column_builder.physical_type,
1624  kINT); // assume all dicts use none-encoded type for now
1625  appendToColumnBuilder<arrow::StringDictionary32Builder, int32_t>(
1626  column_builder, values, is_valid);
1627  return;
1628  }
1629  switch (column_builder.physical_type) {
1630  case kBOOLEAN:
1631  appendToColumnBuilder<arrow::BooleanBuilder, bool>(
1632  column_builder, values, is_valid);
1633  break;
1634  case kTINYINT:
1635  appendToColumnBuilder<arrow::Int8Builder, int8_t>(column_builder, values, is_valid);
1636  break;
1637  case kSMALLINT:
1638  appendToColumnBuilder<arrow::Int16Builder, int16_t>(
1639  column_builder, values, is_valid);
1640  break;
1641  case kINT:
1642  appendToColumnBuilder<arrow::Int32Builder, int32_t>(
1643  column_builder, values, is_valid);
1644  break;
1645  case kBIGINT:
1646  appendToColumnBuilder<arrow::Int64Builder, int64_t>(
1647  column_builder, values, is_valid);
1648  break;
1649  case kDECIMAL:
1650  appendToColumnBuilder<arrow::Decimal128Builder, int64_t>(
1651  column_builder, values, is_valid);
1652  break;
1653  case kFLOAT:
1654  appendToColumnBuilder<arrow::FloatBuilder, float>(column_builder, values, is_valid);
1655  break;
1656  case kDOUBLE:
1657  appendToColumnBuilder<arrow::DoubleBuilder, double>(
1658  column_builder, values, is_valid);
1659  break;
1660  case kTIME:
1661  appendToColumnBuilder<arrow::Time32Builder, int32_t>(
1662  column_builder, values, is_valid);
1663  break;
1664  case kTIMESTAMP:
1665  appendToColumnBuilder<arrow::TimestampBuilder, int64_t>(
1666  column_builder, values, is_valid);
1667  break;
1668  case kDATE:
1669  device_type_ == ExecutorDeviceType::GPU
1670  ? appendToColumnBuilder<arrow::Date64Builder, int64_t>(
1671  column_builder, values, is_valid)
1672  : appendToColumnBuilder<arrow::Date32Builder, int32_t>(
1673  column_builder, values, is_valid);
1674  break;
1675  case kARRAY:
1676  if (column_builder.col_type.get_subtype() == kBOOLEAN) {
1677  appendToListColumnBuilder<arrow::BooleanBuilder, int8_t>(
1678  column_builder, values, is_valid);
1679  break;
1680  } else if (column_builder.col_type.get_subtype() == kTINYINT) {
1681  appendToListColumnBuilder<arrow::Int8Builder, int8_t>(
1682  column_builder, values, is_valid);
1683  break;
1684  } else if (column_builder.col_type.get_subtype() == kSMALLINT) {
1685  appendToListColumnBuilder<arrow::Int16Builder, int16_t>(
1686  column_builder, values, is_valid);
1687  break;
1688  } else if (column_builder.col_type.get_subtype() == kINT) {
1689  appendToListColumnBuilder<arrow::Int32Builder, int32_t>(
1690  column_builder, values, is_valid);
1691  break;
1692  } else if (column_builder.col_type.get_subtype() == kBIGINT) {
1693  appendToListColumnBuilder<arrow::Int64Builder, int64_t>(
1694  column_builder, values, is_valid);
1695  break;
1696  } else if (column_builder.col_type.get_subtype() == kFLOAT) {
1697  appendToListColumnBuilder<arrow::FloatBuilder, float>(
1698  column_builder, values, is_valid);
1699  break;
1700  } else if (column_builder.col_type.get_subtype() == kDOUBLE) {
1701  appendToListColumnBuilder<arrow::DoubleBuilder, double>(
1702  column_builder, values, is_valid);
1703  break;
1704  } else if (column_builder.col_type.is_dict_encoded_type()) {
1705  appendToListColumnBuilder<arrow::StringDictionaryBuilder, int64_t>(
1706  column_builder, values, is_valid);
1707  break;
1708  } else {
1709  throw std::runtime_error(column_builder.col_type.get_type_name() +
1710  " is not supported in Arrow result sets.");
1711  }
1712  case kCHAR:
1713  case kVARCHAR:
1714  case kTEXT:
1715  appendToColumnBuilder<arrow::StringBuilder, std::string>(
1716  column_builder, values, is_valid);
1717  break;
1718  default:
1719  // TODO(miyu): support more scalar types.
1720  throw std::runtime_error(column_builder.col_type.get_type_name() +
1721  " is not supported in Arrow result sets.");
1722  }
1723 }


std::shared_ptr< arrow::RecordBatch > ArrowResultSetConverter::convertToArrow ( ) const

Definition at line 715 of file ArrowResultSetConverter.cpp.

References CHECK, col_names_, DEBUG_TIMER, f(), getArrowBatch(), makeField(), results_, and VLOG.

Referenced by getArrowResult(), and getSerializedArrowOutput().

715  {
716  auto timer = DEBUG_TIMER(__func__);
717  const auto col_count = results_->colCount();
718  std::vector<std::shared_ptr<arrow::Field>> fields;
719  CHECK(col_names_.empty() || col_names_.size() == col_count);
720  for (size_t i = 0; i < col_count; ++i) {
721  const auto ti = results_->getColType(i);
722  fields.push_back(makeField(col_names_.empty() ? "" : col_names_[i], ti));
723  }
724 #if ARROW_CONVERTER_DEBUG
725  VLOG(1) << "Arrow fields: ";
726  for (const auto& f : fields) {
727  VLOG(1) << "\t" << f->ToString(true);
728  }
729 #endif
730  return getArrowBatch(arrow::schema(fields));
731 }


std::shared_ptr< arrow::Array > ArrowResultSetConverter::finishColumnBuilder ( ColumnBuilder &  column_builder) const
inline private

Definition at line 1386 of file ArrowResultSetConverter.cpp.

References ARROW_THROW_NOT_OK, and ArrowResultSetConverter::ColumnBuilder::builder.

Referenced by getArrowBatch().

1387  {
1388  std::shared_ptr<arrow::Array> values;
1389  ARROW_THROW_NOT_OK(column_builder.builder->Finish(&values));
1390  return values;
1391 }


std::shared_ptr< arrow::RecordBatch > ArrowResultSetConverter::getArrowBatch ( const std::shared_ptr< arrow::Schema > &  schema) const
private

Definition at line 733 of file ArrowResultSetConverter.cpp.

References append(), ARROW_RECORDBATCH_MAKE, threading_serial::async(), CHECK, CHECK_EQ, cpu_threads(), anonymous_namespace{ArrowResultSetConverter.cpp}::create_or_append_validity(), DEBUG_TIMER, device_type_, field(), finishColumnBuilder(), GPU, initializeColumnBuilder(), kBIGINT, kBOOLEAN, kDATE, kDECIMAL, kDOUBLE, kFLOAT, kINT, kSMALLINT, kTEXT, kTIME, kTIMESTAMP, kTINYINT, heavyai::Projection, run_benchmark_import::result, results_, heavyai::TableFunction, and top_n_.

Referenced by convertToArrow().

734  {
735  std::vector<std::shared_ptr<arrow::Array>> result_columns;
736 
737  // First, check if the result set is empty.
738  // If so, we return an arrow result set that only
739  // contains the schema (no record batch will be serialized).
740  if (results_->isEmpty()) {
741  return ARROW_RECORDBATCH_MAKE(schema, 0, result_columns);
742  }
743 
744  const size_t entry_count = top_n_ < 0
745  ? results_->entryCount()
746  : std::min(size_t(top_n_), results_->entryCount());
747 
748  const auto col_count = results_->colCount();
749  size_t row_count = 0;
750 
751  result_columns.resize(col_count);
752  std::vector<ColumnBuilder> builders(col_count);
753 
754  // Create array builders
755  for (size_t i = 0; i < col_count; ++i) {
756  initializeColumnBuilder(builders[i], results_->getColType(i), i, schema->field(i));
757  }
758 
759  // TODO(miyu): speed up for columnar buffers
760  auto fetch = [&](std::vector<std::shared_ptr<ValueArray>>& value_seg,
761  std::vector<std::shared_ptr<std::vector<bool>>>& null_bitmap_seg,
762  const std::vector<bool>& non_lazy_cols,
763  const size_t start_entry,
764  const size_t end_entry) -> size_t {
765  CHECK_EQ(value_seg.size(), col_count);
766  CHECK_EQ(null_bitmap_seg.size(), col_count);
767  const auto local_entry_count = end_entry - start_entry;
768  size_t seg_row_count = 0;
769  for (size_t i = start_entry; i < end_entry; ++i) {
770  auto row = results_->getRowAtNoTranslations(i, non_lazy_cols);
771  if (row.empty()) {
772  continue;
773  }
774  ++seg_row_count;
775  for (size_t j = 0; j < col_count; ++j) {
776  if (!non_lazy_cols.empty() && non_lazy_cols[j]) {
777  continue;
778  }
779 
780  if (auto scalar_value = boost::get<ScalarTargetValue>(&row[j])) {
781  // TODO(miyu): support more types other than scalar.
782  CHECK(scalar_value);
783  const auto& column = builders[j];
784  switch (column.physical_type) {
785  case kBOOLEAN:
786  create_or_append_value<bool, int64_t>(
787  *scalar_value, value_seg[j], local_entry_count);
788  create_or_append_validity<int64_t>(
789  *scalar_value, column.col_type, null_bitmap_seg[j], local_entry_count);
790  break;
791  case kTINYINT:
792  create_or_append_value<int8_t, int64_t>(
793  *scalar_value, value_seg[j], local_entry_count);
794  create_or_append_validity<int64_t>(
795  *scalar_value, column.col_type, null_bitmap_seg[j], local_entry_count);
796  break;
797  case kSMALLINT:
798  create_or_append_value<int16_t, int64_t>(
799  *scalar_value, value_seg[j], local_entry_count);
800  create_or_append_validity<int64_t>(
801  *scalar_value, column.col_type, null_bitmap_seg[j], local_entry_count);
802  break;
803  case kINT:
804  create_or_append_value<int32_t, int64_t>(
805  *scalar_value, value_seg[j], local_entry_count);
806  create_or_append_validity<int64_t>(
807  *scalar_value, column.col_type, null_bitmap_seg[j], local_entry_count);
808  break;
809  case kBIGINT:
810  create_or_append_value<int64_t, int64_t>(
811  *scalar_value, value_seg[j], local_entry_count);
812  create_or_append_validity<int64_t>(
813  *scalar_value, column.col_type, null_bitmap_seg[j], local_entry_count);
814  break;
815  case kDECIMAL:
816  create_or_append_value<int64_t, int64_t>(
817  *scalar_value, value_seg[j], local_entry_count);
818  create_or_append_validity<int64_t>(
819  *scalar_value, column.col_type, null_bitmap_seg[j], local_entry_count);
820  break;
821  case kFLOAT:
822  create_or_append_value<float, float>(
823  *scalar_value, value_seg[j], local_entry_count);
824  create_or_append_validity<float>(
825  *scalar_value, column.col_type, null_bitmap_seg[j], local_entry_count);
826  break;
827  case kDOUBLE:
828  create_or_append_value<double, double>(
829  *scalar_value, value_seg[j], local_entry_count);
830  create_or_append_validity<double>(
831  *scalar_value, column.col_type, null_bitmap_seg[j], local_entry_count);
832  break;
833  case kTIME:
834  create_or_append_value<int32_t, int64_t>(
835  *scalar_value, value_seg[j], local_entry_count);
836  create_or_append_validity<int64_t>(
837  *scalar_value, column.col_type, null_bitmap_seg[j], local_entry_count);
838  break;
839  case kDATE:
840  device_type_ == ExecutorDeviceType::GPU
841  ? create_or_append_value<int64_t, int64_t>(
842  *scalar_value, value_seg[j], local_entry_count)
843  : create_or_append_value<int32_t, int64_t>(
844  *scalar_value, value_seg[j], local_entry_count);
845  create_or_append_validity<int64_t>(
846  *scalar_value, column.col_type, null_bitmap_seg[j], local_entry_count);
847  break;
848  case kTIMESTAMP:
849  create_or_append_value<int64_t, int64_t>(
850  *scalar_value, value_seg[j], local_entry_count);
851  create_or_append_validity<int64_t>(
852  *scalar_value, column.col_type, null_bitmap_seg[j], local_entry_count);
853  break;
854  case kTEXT:
855  create_or_append_value<std::string, NullableString>(
856  *scalar_value, value_seg[j], local_entry_count);
857  create_or_append_validity<NullableString>(
858  *scalar_value, column.col_type, null_bitmap_seg[j], local_entry_count);
859  break;
860  default:
861  // TODO(miyu): support more scalar types.
862  throw std::runtime_error(column.col_type.get_type_name() +
863  " is not supported in Arrow result sets.");
864  }
865  } else if (auto array = boost::get<ArrayTargetValue>(&row[j])) {
866  // array := Boost::optional<std::vector<ScalarTargetValue>>
867  const auto& column = builders[j];
868  switch (column.col_type.get_subtype()) {
869  case kBOOLEAN:
870  create_or_append_value<int8_t, int64_t>(
871  *array, value_seg[j], local_entry_count);
872  create_or_append_validity(
873  *array, column.col_type, null_bitmap_seg[j], local_entry_count);
874  break;
875  case kTINYINT:
876  create_or_append_value<int8_t, int64_t>(
877  *array, value_seg[j], local_entry_count);
878  create_or_append_validity(
879  *array, column.col_type, null_bitmap_seg[j], local_entry_count);
880  break;
881  case kSMALLINT:
882  create_or_append_value<int16_t, int64_t>(
883  *array, value_seg[j], local_entry_count);
884  create_or_append_validity(
885  *array, column.col_type, null_bitmap_seg[j], local_entry_count);
886  break;
887  case kINT:
888  create_or_append_value<int32_t, int64_t>(
889  *array, value_seg[j], local_entry_count);
890  create_or_append_validity(
891  *array, column.col_type, null_bitmap_seg[j], local_entry_count);
892  break;
893  case kBIGINT:
894  create_or_append_value<int64_t, int64_t>(
895  *array, value_seg[j], local_entry_count);
896  create_or_append_validity(
897  *array, column.col_type, null_bitmap_seg[j], local_entry_count);
898  break;
899  case kFLOAT:
900  create_or_append_value<float, float>(
901  *array, value_seg[j], local_entry_count);
902  create_or_append_validity(
903  *array, column.col_type, null_bitmap_seg[j], local_entry_count);
904  break;
905  case kDOUBLE:
906  create_or_append_value<double, double>(
907  *array, value_seg[j], local_entry_count);
908  create_or_append_validity(
909  *array, column.col_type, null_bitmap_seg[j], local_entry_count);
910  break;
911  case kTEXT:
912  if (column.col_type.is_dict_encoded_type()) {
913  create_or_append_value<int64_t, int64_t>(
914  *array, value_seg[j], local_entry_count);
915  create_or_append_validity(
916  *array, column.col_type, null_bitmap_seg[j], local_entry_count);
917  break;
918  }
919  default:
920  throw std::runtime_error(column.col_type.get_type_name() +
921  " is not supported in Arrow result sets.");
922  }
923  }
924  }
925  }
926  return seg_row_count;
927  };
928 
929  auto convert_columns = [&](std::vector<std::shared_ptr<arrow::Array>>& result,
930  const std::vector<bool>& non_lazy_cols,
931  const size_t start_col,
932  const size_t end_col) {
933  for (size_t col = start_col; col < end_col; ++col) {
934  if (!non_lazy_cols.empty() && !non_lazy_cols[col]) {
935  continue;
936  }
937 
938  const auto& column = builders[col];
939  switch (column.physical_type) {
940  case kTINYINT:
941  convert_column<int8_t>(results_, col, entry_count, result[col]);
942  break;
943  case kSMALLINT:
944  convert_column<int16_t>(results_, col, entry_count, result[col]);
945  break;
946  case kINT:
947  convert_column<int32_t>(results_, col, entry_count, result[col]);
948  break;
949  case kBIGINT:
950  convert_column<int64_t>(results_, col, entry_count, result[col]);
951  break;
952  case kFLOAT:
953  convert_column<float>(results_, col, entry_count, result[col]);
954  break;
955  case kDOUBLE:
956  convert_column<double>(results_, col, entry_count, result[col]);
957  break;
958  default:
959  throw std::runtime_error(column.col_type.get_type_name() +
960  " is not supported in Arrow column converter.");
961  }
962  }
963  };
964 
965  std::vector<std::shared_ptr<ValueArray>> column_values(col_count, nullptr);
966  std::vector<std::shared_ptr<std::vector<bool>>> null_bitmaps(col_count, nullptr);
967  const bool multithreaded = entry_count > 10000 && !results_->isTruncated();
968  // Don't believe we ever output directly from a table function, but this
969  // might be possible with a future query plan optimization
970  bool use_columnar_converter = results_->isDirectColumnarConversionPossible() &&
971  (results_->getQueryMemDesc().getQueryDescriptionType() ==
972  heavyai::QueryDescriptionType::Projection ||
973  results_->getQueryMemDesc().getQueryDescriptionType() ==
974  heavyai::QueryDescriptionType::TableFunction) &&
975  entry_count == results_->entryCount();
976  std::vector<bool> non_lazy_cols;
977  if (use_columnar_converter) {
978  auto timer = DEBUG_TIMER("columnar converter");
979  std::vector<size_t> non_lazy_col_pos;
980  size_t non_lazy_col_count = 0;
981  const auto& lazy_fetch_info = results_->getLazyFetchInfo();
982 
983  non_lazy_cols.reserve(col_count);
984  non_lazy_col_pos.reserve(col_count);
985  for (size_t i = 0; i < col_count; ++i) {
986  bool is_lazy =
987  lazy_fetch_info.empty() ? false : lazy_fetch_info[i].is_lazily_fetched;
988  // Currently column converter cannot handle some data types.
989  // Treat them as lazy.
990  switch (builders[i].physical_type) {
991  case kBOOLEAN:
992  case kTIME:
993  case kDATE:
994  case kTIMESTAMP:
995  is_lazy = true;
996  break;
997  default:
998  break;
999  }
1000  if (builders[i].field->type()->id() == arrow::Type::DICTIONARY) {
1001  is_lazy = true;
1002  }
1003  non_lazy_cols.emplace_back(!is_lazy);
1004  if (!is_lazy) {
1005  ++non_lazy_col_count;
1006  non_lazy_col_pos.emplace_back(i);
1007  }
1008  }
1009 
1010  if (non_lazy_col_count == col_count) {
1011  non_lazy_cols.clear();
1012  non_lazy_col_pos.clear();
1013  } else {
1014  non_lazy_col_pos.emplace_back(col_count);
1015  }
1016 
1017  std::vector<std::future<void>> child_threads;
1018  size_t num_threads =
1019  std::min(multithreaded ? (size_t)cpu_threads() : (size_t)1, non_lazy_col_count);
1020 
1021  size_t start_col = 0;
1022  size_t end_col = 0;
1023  for (size_t i = 0; i < num_threads; ++i) {
1024  start_col = end_col;
1025  end_col = (i + 1) * non_lazy_col_count / num_threads;
1026  size_t phys_start_col =
1027  non_lazy_col_pos.empty() ? start_col : non_lazy_col_pos[start_col];
1028  size_t phys_end_col =
1029  non_lazy_col_pos.empty() ? end_col : non_lazy_col_pos[end_col];
1030  child_threads.push_back(std::async(std::launch::async,
1031  convert_columns,
1032  std::ref(result_columns),
1033  non_lazy_cols,
1034  phys_start_col,
1035  phys_end_col));
1036  }
1037  for (auto& child : child_threads) {
1038  child.get();
1039  }
1040  row_count = entry_count;
1041  }
1042  if (!use_columnar_converter || !non_lazy_cols.empty()) {
1043  auto timer = DEBUG_TIMER("row converter");
1044  row_count = 0;
1045  if (multithreaded) {
1046  const size_t cpu_count = cpu_threads();
1047  std::vector<std::future<size_t>> child_threads;
1048  std::vector<std::vector<std::shared_ptr<ValueArray>>> column_value_segs(
1049  cpu_count, std::vector<std::shared_ptr<ValueArray>>(col_count, nullptr));
1050  std::vector<std::vector<std::shared_ptr<std::vector<bool>>>> null_bitmap_segs(
1051  cpu_count, std::vector<std::shared_ptr<std::vector<bool>>>(col_count, nullptr));
1052  const auto stride = (entry_count + cpu_count - 1) / cpu_count;
1053  for (size_t i = 0, start_entry = 0; start_entry < entry_count;
1054  ++i, start_entry += stride) {
1055  const auto end_entry = std::min(entry_count, start_entry + stride);
1056  child_threads.push_back(std::async(std::launch::async,
1057  fetch,
1058  std::ref(column_value_segs[i]),
1059  std::ref(null_bitmap_segs[i]),
1060  non_lazy_cols,
1061  start_entry,
1062  end_entry));
1063  }
1064  for (auto& child : child_threads) {
1065  row_count += child.get();
1066  }
1067  {
1068  auto timer = DEBUG_TIMER("append rows to arrow");
1069  for (int i = 0; i < schema->num_fields(); ++i) {
1070  if (!non_lazy_cols.empty() && non_lazy_cols[i]) {
1071  continue;
1072  }
1073 
1074  for (size_t j = 0; j < cpu_count; ++j) {
1075  if (!column_value_segs[j][i]) {
1076  continue;
1077  }
1078  append(builders[i], *column_value_segs[j][i], null_bitmap_segs[j][i]);
1079  }
1080  }
1081  }
1082  } else {
1083  row_count =
1084  fetch(column_values, null_bitmaps, non_lazy_cols, size_t(0), entry_count);
1085  {
1086  auto timer = DEBUG_TIMER("append rows to arrow single thread");
1087  for (int i = 0; i < schema->num_fields(); ++i) {
1088  if (!non_lazy_cols.empty() && non_lazy_cols[i]) {
1089  continue;
1090  }
1091 
1092  append(builders[i], *column_values[i], null_bitmaps[i]);
1093  }
1094  }
1095  }
1096 
1097  {
1098  auto timer = DEBUG_TIMER("finish builders");
1099  for (size_t i = 0; i < col_count; ++i) {
1100  if (!non_lazy_cols.empty() && non_lazy_cols[i]) {
1101  continue;
1102  }
1103 
1104  result_columns[i] = finishColumnBuilder(builders[i]);
1105  }
1106  }
1107  }
1108 
1109  return ARROW_RECORDBATCH_MAKE(schema, row_count, result_columns);
1110 }
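
For reference, the multithreaded row path above splits entries with a simple ceiling division; a small worked example with illustrative numbers:

 // Assume entry_count = 100000 and cpu_threads() returns 8 (illustrative values).
 const size_t entry_count = 100000;
 const size_t cpu_count = 8;
 // stride = (100000 + 8 - 1) / 8 = 12500, so fetch() is launched on 8 segments:
 // [0, 12500), [12500, 25000), ..., [87500, 100000).
 const size_t stride = (entry_count + cpu_count - 1) / cpu_count;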


ArrowResult ArrowResultSetConverter::getArrowResult ( ) const

Serialize an Arrow result to IPC memory. Users are responsible for freeing all CPU IPC buffers using deallocateArrowResultBuffer. GPU buffers will become owned by the caller upon deserialization, and will be automatically freed when they go out of scope.
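
A hedged sketch of the CPU wire-transport path (constructor arguments are placeholders; releasing shared-memory buffers via deallocateArrowResultBuffer is not shown):

 ArrowResultSetConverter converter(results,
                                   data_mgr,
                                   ExecutorDeviceType::CPU,
                                   /*device_id=*/0,
                                   col_names,
                                   /*first_n=*/-1,
                                   ArrowTransport::WIRE);
 // Serializes the schema, any string dictionaries, and the record batch into the
 // returned ArrowResult; an empty result set serializes only the schema.
 ArrowResult result = converter.getArrowResult();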

Definition at line 446 of file ArrowResultSetConverter.cpp.

References ARROW_ASSIGN_OR_THROW, ARROW_LOG, ARROW_THROW_NOT_OK, CHECK, CHECK_GE, convertToArrow(), CPU, DEBUG_TIMER, device_id_, device_type_, arrow::get_and_copy_to_shm(), anonymous_namespace{ArrowResultSetConverter.cpp}::get_shm_buffer(), GPU, SHARED_MEMORY, transport_method_, UNREACHABLE, and WIRE.

446  {
447  auto timer = DEBUG_TIMER(__func__);
448  std::shared_ptr<arrow::RecordBatch> record_batch = convertToArrow();
449 
450  struct BuildResultParams {
451  int64_t schemaSize() const {
452  return serialized_schema ? serialized_schema->size() : 0;
453  };
454  int64_t dictSize() const { return serialized_dict ? serialized_dict->size() : 0; };
455  int64_t totalSize() const { return schemaSize() + records_size + dictSize(); }
456  bool hasRecordBatch() const { return records_size > 0; }
457  bool hasDict() const { return dictSize() > 0; }
458 
459  int64_t records_size{0};
460  std::shared_ptr<arrow::Buffer> serialized_schema{nullptr};
461  std::shared_ptr<arrow::Buffer> serialized_dict{nullptr};
462  } result_params;
463 
466  const auto getWireResult = [&]() -> ArrowResult {
467  auto timer = DEBUG_TIMER("serialize batch to wire");
468  const auto total_size = result_params.totalSize();
469  std::vector<char> record_handle_data(total_size);
470  auto serialized_records =
471  arrow::MutableBuffer::Wrap(record_handle_data.data(), total_size);
472 
473  ARROW_ASSIGN_OR_THROW(auto writer, arrow::Buffer::GetWriter(serialized_records));
474 
475  ARROW_THROW_NOT_OK(writer->Write(
476  reinterpret_cast<const uint8_t*>(result_params.serialized_schema->data()),
477  result_params.schemaSize()));
478 
479  if (result_params.hasDict()) {
480  ARROW_THROW_NOT_OK(writer->Write(
481  reinterpret_cast<const uint8_t*>(result_params.serialized_dict->data()),
482  result_params.dictSize()));
483  }
484 
485  arrow::io::FixedSizeBufferWriter stream(SliceMutableBuffer(
486  serialized_records, result_params.schemaSize() + result_params.dictSize()));
487 
488  if (result_params.hasRecordBatch()) {
489  ARROW_THROW_NOT_OK(arrow::ipc::SerializeRecordBatch(
490  *record_batch, arrow::ipc::IpcWriteOptions::Defaults(), &stream));
491  }
492 
493  return {std::vector<char>(0),
494  0,
495  std::vector<char>(0),
496  serialized_records->size(),
497  std::string{""},
498  std::move(record_handle_data)};
499  };
500 
501  const auto getShmResult = [&]() -> ArrowResult {
502  auto timer = DEBUG_TIMER("serialize batch to shared memory");
503  std::shared_ptr<arrow::Buffer> serialized_records;
504  std::vector<char> schema_handle_buffer;
505  std::vector<char> record_handle_buffer(sizeof(key_t), 0);
506  key_t records_shm_key = IPC_PRIVATE;
507  const int64_t total_size = result_params.totalSize();
508 
509  std::tie(records_shm_key, serialized_records) = get_shm_buffer(total_size);
510 
511  memcpy(serialized_records->mutable_data(),
512  result_params.serialized_schema->data(),
513  (size_t)result_params.schemaSize());
514 
515  if (result_params.hasDict()) {
516  memcpy(serialized_records->mutable_data() + result_params.schemaSize(),
517  result_params.serialized_dict->data(),
518  (size_t)result_params.dictSize());
519  }
520 
521  arrow::io::FixedSizeBufferWriter stream(SliceMutableBuffer(
522  serialized_records, result_params.schemaSize() + result_params.dictSize()));
523 
524  if (result_params.hasRecordBatch()) {
525  ARROW_THROW_NOT_OK(arrow::ipc::SerializeRecordBatch(
526  *record_batch, arrow::ipc::IpcWriteOptions::Defaults(), &stream));
527  }
528 
529  memcpy(&record_handle_buffer[0],
530  reinterpret_cast<const unsigned char*>(&records_shm_key),
531  sizeof(key_t));
532 
533  return {schema_handle_buffer,
534  0,
535  record_handle_buffer,
536  serialized_records->size(),
537  std::string{""}};
538  };
539 
540  arrow::ipc::DictionaryFieldMapper mapper(*record_batch->schema());
541  auto options = arrow::ipc::IpcWriteOptions::Defaults();
542  auto dict_stream = arrow::io::BufferOutputStream::Create(1024).ValueOrDie();
543 
544  // If our record batch is going to be empty, we omit it entirely,
545  // only serializing the schema.
546  if (!record_batch->num_rows()) {
547  ARROW_ASSIGN_OR_THROW(result_params.serialized_schema,
548  arrow::ipc::SerializeSchema(*record_batch->schema(),
549  arrow::default_memory_pool()));
550 
551  switch (transport_method_) {
552  case ArrowTransport::WIRE:
553  return getWireResult();
554  case ArrowTransport::SHARED_MEMORY:
555  return getShmResult();
556  default:
557  UNREACHABLE();
558  }
559  }
560 
561  ARROW_ASSIGN_OR_THROW(auto dictionaries, CollectDictionaries(*record_batch, mapper));
562 
563  ARROW_LOG("CPU") << "found " << dictionaries.size() << " dictionaries";
564 
565  for (auto& pair : dictionaries) {
566  arrow::ipc::IpcPayload payload;
567  int64_t dictionary_id = pair.first;
568  const auto& dictionary = pair.second;
569 
570  ARROW_THROW_NOT_OK(
571  GetDictionaryPayload(dictionary_id, dictionary, options, &payload));
572  int32_t metadata_length = 0;
573  ARROW_THROW_NOT_OK(
574  WriteIpcPayload(payload, options, dict_stream.get(), &metadata_length));
575  }
576  result_params.serialized_dict = dict_stream->Finish().ValueOrDie();
577 
578  ARROW_ASSIGN_OR_THROW(result_params.serialized_schema,
579  arrow::ipc::SerializeSchema(*record_batch->schema(),
580  arrow::default_memory_pool()));
581 
582  ARROW_THROW_NOT_OK(
583  arrow::ipc::GetRecordBatchSize(*record_batch, &result_params.records_size));
584 
585  switch (transport_method_) {
586  case ArrowTransport::WIRE:
587  return getWireResult();
588  case ArrowTransport::SHARED_MEMORY:
589  return getShmResult();
590  default:
591  UNREACHABLE();
592  }
593  }
594 #ifdef HAVE_CUDA
595  CHECK(device_type_ == ExecutorDeviceType::GPU);
596 
597  // Copy the schema to the schema handle
598  auto out_stream_result = arrow::io::BufferOutputStream::Create(1024);
599  ARROW_THROW_NOT_OK(out_stream_result.status());
600  auto out_stream = std::move(out_stream_result).ValueOrDie();
601 
602  arrow::ipc::DictionaryFieldMapper mapper(*record_batch->schema());
603  arrow::ipc::DictionaryMemo current_memo;
604  arrow::ipc::DictionaryMemo serialized_memo;
605 
606  arrow::ipc::IpcPayload schema_payload;
607  ARROW_THROW_NOT_OK(arrow::ipc::GetSchemaPayload(*record_batch->schema(),
608  arrow::ipc::IpcWriteOptions::Defaults(),
609  mapper,
610  &schema_payload));
611  int32_t schema_payload_length = 0;
612  ARROW_THROW_NOT_OK(arrow::ipc::WriteIpcPayload(schema_payload,
613  arrow::ipc::IpcWriteOptions::Defaults(),
614  out_stream.get(),
615  &schema_payload_length));
616  ARROW_ASSIGN_OR_THROW(auto dictionaries, CollectDictionaries(*record_batch, mapper));
617  ARROW_LOG("GPU") << "Dictionary "
618  << "found dicts: " << dictionaries.size();
619 
620  ARROW_THROW_NOT_OK(
621  arrow::ipc::internal::CollectDictionaries(*record_batch, &current_memo));
622 
623  // now try a dictionary
624  std::shared_ptr<arrow::Schema> dummy_schema;
625  std::vector<std::shared_ptr<arrow::RecordBatch>> dict_batches;
626 
627  for (const auto& pair : dictionaries) {
628  arrow::ipc::IpcPayload payload;
629  const auto& dict_id = pair.first;
630  CHECK_GE(dict_id, 0);
631  ARROW_LOG("GPU") << "Dictionary "
632  << "dict_id: " << dict_id;
633  const auto& dict = pair.second;
634  CHECK(dict);
635 
636  if (!dummy_schema) {
637  auto dummy_field = std::make_shared<arrow::Field>("", dict->type());
638  dummy_schema = std::make_shared<arrow::Schema>(
639  std::vector<std::shared_ptr<arrow::Field>>{dummy_field});
640  }
641  dict_batches.emplace_back(
642  arrow::RecordBatch::Make(dummy_schema, dict->length(), {dict}));
643  }
644 
645  if (!dict_batches.empty()) {
646  ARROW_THROW_NOT_OK(arrow::ipc::WriteRecordBatchStream(
647  dict_batches, arrow::ipc::IpcWriteOptions::Defaults(), out_stream.get()));
648  }
649 
650  auto complete_ipc_stream = out_stream->Finish();
651  ARROW_THROW_NOT_OK(complete_ipc_stream.status());
652  auto serialized_records = std::move(complete_ipc_stream).ValueOrDie();
653 
654  const auto record_key = arrow::get_and_copy_to_shm(serialized_records);
655  std::vector<char> schema_record_key_buffer(sizeof(key_t), 0);
656  memcpy(&schema_record_key_buffer[0],
657  reinterpret_cast<const unsigned char*>(&record_key),
658  sizeof(key_t));
659 
660  arrow::cuda::CudaDeviceManager* manager;
661  ARROW_ASSIGN_OR_THROW(manager, arrow::cuda::CudaDeviceManager::Instance());
662  std::shared_ptr<arrow::cuda::CudaContext> context;
663  ARROW_ASSIGN_OR_THROW(context, manager->GetContext(device_id_));
664 
665  std::shared_ptr<arrow::cuda::CudaBuffer> device_serialized;
666  ARROW_ASSIGN_OR_THROW(device_serialized,
667  SerializeRecordBatch(*record_batch, context.get()));
668 
669  std::shared_ptr<arrow::cuda::CudaIpcMemHandle> cuda_handle;
670  ARROW_ASSIGN_OR_THROW(cuda_handle, device_serialized->ExportForIpc());
671 
672  std::shared_ptr<arrow::Buffer> serialized_cuda_handle;
673  ARROW_ASSIGN_OR_THROW(serialized_cuda_handle,
674  cuda_handle->Serialize(arrow::default_memory_pool()));
675 
676  std::vector<char> record_handle_buffer(serialized_cuda_handle->size(), 0);
677  memcpy(&record_handle_buffer[0],
678  serialized_cuda_handle->data(),
679  serialized_cuda_handle->size());
680 
681  return {schema_record_key_buffer,
682  serialized_records->size(),
683  record_handle_buffer,
684  serialized_cuda_handle->size(),
685  serialized_cuda_handle->ToString()};
686 #else
687  UNREACHABLE();
688  return {std::vector<char>{}, 0, std::vector<char>{}, 0, ""};
689 #endif
690 }


ArrowResultSetConverter::SerializedArrowOutput ArrowResultSetConverter::getSerializedArrowOutput ( arrow::ipc::DictionaryFieldMapper *  mapper) const
private

Definition at line 693 of file ArrowResultSetConverter.cpp.

References ARROW_ASSIGN_OR_THROW, ARROW_THROW_NOT_OK, convertToArrow(), and DEBUG_TIMER.

694  {
695  auto timer = DEBUG_TIMER(__func__);
696  std::shared_ptr<arrow::RecordBatch> arrow_copy = convertToArrow();
697  std::shared_ptr<arrow::Buffer> serialized_records, serialized_schema;
698 
699  ARROW_ASSIGN_OR_THROW(
700  serialized_schema,
701  arrow::ipc::SerializeSchema(*arrow_copy->schema(), arrow::default_memory_pool()));
702 
703  if (arrow_copy->num_rows()) {
704  auto timer = DEBUG_TIMER("serialize records");
705  ARROW_THROW_NOT_OK(arrow_copy->Validate());
706  ARROW_ASSIGN_OR_THROW(serialized_records,
707  arrow::ipc::SerializeRecordBatch(
708  *arrow_copy, arrow::ipc::IpcWriteOptions::Defaults()));
709  } else {
710  ARROW_ASSIGN_OR_THROW(serialized_records, arrow::AllocateBuffer(0));
711  }
712  return {serialized_schema, serialized_records};
713 }


void ArrowResultSetConverter::initializeColumnBuilder ( ColumnBuilder &  column_builder,
const SQLTypeInfo &  col_type,
const size_t  result_col_idx,
const std::shared_ptr< arrow::Field > &  field 
) const
private

Definition at line 1252 of file ArrowResultSetConverter.cpp.

References ALL_STRINGS_REMAPPED, ARROW_THROW_NOT_OK, ArrowResultSetConverter::ColumnBuilder::builder, CHECK, CHECK_EQ, CHECK_GT, ArrowResultSetConverter::ColumnBuilder::col_type, DEBUG_TIMER, field(), ArrowResultSetConverter::ColumnBuilder::field, anonymous_namespace{ArrowResultSetConverter.cpp}::get_dict_index_type(), foreign_storage::get_physical_type(), SQLTypeInfo::getStringDictKey(), SQLTypeInfo::is_array(), SQLTypeInfo::is_dict_encoded_string(), SQLTypeInfo::is_dict_encoded_type(), max_dictionary_to_result_size_ratio_for_bulk_dictionary_fetch_, min_result_size_for_bulk_dictionary_fetch_, ONLY_TRANSIENT_STRINGS_REMAPPED, ArrowResultSetConverter::ColumnBuilder::physical_type, results_, ArrowResultSetConverter::ColumnBuilder::string_array, ArrowResultSetConverter::ColumnBuilder::string_remap_mode, ArrowResultSetConverter::ColumnBuilder::string_remapping, StringDictionaryProxy::transientIndexToId(), and VLOG.

Referenced by getArrowBatch().

1256  {
1257  column_builder.field = field;
1258  column_builder.col_type = col_type;
1259  column_builder.physical_type = col_type.is_dict_encoded_string()
1260  ? get_dict_index_type(col_type)
1261  : get_physical_type(col_type);
1262 
1263  auto value_type = field->type();
1264  if (col_type.is_dict_encoded_type()) {
1265  auto timer = DEBUG_TIMER("Translate string dictionary to Arrow dictionary");
1266  if (!col_type.is_array()) {
1267  column_builder.builder.reset(new arrow::StringDictionary32Builder());
1268  }
1269  // add values to the builder
1270  const auto& dict_key = col_type.getStringDictKey();
1271 
1272  // ResultSet::rowCount(), unlike ResultSet::entryCount(), will return
1273  // the actual number of rows in the result set, taking into account
1274  // things like any limit and offset set
1275  const size_t result_set_rows = results_->rowCount();
1276  // result_set_rows guaranteed > 0 by parent
1277  CHECK_GT(result_set_rows, 0UL);
1278 
1279  const auto sdp = results_->getStringDictionaryProxy(dict_key);
1280  const size_t dictionary_proxy_entries = sdp->entryCount();
1281  const double dictionary_to_result_size_ratio =
1282  static_cast<double>(dictionary_proxy_entries) / result_set_rows;
1283 
1284  // We are conservative with when we do a bulk dictionary fetch,
1285  // even though it is generally more efficient than dictionary unique value "plucking",
1286  // for the following reasons:
1287  // 1) The number of actual distinct dictionary values can be much lower than the
1288  // number of result rows, but without getting the expression range (and that would
1289  // only work in some cases), we don't know by how much
1290  // 2) Regardless of the effect of #1, the size of the dictionary generated via
1291  // the "pluck" method will always be at worst equal in size, and very likely
1292  // significantly smaller, than the dictionary created by the bulk dictionary
1293  // fetch method, and smaller Arrow dictionaries are always a win when it comes to
1294  // sending the Arrow results over the wire, and for lowering the processing load
1295  // for clients (which often is a web browser with a lot less compute and memory
1296  // resources than our server.)
1297 
1298  const bool do_dictionary_bulk_fetch =
1299  result_set_rows > min_result_size_for_bulk_dictionary_fetch_ &&
1300  dictionary_to_result_size_ratio <=
1301  max_dictionary_to_result_size_ratio_for_bulk_dictionary_fetch_;
1302 
1303  arrow::StringBuilder str_array_builder;
1304 
1305  if (do_dictionary_bulk_fetch) {
1306  VLOG(1) << "Arrow dictionary creation: bulk copying all dictionary "
1307  << " entries for column at offset " << results_col_slot_idx << ". "
1308  << "Column has " << dictionary_proxy_entries << " string entries"
1309  << " for a result set with " << result_set_rows << " rows.";
1310  column_builder.string_remap_mode =
1311  ArrowStringRemapMode::ONLY_TRANSIENT_STRINGS_REMAPPED;
1312  const auto str_list = results_->getStringDictionaryPayloadCopy(dict_key);
1313  ARROW_THROW_NOT_OK(str_array_builder.AppendValues(str_list));
1314 
1315  // When we fetch the bulk dictionary, we need to also fetch
1316  // the transient entries only contained in the proxy.
1317  // These values are always negative (starting at -2), and so need
1318  // to be remapped to point to the corresponding entries in the Arrow
1319  // dictionary (they are placed at the end after the materialized
1320  // string entries from StringDictionary)
1321 
1322  int32_t crt_transient_id = static_cast<int32_t>(str_list.size());
1323  auto const& transient_vecmap = sdp->getTransientVector();
1324  for (unsigned index = 0; index < transient_vecmap.size(); ++index) {
1325  ARROW_THROW_NOT_OK(str_array_builder.Append(*transient_vecmap[index]));
1326  auto const old_id = StringDictionaryProxy::transientIndexToId(index);
1327  CHECK(column_builder.string_remapping
1328  .insert(std::make_pair(old_id, crt_transient_id++))
1329  .second);
1330  }
1331  } else {
1332  // Pluck unique dictionary values from ResultSet column
1333  VLOG(1) << "Arrow dictionary creation: serializing unique result set dictionary "
1334  << " entries for column at offset " << results_col_slot_idx << ". "
1335  << "Column has " << dictionary_proxy_entries << " string entries"
1336  << " for a result set with " << result_set_rows << " rows.";
1337  column_builder.string_remap_mode = ArrowStringRemapMode::ALL_STRINGS_REMAPPED;
1338 
1339  // ResultSet::getUniqueStringsForDictEncodedTargetCol returns a pair of two vectors,
1340  // the first of int32_t values containing the unique string ids found for
1341  // results_col_slot_idx in the result set, the second containing the associated
1342  // unique strings. Note that the unique string for a unique string id are both
1343  // placed at the same offset in their respective vectors
1344 
1345  auto unique_ids_and_strings =
1346  results_->getUniqueStringsForDictEncodedTargetCol(results_col_slot_idx);
1347  const auto& unique_ids = unique_ids_and_strings.first;
1348  const auto& unique_strings = unique_ids_and_strings.second;
1349  ARROW_THROW_NOT_OK(str_array_builder.AppendValues(unique_strings));
1350  const int32_t num_unique_strings = unique_strings.size();
1351  CHECK_EQ(num_unique_strings, unique_ids.size());
1352  // We need to remap ALL string id values given the Arrow dictionary
1353  // will have "holes", i.e. it is a sparse representation of the underlying
1354  // StringDictionary
1355  for (int32_t unique_string_idx = 0; unique_string_idx < num_unique_strings;
1356  ++unique_string_idx) {
1357  CHECK(
1358  column_builder.string_remapping
1359  .insert(std::make_pair(unique_ids[unique_string_idx], unique_string_idx))
1360  .second);
1361  }
1362  // Note we don't need to get transients from proxy as they are already handled in
1363  // ResultSet::getUniqueStringsForDictEncodedTargetCol
1364  }
1365 
1366  std::shared_ptr<arrow::StringArray> string_array;
1367  ARROW_THROW_NOT_OK(str_array_builder.Finish(&string_array));
1368 
1369  if (col_type.is_array()) {
1370  column_builder.string_array = std::move(string_array);
1371  ARROW_THROW_NOT_OK(arrow::MakeBuilder(
1372  arrow::default_memory_pool(), value_type, &column_builder.builder));
1373  } else {
1374  auto dict_builder =
1375  dynamic_cast<arrow::StringDictionary32Builder*>(column_builder.builder.get());
1376  CHECK(dict_builder);
1377 
1378  ARROW_THROW_NOT_OK(dict_builder->InsertMemoValues(*string_array));
1379  }
1380  } else {
1381  ARROW_THROW_NOT_OK(arrow::MakeBuilder(
1382  arrow::default_memory_pool(), value_type, &column_builder.builder));
1383  }
1384 }
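
To make the bulk-fetch heuristic above concrete, a small worked example using the default thresholds (the numbers are illustrative):

 // Suppose the result set has 50000 rows and the string dictionary proxy
 // holds 2000 entries (illustrative values).
 const size_t result_set_rows = 50000;
 const size_t dictionary_proxy_entries = 2000;
 const double ratio =
     static_cast<double>(dictionary_proxy_entries) / result_set_rows;  // 0.04

 // With the defaults (min 10000 rows, max ratio 0.1):
 //   50000 > 10000 and 0.04 <= 0.1, so the whole dictionary payload is bulk
 //   copied and only transient (negative) string ids are remapped; otherwise
 //   unique ids are plucked from the result column and every id is remapped.
 const bool do_dictionary_bulk_fetch = result_set_rows > 10000 && ratio <= 0.1;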


std::shared_ptr< arrow::Field > ArrowResultSetConverter::makeField ( const std::string  name,
const SQLTypeInfo &  target_type 
) const
private

Definition at line 1205 of file ArrowResultSetConverter.cpp.

References device_type_, field(), anonymous_namespace{ArrowResultSetConverter.cpp}::get_arrow_type(), and SQLTypeInfo::get_notnull().

Referenced by convertToArrow().

1207  {
1208  return arrow::field(
1209  name, get_arrow_type(target_type, device_type_), !target_type.get_notnull());
1210 }


Friends And Related Function Documentation

friend class ArrowResultSet
friend

Definition at line 356 of file ArrowResultSet.h.

Member Data Documentation

std::vector<std::string> ArrowResultSetConverter::col_names_
private

Definition at line 351 of file ArrowResultSet.h.

Referenced by convertToArrow().

std::shared_ptr<Data_Namespace::DataMgr> ArrowResultSetConverter::data_mgr_ = nullptr
private

Definition at line 348 of file ArrowResultSet.h.

constexpr double ArrowResultSetConverter::default_max_dictionary_to_result_size_ratio_for_bulk_dictionary_fetch {0.1}
static

Definition at line 232 of file ArrowResultSet.h.

constexpr size_t ArrowResultSetConverter::default_min_result_size_for_bulk_dictionary_fetch {10000UL}
static

Definition at line 230 of file ArrowResultSet.h.

Referenced by ArrowResultSet::resultSetArrowLoopback().
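
A brief sketch of overriding this default (and the ratio above) through the extended constructor; the threshold values here are arbitrary:

 // Require at least one million result rows before bulk-fetching, and a
 // dictionary no larger than 5% of the result size.
 ArrowResultSetConverter converter(
     results,
     col_names,
     /*first_n=*/-1,
     /*min_result_size_for_bulk_dictionary_fetch=*/1000000,
     /*max_dictionary_to_result_size_ratio_for_bulk_dictionary_fetch=*/0.05);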

int32_t ArrowResultSetConverter::device_id_ = 0
private

Definition at line 350 of file ArrowResultSet.h.

Referenced by getArrowResult().

ExecutorDeviceType ArrowResultSetConverter::device_type_ = ExecutorDeviceType::GPU
private

Definition at line 349 of file ArrowResultSet.h.

Referenced by append(), getArrowBatch(), getArrowResult(), and makeField().

const double ArrowResultSetConverter::max_dictionary_to_result_size_ratio_for_bulk_dictionary_fetch_
private

Definition at line 355 of file ArrowResultSet.h.

Referenced by initializeColumnBuilder().

const size_t ArrowResultSetConverter::min_result_size_for_bulk_dictionary_fetch_
private

Definition at line 354 of file ArrowResultSet.h.

Referenced by initializeColumnBuilder().

std::shared_ptr<ResultSet> ArrowResultSetConverter::results_
private

Definition at line 347 of file ArrowResultSet.h.

Referenced by convertToArrow(), getArrowBatch(), and initializeColumnBuilder().

int32_t ArrowResultSetConverter::top_n_
private

Definition at line 352 of file ArrowResultSet.h.

Referenced by getArrowBatch().

ArrowTransport ArrowResultSetConverter::transport_method_
private

Definition at line 353 of file ArrowResultSet.h.

Referenced by getArrowResult().


The documentation for this class was generated from the following files:

ArrowResultSet.h
ArrowResultSetConverter.cpp