#include <tbb/parallel_for.h>
#include <tbb/task_arena.h>

#include <future>
#include <thread>
#include <vector>

namespace TableFunctions_Namespace {
// Strips the ColumnList metadata wrapper down to a plain InputData<T> struct
// holding the row count, the type's inline null sentinel, and the raw column
// pointers.
template <typename T>
InputData<T> strip_column_metadata(const ColumnList<T>& input_features) {
  InputData<T> input_data;
  input_data.num_rows = input_features.size();
  input_data.null_val = inline_null_value<T>();
  for (int64_t c = 0; c < input_features.numCols(); ++c) {
    input_data.col_ptrs.push_back(reinterpret_cast<T*>(input_features.ptrs_[c]));
  }
  return input_data;
}
// Overload that additionally prepends a labels column ahead of the feature
// columns.
template <typename T>
InputData<T> strip_column_metadata(const Column<T>& input_labels,
                                   const ColumnList<T>& input_features) {
  InputData<T> input_data;
  input_data.num_rows = input_features.size();
  input_data.null_val = inline_null_value<T>();
  input_data.col_ptrs.push_back(input_labels.ptr_);
  for (int64_t c = 0; c < input_features.numCols(); ++c) {
    input_data.col_ptrs.push_back(reinterpret_cast<T*>(input_features.ptrs_[c]));
  }
  return input_data;
}
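// Illustrative sketch (hypothetical caller; member names are those used in
// this file): for a ColumnList<float> of n feature columns, the overloads
// above produce a view such that
//
//   auto data = strip_column_metadata(features);
//   // data.num_rows == features.size()
//   // data.null_val == inline_null_value<float>()
//   // data.col_ptrs == {features.ptrs_[0], ..., features.ptrs_[n - 1]}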
// Compacts away any row that contains a null in any column. Rows containing a
// null are flagged in index_map with NULL_ROW_IDX; reverse_index_map maps each
// compacted (masked) row back to its original row index.
template <typename T>
MaskedData<T> remove_null_rows(const InputData<T>& input_data) {
  MaskedData<T> masked_data;
  const auto input_num_rows = input_data.num_rows;
  masked_data.unmasked_num_rows = input_num_rows;
  masked_data.index_map.resize(input_num_rows);
  auto& index_map = masked_data.index_map;
  const int32_t num_cols = input_data.col_ptrs.size();
  int32_t valid_row_count = 0;
  const auto null_val = input_data.null_val;
  constexpr int64_t target_rows_per_thread{20000};
  const ThreadInfo thread_info(
      std::thread::hardware_concurrency(), input_num_rows, target_rows_per_thread);

  // Each thread builds its own reverse index map over its contiguous chunk of
  // rows, so the per-thread maps can later be concatenated in order.
  std::vector<std::vector<int32_t>> per_thread_reverse_index_maps(
      thread_info.num_threads);
  tbb::task_arena limited_arena(thread_info.num_threads);
  limited_arena.execute([&] {
    tbb::parallel_for(
        tbb::blocked_range<int64_t>(
            0, input_num_rows, thread_info.num_elems_per_thread),
        [&](const tbb::blocked_range<int64_t>& r) {
          size_t thread_idx = tbb::this_task_arena::current_thread_index();
          auto& local_reverse_index_map = per_thread_reverse_index_maps[thread_idx];
          int32_t local_valid_row_count = 0;
          const int32_t start_idx = r.begin();
          const int32_t end_idx = r.end();
          for (int32_t row_idx = start_idx; row_idx < end_idx; ++row_idx) {
            int32_t col_idx = 0;
            for (; col_idx < num_cols; ++col_idx) {
              if (input_data.col_ptrs[col_idx][row_idx] == null_val) {
                index_map[row_idx] = NULL_ROW_IDX;
                break;
              }
            }
            // Only rows with no nulls in any column survive the mask.
            if (col_idx == num_cols) {
              local_reverse_index_map.emplace_back(row_idx);
              index_map[row_idx] = local_valid_row_count++;
            }
          }
        },
        tbb::simple_partitioner());
  });

  for (const auto& per_thread_reverse_index_map : per_thread_reverse_index_maps) {
    valid_row_count += per_thread_reverse_index_map.size();
  }
  masked_data.masked_num_rows = valid_row_count;
  masked_data.data.resize(num_cols, nullptr);

  // Fast path: no nulls were found, so the input buffers can be aliased
  // directly without copying.
  if (masked_data.masked_num_rows == masked_data.unmasked_num_rows) {
    for (int32_t col_idx = 0; col_idx < num_cols; ++col_idx) {
      masked_data.data[col_idx] = input_data.col_ptrs[col_idx];
    }
    return masked_data;
  }

  // Concatenate the per-thread reverse index maps into the single
  // reverse_index_map, copying each thread's chunk at its running offset.
  masked_data.reverse_index_map.resize(valid_row_count);
  auto& reverse_index_map = masked_data.reverse_index_map;
  int32_t start_reverse_index_offset = 0;
  std::vector<std::future<void>> worker_threads;
  for (const auto& per_thread_reverse_index_map : per_thread_reverse_index_maps) {
    const int32_t local_reverse_index_map_size = per_thread_reverse_index_map.size();
    worker_threads.emplace_back(std::async(
        std::launch::async,
        [&reverse_index_map](const auto& local_map,
                             const int32_t local_map_offset,
                             const int32_t local_map_size) {
          for (int32_t map_idx = 0; map_idx < local_map_size; ++map_idx) {
            reverse_index_map[map_idx + local_map_offset] = local_map[map_idx];
          }
        },
        per_thread_reverse_index_map,
        start_reverse_index_offset,
        local_reverse_index_map_size));
    start_reverse_index_offset += local_reverse_index_map_size;
  }
  for (auto& worker_thread : worker_threads) {
    worker_thread.wait();
  }

  // Allocate owned storage for the masked columns and gather the non-null
  // rows into it in parallel.
  masked_data.data_allocations.resize(num_cols);
  for (int32_t col_idx = 0; col_idx < num_cols; ++col_idx) {
    masked_data.data_allocations[col_idx].resize(valid_row_count);
    masked_data.data[col_idx] = masked_data.data_allocations[col_idx].data();
  }

  auto& denulled_data = masked_data.data;
  tbb::parallel_for(
      tbb::blocked_range<int32_t>(0, valid_row_count),
      [&](const tbb::blocked_range<int32_t>& r) {
        const int32_t start_idx = r.begin();
        const int32_t end_idx = r.end();
        for (int32_t row_idx = start_idx; row_idx < end_idx; ++row_idx) {
          const auto input_row_idx = reverse_index_map[row_idx];
          for (int32_t col_idx = 0; col_idx < num_cols; ++col_idx) {
            denulled_data[col_idx][row_idx] =
                input_data.col_ptrs[col_idx][input_row_idx];
          }
        }
      });
  return masked_data;
}
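// Usage sketch (hypothetical caller and buffer names): remove_null_rows pairs
// with unmask_data further below to round-trip between the compacted and the
// original row spaces.
//
//   auto masked = remove_null_rows(input_data);
//   // ... compute per-row results over masked.data into masked_output ...
//   unmask_data(masked_output.data(),
//               masked.reverse_index_map,
//               output.ptr_,
//               masked.unmasked_num_rows,
//               inline_null_value<float>());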
template MaskedData<float> remove_null_rows(const InputData<float>& input_data);
template MaskedData<double> remove_null_rows(const InputData<double>& input_data);
// Scatters masked (compacted) values back to their original row positions,
// writing null_val to every row that was masked out.
template <typename T>
void unmask_data(const T* masked_input,
                 const std::vector<int32_t>& reverse_index_map,
                 T* unmasked_output,
                 const int64_t num_unmasked_rows,
                 const T null_val) {
  // First initialize the entire output to the null sentinel.
  tbb::parallel_for(tbb::blocked_range<size_t>(0, static_cast<size_t>(num_unmasked_rows)),
                    [&](const tbb::blocked_range<size_t>& r) {
                      const int32_t start_idx = r.begin();
                      const int32_t end_idx = r.end();
                      for (int32_t row_idx = start_idx; row_idx < end_idx; ++row_idx) {
                        unmasked_output[row_idx] = null_val;
                      }
                    });
  // Then scatter each masked value to the source row recorded in
  // reverse_index_map.
  const auto num_masked_rows = reverse_index_map.size();
  tbb::parallel_for(tbb::blocked_range<size_t>(0, num_masked_rows),
                    [&](const tbb::blocked_range<size_t>& r) {
                      const int32_t start_idx = r.begin();
                      const int32_t end_idx = r.end();
                      for (int32_t row_idx = start_idx; row_idx < end_idx; ++row_idx) {
                        unmasked_output[reverse_index_map[row_idx]] =
                            masked_input[row_idx];
                      }
                    });
}
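// Worked example (illustrative values only): with num_unmasked_rows = 5,
// reverse_index_map = {0, 2, 3}, and masked_input = {1.5, 2.5, 3.5}, the
// output becomes {1.5, null_val, 2.5, 3.5, null_val}.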
template void unmask_data(const int32_t* masked_input,
                          const std::vector<int32_t>& reverse_index_map,
                          int32_t* unmasked_output,
                          const int64_t num_unmasked_rows,
                          const int32_t null_val);

template void unmask_data(const float* masked_input,
                          const std::vector<int32_t>& reverse_index_map,
                          float* unmasked_output,
                          const int64_t num_unmasked_rows,
                          const float null_val);

template void unmask_data(const double* masked_input,
                          const std::vector<int32_t>& reverse_index_map,
                          double* unmasked_output,
                          const int64_t num_unmasked_rows,
                          const double null_val);

}  // namespace TableFunctions_Namespace