33
33
import org .elasticsearch .action .bulk .BackoffPolicy ;
34
34
import org .elasticsearch .action .bulk .BulkItemResponse ;
35
35
import org .elasticsearch .action .index .IndexRequest ;
36
+ import org .elasticsearch .action .search .SearchRequest ;
36
37
import org .elasticsearch .client .Client ;
37
38
import org .elasticsearch .client .ParentTaskAssigningClient ;
38
39
import org .elasticsearch .client .RestClient ;
47
48
import org .elasticsearch .common .xcontent .XContentParser ;
48
49
import org .elasticsearch .common .xcontent .XContentType ;
49
50
import org .elasticsearch .index .VersionType ;
51
+ import org .elasticsearch .index .mapper .SeqNoFieldMapper ;
50
52
import org .elasticsearch .index .mapper .VersionFieldMapper ;
51
53
import org .elasticsearch .index .reindex .remote .RemoteScrollableHitSource ;
52
54
import org .elasticsearch .script .Script ;
53
55
import org .elasticsearch .script .ScriptService ;
56
+ import org .elasticsearch .search .sort .FieldSortBuilder ;
57
+ import org .elasticsearch .search .sort .SortBuilder ;
58
+ import org .elasticsearch .search .sort .SortOrder ;
54
59
import org .elasticsearch .threadpool .ThreadPool ;
55
60
56
61
import java .io .IOException ;
@@ -92,17 +97,41 @@ public void initTask(BulkByScrollTask task, ReindexRequest request, ActionListen
92
97
}
93
98
94
99
public void execute (BulkByScrollTask task , ReindexRequest request , ActionListener <BulkByScrollResponse > listener ) {
100
+ request .getSearchRequest ().allowPartialSearchResults (false );
101
+ // Notice that this is called both on leader and workers when slicing.
102
+ String resumableSortingField = request .getRemoteInfo () == null ? getOrAddRestartFromField (request .getSearchRequest ()) : null ;
103
+
95
104
BulkByScrollParallelizationHelper .executeSlicedAction (task , request , ReindexAction .INSTANCE , listener , client ,
96
105
clusterService .localNode (),
97
106
() -> {
98
107
ParentTaskAssigningClient assigningClient = new ParentTaskAssigningClient (client , clusterService .localNode (), task );
99
108
AsyncIndexBySearchAction searchAction = new AsyncIndexBySearchAction (task , logger , assigningClient , threadPool ,
100
- scriptService , reindexSslConfig , request , listener );
109
+ scriptService , reindexSslConfig , request , resumableSortingField , listener );
101
110
searchAction .start ();
102
111
});
103
112
104
113
}
105
114
115
+ private static String getOrAddRestartFromField (SearchRequest searchRequest ) {
116
+ // we keep with the tradition of modifying the input request, though this can lead to strange results (in transport clients).
117
+ List <SortBuilder <?>> sorts = searchRequest .source ().sorts ();
118
+ if (sorts != null && sorts .size () >= 1 ) {
119
+ SortBuilder <?> firstSort = sorts .get (0 );
120
+ if (firstSort instanceof FieldSortBuilder ) {
121
+ FieldSortBuilder fieldSort = (FieldSortBuilder ) firstSort ;
122
+ if (SeqNoFieldMapper .NAME .equals (fieldSort .getFieldName ())
123
+ && fieldSort .order () == SortOrder .ASC ) {
124
+ return SeqNoFieldMapper .NAME ;
125
+ }
126
+ // todo: support non seq_no fields and descending, but need to check field is numeric and handle missing values too then.
127
+ }
128
+ return null ;
129
+ }
130
+
131
+ searchRequest .source ().sort (SeqNoFieldMapper .NAME );
132
+ return SeqNoFieldMapper .NAME ;
133
+ }
134
+
106
135
/**
107
136
* Build the {@link RestClient} used for reindexing from remote clusters.
108
137
* @param remoteInfo connection information for the remote cluster
@@ -170,18 +199,20 @@ static class AsyncIndexBySearchAction extends AbstractAsyncBulkByScrollAction<Re
170
199
171
200
AsyncIndexBySearchAction (BulkByScrollTask task , Logger logger , ParentTaskAssigningClient client ,
172
201
ThreadPool threadPool , ScriptService scriptService , ReindexSslConfig sslConfig , ReindexRequest request ,
173
- ActionListener <BulkByScrollResponse > listener ) {
202
+ String restartFromField , ActionListener <BulkByScrollResponse > listener ) {
174
203
super (task ,
175
204
/*
176
205
* We only need the source version if we're going to use it when write and we only do that when the destination request uses
177
206
* external versioning.
178
207
*/
179
208
request .getDestination ().versionType () != VersionType .INTERNAL ,
180
- false , logger , client , threadPool , request , listener , scriptService , sslConfig );
209
+ SeqNoFieldMapper .NAME .equals (restartFromField ), logger , client , threadPool , request , listener ,
210
+ scriptService , sslConfig , restartFromField );
181
211
}
182
212
183
213
@ Override
184
- protected ScrollableHitSource buildScrollableResultSource (BackoffPolicy backoffPolicy ) {
214
+ protected ScrollableHitSource buildScrollableResultSource (BackoffPolicy backoffPolicy ,
215
+ String restartFromField ) {
185
216
if (mainRequest .getRemoteInfo () != null ) {
186
217
RemoteInfo remoteInfo = mainRequest .getRemoteInfo ();
187
218
createdThreads = synchronizedList (new ArrayList <>());
@@ -191,7 +222,7 @@ protected ScrollableHitSource buildScrollableResultSource(BackoffPolicy backoffP
191
222
this ::onScrollResponse , this ::finishHim ,
192
223
restClient , remoteInfo .getQuery (), mainRequest .getSearchRequest ());
193
224
}
194
- return super .buildScrollableResultSource (backoffPolicy );
225
+ return super .buildScrollableResultSource (backoffPolicy , restartFromField );
195
226
}
196
227
197
228
@ Override
0 commit comments