5
5
*/
6
6
package org .elasticsearch .xpack .core .indexlifecycle ;
7
7
8
+ import org .apache .logging .log4j .LogManager ;
9
+ import org .apache .logging .log4j .Logger ;
8
10
import org .elasticsearch .action .ActionListener ;
11
+ import org .elasticsearch .action .admin .indices .segments .IndexSegments ;
9
12
import org .elasticsearch .action .admin .indices .segments .IndicesSegmentsRequest ;
13
+ import org .elasticsearch .action .admin .indices .segments .ShardSegments ;
10
14
import org .elasticsearch .client .Client ;
11
15
import org .elasticsearch .cluster .metadata .IndexMetaData ;
16
+ import org .elasticsearch .cluster .routing .ShardRouting ;
12
17
import org .elasticsearch .common .ParseField ;
13
18
import org .elasticsearch .common .Strings ;
14
19
import org .elasticsearch .common .xcontent .ConstructingObjectParser ;
17
22
18
23
import java .io .IOException ;
19
24
import java .util .Arrays ;
25
+ import java .util .List ;
26
+ import java .util .Map ;
20
27
import java .util .Objects ;
21
- import java .util .stream .StreamSupport ;
28
+ import java .util .stream .Collectors ;
22
29
23
30
/**
24
31
* This {@link Step} evaluates whether force_merge was successful by checking the segment count.
25
32
*/
26
33
public class SegmentCountStep extends AsyncWaitStep {
34
+
35
+ private static final Logger logger = LogManager .getLogger (SegmentCountStep .class );
27
36
public static final String NAME = "segment-count" ;
28
37
29
38
private final int maxNumSegments ;
@@ -41,10 +50,19 @@ public int getMaxNumSegments() {
41
50
public void evaluateCondition (IndexMetaData indexMetaData , Listener listener ) {
42
51
getClient ().admin ().indices ().segments (new IndicesSegmentsRequest (indexMetaData .getIndex ().getName ()),
43
52
ActionListener .wrap (response -> {
44
- long numberShardsLeftToMerge =
45
- StreamSupport .stream (response .getIndices ().get (indexMetaData .getIndex ().getName ()).spliterator (), false )
46
- .filter (iss -> Arrays .stream (iss .getShards ()).anyMatch (p -> p .getSegments ().size () > maxNumSegments )).count ();
47
- listener .onResponse (numberShardsLeftToMerge == 0 , new Info (numberShardsLeftToMerge ));
53
+ IndexSegments segments = response .getIndices ().get (indexMetaData .getIndex ().getName ());
54
+ List <ShardSegments > unmergedShards = segments .getShards ().values ().stream ()
55
+ .flatMap (iss -> Arrays .stream (iss .getShards ()))
56
+ .filter (shardSegments -> shardSegments .getSegments ().size () > maxNumSegments )
57
+ .collect (Collectors .toList ());
58
+ if (unmergedShards .size () > 0 ) {
59
+ Map <ShardRouting , Integer > unmergedShardCounts = unmergedShards .stream ()
60
+ .collect (Collectors .toMap (ShardSegments ::getShardRouting , ss -> ss .getSegments ().size ()));
61
+ logger .info ("[{}] best effort force merge to [{}] segments did not succeed for {} shards: {}" ,
62
+ indexMetaData .getIndex ().getName (), maxNumSegments , unmergedShards .size (), unmergedShardCounts );
63
+ }
64
+ // Force merging is best effort, so always return true that the condition has been met.
65
+ listener .onResponse (true , new Info (unmergedShards .size ()));
48
66
}, listener ::onFailure ));
49
67
}
50
68
@@ -90,8 +108,12 @@ public long getNumberShardsLeftToMerge() {
90
108
@ Override
91
109
public XContentBuilder toXContent (XContentBuilder builder , Params params ) throws IOException {
92
110
builder .startObject ();
93
- builder .field (MESSAGE .getPreferredName (),
94
- "Waiting for [" + numberShardsLeftToMerge + "] shards " + "to forcemerge" );
111
+ if (numberShardsLeftToMerge == 0 ) {
112
+ builder .field (MESSAGE .getPreferredName (), "all shards force merged successfully" );
113
+ } else {
114
+ builder .field (MESSAGE .getPreferredName (),
115
+ "[" + numberShardsLeftToMerge + "] shards did not successfully force merge" );
116
+ }
95
117
builder .field (SHARDS_TO_MERGE .getPreferredName (), numberShardsLeftToMerge );
96
118
builder .endObject ();
97
119
return builder ;
0 commit comments