Skip to content

refactor(executor): support pluggable query pipeline executors #17346

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 5 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/common/base/src/runtime/defer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use crate::runtime::LimitMemGuard;

pub fn defer<F: FnOnce() -> R, R>(f: F) -> impl Drop {
let _guard = LimitMemGuard::enter_unlimited();

struct Defer<F: FnOnce() -> R, R>(Option<F>);

impl<F: FnOnce() -> R, R> Drop for Defer<F, R> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,9 @@ async fn test_fuse_do_refresh_virtual_column() -> Result<()> {
pipelines.push(build_res.main_pipeline);

let complete_executor = PipelineCompleteExecutor::from_pipelines(pipelines, settings)?;
table_ctx.set_executor(complete_executor.get_inner())?;
complete_executor.execute()?;
let query_handle = complete_executor.execute().await?;
table_ctx.set_query_handle(complete_executor.get_handle())?;
query_handle.wait().await?;
}

let segment_reader =
Expand Down
3 changes: 3 additions & 0 deletions src/query/service/src/global_services.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ use crate::clusters::ClusterDiscovery;
use crate::locks::LockManager;
#[cfg(feature = "enable_queries_executor")]
use crate::pipelines::executor::GlobalQueriesExecutor;
use crate::pipelines::executor::NewQueryPipelineExecutor;
use crate::servers::flight::v1::exchange::DataExchangeManager;
use crate::servers::http::v1::ClientSessionManager;
use crate::servers::http::v1::HttpQueryManager;
Expand Down Expand Up @@ -91,6 +92,8 @@ impl GlobalServices {
// 4. cluster discovery init.
ClusterDiscovery::init(config).await?;

NewQueryPipelineExecutor::init(config)?;

// TODO(xuanwo):
//
// This part is a bit complex because catalog are used widely in different
Expand Down
5 changes: 3 additions & 2 deletions src/query/service/src/interpreters/hook/compact_hook.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,8 +187,9 @@ async fn compact_table(

// Clears previously generated segment locations to avoid duplicate data in the refresh phase
ctx.clear_written_segment_locations()?;
ctx.set_executor(complete_executor.get_inner())?;
complete_executor.execute()?;
let query_handle = complete_executor.execute().await?;
ctx.set_query_handle(complete_executor.get_handle())?;
query_handle.wait().await?;
drop(complete_executor);
}
}
Expand Down
15 changes: 9 additions & 6 deletions src/query/service/src/interpreters/hook/refresh_hook.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,9 @@ async fn do_refresh(ctx: Arc<QueryContext>, desc: RefreshDesc) -> Result<()> {

let complete_executor =
PipelineCompleteExecutor::from_pipelines(pipelines, settings)?;
ctx_cloned.set_executor(complete_executor.get_inner())?;
complete_executor.execute()
let query_handle = complete_executor.execute().await?;
ctx_cloned.set_query_handle(complete_executor.get_handle())?;
query_handle.wait().await
} else {
Ok(())
}
Expand All @@ -162,8 +163,9 @@ async fn do_refresh(ctx: Arc<QueryContext>, desc: RefreshDesc) -> Result<()> {

let complete_executor =
PipelineCompleteExecutor::from_pipelines(pipelines, settings)?;
ctx_cloned.set_executor(complete_executor.get_inner())?;
complete_executor.execute()
let query_handle = complete_executor.execute().await?;
ctx_cloned.set_query_handle(complete_executor.get_handle())?;
query_handle.wait().await
} else {
Ok(())
}
Expand All @@ -189,8 +191,9 @@ async fn do_refresh(ctx: Arc<QueryContext>, desc: RefreshDesc) -> Result<()> {

let complete_executor =
PipelineCompleteExecutor::from_pipelines(pipelines, settings)?;
ctx_cloned.set_executor(complete_executor.get_inner())?;
complete_executor.execute()
let query_handle = complete_executor.execute().await?;
ctx_cloned.set_query_handle(complete_executor.get_handle())?;
query_handle.wait().await
} else {
Ok(())
}
Expand Down
11 changes: 6 additions & 5 deletions src/query/service/src/interpreters/interpreter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,15 +132,16 @@ pub trait Interpreter: Sync + Send {

let complete_executor = PipelineCompleteExecutor::from_pipelines(pipelines, settings)?;

ctx.set_executor(complete_executor.get_inner())?;
complete_executor.execute()?;
let query_handle = complete_executor.execute().await?;
ctx.set_query_handle(complete_executor.get_handle())?;
query_handle.wait().await?;
self.inject_result()
} else {
let pulling_executor = PipelinePullingExecutor::from_pipelines(build_res, settings)?;
let mut executor = PipelinePullingExecutor::from_pipelines(build_res, settings)?;

ctx.set_executor(pulling_executor.get_inner())?;
ctx.set_query_handle(executor.start().await?)?;
Ok(Box::pin(ProgressStream::try_create(
Box::pin(PullingExecutorStream::create(pulling_executor)?),
Box::pin(PullingExecutorStream::create(executor)?),
ctx.get_result_progress(),
)?))
}
Expand Down
15 changes: 8 additions & 7 deletions src/query/service/src/interpreters/interpreter_explain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -421,7 +421,7 @@ impl ExplainInterpreter {
let build_res = build_query_pipeline(&self.ctx, &[], &plan, ignore_result).await?;

// Drain the data
let query_profiles = self.execute_and_get_profiles(build_res)?;
let query_profiles = self.execute_and_get_profiles(build_res).await?;

Ok(GraphicalProfiles {
query_id: query_ctx.get_id(),
Expand All @@ -443,7 +443,7 @@ impl ExplainInterpreter {
let build_res = build_query_pipeline(&self.ctx, &[], &plan, ignore_result).await?;

// Drain the data
let query_profiles = self.execute_and_get_profiles(build_res)?;
let query_profiles = self.execute_and_get_profiles(build_res).await?;

let result = if self.partial {
format_partial_tree(&plan, metadata, &query_profiles)?.format_pretty()?
Expand All @@ -466,7 +466,7 @@ impl ExplainInterpreter {
Ok(vec![DataBlock::new_from_columns(vec![formatted_plan])])
}

fn execute_and_get_profiles(
async fn execute_and_get_profiles(
&self,
mut build_res: PipelineBuildResult,
) -> Result<HashMap<u32, PlanProfile>> {
Expand All @@ -485,16 +485,17 @@ impl ExplainInterpreter {
pipelines.push(build_res.main_pipeline);

let executor = PipelineCompleteExecutor::from_pipelines(pipelines, settings)?;
executor.execute()?;
let query_handle = executor.execute().await?;
self.ctx
.add_query_profiles(&executor.get_inner().fetch_profiling(false));
.add_query_profiles(&executor.get_handle().fetch_profiling(false));
query_handle.wait().await?;
}
false => {
let mut executor = PipelinePullingExecutor::from_pipelines(build_res, settings)?;
executor.start();
executor.start().await?;
while (executor.pull_data()?).is_some() {}
self.ctx
.add_query_profiles(&executor.get_inner().fetch_profiling(false));
.add_query_profiles(&executor.get_handle().fetch_profiling(false));
}
}
Ok(self
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,8 +218,10 @@ impl ReclusterTableInterpreter {
let complete_executor =
PipelineCompleteExecutor::from_pipelines(pipelines, executor_settings)?;
self.ctx.clear_written_segment_locations()?;
self.ctx.set_executor(complete_executor.get_inner())?;
complete_executor.execute()?;

let query_handle = complete_executor.execute().await?;
self.ctx.set_query_handle(complete_executor.get_handle())?;
query_handle.wait().await?;
// make sure the executor is dropped before the next loop.
drop(complete_executor);
// make sure the lock guard is dropped before the next loop.
Expand Down
16 changes: 8 additions & 8 deletions src/query/service/src/pipelines/executor/executor_graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use databend_common_base::runtime::error_info::NodeErrorType;
use databend_common_base::runtime::profile::Profile;
use databend_common_base::runtime::profile::ProfileStatisticsName;
use databend_common_base::runtime::MemStat;
use databend_common_base::runtime::Runtime;
use databend_common_base::runtime::ThreadTracker;
use databend_common_base::runtime::TrackingPayload;
use databend_common_base::runtime::TrySpawn;
Expand Down Expand Up @@ -59,7 +60,6 @@ use crate::pipelines::executor::ProcessorAsyncTask;
use crate::pipelines::executor::QueriesExecutorTasksQueue;
use crate::pipelines::executor::QueriesPipelineExecutor;
use crate::pipelines::executor::QueryExecutorTasksQueue;
use crate::pipelines::executor::QueryPipelineExecutor;
use crate::pipelines::executor::WorkersCondvar;
use crate::pipelines::processors::connect;
use crate::pipelines::processors::DirectedEdge;
Expand Down Expand Up @@ -513,7 +513,7 @@ impl ScheduleQueue {
mut self,
global: &Arc<QueryExecutorTasksQueue>,
context: &mut ExecutorWorkerContext,
executor: &Arc<QueryPipelineExecutor>,
executor: &Arc<Runtime>,
) {
debug_assert!(!context.has_task());

Expand Down Expand Up @@ -541,7 +541,7 @@ impl ScheduleQueue {
pub fn schedule_async_task(
proc: ProcessorWrapper,
query_id: Arc<String>,
executor: &Arc<QueryPipelineExecutor>,
executor: &Arc<Runtime>,
wakeup_worker_id: usize,
workers_condvar: Arc<WorkersCondvar>,
global_queue: Arc<QueryExecutorTasksQueue>,
Expand All @@ -553,7 +553,7 @@ impl ScheduleQueue {
let tracking_payload = graph.get_node_tracking_payload(node_index);
let _guard = ThreadTracker::tracking(tracking_payload.clone());
let process_future = proc.processor.async_process();
executor.async_runtime.spawn(
executor.spawn(
ProcessorAsyncTask::create(
query_id,
wakeup_worker_id,
Expand Down Expand Up @@ -716,16 +716,16 @@ impl RunningGraph {
/// # Safety
///
/// Method is thread unsafe and require thread safe call
pub unsafe fn init_schedule_queue(self: Arc<Self>, capacity: usize) -> Result<ScheduleQueue> {
ExecutingGraph::init_schedule_queue(&self.0, capacity, &self)
pub unsafe fn init_schedule_queue(self: &Arc<Self>, capacity: usize) -> Result<ScheduleQueue> {
    // Thin delegation to the inner ExecutingGraph. Taking `self: &Arc<Self>`
    // (rather than consuming the Arc) lets callers keep their handle and
    // avoids an extra refcount transfer per scheduling round.
    ExecutingGraph::init_schedule_queue(&self.0, capacity, self)
}

/// # Safety
///
/// Method is thread unsafe and require thread safe call
pub unsafe fn schedule_queue(self: Arc<Self>, node_index: NodeIndex) -> Result<ScheduleQueue> {
pub unsafe fn schedule_queue(self: &Arc<Self>, node_index: NodeIndex) -> Result<ScheduleQueue> {
let mut schedule_queue = ScheduleQueue::with_capacity(0);
ExecutingGraph::schedule_queue(&self.0, node_index, &mut schedule_queue, &self)?;
ExecutingGraph::schedule_queue(&self.0, node_index, &mut schedule_queue, self)?;
Ok(schedule_queue)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,17 @@ impl ExecutorWorkerContext {
std::mem::replace(&mut self.task, ExecutorTask::None)
}

/// Returns the graph `NodeIndex` of the processor behind the task
/// currently held by this worker context.
///
/// # Panics
///
/// Hits `unreachable!()` when no task is assigned
/// (`ExecutorTask::None`); callers are expected to check `has_task()`
/// before asking for the pid.
pub fn get_task_pid(&self) -> NodeIndex {
    unsafe {
        // SAFETY: presumably reading the processor id is only sound while
        // this worker exclusively owns the task — TODO confirm against
        // the executor's scheduling invariants.
        match &self.task {
            ExecutorTask::AsyncCompleted(completed) => completed.id,
            ExecutorTask::Sync(wrapper) => wrapper.processor.id(),
            ExecutorTask::Async(wrapper) => wrapper.processor.id(),
            ExecutorTask::None => unreachable!(),
        }
    }
}

pub fn get_task_info(&self) -> Option<(Arc<RunningGraph>, NodeIndex)> {
unsafe {
match &self.task {
Expand All @@ -120,6 +131,25 @@ impl ExecutorWorkerContext {
}
}

/// # Safety
///
/// Thread-unsafe: the caller must guarantee exclusive access to this
/// worker context for the duration of task execution.
///
/// Executes the currently held task and returns the `NodeIndex` of the
/// processor that ran, leaving `ExecutorTask::None` in its place.
///
/// # Errors
///
/// * Internal error when there is no task to execute.
/// * Internal error for async tasks — on this executor only completed
///   async results are accepted, not pending ones.
/// * Any error produced by the task itself is propagated.
pub unsafe fn execute_task_new(&mut self) -> Result<NodeIndex> {
    // Take ownership of the task, resetting the slot to None.
    let task = std::mem::replace(&mut self.task, ExecutorTask::None);
    match task {
        ExecutorTask::None => Err(ErrorCode::Internal("Execute none task.")),
        ExecutorTask::Sync(processor) => {
            // `?` propagates execution failures; a sync task that yields
            // no successor node is also surfaced as an internal error.
            match self.execute_sync_task(processor)? {
                Some((node_idx, _)) => Ok(node_idx),
                None => Err(ErrorCode::Internal("Execute none task.")),
            }
        }
        ExecutorTask::Async(_processor) => Err(ErrorCode::Internal(
            "Async task should only be executed on queries executor",
        )),
        ExecutorTask::AsyncCompleted(completed) => match completed.res {
            Ok(_) => Ok(completed.id),
            Err(cause) => Err(cause),
        },
    }
}

/// # Safety
pub unsafe fn execute_task(
&mut self,
Expand Down
3 changes: 3 additions & 0 deletions src/query/service/src/pipelines/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ mod executor_graph;
mod executor_settings;
mod executor_worker_context;
mod global_queries_executor;
mod new_query_pipeline_executor;
mod pipeline_complete_executor;
mod pipeline_executor;
mod pipeline_pulling_executor;
Expand All @@ -37,8 +38,10 @@ pub use executor_worker_context::CompletedAsyncTask;
pub use executor_worker_context::ExecutorTask;
pub use executor_worker_context::ExecutorWorkerContext;
pub use global_queries_executor::GlobalQueriesExecutor;
pub use new_query_pipeline_executor::NewQueryPipelineExecutor;
pub use pipeline_complete_executor::PipelineCompleteExecutor;
pub use pipeline_executor::PipelineExecutor;
pub use pipeline_executor::QueryHandle;
pub use pipeline_pulling_executor::PipelinePullingExecutor;
pub use pipeline_pushing_executor::PipelinePushingExecutor;
pub use processor_async_task::ProcessorAsyncTask;
Expand Down
Loading
Loading