Skip to content

Commit c1396dc

Browse files
committed
fix(processor): try fix test failure
1 parent 6f76776 commit c1396dc

File tree

3 files changed

+20
-10
lines changed

3 files changed

+20
-10
lines changed

src/query/service/src/api/rpc/exchange/exchange_manager.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -668,7 +668,8 @@ impl FragmentCoordinator {
668668

669669
match &self.payload {
670670
FragmentPayload::PlanV2(plan) => {
671-
let pipeline_builder = PipelineBuilderV2::create(ctx);
671+
let new_ctx = QueryContext::create_from(ctx);
672+
let pipeline_builder = PipelineBuilderV2::create(new_ctx);
672673
self.pipeline_build_res = Some(pipeline_builder.finalize(plan)?);
673674
}
674675
};

src/query/service/src/sessions/query_ctx.rs

+2-9
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,6 @@ pub struct QueryContext {
6969
version: String,
7070
partition_queue: Arc<RwLock<VecDeque<PartInfoPtr>>>,
7171
shared: Arc<QueryContextShared>,
72-
precommit_blocks: Arc<RwLock<Vec<DataBlock>>>,
7372
fragment_id: Arc<AtomicUsize>,
7473
}
7574

@@ -85,7 +84,6 @@ impl QueryContext {
8584
partition_queue: Arc::new(RwLock::new(VecDeque::new())),
8685
version: format!("DatabendQuery {}", *crate::version::DATABEND_COMMIT_VERSION),
8786
shared,
88-
precommit_blocks: Arc::new(RwLock::new(Vec::new())),
8987
fragment_id: Arc::new(AtomicUsize::new(0)),
9088
})
9189
}
@@ -334,15 +332,10 @@ impl TableContext for QueryContext {
334332
self.shared.dal_ctx.as_ref()
335333
}
336334
fn push_precommit_block(&self, block: DataBlock) {
337-
let mut blocks = self.precommit_blocks.write();
338-
blocks.push(block);
335+
self.shared.push_precommit_block(block)
339336
}
340337
fn consume_precommit_blocks(&self) -> Vec<DataBlock> {
341-
let mut blocks = self.precommit_blocks.write();
342-
343-
let mut swaped_precommit_blocks = vec![];
344-
std::mem::swap(&mut *blocks, &mut swaped_precommit_blocks);
345-
swaped_precommit_blocks
338+
self.shared.consume_precommit_blocks()
346339
}
347340
fn try_get_function_context(&self) -> Result<FunctionContext> {
348341
let tz = self.get_settings().get_timezone()?;

src/query/service/src/sessions/query_ctx_shared.rs

+16
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ use std::sync::Weak;
2222
use common_base::base::Progress;
2323
use common_base::base::Runtime;
2424
use common_contexts::DalContext;
25+
use common_datablocks::DataBlock;
2526
use common_exception::ErrorCode;
2627
use common_exception::Result;
2728
use common_meta_types::UserInfo;
@@ -79,6 +80,7 @@ pub struct QueryContextShared {
7980
pub(in crate::sessions) catalog_manager: Arc<CatalogManager>,
8081
pub(in crate::sessions) storage_operator: StorageOperator,
8182
pub(in crate::sessions) executor: Arc<RwLock<Weak<PipelineExecutor>>>,
83+
pub(in crate::sessions) precommit_blocks: Arc<RwLock<Vec<DataBlock>>>,
8284
}
8385

8486
impl QueryContextShared {
@@ -108,6 +110,7 @@ impl QueryContextShared {
108110
auth_manager: AuthMgr::create(config).await?,
109111
affect: Arc::new(Mutex::new(None)),
110112
executor: Arc::new(RwLock::new(Weak::new())),
113+
precommit_blocks: Arc::new(RwLock::new(vec![])),
111114
}))
112115
}
113116

@@ -297,4 +300,17 @@ impl QueryContextShared {
297300
let mut executor = self.executor.write();
298301
*executor = weak_ptr;
299302
}
303+
304+
pub fn push_precommit_block(&self, block: DataBlock) {
305+
let mut blocks = self.precommit_blocks.write();
306+
blocks.push(block);
307+
}
308+
309+
pub fn consume_precommit_blocks(&self) -> Vec<DataBlock> {
310+
let mut blocks = self.precommit_blocks.write();
311+
312+
let mut swaped_precommit_blocks = vec![];
313+
std::mem::swap(&mut *blocks, &mut swaped_precommit_blocks);
314+
swaped_precommit_blocks
315+
}
300316
}

0 commit comments

Comments
 (0)