Commit ebab765

fix
1 parent fb6d626 commit ebab765

File tree: 10 files changed, +43 -54 lines changed


src/query/service/tests/it/storages/fuse/statistics.rs (-3)
@@ -361,7 +361,6 @@ async fn test_ft_cluster_stats_with_stats() -> databend_common_exception::Result
         0,
         block_compactor,
         vec![],
-        vec![],
         FunctionContext::default(),
     );
     let stats = stats_gen.gen_with_origin_stats(&blocks, origin.clone())?;
@@ -403,7 +402,6 @@ async fn test_ft_cluster_stats_with_stats() -> databend_common_exception::Result
         0,
         block_compactor,
         operators,
-        vec![],
         FunctionContext::default(),
     );
     let stats = stats_gen.gen_with_origin_stats(&blocks, origin.clone())?;
@@ -421,7 +419,6 @@ async fn test_ft_cluster_stats_with_stats() -> databend_common_exception::Result
         0,
         block_compactor,
         vec![],
-        vec![],
         FunctionContext::default(),
     );
     let stats = stats_gen.gen_with_origin_stats(&blocks, origin)?;

src/query/storages/common/index/src/bloom_index.rs (+5)
@@ -597,6 +597,11 @@ impl BloomIndexBuilder {
                 TableDataType::Map(_) | TableDataType::Variant
             ) {
                 column_distinct_count.insert(column.field.column_id, len);
+                // No need to generate a bloom index:
+                // it will never be used, since the range index is checked first.
+                if len < 2 {
+                    continue;
+                }
             }
         }
         let filter_name =
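The hunk above only makes sense alongside the range index: when a column has fewer than two distinct values, its min/max statistics already decide every equality probe, so a bloom filter can never prune anything extra. A minimal standalone sketch of that decision, using hypothetical names rather than the BloomIndexBuilder internals:

use std::collections::HashMap;

/// Illustrative sketch only (hypothetical types, not the Databend API):
/// record the distinct count for every eligible column, but skip building
/// a bloom filter when fewer than two distinct values exist, because the
/// range (min/max) index is consulted first and already prunes exactly.
fn build_filters(columns: &[(u32, usize)]) -> (HashMap<u32, usize>, Vec<u32>) {
    let mut column_distinct_count = HashMap::new();
    let mut filters_built = Vec::new();
    for &(column_id, distinct_len) in columns {
        column_distinct_count.insert(column_id, distinct_len);
        if distinct_len < 2 {
            continue; // the bloom filter would never be used
        }
        filters_built.push(column_id);
    }
    (column_distinct_count, filters_built)
}

fn main() {
    let (counts, built) = build_filters(&[(1, 1), (2, 10)]);
    assert_eq!(counts.len(), 2);     // distinct counts recorded for both columns
    assert_eq!(built, vec![2u32]);   // bloom filter only for the multi-valued column
}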

src/query/storages/fuse/src/fuse_table.rs (+3)
@@ -287,12 +287,15 @@ impl FuseTable {
             DEFAULT_BLOCK_BUFFER_SIZE,
         );
         let max_file_size = self.get_option(FUSE_OPT_KEY_FILE_SIZE, DEFAULT_BLOCK_COMPRESSED_SIZE);
+        let max_rows_per_block =
+            self.get_option(FUSE_OPT_KEY_ROW_PER_BLOCK, DEFAULT_BLOCK_ROW_COUNT);

         WriteSettings {
             storage_format: self.storage_format,
             table_compression: self.table_compression,
             max_page_size,
             block_per_seg,
+            max_rows_per_block,
             min_compressed_per_block: (max_file_size * 4).div_ceil(5),
             max_uncompressed_per_block: (max_buffer_size * 3).min(MAX_BLOCK_UNCOMPRESSED_SIZE),
         }
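For context, get_option here reads a per-table option and falls back to a crate-wide default when the table does not set it. A self-contained sketch of that pattern with a hypothetical option map (the constant values are assumptions, not taken from this diff):

use std::collections::HashMap;
use std::str::FromStr;

// Assumed constants for the sketch; the real values live in databend_common_io.
const DEFAULT_BLOCK_ROW_COUNT: usize = 1_000_000;
const FUSE_OPT_KEY_ROW_PER_BLOCK: &str = "row_per_block";

struct TableOptions(HashMap<String, String>);

impl TableOptions {
    /// Parse the option if present and valid, otherwise return the default.
    fn get_option<T: FromStr>(&self, key: &str, default: T) -> T {
        self.0
            .get(key)
            .and_then(|v| v.parse::<T>().ok())
            .unwrap_or(default)
    }
}

fn main() {
    let mut opts = HashMap::new();
    opts.insert(FUSE_OPT_KEY_ROW_PER_BLOCK.to_string(), "500000".to_string());
    let table = TableOptions(opts);

    let max_rows_per_block =
        table.get_option(FUSE_OPT_KEY_ROW_PER_BLOCK, DEFAULT_BLOCK_ROW_COUNT);
    assert_eq!(max_rows_per_block, 500_000);
}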

src/query/storages/fuse/src/io/write/stream/block_builder.rs (+13 -10)
@@ -225,16 +225,15 @@ impl StreamBlockBuilder {
         })
     }

-    pub fn block_size(&self) -> usize {
-        self.block_size
+    pub fn is_empty(&self) -> bool {
+        self.row_count == 0
     }

-    pub fn num_rows(&self) -> usize {
-        self.row_count
-    }
-
-    pub fn file_size(&self) -> usize {
-        self.block_writer.compressed_size()
+    pub fn need_flush(&self) -> bool {
+        let file_size = self.block_writer.compressed_size();
+        file_size >= self.properties.write_settings.min_compressed_per_block
+            || self.block_size >= self.properties.write_settings.max_uncompressed_per_block
+            || self.row_count >= self.properties.write_settings.max_rows_per_block
     }

     pub fn write(&mut self, block: DataBlock) -> Result<()> {
@@ -384,8 +383,12 @@ impl StreamBlockProperties {

         let inverted_index_builders = create_inverted_index_builders(&table.table_info.meta);

-        let cluster_stats_builder =
-            ClusterStatisticsBuilder::try_create(table, ctx.clone(), write_settings.clone())?;
+        let cluster_stats_builder = ClusterStatisticsBuilder::try_create(
+            table,
+            ctx.clone(),
+            &source_schema,
+            write_settings.clone(),
+        )?;

         let mut stats_columns = vec![];
         let mut distinct_columns = vec![];
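The new need_flush method folds the three write thresholds into one check. A runnable sketch with simplified stand-in structs (not the real StreamBlockBuilder or WriteSettings): the builder reports that it should be flushed as soon as compressed bytes, uncompressed bytes, or buffered rows reach their limits.

/// Simplified stand-ins; the real fields live on StreamBlockBuilder / WriteSettings.
struct WriteSettings {
    min_compressed_per_block: usize,
    max_uncompressed_per_block: usize,
    max_rows_per_block: usize,
}

struct BlockBuilder {
    settings: WriteSettings,
    compressed_size: usize,   // bytes already written to the encoder
    uncompressed_size: usize, // in-memory size of buffered data
    row_count: usize,         // rows buffered so far
}

impl BlockBuilder {
    fn is_empty(&self) -> bool {
        self.row_count == 0
    }

    /// Flush once any threshold is hit: compressed size, uncompressed size, or row count.
    fn need_flush(&self) -> bool {
        self.compressed_size >= self.settings.min_compressed_per_block
            || self.uncompressed_size >= self.settings.max_uncompressed_per_block
            || self.row_count >= self.settings.max_rows_per_block
    }
}

fn main() {
    let builder = BlockBuilder {
        settings: WriteSettings {
            min_compressed_per_block: 80 << 20,
            max_uncompressed_per_block: 256 << 20,
            max_rows_per_block: 1_000_000,
        },
        compressed_size: 0,
        uncompressed_size: 0,
        row_count: 1_200_000,
    };
    assert!(!builder.is_empty());
    assert!(builder.need_flush()); // the row-count threshold alone triggers a flush
}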

src/query/storages/fuse/src/io/write/stream/cluster_statistics.rs (+5 -8)
@@ -20,11 +20,11 @@ use databend_common_exception::Result;
 use databend_common_expression::Column;
 use databend_common_expression::ColumnRef;
 use databend_common_expression::DataBlock;
-use databend_common_expression::DataField;
 use databend_common_expression::DataSchema;
 use databend_common_expression::Expr;
 use databend_common_expression::FunctionContext;
 use databend_common_expression::Scalar;
+use databend_common_expression::TableSchemaRef;
 use databend_common_functions::aggregates::eval_aggr;
 use databend_common_functions::BUILTIN_FUNCTIONS;
 use databend_common_sql::evaluator::BlockOperator;
@@ -51,34 +51,31 @@ impl ClusterStatisticsBuilder {
     pub fn try_create(
         table: &FuseTable,
         ctx: Arc<dyn TableContext>,
+        source_schema: &TableSchemaRef,
         write_settings: WriteSettings,
     ) -> Result<Arc<Self>> {
         let cluster_type = table.cluster_type();
         if cluster_type.is_none_or(|v| v == ClusterType::Hilbert) {
             return Ok(Default::default());
         }

-        let input_schema: Arc<DataSchema> = DataSchema::from(table.schema_with_stream()).into();
-        let mut merged = input_schema.fields().clone();
+        let input_schema: Arc<DataSchema> = DataSchema::from(source_schema).into();
+        let input_filed_len = input_schema.fields.len();

         let cluster_keys = table.linear_cluster_keys(ctx.clone());
         let mut cluster_key_index = Vec::with_capacity(cluster_keys.len());
         let mut extra_key_num = 0;

         let mut exprs = Vec::with_capacity(cluster_keys.len());
-
         for remote_expr in &cluster_keys {
             let expr = remote_expr
                 .as_expr(&BUILTIN_FUNCTIONS)
                 .project_column_ref(|name| input_schema.index_of(name).unwrap());
             let index = match &expr {
                 Expr::ColumnRef(ColumnRef { id, .. }) => *id,
                 _ => {
-                    let cname = format!("{}", expr);
-                    merged.push(DataField::new(cname.as_str(), expr.data_type().clone()));
                     exprs.push(expr);
-
-                    let offset = merged.len() - 1;
+                    let offset = input_filed_len + extra_key_num;
                     extra_key_num += 1;
                     offset
                 }
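The offset arithmetic above assigns derived cluster-key expressions the column positions immediately after the source schema fields, in the order the expressions are collected, while plain column references keep their original index. A small sketch of that indexing scheme with hypothetical types:

/// Hypothetical, simplified model of a cluster-key expression.
enum ClusterKey {
    /// References an existing column by its index in the source schema.
    ColumnRef(usize),
    /// A computed expression that will be appended as an extra column.
    Computed(&'static str),
}

/// Return, for each cluster key, the column offset it will occupy in the
/// evaluated block: source columns keep their index, computed expressions
/// are placed right after the source fields, in appearance order.
fn cluster_key_offsets(source_field_len: usize, keys: &[ClusterKey]) -> Vec<usize> {
    let mut extra_key_num = 0;
    keys.iter()
        .map(|key| match key {
            ClusterKey::ColumnRef(id) => *id,
            ClusterKey::Computed(_) => {
                let offset = source_field_len + extra_key_num;
                extra_key_num += 1;
                offset
            }
        })
        .collect()
}

fn main() {
    // Source schema has 3 columns; keys are (column 1, expression, expression).
    let keys = [
        ClusterKey::ColumnRef(1),
        ClusterKey::Computed("a + b"),
        ClusterKey::Computed("substr(c, 0, 4)"),
    ];
    assert_eq!(cluster_key_offsets(3, &keys), vec![1, 3, 4]);
}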

src/query/storages/fuse/src/io/write/stream/column_statistics.rs (+2)
@@ -214,6 +214,8 @@ fn scalar_update_hll_cardinality(scalar: &ScalarRef, ty: &DataType, hll: &mut Co
         return;
     }

+    let ty = ty.remove_nullable();
+
     with_number_mapped_type!(|NUM_TYPE| match ty {
         DataType::Number(NumberDataType::NUM_TYPE) => {
             let val = NumberType::<NUM_TYPE>::try_downcast_scalar(scalar).unwrap();
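The added remove_nullable call matters because the dispatch below matches on concrete types; if the column type were left wrapped as Nullable(T), a non-null scalar would miss the number arms and fall through to the generic fallback. A toy illustration (simplified enum, not the databend_common_expression types):

/// Toy data-type model for illustration only.
#[derive(Debug, Clone, PartialEq)]
enum DataType {
    Int64,
    String,
    Nullable(Box<DataType>),
}

impl DataType {
    /// Strip one Nullable wrapper, mirroring the remove_nullable call in the hunk.
    fn remove_nullable(&self) -> DataType {
        match self {
            DataType::Nullable(inner) => (**inner).clone(),
            other => other.clone(),
        }
    }
}

/// Update a (hypothetical) cardinality sketch only for types we know how to hash.
fn update_hll(ty: &DataType, distinct_values: &mut u64) {
    let ty = ty.remove_nullable();
    match ty {
        DataType::Int64 | DataType::String => *distinct_values += 1,
        // Without remove_nullable, Nullable(Int64) would land here and be skipped.
        _ => {}
    }
}

fn main() {
    let mut n = 0;
    update_hll(&DataType::Nullable(Box::new(DataType::Int64)), &mut n);
    assert_eq!(n, 1);
}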

src/query/storages/fuse/src/io/write/write_settings.rs (+3)
@@ -14,6 +14,7 @@

 use databend_common_io::constants::DEFAULT_BLOCK_COMPRESSED_SIZE;
 use databend_common_io::constants::DEFAULT_BLOCK_PER_SEGMENT;
+use databend_common_io::constants::DEFAULT_BLOCK_ROW_COUNT;
 use databend_storages_common_table_meta::table::TableCompression;

 use crate::FuseStorageFormat;
@@ -29,6 +30,7 @@ pub struct WriteSettings {
     pub max_page_size: usize,

     pub block_per_seg: usize,
+    pub max_rows_per_block: usize,
     pub min_compressed_per_block: usize,
     pub max_uncompressed_per_block: usize,
 }
@@ -40,6 +42,7 @@ impl Default for WriteSettings {
             table_compression: TableCompression::default(),
             max_page_size: DEFAULT_ROW_PER_PAGE,
             block_per_seg: DEFAULT_BLOCK_PER_SEGMENT,
+            max_rows_per_block: DEFAULT_BLOCK_ROW_COUNT,
             min_compressed_per_block: (DEFAULT_BLOCK_COMPRESSED_SIZE * 4).div_ceil(5),
             max_uncompressed_per_block: MAX_BLOCK_UNCOMPRESSED_SIZE,
         }

src/query/storages/fuse/src/operations/common/processors/transform_block_writer.rs (+11 -12)
@@ -23,6 +23,7 @@ use databend_common_catalog::table_context::TableContext;
 use databend_common_exception::ErrorCode;
 use databend_common_exception::Result;
 use databend_common_expression::DataBlock;
+use databend_common_io::constants::DEFAULT_BLOCK_ROW_COUNT;
 use databend_common_pipeline_core::processors::Event;
 use databend_common_pipeline_core::processors::InputPort;
 use databend_common_pipeline_core::processors::OutputPort;
@@ -37,6 +38,7 @@ use crate::io::BlockWriter;
 use crate::io::StreamBlockBuilder;
 use crate::io::StreamBlockProperties;
 use crate::FuseTable;
+use crate::FUSE_OPT_KEY_ROW_PER_BLOCK;

 #[allow(clippy::large_enum_variant)]
 enum State {
@@ -72,7 +74,10 @@ impl TransformBlockWriter {
         table_meta_timestamps: TableMetaTimestamps,
         with_tid: bool,
     ) -> Result<ProcessorPtr> {
-        let max_block_size = ctx.get_settings().get_max_block_size()? as usize;
+        let max_block_size = std::cmp::min(
+            ctx.get_settings().get_max_block_size()? as usize,
+            table.get_option(FUSE_OPT_KEY_ROW_PER_BLOCK, DEFAULT_BLOCK_ROW_COUNT),
+        );
         let properties = StreamBlockProperties::try_create(ctx, table, table_meta_timestamps)?;
         Ok(ProcessorPtr::create(Box::new(TransformBlockWriter {
             state: State::Consume,
@@ -157,22 +162,16 @@ impl Processor for TransformBlockWriter {
     fn process(&mut self) -> Result<()> {
         match std::mem::replace(&mut self.state, State::Consume) {
             State::Serialize(block) => {
-                let min_compressed = self.properties.write_settings.min_compressed_per_block;
-                let max_uncompressed = self.properties.write_settings.max_uncompressed_per_block;
-                let max_block_size = self.max_block_size;
-
                 // Check if the datablock is valid, this is needed to ensure data is correct
                 block.check_valid()?;
-                let builder = self.get_or_create_builder()?;
-                let blocks = block.split_by_rows_no_tail(max_block_size);
+                let blocks = block.split_by_rows_no_tail(self.max_block_size);
                 let mut blocks = VecDeque::from(blocks);
+
+                let builder = self.get_or_create_builder()?;
                 while let Some(b) = blocks.pop_front() {
                     builder.write(b)?;

-                    let file_size = builder.file_size();
-                    let written_block_size = builder.block_size();
-
-                    if file_size >= min_compressed || written_block_size >= max_uncompressed {
+                    if builder.need_flush() {
                         self.state = State::Flush;

                         for left in blocks {
@@ -184,7 +183,7 @@ impl Processor for TransformBlockWriter {
             }
             State::Flush => {
                 let builder = self.builder.take().unwrap();
-                if builder.num_rows() > 0 {
+                if !builder.is_empty() {
                     let serialized = builder.finish()?;
                     self.state = State::Write(serialized);
                 }
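Taken together, the writer now caps chunk size at the smaller of the session's max_block_size and the table's row_per_block option, feeds chunks into the builder, and defers the remaining chunks as soon as the builder asks for a flush. A standalone sketch of that control flow (toy types, numbers chosen only for illustration):

use std::collections::VecDeque;

/// Minimal stand-in for the stream block builder used by this processor.
struct Builder {
    rows: usize,
    max_rows_per_block: usize,
}

impl Builder {
    fn write(&mut self, chunk_rows: usize) {
        self.rows += chunk_rows;
    }
    fn need_flush(&self) -> bool {
        self.rows >= self.max_rows_per_block
    }
}

/// Split `total_rows` into chunks of at most `chunk` rows, feed them to the
/// builder, and return the chunks that must be re-queued once a flush is due.
fn consume(total_rows: usize, chunk: usize, builder: &mut Builder) -> VecDeque<usize> {
    let mut chunks: VecDeque<usize> = (0..total_rows / chunk).map(|_| chunk).collect();
    if total_rows % chunk != 0 {
        chunks.push_back(total_rows % chunk);
    }
    while let Some(rows) = chunks.pop_front() {
        builder.write(rows);
        if builder.need_flush() {
            break; // remaining chunks are handed back for the next round
        }
    }
    chunks
}

fn main() {
    // Session allows 65536 rows per chunk, the table caps blocks at 50000 rows.
    let max_block_size = std::cmp::min(65_536, 50_000);
    let mut builder = Builder { rows: 0, max_rows_per_block: 100_000 };
    let leftover = consume(180_000, max_block_size, &mut builder);
    assert!(builder.need_flush());
    assert!(!leftover.is_empty()); // deferred until after the flush
}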

tests/sqllogictests/suites/base/09_fuse_engine/09_0005_remote_insert_into_select.test (-20)
@@ -68,26 +68,6 @@ select sum(business_id) from t
 ----
 NULL

-statement ok
-create table t1(a string not null) block_size_threshold=1024;
-
-statement ok
-create table t2 like t1 engine = Random;
-
-statement ok
-insert into t1 select repeat(a, 500) from t2 limit 10;
-
-query I
-select count() > 1 from fuse_block('db_09_0005', 't1');
-----
-1
-
-statement ok
-DROP TABLE t1 ALL
-
-statement ok
-DROP TABLE t2
-
 statement ok
 DROP TABLE t

tests/sqllogictests/suites/base/09_fuse_engine/09_0008_fuse_optimize_table.test (+1 -1)
@@ -332,7 +332,7 @@ query II
 select segment_count, block_count from fuse_snapshot('db_09_0008', 't5') limit 2
 ----
 1 3
-3 4
+3 5