@@ -27,16 +27,20 @@ use databend_common_expression::LimitType;
27
27
use databend_common_expression:: SortColumnDescription ;
28
28
use databend_common_functions:: BUILTIN_FUNCTIONS ;
29
29
use databend_common_pipeline_core:: Pipeline ;
30
+ use databend_common_pipeline_transforms:: build_compact_block_pipeline;
30
31
use databend_common_pipeline_transforms:: processors:: create_dummy_item;
31
32
use databend_common_pipeline_transforms:: processors:: TransformPipelineHelper ;
32
33
use databend_common_pipeline_transforms:: processors:: TransformSortPartial ;
33
34
use databend_common_sql:: evaluator:: BlockOperator ;
34
35
use databend_common_sql:: evaluator:: CompoundBlockOperator ;
36
+ use databend_common_sql:: executor:: physical_plans:: MutationKind ;
35
37
use databend_storages_common_table_meta:: meta:: TableMetaTimestamps ;
36
38
use databend_storages_common_table_meta:: table:: ClusterType ;
37
39
38
40
use crate :: operations:: TransformBlockWriter ;
41
+ use crate :: operations:: TransformSerializeBlock ;
39
42
use crate :: statistics:: ClusterStatsGenerator ;
43
+ use crate :: FuseStorageFormat ;
40
44
use crate :: FuseTable ;
41
45
42
46
impl FuseTable {
@@ -46,16 +50,44 @@ impl FuseTable {
46
50
pipeline : & mut Pipeline ,
47
51
table_meta_timestamps : TableMetaTimestamps ,
48
52
) -> Result < ( ) > {
49
- pipeline. add_transform ( |input, output| {
50
- TransformBlockWriter :: try_create (
51
- ctx. clone ( ) ,
52
- input,
53
- output,
54
- self ,
55
- table_meta_timestamps,
56
- false ,
57
- )
58
- } ) ?;
53
+ match self . storage_format {
54
+ FuseStorageFormat :: Parquet => {
55
+ pipeline. add_transform ( |input, output| {
56
+ TransformBlockWriter :: try_create (
57
+ ctx. clone ( ) ,
58
+ input,
59
+ output,
60
+ self ,
61
+ table_meta_timestamps,
62
+ false ,
63
+ )
64
+ } ) ?;
65
+ }
66
+ FuseStorageFormat :: Native => {
67
+ let block_thresholds = self . get_block_thresholds ( ) ;
68
+ build_compact_block_pipeline ( pipeline, block_thresholds) ?;
69
+
70
+ let schema = DataSchema :: from ( self . schema ( ) ) . into ( ) ;
71
+ let cluster_stats_gen = self . cluster_gen_for_append (
72
+ ctx. clone ( ) ,
73
+ pipeline,
74
+ block_thresholds,
75
+ Some ( schema) ,
76
+ ) ?;
77
+ pipeline. add_transform ( |input, output| {
78
+ let proc = TransformSerializeBlock :: try_create (
79
+ ctx. clone ( ) ,
80
+ input,
81
+ output,
82
+ self ,
83
+ cluster_stats_gen. clone ( ) ,
84
+ MutationKind :: Insert ,
85
+ table_meta_timestamps,
86
+ ) ?;
87
+ proc. into_processor ( )
88
+ } ) ?;
89
+ }
90
+ }
59
91
60
92
Ok ( ( ) )
61
93
}
0 commit comments