19
19
20
20
package org .elasticsearch .cluster .routing .allocation .decider ;
21
21
22
- import org .apache .lucene .mockfile .FilterFileStore ;
23
- import org .apache .lucene .mockfile .FilterFileSystemProvider ;
24
- import org .apache .lucene .mockfile .FilterPath ;
25
- import org .apache .lucene .util .Constants ;
26
22
import org .elasticsearch .action .admin .cluster .snapshots .create .CreateSnapshotResponse ;
27
23
import org .elasticsearch .action .admin .cluster .snapshots .restore .RestoreSnapshotResponse ;
28
24
import org .elasticsearch .action .admin .indices .stats .ShardStats ;
29
25
import org .elasticsearch .action .index .IndexRequestBuilder ;
30
26
import org .elasticsearch .cluster .ClusterInfoService ;
27
+ import org .elasticsearch .cluster .DiskUsageIntegTestCase ;
31
28
import org .elasticsearch .cluster .InternalClusterInfoService ;
32
29
import org .elasticsearch .cluster .metadata .IndexMetadata ;
33
30
import org .elasticsearch .cluster .routing .IndexShardRoutingTable ;
37
34
import org .elasticsearch .cluster .routing .allocation .decider .EnableAllocationDecider .Rebalance ;
38
35
import org .elasticsearch .cluster .service .ClusterService ;
39
36
import org .elasticsearch .common .Priority ;
40
- import org .elasticsearch .common .io .PathUtils ;
41
- import org .elasticsearch .common .io .PathUtilsForTesting ;
42
37
import org .elasticsearch .common .settings .Settings ;
43
38
import org .elasticsearch .common .unit .ByteSizeUnit ;
44
39
import org .elasticsearch .common .unit .ByteSizeValue ;
45
- import org .elasticsearch .core .internal .io .IOUtils ;
46
- import org .elasticsearch .env .Environment ;
47
40
import org .elasticsearch .env .NodeEnvironment ;
48
- import org .elasticsearch .monitor .fs .FsService ;
49
- import org .elasticsearch .plugins .Plugin ;
50
41
import org .elasticsearch .repositories .fs .FsRepository ;
51
42
import org .elasticsearch .snapshots .RestoreInfo ;
52
43
import org .elasticsearch .snapshots .SnapshotInfo ;
53
44
import org .elasticsearch .snapshots .SnapshotState ;
54
45
import org .elasticsearch .test .ESIntegTestCase ;
55
- import org .elasticsearch .test .InternalSettingsPlugin ;
56
46
import org .hamcrest .Matcher ;
57
- import org .junit .After ;
58
- import org .junit .Before ;
59
-
60
- import java .io .FileNotFoundException ;
61
- import java .io .IOException ;
62
- import java .nio .file .DirectoryStream ;
63
- import java .nio .file .FileStore ;
64
- import java .nio .file .FileSystem ;
65
- import java .nio .file .Files ;
66
- import java .nio .file .NoSuchFileException ;
67
- import java .nio .file .NotDirectoryException ;
68
- import java .nio .file .Path ;
47
+
69
48
import java .util .Arrays ;
70
- import java .util .Collection ;
71
49
import java .util .HashSet ;
72
- import java .util .List ;
73
50
import java .util .Locale ;
74
- import java .util .Map ;
75
51
import java .util .Set ;
76
52
import java .util .concurrent .TimeUnit ;
77
- import java .util .stream .Collectors ;
78
53
import java .util .stream .StreamSupport ;
79
54
80
- import static org .elasticsearch .common .util .concurrent .ConcurrentCollections .newConcurrentMap ;
81
55
import static org .elasticsearch .index .store .Store .INDEX_STORE_STATS_REFRESH_INTERVAL_SETTING ;
82
56
import static org .elasticsearch .test .hamcrest .ElasticsearchAssertions .assertAcked ;
83
- import static org .hamcrest .Matchers .anyOf ;
84
57
import static org .hamcrest .Matchers .empty ;
85
58
import static org .hamcrest .Matchers .equalTo ;
86
- import static org .hamcrest .Matchers .greaterThan ;
87
59
import static org .hamcrest .Matchers .hasSize ;
88
60
import static org .hamcrest .Matchers .is ;
89
61
90
62
@ ESIntegTestCase .ClusterScope (scope = ESIntegTestCase .Scope .TEST , numDataNodes = 0 )
91
- public class DiskThresholdDeciderIT extends ESIntegTestCase {
92
-
93
- private static TestFileSystemProvider fileSystemProvider ;
94
-
95
- private FileSystem defaultFileSystem ;
96
-
97
- @ Before
98
- public void installFilesystemProvider () {
99
- assertNull (defaultFileSystem );
100
- defaultFileSystem = PathUtils .getDefaultFileSystem ();
101
- assertNull (fileSystemProvider );
102
- fileSystemProvider = new TestFileSystemProvider (defaultFileSystem , createTempDir ());
103
- PathUtilsForTesting .installMock (fileSystemProvider .getFileSystem (null ));
104
- }
105
-
106
- @ After
107
- public void removeFilesystemProvider () {
108
- fileSystemProvider = null ;
109
- assertNotNull (defaultFileSystem );
110
- PathUtilsForTesting .installMock (defaultFileSystem ); // set the default filesystem back
111
- defaultFileSystem = null ;
112
- }
63
+ public class DiskThresholdDeciderIT extends DiskUsageIntegTestCase {
113
64
114
65
private static final long WATERMARK_BYTES = new ByteSizeValue (10 , ByteSizeUnit .KB ).getBytes ();
115
66
116
67
@ Override
117
68
protected Settings nodeSettings (int nodeOrdinal ) {
118
- final Path dataPath = fileSystemProvider .getRootDir ().resolve ("node-" + nodeOrdinal );
119
- try {
120
- Files .createDirectories (dataPath );
121
- } catch (IOException e ) {
122
- throw new AssertionError ("unexpected" , e );
123
- }
124
- fileSystemProvider .addTrackedPath (dataPath );
125
69
return Settings .builder ()
126
70
.put (super .nodeSettings (nodeOrdinal ))
127
- .put (Environment .PATH_DATA_SETTING .getKey (), dataPath )
128
- .put (FsService .ALWAYS_REFRESH_SETTING .getKey (), true )
129
71
.put (DiskThresholdSettings .CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING .getKey (), WATERMARK_BYTES + "b" )
130
72
.put (DiskThresholdSettings .CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING .getKey (), WATERMARK_BYTES + "b" )
131
73
.put (DiskThresholdSettings .CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_WATERMARK_SETTING .getKey (), "0b" )
132
74
.put (DiskThresholdSettings .CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL_SETTING .getKey (), "0ms" )
133
75
.build ();
134
76
}
135
77
136
- @ Override
137
- protected Collection <Class <? extends Plugin >> nodePlugins () {
138
- return List .of (InternalSettingsPlugin .class );
139
- }
140
-
141
78
public void testHighWatermarkNotExceeded () throws Exception {
142
79
internalCluster ().startMasterOnlyNode ();
143
80
internalCluster ().startDataOnlyNode ();
@@ -149,7 +86,6 @@ public void testHighWatermarkNotExceeded() throws Exception {
149
86
internalCluster ().getCurrentMasterNodeInstance (ClusterService .class ).addListener (event -> clusterInfoService .refresh ());
150
87
151
88
final String dataNode0Id = internalCluster ().getInstance (NodeEnvironment .class , dataNodeName ).nodeId ();
152
- final Path dataNode0Path = internalCluster ().getInstance (Environment .class , dataNodeName ).dataFiles ()[0 ];
153
89
154
90
final String indexName = randomAlphaOfLength (10 ).toLowerCase (Locale .ROOT );
155
91
createIndex (indexName , Settings .builder ()
@@ -161,11 +97,11 @@ public void testHighWatermarkNotExceeded() throws Exception {
161
97
162
98
// reduce disk size of node 0 so that no shards fit below the high watermark, forcing all shards onto the other data node
163
99
// (subtract the translog size since the disk threshold decider ignores this and may therefore move the shard back again)
164
- fileSystemProvider . getTestFileStore (dataNode0Path ).setTotalSpace (minShardSize + WATERMARK_BYTES - 1L );
100
+ getTestFileStore (dataNodeName ).setTotalSpace (minShardSize + WATERMARK_BYTES - 1L );
165
101
assertBusyWithDiskUsageRefresh (dataNode0Id , indexName , empty ());
166
102
167
103
// increase disk size of node 0 to allow just enough room for one shard, and check that it's rebalanced back
168
- fileSystemProvider . getTestFileStore (dataNode0Path ).setTotalSpace (minShardSize + WATERMARK_BYTES + 1L );
104
+ getTestFileStore (dataNodeName ).setTotalSpace (minShardSize + WATERMARK_BYTES + 1L );
169
105
assertBusyWithDiskUsageRefresh (dataNode0Id , indexName , hasSize (1 ));
170
106
}
171
107
@@ -186,7 +122,6 @@ public void testRestoreSnapshotAllocationDoesNotExceedWatermark() throws Excepti
186
122
internalCluster ().getCurrentMasterNodeInstance (ClusterService .class ).addListener (event -> clusterInfoService .refresh ());
187
123
188
124
final String dataNode0Id = internalCluster ().getInstance (NodeEnvironment .class , dataNodeName ).nodeId ();
189
- final Path dataNode0Path = internalCluster ().getInstance (Environment .class , dataNodeName ).dataFiles ()[0 ];
190
125
191
126
final String indexName = randomAlphaOfLength (10 ).toLowerCase (Locale .ROOT );
192
127
createIndex (indexName , Settings .builder ()
@@ -205,7 +140,7 @@ public void testRestoreSnapshotAllocationDoesNotExceedWatermark() throws Excepti
205
140
assertAcked (client ().admin ().indices ().prepareDelete (indexName ).get ());
206
141
207
142
// reduce disk size of node 0 so that no shards fit below the low watermark, forcing shards to be assigned to the other data node
208
- fileSystemProvider . getTestFileStore (dataNode0Path ).setTotalSpace (minShardSize + WATERMARK_BYTES - 1L );
143
+ getTestFileStore (dataNodeName ).setTotalSpace (minShardSize + WATERMARK_BYTES - 1L );
209
144
refreshDiskUsage ();
210
145
211
146
assertAcked (client ().admin ().cluster ().prepareUpdateSettings ()
@@ -229,7 +164,7 @@ public void testRestoreSnapshotAllocationDoesNotExceedWatermark() throws Excepti
229
164
.get ());
230
165
231
166
// increase disk size of node 0 to allow just enough room for one shard, and check that it's rebalanced back
232
- fileSystemProvider . getTestFileStore (dataNode0Path ).setTotalSpace (minShardSize + WATERMARK_BYTES + 1L );
167
+ getTestFileStore (dataNodeName ).setTotalSpace (minShardSize + WATERMARK_BYTES + 1L );
233
168
assertBusyWithDiskUsageRefresh (dataNode0Id , indexName , hasSize (1 ));
234
169
}
235
170
@@ -307,132 +242,4 @@ private void assertBusyWithDiskUsageRefresh(
307
242
assertThat ("Mismatching shard routings: " + shardRoutings , shardRoutings , matcher );
308
243
}, 30L , TimeUnit .SECONDS );
309
244
}
310
-
311
- private static class TestFileStore extends FilterFileStore {
312
-
313
- private final Path path ;
314
-
315
- private volatile long totalSpace = -1 ;
316
-
317
- TestFileStore (FileStore delegate , String scheme , Path path ) {
318
- super (delegate , scheme );
319
- this .path = path ;
320
- }
321
-
322
- @ Override
323
- public String name () {
324
- return "fake" ; // Lucene's is-spinning-disk check expects the device name here
325
- }
326
-
327
- @ Override
328
- public long getTotalSpace () throws IOException {
329
- final long totalSpace = this .totalSpace ;
330
- if (totalSpace == -1 ) {
331
- return super .getTotalSpace ();
332
- } else {
333
- return totalSpace ;
334
- }
335
- }
336
-
337
- public void setTotalSpace (long totalSpace ) {
338
- assertThat (totalSpace , anyOf (is (-1L ), greaterThan (0L )));
339
- this .totalSpace = totalSpace ;
340
- }
341
-
342
- @ Override
343
- public long getUsableSpace () throws IOException {
344
- final long totalSpace = this .totalSpace ;
345
- if (totalSpace == -1 ) {
346
- return super .getUsableSpace ();
347
- } else {
348
- return Math .max (0L , totalSpace - getTotalFileSize (path ));
349
- }
350
- }
351
-
352
- @ Override
353
- public long getUnallocatedSpace () throws IOException {
354
- final long totalSpace = this .totalSpace ;
355
- if (totalSpace == -1 ) {
356
- return super .getUnallocatedSpace ();
357
- } else {
358
- return Math .max (0L , totalSpace - getTotalFileSize (path ));
359
- }
360
- }
361
-
362
- private static long getTotalFileSize (Path path ) throws IOException {
363
- if (Files .isRegularFile (path )) {
364
- try {
365
- return Files .size (path );
366
- } catch (NoSuchFileException | FileNotFoundException e ) {
367
- // probably removed
368
- return 0L ;
369
- }
370
- } else if (path .getFileName ().toString ().equals ("_state" ) || path .getFileName ().toString ().equals ("translog" )) {
371
- // ignore metadata and translog, since the disk threshold decider only cares about the store size
372
- return 0L ;
373
- } else {
374
- try (DirectoryStream <Path > directoryStream = Files .newDirectoryStream (path )) {
375
- long total = 0L ;
376
- for (Path subpath : directoryStream ) {
377
- total += getTotalFileSize (subpath );
378
- }
379
- return total ;
380
- } catch (NotDirectoryException | NoSuchFileException | FileNotFoundException e ) {
381
- // probably removed
382
- return 0L ;
383
- }
384
- }
385
- }
386
- }
387
-
388
- private static class TestFileSystemProvider extends FilterFileSystemProvider {
389
- private final Map <Path , TestFileStore > trackedPaths = newConcurrentMap ();
390
- private final Path rootDir ;
391
-
392
- TestFileSystemProvider (FileSystem delegateInstance , Path rootDir ) {
393
- super ("diskthreshold://" , delegateInstance );
394
- this .rootDir = new FilterPath (rootDir , fileSystem );
395
- }
396
-
397
- Path getRootDir () {
398
- return rootDir ;
399
- }
400
-
401
- void addTrackedPath (Path path ) {
402
- assertTrue (path + " starts with " + rootDir , path .startsWith (rootDir ));
403
- final FileStore fileStore ;
404
- try {
405
- fileStore = super .getFileStore (path );
406
- } catch (IOException e ) {
407
- throw new AssertionError ("unexpected" , e );
408
- }
409
- assertNull (trackedPaths .put (path , new TestFileStore (fileStore , getScheme (), path )));
410
- }
411
-
412
- @ Override
413
- public FileStore getFileStore (Path path ) {
414
- return getTestFileStore (path );
415
- }
416
-
417
- TestFileStore getTestFileStore (Path path ) {
418
- final TestFileStore fileStore = trackedPaths .get (path );
419
- if (fileStore != null ) {
420
- return fileStore ;
421
- }
422
-
423
- // On Linux, and only Linux, Lucene obtains a filestore for the index in order to determine whether it's on a spinning disk or
424
- // not so it can configure the merge scheduler accordingly
425
- assertTrue (path + " not tracked and not on Linux" , Constants .LINUX );
426
- final Set <Path > containingPaths = trackedPaths .keySet ().stream ().filter (path ::startsWith ).collect (Collectors .toSet ());
427
- assertThat (path + " not contained in a unique tracked path" , containingPaths , hasSize (1 ));
428
- return trackedPaths .get (containingPaths .iterator ().next ());
429
- }
430
-
431
- void clearTrackedPaths () throws IOException {
432
- for (Path path : trackedPaths .keySet ()) {
433
- IOUtils .rm (path );
434
- }
435
- trackedPaths .clear ();
436
- }
437
- }
438
245
}
0 commit comments