Skip to content

Commit 3fd0b72

Browse files
authored
Merge pull request #7867 from zhang2014/feat/ISSUE-7861
feat(cluster): experimental distributed eval index
2 parents edcc1bf + 9f3cda0 commit 3fd0b72

File tree

31 files changed

+436
-260
lines changed

31 files changed

+436
-260
lines changed

scripts/ci/ci-run-stateful-tests-cluster-s3.sh

+1-1
Original file line numberDiff line numberDiff line change
@@ -27,4 +27,4 @@ SCRIPT_PATH="$(cd "$(dirname "$0")" >/dev/null 2>&1 && pwd)"
2727
cd "$SCRIPT_PATH/../../tests" || exit
2828

2929
echo "Starting databend-test"
30-
./databend-test --mode 'cluster' --run-dir 1_stateful
30+
./databend-test --mode 'cluster' --run-dir 1_stateful --skip '02_0001_create_table_with_external_location'

src/query/catalog/src/table_context.rs

+2-3
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ use common_datablocks::DataBlock;
2525
use common_exception::Result;
2626
use common_functions::scalars::FunctionContext;
2727
use common_io::prelude::FormatSettings;
28+
use common_legacy_planners::PartInfoPtr;
2829
use common_legacy_planners::Partitions;
2930
use common_legacy_planners::ReadDataSourcePlan;
3031
use common_meta_types::UserInfo;
@@ -64,9 +65,7 @@ pub trait TableContext: Send + Sync {
6465
fn get_write_progress_value(&self) -> ProgressValues;
6566
fn get_result_progress(&self) -> Arc<Progress>;
6667
fn get_result_progress_value(&self) -> ProgressValues;
67-
// Steal n partitions from the partition pool by the pipeline worker.
68-
// This also can steal the partitions from distributed node.
69-
fn try_get_partitions(&self, num: u64) -> Result<Partitions>;
68+
fn try_get_part(&self) -> Option<PartInfoPtr>;
7069
// Update the context partition pool from the pipeline builder.
7170
fn try_set_partitions(&self, partitions: Partitions) -> Result<()>;
7271
fn attach_query_str(&self, kind: String, query: &str);

src/query/pipeline/core/src/pipeline.rs

+26
Original file line numberDiff line numberDiff line change
@@ -45,9 +45,12 @@ use crate::TransformPipeBuilder;
4545
pub struct Pipeline {
4646
max_threads: usize,
4747
pub pipes: Vec<Pipe>,
48+
on_init: Option<InitCallback>,
4849
on_finished: Option<FinishedCallback>,
4950
}
5051

52+
pub type InitCallback = Arc<Box<dyn Fn() -> Result<()> + Send + Sync + 'static>>;
53+
5154
pub type FinishedCallback =
5255
Arc<Box<dyn Fn(&Option<ErrorCode>) -> Result<()> + Send + Sync + 'static>>;
5356

@@ -56,6 +59,7 @@ impl Pipeline {
5659
Pipeline {
5760
max_threads: 0,
5861
pipes: Vec::new(),
62+
on_init: None,
5963
on_finished: None,
6064
}
6165
}
@@ -159,6 +163,21 @@ impl Pipeline {
159163
}
160164
}
161165

166+
/// Registers `f` as an init callback on this pipeline.
///
/// When a callback is already stored in `self.on_init`, the two are chained
/// into one closure: the previously registered callback runs first, and `f`
/// runs only if that call returned `Ok`. When nothing is registered yet, `f`
/// is stored as the sole callback.
pub fn set_on_init<F: Fn() -> Result<()> + Send + Sync + 'static>(&mut self, f: F) {
167+
if let Some(on_init) = &self.on_init {
168+
// Clone the `Arc` handle so the combined closure below can own the old callback.
let old_on_init = on_init.clone();
169+
170+
self.on_init = Some(Arc::new(Box::new(move || {
171+
// An error from the earlier callback short-circuits; `f` is then skipped.
old_on_init()?;
172+
f()
173+
})));
174+
175+
return;
176+
}
177+
178+
self.on_init = Some(Arc::new(Box::new(f)));
179+
}
180+
162181
pub fn set_on_finished<F: Fn(&Option<ErrorCode>) -> Result<()> + Send + Sync + 'static>(
163182
&mut self,
164183
f: F,
@@ -177,6 +196,13 @@ impl Pipeline {
177196
self.on_finished = Some(Arc::new(Box::new(f)));
178197
}
179198

199+
/// Removes the init callback from this pipeline and returns it.
///
/// `self.on_init` is left as `None` afterwards. If no callback was
/// registered, a no-op callback that returns `Ok(())` is returned instead, so
/// the caller always receives something it can invoke.
pub fn take_on_init(&mut self) -> InitCallback {
200+
match self.on_init.take() {
201+
None => Arc::new(Box::new(|| Ok(()))),
202+
Some(on_init) => on_init,
203+
}
204+
}
205+
180206
pub fn take_on_finished(&mut self) -> FinishedCallback {
181207
match self.on_finished.take() {
182208
None => Arc::new(Box::new(|_may_error| Ok(()))),

src/query/service/src/api/rpc/exchange/exchange_manager.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -668,7 +668,8 @@ impl FragmentCoordinator {
668668

669669
match &self.payload {
670670
FragmentPayload::PlanV2(plan) => {
671-
let pipeline_builder = PipelineBuilderV2::create(ctx);
671+
let pipeline_ctx = QueryContext::create_from(ctx);
672+
let pipeline_builder = PipelineBuilderV2::create(pipeline_ctx);
672673
self.pipeline_build_res = Some(pipeline_builder.finalize(plan)?);
673674
}
674675
};

src/query/service/src/interpreters/interpreter_insert_v2.rs

-1
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,6 @@ impl Interpreter for InsertInterpreterV2 {
164164
_ => unreachable!(),
165165
};
166166

167-
table1.get_table_info();
168167
let catalog = self.plan.catalog.clone();
169168
let is_distributed_plan = select_plan.is_distributed_plan();
170169

src/query/service/src/pipelines/executor/pipeline_complete_executor.rs

+8-1
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
use std::sync::Arc;
1616

17+
use common_base::base::Thread;
1718
use common_exception::ErrorCode;
1819
use common_exception::Result;
1920

@@ -67,7 +68,13 @@ impl PipelineCompleteExecutor {
6768
}
6869

6970
pub fn execute(&self) -> Result<()> {
70-
self.executor.execute()
71+
let executor = self.executor.clone();
72+
let execute_thread =
73+
Thread::named_spawn(Some(String::from("CompleteExecutor")), move || {
74+
executor.execute()
75+
});
76+
77+
execute_thread.join().flatten()
7178
}
7279
}
7380

src/query/service/src/pipelines/executor/pipeline_executor.rs

+52-23
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ use crate::pipelines::executor::executor_worker_context::ExecutorWorkerContext;
3636
use crate::pipelines::executor::ExecutorSettings;
3737
use crate::pipelines::pipeline::Pipeline;
3838

39+
pub type InitCallback = Arc<Box<dyn Fn() -> Result<()> + Send + Sync + 'static>>;
40+
3941
pub type FinishedCallback =
4042
Arc<Box<dyn Fn(&Option<ErrorCode>) -> Result<()> + Send + Sync + 'static>>;
4143

@@ -45,6 +47,7 @@ pub struct PipelineExecutor {
4547
workers_condvar: Arc<WorkersCondvar>,
4648
pub async_runtime: Arc<Runtime>,
4749
pub global_tasks_queue: Arc<ExecutorTasksQueue>,
50+
on_init_callback: InitCallback,
4851
on_finished_callback: FinishedCallback,
4952
settings: ExecutorSettings,
5053
finished_notify: Notify,
@@ -57,12 +60,14 @@ impl PipelineExecutor {
5760
settings: ExecutorSettings,
5861
) -> Result<Arc<PipelineExecutor>> {
5962
let threads_num = pipeline.get_max_threads();
63+
let on_init_callback = pipeline.take_on_init();
6064
let on_finished_callback = pipeline.take_on_finished();
6165

6266
assert_ne!(threads_num, 0, "Pipeline max threads cannot equals zero.");
6367
Self::try_create(
6468
RunningGraph::create(pipeline)?,
6569
threads_num,
70+
on_init_callback,
6671
on_finished_callback,
6772
settings,
6873
)
@@ -82,6 +87,11 @@ impl PipelineExecutor {
8287
.max()
8388
.unwrap_or(0);
8489

90+
let on_init_callbacks = pipelines
91+
.iter_mut()
92+
.map(|x| x.take_on_init())
93+
.collect::<Vec<_>>();
94+
8595
let on_finished_callbacks = pipelines
8696
.iter_mut()
8797
.map(|x| x.take_on_finished())
@@ -91,6 +101,13 @@ impl PipelineExecutor {
91101
Self::try_create(
92102
RunningGraph::from_pipelines(pipelines)?,
93103
threads_num,
104+
Arc::new(Box::new(move || {
105+
for on_init_callback in &on_init_callbacks {
106+
on_init_callback()?;
107+
}
108+
109+
Ok(())
110+
})),
94111
Arc::new(Box::new(move |may_error| {
95112
for on_finished_callback in &on_finished_callbacks {
96113
on_finished_callback(may_error)?;
@@ -105,33 +122,25 @@ impl PipelineExecutor {
105122
fn try_create(
106123
graph: RunningGraph,
107124
threads_num: usize,
125+
on_init_callback: InitCallback,
108126
on_finished_callback: FinishedCallback,
109127
settings: ExecutorSettings,
110128
) -> Result<Arc<PipelineExecutor>> {
111-
unsafe {
112-
let workers_condvar = WorkersCondvar::create(threads_num);
113-
let global_tasks_queue = ExecutorTasksQueue::create(threads_num);
114-
115-
let mut init_schedule_queue = graph.init_schedule_queue()?;
129+
let workers_condvar = WorkersCondvar::create(threads_num);
130+
let global_tasks_queue = ExecutorTasksQueue::create(threads_num);
116131

117-
let mut tasks = VecDeque::new();
118-
while let Some(task) = init_schedule_queue.pop_task() {
119-
tasks.push_back(task);
120-
}
121-
global_tasks_queue.init_tasks(tasks);
122-
123-
Ok(Arc::new(PipelineExecutor {
124-
graph,
125-
threads_num,
126-
workers_condvar,
127-
global_tasks_queue,
128-
on_finished_callback,
129-
async_runtime: GlobalIORuntime::instance(),
130-
settings,
131-
finished_notify: Notify::new(),
132-
finished_error: Mutex::new(None),
133-
}))
134-
}
132+
Ok(Arc::new(PipelineExecutor {
133+
graph,
134+
threads_num,
135+
workers_condvar,
136+
global_tasks_queue,
137+
on_init_callback,
138+
on_finished_callback,
139+
async_runtime: GlobalIORuntime::instance(),
140+
settings,
141+
finished_notify: Notify::new(),
142+
finished_error: Mutex::new(None),
143+
}))
135144
}
136145

137146
pub fn finish(&self, cause: Option<ErrorCode>) {
@@ -145,6 +154,8 @@ impl PipelineExecutor {
145154
}
146155

147156
pub fn execute(self: &Arc<Self>) -> Result<()> {
157+
self.init()?;
158+
148159
self.start_executor_daemon()?;
149160

150161
let mut thread_join_handles = self.execute_threads(self.threads_num);
@@ -171,6 +182,24 @@ impl PipelineExecutor {
171182
Ok(())
172183
}
173184

185+
/// Runs the init callback, then seeds the global task queue.
///
/// Steps, in order:
/// 1. Invoke `on_init_callback`; an error is returned immediately and no
///    tasks are scheduled.
/// 2. Build the initial schedule queue from the graph and move every task it
///    yields into a `VecDeque`.
/// 3. Hand that deque to `global_tasks_queue.init_tasks`.
///
/// NOTE(review): the whole body sits in one `unsafe` block, presumably because
/// `init_schedule_queue` and/or `init_tasks` are unsafe — confirm, and add a
/// `// SAFETY:` note stating the invariant relied upon.
fn init(self: &Arc<Self>) -> Result<()> {
186+
unsafe {
187+
// TODO: the on init callback cannot be killed.
188+
(self.on_init_callback)()?;
189+
190+
let mut init_schedule_queue = self.graph.init_schedule_queue()?;
191+
192+
let mut tasks = VecDeque::new();
193+
// Drain the schedule queue in the order `pop_task` yields tasks.
while let Some(task) = init_schedule_queue.pop_task() {
194+
tasks.push_back(task);
195+
}
196+
197+
self.global_tasks_queue.init_tasks(tasks);
198+
199+
Ok(())
200+
}
201+
}
202+
174203
fn start_executor_daemon(self: &Arc<Self>) -> Result<()> {
175204
if !self.settings.max_execute_time.is_zero() {
176205
let this = self.clone();

src/query/service/src/sessions/query_ctx.rs

+6-22
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,6 @@ pub struct QueryContext {
6868
version: String,
6969
partition_queue: Arc<RwLock<VecDeque<PartInfoPtr>>>,
7070
shared: Arc<QueryContextShared>,
71-
precommit_blocks: Arc<RwLock<Vec<DataBlock>>>,
7271
fragment_id: Arc<AtomicUsize>,
7372
}
7473

@@ -84,7 +83,6 @@ impl QueryContext {
8483
partition_queue: Arc::new(RwLock::new(VecDeque::new())),
8584
version: format!("DatabendQuery {}", *crate::version::DATABEND_COMMIT_VERSION),
8685
shared,
87-
precommit_blocks: Arc::new(RwLock::new(Vec::new())),
8886
fragment_id: Arc::new(AtomicUsize::new(0)),
8987
})
9088
}
@@ -225,20 +223,11 @@ impl TableContext for QueryContext {
225223
fn get_result_progress_value(&self) -> ProgressValues {
226224
self.shared.result_progress.as_ref().get_values()
227225
}
228-
// Steal n partitions from the partition pool by the pipeline worker.
229-
// This also can steal the partitions from distributed node.
230-
fn try_get_partitions(&self, num: u64) -> Result<Partitions> {
231-
let mut partitions = vec![];
232-
for _ in 0..num {
233-
match self.partition_queue.write().pop_back() {
234-
None => break,
235-
Some(partition) => {
236-
partitions.push(partition);
237-
}
238-
}
239-
}
240-
Ok(partitions)
226+
227+
// Take one partition from the front of this context's partition queue.
// The queue is accessed under its write lock; returns `None` once it is empty.
fn try_get_part(&self) -> Option<PartInfoPtr> {
228+
self.partition_queue.write().pop_front()
241229
}
230+
242231
// Update the context partition pool from the pipeline builder.
243232
fn try_set_partitions(&self, partitions: Partitions) -> Result<()> {
244233
let mut partition_queue = self.partition_queue.write();
@@ -328,15 +317,10 @@ impl TableContext for QueryContext {
328317
self.shared.dal_ctx.as_ref()
329318
}
330319
fn push_precommit_block(&self, block: DataBlock) {
331-
let mut blocks = self.precommit_blocks.write();
332-
blocks.push(block);
320+
self.shared.push_precommit_block(block)
333321
}
334322
fn consume_precommit_blocks(&self) -> Vec<DataBlock> {
335-
let mut blocks = self.precommit_blocks.write();
336-
337-
let mut swaped_precommit_blocks = vec![];
338-
std::mem::swap(&mut *blocks, &mut swaped_precommit_blocks);
339-
swaped_precommit_blocks
323+
self.shared.consume_precommit_blocks()
340324
}
341325
fn try_get_function_context(&self) -> Result<FunctionContext> {
342326
let tz = self.get_settings().get_timezone()?;

src/query/service/src/sessions/query_ctx_shared.rs

+16
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ use std::sync::Weak;
2222
use common_base::base::Progress;
2323
use common_base::base::Runtime;
2424
use common_contexts::DalContext;
25+
use common_datablocks::DataBlock;
2526
use common_exception::ErrorCode;
2627
use common_exception::Result;
2728
use common_meta_types::UserInfo;
@@ -79,6 +80,7 @@ pub struct QueryContextShared {
7980
pub(in crate::sessions) catalog_manager: Arc<CatalogManager>,
8081
pub(in crate::sessions) storage_operator: StorageOperator,
8182
pub(in crate::sessions) executor: Arc<RwLock<Weak<PipelineExecutor>>>,
83+
pub(in crate::sessions) precommit_blocks: Arc<RwLock<Vec<DataBlock>>>,
8284
}
8385

8486
impl QueryContextShared {
@@ -108,6 +110,7 @@ impl QueryContextShared {
108110
auth_manager: AuthMgr::create(config).await?,
109111
affect: Arc::new(Mutex::new(None)),
110112
executor: Arc::new(RwLock::new(Weak::new())),
113+
precommit_blocks: Arc::new(RwLock::new(vec![])),
111114
}))
112115
}
113116

@@ -297,4 +300,17 @@ impl QueryContextShared {
297300
let mut executor = self.executor.write();
298301
*executor = weak_ptr;
299302
}
303+
304+
/// Appends `block` to the shared `precommit_blocks` buffer.
///
/// The buffer is modified under its write lock.
pub fn push_precommit_block(&self, block: DataBlock) {
305+
let mut blocks = self.precommit_blocks.write();
306+
blocks.push(block);
307+
}
308+
309+
/// Returns every block accumulated in `precommit_blocks` and leaves the
/// shared buffer empty.
///
/// The swap happens under the buffer's write lock, so the returned vector
/// holds the buffer's contents as of the moment the lock was acquired.
pub fn consume_precommit_blocks(&self) -> Vec<DataBlock> {
310+
let mut blocks = self.precommit_blocks.write();
311+
312+
// Exchange the stored vector with a fresh empty one and return the old contents.
let mut swaped_precommit_blocks = vec![];
313+
std::mem::swap(&mut *blocks, &mut swaped_precommit_blocks);
314+
swaped_precommit_blocks
315+
}
300316
}

0 commit comments

Comments
 (0)