Keeps index commits up to the current global checkpoint #27367
Conversation
We need to keep index commits and translog operations up to the current global checkpoint for operation-based recovery. This can be done by introducing a new deletion policy. The new policy keeps the latest (e.g. youngest) commit whose local checkpoint is not greater than the current global checkpoint, and also keeps all subsequent commits. Once those commits are kept, a CombinedDeletionPolicy will retain translog operations at least up to the current global checkpoint.
looks great I left some comments
    engineConfig.getIndexSettings().getTranslogRetentionSize().getBytes(),
    engineConfig.getIndexSettings().getTranslogRetentionAge().getMillis()
);
this.deletionPolicy = new CombinedDeletionPolicy(
That you had to do this sucks. I think we should make createWriter static and pass all the things it needs to it, rather than expecting members to be initialized. This should be done in a followup.
Yeah. I will address this in a followup.
@Override
public void onCommit(List<? extends IndexCommit> commits) throws IOException {
    final long globalCheckpoint = globalCheckpointSupplier.getAsLong();
    for (int i = commits.size() - 1; i >= 0; i--) {
Can we keep this simple and just try to find the offset we need to delete from, then exit the loop? Then do the delete in a second loop outside of it. It would make it much simpler to read. Also, I think we can safely iterate from 0 to N for simplicity. This is not perf critical in that place.
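For illustration, the two-loop shape this comment suggests could look roughly like the sketch below. This is a hedged, self-contained sketch, not the PR's actual code: `FakeCommit` is an invented stand-in for Lucene's `IndexCommit`, commits carry only a checkpoint value, and the keep criterion (newest commit whose checkpoint is at most the global checkpoint) is assumed from this thread.

```java
import java.util.List;

public class TwoLoopSketch {
    // Invented stand-in for Lucene's IndexCommit, tracking whether delete() was called.
    static class FakeCommit {
        final long checkpoint;
        boolean deleted;
        FakeCommit(long checkpoint) { this.checkpoint = checkpoint; }
        void delete() { deleted = true; }
    }

    static void onCommit(List<FakeCommit> commits, long globalCheckpoint) {
        // Loop 1 (0..N, oldest to newest; not perf critical): find the newest
        // commit whose checkpoint is not greater than the global checkpoint.
        // If no commit qualifies, keptIndex stays 0 and nothing is deleted.
        int keptIndex = 0;
        for (int i = 0; i < commits.size(); i++) {
            if (commits.get(i).checkpoint <= globalCheckpoint) {
                keptIndex = i;
            }
        }
        // Loop 2: delete every commit older than the kept one.
        for (int i = 0; i < keptIndex; i++) {
            commits.get(i).delete();
        }
    }
}
```

Separating "find the offset" from "delete" keeps each loop trivially readable, which is the point of the suggestion.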
I pushed 9ee44d2
@s1monw, I have addressed your comments. Could you please take another look? Thank you.
I left a few comments.
import java.util.function.LongSupplier;

/**
 * An {@link IndexDeletionPolicy} keeps the latest (eg. youngest) commit whose local checkpoint is not
I think the wording in this Javadoc is confusing. We want to keep the oldest commits (not the latest, not the youngest).
    }
}

// commits are sorted by age (the 0th one is the oldest commit).
This comment is better placed inside the method.
    return -1;
}

private static long localCheckpoint(IndexCommit commit) throws IOException {
I think this should be called localCheckpointFromCommit, yet I question if a method is really needed?
public class KeepUntilGlobalCheckpointDeletionPolicyTests extends EngineTestCase {
    final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.UNASSIGNED_SEQ_NO);
    final AtomicInteger docId = new AtomicInteger();
Do these really have to be test instance fields? Can they be passed around? They are easy enough to construct.
@jasontedor I have addressed your feedback. Could you please have another quick look? Thank you.
import static org.hamcrest.Matchers.hasSize;

public class KeepUntilGlobalCheckpointDeletionPolicyTests extends EngineTestCase {
    final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.UNASSIGNED_SEQ_NO);
To be clear, I was referring to both globalCheckpoint and docId.
Sorry @jasontedor, I overlooked your comment. I pushed a8b3dd2.
LGTM
LGTM.
I think the implementation is incorrect. We should compare the current global checkpoint to the max sequence number in an index commit.
You are correct @dnhatn.
Thanks @jasontedor, I will update the PR.
@jasontedor, I have updated the implementation. Could you please take a look? Thank you!
Today, we keep only the last index commit and use only it to calculate the minimum required translog generation. This may no longer be correct as we introduced a new deletion policy which keeps multiple index commits. This change adjusts the `CombinedDeletionPolicy` so that it can work correctly with a new index deletion policy. Relates to elastic#10708, elastic#27367
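The adjustment described in this commit message amounts to taking the minimum translog generation over all retained commits rather than reading it from the last commit alone. A hedged sketch follows; the user-data key name "translog_generation" is an assumption (intended to match Translog's commit metadata key), and the method is invented for illustration, not taken from the PR.

```java
import java.util.List;
import java.util.Map;

public class MinTranslogGenSketch {
    // Assumed key under which each commit records its translog generation.
    static final String TRANSLOG_GENERATION_KEY = "translog_generation";

    // With several retained index commits, the translog must still cover the
    // OLDEST retained commit, so the minimum required generation is the minimum
    // over all retained commits, not the generation of the last commit.
    static long minRequiredTranslogGeneration(List<Map<String, String>> retainedCommitsUserData) {
        long min = Long.MAX_VALUE;
        for (Map<String, String> userData : retainedCommitsUserData) {
            min = Math.min(min, Long.parseLong(userData.get(TRANSLOG_GENERATION_KEY)));
        }
        return min;
    }
}
```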
Thx @dnhatn . I left a very small request to limit the scenarios in which we accept not finding a proper commit point in the deletion policy. Looks great.
We also need to discuss when and whether we clean up commits as soon as we can. Currently, I expect us to keep two commits most of the time. This is because when we commit, the global checkpoint will likely be lagging. It will quickly catch up, but we have no mechanism to clean up the unneeded old commit. To be clear - I don't think this is necessarily bad, and it should by no means stop this PR. I'm just double checking whether that has gotten some thought.
// Commits are sorted by age (the 0th one is the oldest commit).
for (int i = commits.size() - 1; i >= 0; i--) {
    final IndexCommit commit = commits.get(i);
    long maxSeqNoFromCommit = Long.parseLong(commit.getUserData().get(SequenceNumbers.MAX_SEQ_NO));
watch out for legacy indices - this needs to go to 6.x and thus needs to read 5.x commits. I'm fine with doing the backport as a separate PR.
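The legacy concern here is that a 5.x commit has no MAX_SEQ_NO entry in its user data, so `Long.parseLong` on a missing key would throw. A minimal sketch of a defensive read follows; the key name "max_seq_no" (intended to match SequenceNumbers.MAX_SEQ_NO) and the keep-if-absent behavior are assumptions drawn from this thread, not the PR's actual implementation.

```java
import java.util.Map;

public class LegacyCommitCheck {
    // Assumed to match SequenceNumbers.MAX_SEQ_NO; a 5.x commit has no such entry.
    static final String MAX_SEQ_NO = "max_seq_no";

    // Returns true if the commit's operations are all at or below the global
    // checkpoint. A pre-6.0 commit carries no sequence numbers at all, so it
    // cannot contain operations above the global checkpoint.
    static boolean atOrBelowGlobalCheckpoint(Map<String, String> commitUserData, long globalCheckpoint) {
        final String raw = commitUserData.get(MAX_SEQ_NO);
        if (raw == null) {
            return true; // legacy 5.x commit: no seq# info
        }
        return Long.parseLong(raw) <= globalCheckpoint;
    }
}
```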
I pushed b587e20
        return i;
    }
}
return -1;
I think we should hunt down when this is possible - I can't think of a case where an existing index (i.e. one that was fully initialized and its translog committed) has this trait. A peer recovery index (where we create the translog) may have an unknown GCP (and a single commit point, which we can assert on). Instead of blindly accepting the fact that we have found no commit point, can we maybe rely on the fact that the GCP is UNASSIGNED_SEQ_NO and otherwise throw an exception? Re empty indices - I'm not sure about the initialization order - we should check.
This was not possible before but can happen with a new limit.
This may happen when we upgrade from previous 6.x versions. In previous 6.x, we keep only the last commit - the max_seq_no of this commit is likely greater than the global checkpoint if indexing operations are in progress. Therefore, after upgrading to this version, we may not find a proper commit (e.g. one whose max_seq_no is less than or equal to the current global checkpoint) for an old index until we reserve proper commits.
I see. Can we assert that's the case? i.e. give the deletion policy the index creation version and assert that the index was created before 6.x and that the commit has MAX_SEQ_NO in it? also please add a comment.
This can also happen in peer recovery. If a file-based recovery happens, a replica will receive the latest commit from a primary. However, that commit may not be a safe commit if writes are in progress.
I've documented these two cases. I think we need to discuss the assertion.
Left some very minor comments. One more iteration and I think we're good.
final Map<String, String> commitUserData = commits.get(i).getUserData();
// Index from 5.x does not contain MAX_SEQ_NO, we should keep either the more recent commit with MAX_SEQ_NO,
// or the last commit.
if (commitUserData.containsKey(SequenceNumbers.MAX_SEQ_NO) == false) {
I wonder if this is correct? Shouldn't we keep the commit with no seq# info? It means the commit was made before sequence numbers were introduced and implicitly doesn't have ops above the global checkpoint? This also means we can just return i?
Both Math.min(i + 1, commits.size() - 1) and i are correct. Returning i is much simpler, but Math.min(i + 1, commits.size() - 1) will clean up unneeded commits sooner. It can be explained as follows. We have one commit (c1) without max_seq_no (from 5.x), then we have a new commit (c2) with max_seq_no. We don't need to keep the former commit (c1) if we keep c2. Returning i will keep both commits, but Math.min(i + 1, commits.size() - 1) will keep only c2.
However, I believe that I overthought it. I pushed 3d5d323 to remove this optimization.
@bleskes Could you please take another look? I've addressed your comments.
Today, we keep only the last index commit and use only it to calculate the minimum required translog generation. This may no longer be correct as we introduced a new deletion policy which keeps multiple index commits. This change adjusts the CombinedDeletionPolicy so that it can work correctly with a new index deletion policy. Relates to #10708, #27367
LGTM. Thanks for the extra iterations.
verify(commit2, times(0)).delete();
verify(commit3, times(0)).delete();

deletionPolicy.onCommit(Arrays.asList(commit1, commit2, commit3));
nit - add a check that when we have a good commit with sequence numbers, we keep that one and drop the old ones?
I've updated the test.
One more thought. Now that #27456 is merged, can we verify at the engine level that these components work correctly together? i.e., that we keep commits until the global checkpoint is incremented, that translog files are maintained, etc.
@bleskes, Unfortunately, these components don't work together. We've used a single-commit assumption in some places, for example in elasticsearch/core/src/main/java/org/elasticsearch/index/translog/Translog.java (lines 371 to 383 in 89ba899).
I am addressing these. Should I include it in this PR or make a separate PR before this? Thank you.
Aye. That's a good catch. +1 to fix this in a different PR first. You already mentioned one problem in #27456 concerning snapshots, which we have solved, but there's also a problem with the flushing frequency - see IndexShard#shouldFlush() (I think we will now flush on every write until the global checkpoint advances). Did you see anything else?
You're correct. I will fix it using the translog generation of the last commit to calculate …
The recovering commit (e.g. the first commit) of a shrunk index does not have … (see elasticsearch/core/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java, lines 170 to 177 in 303e0c0).
Yeah, that's a bit funky. I do think it's OK as we always keep all commits when we don't have a global checkpoint. Once we have it we'll clean it up.
Today we cannot distinguish between index commits that are kept by the primary policy and those kept for snapshotting with a SnapshotDeletionPolicy. Since we enclose a SnapshotDeletionPolicy in a CombinedDeletionPolicy, we also cannot distinguish between those with a CombinedDeletionPolicy. This can be a problem if we update the TranslogDeletionPolicy to keep the minimum translog generation of undeleted index commits, as we may keep the translog of a snapshotting commit even though it is "deleted" by the primary policy. To solve this, we enclose a CombinedDeletionPolicy in a SnapshotDeletionPolicy and track whether an index commit is deleted by the primary policy, then use that value to maintain the translog rather than the actual deletion of an index commit. Relates elastic#27456 elastic#27367
This is superseded by #27606
We need to keep index commits and translog operations up to the current global checkpoint for operation-based recovery. This can be done by introducing a new deletion policy. The new policy keeps the oldest commit whose local checkpoint is not greater than the current global checkpoint, and also keeps all subsequent commits. Once those commits are kept, a CombinedDeletionPolicy will retain translog operations at least up to the current global checkpoint.
Relates to #10708
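Putting the thread's conclusions together — in particular the review's correction that the comparison should use the commit's max sequence number — the commit selection could be sketched roughly as below. This is a hedged illustration only: commits are modeled by their max_seq_no user-data values (index 0 is the oldest), and the class and method names are invented, not taken from the PR.

```java
import java.util.ArrayList;
import java.util.List;

public class SafeCommitSketch {
    // Keep the newest commit whose max_seq_no is not greater than the global
    // checkpoint (the commit an operation-based recovery can start from),
    // plus every subsequent commit. If no commit qualifies, conservatively
    // keep everything.
    static List<Long> keptCommits(List<Long> maxSeqNos, long globalCheckpoint) {
        int keepFrom = 0;
        for (int i = 0; i < maxSeqNos.size(); i++) {
            if (maxSeqNos.get(i) <= globalCheckpoint) {
                keepFrom = i;
            }
        }
        return new ArrayList<>(maxSeqNos.subList(keepFrom, maxSeqNos.size()));
    }
}
```

For instance, with commits whose max_seq_no values are 5, 10, and 20 and a global checkpoint of 12, the commit at 10 and everything after it would be retained; the commit at 5 becomes deletable.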