diff --git a/src/common/base/src/runtime/defer.rs b/src/common/base/src/runtime/defer.rs index 9756278b631b6..1678399862240 100644 --- a/src/common/base/src/runtime/defer.rs +++ b/src/common/base/src/runtime/defer.rs @@ -12,7 +12,11 @@ // See the License for the specific language governing permissions and // limitations under the License. +use crate::runtime::LimitMemGuard; + pub fn defer R, R>(f: F) -> impl Drop { + let _guard = LimitMemGuard::enter_unlimited(); + struct Defer R, R>(Option); impl R, R> Drop for Defer { diff --git a/src/query/ee/tests/it/storages/fuse/operations/virtual_columns.rs b/src/query/ee/tests/it/storages/fuse/operations/virtual_columns.rs index 2259feaf9bf7b..e3a03d6c0372c 100644 --- a/src/query/ee/tests/it/storages/fuse/operations/virtual_columns.rs +++ b/src/query/ee/tests/it/storages/fuse/operations/virtual_columns.rs @@ -91,8 +91,9 @@ async fn test_fuse_do_refresh_virtual_column() -> Result<()> { pipelines.push(build_res.main_pipeline); let complete_executor = PipelineCompleteExecutor::from_pipelines(pipelines, settings)?; - table_ctx.set_executor(complete_executor.get_inner())?; - complete_executor.execute()?; + let query_handle = complete_executor.execute().await?; + table_ctx.set_query_handle(complete_executor.get_handle())?; + query_handle.wait().await?; } let segment_reader = diff --git a/src/query/service/src/global_services.rs b/src/query/service/src/global_services.rs index ed3a8db7bbc2a..321cbcb0b4435 100644 --- a/src/query/service/src/global_services.rs +++ b/src/query/service/src/global_services.rs @@ -47,6 +47,7 @@ use crate::clusters::ClusterDiscovery; use crate::locks::LockManager; #[cfg(feature = "enable_queries_executor")] use crate::pipelines::executor::GlobalQueriesExecutor; +use crate::pipelines::executor::NewQueryPipelineExecutor; use crate::servers::flight::v1::exchange::DataExchangeManager; use crate::servers::http::v1::ClientSessionManager; use crate::servers::http::v1::HttpQueryManager; @@ -91,6 +92,8 @@ impl GlobalServices { // 4. cluster discovery init. ClusterDiscovery::init(config).await?; + NewQueryPipelineExecutor::init(config)?; + // TODO(xuanwo): // // This part is a bit complex because catalog are used widely in different diff --git a/src/query/service/src/interpreters/hook/compact_hook.rs b/src/query/service/src/interpreters/hook/compact_hook.rs index 3c695cf3465e9..319d91799b897 100644 --- a/src/query/service/src/interpreters/hook/compact_hook.rs +++ b/src/query/service/src/interpreters/hook/compact_hook.rs @@ -187,8 +187,9 @@ async fn compact_table( // Clears previously generated segment locations to avoid duplicate data in the refresh phase ctx.clear_written_segment_locations()?; - ctx.set_executor(complete_executor.get_inner())?; - complete_executor.execute()?; + let query_handle = complete_executor.execute().await?; + ctx.set_query_handle(complete_executor.get_handle())?; + query_handle.wait().await?; drop(complete_executor); } } diff --git a/src/query/service/src/interpreters/hook/refresh_hook.rs b/src/query/service/src/interpreters/hook/refresh_hook.rs index 52a775668244f..6f5ac8602e92e 100644 --- a/src/query/service/src/interpreters/hook/refresh_hook.rs +++ b/src/query/service/src/interpreters/hook/refresh_hook.rs @@ -135,8 +135,9 @@ async fn do_refresh(ctx: Arc, desc: RefreshDesc) -> Result<()> { let complete_executor = PipelineCompleteExecutor::from_pipelines(pipelines, settings)?; - ctx_cloned.set_executor(complete_executor.get_inner())?; - complete_executor.execute() + let query_handle = complete_executor.execute().await?; + ctx_cloned.set_query_handle(complete_executor.get_handle())?; + query_handle.wait().await } else { Ok(()) } @@ -162,8 +163,9 @@ async fn do_refresh(ctx: Arc, desc: RefreshDesc) -> Result<()> { let complete_executor = PipelineCompleteExecutor::from_pipelines(pipelines, settings)?; - ctx_cloned.set_executor(complete_executor.get_inner())?; - complete_executor.execute() + let query_handle = complete_executor.execute().await?; + ctx_cloned.set_query_handle(complete_executor.get_handle())?; + query_handle.wait().await } else { Ok(()) } @@ -189,8 +191,9 @@ async fn do_refresh(ctx: Arc, desc: RefreshDesc) -> Result<()> { let complete_executor = PipelineCompleteExecutor::from_pipelines(pipelines, settings)?; - ctx_cloned.set_executor(complete_executor.get_inner())?; - complete_executor.execute() + let query_handle = complete_executor.execute().await?; + ctx_cloned.set_query_handle(complete_executor.get_handle())?; + query_handle.wait().await } else { Ok(()) } diff --git a/src/query/service/src/interpreters/interpreter.rs b/src/query/service/src/interpreters/interpreter.rs index fdc9e32217728..20a455f4799ac 100644 --- a/src/query/service/src/interpreters/interpreter.rs +++ b/src/query/service/src/interpreters/interpreter.rs @@ -132,15 +132,16 @@ pub trait Interpreter: Sync + Send { let complete_executor = PipelineCompleteExecutor::from_pipelines(pipelines, settings)?; - ctx.set_executor(complete_executor.get_inner())?; - complete_executor.execute()?; + let query_handle = complete_executor.execute().await?; + ctx.set_query_handle(complete_executor.get_handle())?; + query_handle.wait().await?; self.inject_result() } else { - let pulling_executor = PipelinePullingExecutor::from_pipelines(build_res, settings)?; + let mut executor = PipelinePullingExecutor::from_pipelines(build_res, settings)?; - ctx.set_executor(pulling_executor.get_inner())?; + ctx.set_query_handle(executor.start().await?)?; Ok(Box::pin(ProgressStream::try_create( - Box::pin(PullingExecutorStream::create(pulling_executor)?), + Box::pin(PullingExecutorStream::create(executor)?), ctx.get_result_progress(), )?)) } diff --git a/src/query/service/src/interpreters/interpreter_explain.rs b/src/query/service/src/interpreters/interpreter_explain.rs index 69f13c13ee04f..05b1b61b76786 100644 --- a/src/query/service/src/interpreters/interpreter_explain.rs +++ b/src/query/service/src/interpreters/interpreter_explain.rs @@ -421,7 +421,7 @@ impl ExplainInterpreter { let build_res = build_query_pipeline(&self.ctx, &[], &plan, ignore_result).await?; // Drain the data - let query_profiles = self.execute_and_get_profiles(build_res)?; + let query_profiles = self.execute_and_get_profiles(build_res).await?; Ok(GraphicalProfiles { query_id: query_ctx.get_id(), @@ -443,7 +443,7 @@ impl ExplainInterpreter { let build_res = build_query_pipeline(&self.ctx, &[], &plan, ignore_result).await?; // Drain the data - let query_profiles = self.execute_and_get_profiles(build_res)?; + let query_profiles = self.execute_and_get_profiles(build_res).await?; let result = if self.partial { format_partial_tree(&plan, metadata, &query_profiles)?.format_pretty()? @@ -466,7 +466,7 @@ impl ExplainInterpreter { Ok(vec![DataBlock::new_from_columns(vec![formatted_plan])]) } - fn execute_and_get_profiles( + async fn execute_and_get_profiles( &self, mut build_res: PipelineBuildResult, ) -> Result> { @@ -485,16 +485,17 @@ impl ExplainInterpreter { pipelines.push(build_res.main_pipeline); let executor = PipelineCompleteExecutor::from_pipelines(pipelines, settings)?; - executor.execute()?; + let query_handle = executor.execute().await?; self.ctx - .add_query_profiles(&executor.get_inner().fetch_profiling(false)); + .add_query_profiles(&executor.get_handle().fetch_profiling(false)); + query_handle.wait().await?; } false => { let mut executor = PipelinePullingExecutor::from_pipelines(build_res, settings)?; - executor.start(); + executor.start().await?; while (executor.pull_data()?).is_some() {} self.ctx - .add_query_profiles(&executor.get_inner().fetch_profiling(false)); + .add_query_profiles(&executor.get_handle().fetch_profiling(false)); } } Ok(self diff --git a/src/query/service/src/interpreters/interpreter_table_recluster.rs b/src/query/service/src/interpreters/interpreter_table_recluster.rs index 9c9f08633d285..8189028d3163a 100644 --- a/src/query/service/src/interpreters/interpreter_table_recluster.rs +++ b/src/query/service/src/interpreters/interpreter_table_recluster.rs @@ -218,8 +218,10 @@ impl ReclusterTableInterpreter { let complete_executor = PipelineCompleteExecutor::from_pipelines(pipelines, executor_settings)?; self.ctx.clear_written_segment_locations()?; - self.ctx.set_executor(complete_executor.get_inner())?; - complete_executor.execute()?; + + let query_handle = complete_executor.execute().await?; + self.ctx.set_query_handle(complete_executor.get_handle())?; + query_handle.wait().await?; // make sure the executor is dropped before the next loop. drop(complete_executor); // make sure the lock guard is dropped before the next loop. diff --git a/src/query/service/src/pipelines/executor/executor_graph.rs b/src/query/service/src/pipelines/executor/executor_graph.rs index 7a56e250ee5a1..ed6758ed4cd46 100644 --- a/src/query/service/src/pipelines/executor/executor_graph.rs +++ b/src/query/service/src/pipelines/executor/executor_graph.rs @@ -29,6 +29,7 @@ use databend_common_base::runtime::error_info::NodeErrorType; use databend_common_base::runtime::profile::Profile; use databend_common_base::runtime::profile::ProfileStatisticsName; use databend_common_base::runtime::MemStat; +use databend_common_base::runtime::Runtime; use databend_common_base::runtime::ThreadTracker; use databend_common_base::runtime::TrackingPayload; use databend_common_base::runtime::TrySpawn; @@ -59,7 +60,6 @@ use crate::pipelines::executor::ProcessorAsyncTask; use crate::pipelines::executor::QueriesExecutorTasksQueue; use crate::pipelines::executor::QueriesPipelineExecutor; use crate::pipelines::executor::QueryExecutorTasksQueue; -use crate::pipelines::executor::QueryPipelineExecutor; use crate::pipelines::executor::WorkersCondvar; use crate::pipelines::processors::connect; use crate::pipelines::processors::DirectedEdge; @@ -513,7 +513,7 @@ impl ScheduleQueue { mut self, global: &Arc, context: &mut ExecutorWorkerContext, - executor: &Arc, + executor: &Arc, ) { debug_assert!(!context.has_task()); @@ -541,7 +541,7 @@ impl ScheduleQueue { pub fn schedule_async_task( proc: ProcessorWrapper, query_id: Arc, - executor: &Arc, + executor: &Arc, wakeup_worker_id: usize, workers_condvar: Arc, global_queue: Arc, @@ -553,7 +553,7 @@ impl ScheduleQueue { let tracking_payload = graph.get_node_tracking_payload(node_index); let _guard = ThreadTracker::tracking(tracking_payload.clone()); let process_future = proc.processor.async_process(); - executor.async_runtime.spawn( + executor.spawn( ProcessorAsyncTask::create( query_id, wakeup_worker_id, @@ -716,16 +716,16 @@ impl RunningGraph { /// # Safety /// /// Method is thread unsafe and require thread safe call - pub unsafe fn init_schedule_queue(self: Arc, capacity: usize) -> Result { - ExecutingGraph::init_schedule_queue(&self.0, capacity, &self) + pub unsafe fn init_schedule_queue(self: &Arc, capacity: usize) -> Result { + ExecutingGraph::init_schedule_queue(&self.0, capacity, self) } /// # Safety /// /// Method is thread unsafe and require thread safe call - pub unsafe fn schedule_queue(self: Arc, node_index: NodeIndex) -> Result { + pub unsafe fn schedule_queue(self: &Arc, node_index: NodeIndex) -> Result { let mut schedule_queue = ScheduleQueue::with_capacity(0); - ExecutingGraph::schedule_queue(&self.0, node_index, &mut schedule_queue, &self)?; + ExecutingGraph::schedule_queue(&self.0, node_index, &mut schedule_queue, self)?; Ok(schedule_queue) } diff --git a/src/query/service/src/pipelines/executor/executor_worker_context.rs b/src/query/service/src/pipelines/executor/executor_worker_context.rs index a6380b8bb85ee..eeb6495efb98f 100644 --- a/src/query/service/src/pipelines/executor/executor_worker_context.rs +++ b/src/query/service/src/pipelines/executor/executor_worker_context.rs @@ -109,6 +109,17 @@ impl ExecutorWorkerContext { std::mem::replace(&mut self.task, ExecutorTask::None) } + pub fn get_task_pid(&self) -> NodeIndex { + unsafe { + match &self.task { + ExecutorTask::None => unreachable!(), + ExecutorTask::Sync(p) => p.processor.id(), + ExecutorTask::Async(p) => p.processor.id(), + ExecutorTask::AsyncCompleted(p) => p.id, + } + } + } + pub fn get_task_info(&self) -> Option<(Arc, NodeIndex)> { unsafe { match &self.task { @@ -120,6 +131,25 @@ impl ExecutorWorkerContext { } } + /// # Safety + pub unsafe fn execute_task_new(&mut self) -> Result { + match std::mem::replace(&mut self.task, ExecutorTask::None) { + ExecutorTask::None => Err(ErrorCode::Internal("Execute none task.")), + ExecutorTask::Sync(processor) => match self.execute_sync_task(processor) { + Ok(Some((node_idx, _))) => Ok(node_idx), + Ok(None) => Err(ErrorCode::Internal("Execute none task.")), + Err(cause) => Err(cause), + }, + ExecutorTask::Async(_processor) => Err(ErrorCode::Internal( + "Async task should only be executed on queries executor", + )), + ExecutorTask::AsyncCompleted(task) => match task.res { + Ok(_) => Ok(task.id), + Err(cause) => Err(cause), + }, + } + } + /// # Safety pub unsafe fn execute_task( &mut self, diff --git a/src/query/service/src/pipelines/executor/mod.rs b/src/query/service/src/pipelines/executor/mod.rs index f5b9f1520586c..d4d758af4934c 100644 --- a/src/query/service/src/pipelines/executor/mod.rs +++ b/src/query/service/src/pipelines/executor/mod.rs @@ -19,6 +19,7 @@ mod executor_graph; mod executor_settings; mod executor_worker_context; mod global_queries_executor; +mod new_query_pipeline_executor; mod pipeline_complete_executor; mod pipeline_executor; mod pipeline_pulling_executor; @@ -37,8 +38,10 @@ pub use executor_worker_context::CompletedAsyncTask; pub use executor_worker_context::ExecutorTask; pub use executor_worker_context::ExecutorWorkerContext; pub use global_queries_executor::GlobalQueriesExecutor; +pub use new_query_pipeline_executor::NewQueryPipelineExecutor; pub use pipeline_complete_executor::PipelineCompleteExecutor; pub use pipeline_executor::PipelineExecutor; +pub use pipeline_executor::QueryHandle; pub use pipeline_pulling_executor::PipelinePullingExecutor; pub use pipeline_pushing_executor::PipelinePushingExecutor; pub use processor_async_task::ProcessorAsyncTask; diff --git a/src/query/service/src/pipelines/executor/new_query_pipeline_executor.rs b/src/query/service/src/pipelines/executor/new_query_pipeline_executor.rs new file mode 100644 index 0000000000000..a1e6b3ad89686 --- /dev/null +++ b/src/query/service/src/pipelines/executor/new_query_pipeline_executor.rs @@ -0,0 +1,435 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; +use std::sync::Arc; +use std::sync::Barrier; +use std::time::Instant; + +use databend_common_base::base::GlobalInstance; +use databend_common_base::runtime::catch_unwind; +use databend_common_base::runtime::defer; +use databend_common_base::runtime::error_info::NodeErrorType; +use databend_common_base::runtime::GlobalIORuntime; +use databend_common_base::runtime::MemStat; +use databend_common_base::runtime::Runtime; +use databend_common_base::runtime::Thread; +use databend_common_base::runtime::ThreadTracker; +use databend_common_base::runtime::TrySpawn; +use databend_common_base::JoinHandle; +use databend_common_config::InnerConfig; +use databend_common_exception::ErrorCode; +use databend_common_exception::Result; +use databend_common_pipeline_core::ExecutionInfo; +use databend_common_pipeline_core::FinishedCallbackChain; +use databend_common_pipeline_core::LockGuard; +use databend_common_pipeline_core::Pipeline; +use databend_common_pipeline_core::PlanProfile; +use parking_lot::Mutex; +use petgraph::prelude::NodeIndex; +use tokio::sync::watch::Receiver; + +use crate::pipelines::executor::executor_graph::ScheduleQueue; +use crate::pipelines::executor::pipeline_executor::InitCallback; +use crate::pipelines::executor::pipeline_executor::NewPipelineExecutor; +use crate::pipelines::executor::pipeline_executor::QueryHandle; +use crate::pipelines::executor::pipeline_executor::QueryTask; +use crate::pipelines::executor::ExecutorSettings; +use crate::pipelines::executor::ExecutorWorkerContext; +use crate::pipelines::executor::QueryExecutorTasksQueue; +use crate::pipelines::executor::RunningGraph; +use crate::pipelines::executor::WorkersCondvar; + +pub struct NewQueryPipelineExecutor { + tx: async_channel::Sender, +} + +impl NewQueryPipelineExecutor { + pub fn create() -> Arc { + let (tx, rx) = async_channel::bounded(4); + + GlobalIORuntime::instance().spawn(async move { + let background = QueryPipelineExecutorBackground::create(rx); + background.work_loop().await + }); + + Arc::new(NewQueryPipelineExecutor { tx }) + } + + pub fn init(_config: &InnerConfig) -> Result<()> { + let new_query_pipeline_executor = NewQueryPipelineExecutor::create(); + GlobalInstance::set(new_query_pipeline_executor); + Ok(()) + } +} + +#[async_trait::async_trait] +impl NewPipelineExecutor for NewQueryPipelineExecutor { + async fn submit( + &self, + pipelines: Vec, + settings: ExecutorSettings, + ) -> Result> { + let (tx, rx) = tokio::sync::oneshot::channel(); + + let tracking_payload = ThreadTracker::new_tracking_payload(); + let query_task = QueryTask::try_create(pipelines, tx, settings, tracking_payload)?; + + if let Err(_cause) = self.tx.send(query_task).await { + return Err(ErrorCode::Internal("")); + } + + match rx.await { + Ok(submit_res) => submit_res, + Err(_cause) => Err(ErrorCode::Internal("Broken query task")), + } + } +} + +pub struct QueryPipelineExecutorBackground { + rx: async_channel::Receiver, +} + +impl QueryPipelineExecutorBackground { + pub fn create(rx: async_channel::Receiver) -> Self { + QueryPipelineExecutorBackground { rx } + } + + pub async fn work_loop(&self) { + while let Ok(msg) = self.rx.recv().await { + let tracking_payload = msg.tracking_payload.clone(); + let _tracking_payload_guard = ThreadTracker::tracking(tracking_payload); + + Self::recv_query_task(msg); + } + + log::info!("QueryPipelineExecutor background shutdown."); + } + + fn recv_query_task(mut msg: QueryTask) { + let instant = Instant::now(); + let query_id = msg.settings.query_id.clone(); + + let _timer_guard = defer(move || { + log::info!( + "schedule query({:?}) successfully. elapsed {:?}", + query_id, + instant.elapsed() + ); + }); + + let thread_num = msg.max_threads_num; + let tx = msg.tx.take().unwrap(); + + let (finish_tx, finish_rx) = tokio::sync::watch::channel(None); + let query_handle = QueryPipelineHandle::create(msg, finish_rx); + + if let Err(_send_error) = tx.send(Ok(query_handle.clone())) { + log::warn!( + "Ignore query {:?} in executor, because query may killed.", + &query_handle.settings.query_id + ); + return; + } + + let finish_tx = finish_tx.clone(); + let threads_barrier = Arc::new(Barrier::new(thread_num)); + for idx in 0..thread_num { + #[allow(unused_mut)] + let mut name = format!("PipelineExecutor-{}", idx); + + #[cfg(debug_assertions)] + { + // We need to pass the thread name in the unit test, because the thread name is the test name + if matches!(std::env::var("UNIT_TEST"), Ok(var_value) if var_value == "TRUE") { + if let Some(cur_thread_name) = std::thread::current().name() { + name = cur_thread_name.to_string(); + } + } + } + + let finish_tx = finish_tx.clone(); + let threads_barrier = threads_barrier.clone(); + let query_handle = query_handle.clone(); + Thread::named_spawn(Some(name), move || unsafe { + let _shutdown_guard = defer({ + let query_handle = query_handle.clone(); + + move || { + if !threads_barrier.wait().is_leader() { + return; + } + + if let Err(_cause) = finish_tx.send(Some(query_handle.on_finish())) { + log::warn!(""); + } + } + }); + + if let Err(cause) = query_handle.run_query_worker(idx, thread_num) { + // We will ignore the abort query error, because returned by finished_error if abort query. + if cause.code() == ErrorCode::ABORTED_QUERY { + return; + } + + query_handle.finish(Some(cause.clone())); + } + }); + } + } +} + +pub struct QueryPipelineHandle { + graph: Arc, + #[allow(dead_code)] + query_holds: Vec>, + workers_condvar: Arc, + global_tasks_queue: Arc, + async_runtime: Arc, + settings: ExecutorSettings, + + finished_error: Mutex>, + daemon_handle: Mutex>>, + on_init_callback: Mutex>, + on_finished_callback: Mutex, + finished_notify: Receiver>>, +} + +impl QueryPipelineHandle { + pub fn create(task: QueryTask, finished_notify: Receiver>>) -> Arc { + Arc::new(QueryPipelineHandle { + finished_notify, + graph: task.graph, + query_holds: task.holds, + workers_condvar: WorkersCondvar::create(task.max_threads_num), + + global_tasks_queue: QueryExecutorTasksQueue::create(task.max_threads_num), + async_runtime: GlobalIORuntime::instance(), + settings: task.settings, + daemon_handle: Mutex::new(None), + finished_error: Mutex::new(None), + on_init_callback: Mutex::new(Some(task.on_init_callback)), + on_finished_callback: Mutex::new(task.on_finished_callback), + }) + } + + unsafe fn init_schedule(self: &Arc, threads: usize) -> Result<()> { + let mut on_init_callback = self.on_init_callback.lock(); + + if let Some(on_init_callback) = on_init_callback.take() { + let instant = Instant::now(); + let query_id = self.settings.query_id.clone(); + let _timer_guard = defer(move || { + log::info!( + "Init pipeline successfully, query_id: {:?}, elapsed: {:?}", + query_id, + instant.elapsed() + ); + }); + + // untracking for on finished + let mut tracking_payload = ThreadTracker::new_tracking_payload(); + if let Some(mem_stat) = &tracking_payload.mem_stat { + tracking_payload.mem_stat = Some(MemStat::create_child( + String::from("Pipeline-on-finished"), + mem_stat.get_parent_memory_stat(), + )); + } + + if let Err(cause) = Result::flatten(catch_unwind(move || { + let _guard = ThreadTracker::tracking(tracking_payload); + + on_init_callback() + })) { + return Err(cause.add_message_back("(while in query pipeline init)")); + } + + let mut init_schedule_queue = self.graph.init_schedule_queue(threads)?; + + let mut wakeup_worker_id = 0; + while let Some(proc) = init_schedule_queue.async_queue.pop_front() { + ScheduleQueue::schedule_async_task( + proc.clone(), + self.settings.query_id.clone(), + &self.async_runtime, + wakeup_worker_id, + self.workers_condvar.clone(), + self.global_tasks_queue.clone(), + ); + wakeup_worker_id += 1; + + if wakeup_worker_id == threads { + wakeup_worker_id = 0; + } + } + + let sync_queue = std::mem::take(&mut init_schedule_queue.sync_queue); + self.global_tasks_queue.init_sync_tasks(sync_queue); + self.start_executor_daemon()?; + } + + Ok(()) + } + + fn start_executor_daemon(self: &Arc) -> Result<()> { + if !self.settings.max_execute_time_in_seconds.is_zero() { + // NOTE(wake ref): When runtime scheduling is blocked, holding executor strong ref may cause the executor can not stop. + let this = Arc::downgrade(self); + let max_execute_time_in_seconds = self.settings.max_execute_time_in_seconds; + + self.async_runtime.spawn(async move { + let _ = tokio::time::sleep(max_execute_time_in_seconds).await; + if let Some(executor) = this.upgrade() { + executor.finish(Some(ErrorCode::AbortedQuery( + "Aborted query, because the execution time exceeds the maximum execution time limit", + ))); + } + }); + } + + Ok(()) + } + + pub unsafe fn run_query_worker(self: &Arc, worker: usize, threads: usize) -> Result<()> { + self.init_schedule(threads)?; + + let mut node_index = NodeIndex::new(0); + let workers_condvar = self.workers_condvar.clone(); + let mut context = ExecutorWorkerContext::create(worker, workers_condvar); + + let execute_result = catch_unwind({ + let node_index = &mut node_index; + move || { + while !self.global_tasks_queue.is_finished() { + // When there are not enough tasks, the thread will be blocked, so we need loop check. + while !self.global_tasks_queue.is_finished() && !context.has_task() { + self.global_tasks_queue.steal_task_to_context(&mut context); + } + + while !self.global_tasks_queue.is_finished() && context.has_task() { + *node_index = context.get_task_pid(); + let executed_pid = context.execute_task_new()?; + + if self.global_tasks_queue.is_finished() { + break; + } + + *node_index = executed_pid; + let runtime = &self.async_runtime; + let schedule_queue = self.graph.schedule_queue(executed_pid)?; + schedule_queue.schedule(&self.global_tasks_queue, &mut context, runtime); + } + } + + Ok(()) + } + }); + + if let Err(cause) = execute_result.flatten() { + let record_error = NodeErrorType::LocalError(cause.clone()); + self.graph.record_node_error(node_index, record_error); + self.graph.should_finish(Err(cause.clone()))?; + return Err(cause); + } + + Ok(()) + } + + fn apply_finished_chain(&self, info: ExecutionInfo) -> Result<()> { + let mut on_finished_chain = self.on_finished_callback.lock(); + + // untracking for on finished + let mut tracking_payload = ThreadTracker::new_tracking_payload(); + if let Some(mem_stat) = &tracking_payload.mem_stat { + tracking_payload.mem_stat = Some(MemStat::create_child( + String::from("Pipeline-on-finished"), + mem_stat.get_parent_memory_stat(), + )); + } + + let _guard = ThreadTracker::tracking(tracking_payload); + on_finished_chain.apply(info) + } + + fn on_finish(&self) -> Result<()> { + { + let finished_error_guard = self.finished_error.lock(); + if let Some(error) = finished_error_guard.as_ref() { + let may_error = error.clone(); + drop(finished_error_guard); + + let profiling = self.fetch_profiling(true); + self.apply_finished_chain(ExecutionInfo::create( + Err(may_error.clone()), + profiling, + ))?; + return Err(may_error); + } + } + + if let Err(error) = self.graph.assert_finished_graph() { + let profiling = self.fetch_profiling(true); + + self.apply_finished_chain(ExecutionInfo::create(Err(error.clone()), profiling))?; + return Err(error); + } + + let profiling = self.fetch_profiling(true); + self.apply_finished_chain(ExecutionInfo::create(Ok(()), profiling))?; + Ok(()) + } +} + +#[async_trait::async_trait] +impl QueryHandle for QueryPipelineHandle { + async fn wait(&self) -> Result<()> { + let mut finished_notify = self.finished_notify.clone(); + + let x = match finished_notify.wait_for(Option::is_some).await { + Err(_cause) => Err(ErrorCode::Internal("")), + Ok(res) => res.as_ref().unwrap().clone(), + }; + x + } + + fn is_finished(&self) -> bool { + self.global_tasks_queue.is_finished() + } + + fn finish(&self, cause: Option>) { + if let Some(cause) = cause { + let mut finished_error = self.finished_error.lock(); + + if finished_error.is_none() { + *finished_error = Some(cause); + } + } + + self.global_tasks_queue.finish(self.workers_condvar.clone()); + self.graph.interrupt_running_nodes(); + + if let Some(daemon_handle) = { self.daemon_handle.lock().take() } { + daemon_handle.abort(); + } + } + + fn fetch_profiling(&self, collect_metrics: bool) -> HashMap { + match collect_metrics { + true => self + .graph + .fetch_profiling(Some(self.settings.executor_node_id.clone())), + false => self.graph.fetch_profiling(None), + } + } +} diff --git a/src/query/service/src/pipelines/executor/pipeline_complete_executor.rs b/src/query/service/src/pipelines/executor/pipeline_complete_executor.rs index ed689fc856589..4ee3e5bc9c18c 100644 --- a/src/query/service/src/pipelines/executor/pipeline_complete_executor.rs +++ b/src/query/service/src/pipelines/executor/pipeline_complete_executor.rs @@ -14,23 +14,26 @@ use std::sync::Arc; +use databend_common_base::base::GlobalInstance; use databend_common_base::runtime::drop_guard; use databend_common_base::runtime::MemStat; -use databend_common_base::runtime::Thread; use databend_common_base::runtime::ThreadTracker; use databend_common_base::runtime::TrackingPayload; use databend_common_exception::ErrorCode; use databend_common_exception::Result; use databend_common_pipeline_core::Pipeline; -use fastrace::func_path; -use fastrace::prelude::*; +use parking_lot::Mutex; +use crate::pipelines::executor::pipeline_executor::NewPipelineExecutor; +use crate::pipelines::executor::pipeline_executor::QueryHandle; use crate::pipelines::executor::ExecutorSettings; -use crate::pipelines::executor::PipelineExecutor; pub struct PipelineCompleteExecutor { - executor: Arc, + pipelines: Mutex>, + settings: Mutex>, + tracking_payload: TrackingPayload, + query_handle: Mutex>>, } // Use this executor when the pipeline is complete pipeline (has source and sink) @@ -56,11 +59,12 @@ impl PipelineCompleteExecutor { "Logical error, PipelineCompleteExecutor can only work on complete pipeline.", )); } - let executor = PipelineExecutor::create(pipeline, settings)?; Ok(PipelineCompleteExecutor { - executor: Arc::new(executor), + pipelines: Mutex::new(vec![pipeline]), + settings: Mutex::new(Some(settings)), tracking_payload, + query_handle: Mutex::new(None), }) } @@ -79,48 +83,47 @@ impl PipelineCompleteExecutor { } } - let executor = PipelineExecutor::from_pipelines(pipelines, settings)?; Ok(Arc::new(PipelineCompleteExecutor { - executor: Arc::new(executor), tracking_payload, + settings: Mutex::new(Some(settings)), + pipelines: Mutex::new(pipelines), + query_handle: Mutex::new(None), })) } - pub fn get_inner(&self) -> Arc { - self.executor.clone() + pub fn get_handle(&self) -> Arc { + self.query_handle.lock().clone().unwrap() } pub fn finish(&self, cause: Option) { - let _guard = ThreadTracker::tracking(self.tracking_payload.clone()); - self.executor.finish(cause); - } - - #[fastrace::trace] - pub fn execute(&self) -> Result<()> { - let _guard = ThreadTracker::tracking(self.tracking_payload.clone()); - - Thread::named_spawn( - Some(String::from("CompleteExecutor")), - self.thread_function(), - ) - .join() - .flatten() + if let Some(handle) = self.query_handle.lock().as_ref() { + handle.finish(cause); + } } - fn thread_function(&self) -> impl Fn() -> Result<()> { - let span = Span::enter_with_local_parent(func_path!()); - let executor = self.executor.clone(); + pub async fn execute(&self) -> Result> { + let (settings, pipelines) = { + if let Some(settings) = self.settings.lock().take() { + let mut pipelines = vec![]; + std::mem::swap(&mut pipelines, self.pipelines.lock().as_mut()); + (settings, pipelines) + } else { + return Ok(self.query_handle.lock().clone().unwrap()); + } + }; - move || { - let _g = span.set_local_parent(); - executor.execute() - } + let executor = GlobalInstance::get::>(); + let query_handle = executor.submit(pipelines, settings).await?; + *self.query_handle.lock() = Some(query_handle.clone()); + Ok(query_handle) } } impl Drop for PipelineCompleteExecutor { fn drop(&mut self) { drop_guard(move || { + let _guard = ThreadTracker::tracking(self.tracking_payload.clone()); + self.finish(None); }) } diff --git a/src/query/service/src/pipelines/executor/pipeline_executor.rs b/src/query/service/src/pipelines/executor/pipeline_executor.rs index 7fef8013bbb51..6c80572a555c8 100644 --- a/src/query/service/src/pipelines/executor/pipeline_executor.rs +++ b/src/query/service/src/pipelines/executor/pipeline_executor.rs @@ -21,6 +21,7 @@ use databend_common_base::base::WatchNotify; use databend_common_base::runtime::catch_unwind; use databend_common_base::runtime::defer; use databend_common_base::runtime::GlobalIORuntime; +use databend_common_base::runtime::TrackingPayload; use databend_common_base::runtime::TrySpawn; use databend_common_exception::ErrorCode; use databend_common_exception::Result; @@ -297,3 +298,127 @@ impl PipelineExecutor { } } } + +#[async_trait::async_trait] +pub trait QueryHandle: Send + Sync + 'static { + async fn wait(&self) -> Result<()>; + + fn is_finished(&self) -> bool; + + fn finish(&self, cause: Option>); + + fn fetch_profiling(&self, collect_metrics: bool) -> HashMap; +} + +#[async_trait::async_trait] +pub trait NewPipelineExecutor: Send + Sync + 'static { + async fn submit( + &self, + pipelines: Vec, + settings: ExecutorSettings, + ) -> Result>; +} + +pub struct QueryTask { + pub graph: Arc, + pub settings: ExecutorSettings, + pub on_init_callback: InitCallback, + pub on_finished_callback: FinishedCallbackChain, + pub holds: Vec>, + pub max_threads_num: usize, + pub tracking_payload: TrackingPayload, + pub tx: Option>>>, +} + +impl QueryTask { + fn pipelines_max_threads(pipelines: &[Pipeline]) -> usize { + pipelines + .iter() + .map(|x| x.get_max_threads()) + .max() + .unwrap_or(0) + } + + fn pipelines_on_init(pipelines: &mut [Pipeline]) -> InitCallback { + let mut on_init_functions = Vec::with_capacity(pipelines.len()); + + for pipeline in pipelines { + on_init_functions.push(pipeline.take_on_init()); + } + + Box::new(move || { + for on_init_function in on_init_functions { + on_init_function()?; + } + + Ok(()) + }) + } + + fn pipelines_on_finish(pipelines: &mut [Pipeline]) -> FinishedCallbackChain { + let mut finished_chain = FinishedCallbackChain::create(); + for pipeline in pipelines { + finished_chain.extend(pipeline.take_on_finished()); + } + + finished_chain + } + + fn pipelines_hold(pipelines: &mut [Pipeline]) -> Vec> { + let mut holds = Vec::with_capacity(pipelines.len()); + for pipeline in pipelines { + holds.extend(pipeline.take_lock_guards()); + } + + holds + } + + pub fn try_create( + mut pipelines: Vec, + tx: tokio::sync::oneshot::Sender>>, + settings: ExecutorSettings, + tracking_payload: TrackingPayload, + ) -> Result { + let on_init_callback = Self::pipelines_on_init(&mut pipelines); + let mut on_finished_callback = Self::pipelines_on_finish(&mut pipelines); + let holds = Self::pipelines_hold(&mut pipelines); + + if pipelines.is_empty() { + let cause = ErrorCode::Internal("Executor Pipelines is empty."); + let info = ExecutionInfo::create(Err(cause.clone()), HashMap::new()); + let _ignore = on_finished_callback.apply(info); + return Err(cause); + } + + let threads_num = Self::pipelines_max_threads(&pipelines); + + if threads_num == 0 { + let cause = ErrorCode::Internal("Pipeline max threads cannot equals zero."); + let info = ExecutionInfo::create(Err(cause.clone()), HashMap::new()); + let _ignore = on_finished_callback.apply(info); + return Err(cause); + } + + let query_id = settings.query_id.clone(); + + let running_graph = match RunningGraph::from_pipelines(pipelines, 1, query_id, None) { + Ok(running_graph) => running_graph, + Err(cause) => { + let info = ExecutionInfo::create(Err(cause.clone()), HashMap::new()); + let _ignore_res = on_finished_callback.apply(info); + return Err(cause); + } + }; + + Ok(QueryTask { + settings, + tx: Some(tx), + on_init_callback, + on_finished_callback, + holds, + graph: running_graph, + max_threads_num: threads_num, + tracking_payload, + }) + } +} diff --git a/src/query/service/src/pipelines/executor/pipeline_pulling_executor.rs b/src/query/service/src/pipelines/executor/pipeline_pulling_executor.rs index a0d74844e975d..da15b27d447c2 100644 --- a/src/query/service/src/pipelines/executor/pipeline_pulling_executor.rs +++ b/src/query/service/src/pipelines/executor/pipeline_pulling_executor.rs @@ -12,17 +12,16 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::sync::atomic::AtomicBool; -use std::sync::atomic::Ordering; use std::sync::mpsc::Receiver; use std::sync::mpsc::RecvTimeoutError; use std::sync::mpsc::SyncSender; use std::sync::Arc; use std::time::Duration; +use databend_common_base::base::GlobalInstance; use databend_common_base::runtime::drop_guard; +use databend_common_base::runtime::GlobalIORuntime; use databend_common_base::runtime::MemStat; -use databend_common_base::runtime::Thread; use databend_common_base::runtime::ThreadTracker; use databend_common_base::runtime::TrackingPayload; use databend_common_exception::ErrorCode; @@ -33,72 +32,24 @@ use databend_common_pipeline_core::ExecutionInfo; use databend_common_pipeline_core::Pipeline; use databend_common_pipeline_sinks::Sink; use databend_common_pipeline_sinks::Sinker; -use fastrace::func_path; -use fastrace::prelude::*; -use parking_lot::Condvar; use parking_lot::Mutex; +use crate::pipelines::executor::pipeline_executor::NewPipelineExecutor; use crate::pipelines::executor::ExecutorSettings; -use crate::pipelines::executor::PipelineExecutor; +use crate::pipelines::executor::QueryHandle; use crate::pipelines::processors::InputPort; use crate::pipelines::processors::ProcessorPtr; use crate::pipelines::PipelineBuildResult; -struct State { - is_finished: AtomicBool, - finish_mutex: Mutex, - finish_condvar: Condvar, - - catch_error: Mutex>, -} - -impl State { - pub fn create() -> Arc { - Arc::new(State { - catch_error: Mutex::new(None), - is_finished: AtomicBool::new(false), - finish_mutex: Mutex::new(false), - finish_condvar: Condvar::new(), - }) - } - - pub fn finished(&self, message: Result<()>) { - self.is_finished.store(true, Ordering::Release); - - if let Err(error) = message { - *self.catch_error.lock() = Some(error); - } - - { - let mut mutex = self.finish_mutex.lock(); - *mutex = true; - self.finish_condvar.notify_one(); - } - } - - pub fn wait_finish(&self) { - let mut mutex = self.finish_mutex.lock(); - - while !*mutex { - self.finish_condvar.wait(&mut mutex); - } - } - - pub fn is_finished(&self) -> bool { - self.is_finished.load(Ordering::Relaxed) - } - - pub fn try_get_catch_error(&self) -> Option { - self.catch_error.lock().as_ref().cloned() - } -} - // Use this executor when the pipeline is pulling pipeline (exists source but not exists sink) pub struct PipelinePullingExecutor { - state: Arc, - executor: Arc, receiver: Receiver, tracking_payload: TrackingPayload, + + pipelines: Mutex>, + settings: Mutex>, + + query_handle: Mutex>>, } impl PipelinePullingExecutor { @@ -111,24 +62,15 @@ impl PipelinePullingExecutor { tracking_payload } - fn wrap_pipeline( - pipeline: &mut Pipeline, - tx: SyncSender, - mem_stat: Arc, - ) -> Result<()> { + fn wrap_pipeline(pipeline: &mut Pipeline, tx: SyncSender) -> Result<()> { if pipeline.is_pushing_pipeline()? || !pipeline.is_pulling_pipeline()? { return Err(ErrorCode::Internal( "Logical error, PipelinePullingExecutor can only work on pulling pipeline.", )); } - pipeline.add_sink(|input| { - Ok(ProcessorPtr::create(PullingSink::create( - tx.clone(), - mem_stat.clone(), - input, - ))) - })?; + pipeline + .add_sink(|input| Ok(ProcessorPtr::create(PullingSink::create(tx.clone(), input))))?; pipeline.set_on_finished(move |_info: &ExecutionInfo| { drop(tx); @@ -147,18 +89,14 @@ impl PipelinePullingExecutor { let (sender, receiver) = std::sync::mpsc::sync_channel(pipeline.output_len()); - Self::wrap_pipeline( - &mut pipeline, - sender, - tracking_payload.mem_stat.clone().unwrap(), - )?; - let executor = PipelineExecutor::create(pipeline, settings)?; + Self::wrap_pipeline(&mut pipeline, sender)?; Ok(PipelinePullingExecutor { receiver, - executor: Arc::new(executor), - state: State::create(), tracking_payload, + pipelines: Mutex::new(vec![pipeline]), + settings: Mutex::new(Some(settings)), + query_handle: Mutex::new(None), }) } @@ -172,102 +110,88 @@ impl PipelinePullingExecutor { let mut main_pipeline = build_res.main_pipeline; let (sender, receiver) = std::sync::mpsc::sync_channel(main_pipeline.output_len()); - Self::wrap_pipeline( - &mut main_pipeline, - sender, - tracking_payload.mem_stat.clone().unwrap(), - )?; + Self::wrap_pipeline(&mut main_pipeline, sender)?; let mut pipelines = build_res.sources_pipelines; pipelines.push(main_pipeline); - let executor = PipelineExecutor::from_pipelines(pipelines, settings)?; + Ok(PipelinePullingExecutor { receiver, - state: State::create(), tracking_payload, - executor: Arc::new(executor), + pipelines: Mutex::new(pipelines), + settings: Mutex::new(Some(settings)), + query_handle: Mutex::new(None), }) } - #[fastrace::trace] - pub fn start(&mut self) { - let _guard = ThreadTracker::tracking(self.tracking_payload.clone()); - - let state = self.state.clone(); - let threads_executor = self.executor.clone(); - let thread_function = Self::thread_function(state, threads_executor); - #[allow(unused_mut)] - let mut thread_name = Some(String::from("PullingExecutor")); - - #[cfg(debug_assertions)] - { - // We need to pass the thread name in the unit test, because the thread name is the test name - if matches!(std::env::var("UNIT_TEST"), Ok(var_value) if var_value == "TRUE") { - if let Some(cur_thread_name) = std::thread::current().name() { - thread_name = Some(cur_thread_name.to_string()); - } + pub async fn start(&mut self) -> Result> { + let (settings, pipelines) = { + if let Some(settings) = self.settings.lock().take() { + let mut pipelines = vec![]; + std::mem::swap(&mut pipelines, self.pipelines.lock().as_mut()); + (settings, pipelines) + } else { + return Ok(self.query_handle.lock().clone().unwrap()); } - } - - Thread::named_spawn(thread_name, thread_function); - } - - pub fn get_inner(&self) -> Arc { - self.executor.clone() - } - - fn thread_function(state: Arc, executor: Arc) -> impl Fn() { - let span = Span::enter_with_local_parent(func_path!()); - move || { - let _g = span.set_local_parent(); - state.finished(executor.execute()); - } + }; + + let executor = GlobalInstance::get::>(); + let query_handle = executor.submit(pipelines, settings).await?; + *self.query_handle.lock() = Some(query_handle.clone()); + Ok(query_handle) + + // let _guard = ThreadTracker::tracking(self.tracking_payload.clone()); + // + // let state = self.state.clone(); + // let threads_executor = self.executor.clone(); + // let thread_function = Self::thread_function(state, threads_executor); + // #[allow(unused_mut)] + // let mut thread_name = Some(String::from("PullingExecutor")); + // + // #[cfg(debug_assertions)] + // { + // // We need to pass the thread name in the unit test, because the thread name is the test name + // if matches!(std::env::var("UNIT_TEST"), Ok(var_value) if var_value == "TRUE") { + // if let Some(cur_thread_name) = std::thread::current().name() { + // thread_name = Some(cur_thread_name.to_string()); + // } + // } + // } + // + // Thread::named_spawn(thread_name, thread_function); + } + + pub fn get_handle(&self) -> Arc { + self.query_handle.lock().clone().unwrap() } pub fn finish(&self, cause: Option) { - let _guard = ThreadTracker::tracking(self.tracking_payload.clone()); - - self.executor.finish(cause); + if let Some(handle) = self.query_handle.lock().as_ref() { + handle.finish(cause); + } } pub fn pull_data(&mut self) -> Result> { - let mut need_check_graph_status = false; + let query_handle = self.query_handle.lock().clone().unwrap(); loop { return match self.receiver.recv_timeout(Duration::from_millis(100)) { Ok(data_block) => Ok(Some(data_block)), Err(RecvTimeoutError::Timeout) => { - if self.state.is_finished() { - if let Some(error) = self.state.try_get_catch_error() { - return Err(error); - } - - // It may be parallel. Let's check again. - if !need_check_graph_status { - need_check_graph_status = true; - self.state.wait_finish(); - continue; - } - - return Err(ErrorCode::Internal(format!( - "Processor graph not completed. graph nodes state: {}", - self.executor.format_graph_nodes() - ))); + if !query_handle.is_finished() { + continue; } - continue; + GlobalIORuntime::instance().block_on(query_handle.wait())?; + Ok(None) } Err(RecvTimeoutError::Disconnected) => { - if !self.executor.is_finished() { - self.executor.finish::<()>(None); + if !query_handle.is_finished() { + query_handle.finish(None); } - self.state.wait_finish(); - - return match self.state.try_get_catch_error() { - None => Ok(None), - Some(error) => Err(error), - }; + GlobalIORuntime::instance().block_on(query_handle.wait())?; + Ok(None) } }; } @@ -286,19 +210,11 @@ impl Drop for PipelinePullingExecutor { struct PullingSink { sender: Option>, - query_execution_mem_stat: Arc, } impl PullingSink { - pub fn create( - tx: SyncSender, - mem_stat: Arc, - input: Arc, - ) -> Box { - Sinker::create(input, PullingSink { - sender: Some(tx), - query_execution_mem_stat: mem_stat, - }) + pub fn create(tx: SyncSender, input: Arc) -> Box { + Sinker::create(input, PullingSink { sender: Some(tx) }) } } @@ -315,8 +231,6 @@ impl Sink for PullingSink { // TODO: need moveout memory for plan tracker ThreadTracker::moveout_memory(memory_size); - self.query_execution_mem_stat.moveout_memory(memory_size); - if let Some(sender) = &self.sender { if let Err(cause) = sender.send(data_block) { return Err(ErrorCode::Internal(format!( diff --git a/src/query/service/src/pipelines/executor/query_pipeline_executor.rs b/src/query/service/src/pipelines/executor/query_pipeline_executor.rs index 939dabfceda92..802caf8f4bcb7 100644 --- a/src/query/service/src/pipelines/executor/query_pipeline_executor.rs +++ b/src/query/service/src/pipelines/executor/query_pipeline_executor.rs @@ -66,6 +66,8 @@ pub struct QueryPipelineExecutor { finished_error: Mutex>, #[allow(unused)] lock_guards: Vec>, + + initialized: Mutex, } impl QueryPipelineExecutor { @@ -188,6 +190,7 @@ impl QueryPipelineExecutor { finished_error: Mutex::new(None), finished_notify: Arc::new(WatchNotify::new()), lock_guards, + initialized: Mutex::new(false), })) } @@ -230,10 +233,6 @@ impl QueryPipelineExecutor { #[fastrace::trace] pub fn execute(self: &Arc) -> Result<()> { - self.init(self.graph.clone())?; - - self.start_executor_daemon()?; - let mut thread_join_handles = self.execute_threads(self.threads_num); while let Some(join_handle) = thread_join_handles.pop() { @@ -271,63 +270,70 @@ impl QueryPipelineExecutor { Ok(()) } - fn init(self: &Arc, graph: Arc) -> Result<()> { - unsafe { - // TODO: the on init callback cannot be killed. - { - let instant = Instant::now(); - let mut guard = self.on_init_callback.lock(); - if let Some(callback) = guard.take() { - drop(guard); - - // untracking for on finished - let mut tracking_payload = ThreadTracker::new_tracking_payload(); - if let Some(mem_stat) = &tracking_payload.mem_stat { - tracking_payload.mem_stat = Some(MemStat::create_child( - String::from("Pipeline-on-finished"), - mem_stat.get_parent_memory_stat(), - )); - } + unsafe fn init_schedule(self: &Arc, graph: Arc) -> Result<()> { + let mut initialized_guard = self.initialized.lock(); - if let Err(cause) = Result::flatten(catch_unwind(move || { - let _guard = ThreadTracker::tracking(tracking_payload); + if *initialized_guard { + return Ok(()); + } - callback() - })) { - return Err(cause.add_message_back("(while in query pipeline init)")); - } + *initialized_guard = true; + + // TODO: the on init callback cannot be killed. + { + let instant = Instant::now(); + let mut guard = self.on_init_callback.lock(); + if let Some(callback) = guard.take() { + drop(guard); + + // untracking for on finished + let mut tracking_payload = ThreadTracker::new_tracking_payload(); + if let Some(mem_stat) = &tracking_payload.mem_stat { + tracking_payload.mem_stat = Some(MemStat::create_child( + String::from("Pipeline-on-finished"), + mem_stat.get_parent_memory_stat(), + )); } - info!( - "Init pipeline successfully, query_id: {:?}, elapsed: {:?}", - self.settings.query_id, - instant.elapsed() - ); - } + if let Err(cause) = Result::flatten(catch_unwind(move || { + let _guard = ThreadTracker::tracking(tracking_payload); - let mut init_schedule_queue = graph.init_schedule_queue(self.threads_num)?; - - let mut wakeup_worker_id = 0; - while let Some(proc) = init_schedule_queue.async_queue.pop_front() { - ScheduleQueue::schedule_async_task( - proc.clone(), - self.settings.query_id.clone(), - self, - wakeup_worker_id, - self.workers_condvar.clone(), - self.global_tasks_queue.clone(), - ); - wakeup_worker_id += 1; - - if wakeup_worker_id == self.threads_num { - wakeup_worker_id = 0; + callback() + })) { + return Err(cause.add_message_back("(while in query pipeline init)")); } } - let sync_queue = std::mem::take(&mut init_schedule_queue.sync_queue); - self.global_tasks_queue.init_sync_tasks(sync_queue); - Ok(()) + info!( + "Init pipeline successfully, query_id: {:?}, elapsed: {:?}", + self.settings.query_id, + instant.elapsed() + ); + } + + let mut init_schedule_queue = unsafe { graph.init_schedule_queue(self.threads_num)? }; + + let mut wakeup_worker_id = 0; + while let Some(proc) = init_schedule_queue.async_queue.pop_front() { + ScheduleQueue::schedule_async_task( + proc.clone(), + self.settings.query_id.clone(), + &self.async_runtime, + wakeup_worker_id, + self.workers_condvar.clone(), + self.global_tasks_queue.clone(), + ); + wakeup_worker_id += 1; + + if wakeup_worker_id == self.threads_num { + wakeup_worker_id = 0; + } } + + let sync_queue = std::mem::take(&mut init_schedule_queue.sync_queue); + self.global_tasks_queue.init_sync_tasks(sync_queue); + + self.start_executor_daemon() } fn start_executor_daemon(self: &Arc) -> Result<()> { @@ -392,6 +398,8 @@ impl QueryPipelineExecutor { /// /// Method is thread unsafe and require thread safe call pub unsafe fn execute_single_thread(self: &Arc, thread_num: usize) -> Result<()> { + self.init_schedule(self.graph.clone())?; + let workers_condvar = self.workers_condvar.clone(); let mut context = ExecutorWorkerContext::create(thread_num, workers_condvar); @@ -415,7 +423,7 @@ impl QueryPipelineExecutor { schedule_queue.schedule( &self.global_tasks_queue, &mut context, - self, + &self.async_runtime, ); } Err(cause) => { diff --git a/src/query/service/src/pipelines/processors/transforms/transform_recursive_cte_source.rs b/src/query/service/src/pipelines/processors/transforms/transform_recursive_cte_source.rs index b8285d3061c24..78fe636f1476e 100644 --- a/src/query/service/src/pipelines/processors/transforms/transform_recursive_cte_source.rs +++ b/src/query/service/src/pipelines/processors/transforms/transform_recursive_cte_source.rs @@ -129,8 +129,9 @@ impl TransformRecursiveCteSource { ctx.clear_runtime_filter(); let build_res = build_query_pipeline_without_render_result_set(&ctx, &plan).await?; let settings = ExecutorSettings::try_create(ctx.clone())?; - let pulling_executor = PipelinePullingExecutor::from_pipelines(build_res, settings)?; - ctx.set_executor(pulling_executor.get_inner())?; + let mut pulling_executor = PipelinePullingExecutor::from_pipelines(build_res, settings)?; + ctx.set_query_handle(pulling_executor.start().await?)?; + Ok(( PullingExecutorStream::create(pulling_executor)? .try_collect::>() diff --git a/src/query/service/src/schedulers/scheduler.rs b/src/query/service/src/schedulers/scheduler.rs index 5bee2a3567800..74556c914483f 100644 --- a/src/query/service/src/schedulers/scheduler.rs +++ b/src/query/service/src/schedulers/scheduler.rs @@ -141,10 +141,10 @@ impl QueryExecutor for ServiceQueryExecutor { ) -> Result> { let build_res = build_query_pipeline_without_render_result_set(&self.ctx, plan).await?; let settings = ExecutorSettings::try_create(self.ctx.clone())?; - let pulling_executor = PipelinePullingExecutor::from_pipelines(build_res, settings)?; - self.ctx.set_executor(pulling_executor.get_inner())?; + let mut executor = PipelinePullingExecutor::from_pipelines(build_res, settings)?; + self.ctx.set_query_handle(executor.start().await?)?; - PullingExecutorStream::create(pulling_executor)? + PullingExecutorStream::create(executor)? .try_collect::>() .await } diff --git a/src/query/service/src/servers/flight/v1/exchange/exchange_manager.rs b/src/query/service/src/servers/flight/v1/exchange/exchange_manager.rs index d158ccf3c9b89..1af9334b3d628 100644 --- a/src/query/service/src/servers/flight/v1/exchange/exchange_manager.rs +++ b/src/query/service/src/servers/flight/v1/exchange/exchange_manager.rs @@ -26,7 +26,6 @@ use arrow_flight::FlightData; use async_channel::Receiver; use databend_common_base::base::GlobalInstance; use databend_common_base::runtime::GlobalIORuntime; -use databend_common_base::runtime::Thread; use databend_common_base::runtime::TrySpawn; use databend_common_base::JoinHandle; use databend_common_config::GlobalConfig; @@ -36,7 +35,6 @@ use databend_common_grpc::ConnectionFactory; use databend_common_pipeline_core::basic_callback; use databend_common_pipeline_core::ExecutionInfo; use databend_common_sql::executor::PhysicalPlan; -use fastrace::prelude::*; use log::warn; use parking_lot::Mutex; use parking_lot::ReentrantMutex; @@ -50,7 +48,7 @@ use super::exchange_params::ShuffleExchangeParams; use super::exchange_sink::ExchangeSink; use super::exchange_transform::ExchangeTransform; use super::statistics_receiver::StatisticsReceiver; -use super::statistics_sender::StatisticsSender; +use super::statistics_sender::DistributedQueryDaemon; use crate::clusters::ClusterHelper; use crate::clusters::FlightParams; use crate::pipelines::executor::ExecutorSettings; @@ -868,27 +866,26 @@ impl QueryCoordinator { let ctx = query_ctx.clone(); let (_, request_server_exchange) = request_server_exchanges.into_iter().next().unwrap(); - let mut statistics_sender = StatisticsSender::spawn( - &query_id, - ctx, - request_server_exchange, - executor.get_inner(), - ); - - let span = if let Some(parent) = SpanContext::current_local_parent() { - Span::root("Distributed-Executor", parent) - } else { - Span::noop() - }; - - Thread::named_spawn(Some(String::from("Distributed-Executor")), move || { - let _g = span.set_local_parent(); - let error = executor.execute().err(); - statistics_sender.shutdown(error.clone()); - query_ctx - .get_exchange_manager() - .on_finished_query(&query_id, error); - }); + let distributed_daemon = DistributedQueryDaemon::create(&query_id, ctx); + + distributed_daemon.run(request_server_exchange, executor); + // let mut statistics_sender = + // DistributedQueryDaemon::spawn(&query_id, ctx, request_server_exchange, executor); + + // let span = if let Some(parent) = SpanContext::current_local_parent() { + // Span::root("Distributed-Executor", parent) + // } else { + // Span::noop() + // }; + + // Thread::named_spawn(Some(String::from("Distributed-Executor")), move || { + // let _g = span.set_local_parent(); + // let error = executor.execute().err(); + // statistics_sender.shutdown(error.clone()); + // query_ctx + // .get_exchange_manager() + // .on_finished_query(&query_id, error); + // }); Ok(()) } diff --git a/src/query/service/src/servers/flight/v1/exchange/statistics_sender.rs b/src/query/service/src/servers/flight/v1/exchange/statistics_sender.rs index 33ea8c1f915b7..43d1c34612e5a 100644 --- a/src/query/service/src/servers/flight/v1/exchange/statistics_sender.rs +++ b/src/query/service/src/servers/flight/v1/exchange/statistics_sender.rs @@ -15,59 +15,88 @@ use std::sync::Arc; use std::time::Duration; -use async_channel::Sender; use databend_common_base::base::tokio::time::sleep; +use databend_common_base::runtime::defer; +use databend_common_base::runtime::GlobalIORuntime; use databend_common_base::runtime::TrySpawn; -use databend_common_base::JoinHandle; use databend_common_catalog::table_context::TableContext; -use databend_common_exception::ErrorCode; use databend_common_exception::Result; use databend_common_storage::MutationStatus; use futures_util::future::Either; use log::warn; +use parking_lot::Mutex; -use crate::pipelines::executor::PipelineExecutor; +use crate::pipelines::executor::PipelineCompleteExecutor; +use crate::pipelines::executor::QueryHandle; +use crate::servers::flight::v1::exchange::DataExchangeManager; use crate::servers::flight::v1::packets::DataPacket; use crate::servers::flight::v1::packets::ProgressInfo; use crate::servers::flight::FlightExchange; use crate::servers::flight::FlightSender; use crate::sessions::QueryContext; -pub struct StatisticsSender { - _spawner: Arc, - shutdown_flag_sender: Sender>, - join_handle: Option>, +pub struct DistributedQueryDaemon { + query_id: String, + ctx: Arc, } -impl StatisticsSender { - pub fn spawn( - query_id: &str, - ctx: Arc, - exchange: FlightExchange, - executor: Arc, - ) -> Self { - let spawner = ctx.clone(); - let tx = exchange.convert_to_sender(); - let (shutdown_flag_sender, shutdown_flag_receiver) = async_channel::bounded(1); - - let handle = spawner.spawn({ - let query_id = query_id.to_string(); - - async move { +impl DistributedQueryDaemon { + pub fn create(query_id: &str, ctx: Arc) -> Self { + DistributedQueryDaemon { + ctx, + query_id: query_id.to_string(), + } + } + + pub fn run(self, exchange: FlightExchange, executor: Arc) { + GlobalIORuntime::instance().spawn(async move { + let ctx = self.ctx.clone(); + let query_id = self.query_id.clone(); + let tx = exchange.convert_to_sender(); + + let statistics_handle = self.ctx.spawn(async move { + let shutdown_cause = Arc::new(Mutex::new(None)); + let _shutdown_guard = defer({ + let query_id = query_id.clone(); + let shutdown_cause = shutdown_cause.clone(); + + move || { + let exchange_manager = DataExchangeManager::instance(); + + let shutdown_cause = shutdown_cause.lock().take(); + exchange_manager.on_finished_query(&query_id, shutdown_cause); + } + }); + + if let Err(cause) = executor.execute().await { + *shutdown_cause.lock() = Some(cause.clone()); + + let data = DataPacket::ErrorCode(cause); + if let Err(error_code) = tx.send(data).await { + warn!( + "Cannot send data via flight exchange, cause: {:?}", + error_code + ); + } + + return; + } + + let query_handle = executor.get_handle(); + let mut cnt = 0; + let mut wait_shutdown = Box::pin(query_handle.wait()); let mut sleep_future = Box::pin(sleep(Duration::from_millis(100))); - let mut notified = Box::pin(shutdown_flag_receiver.recv()); loop { - match futures::future::select(sleep_future, notified).await { - Either::Right((Err(_), _)) => { - break; - } - Either::Right((Ok(None), _)) => { + match futures::future::select(sleep_future, wait_shutdown).await { + Either::Right((Ok(_), _)) => { + // query completed. break; } - Either::Right((Ok(Some(error_code)), _recv)) => { - let data = DataPacket::ErrorCode(error_code); + Either::Right((Err(cause), _)) => { + *shutdown_cause.lock() = Some(cause.clone()); + let data = DataPacket::ErrorCode(cause); if let Err(error_code) = tx.send(data).await { warn!( "Cannot send data via flight exchange, cause: {:?}", @@ -78,7 +107,7 @@ impl StatisticsSender { return; } Either::Left((_, right)) => { - notified = right; + wait_shutdown = right; sleep_future = Box::pin(sleep(Duration::from_millis(100))); if let Err(cause) = Self::send_progress(&ctx, &tx).await { @@ -91,7 +120,8 @@ impl StatisticsSender { if cnt % 5 == 0 { // send profiles per 500 millis - if let Err(error) = Self::send_profile(&executor, &tx, false).await + if let Err(error) = + Self::send_profile(&query_handle, &tx, false).await { warn!("Profiles send has error, cause: {:?}.", error); } @@ -100,7 +130,7 @@ impl StatisticsSender { } } - if let Err(error) = Self::send_profile(&executor, &tx, true).await { + if let Err(error) = Self::send_profile(&query_handle, &tx, true).await { warn!("Profiles send has error, cause: {:?}.", error); } @@ -115,32 +145,15 @@ impl StatisticsSender { if let Err(error) = Self::send_progress(&ctx, &tx).await { warn!("Statistics send has error, cause: {:?}.", error); } - } - }); + }); - StatisticsSender { - _spawner: spawner, - shutdown_flag_sender, - join_handle: Some(handle), - } - } - - pub fn shutdown(&mut self, error: Option) { - let shutdown_flag_sender = self.shutdown_flag_sender.clone(); - - let join_handle = self.join_handle.take(); - futures::executor::block_on(async move { - if let Err(error_code) = shutdown_flag_sender.send(error).await { + if let Err(cause) = statistics_handle.await { warn!( - "Cannot send data via flight exchange, cause: {:?}", - error_code + "Distributed query statistics handle abort. cause: {:?}", + cause ); - } - - shutdown_flag_sender.close(); - - if let Some(join_handle) = join_handle { - let _ = join_handle.await; + let exchange_manager = DataExchangeManager::instance(); + exchange_manager.on_finished_query(&self.query_id, Some(cause)); } }); } @@ -182,11 +195,11 @@ impl StatisticsSender { } async fn send_profile( - executor: &PipelineExecutor, + query_handle: &Arc, tx: &FlightSender, collect_metrics: bool, ) -> Result<()> { - let plans_profile = executor.fetch_profiling(collect_metrics); + let plans_profile = query_handle.fetch_profiling(collect_metrics); if !plans_profile.is_empty() { let data_packet = DataPacket::QueryProfiles(plans_profile); @@ -236,8 +249,4 @@ impl StatisticsSender { } progress_info } - - // fn fetch_profiling(ctx: &Arc) -> Result> { - // // ctx.get_exchange_manager() - // } } diff --git a/src/query/service/src/servers/http/v1/query/execute_state.rs b/src/query/service/src/servers/http/v1/query/execute_state.rs index 14741394c1bb5..dd101ea1104d0 100644 --- a/src/query/service/src/servers/http/v1/query/execute_state.rs +++ b/src/query/service/src/servers/http/v1/query/execute_state.rs @@ -305,7 +305,15 @@ impl Executor { if e.code() != ErrorCode::CLOSED_QUERY { r.session.txn_mgr().lock().set_fail(); } - r.session.force_kill_query(e.clone()); + + r.session.force_kill_query(ErrorCode::create( + e.code(), + e.name(), + e.display_text(), + e.detail(), + None, + e.backtrace(), + )); } ExecuteStopped { stats: Progresses::from_context(&r.ctx), diff --git a/src/query/service/src/sessions/query_ctx.rs b/src/query/service/src/sessions/query_ctx.rs index b06d45b0f0841..5453d9c5b552f 100644 --- a/src/query/service/src/sessions/query_ctx.rs +++ b/src/query/service/src/sessions/query_ctx.rs @@ -126,7 +126,7 @@ use crate::catalogs::Catalog; use crate::clusters::Cluster; use crate::clusters::ClusterHelper; use crate::locks::LockManager; -use crate::pipelines::executor::PipelineExecutor; +use crate::pipelines::executor::QueryHandle; use crate::servers::flight::v1::exchange::DataExchangeManager; use crate::sessions::query_affect::QueryAffect; use crate::sessions::ProcessInfo; @@ -326,8 +326,8 @@ impl QueryContext { *self.shared.init_query_id.write() = id; } - pub fn set_executor(&self, weak_ptr: Arc) -> Result<()> { - self.shared.set_executor(weak_ptr) + pub fn set_query_handle(&self, handle: Arc) -> Result<()> { + self.shared.set_query_handle(handle) } pub fn attach_stage(&self, attachment: StageAttachment) { diff --git a/src/query/service/src/sessions/query_ctx_shared.rs b/src/query/service/src/sessions/query_ctx_shared.rs index af2324b508239..7a4ea3d572706 100644 --- a/src/query/service/src/sessions/query_ctx_shared.rs +++ b/src/query/service/src/sessions/query_ctx_shared.rs @@ -61,7 +61,7 @@ use uuid::Uuid; use crate::clusters::Cluster; use crate::clusters::ClusterDiscovery; -use crate::pipelines::executor::PipelineExecutor; +use crate::pipelines::executor::QueryHandle; use crate::sessions::query_affect::QueryAffect; use crate::sessions::Session; use crate::storages::Table; @@ -105,7 +105,7 @@ pub struct QueryContextShared { pub(in crate::sessions) affect: Arc>>, pub(in crate::sessions) catalog_manager: Arc, pub(in crate::sessions) data_operator: DataOperator, - pub(in crate::sessions) executor: Arc>>, + pub(in crate::sessions) query_handle: Arc>>>, pub(in crate::sessions) stage_attachment: Arc>>, pub(in crate::sessions) created_time: SystemTime, // now it is only set in query_log::log_query_finished @@ -179,7 +179,7 @@ impl QueryContextShared { tables_refs: Arc::new(Mutex::new(HashMap::new())), streams_refs: Default::default(), affect: Arc::new(Mutex::new(None)), - executor: Arc::new(RwLock::new(Weak::new())), + query_handle: Arc::new(RwLock::new(None)), stage_attachment: Arc::new(RwLock::new(None)), created_time: SystemTime::now(), finish_time: Default::default(), @@ -255,11 +255,15 @@ impl QueryContextShared { *guard = Some(mode); } - pub fn kill(&self, cause: ErrorCode) { + pub fn kill(&self, cause: ErrorCode) { self.set_error(cause.clone()); - if let Some(executor) = self.executor.read().upgrade() { - executor.finish(Some(cause)); + let query_handle = self.query_handle.read(); + + if let Some(query_handle) = query_handle.as_ref() { + if let Some(query_handle) = query_handle.upgrade() { + query_handle.finish(Some(cause)); + } } self.aborting.store(true, Ordering::Release); @@ -604,15 +608,24 @@ impl QueryContextShared { *guard = Some(affect); } - pub fn set_executor(&self, executor: Arc) -> Result<()> { - let mut guard = self.executor.write(); + pub fn set_query_handle(&self, handle: Arc) -> Result<()> { match self.check_aborting() { Ok(_) => { - *guard = Arc::downgrade(&executor); + let mut guard = self.query_handle.write(); + *guard = Some(Arc::downgrade(&handle)); Ok(()) } Err(err) => { - executor.finish(Some(err.clone())); + let error_code = ErrorCode::create( + err.code(), + err.name(), + err.display_text(), + err.detail(), + None, + err.backtrace(), + ); + + handle.finish(Some(error_code)); Err(err.with_context("failed to set executor")) } } @@ -646,15 +659,17 @@ impl QueryContextShared { &self.query_cache_metrics } - pub fn set_priority(&self, priority: u8) { - if let Some(executor) = self.executor.read().upgrade() { - executor.change_priority(priority) - } + pub fn set_priority(&self, _priority: u8) { + unimplemented!() } pub fn get_query_profiles(&self) -> Vec { - if let Some(executor) = self.executor.read().upgrade() { - self.add_query_profiles(&executor.fetch_profiling(false)); + let query_handle = self.query_handle.read(); + + if let Some(query_handle) = query_handle.as_ref() { + if let Some(query_handle) = query_handle.upgrade() { + self.add_query_profiles(&query_handle.fetch_profiling(false)); + } } self.query_profiles.read().values().cloned().collect() diff --git a/src/query/service/src/sessions/session.rs b/src/query/service/src/sessions/session.rs index b154c3ee91c37..4f8961acf0f97 100644 --- a/src/query/service/src/sessions/session.rs +++ b/src/query/service/src/sessions/session.rs @@ -136,7 +136,7 @@ impl Session { self.kill(/* shutdown io stream */); } - pub fn force_kill_query(&self, cause: ErrorCode) { + pub fn force_kill_query(&self, cause: ErrorCode) { if let Some(context_shared) = self.session_ctx.get_query_context_shared() { context_shared.kill(cause); } diff --git a/src/query/service/src/stream/processor_executor_stream.rs b/src/query/service/src/stream/processor_executor_stream.rs index bfa4a01300ae3..a8bb2cd7a62ed 100644 --- a/src/query/service/src/stream/processor_executor_stream.rs +++ b/src/query/service/src/stream/processor_executor_stream.rs @@ -28,8 +28,7 @@ pub struct PullingExecutorStream { } impl PullingExecutorStream { - pub fn create(mut executor: PipelinePullingExecutor) -> Result { - executor.start(); + pub fn create(executor: PipelinePullingExecutor) -> Result { Ok(Self { end_of_stream: false, executor, diff --git a/src/query/service/src/stream/table_read_block_stream.rs b/src/query/service/src/stream/table_read_block_stream.rs index beefdd5bfb2de..ade2dbd7c28b5 100644 --- a/src/query/service/src/stream/table_read_block_stream.rs +++ b/src/query/service/src/stream/table_read_block_stream.rs @@ -50,8 +50,8 @@ impl ReadDataBlockStream for T { let settings = ctx.get_settings(); pipeline.set_max_threads(settings.get_max_threads()? as usize); let executor_settings = ExecutorSettings::try_create(ctx.clone())?; - let executor = PipelinePullingExecutor::try_create(pipeline, executor_settings)?; - ctx.set_executor(executor.get_inner())?; + let mut executor = PipelinePullingExecutor::try_create(pipeline, executor_settings)?; + ctx.set_query_handle(executor.start().await?)?; Ok(Box::pin(PullingExecutorStream::create(executor)?)) } } diff --git a/src/query/service/src/test_kits/context.rs b/src/query/service/src/test_kits/context.rs index f95ea84e9094c..be6c61b018d36 100644 --- a/src/query/service/src/test_kits/context.rs +++ b/src/query/service/src/test_kits/context.rs @@ -43,12 +43,13 @@ pub async fn execute_command(ctx: Arc, query: &str) -> Result<()> Ok(()) } -pub fn execute_pipeline(ctx: Arc, mut res: PipelineBuildResult) -> Result<()> { +pub async fn execute_pipeline(ctx: Arc, mut res: PipelineBuildResult) -> Result<()> { let executor_settings = ExecutorSettings::try_create(ctx.clone())?; res.set_max_threads(ctx.get_settings().get_max_threads()? as usize); let mut pipelines = res.sources_pipelines; pipelines.push(res.main_pipeline); let executor = PipelineCompleteExecutor::from_pipelines(pipelines, executor_settings)?; - ctx.set_executor(executor.get_inner())?; - executor.execute() + let query_handle = executor.execute().await?; + ctx.set_query_handle(executor.get_handle())?; + query_handle.wait().await } diff --git a/src/query/service/src/test_kits/fixture.rs b/src/query/service/src/test_kits/fixture.rs index 8cbfe09776d96..14d7b0185537c 100644 --- a/src/query/service/src/test_kits/fixture.rs +++ b/src/query/service/src/test_kits/fixture.rs @@ -858,7 +858,7 @@ impl TestFixture { .add_sink(|input| Ok(ProcessorPtr::create(EmptySink::create(input))))?; } - execute_pipeline(ctx, build_res) + execute_pipeline(ctx, build_res).await } pub async fn execute_command(&self, query: &str) -> Result<()> { diff --git a/src/query/service/tests/it/pipelines/executor/executor_graph.rs b/src/query/service/tests/it/pipelines/executor/executor_graph.rs index 2efe0cfbcf080..70548d4b33bfd 100644 --- a/src/query/service/tests/it/pipelines/executor/executor_graph.rs +++ b/src/query/service/tests/it/pipelines/executor/executor_graph.rs @@ -351,7 +351,11 @@ async fn test_schedule_with_one_tasks() -> Result<()> { let init_queue = unsafe { graph.clone().init_schedule_queue(0)? }; assert_eq!(init_queue.sync_queue.len(), 1); - init_queue.schedule(&executor.global_tasks_queue, &mut context, &executor); + init_queue.schedule( + &executor.global_tasks_queue, + &mut context, + &executor.async_runtime, + ); assert!(context.has_task()); assert_eq!( format!("{:?}", context.take_task()), @@ -374,7 +378,11 @@ async fn test_schedule_with_two_tasks() -> Result<()> { let init_queue = unsafe { graph.clone().init_schedule_queue(0)? }; assert_eq!(init_queue.sync_queue.len(), 2); - init_queue.schedule(&executor.global_tasks_queue, &mut context, &executor); + init_queue.schedule( + &executor.global_tasks_queue, + &mut context, + &executor.async_runtime, + ); assert!(context.has_task()); assert_eq!( format!("{:?}", context.take_task()), diff --git a/src/query/service/tests/it/storages/fuse/operations/mutation/block_compact_mutator.rs b/src/query/service/tests/it/storages/fuse/operations/mutation/block_compact_mutator.rs index 4af069452d491..99779c55fd5db 100644 --- a/src/query/service/tests/it/storages/fuse/operations/mutation/block_compact_mutator.rs +++ b/src/query/service/tests/it/storages/fuse/operations/mutation/block_compact_mutator.rs @@ -150,8 +150,9 @@ async fn do_compact(ctx: Arc, table: Arc) -> Result, table: Arc) -> Result let mut executor_settings = ExecutorSettings::try_create(ctx.clone())?; executor_settings.enable_queries_executor = false; let executor = PipelineCompleteExecutor::try_create(pipeline, executor_settings)?; - ctx.set_executor(executor.get_inner())?; - executor.execute()?; + let query_handle = executor.execute().await?; + ctx.set_query_handle(executor.get_handle())?; + query_handle.wait().await?; } Ok(()) }