22
22
import org .apache .logging .log4j .message .ParameterizedMessage ;
23
23
import org .elasticsearch .ElasticsearchException ;
24
24
import org .elasticsearch .action .ActionListener ;
25
+ import org .elasticsearch .Version ;
25
26
import org .elasticsearch .action .ActionType ;
26
27
import org .elasticsearch .action .FailedNodeException ;
27
28
import org .elasticsearch .action .support .ActionFilters ;
45
46
import org .elasticsearch .gateway .AsyncShardFetch ;
46
47
import org .elasticsearch .index .IndexService ;
47
48
import org .elasticsearch .index .IndexSettings ;
49
+ import org .elasticsearch .index .seqno .ReplicationTracker ;
50
+ import org .elasticsearch .index .seqno .RetentionLease ;
48
51
import org .elasticsearch .index .shard .IndexShard ;
49
52
import org .elasticsearch .index .shard .ShardId ;
50
53
import org .elasticsearch .index .shard .ShardPath ;
55
58
import org .elasticsearch .transport .TransportService ;
56
59
57
60
import java .io .IOException ;
61
+ import java .util .Collections ;
58
62
import java .util .Iterator ;
59
63
import java .util .List ;
60
64
import java .util .concurrent .TimeUnit ;
@@ -127,15 +131,16 @@ private StoreFilesMetaData listStoreMetaData(ShardId shardId) throws IOException
127
131
IndexShard indexShard = indexService .getShardOrNull (shardId .id ());
128
132
if (indexShard != null ) {
129
133
try {
130
- final StoreFilesMetaData storeFilesMetaData = new StoreFilesMetaData (shardId , indexShard .snapshotStoreMetadata ());
134
+ final StoreFilesMetaData storeFilesMetaData = new StoreFilesMetaData (shardId ,
135
+ indexShard .snapshotStoreMetadata (), indexShard .getPeerRecoveryRetentionLeases ());
131
136
exists = true ;
132
137
return storeFilesMetaData ;
133
138
} catch (org .apache .lucene .index .IndexNotFoundException e ) {
134
139
logger .trace (new ParameterizedMessage ("[{}] node is missing index, responding with empty" , shardId ), e );
135
- return new StoreFilesMetaData (shardId , Store .MetadataSnapshot .EMPTY );
140
+ return new StoreFilesMetaData (shardId , Store .MetadataSnapshot .EMPTY , Collections . emptyList () );
136
141
} catch (IOException e ) {
137
142
logger .warn (new ParameterizedMessage ("[{}] can't read metadata from store, responding with empty" , shardId ), e );
138
- return new StoreFilesMetaData (shardId , Store .MetadataSnapshot .EMPTY );
143
+ return new StoreFilesMetaData (shardId , Store .MetadataSnapshot .EMPTY , Collections . emptyList () );
139
144
}
140
145
}
141
146
}
@@ -150,20 +155,23 @@ private StoreFilesMetaData listStoreMetaData(ShardId shardId) throws IOException
150
155
}
151
156
if (metaData == null ) {
152
157
logger .trace ("{} node doesn't have meta data for the requests index, responding with empty" , shardId );
153
- return new StoreFilesMetaData (shardId , Store .MetadataSnapshot .EMPTY );
158
+ return new StoreFilesMetaData (shardId , Store .MetadataSnapshot .EMPTY , Collections . emptyList () );
154
159
}
155
160
final IndexSettings indexSettings = indexService != null ? indexService .getIndexSettings () :
156
161
new IndexSettings (metaData , settings );
157
162
final ShardPath shardPath = ShardPath .loadShardPath (logger , nodeEnv , shardId , indexSettings );
158
163
if (shardPath == null ) {
159
- return new StoreFilesMetaData (shardId , Store .MetadataSnapshot .EMPTY );
164
+ return new StoreFilesMetaData (shardId , Store .MetadataSnapshot .EMPTY , Collections . emptyList () );
160
165
}
161
166
// note that this may fail if it can't get access to the shard lock. Since we check above there is an active shard, this means:
162
167
// 1) a shard is being constructed, which means the master will not use a copy of this replica
163
168
// 2) A shard is shutting down and has not cleared its content within lock timeout. In this case the master may not
164
169
// reuse local resources.
165
- return new StoreFilesMetaData (shardId , Store .readMetadataSnapshot (shardPath .resolveIndex (), shardId ,
166
- nodeEnv ::shardLock , logger ));
170
+ final Store .MetadataSnapshot metadataSnapshot =
171
+ Store .readMetadataSnapshot (shardPath .resolveIndex (), shardId , nodeEnv ::shardLock , logger );
172
+ // We use peer recovery retention leases from the primary for allocating replicas. We should always have retention leases when
173
+ // we refresh shard info after the primary has started. Hence, we can ignore retention leases if there is no active shard.
174
+ return new StoreFilesMetaData (shardId , metadataSnapshot , Collections .emptyList ());
167
175
} finally {
168
176
TimeValue took = new TimeValue (System .nanoTime () - startTimeNS , TimeUnit .NANOSECONDS );
169
177
if (exists ) {
@@ -175,17 +183,34 @@ private StoreFilesMetaData listStoreMetaData(ShardId shardId) throws IOException
175
183
}
176
184
177
185
public static class StoreFilesMetaData implements Iterable <StoreFileMetaData >, Writeable {
178
- private ShardId shardId ;
179
- Store .MetadataSnapshot metadataSnapshot ;
186
+ private final ShardId shardId ;
187
+ private final Store .MetadataSnapshot metadataSnapshot ;
188
+ private final List <RetentionLease > peerRecoveryRetentionLeases ;
189
+
190
+ public StoreFilesMetaData (ShardId shardId , Store .MetadataSnapshot metadataSnapshot ,
191
+ List <RetentionLease > peerRecoveryRetentionLeases ) {
192
+ this .shardId = shardId ;
193
+ this .metadataSnapshot = metadataSnapshot ;
194
+ this .peerRecoveryRetentionLeases = peerRecoveryRetentionLeases ;
195
+ }
180
196
181
197
public StoreFilesMetaData (StreamInput in ) throws IOException {
182
198
this .shardId = new ShardId (in );
183
199
this .metadataSnapshot = new Store .MetadataSnapshot (in );
200
+ if (in .getVersion ().onOrAfter (Version .V_7_5_0 )) {
201
+ this .peerRecoveryRetentionLeases = in .readList (RetentionLease ::new );
202
+ } else {
203
+ this .peerRecoveryRetentionLeases = Collections .emptyList ();
204
+ }
184
205
}
185
206
186
- public StoreFilesMetaData (ShardId shardId , Store .MetadataSnapshot metadataSnapshot ) {
187
- this .shardId = shardId ;
188
- this .metadataSnapshot = metadataSnapshot ;
207
+ @ Override
208
+ public void writeTo (StreamOutput out ) throws IOException {
209
+ shardId .writeTo (out );
210
+ metadataSnapshot .writeTo (out );
211
+ if (out .getVersion ().onOrAfter (Version .V_7_5_0 )) {
212
+ out .writeList (peerRecoveryRetentionLeases );
213
+ }
189
214
}
190
215
191
216
public ShardId shardId () {
@@ -209,10 +234,18 @@ public StoreFileMetaData file(String name) {
209
234
return metadataSnapshot .asMap ().get (name );
210
235
}
211
236
212
- @ Override
213
- public void writeTo (StreamOutput out ) throws IOException {
214
- shardId .writeTo (out );
215
- metadataSnapshot .writeTo (out );
237
+ /**
238
+ * Returns the retaining sequence number of the peer recovery retention lease for a given node if it exists; otherwise, returns -1.
239
+ */
240
+ public long getPeerRecoveryRetentionLeaseRetainingSeqNo (DiscoveryNode node ) {
241
+ assert node != null ;
242
+ final String retentionLeaseId = ReplicationTracker .getPeerRecoveryRetentionLeaseId (node .getId ());
243
+ return peerRecoveryRetentionLeases .stream ().filter (lease -> lease .id ().equals (retentionLeaseId ))
244
+ .mapToLong (RetentionLease ::retainingSequenceNumber ).findFirst ().orElse (-1L );
245
+ }
246
+
247
+ public List <RetentionLease > peerRecoveryRetentionLeases () {
248
+ return peerRecoveryRetentionLeases ;
216
249
}
217
250
218
251
/**
0 commit comments