Skip to content

Commit c33be29

Browse files
author
Hendrik Muhs
committed
[Transform] automatic deletion of old checkpoints (#49496)
add automatic deletion of old checkpoints based on count and time
1 parent 602369c commit c33be29

File tree

7 files changed

+262
-32
lines changed

7 files changed

+262
-32
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformCheckpoint.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ public class TransformCheckpoint implements Writeable, ToXContentObject {
5050
// checkpoint of the indexes (sequence id's)
5151
public static final ParseField INDICES = new ParseField("indices");
5252

53-
private static final String NAME = "data_frame_transform_checkpoint";
53+
public static final String NAME = "data_frame_transform_checkpoint";
5454

5555
private static final ConstructingObjectParser<TransformCheckpoint, Void> STRICT_PARSER = createParser(false);
5656
private static final ConstructingObjectParser<TransformCheckpoint, Void> LENIENT_PARSER = createParser(true);

x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/persistence/IndexBasedTransformConfigManager.java

Lines changed: 66 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import org.elasticsearch.index.query.BoolQueryBuilder;
4040
import org.elasticsearch.index.query.QueryBuilder;
4141
import org.elasticsearch.index.query.QueryBuilders;
42+
import org.elasticsearch.index.reindex.AbstractBulkByScrollRequest;
4243
import org.elasticsearch.index.reindex.BulkByScrollResponse;
4344
import org.elasticsearch.index.reindex.DeleteByQueryAction;
4445
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
@@ -63,6 +64,7 @@
6364
import java.util.List;
6465
import java.util.Set;
6566

67+
import static org.elasticsearch.index.mapper.MapperService.SINGLE_MAPPING_NAME;
6668
import static org.elasticsearch.xpack.core.ClientHelper.TRANSFORM_ORIGIN;
6769
import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
6870

@@ -146,16 +148,18 @@ public void updateTransformConfiguration(
146148

147149
@Override
148150
public void deleteOldTransformConfigurations(String transformId, ActionListener<Boolean> listener) {
149-
DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(
151+
DeleteByQueryRequest deleteByQueryRequest = createDeleteByQueryRequest();
152+
deleteByQueryRequest.indices(
150153
TransformInternalIndexConstants.INDEX_NAME_PATTERN,
151154
TransformInternalIndexConstants.INDEX_NAME_PATTERN_DEPRECATED
152-
).setQuery(
155+
);
156+
deleteByQueryRequest.setQuery(
153157
QueryBuilders.constantScoreQuery(
154158
QueryBuilders.boolQuery()
155159
.mustNot(QueryBuilders.termQuery("_index", TransformInternalIndexConstants.LATEST_INDEX_NAME))
156160
.filter(QueryBuilders.termQuery("_id", TransformConfig.documentId(transformId)))
157161
)
158-
).setIndicesOptions(IndicesOptions.lenientExpandOpen());
162+
);
159163

160164
executeAsyncWithOrigin(
161165
client,
@@ -177,17 +181,18 @@ public void deleteOldTransformConfigurations(String transformId, ActionListener<
177181

178182
@Override
179183
public void deleteOldTransformStoredDocuments(String transformId, ActionListener<Boolean> listener) {
180-
DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(
184+
DeleteByQueryRequest deleteByQueryRequest = createDeleteByQueryRequest();
185+
deleteByQueryRequest.indices(
181186
TransformInternalIndexConstants.INDEX_NAME_PATTERN,
182187
TransformInternalIndexConstants.INDEX_NAME_PATTERN_DEPRECATED
183-
).setQuery(
188+
);
189+
deleteByQueryRequest.setQuery(
184190
QueryBuilders.constantScoreQuery(
185191
QueryBuilders.boolQuery()
186192
.mustNot(QueryBuilders.termQuery("_index", TransformInternalIndexConstants.LATEST_INDEX_NAME))
187193
.filter(QueryBuilders.termQuery("_id", TransformStoredDoc.documentId(transformId)))
188194
)
189-
).setIndicesOptions(IndicesOptions.lenientExpandOpen());
190-
195+
);
191196
executeAsyncWithOrigin(
192197
client,
193198
TRANSFORM_ORIGIN,
@@ -206,6 +211,41 @@ public void deleteOldTransformStoredDocuments(String transformId, ActionListener
206211
);
207212
}
208213

214+
@Override
215+
public void deleteOldCheckpoints(String transformId, long deleteCheckpointsBelow, long deleteOlderThan, ActionListener<Long> listener) {
216+
DeleteByQueryRequest deleteByQueryRequest = createDeleteByQueryRequest();
217+
deleteByQueryRequest.indices(
218+
TransformInternalIndexConstants.INDEX_NAME_PATTERN,
219+
TransformInternalIndexConstants.INDEX_NAME_PATTERN_DEPRECATED
220+
);
221+
deleteByQueryRequest.setQuery(
222+
QueryBuilders.boolQuery()
223+
.filter(QueryBuilders.termQuery(TransformField.ID.getPreferredName(), transformId))
224+
.filter(QueryBuilders.termQuery(TransformField.INDEX_DOC_TYPE.getPreferredName(), TransformCheckpoint.NAME))
225+
.filter(QueryBuilders.rangeQuery(TransformCheckpoint.CHECKPOINT.getPreferredName()).lt(deleteCheckpointsBelow))
226+
.filter(
227+
QueryBuilders.rangeQuery(TransformField.TIMESTAMP_MILLIS.getPreferredName()).lt(deleteOlderThan).format("epoch_millis")
228+
)
229+
);
230+
logger.debug("Deleting old checkpoints using {}", deleteByQueryRequest.getSearchRequest());
231+
executeAsyncWithOrigin(
232+
client,
233+
TRANSFORM_ORIGIN,
234+
DeleteByQueryAction.INSTANCE,
235+
deleteByQueryRequest,
236+
ActionListener.wrap(response -> {
237+
if ((response.getBulkFailures().isEmpty() && response.getSearchFailures().isEmpty()) == false) {
238+
Tuple<RestStatus, Throwable> statusAndReason = getStatusAndReason(response);
239+
listener.onFailure(
240+
new ElasticsearchStatusException(statusAndReason.v2().getMessage(), statusAndReason.v1(), statusAndReason.v2())
241+
);
242+
return;
243+
}
244+
listener.onResponse(response.getDeleted());
245+
}, listener::onFailure)
246+
);
247+
}
248+
209249
private void putTransformConfiguration(
210250
TransformConfig transformConfig,
211251
DocWriteRequest.OpType optType,
@@ -419,9 +459,7 @@ public void expandTransformIds(
419459

420460
@Override
421461
public void deleteTransform(String transformId, ActionListener<Boolean> listener) {
422-
DeleteByQueryRequest request = new DeleteByQueryRequest().setAbortOnVersionConflict(false); // since these documents are not
423-
// updated, a conflict just means it was
424-
// deleted previously
462+
DeleteByQueryRequest request = createDeleteByQueryRequest();
425463

426464
request.indices(TransformInternalIndexConstants.INDEX_NAME_PATTERN, TransformInternalIndexConstants.INDEX_NAME_PATTERN_DEPRECATED);
427465
QueryBuilder query = QueryBuilders.termQuery(TransformField.ID.getPreferredName(), transformId);
@@ -675,4 +713,22 @@ private static Tuple<RestStatus, Throwable> getStatusAndReason(final BulkByScrol
675713
}
676714
return new Tuple<>(status, reason);
677715
}
716+
717+
/**
718+
* Create DBQ request with good defaults
719+
*
720+
* @return new DeleteByQueryRequest with some defaults set
721+
*/
722+
private static DeleteByQueryRequest createDeleteByQueryRequest() {
723+
724+
DeleteByQueryRequest deleteByQuery = new DeleteByQueryRequest();
725+
726+
deleteByQuery.setAbortOnVersionConflict(false)
727+
.setSlices(AbstractBulkByScrollRequest.AUTO_SLICES)
728+
.setIndicesOptions(IndicesOptions.lenientExpandOpen());
729+
730+
// disable scoring by using index order
731+
deleteByQuery.getSearchRequest().source().sort(SINGLE_MAPPING_NAME);
732+
return deleteByQuery;
733+
}
678734
}

x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/persistence/TransformConfigManager.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,18 @@ void updateTransformConfiguration(
7272
*/
7373
void deleteOldTransformStoredDocuments(String transformId, ActionListener<Boolean> listener);
7474

75+
/**
76+
* This deletes stored checkpoint documents for the given transformId, based on number and age.
77+
*
78+
* Both criteria MUST apply for the deletion to happen.
79+
*
80+
* @param transformId The transform ID referenced by the documents
81+
* @param deleteCheckpointsBelow checkpoints lower than this to delete
82+
* @param deleteOlderThan checkpoints older than this to delete
83+
* @param listener listener to alert on completion, returning number of deleted checkpoints
84+
*/
85+
void deleteOldCheckpoints(String transformId, long deleteCheckpointsBelow, long deleteOlderThan, ActionListener<Long> listener);
86+
7587
/**
7688
* Get a stored checkpoint, requires the transform id as well as the checkpoint id
7789
*

x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/persistence/TransformInternalIndex.java

Lines changed: 23 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.elasticsearch.xpack.core.transform.TransformField;
3030
import org.elasticsearch.xpack.core.transform.transforms.DestConfig;
3131
import org.elasticsearch.xpack.core.transform.transforms.SourceConfig;
32+
import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpoint;
3233
import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerStats;
3334
import org.elasticsearch.xpack.core.transform.transforms.TransformProgress;
3435
import org.elasticsearch.xpack.core.transform.transforms.TransformState;
@@ -55,8 +56,9 @@ public final class TransformInternalIndex {
5556
* progress::docs_processed, progress::docs_indexed,
5657
* stats::exponential_avg_checkpoint_duration_ms, stats::exponential_avg_documents_indexed,
5758
* stats::exponential_avg_documents_processed
58-
*
59+
* version 3 (7.5): rename to .transform-internal-xxx
5960
* version 4 (7.6): state::should_stop_at_checkpoint
61+
* checkpoint::checkpoint
6062
*/
6163

6264
// constants for mappings
@@ -115,26 +117,27 @@ private static XContentBuilder auditMappings() throws IOException {
115117
builder.startObject(SINGLE_MAPPING_NAME);
116118
addMetaInformation(builder);
117119
builder.field(DYNAMIC, "false");
118-
builder.startObject(PROPERTIES)
119-
.startObject(TRANSFORM_ID)
120-
.field(TYPE, KEYWORD)
121-
.endObject()
122-
.startObject(AbstractAuditMessage.LEVEL.getPreferredName())
123-
.field(TYPE, KEYWORD)
124-
.endObject()
125-
.startObject(AbstractAuditMessage.MESSAGE.getPreferredName())
126-
.field(TYPE, TEXT)
127-
.startObject(FIELDS)
128-
.startObject(RAW)
129-
.field(TYPE, KEYWORD)
130-
.endObject()
131-
.endObject()
120+
builder
121+
.startObject(PROPERTIES)
122+
.startObject(TRANSFORM_ID)
123+
.field(TYPE, KEYWORD)
124+
.endObject()
125+
.startObject(AbstractAuditMessage.LEVEL.getPreferredName())
126+
.field(TYPE, KEYWORD)
127+
.endObject()
128+
.startObject(AbstractAuditMessage.MESSAGE.getPreferredName())
129+
.field(TYPE, TEXT)
130+
.startObject(FIELDS)
131+
.startObject(RAW)
132+
.field(TYPE, KEYWORD)
133+
.endObject()
134+
.endObject()
132135
.endObject()
133136
.startObject(AbstractAuditMessage.TIMESTAMP.getPreferredName())
134-
.field(TYPE, DATE)
137+
.field(TYPE, DATE)
135138
.endObject()
136139
.startObject(AbstractAuditMessage.NODE_NAME.getPreferredName())
137-
.field(TYPE, KEYWORD)
140+
.field(TYPE, KEYWORD)
138141
.endObject()
139142
.endObject()
140143
.endObject()
@@ -260,9 +263,6 @@ private static XContentBuilder addTransformStoredDocMappings(XContentBuilder bui
260263
.endObject()
261264
.endObject()
262265
.endObject();
263-
// This is obsolete and can be removed for future versions of the index, but is left here as a warning/reminder that
264-
// we cannot declare this field differently in version 1 of the internal index as it would cause a mapping clash
265-
// .startObject("checkpointing").field(ENABLED, false).endObject();
266266
}
267267

268268
public static XContentBuilder addTransformsConfigMappings(XContentBuilder builder) throws IOException {
@@ -303,6 +303,9 @@ private static XContentBuilder addTransformCheckpointMappings(XContentBuilder bu
303303
.endObject()
304304
.startObject(TransformField.TIME_UPPER_BOUND_MILLIS.getPreferredName())
305305
.field(TYPE, DATE)
306+
.endObject()
307+
.startObject(TransformCheckpoint.CHECKPOINT.getPreferredName())
308+
.field(TYPE, LONG)
306309
.endObject();
307310
}
308311

x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java

Lines changed: 60 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,8 +79,14 @@ private enum RunState {
7979

8080
public static final int MINIMUM_PAGE_SIZE = 10;
8181
public static final String COMPOSITE_AGGREGATION_NAME = "_transform";
82+
8283
private static final Logger logger = LogManager.getLogger(TransformIndexer.class);
8384

85+
// constant for checkpoint retention, static for now
86+
private static final long NUMBER_OF_CHECKPOINTS_TO_KEEP = 10;
87+
private static final long RETENTION_OF_CHECKPOINTS_MS = 864000000L; // 10 days
88+
private static final long CHECKPOINT_CLEANUP_INTERVAL = 100L; // every 100 checkpoints
89+
8490
protected final TransformConfigManager transformsConfigManager;
8591
private final CheckpointProvider checkpointProvider;
8692
private final TransformProgressGatherer progressGatherer;
@@ -111,6 +117,8 @@ private enum RunState {
111117
private volatile Map<String, Set<String>> changedBuckets;
112118
private volatile Map<String, Object> changedBucketsAfterKey;
113119

120+
private volatile long lastCheckpointCleanup = 0L;
121+
114122
public TransformIndexer(
115123
Executor executor,
116124
TransformConfigManager transformsConfigManager,
@@ -375,7 +383,13 @@ protected void onFinish(ActionListener<Void> listener) {
375383
if (context.shouldStopAtCheckpoint()) {
376384
stop();
377385
}
378-
listener.onResponse(null);
386+
387+
if (checkpoint - lastCheckpointCleanup > CHECKPOINT_CLEANUP_INTERVAL) {
388+
// delete old checkpoints, on a failure we keep going
389+
cleanupOldCheckpoints(listener);
390+
} else {
391+
listener.onResponse(null);
392+
}
379393
} catch (Exception e) {
380394
listener.onFailure(e);
381395
}
@@ -492,6 +506,44 @@ synchronized void handleFailure(Exception e) {
492506
}
493507
}
494508

509+
/**
510+
* Cleanup old checkpoints
511+
*
512+
* @param listener listener to call after done
513+
*/
514+
private void cleanupOldCheckpoints(ActionListener<Void> listener) {
515+
long now = getTime();
516+
long checkpointLowerBound = context.getCheckpoint() - NUMBER_OF_CHECKPOINTS_TO_KEEP;
517+
long lowerBoundEpochMs = now - RETENTION_OF_CHECKPOINTS_MS;
518+
519+
if (checkpointLowerBound > 0 && lowerBoundEpochMs > 0) {
520+
transformsConfigManager.deleteOldCheckpoints(
521+
transformConfig.getId(),
522+
checkpointLowerBound,
523+
lowerBoundEpochMs,
524+
ActionListener.wrap(deletes -> {
525+
logger.debug("[{}] deleted [{}] outdated checkpoints", getJobId(), deletes);
526+
listener.onResponse(null);
527+
lastCheckpointCleanup = context.getCheckpoint();
528+
}, e -> {
529+
logger.warn(
530+
new ParameterizedMessage("[{}] failed to cleanup old checkpoints, retrying after next checkpoint", getJobId()),
531+
e
532+
);
533+
auditor.warning(
534+
getJobId(),
535+
"Failed to cleanup old checkpoints, retrying after next checkpoint. Exception: " + e.getMessage()
536+
);
537+
538+
listener.onResponse(null);
539+
})
540+
);
541+
} else {
542+
logger.debug("[{}] checked for outdated checkpoints", getJobId());
543+
listener.onResponse(null);
544+
}
545+
}
546+
495547
private void sourceHasChanged(ActionListener<Boolean> hasChangedListener) {
496548
checkpointProvider.sourceHasChanged(getLastCheckpoint(), ActionListener.wrap(hasChanged -> {
497549
logger.trace("[{}] change detected [{}].", getJobId(), hasChanged);
@@ -788,6 +840,13 @@ protected void failIndexer(String failureMessage) {
788840
context.markAsFailed(failureMessage);
789841
}
790842

843+
/*
844+
* Get the current time, abstracted for the purpose of testing
845+
*/
846+
long getTime() {
847+
return System.currentTimeMillis();
848+
}
849+
791850
/**
792851
* Indicates if an audit message should be written when onFinish is called for the given checkpoint
793852
* We audit the first checkpoint, and then every 10 checkpoints until completedCheckpoint == 99

x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/persistence/InMemoryTransformConfigManager.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,16 @@ public void deleteOldTransformStoredDocuments(String transformId, ActionListener
8181
listener.onResponse(true);
8282
}
8383

84+
@Override
85+
public void deleteOldCheckpoints(String transformId, long deleteCheckpointsBelow, long deleteOlderThan, ActionListener<Long> listener) {
86+
List<TransformCheckpoint> checkpointsById = checkpoints.get(transformId);
87+
int sizeBeforeDelete = checkpointsById.size();
88+
if (checkpointsById != null) {
89+
checkpointsById.removeIf(cp -> { return cp.getCheckpoint() < deleteCheckpointsBelow && cp.getTimestamp() < deleteOlderThan; });
90+
}
91+
listener.onResponse(Long.valueOf(sizeBeforeDelete - checkpointsById.size()));
92+
}
93+
8494
@Override
8595
public void getTransformCheckpoint(String transformId, long checkpoint, ActionListener<TransformCheckpoint> resultListener) {
8696
List<TransformCheckpoint> checkpointsById = checkpoints.get(transformId);

0 commit comments

Comments
 (0)