37
37
import org .elasticsearch .threadpool .ThreadPool ;
38
38
39
39
import java .io .Closeable ;
40
+ import java .util .ArrayList ;
40
41
import java .util .Collection ;
41
42
import java .util .Collections ;
42
43
import java .util .HashMap ;
47
48
import java .util .stream .Collectors ;
48
49
import java .util .stream .Stream ;
49
50
51
+ import static org .elasticsearch .test .hamcrest .ElasticsearchAssertions .assertAcked ;
50
52
import static org .hamcrest .Matchers .anyOf ;
51
53
import static org .hamcrest .Matchers .contains ;
52
54
import static org .hamcrest .Matchers .empty ;
53
55
import static org .hamcrest .Matchers .equalTo ;
54
56
55
57
@ ESIntegTestCase .ClusterScope (scope = ESIntegTestCase .Scope .TEST )
56
- public class RetentionLeaseSyncIT extends ESIntegTestCase {
58
+ public class RetentionLeaseIT extends ESIntegTestCase {
57
59
58
60
public static final class RetentionLeaseSyncIntervalSettingPlugin extends Plugin {
59
61
@@ -68,7 +70,7 @@ public List<Setting<?>> getSettings() {
68
70
protected Collection <Class <? extends Plugin >> nodePlugins () {
69
71
return Stream .concat (
70
72
super .nodePlugins ().stream (),
71
- Stream .of (RetentionLeaseBackgroundSyncIT . RetentionLeaseSyncIntervalSettingPlugin .class ))
73
+ Stream .of (RetentionLeaseSyncIntervalSettingPlugin .class ))
72
74
.collect (Collectors .toList ());
73
75
}
74
76
@@ -205,9 +207,62 @@ public void testRetentionLeasesSyncOnExpiration() throws Exception {
205
207
}
206
208
}
207
209
208
- @ AwaitsFix (bugUrl = "https://github.com/elastic/elasticsearch/issues/38487" )
210
+ public void testBackgroundRetentionLeaseSync () throws Exception {
211
+ final int numberOfReplicas = 2 - scaledRandomIntBetween (0 , 2 );
212
+ internalCluster ().ensureAtLeastNumDataNodes (1 + numberOfReplicas );
213
+ final Settings settings = Settings .builder ()
214
+ .put ("index.number_of_shards" , 1 )
215
+ .put ("index.number_of_replicas" , numberOfReplicas )
216
+ .put (IndexService .RETENTION_LEASE_SYNC_INTERVAL_SETTING .getKey (), "1s" )
217
+ .build ();
218
+ createIndex ("index" , settings );
219
+ ensureGreen ("index" );
220
+ final String primaryShardNodeId = clusterService ().state ().routingTable ().index ("index" ).shard (0 ).primaryShard ().currentNodeId ();
221
+ final String primaryShardNodeName = clusterService ().state ().nodes ().get (primaryShardNodeId ).getName ();
222
+ final IndexShard primary = internalCluster ()
223
+ .getInstance (IndicesService .class , primaryShardNodeName )
224
+ .getShardOrNull (new ShardId (resolveIndex ("index" ), 0 ));
225
+ // we will add multiple retention leases and expect to see them synced to all replicas
226
+ final int length = randomIntBetween (1 , 8 );
227
+ final Map <String , RetentionLease > currentRetentionLeases = new HashMap <>(length );
228
+ final List <String > ids = new ArrayList <>(length );
229
+ for (int i = 0 ; i < length ; i ++) {
230
+ final String id = randomValueOtherThanMany (currentRetentionLeases .keySet ()::contains , () -> randomAlphaOfLength (8 ));
231
+ ids .add (id );
232
+ final long retainingSequenceNumber = randomLongBetween (0 , Long .MAX_VALUE );
233
+ final String source = randomAlphaOfLength (8 );
234
+ final CountDownLatch latch = new CountDownLatch (1 );
235
+ // put a new lease
236
+ currentRetentionLeases .put (
237
+ id ,
238
+ primary .addRetentionLease (id , retainingSequenceNumber , source , ActionListener .wrap (latch ::countDown )));
239
+ latch .await ();
240
+ // now renew all existing leases; we expect to see these synced to the replicas
241
+ for (int j = 0 ; j <= i ; j ++) {
242
+ currentRetentionLeases .put (
243
+ ids .get (j ),
244
+ primary .renewRetentionLease (
245
+ ids .get (j ),
246
+ randomLongBetween (currentRetentionLeases .get (ids .get (j )).retainingSequenceNumber (), Long .MAX_VALUE ),
247
+ source ));
248
+ }
249
+ assertBusy (() -> {
250
+ // check all retention leases have been synced to all replicas
251
+ for (final ShardRouting replicaShard : clusterService ().state ().routingTable ().index ("index" ).shard (0 ).replicaShards ()) {
252
+ final String replicaShardNodeId = replicaShard .currentNodeId ();
253
+ final String replicaShardNodeName = clusterService ().state ().nodes ().get (replicaShardNodeId ).getName ();
254
+ final IndexShard replica = internalCluster ()
255
+ .getInstance (IndicesService .class , replicaShardNodeName )
256
+ .getShardOrNull (new ShardId (resolveIndex ("index" ), 0 ));
257
+ assertThat (replica .getRetentionLeases (), equalTo (primary .getRetentionLeases ()));
258
+ }
259
+ });
260
+ }
261
+ }
262
+
209
263
public void testRetentionLeasesSyncOnRecovery () throws Exception {
210
- final int numberOfReplicas = 1 ;
264
+ final int numberOfReplicas = 2 - scaledRandomIntBetween (0 , 2 );
265
+ internalCluster ().ensureAtLeastNumDataNodes (1 + numberOfReplicas );
211
266
/*
212
267
* We effectively disable the background sync to ensure that the retention leases are not synced in the background so that the only
213
268
* source of retention leases on the replicas would be from the commit point and recovery.
@@ -217,10 +272,9 @@ public void testRetentionLeasesSyncOnRecovery() throws Exception {
217
272
.put ("index.number_of_replicas" , 0 )
218
273
.put (IndexService .RETENTION_LEASE_SYNC_INTERVAL_SETTING .getKey (), TimeValue .timeValueHours (24 ))
219
274
.build ();
220
- createIndex ("index" , settings );
275
+ // when we increase the number of replicas below we want to exclude the replicas from being allocated so that they do not recover
276
+ assertAcked (prepareCreate ("index" , 1 ).setSettings (settings ));
221
277
ensureYellow ("index" );
222
- // exclude the replicas from being allocated
223
- allowNodes ("index" , 1 );
224
278
final AcknowledgedResponse response = client ().admin ()
225
279
.indices ()
226
280
.prepareUpdateSettings ("index" ).setSettings (Settings .builder ().put ("index.number_of_replicas" , numberOfReplicas ).build ())
@@ -261,11 +315,6 @@ public void testRetentionLeasesSyncOnRecovery() throws Exception {
261
315
.getShardOrNull (new ShardId (resolveIndex ("index" ), 0 ));
262
316
final Map <String , RetentionLease > retentionLeasesOnReplica = RetentionLeases .toMap (replica .getRetentionLeases ());
263
317
assertThat (retentionLeasesOnReplica , equalTo (currentRetentionLeases ));
264
-
265
- // check retention leases have been committed on the replica
266
- final RetentionLeases replicaCommittedRetentionLeases = RetentionLeases .decodeRetentionLeases (
267
- replica .acquireLastIndexCommit (false ).getIndexCommit ().getUserData ().get (Engine .RETENTION_LEASES ));
268
- assertThat (currentRetentionLeases , equalTo (RetentionLeases .toMap (replicaCommittedRetentionLeases )));
269
318
}
270
319
}
271
320
0 commit comments