17
17
import org .elasticsearch .cluster .metadata .IndexMetaData ;
18
18
import org .elasticsearch .cluster .metadata .MappingMetaData ;
19
19
import org .elasticsearch .cluster .routing .IndexRoutingTable ;
20
+ import org .elasticsearch .cluster .service .ClusterService ;
20
21
import org .elasticsearch .common .settings .Settings ;
21
22
import org .elasticsearch .common .unit .TimeValue ;
22
23
import org .elasticsearch .common .util .concurrent .EsRejectedExecutionException ;
23
24
import org .elasticsearch .common .xcontent .XContentType ;
24
25
import org .elasticsearch .index .Index ;
25
26
import org .elasticsearch .index .IndexNotFoundException ;
27
+ import org .elasticsearch .index .engine .CommitStats ;
28
+ import org .elasticsearch .index .engine .Engine ;
26
29
import org .elasticsearch .index .seqno .SeqNoStats ;
27
30
import org .elasticsearch .index .shard .ShardId ;
28
31
import org .elasticsearch .index .shard .ShardNotFoundException ;
47
50
import java .util .function .LongConsumer ;
48
51
49
52
import static org .elasticsearch .xpack .ccr .CcrLicenseChecker .wrapClient ;
53
+ import static org .elasticsearch .xpack .ccr .action .TransportResumeFollowAction .extractLeaderShardHistoryUUIDs ;
50
54
51
55
public class ShardFollowTasksExecutor extends PersistentTasksExecutor <ShardFollowTask > {
52
56
53
57
private final Client client ;
54
58
private final ThreadPool threadPool ;
59
+ private final ClusterService clusterService ;
55
60
56
- public ShardFollowTasksExecutor (Settings settings , Client client , ThreadPool threadPool ) {
61
+ public ShardFollowTasksExecutor (Settings settings , Client client , ThreadPool threadPool , ClusterService clusterService ) {
57
62
super (settings , ShardFollowTask .NAME , Ccr .CCR_THREAD_POOL_NAME );
58
63
this .client = client ;
59
64
this .threadPool = threadPool ;
65
+ this .clusterService = clusterService ;
60
66
}
61
67
62
68
@ Override
@@ -99,8 +105,10 @@ protected AllocatedPersistentTask createTask(long id, String type, String action
99
105
}
100
106
}
101
107
};
102
- return new ShardFollowNodeTask (
103
- id , type , action , getDescription (taskInProgress ), parentTaskId , headers , params , scheduler , System ::nanoTime ) {
108
+
109
+ final String recordedLeaderShardHistoryUUID = getLeaderShardHistoryUUID (params );
110
+ return new ShardFollowNodeTask (id , type , action , getDescription (taskInProgress ), parentTaskId , headers , params ,
111
+ scheduler , System ::nanoTime ) {
104
112
105
113
@ Override
106
114
protected void innerUpdateMapping (LongConsumer handler , Consumer <Exception > errorHandler ) {
@@ -135,12 +143,14 @@ protected void innerUpdateMapping(LongConsumer handler, Consumer<Exception> erro
135
143
136
144
@ Override
137
145
protected void innerSendBulkShardOperationsRequest (
138
- final List <Translog .Operation > operations ,
139
- final long maxSeqNoOfUpdatesOrDeletes ,
140
- final Consumer <BulkShardOperationsResponse > handler ,
141
- final Consumer <Exception > errorHandler ) {
142
- final BulkShardOperationsRequest request = new BulkShardOperationsRequest (
143
- params .getFollowShardId (), operations , maxSeqNoOfUpdatesOrDeletes );
146
+ final String followerHistoryUUID ,
147
+ final List <Translog .Operation > operations ,
148
+ final long maxSeqNoOfUpdatesOrDeletes ,
149
+ final Consumer <BulkShardOperationsResponse > handler ,
150
+ final Consumer <Exception > errorHandler ) {
151
+
152
+ final BulkShardOperationsRequest request = new BulkShardOperationsRequest (params .getFollowShardId (),
153
+ followerHistoryUUID , operations , maxSeqNoOfUpdatesOrDeletes );
144
154
followerClient .execute (BulkShardOperationsAction .INSTANCE , request ,
145
155
ActionListener .wrap (response -> handler .accept (response ), errorHandler ));
146
156
}
@@ -149,7 +159,7 @@ protected void innerSendBulkShardOperationsRequest(
149
159
protected void innerSendShardChangesRequest (long from , int maxOperationCount , Consumer <ShardChangesAction .Response > handler ,
150
160
Consumer <Exception > errorHandler ) {
151
161
ShardChangesAction .Request request =
152
- new ShardChangesAction .Request (params .getLeaderShardId (), params . getRecordedLeaderIndexHistoryUUID () );
162
+ new ShardChangesAction .Request (params .getLeaderShardId (), recordedLeaderShardHistoryUUID );
153
163
request .setFromSeqNo (from );
154
164
request .setMaxOperationCount (maxOperationCount );
155
165
request .setMaxBatchSize (params .getMaxBatchSize ());
@@ -159,8 +169,15 @@ protected void innerSendShardChangesRequest(long from, int maxOperationCount, Co
159
169
};
160
170
}
161
171
162
- interface BiLongConsumer {
163
- void accept (long x , long y );
172
+ private String getLeaderShardHistoryUUID (ShardFollowTask params ) {
173
+ IndexMetaData followIndexMetaData = clusterService .state ().metaData ().index (params .getFollowShardId ().getIndex ());
174
+ Map <String , String > ccrIndexMetadata = followIndexMetaData .getCustomData (Ccr .CCR_CUSTOM_METADATA_KEY );
175
+ String [] recordedLeaderShardHistoryUUIDs = extractLeaderShardHistoryUUIDs (ccrIndexMetadata );
176
+ return recordedLeaderShardHistoryUUIDs [params .getLeaderShardId ().id ()];
177
+ }
178
+
179
+ interface FollowerStatsInfoHandler {
180
+ void accept (String followerHistoryUUID , long globalCheckpoint , long maxSeqNo );
164
181
}
165
182
166
183
@ Override
@@ -169,7 +186,9 @@ protected void nodeOperation(final AllocatedPersistentTask task, final ShardFoll
169
186
ShardFollowNodeTask shardFollowNodeTask = (ShardFollowNodeTask ) task ;
170
187
logger .info ("{} Starting to track leader shard {}" , params .getFollowShardId (), params .getLeaderShardId ());
171
188
172
- BiLongConsumer handler = (followerGCP , maxSeqNo ) -> shardFollowNodeTask .start (followerGCP , maxSeqNo , followerGCP , maxSeqNo );
189
+ FollowerStatsInfoHandler handler = (followerHistoryUUID , followerGCP , maxSeqNo ) -> {
190
+ shardFollowNodeTask .start (followerHistoryUUID , followerGCP , maxSeqNo , followerGCP , maxSeqNo );
191
+ };
173
192
Consumer <Exception > errorHandler = e -> {
174
193
if (shardFollowNodeTask .isStopped ()) {
175
194
return ;
@@ -184,13 +203,13 @@ protected void nodeOperation(final AllocatedPersistentTask task, final ShardFoll
184
203
}
185
204
};
186
205
187
- fetchGlobalCheckpoint (followerClient , params .getFollowShardId (), handler , errorHandler );
206
+ fetchFollowerShardInfo (followerClient , params .getFollowShardId (), handler , errorHandler );
188
207
}
189
208
190
- private void fetchGlobalCheckpoint (
209
+ private void fetchFollowerShardInfo (
191
210
final Client client ,
192
211
final ShardId shardId ,
193
- final BiLongConsumer handler ,
212
+ final FollowerStatsInfoHandler handler ,
194
213
final Consumer <Exception > errorHandler ) {
195
214
client .admin ().indices ().stats (new IndicesStatsRequest ().indices (shardId .getIndexName ()), ActionListener .wrap (r -> {
196
215
IndexStats indexStats = r .getIndex (shardId .getIndexName ());
@@ -204,10 +223,14 @@ private void fetchGlobalCheckpoint(
204
223
.filter (shardStats -> shardStats .getShardRouting ().primary ())
205
224
.findAny ();
206
225
if (filteredShardStats .isPresent ()) {
207
- final SeqNoStats seqNoStats = filteredShardStats .get ().getSeqNoStats ();
226
+ final ShardStats shardStats = filteredShardStats .get ();
227
+ final CommitStats commitStats = shardStats .getCommitStats ();
228
+ final String historyUUID = commitStats .getUserData ().get (Engine .HISTORY_UUID_KEY );
229
+
230
+ final SeqNoStats seqNoStats = shardStats .getSeqNoStats ();
208
231
final long globalCheckpoint = seqNoStats .getGlobalCheckpoint ();
209
232
final long maxSeqNo = seqNoStats .getMaxSeqNo ();
210
- handler .accept (globalCheckpoint , maxSeqNo );
233
+ handler .accept (historyUUID , globalCheckpoint , maxSeqNo );
211
234
} else {
212
235
errorHandler .accept (new ShardNotFoundException (shardId ));
213
236
}
0 commit comments