25 #include <arrow/api.h>
26 #include <arrow/filesystem/localfs.h>
27 #include <arrow/io/api.h>
29 #include <ogrsf_frmts.h>
30 #include <boost/algorithm/string.hpp>
31 #include <boost/dynamic_bitset.hpp>
32 #include <boost/filesystem.hpp>
33 #include <boost/geometry.hpp>
34 #include <boost/variant.hpp>
49 #include <unordered_map>
50 #include <unordered_set>
58 #ifdef ENABLE_IMPORT_PARQUET
61 #if defined(ENABLE_IMPORT_PARQUET)
65 #ifdef ENABLE_IMPORT_PARQUET
93 #include "gen-cpp/Heavy.h"
100 #define TIMER_STOP(t) \
101 (float(timer_stop<std::chrono::steady_clock::time_point, std::chrono::microseconds>( \
116 boost::filesystem::path boost_file_path{file_path};
119 return ec ? 0 : filesize;
127 executor->getSessionLock());
128 return executor->checkIsQuerySessionInterrupted(query_session, session_read_lock);
137 formatting_ostream& operator<<(formatting_ostream& out, std::vector<std::string>& row) {
139 for (
size_t i = 0; i < row.size(); ++i) {
140 out << (i ?
", " :
"") << row[i];
148 namespace import_export {
154 #define DEBUG_TIMING false
155 #define DEBUG_RENDER_GROUP_ANALYZER 0
156 #define DEBUG_AWS_AUTHENTICATION 0
158 #define DISABLE_MULTI_THREADED_SHAPEFILE_IMPORT 0
174 const std::string&
f,
188 auto is_array = std::unique_ptr<bool[]>(
new bool[
loader->get_column_descs().size()]);
190 bool has_array =
false;
192 int skip_physical_cols = 0;
193 for (
auto& p :
loader->get_column_descs()) {
195 if (skip_physical_cols-- > 0) {
200 if (p->isVirtualCol || p->isDeletedCol) {
203 skip_physical_cols = p->columnType.get_physical_cols();
204 if (p->columnType.get_type() ==
kARRAY) {
205 is_array.get()[i] =
true;
208 is_array.get()[i] =
false;
213 is_array_a = std::unique_ptr<bool[]>(is_array.release());
215 is_array_a = std::unique_ptr<bool[]>(
nullptr);
223 if (
buffer[0] !=
nullptr) {
226 if (
buffer[1] !=
nullptr) {
235 throw std::runtime_error(
"Import status not found for id: " + import_id);
242 is.
end = std::chrono::steady_clock::now();
243 is.
elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(is.
end - is.
start);
250 while (i < j && (field[i] ==
' ' || field[i] ==
'\r')) {
253 while (i < j && (field[j - 1] ==
' ' || field[j - 1] ==
'\r')) {
256 return std::string(field + i, j - i);
309 throw std::runtime_error(
"Internal error: geometry type in NullArrayDatum.");
311 throw std::runtime_error(
"Internal error: invalid type in NullArrayDatum.");
320 if (s == copy_params.
null_str || s ==
"NULL" || s.empty()) {
327 std::vector<std::string> elem_strs;
329 for (
size_t i = s.find(copy_params.
array_delim, 1); i != std::string::npos;
331 elem_strs.push_back(s.substr(last, i - last));
334 if (last + 1 <= s.size()) {
335 elem_strs.push_back(s.substr(last, s.size() - 1 - last));
337 if (elem_strs.size() == 1) {
338 auto str = elem_strs.front();
339 auto str_trimmed =
trim_space(str.c_str(), str.length());
340 if (str_trimmed ==
"") {
345 size_t len = elem_strs.size() * elem_ti.
get_size();
346 std::unique_ptr<int8_t, FreeDeleter> buf(
348 int8_t* p = buf.get();
349 for (
auto& es : elem_strs) {
356 if (!isdigit(e[0]) && e[0] !=
'-') {
384 while ((p - buf) < len) {
388 CHECK((p - buf) == len);
405 const size_t len = compressed_null_coords.size();
407 memcpy(buf, compressed_null_coords.data(), len);
410 auto modified_ti = coords_ti;
416 const auto& arr = datum.val.arr_val;
417 for (
const auto& elem_datum : arr) {
418 string_vec.push_back(elem_datum.val.str_val);
462 throw std::runtime_error(
"Internal error: geometry type in TDatumToDatum.");
464 throw std::runtime_error(
"Internal error: invalid type in TDatumToDatum.");
478 size_t len = datum.val.arr_val.size() * elem_ti.
get_size();
481 for (
auto& e : datum.val.arr_val) {
491 std::vector<std::string_view> string_view_vec;
492 string_view_vec.reserve(string_vec.size());
493 for (
const auto& str : string_vec) {
495 std::ostringstream oss;
497 <<
" a string was detected too long for encoding, string length = "
498 << str.size() <<
", first 100 characters are '" << str.substr(0, 100) <<
"'";
499 throw std::runtime_error(oss.str());
501 string_view_vec.push_back(str);
520 }
catch (std::exception& e) {
521 std::ostringstream oss;
523 <<
" : " << e.what();
525 throw std::runtime_error(oss.str());
530 const std::string_view val,
533 const bool check_not_null) {
539 throw std::runtime_error(
"NULL for column " + cd->
columnName);
550 if (!is_null && (isdigit(val[0]) || val[0] ==
'-')) {
556 throw std::runtime_error(
"NULL for column " + cd->
columnName);
563 if (!is_null && (isdigit(val[0]) || val[0] ==
'-')) {
569 throw std::runtime_error(
"NULL for column " + cd->
columnName);
576 if (!is_null && (isdigit(val[0]) || val[0] ==
'-')) {
582 throw std::runtime_error(
"NULL for column " + cd->
columnName);
589 if (!is_null && (isdigit(val[0]) || val[0] ==
'-')) {
595 throw std::runtime_error(
"NULL for column " + cd->
columnName);
611 throw std::runtime_error(
"NULL for column " + cd->
columnName);
618 if (!is_null && (val[0] ==
'.' || isdigit(val[0]) || val[0] ==
'-')) {
619 addFloat(static_cast<float>(std::atof(std::string(val).c_str())));
622 throw std::runtime_error(
"NULL for column " + cd->
columnName);
628 if (!is_null && (val[0] ==
'.' || isdigit(val[0]) || val[0] ==
'-')) {
629 addDouble(std::atof(std::string(val).c_str()));
632 throw std::runtime_error(
"NULL for column " + cd->
columnName);
643 throw std::runtime_error(
"NULL for column " + cd->
columnName);
648 throw std::runtime_error(
"String too long for column " + cd->
columnName +
659 if (!is_null && (isdigit(val[0]) || val[0] ==
'-')) {
665 throw std::runtime_error(
"NULL for column " + cd->
columnName);
672 throw std::runtime_error(
"NULL for column " + cd->
columnName);
676 std::vector<std::string> string_vec;
679 std::string(val), copy_params, string_vec);
683 size_t expected_size = ti.
get_size() / sti.get_size();
684 size_t actual_size = string_vec.size();
685 if (actual_size != expected_size) {
686 throw std::runtime_error(
"Fixed length array column " + cd->
columnName +
688 " values, received " +
703 throw std::runtime_error(
"Fixed length array for column " + cd->
columnName +
704 " has incorrect length: " + std::string(val));
723 CHECK(
false) <<
"TypedImportBuffer::add_value() does not support type " <<
type;
779 CHECK(
false) <<
"TypedImportBuffer::pop_value() does not support type " <<
type;
784 using std::runtime_error::runtime_error;
788 template <
typename DATA_TYPE>
792 std::vector<DATA_TYPE>& buffer,
796 std::make_unique<DataBuffer<DATA_TYPE>>(cd, array, buffer, bad_rows_tracker);
797 auto f_value_getter =
value_getter(array, cd, bad_rows_tracker);
798 std::function<void(const int64_t)> f_add_geo_phy_cols = [&](
const int64_t row) {};
799 if (bad_rows_tracker && cd->columnType.is_geometry()) {
800 f_add_geo_phy_cols = [&](
const int64_t row) {
802 std::vector<double> coords, bounds;
803 std::vector<int> ring_sizes, poly_rings;
810 if (array.IsNull(row)) {
812 import_ti, coords, bounds, ring_sizes, poly_rings);
814 const bool validate_with_geos_if_available =
false;
815 arrow_throw_if<GeoImportException>(
823 validate_with_geos_if_available),
825 arrow_throw_if<GeoImportException>(
826 cd->columnType.get_type() != ti.
get_type(),
827 error_context(cd, bad_rows_tracker) +
"Geometry type mismatch");
829 auto col_idx_workpad =
col_idx;
841 }
catch (std::runtime_error& e) {
843 }
catch (
const std::exception& e) {
850 auto f_mark_a_bad_row = [&](
const auto row) {
851 std::unique_lock<std::mutex> lck(bad_rows_tracker->
mutex);
852 bad_rows_tracker->
rows.insert(row - slice_range.first);
854 buffer.reserve(slice_range.second - slice_range.first);
855 for (
size_t row = slice_range.first; row < slice_range.second; ++row) {
857 *data << (array.IsNull(row) ?
nullptr : f_value_getter(array, row));
858 f_add_geo_phy_cols(row);
860 f_mark_a_bad_row(row);
863 if (bad_rows_tracker) {
865 f_mark_a_bad_row(row);
871 return buffer.size();
876 const bool exact_type_match,
887 if (exact_type_match) {
888 arrow_throw_if(col.type_id() != Type::BOOL,
"Expected boolean type");
893 if (exact_type_match) {
894 arrow_throw_if(col.type_id() != Type::INT8,
"Expected int8 type");
899 if (exact_type_match) {
900 arrow_throw_if(col.type_id() != Type::INT16,
"Expected int16 type");
905 if (exact_type_match) {
906 arrow_throw_if(col.type_id() != Type::INT32,
"Expected int32 type");
909 cd, col, *
int_buffer_, slice_range, bad_rows_tracker);
913 if (exact_type_match) {
914 arrow_throw_if(col.type_id() != Type::INT64,
"Expected int64 type");
919 if (exact_type_match) {
920 arrow_throw_if(col.type_id() != Type::FLOAT,
"Expected float type");
925 if (exact_type_match) {
926 arrow_throw_if(col.type_id() != Type::DOUBLE,
"Expected double type");
933 if (exact_type_match) {
935 "Expected string type");
940 if (exact_type_match) {
941 arrow_throw_if(col.type_id() != Type::TIME32 && col.type_id() != Type::TIME64,
942 "Expected time32 or time64 type");
947 if (exact_type_match) {
948 arrow_throw_if(col.type_id() != Type::TIMESTAMP,
"Expected timestamp type");
953 if (exact_type_match) {
954 arrow_throw_if(col.type_id() != Type::DATE32 && col.type_id() != Type::DATE64,
955 "Expected date32 or date64 type");
966 "Expected string type");
970 throw std::runtime_error(
"Arrow array appends not yet supported");
972 throw std::runtime_error(
"Invalid Type");
981 if (
std::any_of(col.nulls.begin(), col.nulls.end(), [](
int i) {
return i != 0; })) {
982 throw std::runtime_error(
"NULL for column " + cd->
columnName);
988 dataSize = col.data.int_col.size();
990 for (
size_t i = 0; i < dataSize; i++) {
1000 dataSize = col.data.int_col.size();
1002 for (
size_t i = 0; i < dataSize; i++) {
1012 dataSize = col.data.int_col.size();
1014 for (
size_t i = 0; i < dataSize; i++) {
1024 dataSize = col.data.int_col.size();
1026 for (
size_t i = 0; i < dataSize; i++) {
1030 int_buffer_->push_back((int32_t)col.data.int_col[i]);
1038 dataSize = col.data.int_col.size();
1040 for (
size_t i = 0; i < dataSize; i++) {
1050 dataSize = col.data.real_col.size();
1052 for (
size_t i = 0; i < dataSize; i++) {
1062 dataSize = col.data.real_col.size();
1064 for (
size_t i = 0; i < dataSize; i++) {
1077 dataSize = col.data.str_col.size();
1079 for (
size_t i = 0; i < dataSize; i++) {
1091 dataSize = col.data.int_col.size();
1093 for (
size_t i = 0; i < dataSize; i++) {
1097 bigint_buffer_->push_back(static_cast<int64_t>(col.data.int_col[i]));
1108 dataSize = col.data.str_col.size();
1110 for (
size_t i = 0; i < dataSize; i++) {
1121 dataSize = col.data.arr_col.size();
1123 for (
size_t i = 0; i < dataSize; i++) {
1125 if (!col.nulls[i]) {
1126 size_t stringArrSize = col.data.arr_col[i].data.str_col.size();
1127 for (
size_t str_idx = 0; str_idx != stringArrSize; ++str_idx) {
1128 string_vec->push_back(col.data.arr_col[i].data.str_col[str_idx]);
1136 for (
size_t i = 0; i < dataSize; i++) {
1140 size_t len = col.data.arr_col[i].data.int_col.size();
1141 size_t byteSize = len *
sizeof(int8_t);
1144 for (
size_t j = 0; j < len; ++j) {
1148 if (col.data.arr_col[i].nulls[j]) {
1149 *p =
static_cast<int8_t
>(
1152 *(
bool*)p = static_cast<bool>(col.data.arr_col[i].data.int_col[j]);
1162 for (
size_t i = 0; i < dataSize; i++) {
1166 size_t len = col.data.arr_col[i].data.int_col.size();
1167 size_t byteSize = len *
sizeof(int8_t);
1170 for (
size_t j = 0; j < len; ++j) {
1171 *(int8_t*)p = static_cast<int8_t>(col.data.arr_col[i].data.int_col[j]);
1172 p +=
sizeof(int8_t);
1180 for (
size_t i = 0; i < dataSize; i++) {
1184 size_t len = col.data.arr_col[i].data.int_col.size();
1185 size_t byteSize = len *
sizeof(int16_t);
1188 for (
size_t j = 0; j < len; ++j) {
1190 static_cast<int16_t>(col.data.arr_col[i].data.int_col[j]);
1191 p +=
sizeof(int16_t);
1199 for (
size_t i = 0; i < dataSize; i++) {
1203 size_t len = col.data.arr_col[i].data.int_col.size();
1204 size_t byteSize = len *
sizeof(int32_t);
1207 for (
size_t j = 0; j < len; ++j) {
1209 static_cast<int32_t>(col.data.arr_col[i].data.int_col[j]);
1210 p +=
sizeof(int32_t);
1220 for (
size_t i = 0; i < dataSize; i++) {
1224 size_t len = col.data.arr_col[i].data.int_col.size();
1225 size_t byteSize = len *
sizeof(int64_t);
1228 for (
size_t j = 0; j < len; ++j) {
1230 static_cast<int64_t>(col.data.arr_col[i].data.int_col[j]);
1231 p +=
sizeof(int64_t);
1239 for (
size_t i = 0; i < dataSize; i++) {
1243 size_t len = col.data.arr_col[i].data.real_col.size();
1244 size_t byteSize = len *
sizeof(float);
1247 for (
size_t j = 0; j < len; ++j) {
1248 *(
float*)p = static_cast<float>(col.data.arr_col[i].data.real_col[j]);
1257 for (
size_t i = 0; i < dataSize; i++) {
1261 size_t len = col.data.arr_col[i].data.real_col.size();
1262 size_t byteSize = len *
sizeof(double);
1265 for (
size_t j = 0; j < len; ++j) {
1266 *(
double*)p = static_cast<double>(col.data.arr_col[i].data.real_col[j]);
1267 p +=
sizeof(double);
1277 for (
size_t i = 0; i < dataSize; i++) {
1281 size_t len = col.data.arr_col[i].data.int_col.size();
1282 size_t byteWidth =
sizeof(int64_t);
1283 size_t byteSize = len * byteWidth;
1286 for (
size_t j = 0; j < len; ++j) {
1287 *
reinterpret_cast<int64_t*
>(p) =
1288 static_cast<int64_t>(col.data.arr_col[i].data.int_col[j]);
1289 p +=
sizeof(int64_t);
1297 throw std::runtime_error(
"Invalid Array Type");
1303 throw std::runtime_error(
"Invalid Type");
1309 const TDatum& datum,
1317 throw std::runtime_error(
"NULL for column " + cd->
columnName);
1330 throw std::runtime_error(
"NULL for column " + cd->
columnName);
1340 throw std::runtime_error(
"NULL for column " + cd->
columnName);
1347 addInt((int32_t)datum.val.int_val);
1350 throw std::runtime_error(
"NULL for column " + cd->
columnName);
1360 throw std::runtime_error(
"NULL for column " + cd->
columnName);
1367 addFloat((
float)datum.val.real_val);
1370 throw std::runtime_error(
"NULL for column " + cd->
columnName);
1380 throw std::runtime_error(
"NULL for column " + cd->
columnName);
1391 throw std::runtime_error(
"NULL for column " + cd->
columnName);
1406 throw std::runtime_error(
"NULL for column " + cd->
columnName);
1414 throw std::runtime_error(
"NULL for column " + cd->
columnName);
1435 throw std::runtime_error(
"NULL for column " + cd->
columnName);
1443 CHECK(
false) <<
"TypedImportBuffer::add_value() does not support type " <<
type;
1510 static_cast<float>(std::atof(std::string(val).c_str())));
1517 double_buffer_->resize(num_rows, std::atof(std::string(val).c_str()));
1528 if (val.length() > ti.get_max_strlen()) {
1529 throw std::runtime_error(
"String too long for column " + cd->
columnName +
1548 std::vector<std::string> string_vec;
1551 std::string(val), cp, string_vec);
1554 if (ti.get_size() > 0) {
1555 auto sti = ti.get_elem_type();
1556 size_t expected_size = ti.get_size() / sti.get_size();
1557 size_t actual_size = string_vec.size();
1558 if (actual_size != expected_size) {
1559 throw std::runtime_error(
"Fixed length array column " + cd->
columnName +
1561 " values, received " +
1567 if (ti.get_size() > 0) {
1569 throw std::runtime_error(
"Fixed length array column " + cd->
columnName +
1570 " currently cannot accept NULL arrays");
1582 if (ti.get_size() > 0 &&
static_cast<size_t>(ti.get_size()) != d.length) {
1583 throw std::runtime_error(
"Fixed length array for column " + cd->
columnName +
1584 " has incorrect length: " + std::string(val));
1603 CHECK(
false) <<
"TypedImportBuffer::addDefaultValues() does not support type "
1610 std::vector<double>& coords,
1611 std::vector<double>& bounds,
1613 if (std::isinf(lat) || std::isnan(lat) || std::isinf(lon) || std::isnan(lon)) {
1618 if (!pt.transform(ti)) {
1623 coords.push_back(lon);
1624 coords.push_back(lat);
1629 bounds.push_back(coords[0]);
1630 bounds.push_back(coords[1]);
1631 bounds.push_back(coords[0]);
1632 bounds.push_back(coords[1]);
1639 std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
1641 std::vector<double>& coords,
1642 std::vector<double>& bounds,
1643 std::vector<int>& ring_sizes,
1644 std::vector<int>& poly_rings,
1645 const bool force_null) {
1647 const auto col_type = col_ti.
get_type();
1650 bool is_null_geo =
false;
1652 if (!col_ti.get_notnull()) {
1655 is_null_point =
true;
1658 is_null_geo = coords.empty();
1659 if (is_null_point) {
1664 is_null_geo =
false;
1676 tdd_coords.val.arr_val.reserve(compressed_coords.size());
1677 for (
auto cc : compressed_coords) {
1678 tdd_coords.val.arr_val.emplace_back();
1679 tdd_coords.val.arr_val.back().val.int_val = cc;
1682 tdd_coords.is_null = is_null_geo;
1683 import_buffers[col_idx++]->add_value(cd_coords, tdd_coords,
false);
1688 TDatum tdd_ring_sizes;
1689 tdd_ring_sizes.val.arr_val.reserve(ring_sizes.size());
1691 for (
auto ring_size : ring_sizes) {
1692 tdd_ring_sizes.val.arr_val.emplace_back();
1693 tdd_ring_sizes.val.arr_val.back().val.int_val = ring_size;
1696 tdd_ring_sizes.is_null = is_null_geo;
1697 import_buffers[col_idx++]->add_value(cd_ring_sizes, tdd_ring_sizes,
false);
1703 TDatum tdd_poly_rings;
1704 tdd_poly_rings.val.arr_val.reserve(poly_rings.size());
1706 for (
auto num_rings : poly_rings) {
1707 tdd_poly_rings.val.arr_val.emplace_back();
1708 tdd_poly_rings.val.arr_val.back().val.int_val = num_rings;
1711 tdd_poly_rings.is_null = is_null_geo;
1712 import_buffers[col_idx++]->add_value(cd_poly_rings, tdd_poly_rings,
false);
1719 tdd_bounds.val.arr_val.reserve(bounds.size());
1721 for (
auto b : bounds) {
1722 tdd_bounds.val.arr_val.emplace_back();
1723 tdd_bounds.val.arr_val.back().val.real_val = b;
1726 tdd_bounds.is_null = is_null_geo;
1727 import_buffers[col_idx++]->add_value(cd_bounds, tdd_bounds,
false);
1734 std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
1736 std::vector<std::vector<double>>& coords_column,
1737 std::vector<std::vector<double>>& bounds_column,
1738 std::vector<std::vector<int>>& ring_sizes_column,
1739 std::vector<std::vector<int>>& poly_rings_column) {
1741 const auto col_type = col_ti.
get_type();
1744 auto coords_row_count = coords_column.size();
1746 for (
auto& coords : coords_column) {
1747 bool is_null_geo =
false;
1749 if (!col_ti.get_notnull()) {
1752 is_null_point =
true;
1755 is_null_geo = coords.empty();
1756 if (is_null_point) {
1761 is_null_geo =
false;
1764 std::vector<TDatum> td_coords_data;
1766 std::vector<uint8_t> compressed_coords =
1768 for (
auto const& cc : compressed_coords) {
1770 td_byte.val.int_val = cc;
1771 td_coords_data.push_back(td_byte);
1775 tdd_coords.val.arr_val = td_coords_data;
1776 tdd_coords.is_null = is_null_geo;
1777 import_buffers[col_idx]->add_value(cd_coords, tdd_coords,
false);
1782 if (ring_sizes_column.size() != coords_row_count) {
1783 CHECK(
false) <<
"Geometry import columnar: ring sizes column size mismatch";
1787 for (
auto const& ring_sizes : ring_sizes_column) {
1788 bool is_null_geo =
false;
1789 if (!col_ti.get_notnull()) {
1791 is_null_geo = ring_sizes.empty();
1793 std::vector<TDatum> td_ring_sizes;
1794 for (
auto const& ring_size : ring_sizes) {
1795 TDatum td_ring_size;
1796 td_ring_size.val.int_val = ring_size;
1797 td_ring_sizes.push_back(td_ring_size);
1799 TDatum tdd_ring_sizes;
1800 tdd_ring_sizes.val.arr_val = td_ring_sizes;
1801 tdd_ring_sizes.is_null = is_null_geo;
1802 import_buffers[col_idx]->add_value(cd_ring_sizes, tdd_ring_sizes,
false);
1808 if (poly_rings_column.size() != coords_row_count) {
1809 CHECK(
false) <<
"Geometry import columnar: poly rings column size mismatch";
1813 for (
auto const& poly_rings : poly_rings_column) {
1814 bool is_null_geo =
false;
1815 if (!col_ti.get_notnull()) {
1817 is_null_geo = poly_rings.empty();
1819 std::vector<TDatum> td_poly_rings;
1820 for (
auto const& num_rings : poly_rings) {
1821 TDatum td_num_rings;
1822 td_num_rings.val.int_val = num_rings;
1823 td_poly_rings.push_back(td_num_rings);
1825 TDatum tdd_poly_rings;
1826 tdd_poly_rings.val.arr_val = td_poly_rings;
1827 tdd_poly_rings.is_null = is_null_geo;
1828 import_buffers[col_idx]->add_value(cd_poly_rings, tdd_poly_rings,
false);
1835 if (bounds_column.size() != coords_row_count) {
1836 CHECK(
false) <<
"Geometry import columnar: bounds column size mismatch";
1839 for (
auto const& bounds : bounds_column) {
1840 bool is_null_geo =
false;
1841 if (!col_ti.get_notnull()) {
1845 std::vector<TDatum> td_bounds_data;
1846 for (
auto const& b : bounds) {
1848 td_double.val.real_val = b;
1849 td_bounds_data.push_back(td_double);
1852 tdd_bounds.val.arr_val = td_bounds_data;
1853 tdd_bounds.is_null = is_null_geo;
1854 import_buffers[col_idx]->add_value(cd_bounds, tdd_bounds,
false);
1863 const std::list<const ColumnDescriptor*>& col_descs) {
1867 int collection_col_idx = -1;
1869 std::string collection_col_name;
1871 for (
auto cd_it = col_descs.begin(); cd_it != col_descs.end(); cd_it++) {
1872 auto const& cd = *cd_it;
1873 auto const col_type = cd->columnType.get_type();
1875 if (collection_col_idx >= 0) {
1876 throw std::runtime_error(
1877 "Explode Collections: Found more than one destination column");
1879 collection_col_idx = col_idx;
1880 collection_child_type = col_type;
1881 collection_col_name = cd->columnName;
1883 for (
int i = 0; i < cd->columnType.get_physical_cols(); ++i) {
1888 if (collection_col_idx < 0) {
1889 throw std::runtime_error(
1890 "Explode Collections: Failed to find a supported column type to explode "
1893 return std::make_tuple(collection_col_idx, collection_child_type, collection_col_name);
1897 OGRGeometry* ogr_geometry,
1899 const std::string& collection_col_name,
1900 size_t row_or_feature_idx,
1901 std::function<
void(OGRGeometry*)> execute_import_lambda) {
1902 auto ogr_geometry_type = wkbFlatten(ogr_geometry->getGeometryType());
1903 bool is_collection =
false;
1904 switch (collection_child_type) {
1906 switch (ogr_geometry_type) {
1908 is_collection =
true;
1913 throw std::runtime_error(
1914 "Explode Collections: Source geo type must be MULTIPOINT or POINT");
1918 switch (ogr_geometry_type) {
1919 case wkbMultiLineString:
1920 is_collection =
true;
1925 throw std::runtime_error(
1926 "Explode Collections: Source geo type must be MULTILINESTRING or "
1931 switch (ogr_geometry_type) {
1932 case wkbMultiPolygon:
1933 is_collection =
true;
1938 throw std::runtime_error(
1939 "Explode Collections: Source geo type must be MULTIPOLYGON or POLYGON");
1943 CHECK(
false) <<
"Unsupported geo child type " << collection_child_type;
1949 if (is_collection) {
1951 OGRGeometryCollection* collection_geometry = ogr_geometry->toGeometryCollection();
1952 CHECK(collection_geometry);
1954 #if LOG_EXPLODE_COLLECTIONS
1956 LOG(
INFO) <<
"Exploding row/feature " << row_or_feature_idx <<
" for column '"
1957 << explode_col_name <<
"' into " << collection_geometry->getNumGeometries()
1962 uint32_t child_geometry_count = 0;
1963 auto child_geometry_it = collection_geometry->begin();
1964 while (child_geometry_it != collection_geometry->end()) {
1966 OGRGeometry* import_geometry = *child_geometry_it;
1968 [&] { execute_import_lambda(import_geometry); });
1971 child_geometry_it++;
1972 child_geometry_count++;
1977 [&] { execute_import_lambda(ogr_geometry); });
1989 std::unique_ptr<
char[]> scratch_buffer,
1993 size_t first_row_index_this_buffer,
1995 Executor* executor) {
1997 int64_t total_get_row_time_us = 0;
1998 int64_t total_str_to_val_time_us = 0;
1999 auto query_session = session_info ? session_info->
get_session_id() :
"";
2000 CHECK(scratch_buffer);
2001 auto buffer = scratch_buffer.get();
2008 const std::list<const ColumnDescriptor*>& col_descs = importer->
get_column_descs();
2011 const char* thread_buf = buffer + begin_pos + begin;
2012 const char* thread_buf_end = buffer + end_pos;
2013 const char* buf_end = buffer + total_size;
2014 bool try_single_thread =
false;
2015 std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers =
2020 for (
const auto cd : col_descs) {
2021 const auto& col_ti = cd->columnType;
2022 phys_cols += col_ti.get_physical_cols();
2023 if (cd->columnType.get_type() ==
kPOINT ||
2028 auto num_cols = col_descs.size() - phys_cols;
2029 for (
const auto& p : import_buffers) {
2032 std::vector<std::string_view> row;
2033 size_t row_index_plus_one = 0;
2034 for (
const char* p = thread_buf; p < thread_buf_end; p++) {
2036 std::vector<std::unique_ptr<char[]>>
2038 row_index_plus_one++;
2051 total_get_row_time_us += us;
2064 if (row.size() < num_cols || (num_cols + point_cols) < row.size()) {
2066 LOG(
ERROR) <<
"Incorrect Row (expected " << num_cols <<
" columns, has "
2078 auto execute_import_row = [&](OGRGeometry* import_geometry) {
2079 size_t import_idx = 0;
2082 for (
auto cd_it = col_descs.begin(); cd_it != col_descs.end(); cd_it++) {
2084 const auto& col_ti = cd->columnType;
2097 if (!cd->columnType.is_string() && row[import_idx].empty()) {
2100 if (!cd->columnType.is_string() && !copy_params.
trim_spaces) {
2102 row[import_idx] =
sv_strip(row[import_idx]);
2105 if (col_ti.get_physical_cols() == 0) {
2108 import_buffers[col_idx]->add_value(
2109 cd, row[import_idx], is_null, copy_params);
2118 import_buffers[col_idx]->add_value(
2119 cd, copy_params.
null_str,
true, copy_params);
2122 auto const& geo_string = row[import_idx];
2128 SQLTypes col_type = col_ti.get_type();
2131 std::vector<double> coords;
2132 std::vector<double> bounds;
2133 std::vector<int> ring_sizes;
2134 std::vector<int> poly_rings;
2140 geo_string.size() > 0 &&
2141 (geo_string[0] ==
'.' || isdigit(geo_string[0]) ||
2142 geo_string[0] ==
'-') &&
2143 geo_string.find_first_of(
"ABCDEFabcdef") == std::string::npos) {
2144 double lon = std::atof(std::string(geo_string).c_str());
2146 auto lat_str = row[import_idx];
2148 if (lat_str.size() > 0 &&
2149 (lat_str[0] ==
'.' || isdigit(lat_str[0]) || lat_str[0] ==
'-')) {
2150 lat = std::atof(std::string(lat_str).c_str());
2153 if (!copy_params.
lonlat) {
2164 import_ti.get_output_srid() == 4326) {
2168 import_ti.set_input_srid(srid0);
2172 throw std::runtime_error(
2173 "Cannot read lon/lat to insert into POINT/MULTIPOINT column " +
2181 import_ti.get_output_srid() == 4326) {
2185 import_ti.set_input_srid(srid0);
2189 if (col_ti.get_notnull()) {
2190 throw std::runtime_error(
"NULL geo for column " + cd->columnName);
2193 import_ti, coords, bounds, ring_sizes, poly_rings);
2195 if (import_geometry) {
2206 "Failed to extract valid geometry from exploded row " +
2208 row_index_plus_one) +
2209 " for column " + cd->columnName;
2210 throw std::runtime_error(msg);
2215 std::string(geo_string),
2222 std::string msg =
"Failed to extract valid geometry from row " +
2224 row_index_plus_one) +
2225 " for column " + cd->columnName;
2226 throw std::runtime_error(msg);
2232 throw std::runtime_error(
2233 "Imported geometry doesn't match the type of column " +
2250 for (
int i = 0; i < cd->columnType.get_physical_cols(); ++i) {
2259 "Table load was cancelled via Query Interrupt";
2263 }
catch (
const std::exception& e) {
2264 for (
size_t col_idx_to_pop = 0; col_idx_to_pop < col_idx; ++col_idx_to_pop) {
2265 import_buffers[col_idx_to_pop]->pop_value();
2268 LOG(
ERROR) <<
"Input exception thrown: " << e.what()
2271 LOG(
ERROR) <<
"Load was cancelled due to max reject rows being reached";
2274 "Load was cancelled due to max reject rows being reached";
2281 auto const [collection_col_idx, collection_child_type, collection_col_name] =
2284 CHECK_LT(collection_col_idx, (
int)row.size()) <<
"column index out of range";
2285 auto const& collection_geo_string = row[collection_col_idx];
2287 OGRGeometry* ogr_geometry =
nullptr;
2290 OGRGeometryFactory::destroyGeometry(ogr_geometry);
2297 collection_child_type,
2298 collection_col_name,
2299 first_row_index_this_buffer + row_index_plus_one,
2300 execute_import_row);
2304 [&] { execute_import_row(
nullptr); });
2311 total_str_to_val_time_us += us;
2321 LOG(
INFO) <<
"Thread" << std::this_thread::get_id() <<
":"
2323 << (double)ms / 1000.0 <<
"sec, Insert Time: " << (
double)load_ms / 1000.0
2324 <<
"sec, get_row: " << (double)total_get_row_time_us / 1000000.0
2325 <<
"sec, str_to_val: " << (
double)total_str_to_val_time_us / 1000000.0
2326 <<
"sec" << std::endl;
2329 return thread_import_status;
2335 : std::runtime_error(
"Column '" + column_name +
"' is not a geo column") {}
2341 OGRCoordinateTransformation* coordinate_transformation,
2343 size_t firstFeature,
2352 const std::list<const ColumnDescriptor*>& col_descs = importer->
get_column_descs();
2353 std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers =
2355 auto query_session = session_info ? session_info->
get_session_id() :
"";
2356 for (
const auto& p : import_buffers) {
2363 for (
size_t iFeature = 0; iFeature < numFeatures; iFeature++) {
2365 if (!features[iFeature]) {
2372 OGRGeometry* pGeometry = features[iFeature]->GetGeometryRef();
2373 if (pGeometry && coordinate_transformation) {
2374 pGeometry->transform(coordinate_transformation);
2381 auto execute_import_feature = [&](OGRGeometry* import_geometry) {
2387 thread_import_status.
load_msg =
"Table load was cancelled via Query Interrupt";
2391 uint32_t field_column_count{0u};
2392 uint32_t metadata_column_count{0u};
2394 for (
auto cd_it = col_descs.begin(); cd_it != col_descs.end(); cd_it++) {
2398 const auto& col_ti = cd->columnType;
2399 if (col_ti.is_geometry()) {
2404 SQLTypes col_type = col_ti.get_type();
2407 import_buffers[col_idx]->add_value(
2408 cd, copy_params.
null_str,
true, copy_params);
2412 std::vector<double> coords;
2413 std::vector<double> bounds;
2414 std::vector<int> ring_sizes;
2415 std::vector<int> poly_rings;
2419 bool is_null_geo = !import_geometry;
2421 if (col_ti.get_notnull()) {
2422 throw std::runtime_error(
"NULL geo for column " + cd->columnName);
2425 import_ti, coords, bounds, ring_sizes, poly_rings);
2435 std::string msg =
"Failed to extract valid geometry from feature " +
2437 " for column " + cd->columnName;
2438 throw std::runtime_error(msg);
2443 throw std::runtime_error(
2444 "Imported geometry doesn't match the type of column " +
2451 auto cd_coords = *cd_it;
2452 std::vector<TDatum> td_coord_data;
2454 std::vector<uint8_t> compressed_coords =
2456 for (
auto cc : compressed_coords) {
2458 td_byte.val.int_val = cc;
2459 td_coord_data.push_back(td_byte);
2463 tdd_coords.val.arr_val = td_coord_data;
2464 tdd_coords.is_null = is_null_geo;
2465 import_buffers[col_idx]->add_value(cd_coords, tdd_coords,
false);
2472 auto cd_ring_sizes = *cd_it;
2473 std::vector<TDatum> td_ring_sizes;
2475 for (
auto ring_size : ring_sizes) {
2476 TDatum td_ring_size;
2477 td_ring_size.val.int_val = ring_size;
2478 td_ring_sizes.push_back(td_ring_size);
2481 TDatum tdd_ring_sizes;
2482 tdd_ring_sizes.val.arr_val = td_ring_sizes;
2483 tdd_ring_sizes.is_null = is_null_geo;
2484 import_buffers[col_idx]->add_value(cd_ring_sizes, tdd_ring_sizes,
false);
2491 auto cd_poly_rings = *cd_it;
2492 std::vector<TDatum> td_poly_rings;
2494 for (
auto num_rings : poly_rings) {
2495 TDatum td_num_rings;
2496 td_num_rings.val.int_val = num_rings;
2497 td_poly_rings.push_back(td_num_rings);
2500 TDatum tdd_poly_rings;
2501 tdd_poly_rings.val.arr_val = td_poly_rings;
2502 tdd_poly_rings.is_null = is_null_geo;
2503 import_buffers[col_idx]->add_value(cd_poly_rings, tdd_poly_rings,
false);
2512 auto cd_bounds = *cd_it;
2513 std::vector<TDatum> td_bounds_data;
2515 for (
auto b : bounds) {
2517 td_double.val.real_val = b;
2518 td_bounds_data.push_back(td_double);
2522 tdd_bounds.val.arr_val = td_bounds_data;
2523 tdd_bounds.is_null = is_null_geo;
2524 import_buffers[col_idx]->add_value(cd_bounds, tdd_bounds,
false);
2527 }
else if (field_column_count < fieldNameToIndexMap.size()) {
2531 auto const cit = columnNameToSourceNameMap.find(cd->columnName);
2532 CHECK(cit != columnNameToSourceNameMap.end());
2533 auto const& field_name = cit->second;
2535 auto const fit = fieldNameToIndexMap.find(field_name);
2536 if (fit == fieldNameToIndexMap.end()) {
2540 auto const& field_index = fit->second;
2541 CHECK(field_index < fieldNameToIndexMap.size());
2543 auto const& feature = features[iFeature];
2545 auto field_defn = feature->GetFieldDefnRef(field_index);
2552 std::string value_string;
2553 int array_index = 0, array_size = 0;
2555 auto stringify_numeric_list = [&](
auto* values) {
2557 while (array_index < array_size) {
2558 auto separator = (array_index > 0) ?
"," :
"";
2562 value_string +=
"}";
2565 auto field_type = field_defn->GetType();
2566 switch (field_type) {
2575 value_string = feature->GetFieldAsString(field_index);
2577 case OFTIntegerList: {
2578 auto* values = feature->GetFieldAsIntegerList(field_index, &array_size);
2579 stringify_numeric_list(values);
2581 case OFTInteger64List: {
2582 auto* values = feature->GetFieldAsInteger64List(field_index, &array_size);
2583 stringify_numeric_list(values);
2586 auto* values = feature->GetFieldAsDoubleList(field_index, &array_size);
2587 stringify_numeric_list(values);
2589 case OFTStringList: {
2590 auto** array_of_strings = feature->GetFieldAsStringList(field_index);
2592 if (array_of_strings) {
2593 while (
auto* this_string = array_of_strings[array_index]) {
2594 auto separator = (array_index > 0) ?
"," :
"";
2595 value_string +=
separator + std::string(this_string);
2599 value_string +=
"}";
2602 throw std::runtime_error(
"Unsupported geo file field type (" +
2607 import_buffers[col_idx]->add_value(cd, value_string,
false, copy_params);
2609 field_column_count++;
2610 }
else if (metadata_column_count < metadata_column_infos.size()) {
2614 auto const& mci = metadata_column_infos[metadata_column_count];
2615 if (mci.column_descriptor.columnName != cd->columnName) {
2616 throw std::runtime_error(
"Metadata column name mismatch");
2618 import_buffers[col_idx]->add_value(cd, mci.value,
false, copy_params);
2620 metadata_column_count++;
2622 throw std::runtime_error(
"Column count mismatch");
2631 LOG(
ERROR) <<
"Input exception thrown: " << e.what() <<
". Aborting import.";
2632 throw std::runtime_error(e.what());
2633 }
catch (
const std::exception& e) {
2634 for (
size_t col_idx_to_pop = 0; col_idx_to_pop < col_idx; ++col_idx_to_pop) {
2635 import_buffers[col_idx_to_pop]->pop_value();
2638 LOG(
ERROR) <<
"Input exception thrown: " << e.what() <<
". Row discarded.";
2644 auto const [collection_idx_type_name, collection_child_type, collection_col_name] =
2647 collection_child_type,
2648 collection_col_name,
2649 firstFeature + iFeature + 1,
2650 execute_import_feature);
2653 execute_import_feature(pGeometry);
2659 float load_s = 0.0f;
2667 LOG(
INFO) <<
"DEBUG: Process " << convert_s <<
"s";
2668 LOG(
INFO) <<
"DEBUG: Load " << load_s <<
"s";
2669 LOG(
INFO) <<
"DEBUG: Total " << (convert_s + load_s) <<
"s";
2674 return thread_import_status;
2678 const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
2681 return loadImpl(import_buffers, row_count,
false, session_info);
2684 bool Loader::load(
const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
2687 return loadImpl(import_buffers, row_count,
true, session_info);
2694 const int8_t* values_buffer{
nullptr};
2695 if (ti.is_string()) {
2701 CHECK(values_buffer);
2702 const int logical_size = ti.is_string() ? ti.get_size() : ti.get_logical_size();
2703 switch (logical_size) {
2705 return values_buffer[index];
2708 return reinterpret_cast<const int16_t*
>(values_buffer)[index];
2711 return reinterpret_cast<const int32_t*
>(values_buffer)[index];
2714 return reinterpret_cast<const int64_t*
>(values_buffer)[index];
2717 LOG(
FATAL) <<
"Unexpected size for shard key: " << logical_size;
2726 const auto values_buffer = import_buffer.
getAsBytes();
2727 return reinterpret_cast<const float*
>(may_alias_ptr(values_buffer))[index];
2733 const auto values_buffer = import_buffer.
getAsBytes();
2734 return reinterpret_cast<const double*
>(may_alias_ptr(values_buffer))[index];
2742 for (
size_t col_idx = 0; col_idx < import_buffers.size(); ++col_idx) {
2743 const auto& input_buffer = import_buffers[col_idx];
2744 const auto& col_ti = input_buffer->getTypeInfo();
2750 shard_output_buffers[col_idx]->addBoolean(
int_value_at(*input_buffer, row_index));
2753 shard_output_buffers[col_idx]->addTinyint(
int_value_at(*input_buffer, row_index));
2756 shard_output_buffers[col_idx]->addSmallint(
2760 shard_output_buffers[col_idx]->addInt(
int_value_at(*input_buffer, row_index));
2763 shard_output_buffers[col_idx]->addBigint(
int_value_at(*input_buffer, row_index));
2766 shard_output_buffers[col_idx]->addFloat(
float_value_at(*input_buffer, row_index));
2769 shard_output_buffers[col_idx]->addDouble(
2775 CHECK_LT(row_index, input_buffer->getStringBuffer()->size());
2776 shard_output_buffers[col_idx]->addString(
2777 (*input_buffer->getStringBuffer())[row_index]);
2783 shard_output_buffers[col_idx]->addBigint(
int_value_at(*input_buffer, row_index));
2787 CHECK(input_buffer->getStringArrayBuffer());
2788 CHECK_LT(row_index, input_buffer->getStringArrayBuffer()->size());
2789 const auto& input_arr = (*(input_buffer->getStringArrayBuffer()))[row_index];
2790 shard_output_buffers[col_idx]->addStringArray(input_arr);
2792 shard_output_buffers[col_idx]->addArray(
2793 (*input_buffer->getArrayBuffer())[row_index]);
2802 CHECK_LT(row_index, input_buffer->getGeoStringBuffer()->size());
2803 shard_output_buffers[col_idx]->addGeoString(
2804 (*input_buffer->getGeoStringBuffer())[row_index]);
2814 std::vector<OneShardBuffers>& all_shard_import_buffers,
2815 std::vector<size_t>& all_shard_row_counts,
2817 const size_t row_count,
2818 const size_t shard_count,
2825 shard_col_desc = col_desc;
2829 CHECK(shard_col_desc);
2832 const auto& shard_col_ti = shard_col_desc->columnType;
2833 CHECK(shard_col_ti.is_integer() ||
2834 (shard_col_ti.is_string() && shard_col_ti.get_compression() ==
kENCODING_DICT) ||
2835 shard_col_ti.is_time());
2836 if (shard_col_ti.is_string()) {
2837 const auto payloads_ptr = shard_column_input_buffer->getStringBuffer();
2838 CHECK(payloads_ptr);
2839 shard_column_input_buffer->addDictEncodedString(*payloads_ptr);
2842 for (
size_t i = 0; i < row_count; ++i) {
2843 const size_t shard =
2845 auto& shard_output_buffers = all_shard_import_buffers[shard];
2847 ++all_shard_row_counts[shard];
2852 std::vector<OneShardBuffers>& all_shard_import_buffers,
2853 std::vector<size_t>& all_shard_row_counts,
2855 const size_t row_count,
2856 const size_t shard_count,
2859 CHECK(shard_tds.size() == shard_count);
2861 for (
size_t shard = 0; shard < shard_count; ++shard) {
2862 auto& shard_output_buffers = all_shard_import_buffers[shard];
2863 if (row_count != 0) {
2868 all_shard_row_counts[shard] = shard_tds[shard]->fragmenter->getNumRows();
2873 std::vector<size_t>& all_shard_row_counts,
2875 const size_t row_count,
2876 const size_t shard_count,
2878 all_shard_row_counts.resize(shard_count);
2879 for (
size_t shard_idx = 0; shard_idx < shard_count; ++shard_idx) {
2880 all_shard_import_buffers.emplace_back();
2881 for (
const auto& typed_import_buffer : import_buffers) {
2882 all_shard_import_buffers.back().emplace_back(
2884 typed_import_buffer->getStringDictionary()));
2890 all_shard_row_counts,
2897 all_shard_row_counts,
2906 const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
2915 std::vector<OneShardBuffers> all_shard_import_buffers;
2916 std::vector<size_t> all_shard_row_counts;
2919 all_shard_row_counts,
2922 shard_tables.size(),
2924 bool success =
true;
2925 for (
size_t shard_idx = 0; shard_idx < shard_tables.size(); ++shard_idx) {
2926 success = success &&
loadToShard(all_shard_import_buffers[shard_idx],
2927 all_shard_row_counts[shard_idx],
2928 shard_tables[shard_idx],
2938 const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers) {
2940 std::vector<std::pair<const size_t, std::future<int8_t*>>>
2941 encoded_data_block_ptrs_futures;
2943 for (
size_t buf_idx = 0; buf_idx <
import_buffers.size(); buf_idx++) {
2946 auto string_payload_ptr =
import_buffers[buf_idx]->getStringBuffer();
2949 encoded_data_block_ptrs_futures.emplace_back(std::make_pair(
2952 import_buffers[buf_idx]->addDictEncodedString(*string_payload_ptr);
2958 for (
size_t buf_idx = 0; buf_idx <
import_buffers.size(); buf_idx++) {
2965 auto string_payload_ptr =
import_buffers[buf_idx]->getStringBuffer();
2975 auto geo_payload_ptr =
import_buffers[buf_idx]->getGeoStringBuffer();
2992 for (
auto& encoded_ptr_future : encoded_data_block_ptrs_futures) {
2993 result[encoded_ptr_future.first].numbersPtr = encoded_ptr_future.second.get();
2999 const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
3007 bool success =
false;
3010 }
catch (std::exception& e) {
3011 std::ostringstream oss;
3012 oss <<
"Exception when loading Table " << shard_table->
tableName <<
", issue was "
3023 for (
auto& buffer : import_buffers) {
3024 ins_data.
columnIds.push_back(buffer->getColumnDesc()->columnId);
3032 loader_lock.unlock();
3037 shard_table->
fragmenter->insertData(ins_data);
3039 shard_table->
fragmenter->insertDataNoCheckpoint(ins_data);
3041 }
catch (std::exception& e) {
3042 std::ostringstream oss;
3043 oss <<
"Fragmenter Insert Exception when processing Table "
3044 << shard_table->
tableName <<
" issue was " << e.what();
3056 std::vector<const TableDescriptor*> table_descs(1,
table_desc_);
3060 for (
auto table_desc : table_descs) {
3061 table_desc->fragmenter->dropColumns(columnIds);
3071 CHECK(cd->columnType.is_string() || cd->columnType.is_string_array());
3074 dict_map_[cd->columnId] = dd->stringDict.get();
3087 const std::string& file_path,
3088 const bool decompressed,
3095 throw std::runtime_error(
"failed to open file '" + file_path +
3096 "': " + strerror(errno));
3102 line.reserve(1 * 1024 * 1024);
3103 auto end_time = std::chrono::steady_clock::now() +
3104 timeout * (boost::istarts_with(file_path,
"s3://") ? 3 : 1);
3110 if (n++ >= line.capacity()) {
3122 if (
line1.empty()) {
3124 }
else if (line ==
line1) {
3133 if (std::chrono::steady_clock::now() >
end_time) {
3140 }
catch (std::exception& e) {
3159 if (boost::filesystem::path(
file_path).extension() ==
".tsv") {
3166 const char* buf =
raw_data.c_str();
3167 const char* buf_end = buf +
raw_data.size();
3168 bool try_single_thread =
false;
3169 for (
const char* p = buf; p < buf_end; p++) {
3170 std::vector<std::string> row;
3171 std::vector<std::unique_ptr<char[]>> tmp_buffers;
3182 if (try_single_thread) {
3186 if (try_single_thread) {
3189 for (
const char* p = buf; p < buf_end; p++) {
3190 std::vector<std::string> row;
3191 std::vector<std::unique_ptr<char[]>> tmp_buffers;
3209 boost::lexical_cast<
T>(str);
3210 }
catch (
const boost::bad_lexical_cast& e) {
3218 if (try_cast<double>(str)) {
3223 if (try_cast<int16_t>(str)) {
3225 }
else if (try_cast<int32_t>(str)) {
3227 }
else if (try_cast<int64_t>(str)) {
3229 }
else if (try_cast<float>(str)) {
3235 if (type ==
kTEXT) {
3237 std::string str_upper_case = str;
3239 str_upper_case.begin(), str_upper_case.end(), str_upper_case.begin(), ::toupper);
3242 if (str_upper_case.find(
"POINT") == 0) {
3245 }
else if (str_upper_case.find(
"MULTIPOINT") == 0) {
3247 }
else if (str_upper_case.find(
"LINESTRING") == 0) {
3250 }
else if (str_upper_case.find(
"MULTILINESTRING") == 0) {
3252 }
else if (str_upper_case.find(
"POLYGON") == 0) {
3255 }
else if (str_upper_case.find(
"MULTIPOLYGON") == 0) {
3257 }
else if (str_upper_case.find_first_not_of(
"0123456789ABCDEF") ==
3258 std::string::npos &&
3259 (str_upper_case.size() % 2) == 0) {
3261 if (str_upper_case.size() >= 10) {
3266 auto first_five_bytes = str_upper_case.substr(0, 10);
3267 if (first_five_bytes ==
"0000000001" || first_five_bytes ==
"0101000000") {
3269 }
else if (first_five_bytes ==
"0000000004" || first_five_bytes ==
"0104000000") {
3271 }
else if (first_five_bytes ==
"0000000002" || first_five_bytes ==
"0102000000") {
3273 }
else if (first_five_bytes ==
"0000000005" || first_five_bytes ==
"0105000000") {
3275 }
else if (first_five_bytes ==
"0000000003" || first_five_bytes ==
"0103000000") {
3277 }
else if (first_five_bytes ==
"0000000006" || first_five_bytes ==
"0106000000") {
3291 if (type ==
kTEXT) {
3306 std::vector<SQLTypes> types(row.size());
3307 for (
size_t i = 0; i < row.size(); i++) {
3314 static std::array<int, kSQLTYPE_LAST> typeorder;
3315 typeorder[
kCHAR] = 0;
3318 typeorder[
kINT] = 4;
3323 typeorder[
kTIME] = 9;
3324 typeorder[
kDATE] = 10;
3331 typeorder[
kTEXT] = 12;
3334 return typeorder[b] < typeorder[
a];
3365 const std::vector<std::vector<std::string>>& raw_rows,
3371 const std::vector<std::vector<std::string>>::const_iterator& row_begin,
3372 const std::vector<std::vector<std::string>>::const_iterator& row_end,
3375 throw std::runtime_error(
"No rows found in: " +
3376 boost::filesystem::path(
file_path).stem().
string());
3379 size_t num_cols =
raw_rows.front().size();
3380 std::vector<SQLTypes> best_types(num_cols,
kCHAR);
3381 std::vector<size_t> non_null_col_counts(num_cols, 0);
3382 for (
auto row = row_begin; row != row_end; row++) {
3383 while (best_types.size() < row->size() || non_null_col_counts.size() < row->size()) {
3384 best_types.push_back(
kCHAR);
3385 non_null_col_counts.push_back(0);
3387 for (
size_t col_idx = 0; col_idx < row->size(); col_idx++) {
3389 if (row->at(col_idx) ==
"" || !row->at(col_idx).compare(copy_params.
null_str)) {
3393 non_null_col_counts[col_idx]++;
3395 best_types[col_idx] = t;
3398 if (std::chrono::steady_clock::now() >
end_time) {
3402 for (
size_t col_idx = 0; col_idx < num_cols; col_idx++) {
3405 if (non_null_col_counts[col_idx] == 0) {
3406 best_types[col_idx] =
kTEXT;
3414 const std::vector<std::vector<std::string>>::const_iterator& row_begin,
3415 const std::vector<std::vector<std::string>>::const_iterator& row_end,
3416 const std::vector<SQLTypes>& best_types) {
3418 throw std::runtime_error(
"No rows found in: " +
3419 boost::filesystem::path(
file_path).stem().
string());
3421 size_t num_cols = best_types.size();
3422 std::vector<EncodingType> best_encodes(num_cols,
kENCODING_NONE);
3423 std::vector<size_t> num_rows_per_col(num_cols, 1);
3424 std::vector<std::unordered_set<std::string>> count_set(num_cols);
3425 for (
auto row = row_begin; row != row_end; row++) {
3426 for (
size_t col_idx = 0; col_idx < row->size() && col_idx < num_cols; col_idx++) {
3428 count_set[col_idx].insert(row->at(col_idx));
3429 num_rows_per_col[col_idx]++;
3433 for (
size_t col_idx = 0; col_idx < num_cols; col_idx++) {
3436 static_cast<float>(count_set[col_idx].size()) / num_rows_per_col[col_idx];
3437 if (uniqueRatio < 0.75) {
3442 return best_encodes;
3450 const std::vector<SQLTypes>& tail_types) {
3451 if (head_types.size() != tail_types.size()) {
3455 for (
size_t col_idx = 0; col_idx < tail_types.size(); col_idx++) {
3456 if (head_types[col_idx] !=
kTEXT) {
3459 has_headers = has_headers || tail_types[col_idx] !=
kTEXT;
3465 #if defined(ENABLE_IMPORT_PARQUET)
3466 if (data_preview_.has_value()) {
3467 return data_preview_.value().sample_rows;
3473 std::vector<std::vector<std::string>> sample_rows(
raw_rows.begin() + offset,
3480 #if defined(ENABLE_IMPORT_PARQUET)
3481 if (data_preview_.has_value()) {
3482 return data_preview_.value().column_names;
3499 #if defined(ENABLE_IMPORT_PARQUET)
3500 if (data_preview_.has_value()) {
3501 return data_preview_.value().column_types;
3505 std::vector<SQLTypeInfo> types;
3514 void Importer::load(
const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
3517 if (!
loader->loadNoCheckpoint(import_buffers, row_count, session_info)) {
3525 const std::vector<Catalog_Namespace::TableEpochInfo>& table_epochs) {
3530 loader->setTableEpochs(table_epochs);
3536 if (
loader->getTableDesc()->persistenceLevel ==
3543 if (!p->stringDictCheckpoint()) {
3544 LOG(
ERROR) <<
"Checkpointing Dictionary for Column "
3545 << p->getColumnDesc()->columnName <<
" failed.";
3554 LOG(
INFO) <<
"Dictionary Checkpointing took " << (double)ms / 1000.0 <<
" Seconds."
3566 std::vector<std::string> file_paths;
3580 for (
const auto&
file_path : file_paths) {
3596 #ifdef ENABLE_IMPORT_PARQUET
3597 import_parquet(file_paths, session_info);
3599 throw std::runtime_error(
"Parquet not supported!");
3609 #ifdef ENABLE_IMPORT_PARQUET
3612 foreign_storage::ParquetS3DetectFileSystem::ParquetS3DetectFileSystemConfiguration
3615 foreign_storage::ParquetS3DetectFileSystem::ParquetS3DetectFileSystemConfiguration
3633 const CopyParams& copy_params) {
3638 auto [foreign_server, user_mapping, foreign_table] =
3641 std::shared_ptr<arrow::fs::FileSystem> file_system;
3642 auto& server_options = foreign_server->options;
3647 file_system = std::make_shared<arrow::fs::LocalFileSystem>();
3649 }
else if (server_options
3654 create_parquet_s3_detect_filesystem_config(foreign_server.get(), copy_params));
3660 auto parquet_data_wrapper = std::make_unique<foreign_storage::ParquetDataWrapper>(
3661 foreign_table.get(), file_system);
3672 #ifdef ENABLE_IMPORT_PARQUET
3674 !g_enable_legacy_parquet_import) {
3675 data_preview_ = get_parquet_data_preview(fp.string(), cp);
3684 #ifdef ENABLE_IMPORT_PARQUET
3686 std::shared_ptr<arrow::io::ReadableFile>& infile,
3687 std::unique_ptr<parquet::arrow::FileReader>& reader,
3688 std::shared_ptr<arrow::Table>& table) {
3689 using namespace parquet::arrow;
3690 auto file_result = arrow::io::ReadableFile::Open(file_path);
3691 PARQUET_THROW_NOT_OK(file_result.status());
3692 infile = file_result.ValueOrDie();
3694 PARQUET_THROW_NOT_OK(OpenFile(infile, arrow::default_memory_pool(), &reader));
3695 PARQUET_THROW_NOT_OK(reader->ReadTable(&table));
3696 const auto num_row_groups = reader->num_row_groups();
3697 const auto num_columns = table->num_columns();
3698 const auto num_rows = table->num_rows();
3699 LOG(
INFO) <<
"File " << file_path <<
" has " << num_rows <<
" rows and " << num_columns
3700 <<
" columns in " << num_row_groups <<
" groups.";
3701 return std::make_tuple(num_row_groups, num_columns, num_rows);
3704 void Detector::import_local_parquet(
const std::string& file_path,
3707 std::shared_ptr<arrow::io::ReadableFile> infile;
3708 std::unique_ptr<parquet::arrow::FileReader> reader;
3709 std::shared_ptr<arrow::Table> table;
3710 int num_row_groups, num_columns;
3712 std::tie(num_row_groups, num_columns, num_rows) =
3717 copy_params.line_delim =
'\n';
3718 copy_params.delimiter =
',';
3720 copy_params.quoted =
true;
3721 copy_params.quote =
'"';
3722 copy_params.escape =
'"';
3723 for (
int c = 0; c < num_columns; ++c) {
3727 raw_data += table->ColumnNames().at(c);
3729 raw_data += copy_params.line_delim;
3733 for (
int g = 0; g < num_row_groups; ++g) {
3735 std::vector<std::shared_ptr<arrow::ChunkedArray>> arrays;
3736 std::vector<VarValue (*)(const Array&, const int64_t)> getters;
3737 arrays.resize(num_columns);
3738 for (
int c = 0; c < num_columns; ++c) {
3739 PARQUET_THROW_NOT_OK(reader->RowGroup(g)->Column(c)->Read(&arrays[c]));
3740 for (
auto chunk : arrays[c]->chunks()) {
3741 getters.push_back(
value_getter(*chunk,
nullptr,
nullptr));
3744 for (
int r = 0; r < num_rows; ++r) {
3745 for (
int c = 0; c < num_columns; ++c) {
3746 std::vector<std::string> buffer;
3747 for (
auto chunk : arrays[c]->chunks()) {
3748 DataBuffer<std::string> data(&cd, *chunk, buffer,
nullptr);
3752 if (!chunk->IsNull(r)) {
3754 raw_data += boost::replace_all_copy(
3755 (data << getters[c](*chunk, r)).buffer.front(),
"\"",
"\"\"");
3760 raw_data += copy_params.line_delim;
3772 template <
typename DATA_TYPE>
3774 std::vector<DATA_TYPE>& buffer,
3776 const auto old_size = buffer.size();
3778 for (
auto rit = bad_rows_tracker->
rows.crbegin(); rit != bad_rows_tracker->
rows.crend();
3780 buffer.erase(buffer.begin() + *rit);
3782 return std::make_tuple(old_size, buffer.size());
3786 BadRowsTracker*
const bad_rows_tracker) {
3821 throw std::runtime_error(
"Invalid Type");
3825 void Importer::import_local_parquet(
const std::string& file_path,
3827 std::shared_ptr<arrow::io::ReadableFile> infile;
3828 std::unique_ptr<parquet::arrow::FileReader> reader;
3829 std::shared_ptr<arrow::Table> table;
3830 int num_row_groups, num_columns;
3831 int64_t nrow_in_file;
3832 std::tie(num_row_groups, num_columns, nrow_in_file) =
3837 std::vector<const ColumnDescriptor*> cds;
3839 int num_physical_cols = 0;
3840 for (
auto& cd : column_list) {
3844 arrow_throw_if(num_columns != (
int)(column_list.size() - num_physical_cols),
3845 "Unmatched numbers of columns in parquet file " + file_path +
": " +
3848 " columns in table.");
3852 const int num_slices = std::max<decltype(max_threads)>(
max_threads, num_columns);
3855 size_t nrow_completed{0};
3858 auto get_physical_col_idx = [&cds](
const int logic_col_idx) ->
auto{
3859 int physical_col_idx = 0;
3860 for (
int i = 0; i < logic_col_idx; ++i) {
3861 physical_col_idx += 1 + cds[physical_col_idx]->columnType.get_physical_cols();
3863 return physical_col_idx;
3866 auto query_session = session_info ? session_info->
get_session_id() :
"";
3868 for (
int row_group = 0; row_group < num_row_groups; ++row_group) {
3883 for (
int slice = 0; slice < num_slices; slice++) {
3885 for (
const auto cd : cds) {
3887 new TypedImportBuffer(cd,
loader->getStringDict(cd)));
3899 std::vector<BadRowsTracker> bad_rows_trackers(num_slices);
3900 for (
size_t slice = 0; slice < bad_rows_trackers.size(); ++slice) {
3901 auto& bad_rows_tracker = bad_rows_trackers[slice];
3903 bad_rows_tracker.row_group = slice;
3904 bad_rows_tracker.importer =
this;
3907 for (
int logic_col_idx = 0; logic_col_idx < num_columns; ++logic_col_idx) {
3908 const auto physical_col_idx = get_physical_col_idx(logic_col_idx);
3909 const auto cd = cds[physical_col_idx];
3910 std::shared_ptr<arrow::ChunkedArray> array;
3911 PARQUET_THROW_NOT_OK(
3912 reader->RowGroup(row_group)->Column(logic_col_idx)->Read(&array));
3913 const size_t array_size = array->length();
3914 const size_t slice_size = (array_size + num_slices - 1) / num_slices;
3916 for (
int slice = 0; slice < num_slices; ++slice) {
3922 thread_controller.startThread([&, slice] {
3923 const auto slice_offset = slice % num_slices;
3925 std::min<size_t>((slice_offset + 0) * slice_size, array_size),
3926 std::min<size_t>((slice_offset + 1) * slice_size, array_size));
3927 auto& bad_rows_tracker = bad_rows_trackers[slice];
3930 import_buffer->col_idx = physical_col_idx + 1;
3931 for (
auto chunk : array->chunks()) {
3932 import_buffer->add_arrow_values(
3933 cd, *chunk,
false, slice_range, &bad_rows_tracker);
3937 thread_controller.finish();
3939 std::vector<size_t> nrow_in_slice_raw(num_slices);
3940 std::vector<size_t> nrow_in_slice_successfully_loaded(num_slices);
3942 for (
int logic_col_idx = 0; logic_col_idx < num_columns; ++logic_col_idx) {
3943 const auto physical_col_idx = get_physical_col_idx(logic_col_idx);
3944 const auto cd = cds[physical_col_idx];
3945 for (
int slice = 0; slice < num_slices; ++slice) {
3946 auto& bad_rows_tracker = bad_rows_trackers[slice];
3948 std::tie(nrow_in_slice_raw[slice], nrow_in_slice_successfully_loaded[slice]) =
3953 for (
int slice = 0; slice < num_slices; ++slice) {
3955 nrow_in_slice_successfully_loaded[slice],
3959 const auto nrow_original =
3960 std::accumulate(nrow_in_slice_raw.begin(), nrow_in_slice_raw.end(), 0);
3961 const auto nrow_imported =
3963 nrow_in_slice_successfully_loaded.end(),
3965 const auto nrow_dropped = nrow_original - nrow_imported;
3966 LOG(
INFO) <<
"row group " << row_group <<
": add " << nrow_imported
3967 <<
" rows, drop " << nrow_dropped <<
" rows.";
3975 ") rows rejected exceeded. Halting load.";
3976 LOG(
ERROR) <<
"Maximum (" << copy_params.max_reject
3977 <<
") rows rejected exceeded. Halting load.";
3982 nrow_completed += nrow_imported;
3984 nrow_in_file ? (float)filesize * nrow_completed / nrow_in_file : 0;
3986 const auto total_file_offset =
3989 if (total_file_offset) {
3995 << total_file_offset;
3999 LOG(
INFO) <<
"Import " << nrow_in_file <<
" rows of parquet file " << file_path
4000 <<
" took " << (double)ms_load_a_file / 1000.0 <<
" secs";
4003 void DataStreamSink::import_parquet(std::vector<std::string>& file_paths,
4005 auto importer =
dynamic_cast<Importer*
>(
this);
4006 auto table_epochs = importer ? importer->getLoader()->getTableEpochs()
4007 : std::vector<Catalog_Namespace::TableEpochInfo>{};
4009 std::exception_ptr teptr;
4012 for (
auto const& file_path : file_paths) {
4013 std::map<int, std::string> url_parts;
4017 std::vector<std::string> objkeys;
4018 std::unique_ptr<S3ParquetArchive> us3arch;
4019 if (
"s3" == url_parts[2]) {
4022 copy_params.s3_access_key,
4023 copy_params.s3_secret_key,
4024 copy_params.s3_session_token,
4025 copy_params.s3_region,
4026 copy_params.s3_endpoint,
4027 copy_params.plain_text,
4028 copy_params.regex_path_filter,
4029 copy_params.file_sort_order_by,
4030 copy_params.file_sort_regex));
4031 us3arch->init_for_read();
4033 objkeys = us3arch->get_objkeys();
4035 throw std::runtime_error(
"AWS S3 support not available");
4036 #endif // HAVE_AWS_S3
4038 objkeys.emplace_back(file_path);
4043 for (
auto const& objkey : objkeys) {
4047 ? us3arch->land(objkey, teptr,
nullptr != dynamic_cast<Detector*>(
this))
4049 import_local_parquet(file_path, session_info);
4051 us3arch->vacuum(objkey);
4055 us3arch->vacuum(objkey);
4067 std::rethrow_exception(teptr);
4074 }
catch (
const std::exception& e) {
4081 importer->checkpoint(table_epochs);
4084 #endif // ENABLE_IMPORT_PARQUET
4087 std::vector<std::string>& file_paths,
4097 _pipe(fd, static_cast<unsigned int>(copy_params.buffer_size), _O_BINARY);
4099 auto pipe_res = pipe(fd);
4102 throw std::runtime_error(std::string(
"failed to create a pipe: ") + strerror(errno));
4105 signal(SIGPIPE, SIG_IGN);
4108 std::exception_ptr teptr;
4112 auto th_pipe_reader = std::thread([&]() {
4115 if (0 == (
p_file = fdopen(fd[0],
"r"))) {
4116 throw std::runtime_error(std::string(
"failed to open a pipe: ") +
4126 teptr = std::current_exception();
4139 auto th_pipe_writer = std::thread([&]() {
4140 std::unique_ptr<S3Archive> us3arch;
4142 for (
size_t fi = 0; !stop && fi < file_paths.size(); fi++) {
4144 auto file_path = file_paths[fi];
4145 std::unique_ptr<Archive> uarch;
4146 std::map<int, std::string> url_parts;
4148 const std::string S3_objkey_url_scheme =
"s3ok";
4149 if (
"file" == url_parts[2] ||
"" == url_parts[2]) {
4151 }
else if (
"s3" == url_parts[2]) {
4156 copy_params.s3_access_key,
4157 copy_params.s3_secret_key,
4158 copy_params.s3_session_token,
4159 copy_params.s3_region,
4160 copy_params.s3_endpoint,
4161 copy_params.plain_text,
4162 copy_params.regex_path_filter,
4163 copy_params.file_sort_order_by,
4164 copy_params.file_sort_regex));
4165 us3arch->init_for_read();
4168 for (
const auto& objkey : us3arch->get_objkeys()) {
4169 file_paths.emplace_back(std::string(S3_objkey_url_scheme) +
"://" + objkey);
4173 throw std::runtime_error(
"AWS S3 support not available");
4174 #endif // HAVE_AWS_S3
4175 }
else if (S3_objkey_url_scheme == url_parts[2]) {
4177 auto objkey = file_path.substr(3 + S3_objkey_url_scheme.size());
4179 us3arch->land(objkey, teptr,
nullptr != dynamic_cast<Detector*>(
this));
4180 if (0 == file_path.size()) {
4181 throw std::runtime_error(std::string(
"failed to land s3 object: ") + objkey);
4185 us3arch->vacuum(objkey);
4187 throw std::runtime_error(
"AWS S3 support not available");
4188 #endif // HAVE_AWS_S3
4190 #if 0 // TODO(ppan): implement and enable any other archive class
4192 if (
"hdfs" == url_parts[2])
4193 uarch.reset(
new HdfsArchive(file_path));
4196 throw std::runtime_error(std::string(
"unsupported archive url: ") + file_path);
4200 auto& arch = *uarch;
4206 bool just_saw_archive_header;
4207 bool is_detecting =
nullptr !=
dynamic_cast<Detector*
>(
this);
4208 bool first_text_header_skipped =
false;
4212 size_t num_block_read = 0;
4213 while (!stop && !!(just_saw_archive_header = arch.read_next_header())) {
4214 bool insert_line_delim_after_this_file =
false;
4217 auto ok = arch.read_data_block(&buf, &size, &offset);
4233 const char* buf2 = (
const char*)buf;
4236 just_saw_archive_header && (first_text_header_skipped || !is_detecting)) {
4237 while (size2-- > 0) {
4238 if (*buf2++ == copy_params.line_delim) {
4243 LOG(
WARNING) <<
"No line delimiter in block." << std::endl;
4245 just_saw_archive_header =
false;
4246 first_text_header_skipped =
true;
4255 int nremaining = size2;
4256 while (nremaining > 0) {
4258 int nwritten =
write(fd[1], buf2, nremaining);
4262 if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
4265 }
else if (errno == EPIPE &&
4272 throw std::runtime_error(
4273 std::string(
"failed or interrupted write to pipe: ") +
4276 }
else if (nwritten == nremaining) {
4281 nremaining -= nwritten;
4292 const char* plast =
static_cast<const char*
>(buf) + (size - 1);
4293 insert_line_delim_after_this_file = (*plast != copy_params.line_delim);
4301 if (insert_line_delim_after_this_file) {
4304 int nwritten =
write(fd[1], ©_params.line_delim, 1);
4308 if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
4311 }
else if (errno == EPIPE &&
4318 throw std::runtime_error(
4319 std::string(
"failed or interrupted write to pipe: ") +
4322 }
else if (nwritten == 1) {
4338 if (
nullptr != dynamic_cast<Detector*>(
this)) {
4343 teptr = std::current_exception();
4352 th_pipe_reader.join();
4353 th_pipe_writer.join();
4357 std::rethrow_exception(teptr);
4366 const std::string& file_path,
4367 const bool decompressed,
4370 auto query_session = session_info ? session_info->
get_session_id() :
"";
4376 throw std::runtime_error(
"failed to open file '" + file_path +
4377 "': " + strerror(errno));
4380 if (!decompressed) {
4381 (void)fseek(
p_file, 0, SEEK_END);
4389 size_t alloc_size = copy_params.buffer_size;
4390 if (!decompressed &&
file_size < alloc_size) {
4396 for (
const auto cd :
loader->get_column_descs()) {
4398 std::make_unique<TypedImportBuffer>(cd,
loader->getStringDict(cd)));
4402 auto scratch_buffer = std::make_unique<char[]>(alloc_size);
4403 size_t current_pos = 0;
4405 size_t begin_pos = 0;
4407 (void)fseek(
p_file, current_pos, SEEK_SET);
4409 fread(reinterpret_cast<void*>(scratch_buffer.get()), 1, alloc_size,
p_file);
4412 loader->getTableDesc()->tableId};
4413 auto table_epochs =
loader->getTableEpochs();
4416 std::list<std::future<ImportStatus>> threads;
4420 std::stack<size_t> stack_thread_ids;
4422 stack_thread_ids.push(i);
4425 size_t first_row_index_this_buffer = 0;
4428 unsigned int num_rows_this_buffer = 0;
4429 CHECK(scratch_buffer);
4434 first_row_index_this_buffer,
4435 num_rows_this_buffer,
4439 int nresidual = size - end_pos;
4440 std::unique_ptr<char[]> unbuf;
4441 if (nresidual > 0) {
4442 unbuf = std::make_unique<char[]>(nresidual);
4443 memcpy(unbuf.get(), scratch_buffer.get() + end_pos, nresidual);
4447 auto thread_id = stack_thread_ids.top();
4448 stack_thread_ids.pop();
4455 std::move(scratch_buffer),
4459 first_row_index_this_buffer,
4463 first_row_index_this_buffer += num_rows_this_buffer;
4465 current_pos += end_pos;
4466 scratch_buffer = std::make_unique<char[]>(alloc_size);
4467 CHECK(scratch_buffer);
4468 memcpy(scratch_buffer.get(), unbuf.get(), nresidual);
4470 fread(scratch_buffer.get() + nresidual, 1, alloc_size - nresidual,
p_file);
4473 while (threads.size() > 0) {
4475 for (std::list<std::future<ImportStatus>>::iterator it = threads.begin();
4476 it != threads.end();) {
4478 std::chrono::milliseconds span(0);
4479 if (p.wait_for(span) == std::future_status::ready) {
4480 auto ret_import_status = p.get();
4484 if (ret_import_status.load_failed) {
4489 size_t total_file_offset{0};
4493 total_file_offset += file_offset;
4497 if (decompressed ? total_file_offset : current_pos) {
4507 << total_file_offset;
4510 stack_thread_ids.push(ret_import_status.thread_id);
4511 threads.erase(it++);
4519 std::this_thread::yield();
4542 LOG(
ERROR) <<
"Maximum rows rejected exceeded. Halting load";
4552 for (
auto& p : threads) {
4578 const std::vector<Catalog_Namespace::TableEpochInfo>& table_epochs) {
4584 const std::string& file_name,
4593 throw std::runtime_error(
"Unexpected CopyParams.source_type (" +
4604 const std::string& file_name) {
4606 OGRLayer* poLayer =
nullptr;
4607 if (geo_layer_name.size()) {
4608 poLayer = poDS->GetLayerByName(geo_layer_name.c_str());
4609 if (poLayer ==
nullptr) {
4610 throw std::runtime_error(
"Layer '" + geo_layer_name +
"' not found in " +
4614 poLayer = poDS->GetLayer(0);
4615 if (poLayer ==
nullptr) {
4616 throw std::runtime_error(
"No layers found in " + file_name);
4626 const std::string& file_name,
4627 const std::string& geo_column_name,
4628 std::map<std::string, std::vector<std::string>>& sample_data,
4633 if (datasource ==
nullptr) {
4634 throw std::runtime_error(
"openGDALDataSource Error: Unable to open geo file " +
4641 auto const* feature_defn = layer.GetLayerDefn();
4642 CHECK(feature_defn);
4645 auto const metadata_column_infos =
4650 auto const feature_count =
static_cast<uint64_t
>(layer.GetFeatureCount());
4651 auto const num_features = std::min(static_cast<uint64_t>(row_limit), feature_count);
4654 for (
int field_index = 0; field_index < feature_defn->GetFieldCount(); field_index++) {
4655 auto const* column_name = feature_defn->GetFieldDefn(field_index)->GetNameRef();
4657 sample_data[column_name] = {};
4659 sample_data[geo_column_name] = {};
4660 for (
auto const& mci : metadata_column_infos) {
4661 sample_data[mci.column_descriptor.columnName] = {};
4665 layer.ResetReading();
4668 uint64_t feature_index{0u};
4669 while (feature_index < num_features) {
4677 auto const* geometry = feature->GetGeometryRef();
4678 if (geometry ==
nullptr) {
4683 switch (wkbFlatten(geometry->getGeometryType())) {
4687 case wkbMultiLineString:
4689 case wkbMultiPolygon:
4692 throw std::runtime_error(
"Unsupported geometry type: " +
4693 std::string(geometry->getGeometryName()));
4697 for (
int field_index = 0; field_index < feature->GetFieldCount(); field_index++) {
4698 auto const* column_name = feature_defn->GetFieldDefn(field_index)->GetNameRef();
4699 sample_data[column_name].push_back(feature->GetFieldAsString(field_index));
4703 for (
auto const& mci : metadata_column_infos) {
4704 sample_data[mci.column_descriptor.columnName].push_back(mci.value);
4708 char* wkts =
nullptr;
4709 geometry->exportToWkt(&wkts);
4711 sample_data[geo_column_name].push_back(wkts);
4724 return std::make_pair(
kINT,
false);
4725 case OFTIntegerList:
4726 return std::make_pair(
kINT,
true);
4727 #if GDAL_VERSION_MAJOR > 1
4729 return std::make_pair(
kBIGINT,
false);
4730 case OFTInteger64List:
4731 return std::make_pair(
kBIGINT,
true);
4734 return std::make_pair(
kDOUBLE,
false);
4736 return std::make_pair(
kDOUBLE,
true);
4738 return std::make_pair(
kTEXT,
false);
4740 return std::make_pair(
kTEXT,
true);
4742 return std::make_pair(
kDATE,
false);
4744 return std::make_pair(
kTIME,
false);
4752 return std::make_pair(
kTINYINT,
true);
4756 throw std::runtime_error(
"Unknown OGR field type: " +
std::to_string(ogr_type));
4767 case wkbMultiLineString:
4771 case wkbMultiPolygon:
4776 throw std::runtime_error(
"Unknown OGR geom type: " +
std::to_string(ogr_type));
4781 switch (raster_point_type) {
4803 switch (raster_point_transform) {
4821 const std::string& file_name,
4822 const bool is_raster,
4823 const std::string& geo_column_name,
4833 const std::string& file_name,
4834 const std::string& geo_column_name,
4845 auto metadata_column_infos =
4858 metadata_column_infos);
4861 std::list<ColumnDescriptor> cds;
4867 for (
auto const& [col_name, sql_type] : point_names_and_sql_types) {
4872 if (sql_type ==
kPOINT) {
4886 for (
auto const& [band_name, sql_type] : band_names_and_types) {
4895 for (
auto& mci : metadata_column_infos) {
4896 cds.push_back(std::move(mci.column_descriptor));
4905 const std::string& file_name,
4906 const std::string& geo_column_name,
4908 std::list<ColumnDescriptor> cds;
4911 if (poDS ==
nullptr) {
4912 throw std::runtime_error(
"openGDALDataSource Error: Unable to open geo file " +
4915 if (poDS->GetLayerCount() == 0) {
4916 throw std::runtime_error(
"gdalToColumnDescriptors Error: Geo file " + file_name +
4923 layer.ResetReading();
4926 if (poFeature ==
nullptr) {
4927 throw std::runtime_error(
"No features found in " + file_name);
4930 OGRFeatureDefn* poFDefn = layer.GetLayerDefn();
4933 for (iField = 0; iField < poFDefn->GetFieldCount(); iField++) {
4934 OGRFieldDefn* poFieldDefn = poFDefn->GetFieldDefn(iField);
4935 auto typePair =
ogr_to_type(poFieldDefn->GetType());
4938 cd.sourceName = poFieldDefn->GetNameRef();
4940 if (typePair.second) {
4946 if (typePair.first ==
kTEXT) {
4955 auto ogr_type = wkbFlatten(layer.GetGeomType());
4956 if (ogr_type == wkbUnknown) {
4958 auto const* ogr_geometry = poFeature->GetGeometryRef();
4960 ogr_type = wkbFlatten(ogr_geometry->getGeometryType());
4964 if (ogr_type != wkbNone) {
4971 if (ogr_type == wkbMultiPolygon) {
4972 ogr_type = wkbPolygon;
4973 }
else if (ogr_type == wkbMultiLineString) {
4974 ogr_type = wkbLineString;
4975 }
else if (ogr_type == wkbMultiPoint) {
4976 ogr_type = wkbPoint;
5009 auto metadata_column_infos =
5011 for (
auto& mci : metadata_column_infos) {
5012 cds.push_back(std::move(mci.column_descriptor));
5029 #if (GDAL_VERSION_MAJOR > 2) || (GDAL_VERSION_MAJOR == 2 && GDAL_VERSION_MINOR >= 3)
5033 VSICurlClearCache();
5038 int result = VSIStatExL(path.c_str(), &sb, VSI_STAT_EXISTS_FLAG);
5044 if (also_dir && (VSI_ISREG(sb.st_mode) || VSI_ISDIR(sb.st_mode))) {
5046 }
else if (VSI_ISREG(sb.st_mode)) {
5064 std::vector<std::string>& files) {
5066 std::vector<std::string> subdirectories;
5069 char** entries = VSIReadDir(archive_path.c_str());
5071 LOG(
WARNING) <<
"Failed to get file listing at archive: " << archive_path;
5078 ScopeGuard entries_guard = [&] { CSLDestroy(entries); };
5084 char* entry_c = entries[index++];
5088 std::string entry(entry_c);
5091 if (entry ==
"." || entry ==
"..") {
5096 std::string entry_path = archive_path + std::string(
"/") + entry;
5100 int result = VSIStatExL(entry_path.c_str(), &sb, VSI_STAT_NATURE_FLAG);
5105 if (VSI_ISDIR(sb.st_mode)) {
5109 if (boost::iends_with(entry_path,
".gdb")) {
5111 files.push_back(entry_path);
5114 subdirectories.push_back(entry_path);
5118 files.push_back(entry_path);
5124 for (
const auto& subdirectory : subdirectories) {
5131 const std::string& archive_path,
5142 std::vector<std::string> files;
5148 for (
auto& file : files) {
5149 file.erase(0, archive_path.size() + 1);
5158 const std::string& file_name,
5169 std::vector<GeoFileLayerInfo> layer_info;
5173 if (poDS ==
nullptr) {
5174 throw std::runtime_error(
"openGDALDataSource Error: Unable to open geo file " +
5179 for (
auto&& poLayer : poDS->GetLayers()) {
5182 poLayer->ResetReading();
5184 if (poLayer->GetFeatureCount() > 0) {
5186 auto ogr_type = wkbFlatten(poLayer->GetGeomType());
5187 if (ogr_type == wkbUnknown) {
5190 CHECK(first_feature);
5191 auto const* ogr_geometry = first_feature->GetGeometryRef();
5193 ogr_type = wkbFlatten(ogr_geometry->getGeometryType());
5206 case wkbMultiLineString:
5208 case wkbMultiPolygon:
5219 layer_info.emplace_back(poLayer->GetName(), contents);
5229 const bool is_raster) {
5233 return importGDALGeo(columnNameToSourceNameMap, session_info);
5242 if (poDS ==
nullptr) {
5243 throw std::runtime_error(
"openGDALDataSource Error: Unable to open geo file " +
5251 size_t numFeatures = layer.GetFeatureCount();
5256 OGRFeatureDefn* poFDefn = layer.GetLayerDefn();
5258 size_t numFields = poFDefn->GetFieldCount();
5259 for (
size_t iField = 0; iField < numFields; iField++) {
5260 OGRFieldDefn* poFieldDefn = poFDefn->GetFieldDefn(iField);
5261 fieldNameToIndexMap.emplace(std::make_pair(poFieldDefn->GetNameRef(), iField));
5266 poGeographicSR->importFromEPSG(copy_params.geo_coords_srid);
5268 #if GDAL_VERSION_MAJOR >= 3
5272 poGeographicSR->SetAxisMappingStrategy(OAMS_TRADITIONAL_GIS_ORDER);
5275 #if DISABLE_MULTI_THREADED_SHAPEFILE_IMPORT
5285 auto const metadata_column_infos =
5292 auto query_session = session_info ? session_info->
get_session_id() :
"";
5293 auto query_submitted_time =
::toString(std::chrono::system_clock::now());
5295 auto is_session_already_registered =
false;
5298 executor->getSessionLock());
5299 is_session_already_registered =
5300 executor->checkIsQuerySessionEnrolled(query_session, session_read_lock);
5303 !is_session_already_registered) {
5304 executor->enrollQuerySession(query_session,
5306 query_submitted_time,
5308 QuerySessionStatus::QueryStatus::RUNNING_IMPORTER);
5310 ScopeGuard clearInterruptStatus = [executor, &query_session, &query_submitted_time] {
5313 executor->clearQuerySessionStatus(query_session, query_submitted_time);
5321 for (
const auto cd :
loader->get_column_descs()) {
5327 #if !DISABLE_MULTI_THREADED_SHAPEFILE_IMPORT
5329 std::list<std::future<ImportStatus>> threads;
5333 std::stack<size_t> stack_thread_ids;
5335 stack_thread_ids.push(i);
5340 auto table_epochs =
loader->getTableEpochs();
5343 layer.ResetReading();
5345 static const size_t MAX_FEATURES_PER_CHUNK = 1000;
5348 std::vector<FeaturePtrVector> features(max_threads);
5351 std::vector<std::unique_ptr<OGRCoordinateTransformation>> coordinate_transformations(
5355 size_t firstFeatureThisChunk = 0;
5356 while (firstFeatureThisChunk < numFeatures) {
5358 size_t numFeaturesThisChunk =
5359 std::min(MAX_FEATURES_PER_CHUNK, numFeatures - firstFeatureThisChunk);
5362 #if DISABLE_MULTI_THREADED_SHAPEFILE_IMPORT
5365 auto thread_id = stack_thread_ids.top();
5366 stack_thread_ids.pop();
5367 CHECK(thread_id < max_threads);
5371 for (
size_t i = 0; i < numFeaturesThisChunk; i++) {
5372 features[
thread_id].emplace_back(layer.GetNextFeature());
5377 if (coordinate_transformations[thread_id] ==
nullptr) {
5378 for (
auto const& feature : features[thread_id]) {
5379 auto const* geometry = feature->GetGeometryRef();
5381 auto const* geometry_sr = geometry->getSpatialReference();
5385 #
if GDAL_VERSION_MAJOR >= 3
5386 !geometry_sr->IsEmpty() &&
5388 !geometry_sr->IsSame(poGeographicSR.get())) {
5390 if (geometry_sr->Validate() != OGRERR_NONE) {
5391 throw std::runtime_error(
"Incoming geo has invalid Spatial Reference");
5395 coordinate_transformations[
thread_id].reset(
5396 OGRCreateCoordinateTransformation(geometry_sr, poGeographicSR.get()));
5397 if (coordinate_transformations[thread_id] ==
nullptr) {
5398 throw std::runtime_error(
5399 "Failed to create a GDAL CoordinateTransformation for incoming geo");
5408 #if DISABLE_MULTI_THREADED_SHAPEFILE_IMPORT
5410 auto ret_import_status =
5413 coordinate_transformations[thread_id].
get(),
5414 std::move(features[thread_id]),
5415 firstFeatureThisChunk,
5416 numFeaturesThisChunk,
5417 fieldNameToIndexMap,
5418 columnNameToSourceNameMap,
5421 metadata_column_infos);
5422 import_status += ret_import_status;
5423 import_status.
rows_estimated = ((float)firstFeatureThisChunk / (
float)numFeatures) *
5424 import_status.rows_completed;
5432 coordinate_transformations[thread_id].
get(),
5433 std::move(features[thread_id]),
5434 firstFeatureThisChunk,
5435 numFeaturesThisChunk,
5436 fieldNameToIndexMap,
5437 columnNameToSourceNameMap,
5440 metadata_column_infos));
5443 while (threads.size() > 0) {
5445 for (std::list<std::future<ImportStatus>>::iterator it = threads.begin();
5446 it != threads.end();) {
5448 std::chrono::milliseconds span(
5450 if (p.wait_for(span) == std::future_status::ready) {
5451 auto ret_import_status = p.get();
5456 ((float)firstFeatureThisChunk / (
float)numFeatures) *
5464 stack_thread_ids.push(ret_import_status.thread_id);
5466 threads.erase(it++);
5474 std::this_thread::yield();
5492 LOG(
ERROR) <<
"Maximum rows rejected exceeded. Halting load";
5496 LOG(
ERROR) <<
"A call to the Loader failed in GDAL, Please review the logs for "
5501 firstFeatureThisChunk += numFeaturesThisChunk;
5504 #if !DISABLE_MULTI_THREADED_SHAPEFILE_IMPORT
5506 if (threads.size()) {
5507 for (
auto& p : threads) {
5511 auto ret_import_status = p.get();
5530 auto const metadata_column_infos =
5537 copy_params.raster_import_bands,
5538 copy_params.raster_import_dimensions,
5541 copy_params.raster_point_compute_angle,
5543 metadata_column_infos);
5546 auto const& column_descs =
loader->get_column_descs();
5547 uint32_t num_table_cols{0u};
5548 for (
auto const* cd : column_descs) {
5561 auto num_expected_cols = num_bands;
5562 num_expected_cols += point_names_and_sql_types.size();
5563 num_expected_cols += metadata_column_infos.size();
5564 if (num_expected_cols != num_table_cols) {
5565 throw std::runtime_error(
5566 "Raster Import aborted. Band/Column count mismatch (file requires " +
5576 auto cd_itr = column_descs.begin();
5577 for (
auto const& [col_name, sql_type] : point_names_and_sql_types) {
5578 if (sql_type ==
kPOINT) {
5581 auto const* cd = *cd_itr++;
5583 throw std::runtime_error(
"Column '" + cd->
columnName +
5584 "' overridden name invalid (must be '" + col_name +
5589 throw std::runtime_error(
"Column '" + cd->
columnName +
5590 "' overridden type invalid (must be POINT)");
5593 throw std::runtime_error(
"Column '" + cd->
columnName +
5594 "' overridden SRID invalid (must be 4326)");
5600 auto const* cd = *cd_itr++;
5606 auto const* cd = *cd_itr++;
5608 throw std::runtime_error(
"Column '" + cd->
columnName +
5609 "' overridden name invalid (must be '" + col_name +
5613 if (cd_type != sql_type) {
5614 throw std::runtime_error(
"Column '" + cd->
columnName +
5615 "' overridden type invalid (must be " +
5624 if (band_names_and_types.size() != num_bands) {
5625 throw std::runtime_error(
"Column/Band count mismatch when validating types");
5627 for (uint32_t i = 0; i < num_bands; i++) {
5628 auto const* cd = *cd_itr++;
5630 auto const sql_type = band_names_and_types[i].second;
5631 if (cd_type != sql_type) {
5632 throw std::runtime_error(
"Band Column '" + cd->
columnName +
5633 "' overridden type invalid (must be " +
5639 for (
auto const& mci : metadata_column_infos) {
5640 auto const* cd = *cd_itr++;
5641 if (mci.column_descriptor.columnName != cd->
columnName) {
5642 throw std::runtime_error(
"Metadata Column '" + cd->
columnName +
5643 "' overridden name invalid (must be '" +
5644 mci.column_descriptor.columnName +
"')");
5647 auto const md_type = mci.column_descriptor.columnType.get_type();
5648 if (cd_type != md_type) {
5649 throw std::runtime_error(
"Metadata Column '" + cd->
columnName +
5650 "' overridden type invalid (must be " +
5659 auto query_session = session_info ? session_info->
get_session_id() :
"";
5660 auto query_submitted_time =
::toString(std::chrono::system_clock::now());
5662 auto is_session_already_registered =
false;
5665 executor->getSessionLock());
5666 is_session_already_registered =
5667 executor->checkIsQuerySessionEnrolled(query_session, session_read_lock);
5670 !is_session_already_registered) {
5671 executor->enrollQuerySession(query_session,
5673 query_submitted_time,
5675 QuerySessionStatus::QueryStatus::RUNNING_IMPORTER);
5677 ScopeGuard clearInterruptStatus = [executor, &query_session, &query_submitted_time] {
5680 executor->clearQuerySessionStatus(query_session, query_submitted_time);
5688 if (copy_params.raster_scanlines_per_thread < 0) {
5689 throw std::runtime_error(
"Invalid CopyParams.raster_scanlines_per_thread! (" +
5693 const int max_scanlines_per_thread =
5694 copy_params.raster_scanlines_per_thread == 0
5696 : std::min(copy_params.raster_scanlines_per_thread,
5698 VLOG(1) <<
"Raster Importer: Max scanlines per thread: " << max_scanlines_per_thread;
5701 auto table_epochs =
loader->getTableEpochs();
5709 copy_params.threads == 0);
5715 for (
auto const& cd :
loader->get_column_descs()) {
5722 using ThreadReturn = std::tuple<ImportStatus, std::array<float, 3>>;
5729 std::vector<RasterImporter::RawPixels> raw_pixel_bytes_per_thread(max_threads);
5731 raw_pixel_bytes_per_thread[i].resize(band_size_x * max_scanlines_per_thread *
5736 auto const point_sql_type = point_names_and_sql_types.size()
5737 ? point_names_and_sql_types.begin()->second
5742 [&](
const size_t thread_idx,
const int y_start,
const int y_end) -> ThreadReturn {
5747 auto& raw_pixel_bytes = raw_pixel_bytes_per_thread[thread_idx];
5750 for (
auto& col_buffer : import_buffers) {
5751 col_buffer->clear();
5755 auto col_itr = column_descs.begin();
5759 if (point_sql_type !=
kNULLT) {
5761 auto const* cd_col0 = *col_itr++;
5762 auto const* cd_col1 = *col_itr++;
5763 auto const* cd_angle =
5764 copy_params.raster_point_compute_angle ? *col_itr++ :
nullptr;
5768 for (
int y = y_start; y < y_end; y++) {
5773 for (
int x = 0; x < band_size_x; x++) {
5775 auto const& [dx, dy, angle] = coords[x];
5778 switch (point_sql_type) {
5782 import_buffers[0]->add_value(cd_col0, td_point,
false);
5786 auto const compressed_coords =
5788 std::vector<TDatum> td_coords_data;
5789 for (
auto const& cc : compressed_coords) {
5791 td_byte.val.int_val = cc;
5792 td_coords_data.push_back(td_byte);
5795 td_coords.val.arr_val = td_coords_data;
5796 td_coords.is_null =
false;
5797 import_buffers[1]->add_value(cd_col1, td_coords,
false);
5803 td.val.real_val = dx;
5804 import_buffers[0]->add_value(cd_col0, td,
false);
5805 td.val.real_val = dy;
5806 import_buffers[1]->add_value(cd_col1, td,
false);
5812 td.val.int_val =
static_cast<int64_t
>(x);
5813 import_buffers[0]->add_value(cd_col0, td,
false);
5814 td.val.int_val =
static_cast<int64_t
>(y);
5815 import_buffers[1]->add_value(cd_col1, td,
false);
5822 if (copy_params.raster_point_compute_angle) {
5826 td.val.real_val =
static_cast<double>(angle);
5827 import_buffers[2]->add_value(cd_angle, td,
false);
5832 col_idx += (copy_params.raster_point_compute_angle ? 3 : 2);
5840 auto const num_rows = y_end - y_start;
5841 auto const num_elems = band_size_x * num_rows;
5845 bool read_block_failed =
false;
5848 boost::dynamic_bitset<> row_band_nulls;
5849 if (copy_params.raster_drop_if_all_null) {
5850 row_band_nulls.resize(num_elems * num_bands);
5853 auto set_row_band_null = [&](
const int row,
const uint32_t band) {
5854 auto const bit_index = (row * num_bands) + band;
5855 row_band_nulls.set(bit_index);
5857 auto all_row_bands_null = [&](
const int row) ->
bool {
5858 auto const first_bit_index = row * num_bands;
5859 bool all_null =
true;
5860 for (
auto i = first_bit_index; i < first_bit_index + num_bands; i++) {
5861 all_null = all_null && row_band_nulls.test(i);
5867 for (uint32_t band_idx = 0; band_idx < num_bands; band_idx++) {
5869 auto const* cd_band = *col_itr;
5873 auto const cd_type = cd_band->columnType.get_type();
5879 thread_idx, band_idx, y_start, num_rows, cd_type, raw_pixel_bytes);
5881 }
catch (std::runtime_error& e) {
5885 read_block_failed =
true;
5890 auto const [null_value, null_value_valid] =
5899 const int16_t* values =
5900 reinterpret_cast<const int16_t*
>(raw_pixel_bytes.data());
5901 for (
int idx = 0; idx < num_elems; idx++) {
5902 auto const& value = values[idx];
5903 if (null_value_valid && value == static_cast<int16_t>(null_value)) {
5906 if (copy_params.raster_drop_if_all_null) {
5907 set_row_band_null(idx, band_idx);
5911 td.val.int_val =
static_cast<int64_t
>(value);
5913 import_buffers[col_idx]->add_value(cd_band, td,
false);
5917 const int32_t* values =
5918 reinterpret_cast<const int32_t*
>(raw_pixel_bytes.data());
5919 for (
int idx = 0; idx < num_elems; idx++) {
5920 auto const& value = values[idx];
5921 if (null_value_valid && value == static_cast<int32_t>(null_value)) {
5924 if (copy_params.raster_drop_if_all_null) {
5925 set_row_band_null(idx, band_idx);
5929 td.val.int_val =
static_cast<int64_t
>(value);
5931 import_buffers[col_idx]->add_value(cd_band, td,
false);
5935 const uint32_t* values =
5936 reinterpret_cast<const uint32_t*
>(raw_pixel_bytes.data());
5937 for (
int idx = 0; idx < num_elems; idx++) {
5938 auto const& value = values[idx];
5939 if (null_value_valid && value == static_cast<uint32_t>(null_value)) {
5942 if (copy_params.raster_drop_if_all_null) {
5943 set_row_band_null(idx, band_idx);
5947 td.val.int_val =
static_cast<int64_t
>(value);
5949 import_buffers[col_idx]->add_value(cd_band, td,
false);
5953 const float* values =
reinterpret_cast<const float*
>(raw_pixel_bytes.data());
5954 for (
int idx = 0; idx < num_elems; idx++) {
5955 auto const& value = values[idx];
5956 if (null_value_valid && value == static_cast<float>(null_value)) {
5959 if (copy_params.raster_drop_if_all_null) {
5960 set_row_band_null(idx, band_idx);
5964 td.val.real_val =
static_cast<double>(value);
5966 import_buffers[col_idx]->add_value(cd_band, td,
false);
5970 const double* values =
reinterpret_cast<const double*
>(raw_pixel_bytes.data());
5971 for (
int idx = 0; idx < num_elems; idx++) {
5972 auto const& value = values[idx];
5973 if (null_value_valid && value == null_value) {
5976 if (copy_params.raster_drop_if_all_null) {
5977 set_row_band_null(idx, band_idx);
5981 td.val.real_val = value;
5983 import_buffers[col_idx]->add_value(cd_band, td,
false);
5996 if (read_block_failed) {
5998 for (
auto& col_buffer : import_buffers) {
5999 col_buffer->clear();
6006 for (
auto const& mci : metadata_column_infos) {
6007 auto const* cd_band = *col_itr++;
6009 for (
int i = 0; i < num_elems; i++) {
6010 import_buffers[col_idx]->add_value(cd_band, mci.value,
false, copy_params);
6016 int num_dropped_as_all_null = 0;
6017 if (copy_params.raster_drop_if_all_null) {
6022 for (
int row = 0; row < num_elems; row++) {
6023 if (all_row_bands_null(row)) {
6024 num_dropped_as_all_null++;
6028 if (num_dropped_as_all_null == num_elems) {
6030 for (
auto& col_buffer : import_buffers) {
6031 col_buffer->clear();
6033 }
else if (num_dropped_as_all_null > 0) {
6037 for (
int row = 0; row < num_elems; row++) {
6038 if (all_row_bands_null(row)) {
6039 bad_rows_tracker.
rows.emplace(static_cast<int64_t>(row));
6043 for (
auto& col_buffer : import_buffers) {
6044 auto const* cd = col_buffer->getColumnDesc();
6047 col_buffer->del_values(col_type, &bad_rows_tracker);
6053 CHECK_LE(num_dropped_as_all_null, num_elems);
6054 auto const actual_num_elems = num_elems - num_dropped_as_all_null;
6061 return {std::move(thread_import_status), {proj_s, read_s, conv_s}};
6065 float total_proj_s{0.0f};
6066 float total_read_s{0.0f};
6067 float total_conv_s{0.0f};
6068 float total_load_s{0.0f};
6070 const int min_scanlines_per_thread = 8;
6071 const int max_scanlines_per_block = max_scanlines_per_thread *
max_threads;
6072 for (
int block_y = 0; block_y < band_size_y;
6073 block_y += (max_threads * max_scanlines_per_thread)) {
6074 using Future = std::future<ThreadReturn>;
6075 std::vector<Future> futures;
6076 const int scanlines_in_block =
6077 std::min(band_size_y - block_y, max_scanlines_per_block);
6078 const int pixels_in_block = scanlines_in_block * band_size_x;
6079 const int block_max_scanlines_per_thread =
6080 std::max((scanlines_in_block + static_cast<int>(max_threads) - 1) /
6081 static_cast<int>(max_threads),
6082 min_scanlines_per_thread);
6083 VLOG(1) <<
"Raster Importer: scanlines_in_block: " << scanlines_in_block
6084 <<
", block_max_scanlines_per_thread: " << block_max_scanlines_per_thread;
6089 const int y_start = block_y +
thread_id * block_max_scanlines_per_thread;
6090 if (y_start < band_size_y) {
6091 const int y_end = std::min(y_start + block_max_scanlines_per_thread, band_size_y);
6092 if (y_start < y_end) {
6093 futures.emplace_back(
6101 float proj_s{0.0f}, read_s{0.0f}, conv_s{0.0f}, load_s{0.0f};
6102 size_t thread_idx = 0;
6103 for (
auto& future : futures) {
6104 auto const [import_status, times] = future.get();
6115 if (import_status.rows_completed > 0) {
6123 total_proj_s += (proj_s / float(futures.size()));
6124 total_read_s += (read_s / float(futures.size()));
6125 total_conv_s += (conv_s / float(futures.size()));
6126 total_load_s += load_s;
6132 auto const block_wall_s =
TIMER_STOP(block_wall_timer);
6133 auto const scanlines_per_second = scanlines_in_block / block_wall_s;
6134 auto const rows_per_second = pixels_in_block / block_wall_s;
6135 LOG(
INFO) <<
"Raster Importer: Loaded " << scanlines_in_block
6136 <<
" scanlines starting at " << block_y <<
" out of " << band_size_y
6137 <<
" in " << block_wall_s <<
"s at " << scanlines_per_second
6138 <<
" scanlines/s and " << rows_per_second <<
" rows/s";
6156 auto const checkpoint_s =
TIMER_STOP(checkpoint_timer);
6159 auto const total_wall_s =
TIMER_STOP(wall_timer);
6162 auto const total_scanlines_per_second = float(band_size_y) / total_wall_s;
6163 auto const total_rows_per_second =
6164 float(band_size_x) * float(band_size_y) / total_wall_s;
6165 LOG(
INFO) <<
"Raster Importer: Imported "
6166 <<
static_cast<uint64_t
>(band_size_x) * static_cast<uint64_t>(band_size_y)
6168 LOG(
INFO) <<
"Raster Importer: Total Import Time " << total_wall_s <<
"s at "
6169 << total_scanlines_per_second <<
" scanlines/s and " << total_rows_per_second
6175 std::string msg =
"Raster Importer: Import aborted after failing to read " +
6177 " rows/pixels (limit " +
std::to_string(copy_params.max_reject) +
6181 throw std::runtime_error(msg);
6185 auto proj_pct = float(
int(total_proj_s / total_wall_s * 1000.0
f) * 0.1f);
6186 auto read_pct = float(
int(total_read_s / total_wall_s * 1000.0f) * 0.1f);
6187 auto conv_pct = float(
int(total_conv_s / total_wall_s * 1000.0f) * 0.1f);
6188 auto load_pct = float(
int(total_load_s / total_wall_s * 1000.0f) * 0.1f);
6189 auto cpnt_pct = float(
int(checkpoint_s / total_wall_s * 1000.0f) * 0.1f);
6191 VLOG(1) <<
"Raster Importer: Import timing breakdown:";
6192 VLOG(1) <<
" Project " << total_proj_s <<
"s (" << proj_pct <<
"%)";
6193 VLOG(1) <<
" Read " << total_read_s <<
"s (" << read_pct <<
"%)";
6194 VLOG(1) <<
" Convert " << total_conv_s <<
"s (" << conv_pct <<
"%)";
6195 VLOG(1) <<
" Load " << total_load_s <<
"s (" << load_pct <<
"%)";
6196 VLOG(1) <<
" Checkpoint " << checkpoint_s <<
"s (" << cpnt_pct <<
"%)";
6208 std::vector<std::unique_ptr<TypedImportBuffer>> import_buffers;
6209 for (
auto cd : col_descs) {
6210 import_buffers.emplace_back(
6211 std::make_unique<TypedImportBuffer>(cd, loader->
getStringDict(cd)));
6214 return import_buffers;
6220 std::vector<std::unique_ptr<import_export::TypedImportBuffer>> defaults_buffers;
6226 if (cds.size() == insert_data.
columnIds.size()) {
6228 return defaults_buffers;
6230 for (
auto cd : cds) {
6231 if (std::find(insert_data.
columnIds.begin(),
6237 throw std::runtime_error(
"Cannot omit column \"" + cd->
columnName +
6238 "\": omitting TEXT arrays is not supported yet");
6243 defaults_buffers.emplace_back(std::make_unique<TypedImportBuffer>(cd, dict));
6248 defaults_buffers.end(),
6249 [](decltype(defaults_buffers[0])&
a, decltype(defaults_buffers[0])& b) {
6250 return a->getColumnDesc()->columnId < b->getColumnDesc()->columnId;
6252 for (
size_t i = 0; i < defaults_buffers.size(); ++i) {
6253 auto cd = defaults_buffers[i]->getColumnDesc();
6254 std::string default_value = cd->
default_value.value_or(
"NULL");
6255 defaults_buffers[i]->add_value(
6258 std::vector<double> coords, bounds;
6259 std::vector<int> ring_sizes, poly_rings;
6261 const bool validate_with_geos_if_available =
false;
6268 validate_with_geos_if_available));
6270 auto next_col = i + 1;
6272 *cat, cd, defaults_buffers, next_col, coords, bounds, ring_sizes, poly_rings);
6278 CHECK(data.size() == defaults_buffers.size());
6279 for (
size_t i = 0; i < defaults_buffers.size(); ++i) {
6280 insert_data.
data.push_back(data[i]);
6281 insert_data.
columnIds.push_back(defaults_buffers[i]->getColumnDesc()->columnId);
6284 return defaults_buffers;
6290 const std::string& copy_from_source,
6293 #ifdef ENABLE_IMPORT_PARQUET
6294 if (!g_enable_legacy_parquet_import) {
6295 return std::make_unique<import_export::ForeignDataImporter>(
6296 copy_from_source, copy_params, td);
6299 throw std::runtime_error(
"Parquet not supported!");
6305 return std::make_unique<import_export::ForeignDataImporter>(
6306 copy_from_source, copy_params, td);
6311 return std::make_unique<import_export::ForeignDataImporter>(
6312 copy_from_source, copy_params, td);
6314 throw std::runtime_error(
6315 "Regex parsed import only supported using 'fsi-regex-import' flag");
6319 return std::make_unique<import_export::Importer>(
6320 catalog, td, copy_from_source, copy_params);
std::lock_guard< T > lock_guard
std::pair< size_t, size_t > ArraySliceRange
const std::list< const ColumnDescriptor * > & get_column_descs() const
bool geo_promoted_type_match(const SQLTypes a, const SQLTypes b)
HOST DEVICE SQLTypes get_subtype() const
bool is_null_datum(const DatumStringType &datum, const std::string &null_indicator)
void set_compression(EncodingType c)
std::map< std::string, size_t > FieldNameToIndexMapType
virtual std::vector< Catalog_Namespace::TableEpochInfo > getTableEpochs() const
ImportStatus importDelimited(const std::string &file_path, const bool decompressed, const Catalog_Namespace::SessionInfo *session_info) override
std::string s3_secret_key
ImportStatus importGDAL(const std::map< std::string, std::string > &colname_to_src, const Catalog_Namespace::SessionInfo *session_info, const bool is_raster)
std::vector< int > ChunkKey
const SQLTypeInfo & getTypeInfo() const
bool check_session_interrupted(const QuerySessionId &query_session, Executor *executor)
ColumnNotGeoError(const std::string &column_name)
ImportStatus importDelimited(const std::string &file_path, const bool decompressed, const Catalog_Namespace::SessionInfo *session_info) override
HOST DEVICE int get_size() const
void addBigint(const int64_t v)
int8_t * append_datum(int8_t *buf, const Datum &d, const SQLTypeInfo &ti)
shared utility for globbing files, paths can be specified as either a single file, directory or wildcards
OptionalStringVector & addStringArray()
void addSmallint(const int16_t v)
class for a per-database catalog. also includes metadata for the current database and the current use...
const NamesAndSQLTypes getBandNamesAndSQLTypes() const
std::vector< std::string > * stringsPtr
std::vector< Geospatial::GDAL::FeatureUqPtr > FeaturePtrVector
static TimeT::rep execution(F func, Args &&...args)
std::vector< ArrayDatum > * arraysPtr
void import_compressed(std::vector< std::string > &file_paths, const Catalog_Namespace::SessionInfo *session_info)
void detect_row_delimiter()
auto get_filesize(const std::string &file_path)
bool raster_point_compute_angle
static const std::string LOCAL_FILE_STORAGE_TYPE
const int getBandsWidth() const
const NullValue getBandNullValue(const int band_idx) const
int64_t explode_collections_step2(OGRGeometry *ogr_geometry, SQLTypes collection_child_type, const std::string &collection_col_name, size_t row_or_feature_idx, std::function< void(OGRGeometry *)> execute_import_lambda)
bool g_enable_legacy_delimited_import
const TableDescriptor * getTableDesc() const
static void set_geo_physical_import_buffer_columnar(const Catalog_Namespace::Catalog &catalog, const ColumnDescriptor *cd, std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, size_t &col_idx, std::vector< std::vector< double >> &coords_column, std::vector< std::vector< double >> &bounds_column, std::vector< std::vector< int >> &ring_sizes_column, std::vector< std::vector< int >> &poly_rings_column)
bool geo_explode_collections
void dropColumns(const std::vector< int > &columns)
std::vector< std::string > * string_buffer_
void addString(const std::string_view v)
std::vector< ArrayDatum > * array_buffer_
void find_best_sqltypes_and_headers()
std::vector< SQLTypeInfo > getBestColumnTypes() const
StringDictionary * string_dict_
void load(const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, size_t row_count, const Catalog_Namespace::SessionInfo *session_info)
bool importGeoFromLonLat(double lon, double lat, std::vector< double > &coords, std::vector< double > &bounds, SQLTypeInfo &ti)
std::optional< std::vector< std::string >> OptionalStringVector
static ArrayDatum composeNullArray(const SQLTypeInfo &ti)
RasterImporter::PointType convert_raster_point_type(const import_export::RasterPointType raster_point_type)
void addDouble(const double v)
size_t find_beginning(const char *buffer, size_t begin, size_t end, const import_export::CopyParams ©_params)
Finds the closest possible row beginning in the given buffer.
ArrayDatum NullArray(const SQLTypeInfo &ti)
auto value_getter(const arrow::Array &array, const ColumnDescriptor *cd, import_export::BadRowsTracker *const bad_rows_tracker)
std::vector< std::unique_ptr< TypedImportBuffer > > fill_missing_columns(const Catalog_Namespace::Catalog *cat, Fragmenter_Namespace::InsertData &insert_data)
Importer(Catalog_Namespace::Catalog &c, const TableDescriptor *t, const std::string &f, const CopyParams &p)
std::vector< int16_t > * smallint_buffer_
void validate_sort_options(const FilePathOptions &options)
Datum TDatumToDatum(const TDatum &datum, SQLTypeInfo &ti)
const bool * get_is_array() const
const TableDescriptor * table_desc_
virtual void checkpoint()
std::vector< std::unique_ptr< TypedImportBuffer > > setup_column_loaders(const TableDescriptor *td, Loader *loader)
HOST DEVICE void set_subtype(SQLTypes st)
DEVICE void sort(ARGS &&...args)
static ImportStatus import_thread_delimited(int thread_id, Importer *importer, std::unique_ptr< char[]> scratch_buffer, size_t begin_pos, size_t end_pos, size_t total_size, size_t first_row_index_this_buffer, const Catalog_Namespace::SessionInfo *session_info, Executor *executor)
bool is_null_point(const SQLTypeInfo &geo_ti, const int8_t *coords, const size_t coords_sz)
std::vector< bool > is_default
ImportStatus importGDALGeo(const std::map< std::string, std::string > &colname_to_src, const Catalog_Namespace::SessionInfo *session_info)
static std::map< std::string, ImportStatus > import_status_map
OGRLayer & getLayerWithSpecifiedName(const std::string &geo_layer_name, const Geospatial::GDAL::DataSourceUqPtr &poDS, const std::string &file_name)
const uint32_t getNumBands() const
std::vector< SQLTypes > best_sqltypes
static void getNullGeoColumns(SQLTypeInfo &ti, std::vector< double > &coords, std::vector< double > &bounds, std::vector< int > &ring_sizes, std::vector< int > &poly_rings)
static std::vector< GeoFileLayerInfo > gdalGetLayersInGeoFile(const std::string &file_name, const CopyParams ©_params)
std::chrono::duration< size_t, std::milli > elapsed
void distributeToShards(std::vector< OneShardBuffers > &all_shard_import_buffers, std::vector< size_t > &all_shard_row_counts, const OneShardBuffers &import_buffers, const size_t row_count, const size_t shard_count, const Catalog_Namespace::SessionInfo *session_info)
static ImportStatus import_thread_shapefile(int thread_id, Importer *importer, OGRCoordinateTransformation *coordinate_transformation, const FeaturePtrVector &features, size_t firstFeature, size_t numFeatures, const FieldNameToIndexMapType &fieldNameToIndexMap, const ColumnNameToSourceNameMapType &columnNameToSourceNameMap, const Catalog_Namespace::SessionInfo *session_info, Executor *executor, const MetadataColumnInfos &metadata_column_infos)
std::vector< float > * float_buffer_
bool g_enable_non_kernel_time_query_interrupt
HOST DEVICE SQLTypes get_type() const
std::pair< FILE *, std::string > create(const std::string &basePath, const int fileId, const size_t pageSize, const size_t numPages)
std::vector< std::unique_ptr< TypedImportBuffer > > & get_import_buffers(int i)
static bool gdalStatInternal(const std::string &path, const CopyParams ©_params, bool also_dir)
std::string raster_import_dimensions
void getColumns(std::vector< double > &coords) const
ImportStatus import_status_
static bool gdalFileExists(const std::string &path, const CopyParams ©_params)
std::unique_ptr< AbstractImporter > create_importer(Catalog_Namespace::Catalog &catalog, const TableDescriptor *td, const std::string ©_from_source, const import_export::CopyParams ©_params)
std::vector< double > * double_buffer_
std::unique_ptr< OGRFeature, FeatureDeleter > FeatureUqPtr
void addFloat(const float v)
Fragmenter_Namespace::InsertData insert_data_
size_t add_values(const ColumnDescriptor *cd, const TColumn &data)
UniqueReaderPtr open_parquet_table(const std::string &file_path, std::shared_ptr< arrow::fs::FileSystem > &file_system)
size_t write(FILE *f, const size_t offset, const size_t size, const int8_t *buf)
Writes the specified number of bytes to the offset position in file f from buf.
std::tuple< std::unique_ptr< foreign_storage::ForeignServer >, std::unique_ptr< foreign_storage::UserMapping >, std::unique_ptr< foreign_storage::ForeignTable > > create_proxy_fsi_objects(const std::string ©_from_source, const import_export::CopyParams ©_params, const int db_id, const TableDescriptor *table, const int32_t user_id)
Create proxy fsi objects for use outside FSI.
int tableId
identifies the database into which the data is being inserted
std::shared_lock< T > shared_lock
static void set_geo_physical_import_buffer(const Catalog_Namespace::Catalog &catalog, const ColumnDescriptor *cd, std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, size_t &col_idx, std::vector< double > &coords, std::vector< double > &bounds, std::vector< int > &ring_sizes, std::vector< int > &poly_rings, const bool force_null=false)
static std::shared_ptr< Executor > getExecutor(const ExecutorId id, const std::string &debug_dir="", const std::string &debug_file="", const SystemParameters &system_parameters=SystemParameters())
std::conditional_t< is_cuda_compiler(), DeviceArrayDatum, HostArrayDatum > ArrayDatum
std::string add_metadata_columns
size_t numRows
a vector of column ids for the row(s) being inserted
void set_input_srid(int d)
void addGeoString(const std::string_view v)
ImportHeaderRow has_header
std::optional< int64_t > dateTimeParseOptional< kTIME >(std::string_view str, unsigned const dim)
virtual ImportStatus importDelimited(const std::string &file_path, const bool decompressed, const Catalog_Namespace::SessionInfo *session_info)=0
static SQLTypes detect_sqltype(const std::string &str)
const rapidjson::Value & field(const rapidjson::Value &obj, const char field[]) noexcept
std::vector< EncodingType > find_best_encodings(const std::vector< std::vector< std::string >>::const_iterator &row_begin, const std::vector< std::vector< std::string >>::const_iterator &row_end, const std::vector< SQLTypes > &best_types)
auto del_values(std::vector< DATA_TYPE > &buffer, BadRowsTracker *const bad_rows_tracker)
std::vector< std::unique_ptr< TypedImportBuffer >> OneShardBuffers
std::vector< uint8_t > compress_coords(const std::vector< double > &coords, const SQLTypeInfo &ti)
std::optional< std::string > regex_path_filter
int get_physical_cols() const
std::vector< int32_t > * int_buffer_
future< Result > async(Fn &&fn, Args &&...args)
ArrayDatum TDatumToArrayDatum(const TDatum &datum, const SQLTypeInfo &ti)
static const std::string STORAGE_TYPE_KEY
static std::vector< DataBlockPtr > get_data_block_pointers(const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers)
CONSTEXPR DEVICE bool is_null(const T &value)
RasterPointType raster_point_type
const DBMetadata & getCurrentDB() const
std::optional< int64_t > dateTimeParseOptional< kDATE >(std::string_view str, unsigned const dim)
ImportStatus import(const Catalog_Namespace::SessionInfo *session_info) override
void addBoolean(const int8_t v)
void * checked_malloc(const size_t size)
bool geo_validate_geometry
const int getBandsHeight() const
std::vector< uint8_t > * string_dict_i8_buffer_
void addTinyint(const int8_t v)
std::vector< OptionalStringVector > * string_array_buffer_
static void readMetadataSampleGDAL(const std::string &fileName, const std::string &geoColumnName, std::map< std::string, std::vector< std::string >> &metadata, int rowLimit, const CopyParams ©_params)
std::vector< int64_t > * bigint_buffer_
::FILE * fopen(const char *filename, const char *mode)
int8_t * getAsBytes() const
static void setAuthorizationTokens(const std::string &s3_region, const std::string &s3_endpoint, const std::string &s3_access_key, const std::string &s3_secret_key, const std::string &s3_session_token)
void addInt(const int32_t v)
std::pair< SQLTypes, bool > ogr_to_type(const OGRFieldType &ogr_type)
void gdalGatherFilesInArchiveRecursive(const std::string &archive_path, std::vector< std::string > &files)
bool hasErrorCode(ErrorCode const ec) const
std::unique_lock< T > unique_lock
const ColumnDescriptor * getMetadataForColumn(int tableId, const std::string &colName) const
const NamesAndSQLTypes getPointNamesAndSQLTypes() const
static constexpr bool PROMOTE_LINESTRING_TO_MULTILINESTRING
DEVICE auto accumulate(ARGS &&...args)
static constexpr bool PROMOTE_POLYGON_TO_MULTIPOLYGON
static bool getGeoColumns(const std::string &wkt_or_wkb_hex, SQLTypeInfo &ti, std::vector< double > &coords, std::vector< double > &bounds, std::vector< int > &ring_sizes, std::vector< int > &poly_rings, const bool validate_with_geos_if_available)
const char * get_row(const char *buf, const char *buf_end, const char *entire_buf_end, const import_export::CopyParams ©_params, const bool *is_array, std::vector< T > &row, std::vector< std::unique_ptr< char[]>> &tmp_buffers, bool &try_single_thread, bool filter_empty_lines)
Parses the first row in the given buffer and inserts fields into given vector.
void parse_string_array(const std::string &s, const import_export::CopyParams ©_params, std::vector< std::string > &string_vec, bool truncate_values)
Parses given string array and inserts into given vector of strings.
Datum StringToDatum(const std::string_view s, SQLTypeInfo &ti)
import_export::SourceType source_type
Catalog_Namespace::Catalog & getCatalog()
void find_best_sqltypes()
std::vector< std::vector< std::string > > raw_rows
virtual bool load(const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, const size_t row_count, const Catalog_Namespace::SessionInfo *session_info)
const DictDescriptor * getMetadataForDict(int dict_ref, bool loadDict=true) const
specifies the content in-memory of a row in the column metadata table
void fillShardRow(const size_t row_index, OneShardBuffers &shard_output_buffers, const OneShardBuffers &import_buffers)
OUTPUT transform(INPUT const &input, FUNC const &func)
std::optional< int64_t > dateTimeParseOptional< kTIMESTAMP >(std::string_view str, unsigned const dim)
bool isAddingColumns() const
std::vector< EncodingType > best_encodings
std::vector< int8_t > * bool_buffer_
std::vector< std::vector< std::unique_ptr< TypedImportBuffer > > > import_buffers_vec
std::mutex file_offsets_mutex
std::string toString(const Executor::ExtModuleKinds &kind)
void checkpointWithAutoRollback(const int logical_table_id) const
std::vector< const TableDescriptor * > getPhysicalTablesDescriptors(const TableDescriptor *logical_table_desc, bool populate_fragmenter=true) const
size_t num_import_threads(const int32_t copy_params_threads)
void getOrAddBulk(const std::vector< String > &string_vec, T *encoded_vec)
boost::filesystem::path file_path
std::shared_ptr< Fragmenter_Namespace::AbstractFragmenter > fragmenter
Datum NullDatum(const SQLTypeInfo &ti)
void set_output_srid(int s)
double double_value_at(const TypedImportBuffer &import_buffer, const size_t index)
int8_t * getStringDictBuffer() const
SQLTypes decimal_to_int_type(const SQLTypeInfo &ti)
static bool gdalFileOrDirectoryExists(const std::string &path, const CopyParams ©_params)
std::unique_ptr< bool[]> is_array_a
bool try_cast(const std::string &str)
std::vector< std::unique_ptr< TypedImportBuffer > > * import_buffers
size_t g_archive_read_buf_size
void set_comp_param(int p)
std::string get_session_id() const
std::string geo_layer_name
static void set_import_status(const std::string &id, const ImportStatus is)
std::optional< std::string > default_value
MetadataColumnInfos parse_add_metadata_columns(const std::string &add_metadata_columns, const std::string &file_path)
Detector(const boost::filesystem::path &fp, CopyParams &cp)
static Geospatial::GDAL::DataSourceUqPtr openGDALDataSource(const std::string &fileName, const CopyParams ©_params)
ArrayDatum StringToArray(const std::string &s, const SQLTypeInfo &ti, const CopyParams ©_params)
virtual bool loadNoCheckpoint(const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, const size_t row_count, const Catalog_Namespace::SessionInfo *session_info)
HOST DEVICE EncodingType get_compression() const
RasterImporter::PointTransform convert_raster_point_transform(const import_export::RasterPointTransform raster_point_transform)
void getRawPixels(const uint32_t thread_idx, const uint32_t band_idx, const int y_start, const int num_rows, const SQLTypes column_sql_type, RawPixels &raw_pixel_bytes)
std::unique_ptr< OGRSpatialReference, SpatialReferenceDeleter > SpatialReferenceUqPtr
std::vector< int32_t > * string_dict_i32_buffer_
static bool more_restrictive_sqltype(const SQLTypes a, const SQLTypes b)
std::vector< DataBlockPtr > data
the number of rows being inserted
int64_t convert_decimal_value_to_scale(const int64_t decimal_value, const SQLTypeInfo &type_info, const SQLTypeInfo &new_type_info)
bool g_enable_fsi_regex_import
std::list< const ColumnDescriptor * > column_descs_
std::unique_ptr< OGRDataSource, DataSourceDeleter > DataSourceUqPtr
static const std::list< ColumnDescriptor > gdalToColumnDescriptorsGeo(const std::string &fileName, const std::string &geoColumnName, const CopyParams ©_params)
static constexpr bool PROMOTE_POINT_TO_MULTIPOINT
static ArrayDatum composeNullPointCoords(const SQLTypeInfo &coords_ti, const SQLTypeInfo &geo_ti)
void addArray(const ArrayDatum &v)
float float_value_at(const TypedImportBuffer &import_buffer, const size_t index)
bool loadToShard(const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, size_t row_count, const TableDescriptor *shard_table, bool checkpoint, const Catalog_Namespace::SessionInfo *session_info)
std::list< const ColumnDescriptor * > getAllColumnMetadataForTable(const int tableId, const bool fetchSystemColumns, const bool fetchVirtualColumns, const bool fetchPhysicalColumns) const
Returns a list of pointers to constant ColumnDescriptor structs for all the columns from a particular...
std::optional< size_t > g_detect_test_sample_size
Catalog_Namespace::Catalog & getCatalog() const
ImportStatus archivePlumber(const Catalog_Namespace::SessionInfo *session_info)
static heavyai::shared_mutex status_mutex
static OGRGeometry * createOGRGeometry(const std::string &wkt_or_wkb_hex, const bool validate_with_geos_if_available)
std::string s3_session_token
int32_t geo_coords_comp_param
std::map< std::string, std::string > ColumnNameToSourceNameMapType
static const size_t kImportRowLimit
std::tuple< int, SQLTypes, std::string > explode_collections_step1(const std::list< const ColumnDescriptor * > &col_descs)
torch::Tensor f(torch::Tensor x, torch::Tensor W_target, torch::Tensor b_target)
std::chrono::duration< double > timeout
std::vector< std::vector< std::string > > get_sample_rows(size_t n)
const Coords getProjectedPixelCoords(const uint32_t thread_idx, const int y) const
bool detect_headers(const std::vector< SQLTypes > &first_types, const std::vector< SQLTypes > &rest_types)
int64_t inline_fixed_encoding_null_array_val(const SQL_TYPE_INFO &ti)
const ColumnDescriptor * column_desc_
HOST DEVICE int get_comp_param() const
int64_t int_value_at(const TypedImportBuffer &import_buffer, const size_t index)
size_t add_arrow_values(const ColumnDescriptor *cd, const arrow::Array &data, const bool exact_type_match, const ArraySliceRange &slice_range, BadRowsTracker *bad_rows_tracker)
void checkpoint(const std::vector< Catalog_Namespace::TableEpochInfo > &table_epochs)
std::chrono::steady_clock::time_point start
std::vector< uint16_t > * string_dict_i16_buffer_
virtual bool loadImpl(const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, size_t row_count, bool checkpoint, const Catalog_Namespace::SessionInfo *session_info)
static const std::list< ColumnDescriptor > gdalToColumnDescriptorsRaster(const std::string &fileName, const std::string &geoColumnName, const CopyParams ©_params)
std::vector< SQLTypes > detect_column_types(const std::vector< std::string > &row)
#define NULL_ARRAY_DOUBLE
std::vector< std::string > get_headers()
Catalog_Namespace::Catalog & catalog_
std::string raster_import_bands
const CopyParams & get_copy_params() const
std::string QuerySessionId
std::vector< int8_t > * tinyint_buffer_
ImportStatus importGDALRaster(const Catalog_Namespace::SessionInfo *session_info)
void setTableEpochs(const int32_t db_id, const std::vector< TableEpochInfo > &table_epochs) const
static ImportStatus get_import_status(const std::string &id)
size_t convert_arrow_val_to_import_buffer(const ColumnDescriptor *cd, const arrow::Array &array, std::vector< DATA_TYPE > &buffer, const ArraySliceRange &slice_range, BadRowsTracker *const bad_rows_tracker)
size_t find_row_end_pos(size_t &alloc_size, std::unique_ptr< char[]> &buffer, size_t &buffer_size, const CopyParams ©_params, const size_t buffer_first_row_index, unsigned int &num_rows_in_buffer, FILE *file, foreign_storage::FileReader *file_reader)
Finds the closest possible row ending to the end of the given buffer. The buffer is resized as needed...
const ColumnDescriptor * getColumnDesc() const
StringDictionary * getStringDict(const ColumnDescriptor *cd) const
void distributeToShardsExistingColumns(std::vector< OneShardBuffers > &all_shard_import_buffers, std::vector< size_t > &all_shard_row_counts, const OneShardBuffers &import_buffers, const size_t row_count, const size_t shard_count, const Catalog_Namespace::SessionInfo *session_info)
static const std::list< ColumnDescriptor > gdalToColumnDescriptors(const std::string &fileName, const bool is_raster, const std::string &geoColumnName, const CopyParams ©_params)
int64_t inline_fixed_encoding_null_val(const SQL_TYPE_INFO &ti)
static std::vector< std::string > gdalGetAllFilesInArchive(const std::string &archive_path, const CopyParams ©_params)
The data to be inserted using the fragment manager.
std::vector< std::string > local_glob_filter_sort_files(const std::string &file_path, const FilePathOptions &options, const bool recurse)
static constexpr size_t MAX_STRLEN
bool is_dict_encoded_string() const
void addDefaultValues(const ColumnDescriptor *cd, size_t num_rows)
bool any_of(std::vector< Analyzer::Expr * > const &target_exprs)
PrintContainer< CONTAINER > printContainer(CONTAINER &container)
static constexpr ExecutorId UNITARY_EXECUTOR_ID
std::vector< OptionalStringVector > * getStringArrayBuffer() const
std::string error_context(const ColumnDescriptor *cd, import_export::BadRowsTracker *const bad_rows_tracker)
LoadCallbackType load_callback_
void distributeToShardsNewColumns(std::vector< OneShardBuffers > &all_shard_import_buffers, std::vector< size_t > &all_shard_row_counts, const OneShardBuffers &import_buffers, const size_t row_count, const size_t shard_count, const Catalog_Namespace::SessionInfo *session_info)
std::shared_timed_mutex shared_mutex
HOST DEVICE bool get_notnull() const
SQLTypes get_type_for_datum(const SQLTypeInfo &ti)
static constexpr char const * FOREIGN_TABLE
HOST DEVICE size_t get_max_strlen() const
static DataSourceUqPtr openDataSource(const std::string &name, const import_export::SourceType source_type)
void validate(T value) const
Datum NullArrayDatum(SQLTypeInfo &ti)
static constexpr size_t kDefaultSampleRowsCount
std::string s3_access_key
SQLTypeInfo get_elem_type() const
std::vector< int > columnIds
identifies the table into which the data is being inserted
size_t file_size(const int fd)
void detect(const std::string &file_name, const std::string &specified_band_names, const std::string &specified_band_dimensions, const PointType point_type, const PointTransform point_transform, const bool point_compute_angle, const bool throw_on_error, const MetadataColumnInfos &metadata_column_infos)
void addBinaryStringArray(const TDatum &datum, std::vector< std::string > &string_vec)
RasterPointTransform raster_point_transform
static const std::string trim_space(const char *field, const size_t len)
std::vector< size_t > file_offsets
#define SHARD_FOR_KEY(key, num_shards)
std::optional< std::string > file_sort_order_by
void import(size_t &max_threads, const bool max_threads_using_default)
DEVICE void swap(ARGS &&...args)
void add_value(const ColumnDescriptor *cd, const std::string_view val, const bool is_null, const CopyParams ©_params, const bool check_not_null=true)
size_t g_max_import_threads
void arrow_throw_if(const bool cond, const std::string &message)
std::map< int, StringDictionary * > dict_map_
std::vector< std::string > * geo_string_buffer_
std::chrono::steady_clock::time_point end
std::vector< TableEpochInfo > getTableEpochs(const int32_t db_id, const int32_t table_id) const
static const std::string S3_STORAGE_TYPE
heavyai::shared_mutex import_mutex_
static constexpr int kMaxRasterScanlinesPerThread
HOST DEVICE int get_output_srid() const
std::optional< std::string > file_sort_regex
std::unique_ptr< Loader > loader
void addDictEncodedString(const std::vector< std::string > &string_vec)
static void parse_url(const std::string url, std::map< int, std::string > &url_parts)
const std::list< const ColumnDescriptor * > & get_column_descs() const
const std::string file_path
std::vector< MetadataColumnInfo > MetadataColumnInfos
EncodingType geo_coords_encoding
virtual void setTableEpochs(const std::vector< Catalog_Namespace::TableEpochInfo > &table_epochs)
SQLTypes string_dict_to_int_type(const SQLTypeInfo &ti)
HOST DEVICE void set_type(SQLTypes t)