Skip to content

Commit 4d2d8d4

Browse files
committed
Be more specific than IllegalArgumentException (elastic#25)
Today, we throw an overly-general IllegalArgumentException to reject unacceptable messages in the consensus layer. We expect messages to be rejected from time to time, if delayed or some other failure occurs, and it is not useful to log the full stack trace leading to these exceptions. This change introduces ConsensusMessageRejectedException for these cases, and suppresses overly verbose logging when receiving such a message. It does not include any of the extra machinery needed to pass these exceptions over the wire.
1 parent 4939751 commit 4d2d8d4

File tree

7 files changed

+101
-50
lines changed

7 files changed

+101
-50
lines changed

core/src/main/java/org/elasticsearch/ElasticsearchException.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -987,8 +987,10 @@ private enum ElasticsearchExceptionHandle {
987987
UNKNOWN_NAMED_OBJECT_EXCEPTION(org.elasticsearch.common.xcontent.NamedXContentRegistry.UnknownNamedObjectException.class,
988988
org.elasticsearch.common.xcontent.NamedXContentRegistry.UnknownNamedObjectException::new, 148, Version.V_5_2_0),
989989
TOO_MANY_BUCKETS_EXCEPTION(MultiBucketConsumerService.TooManyBucketsException.class,
990-
MultiBucketConsumerService.TooManyBucketsException::new, 149,
991-
Version.V_7_0_0_alpha1);
990+
MultiBucketConsumerService.TooManyBucketsException::new, 149, Version.V_7_0_0_alpha1),
991+
CONSENSUS_MESSAGE_REJECTED_EXCEPTION(org.elasticsearch.discovery.zen2.ConsensusMessageRejectedException.class,
992+
org.elasticsearch.discovery.zen2.ConsensusMessageRejectedException::new, 150, Version.V_7_0_0_alpha1);
993+
992994

993995
final Class<? extends ElasticsearchException> exceptionClass;
994996
final CheckedFunction<StreamInput, ? extends ElasticsearchException, IOException> constructor;
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.discovery.zen2;
21+
22+
import org.elasticsearch.ElasticsearchException;
23+
import org.elasticsearch.common.io.stream.StreamInput;
24+
25+
import java.io.IOException;
26+
27+
public class ConsensusMessageRejectedException extends ElasticsearchException {
28+
public ConsensusMessageRejectedException(String msg, Object... args) {
29+
super(msg, args);
30+
}
31+
32+
public ConsensusMessageRejectedException(StreamInput in) throws IOException {
33+
super(in);
34+
}
35+
}

core/src/main/java/org/elasticsearch/discovery/zen2/ConsensusState.java

Lines changed: 22 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -105,13 +105,13 @@ public long lastAcceptedTerm() {
105105
*
106106
* @param newTerm The new term
107107
* @return A Vote that must be sent to at most one other node.
108-
* @throws IllegalArgumentException if the arguments were incompatible with the current state of this object.
108+
* @throws ConsensusMessageRejectedException if the arguments were incompatible with the current state of this object.
109109
*/
110110
public Vote handleStartVote(long newTerm) {
111111
if (newTerm <= getCurrentTerm()) {
112112
logger.debug("handleStartVote: ignored as term provided [{}] lower or equal than current term [{}]",
113113
newTerm, getCurrentTerm());
114-
throw new IllegalArgumentException("incoming term " + newTerm + " lower than current term " + getCurrentTerm());
114+
throw new ConsensusMessageRejectedException("incoming term " + newTerm + " lower than current term " + getCurrentTerm());
115115
}
116116

117117
logger.debug("handleStartVote: updating term from [{}] to [{}]", getCurrentTerm(), newTerm);
@@ -132,25 +132,26 @@ public Vote handleStartVote(long newTerm) {
132132
* @param sourceNode The sender of the Vote received.
133133
* @param vote The Vote received.
134134
* @return An optional PublishRequest which, if present, can be broadcast to all peers.
135-
* @throws IllegalArgumentException if the arguments were incompatible with the current state of this object.
135+
* @throws ConsensusMessageRejectedException if the arguments were incompatible with the current state of this object.
136136
*/
137137
public Optional<PublishRequest<T>> handleVote(DiscoveryNode sourceNode, Vote vote) {
138138
if (vote.getTerm() != getCurrentTerm()) {
139139
logger.debug("handleVote: ignored vote due to term mismatch (expected: [{}], actual: [{}])",
140140
getCurrentTerm(), vote.getTerm());
141-
throw new IllegalArgumentException("incoming term " + vote.getTerm() + " does not match current term " + getCurrentTerm());
141+
throw new ConsensusMessageRejectedException(
142+
"incoming term " + vote.getTerm() + " does not match current term " + getCurrentTerm());
142143
}
143144
if (vote.getFirstUncommittedSlot() > firstUncommittedSlot()) {
144145
logger.debug("handleVote: ignored vote due to slot mismatch (expected: <=[{}], actual: [{}])",
145146
firstUncommittedSlot(), vote.getFirstUncommittedSlot());
146-
throw new IllegalArgumentException("incoming slot " + vote.getFirstUncommittedSlot() + " higher than current slot " +
147-
firstUncommittedSlot());
147+
throw new ConsensusMessageRejectedException(
148+
"incoming slot " + vote.getFirstUncommittedSlot() + " higher than current slot " + firstUncommittedSlot());
148149
}
149150
final long lastAcceptedTerm = lastAcceptedTerm();
150151
if (vote.getFirstUncommittedSlot() == firstUncommittedSlot() && vote.getLastAcceptedTerm() > lastAcceptedTerm) {
151152
logger.debug("handleVote: ignored vote as voter has better last accepted term (expected: <=[{}], actual: [{}])",
152153
lastAcceptedTerm, vote.getLastAcceptedTerm());
153-
throw new IllegalArgumentException("incoming last accepted term " + vote.getLastAcceptedTerm() + " higher than " +
154+
throw new ConsensusMessageRejectedException("incoming last accepted term " + vote.getLastAcceptedTerm() + " higher than " +
154155
"current last accepted term " + lastAcceptedTerm);
155156
}
156157

@@ -177,19 +178,19 @@ public Optional<PublishRequest<T>> handleVote(DiscoveryNode sourceNode, Vote vot
177178
*
178179
* @param publishRequest The PublishRequest received.
179180
* @return A PublishResponse which can be sent back to the sender of the PublishRequest.
180-
* @throws IllegalArgumentException if the arguments were incompatible with the current state of this object.
181+
* @throws ConsensusMessageRejectedException if the arguments were incompatible with the current state of this object.
181182
*/
182183
public PublishResponse handlePublishRequest(PublishRequest<T> publishRequest) {
183184
if (publishRequest.getTerm() != getCurrentTerm()) {
184185
logger.debug("handlePublishRequest: ignored publish request due to term mismatch (expected: [{}], actual: [{}])",
185186
getCurrentTerm(), publishRequest.getTerm());
186-
throw new IllegalArgumentException("incoming term " + publishRequest.getTerm() + " does not match current term " +
187+
throw new ConsensusMessageRejectedException("incoming term " + publishRequest.getTerm() + " does not match current term " +
187188
getCurrentTerm());
188189
}
189190
if (publishRequest.getSlot() != firstUncommittedSlot()) {
190191
logger.debug("handlePublishRequest: ignored publish request due to slot mismatch (expected: [{}], actual: [{}])",
191192
firstUncommittedSlot(), publishRequest.getSlot());
192-
throw new IllegalArgumentException("incoming slot " + publishRequest.getSlot() + " does not match current slot " +
193+
throw new ConsensusMessageRejectedException("incoming slot " + publishRequest.getSlot() + " does not match current slot " +
193194
firstUncommittedSlot());
194195
}
195196

@@ -209,13 +210,13 @@ public PublishResponse handlePublishRequest(PublishRequest<T> publishRequest) {
209210
* @param publishResponse The PublishResponse received.
210211
* @return An optional ApplyCommit which, if present, may be broadcast to all peers, indicating that this publication
211212
* has been accepted at a quorum of peers and is therefore committed.
212-
* @throws IllegalArgumentException if the arguments were incompatible with the current state of this object.
213+
* @throws ConsensusMessageRejectedException if the arguments were incompatible with the current state of this object.
213214
*/
214215
public Optional<ApplyCommit> handlePublishResponse(DiscoveryNode sourceNode, PublishResponse publishResponse) {
215216
if (publishResponse.getTerm() != getCurrentTerm()) {
216217
logger.debug("handlePublishResponse: ignored publish response due to term mismatch (expected: [{}], actual: [{}])",
217218
getCurrentTerm(), publishResponse.getTerm());
218-
throw new IllegalArgumentException("incoming term " + publishResponse.getTerm()
219+
throw new ConsensusMessageRejectedException("incoming term " + publishResponse.getTerm()
219220
+ " does not match current term " + getCurrentTerm());
220221
}
221222
if (publishResponse.getSlot() != firstUncommittedSlot()) {
@@ -225,7 +226,7 @@ public Optional<ApplyCommit> handlePublishResponse(DiscoveryNode sourceNode, Pub
225226
logger.debug("handlePublishResponse: ignored publish response due to slot mismatch (expected: [{}], actual: [{}])",
226227
firstUncommittedSlot(), publishResponse.getSlot());
227228
}
228-
throw new IllegalArgumentException("incoming slot " + publishResponse.getSlot() + " does not match current slot " +
229+
throw new ConsensusMessageRejectedException("incoming slot " + publishResponse.getSlot() + " does not match current slot " +
229230
firstUncommittedSlot());
230231
}
231232

@@ -245,20 +246,20 @@ public Optional<ApplyCommit> handlePublishResponse(DiscoveryNode sourceNode, Pub
245246
* May be called on receipt of an ApplyCommit. Updates the committed state accordingly.
246247
*
247248
* @param applyCommit The ApplyCommit received.
248-
* @throws IllegalArgumentException if the arguments were incompatible with the current state of this object.
249+
* @throws ConsensusMessageRejectedException if the arguments were incompatible with the current state of this object.
249250
*/
250251
public void handleCommit(ApplyCommit applyCommit) {
251252
if (applyCommit.getTerm() != lastAcceptedTerm()) {
252253
logger.debug("handleCommit: ignored commit request due to term mismatch " +
253254
"(expected: [term {} slot {}], actual: [term {} slot {}])",
254255
lastAcceptedTerm(), firstUncommittedSlot(), applyCommit.getTerm(), applyCommit.getSlot());
255-
throw new IllegalArgumentException("incoming term " + applyCommit.getTerm() + " does not match last accepted term " +
256+
throw new ConsensusMessageRejectedException("incoming term " + applyCommit.getTerm() + " does not match last accepted term " +
256257
lastAcceptedTerm());
257258
}
258259
if (applyCommit.getSlot() != firstUncommittedSlot()) {
259260
logger.debug("handleCommit: ignored commit request due to slot mismatch (term {}, expected: [{}], actual: [{}])",
260261
lastAcceptedTerm(), firstUncommittedSlot(), applyCommit.getSlot());
261-
throw new IllegalArgumentException("incoming slot " + applyCommit.getSlot() + " does not match current slot " +
262+
throw new ConsensusMessageRejectedException("incoming slot " + applyCommit.getSlot() + " does not match current slot " +
262263
firstUncommittedSlot());
263264
}
264265

@@ -295,13 +296,13 @@ public T generateCatchup() {
295296
/**
296297
* May be called on receipt of a catch-up message containing the current committed state from a peer.
297298
*
298-
* @throws IllegalArgumentException if the arguments were incompatible with the current state of this object.
299+
* @throws ConsensusMessageRejectedException if the arguments were incompatible with the current state of this object.
299300
*/
300301
public void applyCatchup(T newCommittedState) {
301302
if (newCommittedState.getSlot() <= getCommittedState().getSlot()) {
302303
logger.debug("applyCatchup: ignored catch up request due to slot mismatch (expected: >[{}], actual: [{}])",
303304
getCommittedState().getSlot(), newCommittedState.getSlot());
304-
throw new IllegalArgumentException("incoming slot " + newCommittedState.getSlot() + " no higher than current slot " +
305+
throw new ConsensusMessageRejectedException("incoming slot " + newCommittedState.getSlot() + " no higher than current slot " +
305306
getCommittedState().getSlot());
306307
}
307308

@@ -320,16 +321,16 @@ public void applyCatchup(T newCommittedState) {
320321
*
321322
* @param diff The RSM transition on which to achieve consensus.
322323
* @return A PublishRequest that may be broadcast to all peers.
323-
* @throws IllegalArgumentException if the arguments were incompatible with the current state of this object.
324+
* @throws ConsensusMessageRejectedException if the arguments were incompatible with the current state of this object.
324325
*/
325326
public PublishRequest<T> handleClientValue(Diff<T> diff) {
326327
if (electionWon == false) {
327328
logger.debug("handleClientValue: ignored request as election not won");
328-
throw new IllegalArgumentException("election not won");
329+
throw new ConsensusMessageRejectedException("election not won");
329330
}
330331
if (publishPermitted == false) {
331332
logger.debug("handleClientValue: ignored request as publishing is not permitted");
332-
throw new IllegalArgumentException("publishing not permitted");
333+
throw new ConsensusMessageRejectedException("publishing not permitted");
333334
}
334335
assert lastAcceptedTerm() == NO_TERM; // see https://github.com/elastic/elasticsearch-formal-models/issues/24
335336

0 commit comments

Comments
 (0)