5
5
*/
6
6
package org .elasticsearch .xpack .upgrade ;
7
7
8
+ import org .apache .logging .log4j .LogManager ;
9
+ import org .apache .logging .log4j .Logger ;
10
+ import org .elasticsearch .ElasticsearchException ;
8
11
import org .elasticsearch .action .ActionListener ;
9
12
import org .elasticsearch .action .support .master .AcknowledgedResponse ;
10
13
import org .elasticsearch .client .Client ;
15
18
import org .elasticsearch .cluster .metadata .IndexMetaData ;
16
19
import org .elasticsearch .cluster .metadata .MetaData ;
17
20
import org .elasticsearch .cluster .service .ClusterService ;
21
+ import org .elasticsearch .common .Strings ;
18
22
import org .elasticsearch .common .settings .Settings ;
19
23
import org .elasticsearch .index .IndexNotFoundException ;
20
24
import org .elasticsearch .index .reindex .BulkByScrollResponse ;
25
29
import org .elasticsearch .transport .TransportResponse ;
26
30
27
31
import java .util .function .BiConsumer ;
28
- import java .util .function .Consumer ;
29
32
30
33
import static org .elasticsearch .index .IndexSettings .same ;
31
34
39
42
* - Delete index .{name} and add alias .{name} to .{name}-6
40
43
*/
41
44
public class InternalIndexReindexer <T > {
45
+ private static final Logger logger = LogManager .getLogger (InternalIndexReindexer .class );
42
46
43
47
private final Client client ;
44
48
private final ClusterService clusterService ;
45
49
private final Script transformScript ;
46
50
private final String [] types ;
47
51
private final int version ;
48
- private final Consumer < ActionListener <T >> preUpgrade ;
52
+ private final BiConsumer < ClusterState , ActionListener <T >> preUpgrade ;
49
53
private final BiConsumer <T , ActionListener <TransportResponse .Empty >> postUpgrade ;
50
54
51
55
public InternalIndexReindexer (Client client , ClusterService clusterService , int version , Script transformScript , String [] types ,
52
- Consumer < ActionListener <T >> preUpgrade ,
56
+ BiConsumer < ClusterState , ActionListener <T >> preUpgrade ,
53
57
BiConsumer <T , ActionListener <TransportResponse .Empty >> postUpgrade ) {
54
58
this .client = client ;
55
59
this .clusterService = clusterService ;
@@ -62,7 +66,7 @@ public InternalIndexReindexer(Client client, ClusterService clusterService, int
62
66
63
67
public void upgrade (TaskId task , String index , ClusterState clusterState , ActionListener <BulkByScrollResponse > listener ) {
64
68
ParentTaskAssigningClient parentAwareClient = new ParentTaskAssigningClient (client , task );
65
- preUpgrade .accept (ActionListener .wrap (
69
+ preUpgrade .accept (clusterState , ActionListener .wrap (
66
70
t -> innerUpgrade (parentAwareClient , index , clusterState , ActionListener .wrap (
67
71
response -> postUpgrade .accept (t , ActionListener .wrap (
68
72
empty -> listener .onResponse (response ),
@@ -76,32 +80,61 @@ public void upgrade(TaskId task, String index, ClusterState clusterState, Action
76
80
private void innerUpgrade (ParentTaskAssigningClient parentAwareClient , String index , ClusterState clusterState ,
77
81
ActionListener <BulkByScrollResponse > listener ) {
78
82
String newIndex = index + "-" + version ;
83
+ logger .trace ("upgrading index {} to new index {}" , index , newIndex );
79
84
try {
80
85
checkMasterAndDataNodeVersion (clusterState );
81
- parentAwareClient .admin ().indices ().prepareCreate (newIndex ).execute (ActionListener .wrap (createIndexResponse ->
82
- setReadOnlyBlock (index , ActionListener .wrap (setReadOnlyResponse ->
83
- reindex (parentAwareClient , index , newIndex , ActionListener .wrap (
84
- bulkByScrollResponse -> // Successful completion of reindexing - delete old index
85
- removeReadOnlyBlock (parentAwareClient , index , ActionListener .wrap (unsetReadOnlyResponse ->
86
- parentAwareClient .admin ().indices ().prepareAliases ().removeIndex (index )
87
- .addAlias (newIndex , index ).execute (ActionListener .wrap (deleteIndexResponse ->
88
- listener .onResponse (bulkByScrollResponse ), listener ::onFailure
89
- )), listener ::onFailure
90
- )),
91
- e -> // Something went wrong during reindexing - remove readonly flag and report the error
92
- removeReadOnlyBlock (parentAwareClient , index , ActionListener .wrap (unsetReadOnlyResponse -> {
93
- listener .onFailure (e );
94
- }, e1 -> {
95
- listener .onFailure (e );
96
- }))
97
- )), listener ::onFailure
98
- )), listener ::onFailure
99
- ));
86
+ parentAwareClient .admin ().indices ().prepareCreate (newIndex ).execute (ActionListener .wrap (createIndexResponse -> {
87
+ setReadOnlyBlock (index , ActionListener .wrap (
88
+ setReadOnlyResponse -> reindex (parentAwareClient , index , newIndex , ActionListener .wrap (bulkByScrollResponse -> {
89
+ if ((bulkByScrollResponse .getBulkFailures () != null
90
+ && bulkByScrollResponse .getBulkFailures ().isEmpty () == false )
91
+ || (bulkByScrollResponse .getSearchFailures () != null
92
+ && bulkByScrollResponse .getSearchFailures ().isEmpty () == false )) {
93
+ ElasticsearchException ex = logAndThrowExceptionForFailures (bulkByScrollResponse );
94
+ removeReadOnlyBlockOnReindexFailure (parentAwareClient , index , listener , ex );
95
+ } else {
96
+ // Successful completion of reindexing - remove read only and delete old index
97
+ removeReadOnlyBlock (parentAwareClient , index ,
98
+ ActionListener .wrap (unsetReadOnlyResponse -> parentAwareClient .admin ().indices ().prepareAliases ()
99
+ .removeIndex (index ).addAlias (newIndex , index )
100
+ .execute (ActionListener .wrap (
101
+ deleteIndexResponse -> listener .onResponse (bulkByScrollResponse ),
102
+ listener ::onFailure )),
103
+ listener ::onFailure ));
104
+ }
105
+ }, e -> {
106
+ logger .error ("error occurred while reindexing" , e );
107
+ removeReadOnlyBlockOnReindexFailure (parentAwareClient , index , listener , e );
108
+ })), listener ::onFailure ));
109
+ }, listener ::onFailure ));
100
110
} catch (Exception ex ) {
111
+ logger .error ("error occurred while upgrading index" , ex );
112
+ removeReadOnlyBlockOnReindexFailure (parentAwareClient , index , listener , ex );
101
113
listener .onFailure (ex );
102
114
}
103
115
}
104
116
117
+ private void removeReadOnlyBlockOnReindexFailure (ParentTaskAssigningClient parentAwareClient , String index ,
118
+ ActionListener <BulkByScrollResponse > listener , Exception ex ) {
119
+ removeReadOnlyBlock (parentAwareClient , index , ActionListener .wrap (unsetReadOnlyResponse -> {
120
+ listener .onFailure (ex );
121
+ }, e1 -> {
122
+ listener .onFailure (ex );
123
+ }));
124
+ }
125
+
126
+ private ElasticsearchException logAndThrowExceptionForFailures (BulkByScrollResponse bulkByScrollResponse ) {
127
+ String bulkFailures = (bulkByScrollResponse .getBulkFailures () != null )
128
+ ? Strings .collectionToCommaDelimitedString (bulkByScrollResponse .getBulkFailures ())
129
+ : "" ;
130
+ String searchFailures = (bulkByScrollResponse .getSearchFailures () != null )
131
+ ? Strings .collectionToCommaDelimitedString (bulkByScrollResponse .getSearchFailures ())
132
+ : "" ;
133
+ logger .error ("error occurred while reindexing, bulk failures [{}], search failures [{}]" , bulkFailures , searchFailures );
134
+ return new ElasticsearchException ("error occurred while reindexing, bulk failures [{}], search failures [{}]" , bulkFailures ,
135
+ searchFailures );
136
+ }
137
+
105
138
private void checkMasterAndDataNodeVersion (ClusterState clusterState ) {
106
139
if (clusterState .nodes ().getMinNodeVersion ().before (Upgrade .UPGRADE_INTRODUCED )) {
107
140
throw new IllegalStateException ("All nodes should have at least version [" + Upgrade .UPGRADE_INTRODUCED + "] to upgrade" );
0 commit comments