MaskedData<T> masked_data;
const auto input_num_rows = input_data.num_rows;
masked_data.unmasked_num_rows = input_num_rows;
masked_data.index_map.resize(input_num_rows);
auto& index_map = masked_data.index_map;
const int32_t num_cols = input_data.col_ptrs.size();
int32_t valid_row_count = 0;
masked_data.reverse_index_map.reserve(masked_data.unmasked_num_rows);
auto& reverse_index_map = masked_data.reverse_index_map;
const auto null_val = input_data.null_val;

// Size the worker pool so that each thread processes roughly
// target_rows_per_thread rows.
constexpr int64_t target_rows_per_thread{20000};
const ThreadInfo thread_info(
    std::thread::hardware_concurrency(), input_num_rows, target_rows_per_thread);
CHECK_GE(thread_info.num_threads, 1L);
CHECK_GE(thread_info.num_elems_per_thread, 1L);

std::vector<std::vector<int32_t>> per_thread_reverse_index_maps(
    thread_info.num_threads);
tbb::task_arena limited_arena(thread_info.num_threads);
limited_arena.execute([&] {
  // First pass: in parallel, find the rows that have no null in any column.
  // Each thread records the input indexes of its valid rows in its own map,
  // so no synchronization is needed during the scan.
  tbb::parallel_for(
      tbb::blocked_range<int64_t>(
          0, input_num_rows, static_cast<int32_t>(thread_info.num_elems_per_thread)),
      [&](const tbb::blocked_range<int64_t>& r) {
        size_t thread_idx = tbb::this_task_arena::current_thread_index();
        auto& local_reverse_index_map = per_thread_reverse_index_maps[thread_idx];
        int32_t local_valid_row_count = 0;
        const int32_t start_idx = r.begin();
        const int32_t end_idx = r.end();
        for (int32_t row_idx = start_idx; row_idx < end_idx; ++row_idx) {
          // A row is masked out as soon as any of its columns is null.
          int32_t col_idx = 0;
          for (; col_idx < num_cols; ++col_idx) {
            if (input_data.col_ptrs[col_idx][row_idx] == null_val) {
              index_map[row_idx] = NULL_ROW_IDX;
              break;
            }
          }
          // Keep the row only if every column was non-null.
          if (col_idx == num_cols) {
            local_reverse_index_map.emplace_back(row_idx);
            index_map[row_idx] = local_valid_row_count++;
          }
        }
      },
      tbb::simple_partitioner());
});

for (const auto& per_thread_reverse_index_map : per_thread_reverse_index_maps) {
  valid_row_count += per_thread_reverse_index_map.size();
}
masked_data.masked_num_rows = valid_row_count;

masked_data.data.resize(num_cols, nullptr);

// Fast path: if no rows were masked out, the input buffers can be used
// directly and no data needs to be copied.
if (masked_data.masked_num_rows == masked_data.unmasked_num_rows) {
  for (int32_t col_idx = 0; col_idx < num_cols; ++col_idx) {
    masked_data.data[col_idx] = input_data.col_ptrs[col_idx];
  }
  return masked_data;
}

// Concatenate the per-thread reverse index maps into one contiguous map.
masked_data.reverse_index_map.resize(valid_row_count);
int32_t start_reverse_index_offset = 0;
std::vector<std::future<void>> worker_threads;
for (const auto& per_thread_reverse_index_map : per_thread_reverse_index_maps) {
  const int32_t local_reverse_index_map_size = per_thread_reverse_index_map.size();
  // Each worker copies one thread's entries into the shared map at its
  // running offset.
  worker_threads.emplace_back(std::async(
      std::launch::async,
      [&reverse_index_map](const auto& local_map,
                           const int32_t local_map_offset,
                           const int32_t local_map_size) {
        for (int32_t map_idx = 0; map_idx < local_map_size; ++map_idx) {
          reverse_index_map[map_idx + local_map_offset] = local_map[map_idx];
        }
      },
      per_thread_reverse_index_map,
      start_reverse_index_offset,
      local_reverse_index_map_size));
  start_reverse_index_offset += local_reverse_index_map_size;
}
for (auto& worker_thread : worker_threads) {
  worker_thread.wait();
}

// Allocate compacted output buffers, one per column, and point the output
// column pointers at them.
masked_data.data_allocations.resize(num_cols);
for (int32_t col_idx = 0; col_idx < num_cols; ++col_idx) {
  masked_data.data_allocations[col_idx].resize(valid_row_count);
  masked_data.data[col_idx] = masked_data.data_allocations[col_idx].data();
}

// Second pass: in parallel, gather the surviving rows from the input buffers
// into the compacted output buffers.
auto& denulled_data = masked_data.data;
tbb::parallel_for(
    tbb::blocked_range<int32_t>(0, valid_row_count),
    [&](const tbb::blocked_range<int32_t>& r) {
      const int32_t start_idx = r.begin();
      const int32_t end_idx = r.end();
      for (int32_t row_idx = start_idx; row_idx < end_idx; ++row_idx) {
        const auto input_row_idx = reverse_index_map[row_idx];
        for (int32_t col_idx = 0; col_idx < num_cols; ++col_idx) {
          denulled_data[col_idx][row_idx] =
              input_data.col_ptrs[col_idx][input_row_idx];
        }
      }
    });
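
// Illustrative sketch (not part of the listing above): the same index_map /
// reverse_index_map compaction scheme, shown single-threaded on plain
// std::vector columns. The sentinel names kNullVal and kNullRowIdx and the
// sample data are assumptions for this example only; the code above works on
// MaskedData<T> and parallelizes the scan and gather passes with TBB.
#include <cstdint>
#include <iostream>
#include <vector>

int main() {
  constexpr int32_t kNullVal = -1;     // assumed null sentinel for the sample
  constexpr int32_t kNullRowIdx = -1;  // stand-in for NULL_ROW_IDX
  // Two input columns; rows 1 and 3 contain a null in at least one column.
  const std::vector<std::vector<int32_t>> cols = {{10, kNullVal, 30, 40, 50},
                                                  {1, 2, 3, kNullVal, 5}};
  const size_t num_rows = cols[0].size();

  std::vector<int32_t> index_map(num_rows, kNullRowIdx);  // input row -> masked row
  std::vector<int32_t> reverse_index_map;                 // masked row -> input row
  for (size_t row = 0; row < num_rows; ++row) {
    bool has_null = false;
    for (const auto& col : cols) {
      if (col[row] == kNullVal) {
        has_null = true;
        break;
      }
    }
    if (!has_null) {
      index_map[row] = static_cast<int32_t>(reverse_index_map.size());
      reverse_index_map.push_back(static_cast<int32_t>(row));
    }
  }

  // Gather the surviving rows into compacted buffers, mirroring the second
  // parallel pass above.
  std::vector<std::vector<int32_t>> denulled(cols.size());
  for (size_t c = 0; c < cols.size(); ++c) {
    denulled[c].reserve(reverse_index_map.size());
    for (const int32_t input_row : reverse_index_map) {
      denulled[c].push_back(cols[c][input_row]);
    }
  }

  // Prints the compacted columns: "10 30 50" and "1 3 5".
  for (const auto& col : denulled) {
    for (const int32_t v : col) {
      std::cout << v << ' ';
    }
    std::cout << '\n';
  }
  return 0;
}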