@@ -205,71 +205,6 @@ impl BlockReader {
205
205
Ok ( ( num_rows, columns_array_iter) )
206
206
}
207
207
208
- async fn read_columns ( & self , part : PartInfoPtr ) -> Result < ( usize , Vec < ArrayIter < ' static > > ) > {
209
- let part = FusePartInfo :: from_part ( & part) ?;
210
-
211
- // TODO: add prefetch column data.
212
- let num_rows = part. nums_rows ;
213
- let num_cols = self . projection . len ( ) ;
214
- let mut column_chunk_futs = Vec :: with_capacity ( num_cols) ;
215
-
216
- let columns = self . projection . project_column_leaves ( & self . column_leaves ) ?;
217
- let indices = Self :: build_projection_indices ( & columns) ;
218
- for ( index, _) in indices {
219
- let column_meta = & part. columns_meta [ & index] ;
220
- let column_reader = self . operator . object ( & part. location ) ;
221
- let fut = async move {
222
- let ( idx, column_chunk) =
223
- Self :: read_column ( column_reader, index, column_meta. offset , column_meta. len )
224
- . await ?;
225
- Ok :: < _ , ErrorCode > ( ( idx, column_chunk) )
226
- }
227
- . instrument ( debug_span ! ( "read_col_chunk" ) ) ;
228
- column_chunk_futs. push ( fut) ;
229
- }
230
-
231
- let num_cols = column_chunk_futs. len ( ) ;
232
- let chunks = futures:: stream:: iter ( column_chunk_futs)
233
- . buffered ( std:: cmp:: min ( 10 , num_cols) )
234
- . try_collect :: < Vec < _ > > ( )
235
- . await ?;
236
-
237
- let mut chunk_map: HashMap < usize , Vec < u8 > > = chunks. into_iter ( ) . collect ( ) ;
238
- let mut cnt_map = Self :: build_projection_count_map ( & columns) ;
239
- let mut columns_array_iter = Vec :: with_capacity ( num_cols) ;
240
- for column in & columns {
241
- let field = column. field . clone ( ) ;
242
- let indices = & column. leaf_ids ;
243
- let mut column_metas = Vec :: with_capacity ( indices. len ( ) ) ;
244
- let mut column_chunks = Vec :: with_capacity ( indices. len ( ) ) ;
245
- let mut column_descriptors = Vec :: with_capacity ( indices. len ( ) ) ;
246
- for index in indices {
247
- let column_meta = & part. columns_meta [ index] ;
248
- let cnt = cnt_map. get_mut ( index) . unwrap ( ) ;
249
- * cnt -= 1 ;
250
- let column_chunk = if cnt > & mut 0 {
251
- chunk_map. get ( index) . unwrap ( ) . clone ( )
252
- } else {
253
- chunk_map. remove ( index) . unwrap ( )
254
- } ;
255
- let column_descriptor = & self . parquet_schema_descriptor . columns ( ) [ * index] ;
256
- column_metas. push ( column_meta) ;
257
- column_chunks. push ( column_chunk) ;
258
- column_descriptors. push ( column_descriptor) ;
259
- }
260
- columns_array_iter. push ( Self :: to_array_iter (
261
- column_metas,
262
- column_chunks,
263
- num_rows,
264
- column_descriptors,
265
- field,
266
- & part. compression ,
267
- ) ?) ;
268
- }
269
-
270
- Ok ( ( num_rows, columns_array_iter) )
271
- }
272
-
273
208
pub fn build_block ( & self , chunks : Vec < ( usize , Box < dyn Array > ) > ) -> Result < DataBlock > {
274
209
let mut results = Vec :: with_capacity ( chunks. len ( ) ) ;
275
210
let mut chunk_map: HashMap < usize , Box < dyn Array > > = chunks. into_iter ( ) . collect ( ) ;
@@ -402,13 +337,6 @@ impl BlockReader {
402
337
Ok ( ( index, chunk) )
403
338
}
404
339
405
- #[ tracing:: instrument( level = "debug" , skip_all) ]
406
- pub async fn read ( & self , part : PartInfoPtr ) -> Result < DataBlock > {
407
- let ( num_rows, columns_array_iter) = self . read_columns ( part) . await ?;
408
- let mut deserializer = RowGroupDeserializer :: new ( columns_array_iter, num_rows, None ) ;
409
- self . try_next_block ( & mut deserializer)
410
- }
411
-
412
340
fn try_next_block ( & self , deserializer : & mut RowGroupDeserializer ) -> Result < DataBlock > {
413
341
match deserializer. next ( ) {
414
342
None => Err ( ErrorCode :: Internal (
0 commit comments