25 #include <tbb/parallel_for.h>
26 #include <tbb/parallel_reduce.h>
27 #include <tbb/parallel_sort.h>
32 return (end.
time - start.
time) / 1000000000;
49 template <
typename I,
typename S>
55 const int64_t min_dwell_points,
56 const int64_t min_dwell_seconds,
57 const int64_t max_inactive_seconds,
69 const I id_null_val = inline_null_value<I>();
70 const S site_id_null_val = inline_null_value<S>();
72 const int32_t num_input_rows = input_id.
size();
78 if (num_input_rows == 0) {
82 std::vector<int32_t> permutation_idxs(num_input_rows);
84 auto permute_creation_timer =
DEBUG_TIMER(
"Create permutation index");
86 [&](
const tbb::blocked_range<int32_t>& r) {
87 const int32_t r_end = r.end();
88 for (int32_t p = r.begin(); p < r_end; ++p) {
89 permutation_idxs[p] = p;
95 auto permute_sort_timer =
DEBUG_TIMER(
"Sort permutation index");
97 tbb::parallel_sort(permutation_idxs.begin(),
98 permutation_idxs.begin() + num_input_rows,
99 [&](
const int32_t&
a,
const int32_t& b) {
100 return input_id[
a] == input_id[b] ? input_ts[
a] < input_ts[b]
101 : input_id[
a] < input_id[b];
105 constexpr int64_t target_rows_per_thread{20000};
107 std::thread::hardware_concurrency(), num_input_rows, target_rows_per_thread);
108 CHECK_GT(thread_info.num_threads, 0);
109 std::vector<int32_t> per_thread_session_counts(thread_info.num_threads, 0);
110 std::vector<std::pair<int32_t, int32_t>> per_thread_actual_idx_offsets(
111 thread_info.num_threads);
112 std::vector<std::future<void>> worker_threads;
113 int32_t start_row_idx = 0;
117 auto pre_flight_dwell_count_timer =
DEBUG_TIMER(
"Pre-flight Dwell Count");
118 for (int32_t thread_idx = 0; thread_idx < thread_info.num_threads; ++thread_idx) {
119 const int32_t end_row_idx =
120 std::min(start_row_idx + thread_info.num_elems_per_thread,
121 static_cast<int64_t>(num_input_rows));
124 [&, min_dwell_points, min_dwell_seconds, num_input_rows, max_inactive_seconds](
125 const auto start_idx,
const auto end_idx,
const auto thread_idx) {
126 int32_t thread_session_count = per_thread_session_counts[thread_idx];
128 int32_t first_valid_idx = start_idx;
131 I first_id = input_id[permutation_idxs[first_valid_idx]];
132 for (; first_valid_idx < end_idx; ++first_valid_idx) {
133 const int32_t permuted_idx = permutation_idxs[first_valid_idx];
134 if (!input_id.
isNull(permuted_idx) &&
135 input_id[permuted_idx] != first_id) {
140 per_thread_actual_idx_offsets[thread_idx].first = first_valid_idx;
142 auto last_id = input_id[permutation_idxs[first_valid_idx]];
143 auto last_site_id = input_site_id[permutation_idxs[first_valid_idx]];
145 int32_t i = first_valid_idx;
146 int32_t session_num_points = 0;
147 auto session_start_ts = input_ts[permutation_idxs[i]];
148 auto last_ts = input_ts[permutation_idxs[i]];
149 for (; i < end_idx; ++i) {
150 const auto permuted_idx = permutation_idxs[i];
151 const auto&
id = input_id[permuted_idx];
152 const auto& site_id = input_site_id[permuted_idx];
153 const auto& current_ts = input_ts[permuted_idx];
154 if (
id != last_id || site_id != last_site_id ||
155 get_elapsed_seconds(last_ts, current_ts) > max_inactive_seconds) {
156 if (last_id != id_null_val && last_site_id != site_id_null_val) {
157 if (session_num_points >= min_dwell_points &&
158 get_elapsed_seconds(session_start_ts, last_ts) >=
160 thread_session_count++;
163 session_num_points = 1;
164 session_start_ts = current_ts;
166 session_num_points++;
170 last_site_id = site_id;
171 last_ts = current_ts;
176 if (end_idx < num_input_rows) {
177 const int32_t max_transitions = (input_id[permutation_idxs[end_idx]] !=
178 input_id[permutation_idxs[end_idx - 1]])
181 int32_t transition_count = 0;
182 for (; i < num_input_rows; ++i) {
183 const auto permuted_idx = permutation_idxs[i];
184 const auto&
id = input_id[permuted_idx];
185 const auto& site_id = input_site_id[permuted_idx];
186 const auto& current_ts = input_ts[permuted_idx];
188 if (
id != last_id || site_id != last_site_id ||
189 get_elapsed_seconds(last_ts, current_ts) > max_inactive_seconds) {
192 if (transition_count == max_transitions) {
196 if (last_id != id_null_val && last_site_id != site_id_null_val) {
197 if (session_num_points >= min_dwell_points &&
198 get_elapsed_seconds(session_start_ts, last_ts) >=
200 thread_session_count++;
204 last_site_id = site_id;
205 session_num_points = 1;
206 session_start_ts = current_ts;
208 session_num_points++;
210 last_ts = current_ts;
213 if (last_id != id_null_val && last_site_id != site_id_null_val) {
214 if (session_num_points >= min_dwell_points &&
215 get_elapsed_seconds(session_start_ts, last_ts) >= min_dwell_seconds) {
216 thread_session_count++;
219 per_thread_actual_idx_offsets[thread_idx].second = i;
220 per_thread_session_counts[thread_idx] = thread_session_count;
226 start_row_idx += thread_info.num_elems_per_thread;
229 for (
auto& worker_thread : worker_threads) {
230 worker_thread.wait();
232 worker_threads.clear();
235 std::vector<int32_t> session_counts_prefix_sums(thread_info.num_threads + 1);
236 session_counts_prefix_sums[0] = 0;
237 for (int32_t thread_idx = 0; thread_idx < thread_info.num_threads; ++thread_idx) {
238 session_counts_prefix_sums[thread_idx + 1] =
239 session_counts_prefix_sums[thread_idx] + per_thread_session_counts[thread_idx];
241 const auto num_output_rows = session_counts_prefix_sums[thread_info.num_threads];
243 if (num_output_rows == 0) {
244 return num_output_rows;
251 for (int32_t thread_idx = 0; thread_idx < thread_info.num_threads; ++thread_idx) {
252 const int32_t start_row_idx = per_thread_actual_idx_offsets[thread_idx].first;
253 const int32_t end_row_idx = per_thread_actual_idx_offsets[thread_idx].second;
254 const int32_t output_start_offset = session_counts_prefix_sums[thread_idx];
255 const int32_t num_sessions = session_counts_prefix_sums[thread_idx + 1] -
256 session_counts_prefix_sums[thread_idx];
265 &output_start_seq_id,
267 &output_dwell_time_sec,
268 &output_dwell_points,
274 max_inactive_seconds](
const auto start_row_idx,
275 const auto end_row_idx,
276 const auto output_start_offset,
277 const auto num_sessions) {
278 if (!(end_row_idx > start_row_idx)) {
281 int32_t output_offset = output_start_offset;
282 int32_t session_start_seq_id = 1;
283 int32_t session_seq_id = 1;
284 int32_t session_id = 1;
285 auto last_id = input_id[permutation_idxs[start_row_idx]];
286 auto last_site_id = input_site_id[permutation_idxs[start_row_idx]];
287 auto session_start_ts = input_ts[permutation_idxs[start_row_idx]];
288 auto last_ts = input_ts[permutation_idxs[start_row_idx]];
289 for (int32_t idx = start_row_idx; idx < end_row_idx; ++idx) {
290 const auto permuted_idx = permutation_idxs[idx];
291 const auto&
id = input_id[permuted_idx];
292 const auto& site_id = input_site_id[permuted_idx];
293 const auto& current_ts = input_ts[permuted_idx];
294 if (
id != last_id || site_id != last_site_id ||
295 get_elapsed_seconds(last_ts, current_ts) > max_inactive_seconds) {
296 if (last_id != id_null_val && last_site_id != site_id_null_val) {
297 const int32_t num_dwell_points = session_seq_id - session_start_seq_id;
298 const int32_t num_dwell_seconds =
299 get_elapsed_seconds(session_start_ts, last_ts);
300 if (num_dwell_points >= min_dwell_points &&
301 num_dwell_seconds >= min_dwell_seconds) {
302 output_id[output_offset] = last_id;
303 output_site_id[output_offset] = last_site_id;
304 output_session_id[output_offset] = session_id++;
305 output_start_seq_id[output_offset] = session_start_seq_id;
306 output_start_ts[output_offset] = session_start_ts;
307 output_dwell_time_sec[output_offset] = num_dwell_seconds;
308 output_dwell_points[output_offset] = num_dwell_points;
312 last_site_id = site_id;
313 session_start_ts = input_ts[permuted_idx];
316 session_start_seq_id = 1;
320 session_start_seq_id = session_seq_id;
324 last_ts = current_ts;
326 if (last_id != id_null_val && last_site_id != site_id_null_val) {
327 const int32_t num_dwell_points = session_seq_id - session_start_seq_id;
328 const int32_t num_dwell_seconds =
329 get_elapsed_seconds(session_start_ts, last_ts);
330 if (num_dwell_points >= min_dwell_points &&
331 num_dwell_seconds >= min_dwell_seconds) {
332 output_id[output_offset] = last_id;
333 output_site_id[output_offset] = last_site_id;
334 output_session_id[output_offset] = session_id++;
335 output_start_seq_id[output_offset] = session_start_seq_id;
336 output_start_ts[output_offset] = session_start_ts;
337 output_dwell_time_sec[output_offset] = num_dwell_seconds;
338 output_dwell_points[output_offset] = num_dwell_points;
342 CHECK_EQ(output_offset - output_start_offset, num_sessions);
350 for (
auto& worker_thread : worker_threads) {
351 worker_thread.wait();
355 output_prev_site_id[0] = site_id_null_val;
356 output_next_site_id[0] = num_output_rows > 1 && output_id[0] == output_id[1]
359 output_prev_site_id[num_output_rows - 1] =
360 num_output_rows > 1 &&
361 output_id[num_output_rows - 1] == output_id[num_output_rows - 2]
362 ? output_site_id[num_output_rows - 2]
364 output_next_site_id[num_output_rows - 1] = site_id_null_val;
366 auto permute_creation_timer =
DEBUG_TIMER(
"Fill lagged and lead site ids");
368 [&](
const tbb::blocked_range<int32_t>& r) {
369 const int32_t r_end = r.end();
370 for (int32_t p = r.begin(); p < r_end; ++p) {
371 output_prev_site_id[p] = output_id[p] == output_id[p - 1]
372 ? output_site_id[p - 1]
374 output_next_site_id[p] = output_id[p] == output_id[p + 1]
375 ? output_site_id[p + 1]
381 return num_output_rows;
384 #endif // #ifdef HAVE_TBB
385 #endif // #ifndef __CUDACC__
void set_output_row_size(int64_t num_rows)
DEVICE int64_t size() const
future< Result > async(Fn &&fn, Args &&...args)
DEVICE bool isNull(int64_t index) const
void parallel_for(const blocked_range< Int > &range, const Body &body, const Partitioner &p=Partitioner())
#define DEBUG_TIMER(name)