22
22
import org .apache .logging .log4j .Logger ;
23
23
import org .apache .logging .log4j .message .ParameterizedMessage ;
24
24
import org .apache .lucene .document .Field ;
25
- import org .apache .lucene .document .LongPoint ;
26
25
import org .apache .lucene .document .NumericDocValuesField ;
27
26
import org .apache .lucene .index .DirectoryReader ;
28
27
import org .apache .lucene .index .IndexCommit ;
38
37
import org .apache .lucene .index .SoftDeletesRetentionMergePolicy ;
39
38
import org .apache .lucene .index .Term ;
40
39
import org .apache .lucene .search .IndexSearcher ;
41
- import org .apache .lucene .search .Query ;
42
40
import org .apache .lucene .search .ReferenceManager ;
43
41
import org .apache .lucene .search .SearcherFactory ;
44
42
import org .apache .lucene .search .SearcherManager ;
@@ -153,6 +151,7 @@ public class InternalEngine extends Engine {
153
151
private final CounterMetric numDocUpdates = new CounterMetric ();
154
152
private final NumericDocValuesField softDeleteField = Lucene .newSoftDeleteField ();
155
153
private final boolean softDeleteEnabled ;
154
+ private final SoftDeletesPolicy softDeletesPolicy ;
156
155
private final LastRefreshedCheckpointListener lastRefreshedCheckpointListener ;
157
156
158
157
/**
@@ -177,7 +176,6 @@ public InternalEngine(EngineConfig engineConfig) {
177
176
if (engineConfig .isAutoGeneratedIDsOptimizationEnabled () == false ) {
178
177
maxUnsafeAutoIdTimestamp .set (Long .MAX_VALUE );
179
178
}
180
- this .softDeleteEnabled = engineConfig .getIndexSettings ().isSoftDeleteEnabled ();
181
179
final TranslogDeletionPolicy translogDeletionPolicy = new TranslogDeletionPolicy (
182
180
engineConfig .getIndexSettings ().getTranslogRetentionSize ().getBytes (),
183
181
engineConfig .getIndexSettings ().getTranslogRetentionAge ().getMillis ()
@@ -199,8 +197,10 @@ public InternalEngine(EngineConfig engineConfig) {
199
197
assert translog .getGeneration () != null ;
200
198
this .translog = translog ;
201
199
this .localCheckpointTracker = createLocalCheckpointTracker (localCheckpointTrackerSupplier );
200
+ this .softDeleteEnabled = engineConfig .getIndexSettings ().isSoftDeleteEnabled ();
201
+ this .softDeletesPolicy = newSoftDeletesPolicy ();
202
202
this .combinedDeletionPolicy =
203
- new CombinedDeletionPolicy (logger , translogDeletionPolicy , translog ::getLastSyncedGlobalCheckpoint );
203
+ new CombinedDeletionPolicy (logger , translogDeletionPolicy , softDeletesPolicy , translog ::getLastSyncedGlobalCheckpoint );
204
204
writer = createWriter ();
205
205
bootstrapAppendOnlyInfoFromWriter (writer );
206
206
historyUUID = loadHistoryUUID (writer );
@@ -257,6 +257,18 @@ private LocalCheckpointTracker createLocalCheckpointTracker(
257
257
return localCheckpointTrackerSupplier .apply (maxSeqNo , localCheckpoint );
258
258
}
259
259
260
+ private SoftDeletesPolicy newSoftDeletesPolicy () throws IOException {
261
+ final Map <String , String > commitUserData = store .readLastCommittedSegmentsInfo ().userData ;
262
+ final long lastMinRetainedSeqNo ;
263
+ if (commitUserData .containsKey (Engine .MIN_RETAINED_SEQNO )) {
264
+ lastMinRetainedSeqNo = Long .parseLong (commitUserData .get (Engine .MIN_RETAINED_SEQNO ));
265
+ } else {
266
+ lastMinRetainedSeqNo = Long .parseLong (commitUserData .get (SequenceNumbers .MAX_SEQ_NO )) + 1 ;
267
+ }
268
+ return new SoftDeletesPolicy (translog ::getLastSyncedGlobalCheckpoint , lastMinRetainedSeqNo ,
269
+ engineConfig .getIndexSettings ().getSoftDeleteRetentionOperations ());
270
+ }
271
+
260
272
/**
261
273
* This reference manager delegates all it's refresh calls to another (internal) SearcherManager
262
274
* The main purpose for this is that if we have external refreshes happening we don't issue extra
@@ -468,18 +480,39 @@ public void syncTranslog() throws IOException {
468
480
}
469
481
470
482
@ Override
471
- public Closeable acquireTranslogRetentionLock () {
472
- return getTranslog ().acquireRetentionLock ( );
483
+ public Translog . Snapshot newTranslogSnapshotBetween ( long minSeqNo , long maxSeqNo ) throws IOException {
484
+ return getTranslog ().getSnapshotBetween ( minSeqNo , maxSeqNo );
473
485
}
474
486
487
+ /**
488
+ * Creates a new history snapshot for reading operations since the provided seqno.
489
+ * The returned snapshot can be retrieved from either Lucene index or translog files.
490
+ */
475
491
@ Override
476
- public Translog .Snapshot newTranslogSnapshotBetween (long minSeqNo , long maxSeqNo ) throws IOException {
477
- return getTranslog ().getSnapshotBetween (minSeqNo , maxSeqNo );
492
+ public Translog .Snapshot readHistoryOperations (String source , MapperService mapperService , long startingSeqNo ) throws IOException {
493
+ if (engineConfig .getIndexSettings ().isSoftDeleteEnabled ()) {
494
+ return newLuceneChangesSnapshot (source , mapperService , Math .max (0 , startingSeqNo ), Long .MAX_VALUE , false );
495
+ } else {
496
+ return getTranslog ().getSnapshotBetween (startingSeqNo , Long .MAX_VALUE );
497
+ }
478
498
}
479
499
500
+ /**
501
+ * Returns the estimated number of history operations whose seq# at least the provided seq# in this engine.
502
+ */
480
503
@ Override
481
- public int estimateTranslogOperationsFromMinSeq (long minSeqNo ) {
482
- return getTranslog ().estimateTotalOperationsFromMinSeq (minSeqNo );
504
+ public int estimateNumberOfHistoryOperations (String source , MapperService mapperService , long startingSeqNo ) throws IOException {
505
+ if (engineConfig .getIndexSettings ().isSoftDeleteEnabled ()) {
506
+ try (Translog .Snapshot snapshot =
507
+ newLuceneChangesSnapshot (source , mapperService , Math .max (0 , startingSeqNo ), Long .MAX_VALUE , false )) {
508
+ return snapshot .totalOperations ();
509
+ } catch (IOException ex ) {
510
+ maybeFailEngine (source , ex );
511
+ throw ex ;
512
+ }
513
+ } else {
514
+ return getTranslog ().estimateTotalOperationsFromMinSeq (startingSeqNo );
515
+ }
483
516
}
484
517
485
518
@ Override
@@ -2070,8 +2103,8 @@ private IndexWriterConfig getIndexWriterConfig() {
2070
2103
MergePolicy mergePolicy = config ().getMergePolicy ();
2071
2104
if (softDeleteEnabled ) {
2072
2105
iwc .setSoftDeletesField (Lucene .SOFT_DELETE_FIELD );
2073
- mergePolicy = new RecoverySourcePruneMergePolicy (SourceFieldMapper .RECOVERY_SOURCE_NAME , this :: softDeletesRetentionQuery ,
2074
- new SoftDeletesRetentionMergePolicy (Lucene .SOFT_DELETE_FIELD , this :: softDeletesRetentionQuery , mergePolicy ));
2106
+ mergePolicy = new RecoverySourcePruneMergePolicy (SourceFieldMapper .RECOVERY_SOURCE_NAME , softDeletesPolicy :: getRetentionQuery ,
2107
+ new SoftDeletesRetentionMergePolicy (Lucene .SOFT_DELETE_FIELD , softDeletesPolicy :: getRetentionQuery , mergePolicy ));
2075
2108
}
2076
2109
iwc .setMergePolicy (new ElasticsearchMergePolicy (mergePolicy ));
2077
2110
iwc .setSimilarity (engineConfig .getSimilarity ());
@@ -2084,20 +2117,6 @@ private IndexWriterConfig getIndexWriterConfig() {
2084
2117
return iwc ;
2085
2118
}
2086
2119
2087
- /**
2088
- * Documents including tombstones are soft-deleted and matched this query will be retained and won't cleaned up by merges.
2089
- */
2090
- private Query softDeletesRetentionQuery () {
2091
- ensureOpen ();
2092
- // TODO: We send the safe commit in peer-recovery, thus we need to retain all operations after the local checkpoint of that commit.
2093
- final long retainedExtraOps = engineConfig .getIndexSettings ().getSoftDeleteRetentionOperations ();
2094
- // Prefer using the global checkpoint which is persisted on disk than an in-memory value.
2095
- // If we failed to fsync checkpoint but already used a higher global checkpoint value to clean up soft-deleted ops,
2096
- // then we may not have all required operations whose seq# greater than the global checkpoint after restarted.
2097
- final long persistedGlobalCheckpoint = translog .getLastSyncedGlobalCheckpoint ();
2098
- return LongPoint .newRangeQuery (SeqNoFieldMapper .NAME , persistedGlobalCheckpoint + 1 - retainedExtraOps , Long .MAX_VALUE );
2099
- }
2100
-
2101
2120
/** Extended SearcherFactory that warms the segments if needed when acquiring a new searcher */
2102
2121
static final class SearchFactory extends EngineSearcherFactory {
2103
2122
private final Engine .Warmer warmer ;
@@ -2284,6 +2303,9 @@ protected void commitIndexWriter(final IndexWriter writer, final Translog transl
2284
2303
commitData .put (SequenceNumbers .MAX_SEQ_NO , Long .toString (localCheckpointTracker .getMaxSeqNo ()));
2285
2304
commitData .put (MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID , Long .toString (maxUnsafeAutoIdTimestamp .get ()));
2286
2305
commitData .put (HISTORY_UUID_KEY , historyUUID );
2306
+ if (softDeleteEnabled ) {
2307
+ commitData .put (Engine .MIN_RETAINED_SEQNO , Long .toString (softDeletesPolicy .getMinRetainedSeqNo ()));
2308
+ }
2287
2309
logger .trace ("committing writer with commit data [{}]" , commitData );
2288
2310
return commitData .entrySet ().iterator ();
2289
2311
});
@@ -2339,6 +2361,8 @@ public void onSettingsChanged() {
2339
2361
final IndexSettings indexSettings = engineConfig .getIndexSettings ();
2340
2362
translogDeletionPolicy .setRetentionAgeInMillis (indexSettings .getTranslogRetentionAge ().getMillis ());
2341
2363
translogDeletionPolicy .setRetentionSizeInBytes (indexSettings .getTranslogRetentionSize ().getBytes ());
2364
+
2365
+ softDeletesPolicy .setRetentionOperations (indexSettings .getSoftDeleteRetentionOperations ());
2342
2366
}
2343
2367
2344
2368
public MergeStats getMergeStats () {
@@ -2452,6 +2476,41 @@ public Translog.Snapshot newLuceneChangesSnapshot(String source, MapperService m
2452
2476
}
2453
2477
}
2454
2478
2479
+ @ Override
2480
+ public boolean hasCompleteOperationHistory (String source , MapperService mapperService , long startingSeqNo ) throws IOException {
2481
+ if (engineConfig .getIndexSettings ().isSoftDeleteEnabled ()) {
2482
+ return getMinRetainedSeqNo () <= startingSeqNo ;
2483
+ } else {
2484
+ final long currentLocalCheckpoint = getLocalCheckpointTracker ().getCheckpoint ();
2485
+ final LocalCheckpointTracker tracker = new LocalCheckpointTracker (startingSeqNo , startingSeqNo - 1 );
2486
+ try (Translog .Snapshot snapshot = getTranslog ().getSnapshotBetween (startingSeqNo , Long .MAX_VALUE )) {
2487
+ Translog .Operation operation ;
2488
+ while ((operation = snapshot .next ()) != null ) {
2489
+ if (operation .seqNo () != SequenceNumbers .UNASSIGNED_SEQ_NO ) {
2490
+ tracker .markSeqNoAsCompleted (operation .seqNo ());
2491
+ }
2492
+ }
2493
+ }
2494
+ return tracker .getCheckpoint () >= currentLocalCheckpoint ;
2495
+ }
2496
+ }
2497
+
2498
+ /**
2499
+ * Returns the minimum seqno that is retained in the Lucene index.
2500
+ * Operations whose seq# are at least this value should exist in the Lucene index.
2501
+ */
2502
+ final long getMinRetainedSeqNo () {
2503
+ assert softDeleteEnabled : Thread .currentThread ().getName ();
2504
+ return softDeletesPolicy .getMinRetainedSeqNo ();
2505
+ }
2506
+
2507
+ @ Override
2508
+ public Closeable acquireRetentionLockForPeerRecovery () {
2509
+ final Closeable translogLock = translog .acquireRetentionLock ();
2510
+ final Releasable softDeletesLock = softDeletesPolicy .acquireRetentionLock ();
2511
+ return () -> IOUtils .close (translogLock , softDeletesLock );
2512
+ }
2513
+
2455
2514
@ Override
2456
2515
public boolean isRecovering () {
2457
2516
return pendingTranslogRecovery .get ();
0 commit comments