23
23
import com .carrotsearch .hppc .cursors .ObjectObjectCursor ;
24
24
import org .apache .logging .log4j .LogManager ;
25
25
import org .apache .logging .log4j .Logger ;
26
+ import org .apache .logging .log4j .message .ParameterizedMessage ;
27
+ import org .elasticsearch .Version ;
26
28
import org .elasticsearch .action .ActionListener ;
27
29
import org .elasticsearch .action .support .GroupedActionListener ;
28
30
import org .elasticsearch .client .Client ;
33
35
import org .elasticsearch .cluster .metadata .IndexMetaData ;
34
36
import org .elasticsearch .cluster .routing .RerouteService ;
35
37
import org .elasticsearch .cluster .routing .RoutingNode ;
38
+ import org .elasticsearch .cluster .routing .RoutingNodes ;
36
39
import org .elasticsearch .cluster .routing .ShardRouting ;
37
40
import org .elasticsearch .common .Priority ;
38
41
import org .elasticsearch .common .Strings ;
39
42
import org .elasticsearch .common .collect .ImmutableOpenMap ;
43
+ import org .elasticsearch .common .logging .DeprecationLogger ;
40
44
import org .elasticsearch .common .settings .ClusterSettings ;
41
45
import org .elasticsearch .common .settings .Settings ;
42
46
import org .elasticsearch .common .util .set .Sets ;
47
51
import java .util .concurrent .atomic .AtomicLong ;
48
52
import java .util .function .LongSupplier ;
49
53
import java .util .function .Supplier ;
54
+ import java .util .stream .Collectors ;
55
+ import java .util .stream .StreamSupport ;
50
56
51
57
/**
52
58
* Listens for a node to go over the high watermark and kicks off an empty
@@ -65,6 +71,7 @@ public class DiskThresholdMonitor {
65
71
private final RerouteService rerouteService ;
66
72
private final AtomicLong lastRunTimeMillis = new AtomicLong (Long .MIN_VALUE );
67
73
private final AtomicBoolean checkInProgress = new AtomicBoolean ();
74
+ private final DeprecationLogger deprecationLogger = new DeprecationLogger (logger );
68
75
69
76
public DiskThresholdMonitor (Settings settings , Supplier <ClusterState > clusterStateSupplier , ClusterSettings clusterSettings ,
70
77
Client client , LongSupplier currentTimeMillisSupplier , RerouteService rerouteService ) {
@@ -73,6 +80,10 @@ public DiskThresholdMonitor(Settings settings, Supplier<ClusterState> clusterSta
73
80
this .rerouteService = rerouteService ;
74
81
this .diskThresholdSettings = new DiskThresholdSettings (settings , clusterSettings );
75
82
this .client = client ;
83
+ if (diskThresholdSettings .isAutoReleaseIndexEnabled () == false ) {
84
+ deprecationLogger .deprecated ("[{}] will be removed in version {}" ,
85
+ DiskThresholdSettings .AUTO_RELEASE_INDEX_ENABLED_KEY , Version .V_7_4_0 .major + 1 );
86
+ }
76
87
}
77
88
78
89
/**
@@ -136,21 +147,33 @@ public void onNewInfo(ClusterInfo info) {
136
147
}
137
148
final ClusterState state = clusterStateSupplier .get ();
138
149
final Set <String > indicesToMarkReadOnly = new HashSet <>();
150
+ RoutingNodes routingNodes = state .getRoutingNodes ();
151
+ Set <String > indicesNotToAutoRelease = new HashSet <>();
152
+ markNodesMissingUsageIneligibleForRelease (routingNodes , usages , indicesNotToAutoRelease );
139
153
140
154
for (final ObjectObjectCursor <String , DiskUsage > entry : usages ) {
141
155
final String node = entry .key ;
142
156
final DiskUsage usage = entry .value ;
143
157
warnAboutDiskIfNeeded (usage );
158
+ RoutingNode routingNode = routingNodes .node (node );
159
+ // Only unblock index if all nodes that contain shards of it are below the high disk watermark
144
160
if (usage .getFreeBytes () < diskThresholdSettings .getFreeBytesThresholdFloodStage ().getBytes () ||
145
161
usage .getFreeDiskAsPercentage () < diskThresholdSettings .getFreeDiskThresholdFloodStage ()) {
146
- final RoutingNode routingNode = state .getRoutingNodes ().node (node );
147
162
if (routingNode != null ) { // this might happen if we haven't got the full cluster-state yet?!
148
163
for (ShardRouting routing : routingNode ) {
149
- indicesToMarkReadOnly .add (routing .index ().getName ());
164
+ String indexName = routing .index ().getName ();
165
+ indicesToMarkReadOnly .add (indexName );
166
+ indicesNotToAutoRelease .add (indexName );
150
167
}
151
168
}
152
169
} else if (usage .getFreeBytes () < diskThresholdSettings .getFreeBytesThresholdHigh ().getBytes () ||
153
170
usage .getFreeDiskAsPercentage () < diskThresholdSettings .getFreeDiskThresholdHigh ()) {
171
+ if (routingNode != null ) {
172
+ for (ShardRouting routing : routingNode ) {
173
+ String indexName = routing .index ().getName ();
174
+ indicesNotToAutoRelease .add (indexName );
175
+ }
176
+ }
154
177
if (lastRunTimeMillis .get () < currentTimeMillis - diskThresholdSettings .getRerouteInterval ().millis ()) {
155
178
reroute = true ;
156
179
explanation = "high disk watermark exceeded on one or more nodes" ;
@@ -182,7 +205,7 @@ public void onNewInfo(ClusterInfo info) {
182
205
}
183
206
}
184
207
185
- final ActionListener <Void > listener = new GroupedActionListener <>(ActionListener .wrap (this ::checkFinished ), 2 );
208
+ final ActionListener <Void > listener = new GroupedActionListener <>(ActionListener .wrap (this ::checkFinished ), 3 );
186
209
187
210
if (reroute ) {
188
211
logger .info ("rerouting shards: [{}]" , explanation );
@@ -197,30 +220,70 @@ public void onNewInfo(ClusterInfo info) {
197
220
} else {
198
221
listener .onResponse (null );
199
222
}
223
+ Set <String > indicesToAutoRelease = StreamSupport .stream (state .routingTable ().indicesRouting ()
224
+ .spliterator (), false )
225
+ .map (c -> c .key )
226
+ .filter (index -> indicesNotToAutoRelease .contains (index ) == false )
227
+ .filter (index -> state .getBlocks ().hasIndexBlock (index , IndexMetaData .INDEX_READ_ONLY_ALLOW_DELETE_BLOCK ))
228
+ .collect (Collectors .toSet ());
229
+
230
+ if (indicesToAutoRelease .isEmpty () == false ) {
231
+ if (diskThresholdSettings .isAutoReleaseIndexEnabled ()) {
232
+ logger .info ("releasing read-only-allow-delete block on indices: [{}]" , indicesToAutoRelease );
233
+ updateIndicesReadOnly (indicesToAutoRelease , listener , false );
234
+ } else {
235
+ deprecationLogger .deprecated ("[{}] will be removed in version {}" ,
236
+ DiskThresholdSettings .AUTO_RELEASE_INDEX_ENABLED_KEY , Version .V_7_4_0 .major + 1 );
237
+ logger .debug ("[{}] disabled, not releasing read-only-allow-delete block on indices: [{}]" ,
238
+ DiskThresholdSettings .AUTO_RELEASE_INDEX_ENABLED_KEY , indicesToAutoRelease );
239
+ listener .onResponse (null );
240
+ }
241
+ } else {
242
+ listener .onResponse (null );
243
+ }
200
244
201
245
indicesToMarkReadOnly .removeIf (index -> state .getBlocks ().indexBlocked (ClusterBlockLevel .WRITE , index ));
202
246
if (indicesToMarkReadOnly .isEmpty () == false ) {
203
- markIndicesReadOnly (indicesToMarkReadOnly , ActionListener .wrap (r -> {
204
- setLastRunTimeMillis ();
205
- listener .onResponse (r );
206
- }, e -> {
207
- logger .debug ("marking indices readonly failed" , e );
208
- setLastRunTimeMillis ();
209
- listener .onFailure (e );
210
- }));
247
+ updateIndicesReadOnly (indicesToMarkReadOnly , listener , true );
211
248
} else {
212
249
listener .onResponse (null );
213
250
}
214
251
}
215
252
253
+ private void markNodesMissingUsageIneligibleForRelease (RoutingNodes routingNodes , ImmutableOpenMap <String , DiskUsage > usages ,
254
+ Set <String > indicesToMarkIneligibleForAutoRelease ) {
255
+ for (RoutingNode routingNode : routingNodes ) {
256
+ if (usages .containsKey (routingNode .nodeId ()) == false ) {
257
+ if (routingNode != null ) {
258
+ for (ShardRouting routing : routingNode ) {
259
+ String indexName = routing .index ().getName ();
260
+ indicesToMarkIneligibleForAutoRelease .add (indexName );
261
+ }
262
+ }
263
+ }
264
+ }
265
+
266
+ }
267
+
216
268
private void setLastRunTimeMillis () {
217
269
lastRunTimeMillis .getAndUpdate (l -> Math .max (l , currentTimeMillisSupplier .getAsLong ()));
218
270
}
219
271
220
- protected void markIndicesReadOnly (Set <String > indicesToMarkReadOnly , ActionListener <Void > listener ) {
272
+ protected void updateIndicesReadOnly (Set <String > indicesToUpdate , ActionListener <Void > listener , boolean readOnly ) {
221
273
// set read-only block but don't block on the response
222
- client .admin ().indices ().prepareUpdateSettings (indicesToMarkReadOnly .toArray (Strings .EMPTY_ARRAY ))
223
- .setSettings (Settings .builder ().put (IndexMetaData .SETTING_READ_ONLY_ALLOW_DELETE , true ).build ())
224
- .execute (ActionListener .map (listener , r -> null ));
274
+ ActionListener <Void > wrappedListener = ActionListener .wrap (r -> {
275
+ setLastRunTimeMillis ();
276
+ listener .onResponse (r );
277
+ }, e -> {
278
+ logger .debug (new ParameterizedMessage ("setting indices [{}] read-only failed" , readOnly ), e );
279
+ setLastRunTimeMillis ();
280
+ listener .onFailure (e );
281
+ });
282
+ Settings readOnlySettings = readOnly ? Settings .builder ()
283
+ .put (IndexMetaData .SETTING_READ_ONLY_ALLOW_DELETE , Boolean .TRUE .toString ()).build () :
284
+ Settings .builder ().putNull (IndexMetaData .SETTING_READ_ONLY_ALLOW_DELETE ).build ();
285
+ client .admin ().indices ().prepareUpdateSettings (indicesToUpdate .toArray (Strings .EMPTY_ARRAY ))
286
+ .setSettings (readOnlySettings )
287
+ .execute (ActionListener .map (wrappedListener , r -> null ));
225
288
}
226
289
}
0 commit comments