OmniSciDB  a5dc49c757
ExecutorResourceMgr.cpp
/*
 * Copyright 2022 HEAVY.AI, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <iostream>
#include <thread>

#include "ExecutorResourceMgr.h"
#include "Logger/Logger.h"

namespace ExecutorResourceMgr_Namespace {
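
// Instantiates the ExecutorResourcePool from the provided parameters and
// starts the process queue by launching a thread to run process_queue_loop.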
ExecutorResourceMgr::ExecutorResourceMgr(
    const std::vector<std::pair<ResourceType, size_t>>& total_resources,
    const std::vector<ConcurrentResourceGrantPolicy>& concurrent_resource_grant_policies,
    const std::vector<ResourceGrantPolicy>& max_per_request_resource_grant_policies,
    const double max_available_resource_use_ratio)
    : executor_resource_pool_(total_resources,
                              concurrent_resource_grant_policies,
                              max_per_request_resource_grant_policies)
    , max_available_resource_use_ratio_(max_available_resource_use_ratio) {
  CHECK_GT(max_available_resource_use_ratio_, 0.0);
  CHECK_LE(max_available_resource_use_ratio_, 1.0);
  process_queue_thread_ = std::thread(&ExecutorResourceMgr::process_queue_loop, this);
  LOG(EXECUTOR) << "Executor Resource Manager queue processing thread started";
}

ExecutorResourceMgr::~ExecutorResourceMgr() {
  stop_process_queue_thread();
}

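// Requests resources from the manager, blocking until they are granted; throws
// QueryTimedOutWaitingInQueue if the request waits longer than timeout_in_ms.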
std::unique_ptr<ExecutorResourceHandle>
ExecutorResourceMgr::request_resources_with_timeout(const RequestInfo& request_info,
                                                    const size_t timeout_in_ms) {
  std::pair<ResourceGrant, ResourceGrant> min_max_resource_grants;

  // Following can throw
  // Should we put in stats to track errors?
  min_max_resource_grants =
      executor_resource_pool_.calc_min_max_resource_grants_for_request(request_info);

  const auto request_id = enqueue_request(request_info,
                                          timeout_in_ms,
                                          min_max_resource_grants.first,
                                          min_max_resource_grants.second);

  if (ENABLE_DEBUG_PRINTING) {
    std::unique_lock<std::mutex> print_lock(print_mutex_);
    std::cout << std::endl << "Min resource grant";
    min_max_resource_grants.first.print();
    std::cout << std::endl << "Max resource grant";
    min_max_resource_grants.second.print();
  }

  set_process_queue_flag();
  processor_queue_condition_.notify_one();

  // Following queue_request methods will block until ExecutorResourceMgr lets them
  // execute
  if (timeout_in_ms > 0) {
    try {
      outstanding_queue_requests_.queue_request_and_wait_with_timeout(request_id,
                                                                      timeout_in_ms);
    } catch (QueryTimedOutWaitingInQueue& timeout_exception) {
      // Need to annotate request and executor stats accordingly
      mark_request_timed_out(request_id);
      throw;
    }
  } else {
    outstanding_queue_requests_.queue_request_and_wait(request_id);
  }

  auto this_ptr = shared_from_this();
  std::shared_lock<std::shared_mutex> queue_stats_read_lock(queue_stats_mutex_);
  RequestStats const& request_stats = requests_stats_[request_id];
  if (request_stats.error) {
    throw std::runtime_error("RequestStats error: " + *request_stats.error);
  }
  const ResourceGrant& actual_resource_grant = request_stats.actual_resource_grant;
  // Ensure each resource granted was at least the minimum requested
  CHECK_GE(actual_resource_grant.cpu_slots, min_max_resource_grants.first.cpu_slots);
  CHECK_GE(actual_resource_grant.gpu_slots, min_max_resource_grants.first.gpu_slots);
  CHECK_GE(actual_resource_grant.cpu_result_mem,
           min_max_resource_grants.first.cpu_result_mem);
  return std::make_unique<ExecutorResourceHandle>(
      this_ptr, request_id, actual_resource_grant);
}

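// Requests resources with no timeout (unlike request_resources_with_timeout).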
std::unique_ptr<ExecutorResourceHandle> ExecutorResourceMgr::request_resources(
    const RequestInfo& request_info) {
  return request_resources_with_timeout(
      request_info,
      static_cast<size_t>(0));  // 0 signifies no timeout
}

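// Returns the resources held by the requestor with the given request_id to the
// pool and wakes the queue processor so another request can be granted.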
void ExecutorResourceMgr::release_resources(const RequestId request_id,
                                            const ResourceGrant& resource_grant) {
  if (!resource_grant.is_empty()) {  // Should only be empty if request times out, should
                                     // we CHECK for this
    const auto chunk_request_info = get_chunk_request_info(request_id);
    executor_resource_pool_.deallocate_resources(resource_grant, chunk_request_info);
  }
  mark_request_finished(request_id);
  set_process_queue_flag();
  processor_queue_condition_.notify_one();
}

RequestStats ExecutorResourceMgr::get_request_for_id(const RequestId request_id) const {
  std::shared_lock<std::shared_mutex> queue_stats_read_lock(queue_stats_mutex_);
  CHECK_LT(request_id, requests_stats_.size());
  return requests_stats_[request_id];
}

void ExecutorResourceMgr::mark_request_error(const RequestId request_id,
                                             std::string error_msg) {
  std::unique_lock<std::shared_mutex> queue_stats_write_lock(queue_stats_mutex_);
  CHECK_LT(request_id, requests_stats_.size());
  requests_stats_[request_id].error = std::move(error_msg);
}

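// Invoked from process_queue_loop to choose the next resource request to grant.
// Returns INVALID_REQUEST_ID if no queued request can currently be granted.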
RequestId ExecutorResourceMgr::choose_next_request() {
  const auto request_ids = get_requests_for_stage(ExecutionRequestStage::QUEUED);
  LOG(EXECUTOR) << "ExecutorResourceMgr Queue Itr: " << process_queue_counter_ - 1
                << " Queued requests: " << request_ids.size();
  std::unique_lock<std::shared_mutex> queue_stats_lock(queue_stats_mutex_);
  for (const auto request_id : request_ids) {
    auto& request_stats = requests_stats_[request_id];
    try {
      const auto actual_resource_grant =
          executor_resource_pool_.determine_dynamic_resource_grant(
              request_stats.min_resource_grant,
              request_stats.max_resource_grant,
              request_stats.request_info.chunk_request_info,
              max_available_resource_use_ratio_);
      // boolean sentinel first member of returned pair says whether
      // a resource grant was able to be made at all
      if (actual_resource_grant.first) {
        request_stats.actual_resource_grant = actual_resource_grant.second;
        LOG(EXECUTOR) << "ExecutorResourceMgr Queue chosen request ID: " << request_id
                      << " from " << request_ids.size() << " queued requests.";
        LOG(EXECUTOR) << "Request grant: " << actual_resource_grant.second.to_string();
        if (ENABLE_DEBUG_PRINTING) {
          std::unique_lock<std::mutex> print_lock(print_mutex_);
          std::cout << std::endl << "Actual grant";
          actual_resource_grant.second.print();
        }
        return request_id;
      }
    } catch (std::runtime_error const& e) {
      throw ExecutorResourceMgrError(request_id, e.what());
    }
  }
  return INVALID_REQUEST_ID;
}

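// Returns a copy of the ExecutorStats struct held by ExecutorResourceMgr.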
ExecutorStats ExecutorResourceMgr::get_executor_stats() const {
  std::shared_lock<std::shared_mutex> queue_stats_read_lock(queue_stats_mutex_);
  return executor_stats_;  // Will make copy
}

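// Prints the ExecutorStats struct. Use for debugging.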
void ExecutorResourceMgr::print_executor_stats() const {
  // Get atomic copy of executor_stats_ first
  const auto executor_stats = get_executor_stats();
  std::unique_lock<std::mutex> print_lock(print_mutex_);
  std::cout << std::endl << "Executor Stats" << std::endl;
  std::cout << "Requests: " << executor_stats.requests << std::endl;
  std::cout << "CPU Requests: " << executor_stats.cpu_requests << std::endl;
  std::cout << "GPU Requests: " << executor_stats.gpu_requests << std::endl;
  std::cout << "Queue Length: " << executor_stats.queue_length << std::endl;
  std::cout << "CPU Queue Length: " << executor_stats.cpu_queue_length << std::endl;
  std::cout << "GPU Queue Length: " << executor_stats.gpu_queue_length << std::endl;
  std::cout << "Total Queue Time(ms): " << executor_stats.total_queue_time_ms
            << std::endl;
  std::cout << "Total CPU Queue Time(ms): " << executor_stats.total_cpu_queue_time_ms
            << std::endl;
  std::cout << "Total GPU Queue Time(ms): " << executor_stats.total_gpu_queue_time_ms
            << std::endl;
  std::cout << "Requests Actually Queued: " << executor_stats.requests_actually_queued
            << std::endl;
  std::cout << "Requests Executing: " << executor_stats.requests_executing << std::endl;
  std::cout << "Requests Executed: " << executor_stats.requests_executed << std::endl;
  std::cout << "Total Execution Time(ms): " << executor_stats.total_execution_time_ms
            << std::endl;
  std::cout << "Total CPU Execution Time(ms): "
            << executor_stats.total_cpu_execution_time_ms << std::endl;
  std::cout << "Total GPU Execution Time(ms): "
            << executor_stats.total_gpu_execution_time_ms << std::endl;
  std::cout << "Total Time(ms): " << executor_stats.total_time_ms << std::endl;
  std::cout << "Total CPU Time(ms): " << executor_stats.total_cpu_time_ms << std::endl;
  std::cout << "Total GPU Time(ms): " << executor_stats.total_gpu_time_ms << std::endl;

  // Below technically not thread safe, but called from process_queue_loop for now so ok

  const double avg_execution_time_ms =
      executor_stats.total_execution_time_ms /
      std::max(executor_stats.requests_executed, size_t(1));
  const double avg_cpu_execution_time_ms =
      executor_stats.total_cpu_execution_time_ms /
      std::max(executor_stats.cpu_requests_executed, size_t(1));
  const double avg_gpu_execution_time_ms =
      executor_stats.total_gpu_execution_time_ms /
      std::max(executor_stats.gpu_requests_executed, size_t(1));
  const double avg_total_time_ms = executor_stats.total_time_ms /
                                   std::max(executor_stats.requests_executed, size_t(1));
  const double avg_cpu_total_time_ms =
      executor_stats.total_cpu_time_ms /
      std::max(executor_stats.cpu_requests_executed, size_t(1));
  const double avg_gpu_total_time_ms =
      executor_stats.total_gpu_time_ms /
      std::max(executor_stats.gpu_requests_executed, size_t(1));

  std::cout << "Avg Execution Time(ms): " << avg_execution_time_ms << std::endl;
  std::cout << "Avg CPU Execution Time(ms): " << avg_cpu_execution_time_ms << std::endl;
  std::cout << "Avg GPU Execution Time(ms): " << avg_gpu_execution_time_ms << std::endl;

  std::cout << "Avg Total Time(ms): " << avg_total_time_ms << std::endl;
  std::cout << "Avg CPU Total Time(ms): " << avg_cpu_total_time_ms << std::endl;
  std::cout << "Avg GPU Total Time(ms): " << avg_gpu_total_time_ms << std::endl;

  std::cout << "Process queue loop counter: " << process_queue_counter_ << std::endl
            << std::endl;
}

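// Invoked from the ExecutorResourceMgr destructor: sets
// stop_process_queue_thread_ and joins the process queue thread.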
void ExecutorResourceMgr::stop_process_queue_thread() {
  {
    std::unique_lock<std::mutex> queue_lock(processor_queue_mutex_);
    stop_process_queue_thread_ = true;
  }
  processor_queue_condition_.notify_one();
  process_queue_thread_.join();
}

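// Pauses the process queue in a thread-safe manner, waiting for all requests
// in the executing stage to finish before yielding to the caller.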
void ExecutorResourceMgr::pause_process_queue() {
  {
    std::unique_lock<std::mutex> queue_lock(processor_queue_mutex_);
    if (pause_process_queue_ || process_queue_is_paused_) {  // Was already true, abort
      LOG(INFO)
          << "Pause of ExecutorResourceMgr queue was called, but was already paused. "
             "Taking no action.";
      return;
    }
    pause_process_queue_ = true;
  }
  processor_queue_condition_.notify_one();

  std::unique_lock<std::mutex> pause_queue_lock(pause_processor_queue_mutex_);
  pause_processor_queue_condition_.wait(pause_queue_lock,
                                        [=] { return process_queue_is_paused_; });
  CHECK_EQ(executor_stats_.requests_executing, size_t(0));
}

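// Resumes the process queue in a thread-safe manner. If the process queue is
// not paused, logs and takes no action.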
void ExecutorResourceMgr::resume_process_queue() {
  {
    std::unique_lock<std::mutex> queue_lock(processor_queue_mutex_);
    if (!process_queue_is_paused_) {
      LOG(INFO)
          << "Resume of ExecutorResourceMgr queue was called, but was not paused. Taking "
             "no action.";
      return;
    }
    executor_stats_.requests_executing = 0;
    process_queue_is_paused_ = false;
    pause_process_queue_ = false;
    should_process_queue_ = true;
  }
  processor_queue_condition_.notify_one();
}

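// Changes the total quantity of a resource available in the pool after
// construction. Pauses the process queue, which waits until all executing
// requests are finished, before resetting the resource and resuming.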
void ExecutorResourceMgr::set_resource(const ResourceType resource_type,
                                       const size_t resource_quantity) {
  pause_process_queue();
  CHECK_EQ(get_resource_info(resource_type).first, size_t(0));
  executor_resource_pool_.set_resource(resource_type, resource_quantity);
  const auto resource_info = get_resource_info(resource_type);
  CHECK_EQ(resource_info.first, size_t(0));
  CHECK_EQ(resource_info.second, resource_quantity);
  resume_process_queue();
}

std::pair<size_t, size_t> ExecutorResourceMgr::get_resource_info(
    const ResourceType resource_type) const {
  return executor_resource_pool_.get_resource_info(resource_type);
}

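// Resets the concurrent resource grant policy for the resource type stored in
// the passed ConcurrentResourceGrantPolicy, pausing the process queue while
// the new policy is applied.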
void ExecutorResourceMgr::set_concurrent_resource_grant_policy(
    const ConcurrentResourceGrantPolicy& concurrent_resource_grant_policy) {
  pause_process_queue();
  executor_resource_pool_.set_concurrent_resource_grant_policy(
      concurrent_resource_grant_policy);
  const auto applied_concurrent_resource_grant_policy =
      executor_resource_pool_.get_concurrent_resource_grant_policy(
          concurrent_resource_grant_policy.resource_type);
  CHECK(concurrent_resource_grant_policy.concurrency_policy ==
        applied_concurrent_resource_grant_policy.concurrency_policy);
  CHECK(concurrent_resource_grant_policy.oversubscription_concurrency_policy ==
        applied_concurrent_resource_grant_policy.oversubscription_concurrency_policy);
  resume_process_queue();
}

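// Runs continuously on process_queue_thread_, granting resources to queued
// requests until signaled to pause or shut down.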
void ExecutorResourceMgr::process_queue_loop() {
  const size_t min_ms_between_print_stats{5000};  // 5 sec
  if (ENABLE_STATS_PRINTING) {
    print_executor_stats();
  }
  std::chrono::steady_clock::time_point last_print_time =
      std::chrono::steady_clock::now();
  while (true) {
    std::unique_lock<std::mutex> queue_lock(processor_queue_mutex_);
    processor_queue_condition_.wait(queue_lock, [=] {
      return should_process_queue_ || stop_process_queue_thread_ || pause_process_queue_;
    });
    // Use the following flag to know when to exit
    // (to prevent leaving this thread dangling at server shutdown)
    if (stop_process_queue_thread_) {
      should_process_queue_ =
          false;  // not strictly necessary, but would be if we add threads
      return;
    }

    if (pause_process_queue_) {
      should_process_queue_ = false;
      if (executor_stats_.requests_executing == 0) {
        {
          std::unique_lock<std::mutex> pause_queue_lock(pause_processor_queue_mutex_);
          process_queue_is_paused_ = true;
        }
        pause_processor_queue_condition_.notify_one();
      }
      continue;
    }

    process_queue_counter_++;
    RequestId chosen_request_id;
    try {
      chosen_request_id = choose_next_request();
    } catch (ExecutorResourceMgrError const& e) {
      chosen_request_id = e.getRequestId();
      mark_request_error(chosen_request_id, e.getErrorMsg());
    }
    if (ENABLE_DEBUG_PRINTING) {
      std::unique_lock<std::mutex> print_lock(print_mutex_);
      std::cout << "Process loop iteration: " << process_queue_counter_ - 1 << std::endl;
      std::cout << "Process loop chosen request_id: " << chosen_request_id << std::endl;
    }
    if (chosen_request_id == INVALID_REQUEST_ID) {
      // Means no query was found that could be currently run
      // Below is safe as we hold an exclusive lock on processor_queue_mutex_
      should_process_queue_ = false;
      continue;
    }
    // If here we have a valid request id
    mark_request_dequed(chosen_request_id);
    const auto request_stats = get_request_for_id(chosen_request_id);
    if (!request_stats.error) {
      executor_resource_pool_.allocate_resources(
          request_stats.actual_resource_grant,
          request_stats.request_info.chunk_request_info);
    }
    outstanding_queue_requests_.wake_request_by_id(chosen_request_id);

    if (ENABLE_STATS_PRINTING) {
      std::chrono::steady_clock::time_point current_time =
          std::chrono::steady_clock::now();
      const size_t ms_since_last_print_stats =
          std::chrono::duration_cast<std::chrono::milliseconds>(current_time -
                                                                last_print_time)
              .count();
      if (ms_since_last_print_stats >= min_ms_between_print_stats) {
        print_executor_stats();
        last_print_time = current_time;
      }
    }
    // Leave should_process_queue_ as true to see if we can allocate resources for
    // another request
  }
}

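// Invoked from request_resources/request_resources_with_timeout: places the
// request in the request queue, updates queue stats, and returns its RequestId.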
RequestId ExecutorResourceMgr::enqueue_request(const RequestInfo& request_info,
                                               const size_t timeout_in_ms,
                                               const ResourceGrant& min_resource_grant,
                                               const ResourceGrant& max_resource_grant) {
  const std::chrono::steady_clock::time_point enqueue_time =
      std::chrono::steady_clock::now();
  std::unique_lock<std::shared_mutex> queue_stats_write_lock(queue_stats_mutex_);
  const RequestId request_id = requests_count_.fetch_add(1, std::memory_order_relaxed);
  executor_stats_.requests++;
  executor_stats_.requests_actually_queued++;
  if (timeout_in_ms > 0) {
    executor_stats_.requests_with_timeouts++;
  }
  const size_t queue_length_at_entry = executor_stats_.queue_length++;
  executor_stats_.sum_queue_size_at_entry += queue_length_at_entry;
  size_t device_type_queue_length_at_entry{0};
  switch (request_info.request_device_type) {
    case ExecutorDeviceType::CPU: {
      executor_stats_.cpu_requests++;
      executor_stats_.cpu_requests_actually_queued++;
      device_type_queue_length_at_entry = executor_stats_.cpu_queue_length++;
      executor_stats_.sum_cpu_queue_size_at_entry += device_type_queue_length_at_entry;
      break;
    }
    case ExecutorDeviceType::GPU: {
      executor_stats_.gpu_requests++;
      executor_stats_.gpu_requests_actually_queued++;
      device_type_queue_length_at_entry = executor_stats_.gpu_queue_length++;
      executor_stats_.sum_gpu_queue_size_at_entry += device_type_queue_length_at_entry;
      break;
    }
    default:
      UNREACHABLE();
  }

  requests_stats_.emplace_back(RequestStats(request_id,
                                            request_info,
                                            min_resource_grant,
                                            max_resource_grant,
                                            enqueue_time,
                                            queue_length_at_entry,
                                            device_type_queue_length_at_entry,
                                            timeout_in_ms));
  add_request_to_stage(request_id, ExecutionRequestStage::QUEUED);
  return request_id;
}

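// Moves the request from the QUEUED stage to the EXECUTING stage and updates
// queue and execution bookkeeping.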
void ExecutorResourceMgr::mark_request_dequed(const RequestId request_id) {
  const std::chrono::steady_clock::time_point deque_time =
      std::chrono::steady_clock::now();
  // Below is only to CHECK our request_id against high water mark... should be
  // relatively inexpensive though
  const size_t current_request_count = requests_count_.load(std::memory_order_relaxed);
  CHECK_LT(request_id, current_request_count);
  {
    std::unique_lock<std::shared_mutex> queue_stats_write_lock(queue_stats_mutex_);
    RequestStats& request_stats = requests_stats_[request_id];
    request_stats.deque_time = deque_time;
    request_stats.finished_queueing = true;
    request_stats.queue_time_ms =
        std::chrono::duration_cast<std::chrono::milliseconds>(request_stats.deque_time -
                                                              request_stats.enqueue_time)
            .count();
  }
  remove_request_from_stage(request_id, ExecutionRequestStage::QUEUED);
  add_request_to_stage(request_id, ExecutionRequestStage::EXECUTING);

  std::shared_lock<std::shared_mutex> queue_stats_read_lock(queue_stats_mutex_);
  const RequestStats& request_stats = requests_stats_[request_id];
  executor_stats_.queue_length--;
  executor_stats_.requests_executing++;
  executor_stats_.total_queue_time_ms += request_stats.queue_time_ms;
  // A request that barely waited is not counted as having actually queued
  if (request_stats.queue_time_ms <= ACTUALLY_QUEUED_MIN_MS) {
    executor_stats_.requests_actually_queued--;
    executor_stats_.sum_queue_size_at_entry -= request_stats.queue_length_at_entry;
  }
  switch (request_stats.request_info.request_device_type) {
    case ExecutorDeviceType::CPU:
      executor_stats_.cpu_queue_length--;
      executor_stats_.total_cpu_queue_time_ms += request_stats.queue_time_ms;
      if (request_stats.queue_time_ms <= ACTUALLY_QUEUED_MIN_MS) {
        executor_stats_.cpu_requests_actually_queued--;
        executor_stats_.sum_cpu_queue_size_at_entry -=
            request_stats.device_type_queue_length_at_entry;
      }
      break;
    case ExecutorDeviceType::GPU:
      executor_stats_.gpu_queue_length--;
      executor_stats_.total_gpu_queue_time_ms += request_stats.queue_time_ms;
      if (request_stats.queue_time_ms <= ACTUALLY_QUEUED_MIN_MS) {
        executor_stats_.gpu_requests_actually_queued--;
        executor_stats_.sum_gpu_queue_size_at_entry -=
            request_stats.device_type_queue_length_at_entry;
      }
      break;
    default:
      UNREACHABLE();
  }
}

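// Called if a request made via request_resources_with_timeout times out in the
// queue: removes it from the QUEUED stage and annotates queue stats.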
void ExecutorResourceMgr::mark_request_timed_out(const RequestId request_id) {
  const size_t current_request_count = requests_count_.load(std::memory_order_relaxed);
  CHECK_LT(request_id, current_request_count);
  {
    std::unique_lock<std::shared_mutex> queue_stats_write_lock(queue_stats_mutex_);
    RequestStats& request_stats = requests_stats_[request_id];
    CHECK(!request_stats.finished_queueing);
    CHECK_GT(request_stats.timeout_in_ms, size_t(0));
    request_stats.timed_out = true;
  }
  remove_request_from_stage(request_id, ExecutionRequestStage::QUEUED);
  std::shared_lock<std::shared_mutex> queue_stats_read_lock(queue_stats_mutex_);
  const RequestStats& request_stats = requests_stats_[request_id];
  executor_stats_.queue_length--;
  executor_stats_.requests_timed_out++;
  executor_stats_.total_queue_time_ms += request_stats.timeout_in_ms;
  switch (request_stats.request_info.request_device_type) {
    case ExecutorDeviceType::CPU: {
      executor_stats_.cpu_queue_length--;
      executor_stats_.total_cpu_queue_time_ms += request_stats.timeout_in_ms;
      break;
    }
    case ExecutorDeviceType::GPU: {
      executor_stats_.gpu_queue_length--;
      executor_stats_.total_gpu_queue_time_ms += request_stats.timeout_in_ms;
      break;
    }
    default:
      UNREACHABLE();
  }
}

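// Invoked from release_resources on successful completion of a query step:
// moves the request out of the EXECUTING stage and finalizes its timings.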
void ExecutorResourceMgr::mark_request_finished(const RequestId request_id) {
  const std::chrono::steady_clock::time_point execution_finished_time =
      std::chrono::steady_clock::now();
  // Below is only to CHECK our request_id against high water mark... should be
  // relatively inexpensive though
  const size_t current_request_count = requests_count_.load(std::memory_order_relaxed);
  CHECK_LT(request_id, current_request_count);
  std::unique_lock<std::shared_mutex> queue_stats_write_lock(queue_stats_mutex_);
  RequestStats& request_stats = requests_stats_[request_id];
  request_stats.execution_finished_time = execution_finished_time;
  request_stats.finished_executing = true;
  request_stats.execution_time_ms =
      std::chrono::duration_cast<std::chrono::milliseconds>(
          request_stats.execution_finished_time - request_stats.deque_time)
          .count();
  request_stats.total_time_ms =
      std::chrono::duration_cast<std::chrono::milliseconds>(
          request_stats.execution_finished_time - request_stats.enqueue_time)
          .count();
  remove_request_from_stage(request_id, ExecutionRequestStage::EXECUTING);

  executor_stats_.requests_executing--;
  executor_stats_.requests_executed++;
  executor_stats_.total_execution_time_ms += request_stats.execution_time_ms;
  executor_stats_.total_time_ms += request_stats.total_time_ms;
  switch (request_stats.request_info.request_device_type) {
    case ExecutorDeviceType::CPU: {
      executor_stats_.cpu_requests_executed++;
      executor_stats_.total_cpu_execution_time_ms +=
          request_stats.execution_time_ms;
      executor_stats_.total_cpu_time_ms += request_stats.total_time_ms;
      break;
    }
    case ExecutorDeviceType::GPU: {
      executor_stats_.gpu_requests_executed++;
      executor_stats_.total_gpu_execution_time_ms +=
          request_stats.execution_time_ms;
      executor_stats_.total_gpu_time_ms += request_stats.total_time_ms;
      break;
    }
    default:
      UNREACHABLE();
  }
}

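// Returns the request ids currently in the given stage (QUEUED or EXECUTING).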
std::vector<RequestId> ExecutorResourceMgr::get_requests_for_stage(
    const ExecutionRequestStage request_stage) const {
  auto& chosen_set = request_stage == ExecutionRequestStage::QUEUED
                         ? queued_requests_
                         : executing_requests_;
  auto& chosen_mutex = request_stage == ExecutionRequestStage::QUEUED
                           ? queued_set_mutex_
                           : executing_set_mutex_;
  std::shared_lock<std::shared_mutex> set_read_lock(chosen_mutex);

  const std::vector<RequestId> request_ids_for_stage(chosen_set.begin(),
                                                     chosen_set.end());
  return request_ids_for_stage;
}

void ExecutorResourceMgr::add_request_to_stage(
    const RequestId request_id,
    const ExecutionRequestStage request_stage) {
  auto& chosen_set = request_stage == ExecutionRequestStage::QUEUED
                         ? queued_requests_
                         : executing_requests_;
  auto& chosen_mutex = request_stage == ExecutionRequestStage::QUEUED
                           ? queued_set_mutex_
                           : executing_set_mutex_;
  std::unique_lock<std::shared_mutex> set_write_lock(chosen_mutex);

  CHECK(chosen_set.insert(request_id)
            .second);  // Should return true as element should not exist in set
}

void ExecutorResourceMgr::remove_request_from_stage(
    const RequestId request_id,
    const ExecutionRequestStage request_stage) {
  auto& chosen_set = request_stage == ExecutionRequestStage::QUEUED
                         ? queued_requests_
                         : executing_requests_;
  auto& chosen_mutex = request_stage == ExecutionRequestStage::QUEUED
                           ? queued_set_mutex_
                           : executing_set_mutex_;
  std::unique_lock<std::shared_mutex> set_write_lock(chosen_mutex);

  CHECK_EQ(chosen_set.erase(request_id),
           size_t(1));  // Should return 1 as element must be in set
}

ChunkRequestInfo ExecutorResourceMgr::get_chunk_request_info(
    const RequestId request_id) {
  std::shared_lock<std::shared_mutex> queue_stats_read_lock(queue_stats_mutex_);
  return requests_stats_[request_id].request_info.chunk_request_info;
}

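// Convenience factory method that translates server configuration into the
// resource pool totals and grant policies needed to construct an
// ExecutorResourceMgr.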
std::shared_ptr<ExecutorResourceMgr> generate_executor_resource_mgr(
    const size_t num_cpu_slots,
    const size_t num_gpu_slots,
    const size_t cpu_result_mem,
    const size_t cpu_buffer_pool_mem,
    const size_t gpu_buffer_pool_mem,
    const double per_query_max_cpu_slots_ratio,
    const double per_query_max_cpu_result_mem_ratio,
    const double per_query_max_pinned_cpu_buffer_pool_mem_ratio,
    const double per_query_max_pageable_cpu_buffer_pool_mem_ratio,
    const bool allow_cpu_kernel_concurrency,
    const bool allow_cpu_gpu_kernel_concurrency,
    const bool allow_cpu_slot_oversubscription_concurrency,
    const bool allow_gpu_slot_oversubscription,
    const bool allow_cpu_result_mem_oversubscription_concurrency,
    const double max_available_resource_use_ratio) {
  CHECK_GT(num_cpu_slots, size_t(0));
  CHECK_GT(cpu_result_mem, size_t(0));
  CHECK_GT(cpu_buffer_pool_mem, size_t(0));
  CHECK_GT(per_query_max_cpu_slots_ratio, size_t(0));
  CHECK_EQ(!(allow_cpu_kernel_concurrency || allow_cpu_gpu_kernel_concurrency) &&
               allow_cpu_slot_oversubscription_concurrency,
           false);
  CHECK_EQ(!(allow_cpu_kernel_concurrency || allow_cpu_gpu_kernel_concurrency) &&
               allow_cpu_result_mem_oversubscription_concurrency,
           false);
  CHECK_GT(max_available_resource_use_ratio, 0.0);
  CHECK_LE(max_available_resource_use_ratio, 1.0);

  const std::vector<std::pair<ResourceType, size_t>> total_resources = {
      std::make_pair(ResourceType::CPU_SLOTS, num_cpu_slots),
      std::make_pair(ResourceType::GPU_SLOTS, num_gpu_slots),
      std::make_pair(ResourceType::CPU_RESULT_MEM, cpu_result_mem),
      std::make_pair(ResourceType::CPU_BUFFER_POOL_MEM, cpu_buffer_pool_mem),
      std::make_pair(ResourceType::GPU_BUFFER_POOL_MEM, gpu_buffer_pool_mem)};

  const auto max_per_request_cpu_slots_grant_policy = gen_ratio_resource_grant_policy(
      ResourceSubtype::CPU_SLOTS, per_query_max_cpu_slots_ratio);

  // Use unlimited policy for now as some GPU query plans can need more kernels than gpus
  const auto max_per_request_gpu_slots_grant_policy =
      gen_unlimited_resource_grant_policy(ResourceSubtype::GPU_SLOTS);
  const auto max_per_request_cpu_result_mem_grant_policy =
      gen_ratio_resource_grant_policy(ResourceSubtype::CPU_RESULT_MEM,
                                      per_query_max_cpu_result_mem_ratio);

  const auto max_per_request_pinned_cpu_buffer_pool_mem =
      gen_ratio_resource_grant_policy(ResourceSubtype::PINNED_CPU_BUFFER_POOL_MEM,
                                      per_query_max_pinned_cpu_buffer_pool_mem_ratio);
  const auto max_per_request_pageable_cpu_buffer_pool_mem =
      gen_ratio_resource_grant_policy(ResourceSubtype::PAGEABLE_CPU_BUFFER_POOL_MEM,
                                      per_query_max_pageable_cpu_buffer_pool_mem_ratio);

  const std::vector<ResourceGrantPolicy> max_per_request_resource_grant_policies = {
      max_per_request_cpu_slots_grant_policy,
      max_per_request_gpu_slots_grant_policy,
      max_per_request_cpu_result_mem_grant_policy,
      max_per_request_pinned_cpu_buffer_pool_mem,
      max_per_request_pageable_cpu_buffer_pool_mem};

  const auto cpu_slots_undersubscription_concurrency_policy =
      allow_cpu_kernel_concurrency
          ? ResourceConcurrencyPolicy::ALLOW_CONCURRENT_REQUESTS
          : ResourceConcurrencyPolicy::ALLOW_SINGLE_REQUEST_GLOBALLY;
  // Whether a single query can oversubscribe CPU slots should be controlled with
  // per_query_max_cpu_slots_ratio
  const auto cpu_slots_oversubscription_concurrency_policy =
      allow_cpu_slot_oversubscription_concurrency
          ? ResourceConcurrencyPolicy::ALLOW_CONCURRENT_REQUESTS
          : ResourceConcurrencyPolicy::ALLOW_SINGLE_REQUEST;
  const auto gpu_slots_undersubscription_concurrency_policy =
      allow_cpu_gpu_kernel_concurrency
          ? ResourceConcurrencyPolicy::ALLOW_CONCURRENT_REQUESTS
          : ResourceConcurrencyPolicy::ALLOW_SINGLE_REQUEST_GLOBALLY;
  const auto gpu_slots_oversubscription_concurrency_policy =
      !allow_gpu_slot_oversubscription
          ? ResourceConcurrencyPolicy::DISALLOW_REQUESTS
          : (allow_cpu_gpu_kernel_concurrency
                 ? ResourceConcurrencyPolicy::ALLOW_CONCURRENT_REQUESTS
                 : ResourceConcurrencyPolicy::ALLOW_SINGLE_REQUEST_GLOBALLY);

  // Whether a single query can oversubscribe CPU memory should be controlled with
  // per_query_max_cpu_result_mem_ratio
  const auto cpu_result_mem_oversubscription_concurrency_policy =
      allow_cpu_result_mem_oversubscription_concurrency
          ? ResourceConcurrencyPolicy::ALLOW_CONCURRENT_REQUESTS
          : ResourceConcurrencyPolicy::ALLOW_SINGLE_REQUEST;

  const auto concurrent_cpu_slots_grant_policy =
      ConcurrentResourceGrantPolicy(ResourceType::CPU_SLOTS,
                                    cpu_slots_undersubscription_concurrency_policy,
                                    cpu_slots_oversubscription_concurrency_policy);
  const ConcurrentResourceGrantPolicy concurrent_gpu_slots_grant_policy(
      ResourceType::GPU_SLOTS,
      gpu_slots_undersubscription_concurrency_policy,
      gpu_slots_oversubscription_concurrency_policy);

  const auto concurrent_cpu_result_mem_grant_policy =
      ConcurrentResourceGrantPolicy(ResourceType::CPU_RESULT_MEM,
                                    ResourceConcurrencyPolicy::ALLOW_CONCURRENT_REQUESTS,
                                    cpu_result_mem_oversubscription_concurrency_policy);

  const std::vector<ConcurrentResourceGrantPolicy> concurrent_resource_grant_policies{
      concurrent_cpu_slots_grant_policy,
      concurrent_gpu_slots_grant_policy,
      concurrent_cpu_result_mem_grant_policy};

  return std::make_shared<ExecutorResourceMgr>(total_resources,
                                               concurrent_resource_grant_policies,
                                               max_per_request_resource_grant_policies,
                                               max_available_resource_use_ratio);
}

}  // namespace ExecutorResourceMgr_Namespace
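
A minimal usage sketch (not part of the file above), assuming a caller that
builds a RequestInfo for its query step elsewhere; the parameter values are
illustrative only, and the scoped-release behavior of ExecutorResourceHandle
is assumed from its use as a RAII handle:

  using namespace ExecutorResourceMgr_Namespace;

  auto resource_mgr = generate_executor_resource_mgr(
      16,          // num_cpu_slots
      2,           // num_gpu_slots
      4UL << 30,   // cpu_result_mem (4GB)
      32UL << 30,  // cpu_buffer_pool_mem (32GB)
      8UL << 30,   // gpu_buffer_pool_mem (8GB)
      0.9,         // per_query_max_cpu_slots_ratio
      0.8,         // per_query_max_cpu_result_mem_ratio
      0.75,        // per_query_max_pinned_cpu_buffer_pool_mem_ratio
      0.5,         // per_query_max_pageable_cpu_buffer_pool_mem_ratio
      true,        // allow_cpu_kernel_concurrency
      true,        // allow_cpu_gpu_kernel_concurrency
      false,       // allow_cpu_slot_oversubscription_concurrency
      true,        // allow_gpu_slot_oversubscription
      false,       // allow_cpu_result_mem_oversubscription_concurrency
      0.8);        // max_available_resource_use_ratio

  {
    // Blocks until the manager grants resources for the step (request_info is
    // assumed to be built by the caller).
    const auto resource_handle = resource_mgr->request_resources(request_info);
    // ... execute the query step under the granted ResourceGrant ...
  }  // handle goes out of scope, returning its grant to the pool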