#ifndef __CUDACC__
#ifdef HAVE_TBB

#include <tbb/parallel_for.h>
#include <tbb/parallel_sort.h>

#include <algorithm>
#include <cmath>
#include <cstdint>
#include <iostream>
#include <limits>
#include <sstream>
#include <string>
#include <vector>

// Column, ColumnList, inline_null_value, get_column_metadata, and CHECK_EQ
// come from framework headers included earlier in this file (not shown here).

namespace TableFunctions_Namespace {
struct ColumnMetadata {
  int64_t min;
  int64_t max;
  bool has_nulls;
  int64_t null_sentinel;

  ColumnMetadata()
      : min(std::numeric_limits<int64_t>::max())
      , max(std::numeric_limits<int64_t>::min())
      , has_nulls(false) {}

  inline uint64_t map_to_compressed_range(const int64_t val) const {
    if (has_nulls && val == null_sentinel) {
      return max - min + 1;  // nulls take the slot one past the value range
    }
    return val - min;
  }

  inline int64_t map_to_uncompressed_range(const int64_t val) const {
    if (has_nulls && val == (max - min + 1)) {
      return null_sentinel;
    }
    return val + min;
  }
};
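// Worked example of the mapping above (values hypothetical): a column with
// min = 10, max = 19, has_nulls = true compresses to [0, 10], where 10 is
// the null slot:
//   map_to_compressed_range(13)            -> 3
//   map_to_compressed_range(null_sentinel) -> 10
//   map_to_uncompressed_range(3)           -> 13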
struct KeyMetadata {
  ColumnMetadata column_metadata;
  const int64_t range;
  const int64_t composite_prefix_range;
  KeyMetadata(const ColumnMetadata& column_metadata_in,
              const int64_t composite_prefix_range_in)
      : column_metadata(column_metadata_in)
      , range(column_metadata.max - column_metadata.min + 1 +
              (column_metadata.has_nulls ? 1 : 0))
      , composite_prefix_range(composite_prefix_range_in) {}
};
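// Each key behaves like a digit in a mixed-radix number: `range` is the
// digit's radix (the value span, plus one null slot if needed) and
// `composite_prefix_range` is its place value, i.e. the product of the
// ranges of all preceding keys.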
template <typename T>
inline ColumnMetadata get_integer_column_metadata(const Column<T>& col) {
  ColumnMetadata column_metadata;
  const auto [col_min, col_max, has_nulls] = get_column_metadata(col);
  column_metadata.min = col_min;
  column_metadata.max = col_max;
  column_metadata.has_nulls = has_nulls;
  column_metadata.null_sentinel = inline_null_value<T>();
  return column_metadata;
}
struct CompositeKeyMetadata {
  std::vector<KeyMetadata> keys_metadata;
  uint64_t num_keys;  // total size of the compressed composite-key space
};
template <typename K>
inline uint64_t map_to_compressed_range(
    const ColumnList<K>& keys,
    const CompositeKeyMetadata& composite_key_metadata,
    const uint64_t idx) {
  uint64_t val = 0;
  const uint64_t num_keys = keys.numCols();
  for (uint64_t k = 0; k < num_keys; ++k) {
    val += composite_key_metadata.keys_metadata[k].column_metadata.map_to_compressed_range(
               keys[k][idx]) *
           composite_key_metadata.keys_metadata[k].composite_prefix_range;
  }
  return val;
}
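// Worked example (hypothetical two-key composite): if key 0 has range 10
// (prefix range 1) and key 1 has range 5 (prefix range 10), then compressed
// per-key values (3, 2) map to 3 * 1 + 2 * 10 = 23, a unique slot in [0, 50).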
template <typename K>
inline uint64_t map_to_compressed_range_separate_nulls(
    const ColumnList<K>& keys,
    const CompositeKeyMetadata& composite_key_metadata,
    const uint64_t idx,
    const uint64_t separated_null_val) {
  uint64_t val = 0;
  const uint64_t num_keys = keys.numCols();
  for (uint64_t k = 0; k < num_keys; ++k) {
    const K key = keys[k][idx];
    if (key == composite_key_metadata.keys_metadata[k].column_metadata.null_sentinel) {
      return separated_null_val;
    }
    val += composite_key_metadata.keys_metadata[k].column_metadata.map_to_compressed_range(
               key) *
           composite_key_metadata.keys_metadata[k].composite_prefix_range;
  }
  return val;
}
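// Unlike the variant above, which maps nulls into their per-key null slots,
// this variant collapses any row containing a null key to a single
// out-of-band value, so callers can sort null rows to the end and drop them
// (see pivot_table_to_sparse_vector below).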
template <typename K>
inline uint64_t map_to_compressed_range(const Column<K>& key,
                                        const CompositeKeyMetadata& composite_key_metadata,
                                        const uint64_t idx) {
  return composite_key_metadata.keys_metadata[0].column_metadata.map_to_compressed_range(
      key[idx]);
}
template <typename K>
inline uint64_t map_to_compressed_range(const K key,
                                        const CompositeKeyMetadata& composite_key_metadata) {
  return composite_key_metadata.keys_metadata[0].column_metadata.map_to_compressed_range(
      key);
}
template <typename K>
inline CompositeKeyMetadata getCompositeKeyMetadata(const ColumnList<K>& keys) {
  CompositeKeyMetadata composite_key_metadata;
  const size_t num_keys = keys.numCols();
  composite_key_metadata.num_keys = 1;
  for (size_t k = 0; k != num_keys; ++k) {
    composite_key_metadata.keys_metadata.emplace_back(
        get_integer_column_metadata(keys[k]), composite_key_metadata.num_keys);
    composite_key_metadata.num_keys *= composite_key_metadata.keys_metadata[k].range;
  }
  return composite_key_metadata;
}
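// After the loop above, num_keys is the product of all per-key ranges: the
// total number of addressable composite-key slots. For example (hypothetical),
// two keys with ranges 10 and 5 yield num_keys = 50, with prefix ranges 1
// and 10 respectively.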
template <typename K>
inline CompositeKeyMetadata getCompositeKeyMetadata(const Column<K>& key) {
  CompositeKeyMetadata composite_key_metadata;
  composite_key_metadata.keys_metadata.emplace_back(get_integer_column_metadata(key), 1);
  composite_key_metadata.num_keys = composite_key_metadata.keys_metadata[0].range;
  return composite_key_metadata;
}
inline CompositeKeyMetadata unionCompositeKeyMetadata(
    const CompositeKeyMetadata& composite_key_metadata1,
    const CompositeKeyMetadata& composite_key_metadata2) {
  CHECK_EQ(composite_key_metadata1.keys_metadata.size(),
           composite_key_metadata2.keys_metadata.size());
  const size_t num_keys = composite_key_metadata1.keys_metadata.size();
  CompositeKeyMetadata unioned_composite_key_metadata;
  unioned_composite_key_metadata.num_keys = 1;
  for (size_t k = 0; k != num_keys; ++k) {
    const KeyMetadata& key_metadata1 = composite_key_metadata1.keys_metadata[k];
    const KeyMetadata& key_metadata2 = composite_key_metadata2.keys_metadata[k];
    ColumnMetadata unioned_column_metadata;
    unioned_column_metadata.min =
        std::min(key_metadata1.column_metadata.min, key_metadata2.column_metadata.min);
    unioned_column_metadata.max =
        std::max(key_metadata1.column_metadata.max, key_metadata2.column_metadata.max);
    unioned_column_metadata.has_nulls = key_metadata1.column_metadata.has_nulls ||
                                        key_metadata2.column_metadata.has_nulls;
    // Assumption: both inputs use the same per-key null sentinel, so the
    // first input's sentinel is taken for the union.
    unioned_column_metadata.null_sentinel = key_metadata1.column_metadata.null_sentinel;
    unioned_composite_key_metadata.keys_metadata.emplace_back(
        unioned_column_metadata, unioned_composite_key_metadata.num_keys);
    unioned_composite_key_metadata.num_keys *=
        unioned_composite_key_metadata.keys_metadata[k].range;
  }
  return unioned_composite_key_metadata;
}
inline void copyCompositeKeyMetadataNulls(CompositeKeyMetadata& dest_metadata,
                                          const CompositeKeyMetadata& src_metadata) {
  CHECK_EQ(dest_metadata.keys_metadata.size(), src_metadata.keys_metadata.size());
  for (size_t k = 0; k != dest_metadata.keys_metadata.size(); ++k) {
    dest_metadata.keys_metadata[k].column_metadata.null_sentinel =
        src_metadata.keys_metadata[k].column_metadata.null_sentinel;
  }
}
template <typename U, typename S>
struct SparseVector {
  std::vector<U> row_indices;
  std::vector<S> data;

  SparseVector(const size_t size) : row_indices(size), data(size) {}
};
template <typename U, typename S>
struct SparseMatrixCsc {
  std::vector<uint64_t> col_offsets;
  std::vector<U> col_values;
  std::vector<U> row_indices;
  std::vector<S> data;

  SparseMatrixCsc(const size_t size) : row_indices(size), data(size) {}
};
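// Sketch of the CSC layout built by pivot_table_to_sparse_csc_matrix below
// (entries hypothetical): for non-empty columns 2 and 7 holding 3 and 1
// entries respectively,
//   col_values  = {2, 7}     // compressed primary keys actually present
//   col_offsets = {0, 3, 4}  // col_values[i] spans data[col_offsets[i],
//                            //                          col_offsets[i + 1])
//   row_indices / data       // compressed secondary key and metric per entry
// Only non-empty columns are materialized, so col_offsets has
// col_values.size() + 1 entries rather than num_cols + 1.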
template <typename M>
struct DenseMatrix {
  const uint64_t num_rows;
  const uint64_t num_cols;
  std::vector<M> data;

  DenseMatrix(const uint64_t n_rows, const uint64_t n_cols)
      : num_rows(n_rows), num_cols(n_cols), data(num_rows * num_cols, 0) {}

  // Column-major storage: element (r, c) lives at data[c * num_rows + r].
  inline M get(const uint64_t r, const uint64_t c) const {
    return data[c * num_rows + r];
  }

  inline void set(const uint64_t r, const uint64_t c, const M val) {
    data[c * num_rows + r] = val;
  }
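  // Example of the column-major indexing (hypothetical sizes): in a 3 x 2
  // matrix, element (r = 1, c = 1) lives at data[1 * 3 + 1] = data[4]; each
  // column's entries are contiguous in memory.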
  template <typename W>
  std::string to_string_with_precision(const W field, const size_t field_width) const {
    std::ostringstream out;
    out.precision(field_width);
    out << std::fixed << field;
    return out.str();
  }
  template <typename W>
  void print_field(const W field, const size_t field_width) const {
    const std::string string_field =
        to_string_with_precision(field, field_width >= 9 ? field_width - 4 : 5);
    // Guard against unsigned underflow when the rendered field is wider than
    // field_width (std::max on a size_t difference would not catch that).
    const size_t pad =
        field_width > string_field.size() ? field_width - string_field.size() : 0;
    std::cout << std::string(pad, ' ') << string_field;
  }
  void print(const uint64_t min_r,
             const uint64_t max_r,
             const uint64_t min_c,
             const uint64_t max_c) const {
    const uint64_t safe_max_r = std::max(std::min(max_r, num_rows - 1), min_r);
    const uint64_t safe_max_c = std::max(std::min(max_c, num_cols - 1), min_c);
    const size_t field_width{10};
    std::cout << std::endl << std::endl << std::string(field_width, ' ');
    for (uint64_t c = min_c; c <= safe_max_c; c++) {
      print_field(c, field_width);
    }
    for (uint64_t r = min_r; r <= safe_max_r; r++) {
      std::cout << std::endl;
      print_field(r, field_width);
      for (uint64_t c = min_c; c <= safe_max_c; c++) {
        const M field = get(r, c);
        print_field(field, field_width);
      }
    }
    std::cout << std::endl << std::endl;
  }
};
template <typename F, typename M, typename U, typename S>
SparseVector<U, S> pivot_table_to_sparse_vector(const ColumnList<F>& key_col,
                                                const Column<M>& metric_col,
                                                const CompositeKeyMetadata& key_metadata) {
  const uint64_t num_rows = key_col.size();
  std::vector<U> compressed_keys(num_rows);
  std::vector<uint64_t> sort_permutation(num_rows);

  const size_t loop_grain_size{4096};
  const U separated_null_val = std::numeric_limits<U>::max();
  tbb::parallel_for(tbb::blocked_range<uint64_t>(0, num_rows, loop_grain_size),
                    [&](const tbb::blocked_range<uint64_t>& i) {
                      for (uint64_t r = i.begin(); r != i.end(); ++r) {
                        compressed_keys[r] = map_to_compressed_range_separate_nulls(
                            key_col, key_metadata, r, separated_null_val);
                        sort_permutation[r] = r;
                      }
                    });

  tbb::parallel_sort(sort_permutation.begin(),
                     sort_permutation.end(),
                     [&](const uint64_t& a, const uint64_t& b) {
                       return (compressed_keys[a] < compressed_keys[b]);
                     });

  // Null keys compress to separated_null_val, the maximum possible value, so
  // after the sort they sit at the end of the permutation and can be counted.
  uint64_t num_nulls = 0;
  for (; num_nulls < num_rows; num_nulls++) {
    if (compressed_keys[sort_permutation[num_rows - num_nulls - 1]] !=
        separated_null_val) {
      break;
    }
  }
  const uint64_t num_non_null_rows = num_rows - num_nulls;

  SparseVector<U, S> sparse_vector(num_non_null_rows);
  tbb::parallel_for(tbb::blocked_range<uint64_t>(0, num_non_null_rows, loop_grain_size),
                    [&](const tbb::blocked_range<uint64_t>& i) {
                      const uint64_t i_end{i.end()};
                      for (uint64_t r = i.begin(); r != i_end; ++r) {
                        const uint64_t permute_idx = sort_permutation[r];
                        sparse_vector.row_indices[r] = compressed_keys[permute_idx];
                        sparse_vector.data[r] = metric_col[permute_idx];
                      }
                    });
  return sparse_vector;
}
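// Usage sketch (names and types hypothetical): given a ColumnList of key
// columns and a metric column from a pivoted table,
//   const auto key_metadata = getCompositeKeyMetadata(key_cols);
//   const auto vec =
//       pivot_table_to_sparse_vector<int32_t, float, uint32_t, float>(
//           key_cols, metric_col, key_metadata);
// yields row_indices sorted ascending with null-keyed rows dropped, which is
// the ordering multiply_matrix_by_vector below relies on for its merge-join.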
template <typename K, typename F, typename M, typename U, typename S>
SparseMatrixCsc<U, S> pivot_table_to_sparse_csc_matrix(
    const Column<K>& primary_key_col,
    const ColumnList<F>& secondary_key_cols,
    const Column<M>& metric_col,
    const CompositeKeyMetadata& primary_key_metadata,
    const CompositeKeyMetadata& secondary_key_metadata) {
  const uint64_t num_rows = primary_key_col.size();
  std::vector<U> compressed_primary_keys(num_rows);
  std::vector<U> compressed_secondary_keys(num_rows);
  std::vector<uint64_t> sort_permutation(num_rows);

  const size_t loop_grain_size{4096};
  tbb::parallel_for(tbb::blocked_range<uint64_t>(0, num_rows, loop_grain_size),
                    [&](const tbb::blocked_range<uint64_t>& i) {
                      for (uint64_t r = i.begin(); r != i.end(); ++r) {
                        compressed_primary_keys[r] = map_to_compressed_range(
                            primary_key_col, primary_key_metadata, r);
                        compressed_secondary_keys[r] = map_to_compressed_range(
                            secondary_key_cols, secondary_key_metadata, r);
                        sort_permutation[r] = r;
                      }
                    });

  // Sort by primary key, breaking ties by secondary key, so that each
  // compressed primary key forms one contiguous, row-sorted CSC column.
  tbb::parallel_sort(
      sort_permutation.begin(),
      sort_permutation.end(),
      [&](const uint64_t& a, const uint64_t& b) {
        if (compressed_primary_keys[a] < compressed_primary_keys[b]) {
          return true;
        } else if (compressed_primary_keys[a] > compressed_primary_keys[b]) {
          return false;
        }
        return (compressed_secondary_keys[a] < compressed_secondary_keys[b]);
      });

  SparseMatrixCsc<U, S> sparse_matrix_csc(num_rows);
  tbb::parallel_for(tbb::blocked_range<uint64_t>(0, num_rows, loop_grain_size),
                    [&](const tbb::blocked_range<uint64_t>& i) {
                      const uint64_t i_end{i.end()};
                      for (uint64_t r = i.begin(); r != i_end; ++r) {
                        const uint64_t permute_idx = sort_permutation[r];
                        sparse_matrix_csc.row_indices[r] =
                            compressed_secondary_keys[permute_idx];
                        sparse_matrix_csc.data[r] = metric_col[permute_idx];
                      }
                    });

  // Record the offset at which each distinct primary key (column) begins,
  // plus a final sentinel offset of num_rows.
  U last_primary_key_compressed = std::numeric_limits<U>::max();
  for (uint64_t r = 0; r < num_rows; ++r) {
    const U primary_key_compressed = compressed_primary_keys[sort_permutation[r]];
    if (primary_key_compressed != last_primary_key_compressed) {
      sparse_matrix_csc.col_offsets.emplace_back(r);
      sparse_matrix_csc.col_values.emplace_back(primary_key_compressed);
      last_primary_key_compressed = primary_key_compressed;
    }
  }
  sparse_matrix_csc.col_offsets.emplace_back(num_rows);

  return sparse_matrix_csc;
}
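// Note the resulting matrix is compressed twice over: columns are only the
// distinct compressed primary keys actually present (col_values), and within
// each column the secondary-key row indices are sorted, which the
// dot-product merge-joins below depend on.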
template <typename U, typename S>
std::vector<double> idf_normalize(SparseMatrixCsc<U, S>& sparse_matrix,
                                  const U num_secondary_keys) {
  const U num_primary_keys = sparse_matrix.col_values.size();
  const uint64_t num_rows = sparse_matrix.data.size();
  std::vector<double> secondary_key_idf(num_secondary_keys, 0.);

  // Document frequency: count the primary keys in which each secondary key
  // appears with a non-trivial weight.
  for (uint64_t r = 0; r < num_rows; ++r) {
    secondary_key_idf[sparse_matrix.row_indices[r]] +=
        (sparse_matrix.data[r] > 0.001 ? 1 : 0);
  }
  const size_t loop_grain_size{1000};
  tbb::parallel_for(tbb::blocked_range<U>(0, num_secondary_keys, loop_grain_size),
                    [&](const tbb::blocked_range<U>& i) {
                      for (U k = i.begin(); k != i.end(); ++k) {
                        secondary_key_idf[k] =
                            log((num_primary_keys + 1.0) / secondary_key_idf[k]) + 1.;
                      }
                    });
  tbb::parallel_for(tbb::blocked_range<uint64_t>(0, num_rows, loop_grain_size),
                    [&](const tbb::blocked_range<uint64_t>& i) {
                      for (uint64_t r = i.begin(); r != i.end(); ++r) {
                        sparse_matrix.data[r] *=
                            secondary_key_idf[sparse_matrix.row_indices[r]];
                      }
                    });
  return secondary_key_idf;
}
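// The weighting above is a smoothed inverse document frequency,
//   idf(k) = log((N + 1) / df(k)) + 1,
// where N is the number of primary keys ("documents") and df(k) counts the
// primary keys in which secondary key k appears; each stored metric is then
// scaled by the idf of its secondary key, TF-IDF style.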
template <typename U, typename S>
std::vector<S> get_matrix_col_mags(const SparseMatrixCsc<U, S>& sparse_matrix) {
  const U num_cols = sparse_matrix.col_values.size();
  std::vector<S> column_mags(num_cols);
  tbb::parallel_for(
      tbb::blocked_range<U>(0, num_cols), [&](const tbb::blocked_range<U>& c) {
        const U c_end = c.end();
        for (U col_idx = c.begin(); col_idx != c_end; ++col_idx) {
          const U end_col_offset = sparse_matrix.col_offsets[col_idx + 1];
          S column_mag_sq = 0;
          for (U col_pos = sparse_matrix.col_offsets[col_idx]; col_pos < end_col_offset;
               ++col_pos) {
            column_mag_sq += sparse_matrix.data[col_pos] * sparse_matrix.data[col_pos];
          }
          column_mags[col_idx] = sqrt(column_mag_sq);
        }
      });
  return column_mags;
}
template <typename U, typename S>
S get_vector_mag(const SparseVector<U, S>& sparse_vector) {
  const U num_vals = sparse_vector.row_indices.size();
  S vec_mag_sq = 0;
  for (U row_idx = 0; row_idx != num_vals; ++row_idx) {
    vec_mag_sq += sparse_vector.data[row_idx] * sparse_vector.data[row_idx];
  }
  return sqrt(vec_mag_sq);
}
template <typename U, typename S>
std::vector<S> multiply_matrix_by_vector(const SparseMatrixCsc<U, S>& sparse_matrix,
                                         const SparseVector<U, S>& sparse_vector,
                                         const bool unit_normalize) {
  const U num_cols = sparse_matrix.col_values.size();
  std::vector<S> dot_product_vec(num_cols);
  const uint64_t vec_length = sparse_vector.row_indices.size();
  tbb::parallel_for(
      tbb::blocked_range<U>(0, num_cols), [&](const tbb::blocked_range<U>& c) {
        const uint64_t c_end = c.end();
        for (U col_idx = c.begin(); col_idx != c_end; ++col_idx) {
          const U matrix_start_col_offset = sparse_matrix.col_offsets[col_idx];
          const U matrix_end_col_offset = sparse_matrix.col_offsets[col_idx + 1];
          S dot_product = 0;
          U m_pos = matrix_start_col_offset;
          U v_pos = 0;
          // Merge-join the sorted row indices of the column and the vector,
          // accumulating products only where the indices match.
          while (m_pos < matrix_end_col_offset && v_pos < vec_length) {
            const U m_row_index = sparse_matrix.row_indices[m_pos];
            const U v_row_index = sparse_vector.row_indices[v_pos];
            if (m_row_index < v_row_index) {
              m_pos++;
            } else if (v_row_index < m_row_index) {
              v_pos++;
            } else {
              dot_product += sparse_matrix.data[m_pos++] * sparse_vector.data[v_pos++];
            }
          }
          dot_product_vec[col_idx] = dot_product;
        }
      });
  if (unit_normalize) {
    const std::vector<S> matrix_mags = get_matrix_col_mags(sparse_matrix);
    const S vector_mag = get_vector_mag(sparse_vector);
    for (U c = 0; c != num_cols; ++c) {
      const S co_mag = matrix_mags[c] * vector_mag;
      if (co_mag > 0) {  // assumed guard against division by zero
        dot_product_vec[c] /= co_mag;
      }
    }
  }
  return dot_product_vec;
}
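// With unit_normalize = true the result is the cosine similarity between the
// query vector and every matrix column, dot(col, vec) / (|col| * |vec|),
// since each dot product is divided by the product of the two magnitudes.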
template <typename U, typename S>
DenseMatrix<S> multiply_matrix_by_transpose(const SparseMatrixCsc<U, S>& sparse_matrix,
                                            const bool unit_normalize) {
  const U num_cols = sparse_matrix.col_values.size();
  const U num_non_zero_entries = sparse_matrix.data.size();
  const U avg_non_zero_entries_per_col =
      std::max(num_non_zero_entries / num_cols, static_cast<U>(1));
  const U cache_size = 1 << 21;  // target per-partition working set (~2 MB)
  const double bytes_per_entry =
      sizeof(U) + sizeof(S) +
      (sizeof(U) + sizeof(uint64_t)) *
          (static_cast<double>(sparse_matrix.col_offsets.size()) /
           sparse_matrix.row_indices.size());
  const U cols_per_partition =
      cache_size / (avg_non_zero_entries_per_col * bytes_per_entry) + 1;
  const U num_partitions = (num_cols + cols_per_partition - 1) / cols_per_partition;
  const U num_mn_partitions = num_partitions * num_partitions;

  DenseMatrix<S> similarity_matrix(num_cols, num_cols);
  tbb::parallel_for(
      tbb::blocked_range<U>(0, num_mn_partitions), [&](const tbb::blocked_range<U>& p) {
        for (U mn_part_idx = p.begin(); mn_part_idx != p.end(); ++mn_part_idx) {
          const U m_p = mn_part_idx / num_partitions;
          const U n_p = mn_part_idx % num_partitions;
          // The output is symmetric, so only the upper triangle (m <= n) is
          // computed; m_end below caps m at n + 1.
          const U n_start = n_p * cols_per_partition;
          const U n_end = std::min((n_p + 1) * cols_per_partition, num_cols);
          const U m_start = m_p * cols_per_partition;
          const U m_block_end = std::min((m_p + 1) * cols_per_partition, num_cols);
          for (U n = n_start; n < n_end; ++n) {
            const U m_end = std::min(m_block_end, n + 1);
            for (U m = m_start; m < m_end; ++m) {
              S dot_product = 0;
              const U n_pos_end = sparse_matrix.col_offsets[n + 1];
              const U m_pos_end = sparse_matrix.col_offsets[m + 1];
              U m_pos = sparse_matrix.col_offsets[m];
              U n_pos = sparse_matrix.col_offsets[n];
              while (m_pos < m_pos_end && n_pos < n_pos_end) {
                const U m_row_index = sparse_matrix.row_indices[m_pos];
                const U n_row_index = sparse_matrix.row_indices[n_pos];
                if (m_row_index < n_row_index) {
                  m_pos++;
                } else if (n_row_index < m_row_index) {
                  n_pos++;
                } else {
                  dot_product +=
                      sparse_matrix.data[m_pos++] * sparse_matrix.data[n_pos++];
                }
              }
              similarity_matrix.set(m, n, dot_product);
            }
          }
        }
      });

  if (unit_normalize) {
    std::vector<S> inv_norms(similarity_matrix.num_cols);
    for (U c = 0; c != similarity_matrix.num_cols; ++c) {
      const S col_length = similarity_matrix.get(c, c);
      const S col_length_sqrt = sqrt(col_length);
      inv_norms[c] = col_length_sqrt > 0 ? 1.0 / col_length_sqrt : 0;
    }
    for (U c = 0; c != similarity_matrix.num_cols; ++c) {
      const S inv_norm_c = inv_norms[c];
      const U max_row = c + 1;
      for (U r = 0; r != max_row; ++r) {
        similarity_matrix.set(
            r, c, similarity_matrix.get(r, c) * inv_norms[r] * inv_norm_c);
      }
    }
  }
  return similarity_matrix;
}
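// Example of the partitioning arithmetic above (numbers hypothetical): with
// num_cols = 10,000, an average of 50 non-zero entries per column, and
// roughly 12 bytes per entry, cols_per_partition is about
// 2 MB / (50 * 12 B) + 1 = 3,496, giving num_partitions = 3 and 9 (m, n)
// partition pairs to spread across threads, each sized so its column data
// stays cache-resident while it is reused.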
}  // namespace TableFunctions_Namespace

#endif  // #ifdef HAVE_TBB
#endif  // #ifndef __CUDACC__