74
74
import org .elasticsearch .xpack .ql .expression .Expression ;
75
75
import org .elasticsearch .xpack .ql .expression .Expressions ;
76
76
import org .elasticsearch .xpack .ql .expression .Literal ;
77
+ import org .elasticsearch .xpack .ql .expression .NameId ;
77
78
import org .elasticsearch .xpack .ql .expression .NamedExpression ;
78
79
import org .elasticsearch .xpack .ql .expression .Order ;
79
80
import org .elasticsearch .xpack .ql .expression .function .aggregate .AggregateFunction ;
84
85
import java .util .ArrayList ;
85
86
import java .util .BitSet ;
86
87
import java .util .HashMap ;
88
+ import java .util .HashSet ;
87
89
import java .util .List ;
88
90
import java .util .Map ;
89
91
import java .util .Set ;
@@ -278,7 +280,7 @@ public PhysicalOperation plan(PhysicalPlan node, LocalExecutionPlanContext conte
278
280
}
279
281
}
280
282
Function <Page , Page > mapper = transformRequired ? p -> {
281
- var blocks = new Block [p . getBlockCount () ];
283
+ var blocks = new Block [mappedPosition . length ];
282
284
for (int i = 0 ; i < blocks .length ; i ++) {
283
285
blocks [i ] = p .getBlock (mappedPosition [i ]);
284
286
}
@@ -344,9 +346,8 @@ public PhysicalOperation plan(PhysicalPlan node, LocalExecutionPlanContext conte
344
346
} else {
345
347
throw new UnsupportedOperationException ();
346
348
}
347
- Map <Object , Integer > layout = new HashMap <>();
348
- layout .putAll (source .layout );
349
- layout .put (namedExpression .toAttribute ().id (), layout .size ());
349
+ Map <Object , Integer > layout = new HashMap <>(source .layout );
350
+ layout .put (namedExpression .toAttribute ().id (), nextFreeChannel (layout ));
350
351
return new PhysicalOperation (
351
352
new EvalOperatorFactory (evaluator , namedExpression .dataType ().isRational () ? Double .TYPE : Long .TYPE ),
352
353
layout ,
@@ -368,22 +369,46 @@ public PhysicalOperation plan(PhysicalPlan node, LocalExecutionPlanContext conte
368
369
return new PhysicalOperation (new RowOperatorFactory (obj ), layout );
369
370
} else if (node instanceof ProjectExec project ) {
370
371
var source = plan (project .child (), context );
371
- Map <Object , Integer > layout = new HashMap <>();
372
372
373
- var outputSet = project .outputSet ();
374
- var input = project .child ().output ();
375
- var mask = new BitSet (input .size ());
376
- int layoutPos = 0 ;
377
- for (Attribute element : input ) {
378
- var id = element .id ();
379
- var maskPosition = source .layout .get (id );
380
- var keepColumn = outputSet .contains (element );
381
- mask .set (maskPosition , keepColumn );
382
- if (keepColumn ) {
383
- layout .put (id , layoutPos ++);
373
+ Map <Integer , Set <NameId >> inputChannelToInputIds = new HashMap <>(source .layout .size ());
374
+ for (Map .Entry <Object , Integer > entry : source .layout .entrySet ()) {
375
+ inputChannelToInputIds .computeIfAbsent (entry .getValue (), ignore -> new HashSet <>()).add ((NameId ) entry .getKey ());
376
+ }
377
+
378
+ Map <Integer , Set <NameId >> inputChannelToOutputIds = new HashMap <>(inputChannelToInputIds .size ());
379
+ for (NamedExpression ne : project .projections ()) {
380
+ NameId inputId ;
381
+ if (ne instanceof Alias a ) {
382
+ inputId = ((NamedExpression ) a .child ()).id ();
383
+ } else {
384
+ inputId = ne .id ();
385
+ }
386
+ int inputChannel = source .layout .get (inputId );
387
+ inputChannelToOutputIds .computeIfAbsent (inputChannel , ignore -> new HashSet <>()).add (ne .id ());
388
+ }
389
+
390
+ BitSet mask = new BitSet (inputChannelToInputIds .size ());
391
+ Map <Object , Integer > layout = new HashMap <>(project .projections ().size ());
392
+ int outChannel = 0 ;
393
+
394
+ for (int inChannel = 0 ; inChannel < inputChannelToInputIds .size (); inChannel ++) {
395
+ Set <NameId > outputIds = inputChannelToOutputIds .get (inChannel );
396
+
397
+ if (outputIds != null ) {
398
+ mask .set (inChannel );
399
+ for (NameId outId : outputIds ) {
400
+ layout .put (outId , outChannel );
401
+ }
402
+ outChannel ++;
384
403
}
385
404
}
386
- return new PhysicalOperation (new ProjectOperatorFactory (mask ), layout , source );
405
+
406
+ if (mask .cardinality () == inputChannelToInputIds .size ()) {
407
+ // all columns are retained, project operator is not needed but the layout needs to be updated
408
+ return new PhysicalOperation (source .operatorFactories , layout );
409
+ } else {
410
+ return new PhysicalOperation (new ProjectOperatorFactory (mask ), layout , source );
411
+ }
387
412
} else if (node instanceof FilterExec filter ) {
388
413
PhysicalOperation source = plan (filter .child (), context );
389
414
return new PhysicalOperation (new FilterOperatorFactory (toEvaluator (filter .condition (), source .layout )), source .layout , source );
@@ -421,7 +446,7 @@ private PhysicalOperation planFieldExtractNode(LocalExecutionPlanContext context
421
446
PhysicalOperation op = source ;
422
447
for (Attribute attr : fieldExtractExec .attributesToExtract ()) {
423
448
layout = new HashMap <>(layout );
424
- layout .put (attr .id (), layout . size ( ));
449
+ layout .put (attr .id (), nextFreeChannel ( layout ));
425
450
Map <Object , Integer > previousLayout = op .layout ;
426
451
427
452
// Create ValuesSource object for the field to extract its values
@@ -519,6 +544,11 @@ public static class PhysicalOperation implements Describable {
519
544
this .layout = layout ;
520
545
}
521
546
547
+ PhysicalOperation (List <OperatorFactory > operatorFactories , Map <Object , Integer > layout ) {
548
+ this .operatorFactories .addAll (operatorFactories );
549
+ this .layout = layout ;
550
+ }
551
+
522
552
PhysicalOperation (OperatorFactory operatorFactory , Map <Object , Integer > layout , PhysicalOperation source ) {
523
553
this .operatorFactories .addAll (source .operatorFactories );
524
554
this .operatorFactories .add (operatorFactory );
@@ -535,6 +565,16 @@ public String describe() {
535
565
}
536
566
}
537
567
568
+ private static int nextFreeChannel (Map <Object , Integer > layout ) {
569
+ int nextChannel = 0 ;
570
+ for (int channel : layout .values ()) {
571
+ if (channel >= nextChannel ) {
572
+ nextChannel = channel + 1 ;
573
+ }
574
+ }
575
+ return nextChannel ;
576
+ }
577
+
538
578
/**
539
579
* The count and type of driver parallelism.
540
580
*/
0 commit comments