Skip to content

Commit b7b4e39

Browse files
committed
inline PublicationTargetStateMachine
1 parent 43e3399 commit b7b4e39

File tree

1 file changed

+48
-79
lines changed

1 file changed

+48
-79
lines changed

server/src/main/java/org/elasticsearch/cluster/coordination/Publication.java

+48-79
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ private void onPossibleCompletion() {
9494

9595
if (timedOut == false) {
9696
for (final PublicationTarget target : publicationTargets) {
97-
if (target.publicationTargetStateMachine.isActive()) {
97+
if (target.isActive()) {
9898
return;
9999
}
100100
}
@@ -119,7 +119,7 @@ private void onPossibleCompletion() {
119119
private boolean publicationCompletedIffAllTargetsInactiveOrTimedOut() {
120120
if (timedOut == false) {
121121
for (final PublicationTarget target : publicationTargets) {
122-
if (target.publicationTargetStateMachine.isActive()) {
122+
if (target.isActive()) {
123123
return isCompleted == false;
124124
}
125125
}
@@ -135,10 +135,10 @@ private void onPossibleCommitFailure() {
135135

136136
final CoordinationState.VoteCollection possiblySuccessfulNodes = new CoordinationState.VoteCollection();
137137
for (PublicationTarget publicationTarget : publicationTargets) {
138-
if (publicationTarget.publicationTargetStateMachine.mayCommitInFuture()) {
138+
if (publicationTarget.mayCommitInFuture()) {
139139
possiblySuccessfulNodes.addVote(publicationTarget.discoveryNode);
140140
} else {
141-
assert publicationTarget.publicationTargetStateMachine.isFailed() : publicationTarget.publicationTargetStateMachine;
141+
assert publicationTarget.isFailed() : publicationTarget;
142142
}
143143
}
144144

@@ -180,84 +180,37 @@ enum PublicationTargetState {
180180
APPLIED_COMMIT,
181181
}
182182

183-
static class PublicationTargetStateMachine {
184-
private PublicationTargetState state = PublicationTargetState.NOT_STARTED;
185-
186-
public void setState(PublicationTargetState newState) {
187-
switch (newState) {
188-
case NOT_STARTED:
189-
assert false : state + " -> " + newState;
190-
break;
191-
case SENT_PUBLISH_REQUEST:
192-
assert state == PublicationTargetState.NOT_STARTED : state + " -> " + newState;
193-
break;
194-
case WAITING_FOR_QUORUM:
195-
assert state == PublicationTargetState.SENT_PUBLISH_REQUEST : state + " -> " + newState;
196-
break;
197-
case SENT_APPLY_COMMIT:
198-
assert state == PublicationTargetState.WAITING_FOR_QUORUM : state + " -> " + newState;
199-
break;
200-
case APPLIED_COMMIT:
201-
assert state == PublicationTargetState.SENT_APPLY_COMMIT : state + " -> " + newState;
202-
break;
203-
case FAILED:
204-
assert state != PublicationTargetState.APPLIED_COMMIT : state + " -> " + newState;
205-
break;
206-
}
207-
state = newState;
208-
}
209-
210-
public boolean isActive() {
211-
return state != PublicationTargetState.FAILED
212-
&& state != PublicationTargetState.APPLIED_COMMIT;
213-
}
214-
215-
public boolean isWaitingForQuorum() {
216-
return state == PublicationTargetState.WAITING_FOR_QUORUM;
217-
}
218-
219-
public boolean mayCommitInFuture() {
220-
return (state == PublicationTargetState.NOT_STARTED
221-
|| state == PublicationTargetState.SENT_PUBLISH_REQUEST
222-
|| state == PublicationTargetState.WAITING_FOR_QUORUM);
223-
}
224-
225-
public boolean isFailed() {
226-
return state == PublicationTargetState.FAILED;
227-
}
228-
229-
@Override
230-
public String toString() {
231-
return state.toString();
232-
}
233-
}
234-
235183
private class PublicationTarget {
236184
private final DiscoveryNode discoveryNode;
237-
private final PublicationTargetStateMachine publicationTargetStateMachine = new PublicationTargetStateMachine();
238185
private boolean ackIsPending = true;
186+
private PublicationTargetState state = PublicationTargetState.NOT_STARTED;
239187

240188
private PublicationTarget(DiscoveryNode discoveryNode) {
241189
this.discoveryNode = discoveryNode;
242190
}
243191

244192
@Override
245193
public String toString() {
246-
return discoveryNode.getId();
194+
return "PublicationTarget{" +
195+
"discoveryNode=" + discoveryNode +
196+
", state=" + state +
197+
", ackIsPending=" + ackIsPending +
198+
'}';
247199
}
248200

249201
public void sendPublishRequest() {
250-
if (publicationTargetStateMachine.isFailed()) {
202+
if (isFailed()) {
251203
return;
252204
}
253-
publicationTargetStateMachine.setState(PublicationTargetState.SENT_PUBLISH_REQUEST);
205+
assert state == PublicationTargetState.NOT_STARTED : state + " -> " + PublicationTargetState.SENT_PUBLISH_REQUEST;
206+
state = PublicationTargetState.SENT_PUBLISH_REQUEST;
254207
Publication.this.sendPublishRequest(discoveryNode, publishRequest, new PublicationTarget.PublishResponseHandler());
255208
// TODO Can this ^ fail with an exception? Target should be failed if so.
256209
assert publicationCompletedIffAllTargetsInactiveOrTimedOut();
257210
}
258211

259212
void handlePublishResponse(PublishResponse publishResponse) {
260-
assert publicationTargetStateMachine.isWaitingForQuorum() : publicationTargetStateMachine;
213+
assert isWaitingForQuorum() : this;
261214
logger.trace("handlePublishResponse: handling [{}] from [{}])", publishResponse, discoveryNode);
262215
if (applyCommitRequest.isPresent()) {
263216
sendApplyCommit();
@@ -272,23 +225,22 @@ void handlePublishResponse(PublishResponse publishResponse) {
272225
}
273226

274227
public void sendApplyCommit() {
275-
publicationTargetStateMachine.setState(PublicationTargetState.SENT_APPLY_COMMIT);
228+
assert state == PublicationTargetState.WAITING_FOR_QUORUM : state + " -> " + PublicationTargetState.SENT_APPLY_COMMIT;
229+
state = PublicationTargetState.SENT_APPLY_COMMIT;
276230
assert applyCommitRequest.isPresent();
277231
Publication.this.sendApplyCommit(discoveryNode, applyCommitRequest.get(), new PublicationTarget.ApplyCommitResponseHandler());
278232
assert publicationCompletedIffAllTargetsInactiveOrTimedOut();
279233
}
280234

281-
public boolean isWaitingForQuorum() {
282-
return publicationTargetStateMachine.isWaitingForQuorum();
283-
}
284-
285-
public boolean isActive() {
286-
return publicationTargetStateMachine.isActive();
235+
public void setAppliedCommit() {
236+
assert state == PublicationTargetState.SENT_APPLY_COMMIT : state + " -> " + PublicationTargetState.APPLIED_COMMIT;
237+
state = PublicationTargetState.APPLIED_COMMIT;
238+
ackOnce(null);
287239
}
288240

289241
public void setFailed(Exception e) {
290-
assert isActive();
291-
publicationTargetStateMachine.setState(PublicationTargetState.FAILED);
242+
assert state != PublicationTargetState.APPLIED_COMMIT : state + " -> " + PublicationTargetState.FAILED;
243+
state = PublicationTargetState.FAILED;
292244
ackOnce(e);
293245
}
294246

@@ -307,11 +259,30 @@ private void ackOnce(Exception e) {
307259
}
308260
}
309261

262+
public boolean isActive() {
263+
return state != PublicationTargetState.FAILED
264+
&& state != PublicationTargetState.APPLIED_COMMIT;
265+
}
266+
267+
public boolean isWaitingForQuorum() {
268+
return state == PublicationTargetState.WAITING_FOR_QUORUM;
269+
}
270+
271+
public boolean mayCommitInFuture() {
272+
return (state == PublicationTargetState.NOT_STARTED
273+
|| state == PublicationTargetState.SENT_PUBLISH_REQUEST
274+
|| state == PublicationTargetState.WAITING_FOR_QUORUM);
275+
}
276+
277+
public boolean isFailed() {
278+
return state == PublicationTargetState.FAILED;
279+
}
280+
310281
private class PublishResponseHandler implements ActionListener<PublishWithJoinResponse> {
311282

312283
@Override
313284
public void onResponse(PublishWithJoinResponse response) {
314-
if (publicationTargetStateMachine.isFailed()) {
285+
if (isFailed()) {
315286
logger.debug("PublishResponseHandler.handleResponse: already failed, ignoring response from [{}]", discoveryNode);
316287
assert publicationCompletedIffAllTargetsInactiveOrTimedOut();
317288
return;
@@ -320,7 +291,8 @@ public void onResponse(PublishWithJoinResponse response) {
320291
// TODO: check if we need to pass the full response here or if it's sufficient to just pass the optional join.
321292
onPossibleJoin(discoveryNode, response);
322293

323-
publicationTargetStateMachine.setState(PublicationTargetState.WAITING_FOR_QUORUM);
294+
assert state == PublicationTargetState.SENT_PUBLISH_REQUEST : state + " -> " + PublicationTargetState.WAITING_FOR_QUORUM;
295+
state = PublicationTargetState.WAITING_FOR_QUORUM;
324296
handlePublishResponse(response.getPublishResponse());
325297

326298
assert publicationCompletedIffAllTargetsInactiveOrTimedOut();
@@ -335,10 +307,9 @@ public void onFailure(Exception e) {
335307
} else {
336308
logger.debug(() -> new ParameterizedMessage("PublishResponseHandler: [{}] failed", discoveryNode), exp);
337309
}
338-
publicationTargetStateMachine.setState(PublicationTargetState.FAILED);
310+
setFailed(e);
339311
onPossibleCommitFailure();
340312
assert publicationCompletedIffAllTargetsInactiveOrTimedOut();
341-
ackOnce(exp);
342313
}
343314

344315
}
@@ -347,15 +318,14 @@ private class ApplyCommitResponseHandler implements ActionListener<TransportResp
347318

348319
@Override
349320
public void onResponse(TransportResponse.Empty ignored) {
350-
if (publicationTargetStateMachine.isFailed()) {
321+
if (isFailed()) {
351322
logger.debug("ApplyCommitResponseHandler.handleResponse: already failed, ignoring response from [{}]",
352323
discoveryNode);
353324
return;
354325
}
355-
publicationTargetStateMachine.setState(PublicationTargetState.APPLIED_COMMIT);
326+
setAppliedCommit();
356327
onPossibleCompletion();
357328
assert publicationCompletedIffAllTargetsInactiveOrTimedOut();
358-
ackOnce(null);
359329
}
360330

361331
@Override
@@ -367,10 +337,9 @@ public void onFailure(Exception e) {
367337
} else {
368338
logger.debug(() -> new ParameterizedMessage("ApplyCommitResponseHandler: [{}] failed", discoveryNode), exp);
369339
}
370-
publicationTargetStateMachine.setState(PublicationTargetState.FAILED);
340+
setFailed(e);
371341
onPossibleCompletion();
372342
assert publicationCompletedIffAllTargetsInactiveOrTimedOut();
373-
ackOnce(exp);
374343
}
375344
}
376345
}

0 commit comments

Comments
 (0)