23 namespace ExecutorResourceMgr_Namespace {
26 const std::vector<std::pair<ResourceType, size_t>>& total_resources,
27 const std::vector<ConcurrentResourceGrantPolicy>& concurrent_resource_grant_policies,
28 const std::vector<ResourceGrantPolicy>& max_per_request_resource_grant_policies,
29 const double max_available_resource_use_ratio)
30 : executor_resource_pool_(total_resources,
31 concurrent_resource_grant_policies,
32 max_per_request_resource_grant_policies)
33 , max_available_resource_use_ratio_(max_available_resource_use_ratio) {
37 LOG(
EXECUTOR) <<
"Executor Resource Manager queue proccessing thread started";
44 std::unique_ptr<ExecutorResourceHandle>
46 const size_t timeout_in_ms) {
47 std::pair<ResourceGrant, ResourceGrant> min_max_resource_grants;
51 min_max_resource_grants =
56 min_max_resource_grants.first,
57 min_max_resource_grants.second);
61 std::cout << std::endl <<
"Min resource grant";
62 min_max_resource_grants.first.print();
63 std::cout << std::endl <<
"Max resource grant";
64 min_max_resource_grants.second.print();
72 if (timeout_in_ms > 0) {
85 auto this_ptr = shared_from_this();
88 if (request_stats.
error) {
89 throw std::runtime_error(
"RequestStats error: " + *request_stats.
error);
93 CHECK_GE(actual_resource_grant.
cpu_slots, min_max_resource_grants.first.cpu_slots);
94 CHECK_GE(actual_resource_grant.
gpu_slots, min_max_resource_grants.first.gpu_slots);
96 min_max_resource_grants.first.cpu_result_mem);
97 return std::make_unique<ExecutorResourceHandle>(
105 static_cast<size_t>(0));
127 std::string error_msg) {
136 <<
" Queued requests: " << request_ids.size();
141 const auto actual_resource_grant =
143 request_stats.min_resource_grant,
144 request_stats.max_resource_grant,
145 request_stats.request_info.chunk_request_info,
149 if (actual_resource_grant.first) {
150 request_stats.actual_resource_grant = actual_resource_grant.second;
152 <<
" from " << request_ids.size() <<
" queued requests.";
153 LOG(
EXECUTOR) <<
"Request grant: " << actual_resource_grant.second.to_string();
156 std::cout << std::endl <<
"Actual grant";
157 actual_resource_grant.second.print();
161 }
catch (std::runtime_error
const& e) {
177 std::cout << std::endl <<
"Executor Stats" << std::endl;
178 std::cout <<
"Requests: " << executor_stats.requests << std::endl;
179 std::cout <<
"CPU Requests: " << executor_stats.cpu_requests << std::endl;
180 std::cout <<
"GPU Requests: " << executor_stats.gpu_requests << std::endl;
181 std::cout <<
"Queue Length: " << executor_stats.queue_length << std::endl;
182 std::cout <<
"CPU Queue Length: " << executor_stats.cpu_queue_length << std::endl;
183 std::cout <<
"GPU Queue Length: " << executor_stats.gpu_queue_length << std::endl;
184 std::cout <<
"Total Queue Time(ms): " << executor_stats.total_queue_time_ms
186 std::cout <<
"Total CPU Queue Time(ms): " << executor_stats.total_cpu_queue_time_ms
188 std::cout <<
"Total GPU Queue Time(ms): " << executor_stats.total_gpu_queue_time_ms
190 std::cout <<
"Requests Actually Queued: " << executor_stats.requests_actually_queued
192 std::cout <<
"Requests Executing: " << executor_stats.requests_executing << std::endl;
193 std::cout <<
"Requests Executed: " << executor_stats.requests_executed << std::endl;
194 std::cout <<
"Total Execution Time(ms): " << executor_stats.total_execution_time_ms
196 std::cout <<
"Total CPU Execution Time(ms): "
197 << executor_stats.total_cpu_execution_time_ms << std::endl;
198 std::cout <<
"Total GPU Execution Time(ms): "
199 << executor_stats.total_gpu_execution_time_ms << std::endl;
200 std::cout <<
"Total Time(ms): " << executor_stats.total_time_ms << std::endl;
201 std::cout <<
"Total CPU Time(ms): " << executor_stats.total_cpu_time_ms << std::endl;
202 std::cout <<
"Total GPU Time(ms): " << executor_stats.total_gpu_time_ms << std::endl;
206 const double avg_execution_time_ms =
207 executor_stats.total_execution_time_ms /
208 std::max(executor_stats.requests_executed,
size_t(1));
209 const double avg_cpu_execution_time_ms =
210 executor_stats.total_cpu_execution_time_ms /
211 std::max(executor_stats.cpu_requests_executed,
size_t(1));
212 const double avg_gpu_execution_time_ms =
213 executor_stats.total_gpu_execution_time_ms /
214 std::max(executor_stats.gpu_requests_executed,
size_t(1));
215 const double avg_total_time_ms = executor_stats.total_time_ms /
216 std::max(executor_stats.requests_executed,
size_t(1));
217 const double avg_cpu_total_time_ms =
218 executor_stats.total_cpu_time_ms /
219 std::max(executor_stats.cpu_requests_executed,
size_t(1));
220 const double avg_gpu_total_time_ms =
221 executor_stats.total_gpu_time_ms /
222 std::max(executor_stats.gpu_requests_executed,
size_t(1));
224 std::cout <<
"Avg Execution Time(ms): " << avg_execution_time_ms << std::endl;
225 std::cout <<
"Avg CPU Execution Time(ms): " << avg_cpu_execution_time_ms << std::endl;
226 std::cout <<
"Avg GPU Execution Time(ms): " << avg_gpu_execution_time_ms << std::endl;
228 std::cout <<
"Avg Total Time(ms): " << avg_total_time_ms << std::endl;
229 std::cout <<
"Avg CPU Total Time(ms): " << avg_cpu_total_time_ms << std::endl;
230 std::cout <<
"Avg GPU Total Time(ms): " << avg_gpu_total_time_ms << std::endl;
250 <<
"Pause of ExecutorResourceMgr queue was called, but was already paused. "
270 <<
"Resume of ExecutorResourceMgr queue was called, but was not paused. Taking "
283 const size_t resource_quantity) {
288 CHECK_EQ(resource_info.first,
size_t(0));
289 CHECK_EQ(resource_info.second, resource_quantity);
302 concurrent_resource_grant_policy);
303 const auto applied_concurrent_resource_grant_policy =
307 applied_concurrent_resource_grant_policy.concurrency_policy);
309 applied_concurrent_resource_grant_policy.oversubscription_concurrency_policy);
314 const size_t min_ms_between_print_stats{5000};
318 std::chrono::steady_clock::time_point last_print_time =
319 std::chrono::steady_clock::now();
356 std::cout <<
"Process loop chosen request_id: " << chosen_request_id << std::endl;
367 if (!request_stats.error) {
369 request_stats.actual_resource_grant,
370 request_stats.request_info.chunk_request_info);
375 std::chrono::steady_clock::time_point current_time =
376 std::chrono::steady_clock::now();
377 const size_t ms_since_last_print_stats =
378 std::chrono::duration_cast<std::chrono::milliseconds>(current_time -
381 if (ms_since_last_print_stats >= min_ms_between_print_stats) {
383 last_print_time = current_time;
392 const size_t timeout_in_ms,
395 const std::chrono::steady_clock::time_point enqueue_time =
396 std::chrono::steady_clock::now();
400 if (timeout_in_ms > 0) {
405 size_t device_type_queue_length_at_entry{0};
428 queue_length_at_entry,
429 device_type_queue_length_at_entry,
436 const std::chrono::steady_clock::time_point deque_time =
437 std::chrono::steady_clock::now();
440 const size_t current_request_count =
requests_count_.load(std::memory_order_relaxed);
441 CHECK_LT(request_id, current_request_count);
448 std::chrono::duration_cast<std::chrono::milliseconds>(request_stats.
deque_time -
486 const size_t current_request_count =
requests_count_.load(std::memory_order_relaxed);
487 CHECK_LT(request_id, current_request_count);
518 const std::chrono::steady_clock::time_point execution_finished_time =
519 std::chrono::steady_clock::now();
522 const size_t current_request_count =
requests_count_.load(std::memory_order_relaxed);
523 CHECK_LT(request_id, current_request_count);
529 std::chrono::duration_cast<std::chrono::milliseconds>(
533 std::chrono::duration_cast<std::chrono::milliseconds>(
569 std::shared_lock<std::shared_mutex> set_read_lock(chosen_mutex);
571 const std::vector<RequestId> request_ids_for_stage(chosen_set.begin(),
573 return request_ids_for_stage;
584 std::unique_lock<std::shared_mutex> set_write_lock(chosen_mutex);
586 CHECK(chosen_set.insert(request_id)
598 std::unique_lock<std::shared_mutex> set_write_lock(chosen_mutex);
600 CHECK_EQ(chosen_set.erase(request_id),
610 const size_t num_cpu_slots,
611 const size_t num_gpu_slots,
612 const size_t cpu_result_mem,
613 const size_t cpu_buffer_pool_mem,
614 const size_t gpu_buffer_pool_mem,
615 const double per_query_max_cpu_slots_ratio,
616 const double per_query_max_cpu_result_mem_ratio,
617 const double per_query_max_pinned_cpu_buffer_pool_mem_ratio,
618 const double per_query_max_pageable_cpu_buffer_pool_mem_ratio,
619 const bool allow_cpu_kernel_concurrency,
620 const bool allow_cpu_gpu_kernel_concurrency,
621 const bool allow_cpu_slot_oversubscription_concurrency,
622 const bool allow_gpu_slot_oversubscription,
623 const bool allow_cpu_result_mem_oversubscription_concurrency,
624 const double max_available_resource_use_ratio) {
626 CHECK_GT(cpu_result_mem,
size_t(0));
627 CHECK_GT(cpu_buffer_pool_mem,
size_t(0));
628 CHECK_GT(per_query_max_cpu_slots_ratio,
size_t(0));
629 CHECK_EQ(!(allow_cpu_kernel_concurrency || allow_cpu_gpu_kernel_concurrency) &&
630 allow_cpu_slot_oversubscription_concurrency,
632 CHECK_EQ(!(allow_cpu_kernel_concurrency || allow_cpu_gpu_kernel_concurrency) &&
633 allow_cpu_result_mem_oversubscription_concurrency,
635 CHECK_GT(max_available_resource_use_ratio, 0.0);
636 CHECK_LE(max_available_resource_use_ratio, 1.0);
638 const std::vector<std::pair<ResourceType, size_t>> total_resources = {
649 const auto max_per_request_gpu_slots_grant_policy =
651 const auto max_per_request_cpu_result_mem_grant_policy =
653 per_query_max_cpu_result_mem_ratio);
655 const auto max_per_request_pinned_cpu_buffer_pool_mem =
657 per_query_max_pinned_cpu_buffer_pool_mem_ratio);
658 const auto max_per_request_pageable_cpu_buffer_pool_mem =
660 per_query_max_pageable_cpu_buffer_pool_mem_ratio);
662 const std::vector<ResourceGrantPolicy> max_per_request_resource_grant_policies = {
663 max_per_request_cpu_slots_grant_policy,
664 max_per_request_gpu_slots_grant_policy,
665 max_per_request_cpu_result_mem_grant_policy,
666 max_per_request_pinned_cpu_buffer_pool_mem,
667 max_per_request_pageable_cpu_buffer_pool_mem};
669 const auto cpu_slots_undersubscription_concurrency_policy =
674 const auto cpu_slots_oversubscription_concurrency_policy =
675 allow_cpu_slot_oversubscription_concurrency
678 const auto gpu_slots_undersubscription_concurrency_policy =
679 allow_cpu_gpu_kernel_concurrency
682 const auto gpu_slots_oversubscription_concurrency_policy =
683 !allow_gpu_slot_oversubscription
685 : (allow_cpu_gpu_kernel_concurrency
691 const auto cpu_result_mem_oversubscription_concurrency_policy =
692 allow_cpu_result_mem_oversubscription_concurrency
696 const auto concurrent_cpu_slots_grant_policy =
698 cpu_slots_undersubscription_concurrency_policy,
699 cpu_slots_oversubscription_concurrency_policy);
702 gpu_slots_undersubscription_concurrency_policy,
703 gpu_slots_oversubscription_concurrency_policy);
705 const auto concurrent_cpu_result_mem_grant_policy =
708 cpu_result_mem_oversubscription_concurrency_policy);
710 const std::vector<ConcurrentResourceGrantPolicy> concurrent_resource_grant_policies{
711 concurrent_cpu_slots_grant_policy,
712 concurrent_gpu_slots_grant_policy,
713 concurrent_cpu_result_mem_grant_policy};
715 return std::make_shared<ExecutorResourceMgr>(total_resources,
716 concurrent_resource_grant_policies,
717 max_per_request_resource_grant_policies,
718 max_available_resource_use_ratio);
void release_resources(const RequestId request_id, const ResourceGrant &resource_grant)
Instructs ExecutorResourceMgr that the resources held by the requestor with the given request_id can ...
size_t requests_timed_out
size_t sum_gpu_queue_size_at_entry
A container to store requested and minimum necessary resource requests across all resource types cur...
std::shared_ptr< ExecutorResourceMgr > generate_executor_resource_mgr(const size_t num_cpu_slots, const size_t num_gpu_slots, const size_t cpu_result_mem, const size_t cpu_buffer_pool_mem, const size_t gpu_buffer_pool_mem, const double per_query_max_cpu_slots_ratio, const double per_query_max_cpu_result_mem_ratio, const double per_query_max_pinned_cpu_buffer_pool_mem_ratio, const double per_query_max_pageable_cpu_buffer_pool_mem_ratio, const bool allow_cpu_kernel_concurrency, const bool allow_cpu_gpu_kernel_concurrency, const bool allow_cpu_slot_oversubscription_concurrency, const bool allow_gpu_slot_oversubscription, const bool allow_cpu_result_mem_oversubscription_concurrency, const double max_available_resource_use_ratio)
Convenience factory-esque method that allows us to use the same logic to generate an ExecutorResource...
ChunkRequestInfo get_chunk_request_info(const RequestId request_id)
Get the DataMgr chunk ids and associated sizes pertaining to the input data needed by a request...
void remove_request_from_stage(const RequestId request_id, const ExecutionRequestStage request_status)
Internal method: Removes the request specified by the provided request_id from the specified stage...
size_t total_gpu_queue_time_ms
void queue_request_and_wait(const RequestId request_id)
Submits a request with id request_id into the queue, waiting on a BinarySemaphore until ExecutorResou...
std::unique_ptr< ExecutorResourceHandle > request_resources(const RequestInfo &request_info)
Requests resources from ExecutorResourceMgr, with no timeout (unlike request_resources_with_timeout) ...
size_t sum_queue_size_at_entry
bool should_process_queue_
std::condition_variable pause_processor_queue_condition_
size_t sum_cpu_queue_size_at_entry
const size_t ACTUALLY_QUEUED_MIN_MS
void add_request_to_stage(const RequestId request_id, const ExecutionRequestStage request_status)
Internal method: Adds the request specified by the provided request_id to the specified stage...
ExecutorStats executor_stats_
Holds a single ExecutorStats struct that pertains to cumulative stats for ExecutorResourceMgr, i.e. number of requests, queue length, total execution time, etc.
ResourceType resource_type
The type of a resource this concurrent resource grant policy pertains to.
void set_resource(const ResourceType resource_type, const size_t resource_quantity)
Sets the quantity of resource_type to resource_quantity. If pool has outstanding requests, will throw. Responsibility of allowing the pool to empty and preventing concurrent requests while this operation is running is left to the caller (in particular, ExecutorResourceMgr::set_resource pauses the process queue, which waits until all executing requests are finished before yielding to the caller, before calling this method).
bool process_queue_is_paused_
void deallocate_resources(const ResourceGrant &resource_grant, const ChunkRequestInfo &chunk_request_info)
Deallocates resources granted to a requestor such that they can be used for other requests...
ResourceType
Stores the resource type for a ExecutorResourcePool request.
std::shared_mutex queue_stats_mutex_
RW mutex that protects access to executor_stats_ and request_stats_
void process_queue_loop()
Internal method: A thread is assigned to run this function in the constructor of ExecutorResourceMgr...
Specifies the policies for resource grants in the presence of other requests, both under situations o...
ExecutorResourceMgr(const std::vector< std::pair< ResourceType, size_t >> &total_resources, const std::vector< ConcurrentResourceGrantPolicy > &concurrent_resource_grant_policies, const std::vector< ResourceGrantPolicy > &max_per_request_resource_grant_policies, const double max_available_resource_use_ratio)
The constructor instantiates an ExecutorResourcePool with the provided parameters, and starts the process queue by launching a thread to invoke process_queue_loop.
ResourceGrantPolicy gen_unlimited_resource_grant_policy(const ResourceSubtype resource_subtype)
Generates a ResourceGrantPolicy with ResourceGrantPolicySizeType::UNLIMITED
size_t cpu_requests_executing
void stop_process_queue_thread()
Internal method: Invoked from ExecutorResourceMgr destructor, sets stop_process_queue_thread_ to true...
~ExecutorResourceMgr()
The destructor ensures that the process queue thread (process_queue_thread) is stopped and that any t...
ResourceConcurrencyPolicy concurrency_policy
The grant policy in effect when there are concurrent requests for the resource specified by resource_...
std::pair< ResourceGrant, ResourceGrant > calc_min_max_resource_grants_for_request(const RequestInfo &resource_request) const
Given the provided resource_request, statically calculate the minimum and maximum grantable resources...
void set_concurrent_resource_grant_policy(const ConcurrentResourceGrantPolicy &concurrent_resource_grant_policy)
Set the concurrent resource grant policy for a given resource type (stored in ConcurrentResourceGrant...
void allocate_resources(const ResourceGrant &resource_grant, const ChunkRequestInfo &chunk_request_info)
Given a resource grant (assumed to be computed in determine_dynamic_resource_grant), actually allocate (reserve) the resources in the pool so other requestors (queries) cannot use those resources until returned to the pool.
size_t cpu_requests_executed
void mark_request_error(const RequestId request_id, std::string error_msg)
std::chrono::steady_clock::time_point deque_time
ResourceGrantPolicy gen_ratio_resource_grant_policy(const ResourceSubtype resource_subtype, const double ratio_grant)
Generates a ResourceGrantPolicy with ResourceGrantPolicySizeType::RATIO_TO_TOTAL
std::vector< RequestId > get_requests_for_stage(const ExecutionRequestStage request_status) const
Internal method: Get the request ids for a given stage (QUEUED or EXECUTING)
RequestId getRequestId() const
Specifies the resources of each type for a given resource grant.
void set_concurrent_resource_grant_policy(const ConcurrentResourceGrantPolicy &concurrent_resource_grant_policy)
Resets the concurrent resource grant policy object, which specifies a ResourceType as well as normal ...
std::mutex processor_queue_mutex_
RW mutex that protects access to stop_process_queue_thread_ and pause_processor_queue_ ...
ExecutorResourcePool executor_resource_pool_
Keeps track of available resources for execution.
size_t requests_executing
size_t gpu_requests_executed
ResourcePoolInfo get_resource_info() const
Returns a struct containing the total and allocated amounts of all resources tracked by ExecutorResou...
const RequestId INVALID_REQUEST_ID
std::set< RequestId > executing_requests_
Set of all request ids that are currently executing (i.e. post-granting of resources). Protected by executing_set_mutex_.
std::atomic< size_t > requests_count_
An atomic that is incremented with each incoming request, and used to assign RequestIds to incoming r...
ResourceGrant actual_resource_grant
ConcurrentResourceGrantPolicy get_concurrent_resource_grant_policy(const ResourceType resource_type) const
Get the concurrent resource grant policy for a given resource type.
void mark_request_dequed(const RequestId request_id)
Internal method: Moves the request from the QUEUED stage to EXECUTING stage and performs other bookke...
RequestId choose_next_request()
Internal method: Invoked from process_queue_loop, chooses the next resource request to grant...
ResourceConcurrencyPolicy oversubscription_concurrency_policy
The grant policy in effect when there are concurrent requests for the resource specified by resource_...
std::shared_mutex queued_set_mutex_
RW mutex that protects access to queued_requests_
const bool enable_stats_printing_
const RequestInfo request_info
size_t process_queue_counter_
bool pause_process_queue_
size_t total_cpu_queue_time_ms
std::pair< bool, ResourceGrant > determine_dynamic_resource_grant(const ResourceGrant &min_resource_grant, const ResourceGrant &max_resource_grant, const ChunkRequestInfo &chunk_request_info, const double max_request_backoff_ratio) const
Determines the actual resource grant to give a query (which will be somewhere between the provided mi...
OutstandingQueueRequests outstanding_queue_requests_
Stores and manages a map of request ids to BinarySemaphore objects to allow threads waiting for resou...
Stores current key statistics relating to ExecutorResourceMgr state, particularly around the number o...
size_t requests_with_timeouts
size_t gpu_requests_executing
size_t requests_actually_queued
size_t gpu_requests_actually_queued
void wake_request_by_id(const RequestId request_id)
Wakes a waiting thread in the queue. Invoked by ExecutorResourceMgr::process_queue_loop() ...
size_t total_gpu_execution_time_ms
ExecutorDeviceType request_device_type
std::optional< std::string > error
void pause_process_queue()
Pauses the process queue in a thread-safe manner, waiting for all queries in the executing stage to f...
std::string getErrorMsg() const
ExecutorStats get_executor_stats() const
Returns a copy of the ExecutorStats struct held by ExecutorResourceMgr. Used for testing currently...
void mark_request_finished(const RequestId request_id)
Internal method: Invoked on successful completion of a query step from release_resources method...
std::unique_ptr< ExecutorResourceHandle > request_resources_with_timeout(const RequestInfo &request_info, const size_t timeout_in_ms)
Requests resources from ExecutorResourceMgr, will throw if request takes longer than time specified b...
std::shared_mutex executing_set_mutex_
RW mutex that protects access to executing_requests_
void queue_request_and_wait_with_timeout(const RequestId request_id, const size_t max_wait_in_ms)
Submits a request with id request_id into the queue, waiting on a BinarySemaphore until ExecutorResou...
std::chrono::steady_clock::time_point enqueue_time
void set_resource(const ResourceType resource_type, const size_t resource_quantity)
Used to change the total amount available of a specified resource after construction of ExecutorResou...
std::chrono::steady_clock::time_point execution_finished_time
size_t total_execution_time_ms
void mark_request_timed_out(const RequestId request_id)
Internal method: Called if the request times out (i.e. request was made via request_resources_with_ti...
void resume_process_queue()
Resumes the process queue in a thread-safe manner. If the process queue is already paused...
std::condition_variable processor_queue_condition_
std::thread process_queue_thread_
The thread started in the ExecutorResourceMgr constructor that continuously loops inside of process_q...
Specifies all DataMgr chunks needed for a query step/request, along with their sizes in bytes...
size_t total_cpu_execution_time_ms
void set_process_queue_flag()
Internal method: Set the should_process_queue_ flag to true, signifying that the queue should be proc...
const double max_available_resource_use_ratio_
const bool enable_debug_printing_
std::mutex pause_processor_queue_mutex_
void print_executor_stats() const
Prints the ExecutorStats struct. Use for debugging.
std::set< RequestId > queued_requests_
Set of all request ids that are currently queued. Protected by queued_set_mutex_. ...
std::vector< RequestStats > requests_stats_
Stores a vector of all requests that have been seen by ExecutorResourceMgr, with each incoming reques...
Stores info pertaining to a single request made to ExecutorResourceMgr, including its request_id...
bool stop_process_queue_thread_
size_t cpu_requests_actually_queued
RequestId enqueue_request(const RequestInfo &request_info, const size_t timeout_in_ms, const ResourceGrant &min_resource_grant, const ResourceGrant &max_resource_grant)
Internal method: Invoked from request_resource/request_resource_with_timeout, places request in the r...
RequestStats get_request_for_id(const RequestId request_id) const
Internal method: Returns the RequestStats for a request specified by request_id.
size_t total_queue_time_ms
ConcurrentResourceGrantPolicy get_concurrent_resource_grant_policy(const ResourceType resource_type) const