21 #include <condition_variable>
24 #include <shared_mutex>
30 namespace ExecutorResourceMgr_Namespace {
109 const std::chrono::steady_clock::time_point&
enqueue_time,
113 : request_id(request_id)
114 , request_info(request_info)
115 , min_resource_grant(min_resource_grant)
116 , max_resource_grant(max_resource_grant)
117 , enqueue_time(enqueue_time)
118 , queue_length_at_entry(queue_length_at_entry)
119 , device_type_queue_length_at_entry(device_type_queue_length_at_entry)
120 , timeout_in_ms(timeout_in_ms) {}
125 class ExecutorResourceHandle;
145 const std::vector<std::pair<ResourceType, size_t>>& total_resources,
146 const std::vector<ConcurrentResourceGrantPolicy>&
147 concurrent_resource_grant_policies,
148 const std::vector<ResourceGrantPolicy>& max_per_request_resource_grant_policies,
149 const double max_available_resource_use_ratio);
171 const size_t timeout_in_ms);
354 const size_t timeout_in_ms,
571 const size_t num_cpu_slots,
572 const size_t num_gpu_slots,
573 const size_t cpu_result_mem,
574 const size_t cpu_buffer_pool_mem,
575 const size_t gpu_buffer_pool_mem,
576 const double per_query_max_cpu_slots_ratio,
577 const double per_query_max_cpu_result_mem_ratio,
578 const double per_query_max_pinned_cpu_buffer_pool_mem_ratio,
579 const double per_query_max_pageable_cpu_buffer_pool_mem_ratio,
580 const bool allow_cpu_kernel_concurrency,
581 const bool allow_cpu_gpu_kernel_concurrency,
582 const bool allow_cpu_slot_oversubscription_concurrency,
583 const bool allow_gpu_slot_oversubscription,
584 const bool allow_cpu_result_mem_oversubscription_concurrency,
585 const double max_available_resource_use_ratio);
A container for various stats about the current state of the ExecutorResourcePool. Note that ExecutorResourcePool does not persist a struct of this type, but rather builds one on the fly when ExecutorResourcePool::get_resource_info() is called.
void release_resources(const RequestId request_id, const ResourceGrant &resource_grant)
Instructs ExecutorResourceMgr that the resources held by the requestor with the given request_id can ...
size_t requests_timed_out
size_t sum_gpu_queue_size_at_entry
A container to store requested and minimum necessary resource requests across all resource types cur...
Stores and allows access to a binary semaphore per RequestId (using an std::unordered_map), as well as accessing all outstanding RequestIds for waiting requests.
std::shared_ptr< ExecutorResourceMgr > generate_executor_resource_mgr(const size_t num_cpu_slots, const size_t num_gpu_slots, const size_t cpu_result_mem, const size_t cpu_buffer_pool_mem, const size_t gpu_buffer_pool_mem, const double per_query_max_cpu_slots_ratio, const double per_query_max_cpu_result_mem_ratio, const double per_query_max_pinned_cpu_buffer_pool_mem_ratio, const double per_query_max_pageable_cpu_buffer_pool_mem_ratio, const bool allow_cpu_kernel_concurrency, const bool allow_cpu_gpu_kernel_concurrency, const bool allow_cpu_slot_oversubscription_concurrency, const bool allow_gpu_slot_oversubscription, const bool allow_cpu_result_mem_oversubscription_concurrency, const double max_available_resource_use_ratio)
Convenience factory-esque method that allows us to use the same logic to generate an ExecutorResource...
ChunkRequestInfo get_chunk_request_info(const RequestId request_id)
Get the DataMgr chunk ids and associated sizes pertaining to the input data needed by a request...
void remove_request_from_stage(const RequestId request_id, const ExecutionRequestStage request_status)
Internal method: Removes the request specified by the provided request_id from the specified stage...
size_t total_gpu_queue_time_ms
std::unique_ptr< ExecutorResourceHandle > request_resources(const RequestInfo &request_info)
Requests resources from ExecutorResourceMgr, with no timeout (unlike request_resources_with_timeout) ...
RequestId get_request_id() const
size_t sum_queue_size_at_entry
bool should_process_queue_
std::condition_variable pause_processor_queue_condition_
size_t sum_cpu_queue_size_at_entry
const size_t ACTUALLY_QUEUED_MIN_MS
void add_request_to_stage(const RequestId request_id, const ExecutionRequestStage request_status)
Internal method: Adds the request specified by the provided request_id to the specified stage...
ExecutorStats executor_stats_
Holds a single ExecutorStats struct that pertains to cumulative stats for ExecutorResourceMgr, i.e. number of requests, queue length, total execution time, etc.
bool process_queue_is_paused_
ResourceType
Stores the resource type for an ExecutorResourcePool request.
const ResourceGrant min_resource_grant
std::shared_mutex queue_stats_mutex_
RW mutex that protects access to executor_stats_ and request_stats_
void process_queue_loop()
Internal method: A thread is assigned to run this function in the constructor of ExecutorResourceMgr...
~ExecutorResourceHandle()
Specifies the policies for resource grants in the presence of other requests, both under situations o...
ExecutorResourceMgr(const std::vector< std::pair< ResourceType, size_t >> &total_resources, const std::vector< ConcurrentResourceGrantPolicy > &concurrent_resource_grant_policies, const std::vector< ResourceGrantPolicy > &max_per_request_resource_grant_policies, const double max_available_resource_use_ratio)
The constructor instantiates an ExecutorResourcePool with the provided parameters, and starts the process queue by launching a thread to invoke process_queue_loop.
size_t cpu_requests_executing
void stop_process_queue_thread()
Internal method: Invoked from ExecutorResourceMgr destructor, sets stop_process_queue_thread_ to true...
size_t device_type_queue_length_at_entry
~ExecutorResourceMgr()
The destructor ensures that the process queue thread (process_queue_thread) is stopped and that any t...
void set_concurrent_resource_grant_policy(const ConcurrentResourceGrantPolicy &concurrent_resource_grant_policy)
Set the concurrent resource grant policy for a given resource type (stored in ConcurrentResourceGrant...
size_t cpu_requests_executed
void mark_request_error(const RequestId request_id, std::string error_msg)
std::chrono::steady_clock::time_point deque_time
std::vector< RequestId > get_requests_for_stage(const ExecutionRequestStage request_status) const
Internal method: Get the request ids for a given stage (QUEUED or EXECUTING)
Specifies the resources of each type for a given resource grant.
std::mutex processor_queue_mutex_
Mutex that protects access to stop_process_queue_thread_ and pause_processor_queue_ ...
ExecutorResourcePool executor_resource_pool_
Keeps track of available resources for execution.
size_t requests_executing
size_t gpu_requests_executed
ResourcePoolInfo get_resource_info() const
Returns a struct containing the total and allocated amounts of all resources tracked by ExecutorResou...
const ResourceGrant resource_grant_
const RequestId INVALID_REQUEST_ID
std::set< RequestId > executing_requests_
Set of all request ids that are currently executing (i.e. post-granting of resources). Protected by executing_set_mutex_.
std::atomic< size_t > requests_count_
An atomic that is incremented with each incoming request, and used to assign RequestIds to incoming r...
ResourceGrant actual_resource_grant
ConcurrentResourceGrantPolicy get_concurrent_resource_grant_policy(const ResourceType resource_type) const
Get the concurrent resource grant policy for a given resource type.
void mark_request_dequed(const RequestId request_id)
Internal method: Moves the request from the QUEUED stage to EXECUTING stage and performs other bookke...
RequestId choose_next_request()
Internal method: Invoked from process_queue_loop, chooses the next resource request to grant...
ExecutorResourceHandle(std::shared_ptr< ExecutorResourceMgr > resource_mgr, const RequestId request_id, const ResourceGrant &resource_grant)
std::pair< size_t, size_t > get_resource_info(const ResourceType resource_type) const
Returns the allocated and total available amount of the resource specified.
std::shared_mutex queued_set_mutex_
RW mutex that protects access to queued_requests_
const RequestId request_id
const bool enable_stats_printing_
const RequestInfo request_info
size_t process_queue_counter_
ExecutorResourcePool keeps track of available compute and memory resources and can be queried to get ...
bool pause_process_queue_
size_t total_cpu_queue_time_ms
OutstandingQueueRequests outstanding_queue_requests_
Stores and manages a map of request ids to BinarySemaphore objects to allow threads waiting for resou...
ResourceGrant get_resource_grant() const
Stores current key statistics relating to ExecutorResourceMgr state, particularly around the number o...
size_t requests_with_timeouts
size_t gpu_requests_executing
size_t requests_actually_queued
size_t gpu_requests_actually_queued
size_t queue_length_at_entry
size_t total_gpu_execution_time_ms
A wrapper returned by ExecutorResourceMgr to the requestor, containing the ResourceGrant that was gra...
const RequestId request_id_
std::optional< std::string > error
void pause_process_queue()
Pauses the process queue in a thread-safe manner, waiting for all queries in the executing stage to f...
ExecutorStats get_executor_stats() const
Returns a copy of the ExecutorStats struct held by ExecutorResourceMgr. Used for testing currently...
void mark_request_finished(const RequestId request_id)
Internal method: Invoked on successful completion of a query step from release_resources method...
std::pair< size_t, size_t > get_resource_info(const ResourceType resource_type) const
Returns the allocated and total available amount of the resource specified.
std::unique_ptr< ExecutorResourceHandle > request_resources_with_timeout(const RequestInfo &request_info, const size_t timeout_in_ms)
Requests resources from ExecutorResourceMgr, will throw if request takes longer than time specified b...
std::shared_mutex executing_set_mutex_
RW mutex that protects access to executing_requests_
std::chrono::steady_clock::time_point enqueue_time
void set_resource(const ResourceType resource_type, const size_t resoure_quantity)
Used to change the total amount available of a specified resource after construction of ExecutorResou...
std::chrono::steady_clock::time_point execution_finished_time
size_t total_execution_time_ms
void mark_request_timed_out(const RequestId request_id)
Internal method: Called if the request times out (i.e. request was made via request_resources_with_ti...
void resume_process_queue()
Resumes the process queue in a thread-safe manner. If the process queue is already paused...
std::condition_variable processor_queue_condition_
ExecutorResourceMgr is the central manager for resources available to all executors in the system...
std::thread process_queue_thread_
The thread started in the ExecutorResourceMgr constructor that continuously loops inside of process_q...
std::shared_timed_mutex shared_mutex
Specifies all DataMgr chunks needed for a query step/request, along with their sizes in bytes...
size_t total_cpu_execution_time_ms
void set_process_queue_flag()
Internal method: Set the should_process_queue_ flag to true, signifying that the queue should be proc...
const double max_available_resource_use_ratio_
const bool enable_debug_printing_
std::mutex pause_processor_queue_mutex_
const ResourceGrant max_resource_grant
void print_executor_stats() const
Prints the ExecutorStats struct. Use for debugging.
std::set< RequestId > queued_requests_
Set of all request ids that are currently queued. Protected by queued_set_mutex_. ...
RequestStats(const RequestId request_id, const RequestInfo &request_info, const ResourceGrant &min_resource_grant, const ResourceGrant &max_resource_grant, const std::chrono::steady_clock::time_point &enqueue_time, const size_t queue_length_at_entry, const size_t device_type_queue_length_at_entry, const size_t timeout_in_ms)
std::vector< RequestStats > requests_stats_
Stores a vector of all requests that have been seen by ExecutorResourceMgr, with each incoming reques...
Stores info pertaining to a single request made to ExecutorResourceMgr, including its request_id...
bool stop_process_queue_thread_
std::shared_ptr< ExecutorResourceMgr > resource_mgr_
size_t cpu_requests_actually_queued
RequestId enqueue_request(const RequestInfo &request_info, const size_t timeout_in_ms, const ResourceGrant &min_resource_grant, const ResourceGrant &max_resource_grant)
Internal method: Invoked from request_resources/request_resources_with_timeout, places request in the r...
RequestStats get_request_for_id(const RequestId request_id) const
Internal method: Returns the RequestStats for a request specified by request_id.
size_t total_queue_time_ms