17
17
import org .elasticsearch .cluster .AckedClusterStateUpdateTask ;
18
18
import org .elasticsearch .cluster .ClusterState ;
19
19
import org .elasticsearch .cluster .ClusterStateTaskExecutor ;
20
+ import org .elasticsearch .cluster .ClusterStateTaskExecutor .ClusterTasksResult ;
20
21
import org .elasticsearch .cluster .block .ClusterBlock ;
21
22
import org .elasticsearch .cluster .block .ClusterBlocks ;
22
23
import org .elasticsearch .cluster .routing .RoutingTable ;
@@ -50,13 +51,12 @@ public class MetadataUpdateSettingsService {
50
51
private static final Logger logger = LogManager .getLogger (MetadataUpdateSettingsService .class );
51
52
52
53
private final ClusterService clusterService ;
53
-
54
54
private final AllocationService allocationService ;
55
-
56
55
private final IndexScopedSettings indexScopedSettings ;
57
56
private final IndicesService indicesService ;
58
57
private final ShardLimitValidator shardLimitValidator ;
59
58
private final ThreadPool threadPool ;
59
+ private final ClusterStateTaskExecutor <AckedClusterStateUpdateTask > executor ;
60
60
61
61
public MetadataUpdateSettingsService (
62
62
ClusterService clusterService ,
@@ -67,11 +67,28 @@ public MetadataUpdateSettingsService(
67
67
ThreadPool threadPool
68
68
) {
69
69
this .clusterService = clusterService ;
70
- this .threadPool = threadPool ;
71
70
this .allocationService = allocationService ;
72
71
this .indexScopedSettings = indexScopedSettings ;
73
72
this .indicesService = indicesService ;
74
73
this .shardLimitValidator = shardLimitValidator ;
74
+ this .threadPool = threadPool ;
75
+ this .executor = (currentState , tasks ) -> {
76
+ ClusterTasksResult .Builder <AckedClusterStateUpdateTask > builder = ClusterTasksResult .builder ();
77
+ ClusterState state = currentState ;
78
+ for (AckedClusterStateUpdateTask task : tasks ) {
79
+ try {
80
+ state = task .execute (state );
81
+ builder .success (task );
82
+ } catch (Exception e ) {
83
+ builder .failure (task , e );
84
+ }
85
+ }
86
+ if (state != currentState ) {
87
+ // reroute in case things change that require it (like number of replicas)
88
+ state = allocationService .reroute (state , "settings update" );
89
+ }
90
+ return builder .build (state );
91
+ };
75
92
}
76
93
77
94
public void updateSettings (final UpdateSettingsClusterStateUpdateRequest request , final ActionListener <AcknowledgedResponse > listener ) {
@@ -105,149 +122,149 @@ public void updateSettings(final UpdateSettingsClusterStateUpdateRequest request
105
122
final Settings openSettings = settingsForOpenIndices .build ();
106
123
final boolean preserveExisting = request .isPreserveExisting ();
107
124
108
- clusterService .submitStateUpdateTask (
109
- "update-settings " + Arrays .toString (request .indices ()),
110
- new AckedClusterStateUpdateTask (Priority .URGENT , request , wrapPreservingContext (listener , threadPool .getThreadContext ())) {
125
+ // TODO: move this to custom class instead of AckedClusterStateUpdateTask
126
+ AckedClusterStateUpdateTask clusterTask = new AckedClusterStateUpdateTask (
127
+ Priority .URGENT ,
128
+ request ,
129
+ wrapPreservingContext (listener , threadPool .getThreadContext ())
130
+ ) {
131
+ @ Override
132
+ public ClusterState execute (ClusterState currentState ) {
133
+ RoutingTable .Builder routingTableBuilder = null ;
134
+ Metadata .Builder metadataBuilder = Metadata .builder (currentState .metadata ());
111
135
112
- @ Override
113
- public ClusterState execute (ClusterState currentState ) {
114
-
115
- RoutingTable .Builder routingTableBuilder = null ;
116
- Metadata .Builder metadataBuilder = Metadata .builder (currentState .metadata ());
117
-
118
- // allow to change any settings to a close index, and only allow dynamic settings to be changed
119
- // on an open index
120
- Set <Index > openIndices = new HashSet <>();
121
- Set <Index > closeIndices = new HashSet <>();
122
- final String [] actualIndices = new String [request .indices ().length ];
123
- for (int i = 0 ; i < request .indices ().length ; i ++) {
124
- Index index = request .indices ()[i ];
125
- actualIndices [i ] = index .getName ();
126
- final IndexMetadata metadata = currentState .metadata ().getIndexSafe (index );
127
- if (metadata .getState () == IndexMetadata .State .OPEN ) {
128
- openIndices .add (index );
129
- } else {
130
- closeIndices .add (index );
131
- }
136
+ // allow to change any settings to a closed index, and only allow dynamic settings to be changed
137
+ // on an open index
138
+ Set <Index > openIndices = new HashSet <>();
139
+ Set <Index > closedIndices = new HashSet <>();
140
+ final String [] actualIndices = new String [request .indices ().length ];
141
+ for (int i = 0 ; i < request .indices ().length ; i ++) {
142
+ Index index = request .indices ()[i ];
143
+ actualIndices [i ] = index .getName ();
144
+ final IndexMetadata metadata = currentState .metadata ().getIndexSafe (index );
145
+ if (metadata .getState () == IndexMetadata .State .OPEN ) {
146
+ openIndices .add (index );
147
+ } else {
148
+ closedIndices .add (index );
132
149
}
150
+ }
133
151
134
- if (skippedSettings .isEmpty () == false && openIndices .isEmpty () == false ) {
135
- throw new IllegalArgumentException (
136
- String .format (
137
- Locale .ROOT ,
138
- "Can't update non dynamic settings [%s] for open indices %s" ,
139
- skippedSettings ,
140
- openIndices
141
- )
142
- );
143
- }
152
+ if (skippedSettings .isEmpty () == false && openIndices .isEmpty () == false ) {
153
+ throw new IllegalArgumentException (
154
+ String .format (
155
+ Locale .ROOT ,
156
+ "Can't update non dynamic settings [%s] for open indices %s" ,
157
+ skippedSettings ,
158
+ openIndices
159
+ )
160
+ );
161
+ }
144
162
145
- if (IndexMetadata .INDEX_NUMBER_OF_REPLICAS_SETTING .exists (openSettings )) {
146
- final int updatedNumberOfReplicas = IndexMetadata .INDEX_NUMBER_OF_REPLICAS_SETTING .get (openSettings );
147
- if (preserveExisting == false ) {
148
- // Verify that this won't take us over the cluster shard limit.
149
- shardLimitValidator .validateShardLimitOnReplicaUpdate (currentState , request .indices (), updatedNumberOfReplicas );
163
+ if (IndexMetadata .INDEX_NUMBER_OF_REPLICAS_SETTING .exists (openSettings )) {
164
+ final int updatedNumberOfReplicas = IndexMetadata .INDEX_NUMBER_OF_REPLICAS_SETTING .get (openSettings );
165
+ if (preserveExisting == false ) {
166
+ // Verify that this won't take us over the cluster shard limit.
167
+ shardLimitValidator .validateShardLimitOnReplicaUpdate (currentState , request .indices (), updatedNumberOfReplicas );
150
168
151
- /*
152
- * We do not update the in-sync allocation IDs as they will be removed upon the first index operation
153
- * which makes these copies stale.
154
- *
155
- * TODO: should we update the in-sync allocation IDs once the data is deleted by the node?
156
- */
157
- routingTableBuilder = RoutingTable .builder (currentState .routingTable ());
158
- routingTableBuilder .updateNumberOfReplicas (updatedNumberOfReplicas , actualIndices );
159
- metadataBuilder .updateNumberOfReplicas (updatedNumberOfReplicas , actualIndices );
160
- logger .info ("updating number_of_replicas to [{}] for indices {}" , updatedNumberOfReplicas , actualIndices );
161
- }
169
+ /*
170
+ * We do not update the in-sync allocation IDs as they will be removed upon the first index operation
171
+ * which makes these copies stale.
172
+ *
173
+ * TODO: should we update the in-sync allocation IDs once the data is deleted by the node?
174
+ */
175
+ routingTableBuilder = RoutingTable .builder (currentState .routingTable ());
176
+ routingTableBuilder .updateNumberOfReplicas (updatedNumberOfReplicas , actualIndices );
177
+ metadataBuilder .updateNumberOfReplicas (updatedNumberOfReplicas , actualIndices );
178
+ logger .info ("updating number_of_replicas to [{}] for indices {}" , updatedNumberOfReplicas , actualIndices );
162
179
}
180
+ }
163
181
164
- updateIndexSettings (
165
- openIndices ,
166
- metadataBuilder ,
167
- (index , indexSettings ) -> indexScopedSettings .updateDynamicSettings (
168
- openSettings ,
169
- indexSettings ,
170
- Settings .builder (),
171
- index .getName ()
172
- ),
173
- preserveExisting ,
174
- indexScopedSettings
175
- );
182
+ updateIndexSettings (
183
+ openIndices ,
184
+ metadataBuilder ,
185
+ (index , indexSettings ) -> indexScopedSettings .updateDynamicSettings (
186
+ openSettings ,
187
+ indexSettings ,
188
+ Settings .builder (),
189
+ index .getName ()
190
+ ),
191
+ preserveExisting ,
192
+ indexScopedSettings
193
+ );
176
194
177
- updateIndexSettings (
178
- closeIndices ,
179
- metadataBuilder ,
180
- (index , indexSettings ) -> indexScopedSettings .updateSettings (
181
- closedSettings ,
182
- indexSettings ,
183
- Settings .builder (),
184
- index .getName ()
185
- ),
186
- preserveExisting ,
187
- indexScopedSettings
188
- );
195
+ updateIndexSettings (
196
+ closedIndices ,
197
+ metadataBuilder ,
198
+ (index , indexSettings ) -> indexScopedSettings .updateSettings (
199
+ closedSettings ,
200
+ indexSettings ,
201
+ Settings .builder (),
202
+ index .getName ()
203
+ ),
204
+ preserveExisting ,
205
+ indexScopedSettings
206
+ );
189
207
190
- if (IndexSettings .INDEX_TRANSLOG_RETENTION_AGE_SETTING .exists (normalizedSettings )
191
- || IndexSettings .INDEX_TRANSLOG_RETENTION_SIZE_SETTING .exists (normalizedSettings )) {
192
- for (String index : actualIndices ) {
193
- final Settings settings = metadataBuilder .get (index ).getSettings ();
194
- MetadataCreateIndexService .validateTranslogRetentionSettings (settings );
195
- MetadataCreateIndexService .validateStoreTypeSetting (settings );
196
- }
208
+ if (IndexSettings .INDEX_TRANSLOG_RETENTION_AGE_SETTING .exists (normalizedSettings )
209
+ || IndexSettings .INDEX_TRANSLOG_RETENTION_SIZE_SETTING .exists (normalizedSettings )) {
210
+ for (String index : actualIndices ) {
211
+ final Settings settings = metadataBuilder .get (index ).getSettings ();
212
+ MetadataCreateIndexService .validateTranslogRetentionSettings (settings );
213
+ MetadataCreateIndexService .validateStoreTypeSetting (settings );
197
214
}
198
- boolean changed = false ;
199
- // increment settings versions
200
- for ( final String index : actualIndices ) {
201
- if ( same ( currentState . metadata (). index ( index ). getSettings (), metadataBuilder . get ( index ). getSettings ()) == false ) {
202
- changed = true ;
203
- final IndexMetadata . Builder builder = IndexMetadata . builder ( metadataBuilder . get ( index )) ;
204
- builder . settingsVersion ( 1 + builder . settingsVersion ( ));
205
- metadataBuilder . put ( builder );
206
- }
215
+ }
216
+ boolean changed = false ;
217
+ // increment settings versions
218
+ for ( final String index : actualIndices ) {
219
+ if ( same ( currentState . metadata (). index ( index ). getSettings (), metadataBuilder . get ( index ). getSettings ()) == false ) {
220
+ changed = true ;
221
+ final IndexMetadata . Builder builder = IndexMetadata . builder ( metadataBuilder . get ( index ));
222
+ builder . settingsVersion ( 1 + builder . settingsVersion () );
223
+ metadataBuilder . put ( builder );
207
224
}
225
+ }
208
226
209
- final ClusterBlocks .Builder blocks = ClusterBlocks .builder ().blocks (currentState .blocks ());
210
- boolean changedBlocks = false ;
211
- for (IndexMetadata .APIBlock block : IndexMetadata .APIBlock .values ()) {
212
- changedBlocks |= maybeUpdateClusterBlock (actualIndices , blocks , block .block , block .setting , openSettings );
213
- }
214
- changed |= changedBlocks ;
227
+ final ClusterBlocks .Builder blocks = ClusterBlocks .builder ().blocks (currentState .blocks ());
228
+ boolean changedBlocks = false ;
229
+ for (IndexMetadata .APIBlock block : IndexMetadata .APIBlock .values ()) {
230
+ changedBlocks |= maybeUpdateClusterBlock (actualIndices , blocks , block .block , block .setting , openSettings );
231
+ }
232
+ changed |= changedBlocks ;
215
233
216
- if (changed == false ) {
217
- return currentState ;
218
- }
234
+ if (changed == false ) {
235
+ return currentState ;
236
+ }
219
237
220
- ClusterState updatedState = ClusterState .builder (currentState )
221
- .metadata (metadataBuilder )
222
- .routingTable (routingTableBuilder == null ? currentState .routingTable () : routingTableBuilder .build ())
223
- .blocks (changedBlocks ? blocks .build () : currentState .blocks ())
224
- .build ();
238
+ ClusterState updatedState = ClusterState .builder (currentState )
239
+ .metadata (metadataBuilder )
240
+ .routingTable (routingTableBuilder == null ? currentState .routingTable () : routingTableBuilder .build ())
241
+ .blocks (changedBlocks ? blocks .build () : currentState .blocks ())
242
+ .build ();
225
243
226
- // now, reroute in case things change that require it (like number of replicas)
227
- updatedState = allocationService .reroute (updatedState , "settings update" );
228
- try {
229
- for (Index index : openIndices ) {
230
- final IndexMetadata currentMetadata = currentState .getMetadata ().getIndexSafe (index );
231
- final IndexMetadata updatedMetadata = updatedState .metadata ().getIndexSafe (index );
232
- indicesService .verifyIndexMetadata (currentMetadata , updatedMetadata );
233
- }
234
- for (Index index : closeIndices ) {
235
- final IndexMetadata currentMetadata = currentState .getMetadata ().getIndexSafe (index );
236
- final IndexMetadata updatedMetadata = updatedState .metadata ().getIndexSafe (index );
237
- // Verifies that the current index settings can be updated with the updated dynamic settings.
238
- indicesService .verifyIndexMetadata (currentMetadata , updatedMetadata );
239
- // Now check that we can create the index with the updated settings (dynamic and non-dynamic).
240
- // This step is mandatory since we allow to update non-dynamic settings on closed indices.
241
- indicesService .verifyIndexMetadata (updatedMetadata , updatedMetadata );
242
- }
243
- } catch (IOException ex ) {
244
- throw ExceptionsHelper .convertToElastic (ex );
244
+ try {
245
+ for (Index index : openIndices ) {
246
+ final IndexMetadata currentMetadata = currentState .metadata ().getIndexSafe (index );
247
+ final IndexMetadata updatedMetadata = updatedState .metadata ().getIndexSafe (index );
248
+ indicesService .verifyIndexMetadata (currentMetadata , updatedMetadata );
245
249
}
246
- return updatedState ;
250
+ for (Index index : closedIndices ) {
251
+ final IndexMetadata currentMetadata = currentState .metadata ().getIndexSafe (index );
252
+ final IndexMetadata updatedMetadata = updatedState .metadata ().getIndexSafe (index );
253
+ // Verifies that the current index settings can be updated with the updated dynamic settings.
254
+ indicesService .verifyIndexMetadata (currentMetadata , updatedMetadata );
255
+ // Now check that we can create the index with the updated settings (dynamic and non-dynamic).
256
+ // This step is mandatory since we allow to update non-dynamic settings on closed indices.
257
+ indicesService .verifyIndexMetadata (updatedMetadata , updatedMetadata );
258
+ }
259
+ } catch (IOException ex ) {
260
+ throw ExceptionsHelper .convertToElastic (ex );
247
261
}
248
- },
249
- ClusterStateTaskExecutor .unbatched ()
250
- );
262
+
263
+ return updatedState ;
264
+ }
265
+ };
266
+
267
+ clusterService .submitStateUpdateTask ("update-settings " + Arrays .toString (request .indices ()), clusterTask , this .executor );
251
268
}
252
269
253
270
public static void updateIndexSettings (
@@ -256,7 +273,6 @@ public static void updateIndexSettings(
256
273
BiFunction <Index , Settings .Builder , Boolean > settingUpdater ,
257
274
Boolean preserveExisting ,
258
275
IndexScopedSettings indexScopedSettings
259
-
260
276
) {
261
277
for (Index index : indices ) {
262
278
IndexMetadata indexMetadata = metadataBuilder .getSafe (index );
0 commit comments