Skip to content

Commit b7fc71e

Browse files
committed
Merge branch 'main' into 2024/05/22/versioned-master-node-requests
2 parents 34fabf5 + 6f857fb commit b7fc71e

File tree

21 files changed

+293
-99
lines changed

21 files changed

+293
-99
lines changed

server/src/main/java/org/elasticsearch/snapshots/InFlightShardSnapshotStates.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,8 @@ private static boolean assertGenerationConsistency(
9797
@Nullable ShardGeneration activeGeneration
9898
) {
9999
final ShardGeneration bestGeneration = generations.getOrDefault(indexName, Collections.emptyMap()).get(shardId);
100-
assert bestGeneration == null || activeGeneration == null || activeGeneration.equals(bestGeneration);
100+
assert bestGeneration == null || activeGeneration == null || activeGeneration.equals(bestGeneration)
101+
: "[" + indexName + "][" + shardId + "]: " + bestGeneration + " vs " + activeGeneration;
101102
return true;
102103
}
103104

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/AggregateMapper.java

Lines changed: 14 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -47,13 +47,13 @@
4747
import static org.elasticsearch.xpack.esql.type.EsqlDataTypes.CARTESIAN_POINT;
4848
import static org.elasticsearch.xpack.esql.type.EsqlDataTypes.GEO_POINT;
4949

50-
public class AggregateMapper {
50+
final class AggregateMapper {
5151

52-
static final List<String> NUMERIC = List.of("Int", "Long", "Double");
53-
static final List<String> SPATIAL = List.of("GeoPoint", "CartesianPoint");
52+
private static final List<String> NUMERIC = List.of("Int", "Long", "Double");
53+
private static final List<String> SPATIAL = List.of("GeoPoint", "CartesianPoint");
5454

5555
/** List of all mappable ESQL agg functions (excludes surrogates like AVG = SUM/COUNT). */
56-
static final List<? extends Class<? extends Function>> AGG_FUNCTIONS = List.of(
56+
private static final List<? extends Class<? extends Function>> AGG_FUNCTIONS = List.of(
5757
Count.class,
5858
CountDistinct.class,
5959
Max.class,
@@ -66,23 +66,19 @@ public class AggregateMapper {
6666
);
6767

6868
/** Record of agg Class, type, and grouping (or non-grouping). */
69-
record AggDef(Class<?> aggClazz, String type, String extra, boolean grouping) {}
69+
private record AggDef(Class<?> aggClazz, String type, String extra, boolean grouping) {}
7070

7171
/** Map of AggDef types to intermediate named expressions. */
72-
private final Map<AggDef, List<IntermediateStateDesc>> mapper;
72+
private static final Map<AggDef, List<IntermediateStateDesc>> mapper = AGG_FUNCTIONS.stream()
73+
.flatMap(AggregateMapper::typeAndNames)
74+
.flatMap(AggregateMapper::groupingAndNonGrouping)
75+
.collect(Collectors.toUnmodifiableMap(aggDef -> aggDef, AggregateMapper::lookupIntermediateState));
7376

7477
/** Cache of aggregates to intermediate expressions. */
75-
private final HashMap<Expression, List<? extends NamedExpression>> cache = new HashMap<>();
78+
private final HashMap<Expression, List<? extends NamedExpression>> cache;
7679

7780
AggregateMapper() {
78-
this(AGG_FUNCTIONS);
79-
}
80-
81-
AggregateMapper(List<? extends Class<? extends Function>> aggregateFunctionClasses) {
82-
mapper = aggregateFunctionClasses.stream()
83-
.flatMap(AggregateMapper::typeAndNames)
84-
.flatMap(AggregateMapper::groupingAndNonGrouping)
85-
.collect(Collectors.toUnmodifiableMap(aggDef -> aggDef, AggregateMapper::lookupIntermediateState));
81+
cache = new HashMap<>();
8682
}
8783

8884
public List<? extends NamedExpression> mapNonGrouping(List<? extends Expression> aggregates) {
@@ -108,11 +104,10 @@ public List<? extends NamedExpression> mapGrouping(Expression aggregate) {
108104
}
109105

110106
private Stream<? extends NamedExpression> map(Expression aggregate, boolean grouping) {
111-
aggregate = Alias.unwrap(aggregate);
112-
return cache.computeIfAbsent(aggregate, aggKey -> computeEntryForAgg(aggKey, grouping)).stream();
107+
return cache.computeIfAbsent(Alias.unwrap(aggregate), aggKey -> computeEntryForAgg(aggKey, grouping)).stream();
113108
}
114109

115-
private List<? extends NamedExpression> computeEntryForAgg(Expression aggregate, boolean grouping) {
110+
private static List<? extends NamedExpression> computeEntryForAgg(Expression aggregate, boolean grouping) {
116111
var aggDef = aggDefOrNull(aggregate, grouping);
117112
if (aggDef != null) {
118113
var is = getNonNull(aggDef);
@@ -128,7 +123,7 @@ private List<? extends NamedExpression> computeEntryForAgg(Expression aggregate,
128123
}
129124

130125
/** Gets the agg from the mapper - wrapper around map::get for more informative failure.*/
131-
private List<IntermediateStateDesc> getNonNull(AggDef aggDef) {
126+
private static List<IntermediateStateDesc> getNonNull(AggDef aggDef) {
132127
var l = mapper.get(aggDef);
133128
if (l == null) {
134129
throw new EsqlIllegalArgumentException("Cannot find intermediate state for: " + aggDef);
@@ -268,9 +263,4 @@ private static String dataTypeToString(DataType type, Class<?> aggClass) {
268263
throw new EsqlIllegalArgumentException("illegal agg type: " + type.typeName());
269264
}
270265
}
271-
272-
private static Expression unwrapAlias(Expression expression) {
273-
if (expression instanceof Alias alias) return alias.child();
274-
return expression;
275-
}
276266
}

x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/MlNodeShutdownIT.java

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,16 @@ public void testJobsVacateShuttingDownNode() throws Exception {
9595
final TimeValue grace = type == SIGTERM ? randomTimeValue() : null;
9696
client().execute(
9797
PutShutdownNodeAction.INSTANCE,
98-
new PutShutdownNodeAction.Request(nodeIdToShutdown.get(), type, "just testing", null, targetNodeName, grace)
98+
new PutShutdownNodeAction.Request(
99+
TEST_REQUEST_TIMEOUT,
100+
TEST_REQUEST_TIMEOUT,
101+
nodeIdToShutdown.get(),
102+
type,
103+
"just testing",
104+
null,
105+
targetNodeName,
106+
grace
107+
)
99108
).actionGet();
100109

101110
// Wait for the desired end state of all 6 jobs running on nodes that are not shutting down.
@@ -189,7 +198,16 @@ public void testCloseJobVacatingShuttingDownNode() throws Exception {
189198
final TimeValue grace = type == SIGTERM ? randomTimeValue() : null;
190199
client().execute(
191200
PutShutdownNodeAction.INSTANCE,
192-
new PutShutdownNodeAction.Request(nodeIdToShutdown.get(), type, "just testing", null, targetNodeName, grace)
201+
new PutShutdownNodeAction.Request(
202+
TEST_REQUEST_TIMEOUT,
203+
TEST_REQUEST_TIMEOUT,
204+
nodeIdToShutdown.get(),
205+
type,
206+
"just testing",
207+
null,
208+
targetNodeName,
209+
grace
210+
)
193211
).actionGet();
194212

195213
if (randomBoolean()) {

x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/allocation/SearchableSnapshotShutdownIntegTests.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,8 @@ private List<String> setupMountedIndices() throws Exception {
117117

118118
private void putShutdown(String nodeToRestartId) throws InterruptedException, ExecutionException {
119119
PutShutdownNodeAction.Request putShutdownRequest = new PutShutdownNodeAction.Request(
120+
TEST_REQUEST_TIMEOUT,
121+
TEST_REQUEST_TIMEOUT,
120122
nodeToRestartId,
121123
SingleNodeShutdownMetadata.Type.RESTART,
122124
this.getTestName(),
@@ -128,6 +130,11 @@ private void putShutdown(String nodeToRestartId) throws InterruptedException, Ex
128130
}
129131

130132
private void removeShutdown(String node) throws ExecutionException, InterruptedException {
131-
assertTrue(client().execute(DeleteShutdownNodeAction.INSTANCE, new DeleteShutdownNodeAction.Request(node)).get().isAcknowledged());
133+
assertTrue(
134+
client().execute(
135+
DeleteShutdownNodeAction.INSTANCE,
136+
new DeleteShutdownNodeAction.Request(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT, node)
137+
).get().isAcknowledged()
138+
);
132139
}
133140
}

x-pack/plugin/shutdown/src/internalClusterTest/java/org/elasticsearch/xpack/shutdown/DesiredBalanceShutdownIT.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,8 @@ public Settings onNodeStopped(String newNodeName) {
6262
client().execute(
6363
PutShutdownNodeAction.INSTANCE,
6464
new PutShutdownNodeAction.Request(
65+
TEST_REQUEST_TIMEOUT,
66+
TEST_REQUEST_TIMEOUT,
6567
oldNodeId,
6668
SingleNodeShutdownMetadata.Type.REPLACE,
6769
"test",

x-pack/plugin/shutdown/src/internalClusterTest/java/org/elasticsearch/xpack/shutdown/NodeShutdownDelayedAllocationIT.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,8 @@ public void testShardAllocationIsDelayedForRestartingNode() throws Exception {
4949

5050
// Mark the node for shutdown
5151
PutShutdownNodeAction.Request putShutdownRequest = new PutShutdownNodeAction.Request(
52+
TEST_REQUEST_TIMEOUT,
53+
TEST_REQUEST_TIMEOUT,
5254
nodeToRestartId,
5355
SingleNodeShutdownMetadata.Type.RESTART,
5456
this.getTestName(),
@@ -85,6 +87,8 @@ public void testShardAllocationWillProceedAfterTimeout() throws Exception {
8587

8688
// Mark the node for shutdown
8789
PutShutdownNodeAction.Request putShutdownRequest = new PutShutdownNodeAction.Request(
90+
TEST_REQUEST_TIMEOUT,
91+
TEST_REQUEST_TIMEOUT,
8892
nodeToRestartId,
8993
SingleNodeShutdownMetadata.Type.RESTART,
9094
this.getTestName(),
@@ -116,6 +120,8 @@ public void testIndexLevelAllocationDelayWillBeUsedIfLongerThanShutdownDelay() t
116120

117121
// Mark the node for shutdown
118122
PutShutdownNodeAction.Request putShutdownRequest = new PutShutdownNodeAction.Request(
123+
TEST_REQUEST_TIMEOUT,
124+
TEST_REQUEST_TIMEOUT,
119125
nodeToRestartId,
120126
SingleNodeShutdownMetadata.Type.RESTART,
121127
this.getTestName(),
@@ -143,6 +149,8 @@ public void testShardAllocationTimeoutCanBeChanged() throws Exception {
143149

144150
// Update the timeout on the shutdown request to something shorter
145151
PutShutdownNodeAction.Request putShutdownRequest = new PutShutdownNodeAction.Request(
152+
TEST_REQUEST_TIMEOUT,
153+
TEST_REQUEST_TIMEOUT,
146154
nodeToRestartId,
147155
SingleNodeShutdownMetadata.Type.RESTART,
148156
this.getTestName(),
@@ -160,7 +168,11 @@ public void testShardAllocationTimeoutCanBeChanged() throws Exception {
160168
public void testShardAllocationStartsImmediatelyIfShutdownDeleted() throws Exception {
161169
String nodeToRestartId = setupLongTimeoutTestCase();
162170

163-
DeleteShutdownNodeAction.Request deleteShutdownRequest = new DeleteShutdownNodeAction.Request(nodeToRestartId);
171+
DeleteShutdownNodeAction.Request deleteShutdownRequest = new DeleteShutdownNodeAction.Request(
172+
TEST_REQUEST_TIMEOUT,
173+
TEST_REQUEST_TIMEOUT,
174+
nodeToRestartId
175+
);
164176
AcknowledgedResponse deleteShutdownResponse = client().execute(DeleteShutdownNodeAction.INSTANCE, deleteShutdownRequest).get();
165177
assertTrue(deleteShutdownResponse.isAcknowledged());
166178

@@ -189,6 +201,8 @@ private String setupLongTimeoutTestCase() throws Exception {
189201
{
190202
// Mark the node for shutdown with a delay that we'll never reach in the test
191203
PutShutdownNodeAction.Request putShutdownRequest = new PutShutdownNodeAction.Request(
204+
TEST_REQUEST_TIMEOUT,
205+
TEST_REQUEST_TIMEOUT,
192206
nodeToRestartId,
193207
SingleNodeShutdownMetadata.Type.RESTART,
194208
this.getTestName(),

x-pack/plugin/shutdown/src/internalClusterTest/java/org/elasticsearch/xpack/shutdown/NodeShutdownPluginsIT.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,16 @@ public void testShutdownAwarePlugin() throws Exception {
5959
// Mark the node as shutting down
6060
client().execute(
6161
PutShutdownNodeAction.INSTANCE,
62-
new PutShutdownNodeAction.Request(shutdownNode, SingleNodeShutdownMetadata.Type.REMOVE, "removal for testing", null, null, null)
62+
new PutShutdownNodeAction.Request(
63+
TEST_REQUEST_TIMEOUT,
64+
TEST_REQUEST_TIMEOUT,
65+
shutdownNode,
66+
SingleNodeShutdownMetadata.Type.REMOVE,
67+
"removal for testing",
68+
null,
69+
null,
70+
null
71+
)
6372
).get();
6473

6574
GetShutdownStatusAction.Response getResp = client().execute(
@@ -86,7 +95,10 @@ public void testShutdownAwarePlugin() throws Exception {
8695
// The shutdown node should be in the triggered list
8796
assertThat(triggeredNodes.get(), contains(shutdownNode));
8897

89-
client().execute(DeleteShutdownNodeAction.INSTANCE, new DeleteShutdownNodeAction.Request(shutdownNode)).get();
98+
client().execute(
99+
DeleteShutdownNodeAction.INSTANCE,
100+
new DeleteShutdownNodeAction.Request(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT, shutdownNode)
101+
).get();
90102

91103
// The shutdown node should now not in the triggered list
92104
assertThat(triggeredNodes.get(), empty());

x-pack/plugin/shutdown/src/internalClusterTest/java/org/elasticsearch/xpack/shutdown/NodeShutdownReadinessIT.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -81,13 +81,27 @@ private void putNodeShutdown(String nodeId, SingleNodeShutdownMetadata.Type type
8181
assertAcked(
8282
client().execute(
8383
PutShutdownNodeAction.INSTANCE,
84-
new PutShutdownNodeAction.Request(nodeId, type, this.getTestName(), allocationDelay, null, null)
84+
new PutShutdownNodeAction.Request(
85+
TEST_REQUEST_TIMEOUT,
86+
TEST_REQUEST_TIMEOUT,
87+
nodeId,
88+
type,
89+
this.getTestName(),
90+
allocationDelay,
91+
null,
92+
null
93+
)
8594
)
8695
);
8796
}
8897

8998
private void deleteNodeShutdown(String nodeId) {
90-
assertAcked(client().execute(DeleteShutdownNodeAction.INSTANCE, new DeleteShutdownNodeAction.Request(nodeId)));
99+
assertAcked(
100+
client().execute(
101+
DeleteShutdownNodeAction.INSTANCE,
102+
new DeleteShutdownNodeAction.Request(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT, nodeId)
103+
)
104+
);
91105
}
92106

93107
private void assertNoShuttingDownNodes(String nodeId) throws ExecutionException, InterruptedException {

x-pack/plugin/shutdown/src/internalClusterTest/java/org/elasticsearch/xpack/shutdown/NodeShutdownShardsIT.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -458,7 +458,16 @@ private void putNodeShutdown(String nodeId, SingleNodeShutdownMetadata.Type type
458458
assertAcked(
459459
client().execute(
460460
PutShutdownNodeAction.INSTANCE,
461-
new PutShutdownNodeAction.Request(nodeId, type, this.getTestName(), null, nodeReplacementName, null)
461+
new PutShutdownNodeAction.Request(
462+
TEST_REQUEST_TIMEOUT,
463+
TEST_REQUEST_TIMEOUT,
464+
nodeId,
465+
type,
466+
this.getTestName(),
467+
null,
468+
nodeReplacementName,
469+
null
470+
)
462471
)
463472
);
464473
}

x-pack/plugin/shutdown/src/internalClusterTest/java/org/elasticsearch/xpack/shutdown/NodeShutdownTasksIT.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,16 @@ public void testTasksAreNotAssignedToShuttingDownNode() throws Exception {
9191
// Mark the node as shutting down
9292
client().execute(
9393
PutShutdownNodeAction.INSTANCE,
94-
new PutShutdownNodeAction.Request(shutdownNode, SingleNodeShutdownMetadata.Type.REMOVE, "removal for testing", null, null, null)
94+
new PutShutdownNodeAction.Request(
95+
TEST_REQUEST_TIMEOUT,
96+
TEST_REQUEST_TIMEOUT,
97+
shutdownNode,
98+
SingleNodeShutdownMetadata.Type.REMOVE,
99+
"removal for testing",
100+
null,
101+
null,
102+
null
103+
)
95104
).get();
96105

97106
// Tell the persistent task executor it can start allocating the task

x-pack/plugin/shutdown/src/main/java/org/elasticsearch/xpack/shutdown/DeleteShutdownNodeAction.java

Lines changed: 24 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -7,18 +7,20 @@
77

88
package org.elasticsearch.xpack.shutdown;
99

10-
import org.elasticsearch.TransportVersions;
1110
import org.elasticsearch.action.ActionRequestValidationException;
1211
import org.elasticsearch.action.ActionType;
1312
import org.elasticsearch.action.support.master.AcknowledgedRequest;
1413
import org.elasticsearch.action.support.master.AcknowledgedResponse;
1514
import org.elasticsearch.common.Strings;
1615
import org.elasticsearch.common.io.stream.StreamInput;
1716
import org.elasticsearch.common.io.stream.StreamOutput;
18-
import org.elasticsearch.tasks.TaskId;
17+
import org.elasticsearch.core.TimeValue;
18+
import org.elasticsearch.core.UpdateForV9;
1919

2020
import java.io.IOException;
2121

22+
import static org.elasticsearch.xpack.shutdown.ShutdownPlugin.serializesWithParentTaskAndTimeouts;
23+
2224
public class DeleteShutdownNodeAction extends ActionType<AcknowledgedResponse> {
2325

2426
public static final DeleteShutdownNodeAction INSTANCE = new DeleteShutdownNodeAction();
@@ -32,29 +34,35 @@ public static class Request extends AcknowledgedRequest<Request> {
3234

3335
private final String nodeId;
3436

35-
public Request(String nodeId) {
36-
super(TRAPPY_IMPLICIT_DEFAULT_MASTER_NODE_TIMEOUT, DEFAULT_ACK_TIMEOUT);
37+
public Request(TimeValue masterNodeTimeout, TimeValue ackTimeout, String nodeId) {
38+
super(masterNodeTimeout, ackTimeout);
3739
this.nodeId = nodeId;
3840
}
3941

40-
public Request(StreamInput in) throws IOException {
41-
super(TRAPPY_IMPLICIT_DEFAULT_MASTER_NODE_TIMEOUT, DEFAULT_ACK_TIMEOUT);
42-
if (in.getTransportVersion().isPatchFrom(TransportVersions.V_8_13_4)
43-
|| in.getTransportVersion().isPatchFrom(TransportVersions.SHUTDOWN_REQUEST_TIMEOUTS_FIX_8_14)
44-
|| in.getTransportVersion().onOrAfter(TransportVersions.SHUTDOWN_REQUEST_TIMEOUTS_FIX)) {
45-
// effectively super(in):
46-
setParentTask(TaskId.readFromStream(in));
47-
masterNodeTimeout(in.readTimeValue());
48-
ackTimeout(in.readTimeValue());
42+
@UpdateForV9 // inline when bwc no longer needed
43+
public static Request readFrom(StreamInput in) throws IOException {
44+
if (serializesWithParentTaskAndTimeouts(in.getTransportVersion())) {
45+
return new Request(in);
46+
} else {
47+
return new Request(TimeValue.THIRTY_SECONDS, TimeValue.THIRTY_SECONDS, in);
4948
}
49+
}
50+
51+
private Request(StreamInput in) throws IOException {
52+
super(in);
53+
assert serializesWithParentTaskAndTimeouts(in.getTransportVersion());
5054
this.nodeId = in.readString();
5155
}
5256

57+
@UpdateForV9 // remove when bwc no longer needed
58+
private Request(TimeValue masterNodeTimeout, TimeValue ackTimeout, StreamInput in) throws IOException {
59+
this(masterNodeTimeout, ackTimeout, in.readString());
60+
assert serializesWithParentTaskAndTimeouts(in.getTransportVersion()) == false;
61+
}
62+
5363
@Override
5464
public void writeTo(StreamOutput out) throws IOException {
55-
if (out.getTransportVersion().isPatchFrom(TransportVersions.V_8_13_4)
56-
|| out.getTransportVersion().isPatchFrom(TransportVersions.SHUTDOWN_REQUEST_TIMEOUTS_FIX_8_14)
57-
|| out.getTransportVersion().onOrAfter(TransportVersions.SHUTDOWN_REQUEST_TIMEOUTS_FIX)) {
65+
if (serializesWithParentTaskAndTimeouts(out.getTransportVersion())) {
5866
super.writeTo(out);
5967
}
6068
out.writeString(this.nodeId);

0 commit comments

Comments
 (0)