Skip to content

Commit efb890b

Browse files
authored
Emit process_nanos from LookupOperator (#120694)
1 parent 144ff0c commit efb890b

File tree

8 files changed

+53
-49
lines changed

8 files changed

+53
-49
lines changed

server/src/main/java/org/elasticsearch/TransportVersions.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,7 @@ static TransportVersion def(int id) {
184184
public static final TransportVersion REMOVE_DESIRED_NODE_VERSION = def(9_004_0_00);
185185
public static final TransportVersion ESQL_DRIVER_TASK_DESCRIPTION = def(9_005_0_00);
186186
public static final TransportVersion ESQL_RETRY_ON_SHARD_LEVEL_FAILURE = def(9_006_0_00);
187+
public static final TransportVersion ESQL_PROFILE_ASYNC_NANOS = def(9_007_00_0);
187188

188189
/*
189190
* STOP! READ THIS FIRST! No, really,

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/AbstractPageMappingOperator.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -188,7 +188,10 @@ protected final XContentBuilder innerToXContent(XContentBuilder builder) throws
188188
if (builder.humanReadable()) {
189189
builder.field("process_time", TimeValue.timeValueNanos(processNanos));
190190
}
191-
return builder.field("pages_processed", pagesProcessed).field("rows_received", rowsReceived).field("rows_emitted", rowsEmitted);
191+
builder.field("pages_processed", pagesProcessed);
192+
builder.field("rows_received", rowsReceived);
193+
builder.field("rows_emitted", rowsEmitted);
194+
return builder;
192195
}
193196

194197
@Override

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/AsyncOperator.java

Lines changed: 24 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ public abstract class AsyncOperator<Fetched> implements Operator {
4545
private final DriverContext driverContext;
4646

4747
private final int maxOutstandingRequests;
48-
private final LongAdder totalTimeInNanos = new LongAdder();
48+
private final LongAdder processNanos = new LongAdder();
4949

5050
private boolean finished = false;
5151
private volatile boolean closed = false;
@@ -98,7 +98,7 @@ public void addInput(Page input) {
9898
final long startNanos = System.nanoTime();
9999
performAsync(input, ActionListener.runAfter(listener, () -> {
100100
driverContext.removeAsyncAction();
101-
totalTimeInNanos.add(System.nanoTime() - startNanos);
101+
processNanos.add(System.nanoTime() - startNanos);
102102
}));
103103
success = true;
104104
} finally {
@@ -231,15 +231,11 @@ public IsBlockedResult isBlocked() {
231231

232232
@Override
233233
public final Operator.Status status() {
234-
return status(
235-
Math.max(0L, checkpoint.getMaxSeqNo()),
236-
Math.max(0L, checkpoint.getProcessedCheckpoint()),
237-
TimeValue.timeValueNanos(totalTimeInNanos.sum()).millis()
238-
);
234+
return status(Math.max(0L, checkpoint.getMaxSeqNo()), Math.max(0L, checkpoint.getProcessedCheckpoint()), processNanos.sum());
239235
}
240236

241-
protected Operator.Status status(long receivedPages, long completedPages, long totalTimeInMillis) {
242-
return new Status(receivedPages, completedPages, totalTimeInMillis);
237+
protected Operator.Status status(long receivedPages, long completedPages, long processNanos) {
238+
return new Status(receivedPages, completedPages, processNanos);
243239
}
244240

245241
public static class Status implements Operator.Status {
@@ -251,25 +247,31 @@ public static class Status implements Operator.Status {
251247

252248
final long receivedPages;
253249
final long completedPages;
254-
final long totalTimeInMillis;
250+
final long processNanos;
255251

256-
protected Status(long receivedPages, long completedPages, long totalTimeInMillis) {
252+
protected Status(long receivedPages, long completedPages, long processNanos) {
257253
this.receivedPages = receivedPages;
258254
this.completedPages = completedPages;
259-
this.totalTimeInMillis = totalTimeInMillis;
255+
this.processNanos = processNanos;
260256
}
261257

262258
protected Status(StreamInput in) throws IOException {
263259
this.receivedPages = in.readVLong();
264260
this.completedPages = in.readVLong();
265-
this.totalTimeInMillis = in.readVLong();
261+
this.processNanos = in.getTransportVersion().onOrAfter(TransportVersions.ESQL_PROFILE_ASYNC_NANOS)
262+
? in.readVLong()
263+
: TimeValue.timeValueMillis(in.readVLong()).nanos();
266264
}
267265

268266
@Override
269267
public void writeTo(StreamOutput out) throws IOException {
270268
out.writeVLong(receivedPages);
271269
out.writeVLong(completedPages);
272-
out.writeVLong(totalTimeInMillis);
270+
out.writeVLong(
271+
out.getTransportVersion().onOrAfter(TransportVersions.ESQL_PROFILE_ASYNC_NANOS)
272+
? processNanos
273+
: TimeValue.timeValueNanos(processNanos).millis()
274+
);
273275
}
274276

275277
public long receivedPages() {
@@ -280,8 +282,8 @@ public long completedPages() {
280282
return completedPages;
281283
}
282284

283-
public long totalTimeInMillis() {
284-
return totalTimeInMillis;
285+
public long procesNanos() {
286+
return processNanos;
285287
}
286288

287289
@Override
@@ -297,12 +299,12 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
297299
}
298300

299301
protected final XContentBuilder innerToXContent(XContentBuilder builder) throws IOException {
302+
builder.field("process_nanos", processNanos);
303+
if (builder.humanReadable()) {
304+
builder.field("process_time", TimeValue.timeValueNanos(processNanos));
305+
}
300306
builder.field("received_pages", receivedPages);
301307
builder.field("completed_pages", completedPages);
302-
builder.field("total_time_in_millis", totalTimeInMillis);
303-
if (totalTimeInMillis >= 0) {
304-
builder.field("total_time", TimeValue.timeValueMillis(totalTimeInMillis));
305-
}
306308
return builder;
307309
}
308310

@@ -311,14 +313,12 @@ public boolean equals(Object o) {
311313
if (this == o) return true;
312314
if (o == null || getClass() != o.getClass()) return false;
313315
Status status = (Status) o;
314-
return receivedPages == status.receivedPages
315-
&& completedPages == status.completedPages
316-
&& totalTimeInMillis == status.totalTimeInMillis;
316+
return receivedPages == status.receivedPages && completedPages == status.completedPages && processNanos == status.processNanos;
317317
}
318318

319319
@Override
320320
public int hashCode() {
321-
return Objects.hash(receivedPages, completedPages, totalTimeInMillis);
321+
return Objects.hash(receivedPages, completedPages, processNanos);
322322
}
323323

324324
@Override

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/AsyncOperatorStatusTests.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -39,31 +39,31 @@ protected AsyncOperator.Status mutateInstance(AsyncOperator.Status in) throws IO
3939
case 0 -> new AsyncOperator.Status(
4040
randomValueOtherThan(in.receivedPages(), ESTestCase::randomNonNegativeLong),
4141
in.completedPages(),
42-
in.totalTimeInMillis()
42+
in.procesNanos()
4343
);
4444
case 1 -> new AsyncOperator.Status(
4545
in.receivedPages(),
4646
randomValueOtherThan(in.completedPages(), ESTestCase::randomNonNegativeLong),
47-
in.totalTimeInMillis()
47+
in.procesNanos()
4848
);
4949
case 2 -> new AsyncOperator.Status(
5050
in.receivedPages(),
5151
in.completedPages(),
52-
randomValueOtherThan(in.totalTimeInMillis(), ESTestCase::randomNonNegativeLong)
52+
randomValueOtherThan(in.procesNanos(), ESTestCase::randomNonNegativeLong)
5353
);
5454
default -> throw new AssertionError("unknown ");
5555
};
5656
}
5757

5858
public void testToXContent() {
59-
var status = new AsyncOperator.Status(100, 50, TimeValue.timeValueSeconds(10).millis());
59+
var status = new AsyncOperator.Status(100, 50, TimeValue.timeValueNanos(10).nanos());
6060
String json = Strings.toString(status, true, true);
6161
assertThat(json, equalTo("""
6262
{
63+
"process_nanos" : 10,
64+
"process_time" : "10nanos",
6365
"received_pages" : 100,
64-
"completed_pages" : 50,
65-
"total_time_in_millis" : 10000,
66-
"total_time" : "10s"
66+
"completed_pages" : 50
6767
}"""));
6868
}
6969
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichLookupOperator.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -175,8 +175,8 @@ protected void doClose() {
175175
}
176176

177177
@Override
178-
protected Operator.Status status(long receivedPages, long completedPages, long totalTimeInMillis) {
179-
return new EnrichLookupOperator.Status(receivedPages, completedPages, totalTimeInMillis, totalTerms);
178+
protected Operator.Status status(long receivedPages, long completedPages, long processNanos) {
179+
return new EnrichLookupOperator.Status(receivedPages, completedPages, processNanos, totalTerms);
180180
}
181181

182182
public static class Status extends AsyncOperator.Status {
@@ -188,8 +188,8 @@ public static class Status extends AsyncOperator.Status {
188188

189189
final long totalTerms;
190190

191-
Status(long receivedPages, long completedPages, long totalTimeInMillis, long totalTerms) {
192-
super(receivedPages, completedPages, totalTimeInMillis);
191+
Status(long receivedPages, long completedPages, long processNanos, long totalTerms) {
192+
super(receivedPages, completedPages, processNanos);
193193
this.totalTerms = totalTerms;
194194
}
195195

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexOperator.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -217,8 +217,8 @@ protected void doClose() {
217217
}
218218

219219
@Override
220-
protected Operator.Status status(long receivedPages, long completedPages, long totalTimeInMillis) {
221-
return new LookupFromIndexOperator.Status(receivedPages, completedPages, totalTimeInMillis, totalTerms, emittedPages);
220+
protected Operator.Status status(long receivedPages, long completedPages, long processNanos) {
221+
return new LookupFromIndexOperator.Status(receivedPages, completedPages, processNanos, totalTerms, emittedPages);
222222
}
223223

224224
public static class Status extends AsyncOperator.Status {

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/EnrichOperatorStatusTests.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -41,25 +41,25 @@ protected EnrichLookupOperator.Status mutateInstance(EnrichLookupOperator.Status
4141
randomValueOtherThan(in.receivedPages(), ESTestCase::randomNonNegativeLong),
4242
in.completedPages(),
4343
in.totalTerms,
44-
in.totalTimeInMillis()
44+
in.procesNanos()
4545
);
4646
case 1 -> new EnrichLookupOperator.Status(
4747
in.receivedPages(),
4848
randomValueOtherThan(in.completedPages(), ESTestCase::randomNonNegativeLong),
4949
in.totalTerms,
50-
in.totalTimeInMillis()
50+
in.procesNanos()
5151
);
5252
case 2 -> new EnrichLookupOperator.Status(
5353
in.receivedPages(),
5454
in.completedPages(),
5555
randomValueOtherThan(in.totalTerms, ESTestCase::randomNonNegativeLong),
56-
in.totalTimeInMillis()
56+
in.procesNanos()
5757
);
5858
case 3 -> new EnrichLookupOperator.Status(
5959
in.receivedPages(),
6060
in.completedPages(),
6161
in.totalTerms,
62-
randomValueOtherThan(in.totalTimeInMillis(), ESTestCase::randomNonNegativeLong)
62+
randomValueOtherThan(in.procesNanos(), ESTestCase::randomNonNegativeLong)
6363
);
6464
default -> throw new AssertionError("unknown ");
6565
};
@@ -70,10 +70,10 @@ public void testToXContent() {
7070
String json = Strings.toString(status, true, true);
7171
assertThat(json, equalTo("""
7272
{
73+
"process_nanos" : 10000,
74+
"process_time" : "10micros",
7375
"received_pages" : 100,
7476
"completed_pages" : 50,
75-
"total_time_in_millis" : 10000,
76-
"total_time" : "10s",
7777
"total_terms" : 120
7878
}"""));
7979
}

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexOperatorStatusTests.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -38,29 +38,29 @@ protected LookupFromIndexOperator.Status createTestInstance() {
3838
protected LookupFromIndexOperator.Status mutateInstance(LookupFromIndexOperator.Status in) throws IOException {
3939
long receivedPages = in.receivedPages();
4040
long completedPages = in.completedPages();
41-
long totalTimeInMillis = in.totalTimeInMillis();
41+
long procesNanos = in.procesNanos();
4242
long totalTerms = in.totalTerms();
4343
long emittedPages = in.emittedPages();
4444
switch (randomIntBetween(0, 4)) {
4545
case 0 -> receivedPages = randomValueOtherThan(receivedPages, ESTestCase::randomNonNegativeLong);
4646
case 1 -> completedPages = randomValueOtherThan(completedPages, ESTestCase::randomNonNegativeLong);
47-
case 2 -> totalTimeInMillis = randomValueOtherThan(totalTimeInMillis, ESTestCase::randomNonNegativeLong);
47+
case 2 -> procesNanos = randomValueOtherThan(procesNanos, ESTestCase::randomNonNegativeLong);
4848
case 3 -> totalTerms = randomValueOtherThan(totalTerms, ESTestCase::randomNonNegativeLong);
4949
case 4 -> emittedPages = randomValueOtherThan(emittedPages, ESTestCase::randomNonNegativeLong);
5050
default -> throw new UnsupportedOperationException();
5151
}
52-
return new LookupFromIndexOperator.Status(receivedPages, completedPages, totalTimeInMillis, totalTerms, emittedPages);
52+
return new LookupFromIndexOperator.Status(receivedPages, completedPages, procesNanos, totalTerms, emittedPages);
5353
}
5454

5555
public void testToXContent() {
5656
var status = new LookupFromIndexOperator.Status(100, 50, TimeValue.timeValueSeconds(10).millis(), 120, 88);
5757
String json = Strings.toString(status, true, true);
5858
assertThat(json, equalTo("""
5959
{
60+
"process_nanos" : 10000,
61+
"process_time" : "10micros",
6062
"received_pages" : 100,
6163
"completed_pages" : 50,
62-
"total_time_in_millis" : 10000,
63-
"total_time" : "10s",
6464
"emitted_pages" : 88,
6565
"total_terms" : 120
6666
}"""));

0 commit comments

Comments
 (0)