7
7
package org .elasticsearch .xpack .slm ;
8
8
9
9
import org .apache .http .util .EntityUtils ;
10
+ import org .elasticsearch .action .admin .cluster .settings .ClusterUpdateSettingsRequest ;
10
11
import org .elasticsearch .action .index .IndexRequestBuilder ;
11
12
import org .elasticsearch .client .Request ;
12
13
import org .elasticsearch .client .Response ;
13
14
import org .elasticsearch .client .ResponseException ;
14
15
import org .elasticsearch .client .RestClient ;
15
16
import org .elasticsearch .common .Strings ;
17
+ import org .elasticsearch .common .settings .Settings ;
18
+ import org .elasticsearch .common .unit .TimeValue ;
16
19
import org .elasticsearch .common .xcontent .DeprecationHandler ;
17
20
import org .elasticsearch .common .xcontent .NamedXContentRegistry ;
18
21
import org .elasticsearch .common .xcontent .ToXContent ;
22
25
import org .elasticsearch .common .xcontent .XContentType ;
23
26
import org .elasticsearch .common .xcontent .json .JsonXContent ;
24
27
import org .elasticsearch .test .rest .ESRestTestCase ;
28
+ import org .elasticsearch .xpack .core .indexlifecycle .LifecycleSettings ;
25
29
import org .elasticsearch .xpack .core .snapshotlifecycle .SnapshotLifecyclePolicy ;
26
30
import org .elasticsearch .xpack .core .snapshotlifecycle .SnapshotRetentionConfiguration ;
27
31
@@ -222,6 +226,93 @@ public void testPolicyManualExecution() throws Exception {
222
226
});
223
227
}
224
228
229
+ public void testBasicTimeBasedRetenion () throws Exception {
230
+ final String indexName = "test" ;
231
+ final String policyName = "test-policy" ;
232
+ final String repoId = "my-repo" ;
233
+ int docCount = randomIntBetween (10 , 50 );
234
+ List <IndexRequestBuilder > indexReqs = new ArrayList <>();
235
+ for (int i = 0 ; i < docCount ; i ++) {
236
+ index (client (), indexName , "" + i , "foo" , "bar" );
237
+ }
238
+
239
+ // Create a snapshot repo
240
+ inializeRepo (repoId );
241
+
242
+ // Create a policy with a retention period of 1 millisecond
243
+ createSnapshotPolicy (policyName , "snap" , "1 2 3 4 5 ?" , repoId , indexName , true ,
244
+ new SnapshotRetentionConfiguration (TimeValue .timeValueMillis (1 )));
245
+
246
+ // Manually create a snapshot
247
+ Response executeResp = client ().performRequest (new Request ("PUT" , "/_slm/policy/" + policyName + "/_execute" ));
248
+
249
+ final String snapshotName ;
250
+ try (XContentParser parser = JsonXContent .jsonXContent .createParser (NamedXContentRegistry .EMPTY ,
251
+ DeprecationHandler .THROW_UNSUPPORTED_OPERATION , EntityUtils .toByteArray (executeResp .getEntity ()))) {
252
+ snapshotName = parser .mapStrings ().get ("snapshot_name" );
253
+
254
+ // Check that the executed snapshot is created
255
+ assertBusy (() -> {
256
+ try {
257
+ Response response = client ().performRequest (new Request ("GET" , "/_snapshot/" + repoId + "/" + snapshotName ));
258
+ Map <String , Object > snapshotResponseMap ;
259
+ try (InputStream is = response .getEntity ().getContent ()) {
260
+ snapshotResponseMap = XContentHelper .convertToMap (XContentType .JSON .xContent (), is , true );
261
+ }
262
+ assertThat (snapshotResponseMap .size (), greaterThan (0 ));
263
+ final Map <String , Object > metadata = extractMetadata (snapshotResponseMap , snapshotName );
264
+ assertNotNull (metadata );
265
+ assertThat (metadata .get ("policy" ), equalTo (policyName ));
266
+ assertHistoryIsPresent (policyName , true , repoId );
267
+ } catch (ResponseException e ) {
268
+ fail ("expected snapshot to exist but it does not: " + EntityUtils .toString (e .getResponse ().getEntity ()));
269
+ }
270
+ });
271
+ }
272
+
273
+ // Run retention every second
274
+ ClusterUpdateSettingsRequest req = new ClusterUpdateSettingsRequest ();
275
+ req .transientSettings (Settings .builder ().put (LifecycleSettings .SLM_RETENTION_SCHEDULE , "*/1 * * * * ?" ));
276
+ try (XContentBuilder builder = jsonBuilder ()) {
277
+ req .toXContent (builder , ToXContent .EMPTY_PARAMS );
278
+ Request r = new Request ("PUT" , "/_cluster/settings" );
279
+ r .setJsonEntity (Strings .toString (builder ));
280
+ Response updateSettingsResp = client ().performRequest (r );
281
+ }
282
+
283
+ try {
284
+ // Check that the snapshot created by the policy has been removed by retention
285
+ assertBusy (() -> {
286
+ // We expect a failed response because the snapshot should not exist
287
+ try {
288
+ Response response = client ().performRequest (new Request ("GET" , "/_snapshot/" + repoId + "/" + snapshotName ));
289
+ assertThat (EntityUtils .toString (response .getEntity ()), containsString ("snapshot_missing_exception" ));
290
+ } catch (ResponseException e ) {
291
+ assertThat (EntityUtils .toString (e .getResponse ().getEntity ()), containsString ("snapshot_missing_exception" ));
292
+ }
293
+ });
294
+
295
+ Request delReq = new Request ("DELETE" , "/_slm/policy/" + policyName );
296
+ assertOK (client ().performRequest (delReq ));
297
+
298
+ // It's possible there could have been a snapshot in progress when the
299
+ // policy is deleted, so wait for it to be finished
300
+ assertBusy (() -> {
301
+ assertThat (wipeSnapshots ().size (), equalTo (0 ));
302
+ });
303
+ } finally {
304
+ // Unset retention
305
+ ClusterUpdateSettingsRequest unsetRequest = new ClusterUpdateSettingsRequest ();
306
+ unsetRequest .transientSettings (Settings .builder ().put (LifecycleSettings .SLM_RETENTION_SCHEDULE , (String ) null ));
307
+ try (XContentBuilder builder = jsonBuilder ()) {
308
+ unsetRequest .toXContent (builder , ToXContent .EMPTY_PARAMS );
309
+ Request r = new Request ("PUT" , "/_cluster/settings" );
310
+ r .setJsonEntity (Strings .toString (builder ));
311
+ client ().performRequest (r );
312
+ }
313
+ }
314
+ }
315
+
225
316
@ SuppressWarnings ("unchecked" )
226
317
private static Map <String , Object > extractMetadata (Map <String , Object > snapshotResponseMap , String snapshotPrefix ) {
227
318
List <Map <String , Object >> snapResponse = ((List <Map <String , Object >>) snapshotResponseMap .get ("responses" )).stream ()
@@ -284,6 +375,13 @@ private void assertHistoryIsPresent(String policyName, boolean success, String r
284
375
285
376
private void createSnapshotPolicy (String policyName , String snapshotNamePattern , String schedule , String repoId ,
286
377
String indexPattern , boolean ignoreUnavailable ) throws IOException {
378
+ createSnapshotPolicy (policyName , snapshotNamePattern , schedule , repoId , indexPattern ,
379
+ ignoreUnavailable , SnapshotRetentionConfiguration .EMPTY );
380
+ }
381
+
382
+ private void createSnapshotPolicy (String policyName , String snapshotNamePattern , String schedule , String repoId ,
383
+ String indexPattern , boolean ignoreUnavailable ,
384
+ SnapshotRetentionConfiguration retention ) throws IOException {
287
385
Map <String , Object > snapConfig = new HashMap <>();
288
386
snapConfig .put ("indices" , Collections .singletonList (indexPattern ));
289
387
snapConfig .put ("ignore_unavailable" , ignoreUnavailable );
@@ -295,8 +393,8 @@ private void createSnapshotPolicy(String policyName, String snapshotNamePattern,
295
393
() -> randomAlphaOfLength (5 )), randomAlphaOfLength (4 ));
296
394
}
297
395
}
298
- SnapshotLifecyclePolicy policy = new SnapshotLifecyclePolicy (policyName , snapshotNamePattern , schedule , repoId , snapConfig ,
299
- SnapshotRetentionConfiguration . EMPTY );
396
+ SnapshotLifecyclePolicy policy = new SnapshotLifecyclePolicy (policyName , snapshotNamePattern , schedule ,
397
+ repoId , snapConfig , retention );
300
398
301
399
Request putLifecycle = new Request ("PUT" , "/_slm/policy/" + policyName );
302
400
XContentBuilder lifecycleBuilder = JsonXContent .contentBuilder ();
0 commit comments