25
25
import org .elasticsearch .discovery .zen2 .Messages .ApplyCommit ;
26
26
import org .elasticsearch .discovery .zen2 .Messages .PublishRequest ;
27
27
import org .elasticsearch .discovery .zen2 .Messages .PublishResponse ;
28
- import org .elasticsearch .discovery .zen2 .Messages .SlotTermDiff ;
29
28
import org .elasticsearch .discovery .zen2 .Messages .Vote ;
30
29
31
30
import java .util .HashMap ;
@@ -48,15 +47,15 @@ public class ConsensusState<T extends ConsensusState.CommittedState> extends Abs
48
47
// persisted state
49
48
private long currentTerm ;
50
49
private T committedState ;
51
- private Optional <SlotTermDiff <T >> acceptedState ;
50
+ private Optional <AcceptedState <T >> acceptedState ;
52
51
// transient state
53
52
private boolean electionWon ;
54
53
private boolean electionValueForced ;
55
54
private NodeCollection joinVotes ;
56
55
private boolean publishPermitted ;
57
56
private NodeCollection publishVotes ;
58
57
59
- public ConsensusState (Settings settings , long currentTerm , T committedState , Optional <SlotTermDiff <T >> acceptedState ,
58
+ public ConsensusState (Settings settings , long currentTerm , T committedState , Optional <AcceptedState <T >> acceptedState ,
60
59
Persistence <T > persistence ) {
61
60
// TODO idea: just pass in a Persistence and let it provide the persisted state.
62
61
@@ -76,8 +75,7 @@ public ConsensusState(Settings settings, long currentTerm, T committedState, Opt
76
75
this .publishVotes = new NodeCollection ();
77
76
78
77
assert currentTerm >= 0 ;
79
- assert acceptedState .isPresent () == false || acceptedState .get ().getTerm () <= currentTerm ;
80
- assert acceptedState .isPresent () == false || acceptedState .get ().getSlot () <= firstUncommittedSlot ();
78
+ assert lastAcceptedTerm () <= currentTerm ;
81
79
}
82
80
83
81
public T getCommittedState () {
@@ -98,8 +96,8 @@ public long firstUncommittedSlot() {
98
96
return committedState .getSlot () + 1 ;
99
97
}
100
98
101
- public long lastAcceptedTermInSlot () {
102
- if (acceptedState .isPresent () && firstUncommittedSlot () == acceptedState . get (). getSlot () ) {
99
+ public long lastAcceptedTerm () {
100
+ if (acceptedState .isPresent ()) {
103
101
return acceptedState .get ().getTerm ();
104
102
} else {
105
103
return NO_TERM ;
@@ -131,7 +129,7 @@ public Vote handleStartVote(long newTerm) {
131
129
publishPermitted = true ;
132
130
publishVotes = new NodeCollection ();
133
131
134
- return new Vote (firstUncommittedSlot (), currentTerm , lastAcceptedTermInSlot ());
132
+ return new Vote (firstUncommittedSlot (), currentTerm , lastAcceptedTerm ());
135
133
}
136
134
137
135
/**
@@ -155,7 +153,7 @@ public Optional<PublishRequest<T>> handleVote(DiscoveryNode sourceNode, Vote vot
155
153
firstUncommittedSlot ());
156
154
}
157
155
if (vote .getFirstUncommittedSlot () == firstUncommittedSlot () && vote .getLastAcceptedTerm () != NO_TERM ) {
158
- final long lastAcceptedTermInSlot = lastAcceptedTermInSlot ();
156
+ final long lastAcceptedTermInSlot = lastAcceptedTerm ();
159
157
if (vote .getLastAcceptedTerm () > lastAcceptedTermInSlot ) {
160
158
logger .debug ("handleVote: ignored vote as voter has better last accepted term (expected: <=[{}], actual: [{}])" ,
161
159
lastAcceptedTermInSlot , vote .getLastAcceptedTerm ());
@@ -212,8 +210,9 @@ public PublishResponse handlePublishRequest(PublishRequest<T> publishRequest) {
212
210
213
211
logger .trace ("handlePublishRequest: storing publish request for slot [{}] and term [{}]" ,
214
212
publishRequest .getSlot (), publishRequest .getTerm ());
215
- persistence .persistAcceptedState (publishRequest );
216
- acceptedState = Optional .of (publishRequest );
213
+ final AcceptedState <T > termDiff = publishRequest .getAcceptedState ();
214
+ persistence .persistAcceptedState (termDiff );
215
+ acceptedState = Optional .of (termDiff );
217
216
218
217
return new PublishResponse (publishRequest .getSlot (), publishRequest .getTerm ());
219
218
}
@@ -264,11 +263,11 @@ public Optional<ApplyCommit> handlePublishResponse(DiscoveryNode sourceNode, Pub
264
263
* @throws IllegalArgumentException if the arguments were incompatible with the current state of this object.
265
264
*/
266
265
public void handleCommit (ApplyCommit applyCommit ) {
267
- if (applyCommit .getTerm () != lastAcceptedTermInSlot ()) {
266
+ if (applyCommit .getTerm () != lastAcceptedTerm ()) {
268
267
logger .debug ("handleCommit: ignored commit request due to term mismatch (expected: [{}], actual: [{}])" ,
269
- lastAcceptedTermInSlot (), applyCommit .getTerm ());
268
+ lastAcceptedTerm (), applyCommit .getTerm ());
270
269
throw new IllegalArgumentException ("incoming term " + applyCommit .getTerm () + " does not match last accepted term " +
271
- lastAcceptedTermInSlot ());
270
+ lastAcceptedTerm ());
272
271
}
273
272
if (applyCommit .getSlot () != firstUncommittedSlot ()) {
274
273
logger .debug ("handleCommit: ignored commit request due to slot mismatch (expected: [{}], actual: [{}])" ,
@@ -280,13 +279,13 @@ public void handleCommit(ApplyCommit applyCommit) {
280
279
logger .trace ("handleCommit: applying commit request for slot [{}]" , applyCommit .getSlot ());
281
280
282
281
assert acceptedState .isPresent ();
283
- assert acceptedState .get ().getSlot () == committedState .getSlot () + 1 ;
284
282
final T newCommittedState = acceptedState .get ().getDiff ().apply (committedState );
285
283
logger .trace ("handleCommit: newCommittedState = [{}]" , newCommittedState );
286
284
assert newCommittedState .getSlot () == committedState .getSlot () + 1 ;
287
285
288
286
persistence .persistCommittedState (newCommittedState );
289
287
committedState = newCommittedState ;
288
+ acceptedState = Optional .empty ();
290
289
publishPermitted = true ;
291
290
electionValueForced = false ;
292
291
publishVotes = new NodeCollection ();
@@ -318,6 +317,7 @@ public void applyCatchup(T newCommittedState) {
318
317
logger .debug ("applyCatchup: applying catch up for slot [{}]" , newCommittedState .getSlot ());
319
318
persistence .persistCommittedState (newCommittedState );
320
319
committedState = newCommittedState ;
320
+ acceptedState = Optional .empty ();
321
321
electionValueForced = false ;
322
322
joinVotes = new NodeCollection ();
323
323
electionWon = false ;
@@ -360,12 +360,57 @@ public interface CommittedState {
360
360
NodeCollection getVotingNodes ();
361
361
}
362
362
363
+ public static class AcceptedState <T > {
364
+ protected final long term ;
365
+ protected final Diff <T > diff ;
366
+
367
+ public AcceptedState (long term , Diff <T > diff ) {
368
+ this .term = term ;
369
+ this .diff = diff ;
370
+ }
371
+
372
+ public long getTerm () {
373
+ return term ;
374
+ }
375
+
376
+ public Diff <T > getDiff () {
377
+ return diff ;
378
+ }
379
+
380
+ @ Override
381
+ public boolean equals (Object o ) {
382
+ if (this == o ) return true ;
383
+ if (o == null || getClass () != o .getClass ()) return false ;
384
+
385
+ AcceptedState <?> termDiff = (AcceptedState <?>) o ;
386
+
387
+ if (term != termDiff .term ) return false ;
388
+ return diff .equals (termDiff .diff );
389
+ }
390
+
391
+ @ Override
392
+ public int hashCode () {
393
+ int result = (int ) (term ^ (term >>> 32 ));
394
+ result = 31 * result + diff .hashCode ();
395
+ return result ;
396
+ }
397
+
398
+
399
+ @ Override
400
+ public String toString () {
401
+ return "AcceptedState{" +
402
+ "term=" + term +
403
+ ", diff=" + diff +
404
+ '}' ;
405
+ }
406
+ }
407
+
363
408
public interface Persistence <T > {
364
409
void persistCurrentTerm (long currentTerm );
365
410
366
411
void persistCommittedState (T committedState );
367
412
368
- void persistAcceptedState (SlotTermDiff <T > slotTermDiff );
413
+ void persistAcceptedState (AcceptedState <T > termDiff );
369
414
}
370
415
371
416
/**
0 commit comments