5
5
*/
6
6
package org .elasticsearch .xpack .ccr .action ;
7
7
8
+ import com .carrotsearch .hppc .cursors .ObjectObjectCursor ;
8
9
import org .elasticsearch .Version ;
9
10
import org .elasticsearch .action .admin .cluster .state .ClusterStateResponse ;
11
+ import org .elasticsearch .action .support .replication .ClusterStateCreationUtils ;
10
12
import org .elasticsearch .client .Client ;
11
13
import org .elasticsearch .cluster .ClusterName ;
12
14
import org .elasticsearch .cluster .ClusterState ;
18
20
import org .elasticsearch .cluster .routing .ShardRoutingState ;
19
21
import org .elasticsearch .cluster .routing .TestShardRouting ;
20
22
import org .elasticsearch .cluster .service .ClusterService ;
23
+ import org .elasticsearch .common .UUIDs ;
21
24
import org .elasticsearch .common .collect .Tuple ;
22
25
import org .elasticsearch .common .settings .ClusterSettings ;
23
26
import org .elasticsearch .common .settings .Settings ;
24
27
import org .elasticsearch .common .unit .ByteSizeValue ;
25
28
import org .elasticsearch .common .unit .TimeValue ;
29
+ import org .elasticsearch .common .util .concurrent .ConcurrentCollections ;
26
30
import org .elasticsearch .common .util .concurrent .EsRejectedExecutionException ;
27
31
import org .elasticsearch .index .Index ;
28
32
import org .elasticsearch .index .IndexSettings ;
44
48
import java .util .LinkedList ;
45
49
import java .util .List ;
46
50
import java .util .Map ;
51
+ import java .util .Set ;
47
52
import java .util .concurrent .CountDownLatch ;
48
53
import java .util .concurrent .ExecutorService ;
49
54
import java .util .concurrent .Executors ;
50
55
import java .util .concurrent .atomic .AtomicInteger ;
56
+ import java .util .concurrent .atomic .AtomicReference ;
51
57
import java .util .function .BiConsumer ;
52
58
import java .util .function .Consumer ;
53
59
import java .util .function .Function ;
57
63
import static org .elasticsearch .xpack .ccr .action .AutoFollowCoordinator .AutoFollower .recordLeaderIndexAsFollowFunction ;
58
64
import static org .hamcrest .Matchers .equalTo ;
59
65
import static org .hamcrest .Matchers .greaterThan ;
66
+ import static org .hamcrest .Matchers .hasItem ;
60
67
import static org .hamcrest .Matchers .is ;
61
68
import static org .hamcrest .Matchers .notNullValue ;
62
69
import static org .hamcrest .Matchers .nullValue ;
@@ -416,6 +423,26 @@ public void testGetLeaderIndicesToFollow_shardsNotStarted() {
416
423
assertThat (result .get (1 ).getName (), equalTo ("index2" ));
417
424
}
418
425
426
+ public void testGetLeaderIndicesToFollowWithClosedIndices () {
427
+ final AutoFollowPattern autoFollowPattern = new AutoFollowPattern ("remote" , Collections .singletonList ("*" ),
428
+ null , null , null , null , null , null , null , null , null , null , null );
429
+
430
+ // index is opened
431
+ ClusterState remoteState = ClusterStateCreationUtils .stateWithActivePrimary ("test-index" , true , randomIntBetween (1 , 3 ), 0 );
432
+ List <Index > result = AutoFollower .getLeaderIndicesToFollow (autoFollowPattern , remoteState , Collections .emptyList ());
433
+ assertThat (result .size (), equalTo (1 ));
434
+ assertThat (result , hasItem (remoteState .metaData ().index ("test-index" ).getIndex ()));
435
+
436
+ // index is closed
437
+ remoteState = ClusterState .builder (remoteState )
438
+ .metaData (MetaData .builder (remoteState .metaData ())
439
+ .put (IndexMetaData .builder (remoteState .metaData ().index ("test-index" )).state (IndexMetaData .State .CLOSE ).build (), true )
440
+ .build ())
441
+ .build ();
442
+ result = AutoFollower .getLeaderIndicesToFollow (autoFollowPattern , remoteState , Collections .emptyList ());
443
+ assertThat (result .size (), equalTo (0 ));
444
+ }
445
+
419
446
public void testRecordLeaderIndexAsFollowFunction () {
420
447
AutoFollowMetadata autoFollowMetadata = new AutoFollowMetadata (Collections .emptyMap (),
421
448
Collections .singletonMap ("pattern1" , Collections .emptyList ()), Collections .emptyMap ());
@@ -763,7 +790,9 @@ void updateAutoFollowMetadata(Function<ClusterState, ClusterState> updateFunctio
763
790
autoFollower .start ();
764
791
assertThat (allResults .size (), equalTo (states .length ));
765
792
for (int i = 0 ; i < states .length ; i ++) {
766
- assertThat (allResults .get (i ).autoFollowExecutionResults .containsKey (new Index ("logs-" + i , "_na_" )), is (true ));
793
+ final String indexName = "logs-" + i ;
794
+ assertThat (allResults .get (i ).autoFollowExecutionResults .keySet ().stream ()
795
+ .anyMatch (index -> index .getName ().equals (indexName )), is (true ));
767
796
}
768
797
}
769
798
@@ -1049,6 +1078,87 @@ void updateAutoFollowMetadata(
1049
1078
}
1050
1079
}
1051
1080
1081
+ public void testClosedIndicesAreNotAutoFollowed () {
1082
+ final Client client = mock (Client .class );
1083
+ when (client .getRemoteClusterClient (anyString ())).thenReturn (client );
1084
+
1085
+ final String pattern = "pattern1" ;
1086
+ final ClusterState localState = ClusterState .builder (new ClusterName ("local" ))
1087
+ .metaData (MetaData .builder ()
1088
+ .putCustom (AutoFollowMetadata .TYPE ,
1089
+ new AutoFollowMetadata (Collections .singletonMap (pattern ,
1090
+ new AutoFollowPattern ("remote" , Collections .singletonList ("docs-*" ), null , null , null , null , null , null , null , null ,
1091
+ null , null , null )),
1092
+ Collections .singletonMap (pattern , Collections .emptyList ()),
1093
+ Collections .singletonMap (pattern , Collections .emptyMap ()))))
1094
+ .build ();
1095
+
1096
+ ClusterState remoteState = null ;
1097
+ final int nbLeaderIndices = randomInt (15 );
1098
+ for (int i = 0 ; i < nbLeaderIndices ; i ++) {
1099
+ String indexName = "docs-" + i ;
1100
+ if (remoteState == null ) {
1101
+ remoteState = createRemoteClusterState (indexName , true );
1102
+ } else {
1103
+ remoteState = createRemoteClusterState (remoteState , indexName );
1104
+ }
1105
+ if (randomBoolean ()) {
1106
+ // randomly close the index
1107
+ remoteState = ClusterState .builder (remoteState .getClusterName ())
1108
+ .routingTable (remoteState .routingTable ())
1109
+ .metaData (MetaData .builder (remoteState .metaData ())
1110
+ .put (IndexMetaData .builder (remoteState .metaData ().index (indexName )).state (IndexMetaData .State .CLOSE ).build (), true )
1111
+ .build ())
1112
+ .build ();
1113
+ }
1114
+ }
1115
+
1116
+ final ClusterState finalRemoteState = remoteState ;
1117
+ final AtomicReference <ClusterState > lastModifiedClusterState = new AtomicReference <>(localState );
1118
+ final List <AutoFollowCoordinator .AutoFollowResult > results = new ArrayList <>();
1119
+ final Set <Object > followedIndices = ConcurrentCollections .newConcurrentSet ();
1120
+ final AutoFollower autoFollower =
1121
+ new AutoFollower ("remote" , results ::addAll , localClusterStateSupplier (localState ), () -> 1L , Runnable ::run ) {
1122
+ @ Override
1123
+ void getRemoteClusterState (String remoteCluster ,
1124
+ long metadataVersion ,
1125
+ BiConsumer <ClusterStateResponse , Exception > handler ) {
1126
+ assertThat (remoteCluster , equalTo ("remote" ));
1127
+ handler .accept (new ClusterStateResponse (new ClusterName ("remote" ), finalRemoteState , false ), null );
1128
+ }
1129
+
1130
+ @ Override
1131
+ void createAndFollow (Map <String , String > headers ,
1132
+ PutFollowAction .Request followRequest ,
1133
+ Runnable successHandler ,
1134
+ Consumer <Exception > failureHandler ) {
1135
+ followedIndices .add (followRequest .getLeaderIndex ());
1136
+ successHandler .run ();
1137
+ }
1138
+
1139
+ @ Override
1140
+ void updateAutoFollowMetadata (Function <ClusterState , ClusterState > updateFunction , Consumer <Exception > handler ) {
1141
+ lastModifiedClusterState .updateAndGet (updateFunction ::apply );
1142
+ handler .accept (null );
1143
+ }
1144
+
1145
+ @ Override
1146
+ void cleanFollowedRemoteIndices (ClusterState remoteClusterState , List <String > patterns ) {
1147
+ // Ignore, to avoid invoking updateAutoFollowMetadata(...) twice
1148
+ }
1149
+ };
1150
+ autoFollower .start ();
1151
+
1152
+ assertThat (results , notNullValue ());
1153
+ assertThat (results .size (), equalTo (1 ));
1154
+
1155
+ for (ObjectObjectCursor <String , IndexMetaData > index : remoteState .metaData ().indices ()) {
1156
+ boolean expect = index .value .getState () == IndexMetaData .State .OPEN ;
1157
+ assertThat (results .get (0 ).autoFollowExecutionResults .containsKey (index .value .getIndex ()), is (expect ));
1158
+ assertThat (followedIndices .contains (index .key ), is (expect ));
1159
+ }
1160
+ }
1161
+
1052
1162
private static ClusterState createRemoteClusterState (String indexName , Boolean enableSoftDeletes ) {
1053
1163
Settings .Builder indexSettings ;
1054
1164
if (enableSoftDeletes != null ) {
@@ -1075,19 +1185,21 @@ private static ClusterState createRemoteClusterState(String indexName, Boolean e
1075
1185
1076
1186
private static ClusterState createRemoteClusterState (ClusterState previous , String indexName ) {
1077
1187
IndexMetaData indexMetaData = IndexMetaData .builder (indexName )
1078
- .settings (settings (Version .CURRENT ).put (IndexSettings .INDEX_SOFT_DELETES_SETTING .getKey (), true ))
1188
+ .settings (settings (Version .CURRENT )
1189
+ .put (IndexSettings .INDEX_SOFT_DELETES_SETTING .getKey (), true )
1190
+ .put (IndexMetaData .SETTING_INDEX_UUID , UUIDs .randomBase64UUID (random ())))
1079
1191
.numberOfShards (1 )
1080
1192
.numberOfReplicas (0 )
1081
1193
.build ();
1082
- ClusterState .Builder csBuilder = ClusterState .builder (new ClusterName ( "remote" ))
1194
+ ClusterState .Builder csBuilder = ClusterState .builder (previous . getClusterName ( ))
1083
1195
.metaData (MetaData .builder (previous .metaData ())
1084
1196
.version (previous .metaData ().version () + 1 )
1085
1197
.put (indexMetaData , true ));
1086
1198
1087
1199
ShardRouting shardRouting =
1088
1200
TestShardRouting .newShardRouting (indexName , 0 , "1" , true , ShardRoutingState .INITIALIZING ).moveToStarted ();
1089
1201
IndexRoutingTable indexRoutingTable = IndexRoutingTable .builder (indexMetaData .getIndex ()).addShard (shardRouting ).build ();
1090
- csBuilder .routingTable (RoutingTable .builder ().add (indexRoutingTable ).build ()).build ();
1202
+ csBuilder .routingTable (RoutingTable .builder (previous . routingTable () ).add (indexRoutingTable ).build ()).build ();
1091
1203
1092
1204
return csBuilder .build ();
1093
1205
}
0 commit comments