Skip to content

Commit f68c24d

Browse files
author
Hendrik Muhs
authored
[Transform] automatic deletion of old checkpoints (#49496)
add automatic deletion of old checkpoints based on count and time
1 parent 4a07802 commit f68c24d

File tree

7 files changed

+279
-48
lines changed

7 files changed

+279
-48
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformCheckpoint.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ public class TransformCheckpoint implements Writeable, ToXContentObject {
5050
// checkpoint of the indexes (sequence id's)
5151
public static final ParseField INDICES = new ParseField("indices");
5252

53-
private static final String NAME = "data_frame_transform_checkpoint";
53+
public static final String NAME = "data_frame_transform_checkpoint";
5454

5555
private static final ConstructingObjectParser<TransformCheckpoint, Void> STRICT_PARSER = createParser(false);
5656
private static final ConstructingObjectParser<TransformCheckpoint, Void> LENIENT_PARSER = createParser(true);

x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/persistence/IndexBasedTransformConfigManager.java

Lines changed: 66 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import org.elasticsearch.index.query.BoolQueryBuilder;
4040
import org.elasticsearch.index.query.QueryBuilder;
4141
import org.elasticsearch.index.query.QueryBuilders;
42+
import org.elasticsearch.index.reindex.AbstractBulkByScrollRequest;
4243
import org.elasticsearch.index.reindex.BulkByScrollResponse;
4344
import org.elasticsearch.index.reindex.DeleteByQueryAction;
4445
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
@@ -63,6 +64,7 @@
6364
import java.util.List;
6465
import java.util.Set;
6566

67+
import static org.elasticsearch.index.mapper.MapperService.SINGLE_MAPPING_NAME;
6668
import static org.elasticsearch.xpack.core.ClientHelper.TRANSFORM_ORIGIN;
6769
import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
6870

@@ -146,16 +148,18 @@ public void updateTransformConfiguration(
146148

147149
@Override
148150
public void deleteOldTransformConfigurations(String transformId, ActionListener<Boolean> listener) {
149-
DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(
151+
DeleteByQueryRequest deleteByQueryRequest = createDeleteByQueryRequest();
152+
deleteByQueryRequest.indices(
150153
TransformInternalIndexConstants.INDEX_NAME_PATTERN,
151154
TransformInternalIndexConstants.INDEX_NAME_PATTERN_DEPRECATED
152-
).setQuery(
155+
);
156+
deleteByQueryRequest.setQuery(
153157
QueryBuilders.constantScoreQuery(
154158
QueryBuilders.boolQuery()
155159
.mustNot(QueryBuilders.termQuery("_index", TransformInternalIndexConstants.LATEST_INDEX_NAME))
156160
.filter(QueryBuilders.termQuery("_id", TransformConfig.documentId(transformId)))
157161
)
158-
).setIndicesOptions(IndicesOptions.lenientExpandOpen());
162+
);
159163

160164
executeAsyncWithOrigin(
161165
client,
@@ -177,17 +181,18 @@ public void deleteOldTransformConfigurations(String transformId, ActionListener<
177181

178182
@Override
179183
public void deleteOldTransformStoredDocuments(String transformId, ActionListener<Boolean> listener) {
180-
DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(
184+
DeleteByQueryRequest deleteByQueryRequest = createDeleteByQueryRequest();
185+
deleteByQueryRequest.indices(
181186
TransformInternalIndexConstants.INDEX_NAME_PATTERN,
182187
TransformInternalIndexConstants.INDEX_NAME_PATTERN_DEPRECATED
183-
).setQuery(
188+
);
189+
deleteByQueryRequest.setQuery(
184190
QueryBuilders.constantScoreQuery(
185191
QueryBuilders.boolQuery()
186192
.mustNot(QueryBuilders.termQuery("_index", TransformInternalIndexConstants.LATEST_INDEX_NAME))
187193
.filter(QueryBuilders.termQuery("_id", TransformStoredDoc.documentId(transformId)))
188194
)
189-
).setIndicesOptions(IndicesOptions.lenientExpandOpen());
190-
195+
);
191196
executeAsyncWithOrigin(
192197
client,
193198
TRANSFORM_ORIGIN,
@@ -206,6 +211,41 @@ public void deleteOldTransformStoredDocuments(String transformId, ActionListener
206211
);
207212
}
208213

214+
@Override
215+
public void deleteOldCheckpoints(String transformId, long deleteCheckpointsBelow, long deleteOlderThan, ActionListener<Long> listener) {
216+
DeleteByQueryRequest deleteByQueryRequest = createDeleteByQueryRequest();
217+
deleteByQueryRequest.indices(
218+
TransformInternalIndexConstants.INDEX_NAME_PATTERN,
219+
TransformInternalIndexConstants.INDEX_NAME_PATTERN_DEPRECATED
220+
);
221+
deleteByQueryRequest.setQuery(
222+
QueryBuilders.boolQuery()
223+
.filter(QueryBuilders.termQuery(TransformField.ID.getPreferredName(), transformId))
224+
.filter(QueryBuilders.termQuery(TransformField.INDEX_DOC_TYPE.getPreferredName(), TransformCheckpoint.NAME))
225+
.filter(QueryBuilders.rangeQuery(TransformCheckpoint.CHECKPOINT.getPreferredName()).lt(deleteCheckpointsBelow))
226+
.filter(
227+
QueryBuilders.rangeQuery(TransformField.TIMESTAMP_MILLIS.getPreferredName()).lt(deleteOlderThan).format("epoch_millis")
228+
)
229+
);
230+
logger.debug("Deleting old checkpoints using {}", deleteByQueryRequest.getSearchRequest());
231+
executeAsyncWithOrigin(
232+
client,
233+
TRANSFORM_ORIGIN,
234+
DeleteByQueryAction.INSTANCE,
235+
deleteByQueryRequest,
236+
ActionListener.wrap(response -> {
237+
if ((response.getBulkFailures().isEmpty() && response.getSearchFailures().isEmpty()) == false) {
238+
Tuple<RestStatus, Throwable> statusAndReason = getStatusAndReason(response);
239+
listener.onFailure(
240+
new ElasticsearchStatusException(statusAndReason.v2().getMessage(), statusAndReason.v1(), statusAndReason.v2())
241+
);
242+
return;
243+
}
244+
listener.onResponse(response.getDeleted());
245+
}, listener::onFailure)
246+
);
247+
}
248+
209249
private void putTransformConfiguration(
210250
TransformConfig transformConfig,
211251
DocWriteRequest.OpType optType,
@@ -419,9 +459,7 @@ public void expandTransformIds(
419459

420460
@Override
421461
public void deleteTransform(String transformId, ActionListener<Boolean> listener) {
422-
DeleteByQueryRequest request = new DeleteByQueryRequest().setAbortOnVersionConflict(false); // since these documents are not
423-
// updated, a conflict just means it was
424-
// deleted previously
462+
DeleteByQueryRequest request = createDeleteByQueryRequest();
425463

426464
request.indices(TransformInternalIndexConstants.INDEX_NAME_PATTERN, TransformInternalIndexConstants.INDEX_NAME_PATTERN_DEPRECATED);
427465
QueryBuilder query = QueryBuilders.termQuery(TransformField.ID.getPreferredName(), transformId);
@@ -675,4 +713,22 @@ private static Tuple<RestStatus, Throwable> getStatusAndReason(final BulkByScrol
675713
}
676714
return new Tuple<>(status, reason);
677715
}
716+
717+
/**
718+
* Create DBQ request with good defaults
719+
*
720+
* @return new DeleteByQueryRequest with some defaults set
721+
*/
722+
private static DeleteByQueryRequest createDeleteByQueryRequest() {
723+
724+
DeleteByQueryRequest deleteByQuery = new DeleteByQueryRequest();
725+
726+
deleteByQuery.setAbortOnVersionConflict(false)
727+
.setSlices(AbstractBulkByScrollRequest.AUTO_SLICES)
728+
.setIndicesOptions(IndicesOptions.lenientExpandOpen());
729+
730+
// disable scoring by using index order
731+
deleteByQuery.getSearchRequest().source().sort(SINGLE_MAPPING_NAME);
732+
return deleteByQuery;
733+
}
678734
}

x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/persistence/TransformConfigManager.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,18 @@ void updateTransformConfiguration(
7272
*/
7373
void deleteOldTransformStoredDocuments(String transformId, ActionListener<Boolean> listener);
7474

75+
/**
76+
* This deletes stored checkpoint documents for the given transformId, based on number and age.
77+
*
78+
* Both criteria MUST apply for the deletion to happen.
79+
*
80+
* @param transformId The transform ID referenced by the documents
81+
* @param deleteCheckpointsBelow checkpoints lower than this to delete
82+
* @param deleteOlderThan checkpoints older than this to delete
83+
* @param listener listener to alert on completion, returning number of deleted checkpoints
84+
*/
85+
void deleteOldCheckpoints(String transformId, long deleteCheckpointsBelow, long deleteOlderThan, ActionListener<Long> listener);
86+
7587
/**
7688
* Get a stored checkpoint, requires the transform id as well as the checkpoint id
7789
*

x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/persistence/TransformInternalIndex.java

Lines changed: 40 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.elasticsearch.xpack.core.transform.TransformField;
2929
import org.elasticsearch.xpack.core.transform.transforms.DestConfig;
3030
import org.elasticsearch.xpack.core.transform.transforms.SourceConfig;
31+
import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpoint;
3132
import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerStats;
3233
import org.elasticsearch.xpack.core.transform.transforms.TransformProgress;
3334
import org.elasticsearch.xpack.core.transform.transforms.TransformState;
@@ -54,8 +55,9 @@ public final class TransformInternalIndex {
5455
* progress::docs_processed, progress::docs_indexed,
5556
* stats::exponential_avg_checkpoint_duration_ms, stats::exponential_avg_documents_indexed,
5657
* stats::exponential_avg_documents_processed
57-
*
58+
* version 3 (7.5): rename to .transform-internal-xxx
5859
* version 4 (7.6): state::should_stop_at_checkpoint
60+
* checkpoint::checkpoint
5961
*/
6062

6163
// constants for mappings
@@ -77,25 +79,29 @@ public final class TransformInternalIndex {
7779

7880
public static IndexTemplateMetaData getIndexTemplateMetaData() throws IOException {
7981
IndexTemplateMetaData transformTemplate = IndexTemplateMetaData.builder(TransformInternalIndexConstants.LATEST_INDEX_VERSIONED_NAME)
80-
.patterns(Collections.singletonList(TransformInternalIndexConstants.LATEST_INDEX_VERSIONED_NAME))
81-
.version(Version.CURRENT.id)
82-
.settings(Settings.builder()
83-
// the configurations are expected to be small
84-
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
85-
.put(IndexMetaData.SETTING_AUTO_EXPAND_REPLICAS, "0-1"))
86-
.putMapping(MapperService.SINGLE_MAPPING_NAME, Strings.toString(mappings()))
87-
.build();
82+
.patterns(Collections.singletonList(TransformInternalIndexConstants.LATEST_INDEX_VERSIONED_NAME))
83+
.version(Version.CURRENT.id)
84+
.settings(
85+
Settings.builder()
86+
// the configurations are expected to be small
87+
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
88+
.put(IndexMetaData.SETTING_AUTO_EXPAND_REPLICAS, "0-1")
89+
)
90+
.putMapping(MapperService.SINGLE_MAPPING_NAME, Strings.toString(mappings()))
91+
.build();
8892
return transformTemplate;
8993
}
9094

9195
public static IndexTemplateMetaData getAuditIndexTemplateMetaData() throws IOException {
9296
IndexTemplateMetaData transformTemplate = IndexTemplateMetaData.builder(TransformInternalIndexConstants.AUDIT_INDEX)
9397
.patterns(Collections.singletonList(TransformInternalIndexConstants.AUDIT_INDEX_PREFIX + "*"))
9498
.version(Version.CURRENT.id)
95-
.settings(Settings.builder()
96-
// the audits are expected to be small
97-
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
98-
.put(IndexMetaData.SETTING_AUTO_EXPAND_REPLICAS, "0-1"))
99+
.settings(
100+
Settings.builder()
101+
// the audits are expected to be small
102+
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
103+
.put(IndexMetaData.SETTING_AUTO_EXPAND_REPLICAS, "0-1")
104+
)
99105
.putMapping(MapperService.SINGLE_MAPPING_NAME, Strings.toString(auditMappings()))
100106
.putAlias(AliasMetaData.builder(TransformInternalIndexConstants.AUDIT_INDEX_READ_ALIAS))
101107
.build();
@@ -107,26 +113,27 @@ private static XContentBuilder auditMappings() throws IOException {
107113
builder.startObject(SINGLE_MAPPING_NAME);
108114
addMetaInformation(builder);
109115
builder.field(DYNAMIC, "false");
110-
builder.startObject(PROPERTIES)
111-
.startObject(TRANSFORM_ID)
112-
.field(TYPE, KEYWORD)
113-
.endObject()
114-
.startObject(AbstractAuditMessage.LEVEL.getPreferredName())
115-
.field(TYPE, KEYWORD)
116-
.endObject()
117-
.startObject(AbstractAuditMessage.MESSAGE.getPreferredName())
118-
.field(TYPE, TEXT)
119-
.startObject(FIELDS)
120-
.startObject(RAW)
121-
.field(TYPE, KEYWORD)
122-
.endObject()
123-
.endObject()
116+
builder
117+
.startObject(PROPERTIES)
118+
.startObject(TRANSFORM_ID)
119+
.field(TYPE, KEYWORD)
120+
.endObject()
121+
.startObject(AbstractAuditMessage.LEVEL.getPreferredName())
122+
.field(TYPE, KEYWORD)
123+
.endObject()
124+
.startObject(AbstractAuditMessage.MESSAGE.getPreferredName())
125+
.field(TYPE, TEXT)
126+
.startObject(FIELDS)
127+
.startObject(RAW)
128+
.field(TYPE, KEYWORD)
129+
.endObject()
130+
.endObject()
124131
.endObject()
125132
.startObject(AbstractAuditMessage.TIMESTAMP.getPreferredName())
126-
.field(TYPE, DATE)
133+
.field(TYPE, DATE)
127134
.endObject()
128135
.startObject(AbstractAuditMessage.NODE_NAME.getPreferredName())
129-
.field(TYPE, KEYWORD)
136+
.field(TYPE, KEYWORD)
130137
.endObject()
131138
.endObject()
132139
.endObject()
@@ -167,7 +174,6 @@ public static XContentBuilder mappings(XContentBuilder builder) throws IOExcepti
167174
return builder;
168175
}
169176

170-
171177
private static XContentBuilder addTransformStoredDocMappings(XContentBuilder builder) throws IOException {
172178
return builder
173179
.startObject(TransformStoredDoc.STATE_FIELD.getPreferredName())
@@ -254,9 +260,6 @@ private static XContentBuilder addTransformStoredDocMappings(XContentBuilder bui
254260
.endObject()
255261
.endObject()
256262
.endObject();
257-
// This is obsolete and can be removed for future versions of the index, but is left here as a warning/reminder that
258-
// we cannot declare this field differently in version 1 of the internal index as it would cause a mapping clash
259-
// .startObject("checkpointing").field(ENABLED, false).endObject();
260263
}
261264

262265
public static XContentBuilder addTransformsConfigMappings(XContentBuilder builder) throws IOException {
@@ -299,6 +302,9 @@ private static XContentBuilder addTransformCheckpointMappings(XContentBuilder bu
299302
.endObject()
300303
.startObject(TransformField.TIME_UPPER_BOUND_MILLIS.getPreferredName())
301304
.field(TYPE, DATE)
305+
.endObject()
306+
.startObject(TransformCheckpoint.CHECKPOINT.getPreferredName())
307+
.field(TYPE, LONG)
302308
.endObject();
303309
}
304310

@@ -310,9 +316,7 @@ private static XContentBuilder addTransformCheckpointMappings(XContentBuilder bu
310316
* @throws IOException On write error
311317
*/
312318
private static XContentBuilder addMetaInformation(XContentBuilder builder) throws IOException {
313-
return builder.startObject("_meta")
314-
.field("version", Version.CURRENT)
315-
.endObject();
319+
return builder.startObject("_meta").field("version", Version.CURRENT).endObject();
316320
}
317321

318322
/**

0 commit comments

Comments
 (0)