17 #ifndef ARROW_IMPORTER_H
18 #define ARROW_IMPORTER_H
25 #include <arrow/api.h>
26 #include <arrow/io/api.h>
27 #include <boost/algorithm/string.hpp>
28 #include <boost/variant.hpp>
38 using std::runtime_error::runtime_error;
41 template <
typename T = ArrowImporterException>
45 static std::mutex mtx;
46 std::unique_lock<std::mutex> lock(mtx);
52 #ifdef ENABLE_IMPORT_PARQUET
53 #include <parquet/api/reader.h>
54 #include <parquet/api/writer.h>
55 #include <parquet/arrow/reader.h>
56 #include <parquet/exception.h>
57 #endif // ENABLE_IMPORT_PARQUET
59 #include "arrow/util/decimal.h"
64 boost::variant<bool, float, double, int64_t, std::string, void*, arrow::Decimal128>;
70 typename std::enable_if_t<std::is_integral<T>::value && !std::is_same<T, bool>::value,
// Shorthand for the decayed type of an expression: decltype(expr) passed
// through std::decay_t, i.e. with references and top-level cv-qualifiers
// stripped (array and function types decay to pointers).
75 #define exprtype(expr) std::decay_t<decltype(expr)>
79 return bad_rows_tracker ?
"File " + bad_rows_tracker->
file_name +
", row-group " +
81 (cd ?
", column " + cd->
columnName +
": " :
"")
85 template <
typename SrcType,
typename DstType>
87 using ArrayType =
typename arrow::TypeTraits<SrcType>::ArrayType;
88 return (DstType)
static_cast<const ArrayType&
>(array).
Value(idx);
91 template <
typename SrcType>
93 using ArrayType =
typename arrow::TypeTraits<SrcType>::ArrayType;
94 return static_cast<const ArrayType&
>(array).GetString(idx);
// Expands to a single `case` for use inside a switch over arrow::Type ids:
// when the id is arrow::Type::tid, the enclosing function returns the
// get_numeric_value instantiation that reads a `src_type` array and converts
// each element to `var_type`.
#define NUMERIC_CASE(tid, src_type, var_type)     \
  case arrow::Type::tid: {                        \
    return get_numeric_value<src_type, var_type>; \
  }
// Expands to a single `case` for use inside a switch over arrow::Type ids:
// when the id is arrow::Type::tid, the enclosing function returns the
// get_string_value instantiation that reads a `src_type` array.
#define STRING_CASE(tid, src_type)     \
  case arrow::Type::tid: {             \
    return get_string_value<src_type>; \
  }
107 switch (array.type_id()) {
124 NUMERIC_CASE(DECIMAL, arrow::Decimal128Type, arrow::Decimal128)
130 array.type()->name() +
" is not supported");
140 "Invalid type conversion from parquet " + pt +
" type to " +
144 template <
typename DATA_TYPE,
typename VALUE_TYPE>
150 "Invalid data conversion from parquet value " +
std::to_string(v) +
159 "Invalid data conversion from parquet string '" + v +
"' to " +
175 const arrow::Array& array,
179 , bad_rows_tracker(bad_rows_tracker)
180 , arrow_type(*array.
type())
181 , arrow_decimal_scale(
182 arrow_type.id() == arrow::
Type::DECIMAL
183 ? static_cast<const arrow::Decimal128Type&>(arrow_type).scale()
185 , old_type(cd->columnType.get_type(),
186 cd->columnType.get_dimension(),
189 , new_type(cd->columnType.get_type(),
190 cd->columnType.get_dimension(),
195 template <
typename DATA_TYPE>
199 const arrow::Array& array,
200 std::vector<DATA_TYPE>& buffer,
210 static const std::map<std::pair<int32_t, arrow::TimeUnit::type>,
211 std::pair<SQLOps, int64_t>>
213 {{0, arrow::TimeUnit::MILLI}, {
kDIVIDE, kMillisecondsInSecond}},
214 {{0, arrow::TimeUnit::MICRO}, {
kDIVIDE, kMicrosecondsInSecond}},
215 {{0, arrow::TimeUnit::NANO}, {
kDIVIDE, kNanosecondsinSecond}},
216 {{3, arrow::TimeUnit::SECOND}, {
kMULTIPLY, kMicrosecondsInSecond}},
217 {{3, arrow::TimeUnit::MICRO}, {
kDIVIDE, kMillisecondsInSecond}},
218 {{3, arrow::TimeUnit::NANO}, {
kDIVIDE, kMicrosecondsInSecond}},
219 {{6, arrow::TimeUnit::SECOND}, {
kMULTIPLY, kMicrosecondsInSecond}},
220 {{6, arrow::TimeUnit::MILLI}, {
kMULTIPLY, kMillisecondsInSecond}},
221 {{6, arrow::TimeUnit::NANO}, {
kDIVIDE, kMillisecondsInSecond}},
222 {{9, arrow::TimeUnit::SECOND}, {
kMULTIPLY, kNanosecondsinSecond}},
223 {{9, arrow::TimeUnit::MILLI}, {
kMULTIPLY, kMicrosecondsInSecond}},
224 {{9, arrow::TimeUnit::MICRO}, {
kMULTIPLY, kMillisecondsInSecond}}};
227 template <
typename VALUE_TYPE>
235 , dimension(data.cd->columnType.is_high_precision_timestamp()
236 ? data.cd->columnType.get_dimension()
238 template <bool enabled = std::is_integral<VALUE_TYPE>::value>
239 int64_t
resolve_time(
const VALUE_TYPE& v, std::enable_if_t<enabled>* = 0)
const {
240 const auto& type_id = data.arrow_type.id();
241 if (type_id == arrow::Type::DATE32 || type_id == arrow::Type::DATE64) {
242 auto& date_type =
static_cast<const arrow::DateType&
>(data.arrow_type);
243 switch (date_type.unit()) {
244 case arrow::DateUnit::DAY:
246 case arrow::DateUnit::MILLI:
249 }
else if (type_id == arrow::Type::TIME32 || type_id == arrow::Type::TIME64 ||
250 type_id == arrow::Type::TIMESTAMP) {
251 auto& time_type =
static_cast<const arrow::TimeType&
>(data.arrow_type);
255 const auto scale =
result->second;
256 return scale.first ==
kMULTIPLY ? v * scale.second : v / scale.second;
261 UNREACHABLE() << data.arrow_type <<
" is not a valid Arrow time or date type";
264 template <bool enabled = std::is_integral<VALUE_TYPE>::value>
265 int64_t
resolve_time(
const VALUE_TYPE& v, std::enable_if_t<!enabled>* = 0)
const {
266 static_assert(enabled,
"unreachable");
271 template <
typename VALUE_TYPE>
280 template <
typename DATA_TYPE>
281 explicit operator DATA_TYPE()
const {
282 if constexpr (std::is_integral<DATA_TYPE>::value) {
284 }
else if constexpr (std::is_floating_point<DATA_TYPE>::value) {
286 }
else if constexpr (std::is_same<DATA_TYPE, std::string>::value) {
287 return std::string();
298 template <
typename DATA_TYPE>
299 explicit operator DATA_TYPE()
const {
300 if constexpr (std::is_integral<DATA_TYPE>::value) {
301 if (!(data.cd->columnType.is_number() || data.cd->columnType.is_boolean())) {
305 }
else if constexpr (std::is_floating_point<DATA_TYPE>::value) {
307 }
else if constexpr (std::is_same<DATA_TYPE, std::string>::value) {
308 return v ?
"T" :
"F";
319 template <
typename DATA_TYPE>
320 explicit operator DATA_TYPE()
const {
321 if constexpr (std::is_integral<DATA_TYPE>::value) {
322 const auto ti = data.cd->columnType;
323 DATA_TYPE v = ti.is_decimal() ? this->v * pow(10, ti.get_scale()) : this->v;
324 if (!(std::numeric_limits<DATA_TYPE>::lowest() < v &&
325 v <= std::numeric_limits<DATA_TYPE>::max())) {
326 data_conversion_error<DATA_TYPE>(v, data.cd, data.bad_rows_tracker);
329 }
else if constexpr (std::is_floating_point<DATA_TYPE>::value) {
331 }
else if constexpr (std::is_same<DATA_TYPE, std::string>::value) {
343 template <
typename DATA_TYPE>
344 explicit operator DATA_TYPE()
const {
345 if constexpr (std::is_integral<DATA_TYPE>::value) {
346 const auto ti = data.cd->columnType;
347 DATA_TYPE v = ti.is_decimal() ? this->v * pow(10, ti.get_scale()) : this->v;
348 if (!(std::numeric_limits<DATA_TYPE>::lowest() < v &&
349 v <= std::numeric_limits<DATA_TYPE>::max())) {
350 data_conversion_error<DATA_TYPE>(v, data.cd, data.bad_rows_tracker);
353 }
else if constexpr (std::is_floating_point<DATA_TYPE>::value) {
354 if (std::is_same<DATA_TYPE, float>::value) {
355 if (!(std::numeric_limits<float>::lowest() < v &&
356 v <= std::numeric_limits<float>::max())) {
357 data_conversion_error<float>(v, data.cd, data.bad_rows_tracker);
361 }
else if constexpr (std::is_same<DATA_TYPE, std::string>::value) {
373 template <
typename DATA_TYPE>
374 explicit operator DATA_TYPE()
const {
375 if constexpr (std::is_integral<DATA_TYPE>::value) {
377 if (std::is_same<int64_t, DATA_TYPE>::value) {
378 }
else if (std::numeric_limits<DATA_TYPE>::lowest() < v &&
379 v <= std::numeric_limits<DATA_TYPE>::max()) {
381 data_conversion_error<DATA_TYPE>(v, data.cd, data.bad_rows_tracker);
383 if (data.cd->columnType.is_time()) {
384 v = this->resolve_time(v);
387 }
else if constexpr (std::is_floating_point<DATA_TYPE>::value) {
389 }
else if constexpr (std::is_same<DATA_TYPE, std::string>::value) {
390 const auto& type_id = data.arrow_type.id();
391 if (type_id == arrow::Type::DATE32 || type_id == arrow::Type::DATE64) {
392 auto& date_type =
static_cast<const arrow::DateType&
>(data.arrow_type);
396 date_type.unit() == arrow::DateUnit::MILLI ? v / kMicrosecondsInSecond : v;
398 }
else if (type_id == arrow::Type::TIME32 || type_id == arrow::Type::TIME64 ||
399 type_id == arrow::Type::TIMESTAMP) {
400 auto& time_type =
static_cast<const arrow::TimeType&
>(data.arrow_type);
405 divisor =
result->second.second;
411 if (divisor != 1 && v % divisor) {
427 template <
typename DATA_TYPE>
428 explicit operator DATA_TYPE()
const {
429 if constexpr (std::is_same<DATA_TYPE, bool>::value) {
431 return inline_int_null_value<int8_t>();
436 return datum.boolval;
441 }
else if constexpr (std::is_integral<DATA_TYPE>::value) {
446 auto ti = data.cd->columnType;
448 return datum.bigintval;
453 }
else if constexpr (std::is_floating_point<DATA_TYPE>::value) {
454 return atof(v.data());
455 }
else if constexpr (std::is_same<DATA_TYPE, std::string>::value) {
469 "Truncation error on Arrow Decimal128 value");
472 template <
typename DATA_TYPE>
473 explicit operator DATA_TYPE()
const {
474 if constexpr (std::is_integral<DATA_TYPE>::value) {
475 int64_t v =
static_cast<int64_t
>(this->v);
476 if (data.cd->columnType.is_decimal()) {
479 if (data.arrow_decimal_scale) {
480 v = std::llround(v / pow(10, data.arrow_decimal_scale));
482 if (std::is_same<int64_t, DATA_TYPE>::value) {
483 }
else if (std::numeric_limits<DATA_TYPE>::lowest() < v &&
484 v <= std::numeric_limits<DATA_TYPE>::max()) {
486 data_conversion_error<DATA_TYPE>(v, data.cd, data.bad_rows_tracker);
489 }
else if constexpr (std::is_floating_point<DATA_TYPE>::value) {
490 int64_t v =
static_cast<int64_t
>(this->v);
491 return data.arrow_decimal_scale ? v / pow(10, data.arrow_decimal_scale) : v;
492 }
else if constexpr (std::is_same<DATA_TYPE, std::string>::value) {
493 return v.ToString(data.arrow_decimal_scale);
499 template <
typename DATA_TYPE>
500 inline auto& operator<<(DataBuffer<DATA_TYPE>& data,
const VarValue& var) {
501 boost::apply_visitor(
502 [&data](
const auto& v) {
510 #endif // ARROW_IMPORTER_H
constexpr int64_t kMillisecondsInSecond
void data_conversion_error(const VALUE_TYPE v, const ColumnDescriptor *cd, import_export::BadRowsTracker *const bad_rows_tracker)
constexpr int64_t kMicrosecondsInSecond
boost::variant< bool, float, double, int64_t, std::string, void *, arrow::Decimal128 > VarValue
ArrowValue(const DataBufferBase &data, const VALUE_TYPE &v)
ArrowValue(const DataBufferBase &data, const VALUE_TYPE &v)
std::string DatumToString(Datum d, const SQLTypeInfo &ti)
const DataBufferBase & data
ArrowValue(const DataBufferBase &data, const VALUE_TYPE &v)
const arrow::Array & array
DataBufferBase(const ColumnDescriptor *cd, const arrow::Array &array, import_export::BadRowsTracker *const bad_rows_tracker)
auto value_getter(const arrow::Array &array, const ColumnDescriptor *cd, import_export::BadRowsTracker *const bad_rows_tracker)
DEVICE double get_scale(const double domain_min, const double domain_max, const int32_t num_bins)
Constants for Builtin SQL Types supported by HEAVY.AI.
const int arrow_decimal_scale
#define NUMERIC_CASE(tid, src_type, var_type)
double inline_fp_null_val(const SQL_TYPE_INFO &ti)
ArrowValue(const DataBufferBase &data, const VALUE_TYPE &v)
ArrowValue(const DataBufferBase &data, const VALUE_TYPE &v)
static const std::map< std::pair< int32_t, arrow::TimeUnit::type >, std::pair< SQLOps, int64_t > > _precision_scale_lookup
const SQLTypeInfo new_type
typename std::enable_if_t< std::is_floating_point< T >::value, T > enable_if_floating
void type_conversion_error(const std::string pt, const ColumnDescriptor *cd, import_export::BadRowsTracker *const bad_rows_tracker)
import_export::BadRowsTracker *const bad_rows_tracker
DataBuffer(const ColumnDescriptor *cd, const arrow::Array &array, std::vector< DATA_TYPE > &buffer, import_export::BadRowsTracker *const bad_rows_tracker)
Datum StringToDatum(const std::string_view s, SQLTypeInfo &ti)
specifies the content in-memory of a row in the column metadata table
std::vector< DATA_TYPE > & buffer
bool g_enable_smem_group_by true
VarValue get_string_value(const arrow::Array &array, const int64_t idx)
ArrowValue(const DataBufferBase &data, const VALUE_TYPE &v)
ArrowValue(const DataBufferBase &data, const VALUE_TYPE &v)
constexpr int32_t kSecondsInDay
int64_t convert_decimal_value_to_scale(const int64_t decimal_value, const SQLTypeInfo &type_info, const SQLTypeInfo &new_type_info)
int64_t resolve_time(const VALUE_TYPE &v, std::enable_if_t< enabled > *=0) const
std::string get_type_name() const
typename std::enable_if_t< std::is_integral< T >::value, T > enable_if_integral
#define STRING_CASE(tid, src_type)
int64_t resolve_time(const VALUE_TYPE &v, std::enable_if_t<!enabled > *=0) const
const arrow::DataType & arrow_type
VarValue get_numeric_value(const arrow::Array &array, const int64_t idx)
int64_t inline_fixed_encoding_null_val(const SQL_TYPE_INFO &ti)
typename std::enable_if_t< std::is_integral< T >::value &&!std::is_same< T, bool >::value, T > enable_if_integral_not_bool
const SQLTypeInfo old_type
std::string error_context(const ColumnDescriptor *cd, import_export::BadRowsTracker *const bad_rows_tracker)
constexpr int64_t kNanosecondsinSecond
const ColumnDescriptor * cd
ArrowValueBase(const DataBufferBase &data, const VALUE_TYPE &v)
void arrow_throw_if(const bool cond, const std::string &message)
arrow::Decimal128 VALUE_TYPE