51
51
import org .elasticsearch .cluster .metadata .IndexMetaData ;
52
52
import org .elasticsearch .cluster .metadata .MetaData ;
53
53
import org .elasticsearch .common .CheckedConsumer ;
54
+ import org .elasticsearch .common .Nullable ;
54
55
import org .elasticsearch .common .io .stream .ReleasableBytesStreamOutput ;
55
56
import org .elasticsearch .common .lease .Releasable ;
56
57
import org .elasticsearch .common .logging .Loggers ;
64
65
import org .elasticsearch .common .xcontent .XContentType ;
65
66
import org .elasticsearch .core .internal .io .IOUtils ;
66
67
import org .elasticsearch .env .NodeEnvironment ;
68
+ import org .elasticsearch .env .NodeMetaData ;
67
69
import org .elasticsearch .index .Index ;
68
70
69
71
import java .io .Closeable ;
@@ -155,17 +157,7 @@ public Writer createWriter() throws IOException {
155
157
final Directory directory = createDirectory (path .resolve (METADATA_DIRECTORY_NAME ));
156
158
closeables .add (directory );
157
159
158
- final IndexWriterConfig indexWriterConfig = new IndexWriterConfig (new KeywordAnalyzer ());
159
- // start empty since we re-write the whole cluster state to ensure it is all using the same format version
160
- indexWriterConfig .setOpenMode (IndexWriterConfig .OpenMode .CREATE );
161
- // only commit when specifically instructed, we must not write any intermediate states
162
- indexWriterConfig .setCommitOnClose (false );
163
- // most of the data goes into stored fields which are not buffered, so we only really need a tiny buffer
164
- indexWriterConfig .setRAMBufferSizeMB (1.0 );
165
- // merge on the write thread (e.g. while flushing)
166
- indexWriterConfig .setMergeScheduler (new SerialMergeScheduler ());
167
-
168
- final IndexWriter indexWriter = new IndexWriter (directory , indexWriterConfig );
160
+ final IndexWriter indexWriter = createIndexWriter (directory , false );
169
161
closeables .add (indexWriter );
170
162
metaDataIndexWriters .add (new MetaDataIndexWriter (directory , indexWriter ));
171
163
}
@@ -178,6 +170,20 @@ public Writer createWriter() throws IOException {
178
170
return new Writer (metaDataIndexWriters , nodeId , bigArrays );
179
171
}
180
172
173
+ private static IndexWriter createIndexWriter (Directory directory , boolean openExisting ) throws IOException {
174
+ final IndexWriterConfig indexWriterConfig = new IndexWriterConfig (new KeywordAnalyzer ());
175
+ // start empty since we re-write the whole cluster state to ensure it is all using the same format version
176
+ indexWriterConfig .setOpenMode (openExisting ? IndexWriterConfig .OpenMode .APPEND : IndexWriterConfig .OpenMode .CREATE );
177
+ // only commit when specifically instructed, we must not write any intermediate states
178
+ indexWriterConfig .setCommitOnClose (false );
179
+ // most of the data goes into stored fields which are not buffered, so we only really need a tiny buffer
180
+ indexWriterConfig .setRAMBufferSizeMB (1.0 );
181
+ // merge on the write thread (e.g. while flushing)
182
+ indexWriterConfig .setMergeScheduler (new SerialMergeScheduler ());
183
+
184
+ return new IndexWriter (directory , indexWriterConfig );
185
+ }
186
+
181
187
/**
182
188
* Remove all persisted cluster states from the given data paths, for use in tests. Should only be called when there is no open
183
189
* {@link Writer} on these paths.
@@ -196,6 +202,10 @@ Directory createDirectory(Path path) throws IOException {
196
202
return new SimpleFSDirectory (path );
197
203
}
198
204
205
+ public Path [] getDataPaths () {
206
+ return dataPaths ;
207
+ }
208
+
199
209
public static class OnDiskState {
200
210
private static final OnDiskState NO_ON_DISK_STATE = new OnDiskState (null , null , 0L , 0L , MetaData .EMPTY_META_DATA );
201
211
@@ -218,6 +228,66 @@ public boolean empty() {
218
228
}
219
229
}
220
230
231
+ /**
232
+ * Returns the node metadata for the given data paths, and checks if the node ids are unique
233
+ * @param dataPaths the data paths to scan
234
+ */
235
+ @ Nullable
236
+ public static NodeMetaData nodeMetaData (Path ... dataPaths ) throws IOException {
237
+ String nodeId = null ;
238
+ Version version = null ;
239
+ for (final Path dataPath : dataPaths ) {
240
+ final Path indexPath = dataPath .resolve (METADATA_DIRECTORY_NAME );
241
+ if (Files .exists (indexPath )) {
242
+ try (DirectoryReader reader = DirectoryReader .open (new SimpleFSDirectory (dataPath .resolve (METADATA_DIRECTORY_NAME )))) {
243
+ final Map <String , String > userData = reader .getIndexCommit ().getUserData ();
244
+ assert userData .get (NODE_VERSION_KEY ) != null ;
245
+
246
+ final String thisNodeId = userData .get (NODE_ID_KEY );
247
+ assert thisNodeId != null ;
248
+ if (nodeId != null && nodeId .equals (thisNodeId ) == false ) {
249
+ throw new IllegalStateException ("unexpected node ID in metadata, found [" + thisNodeId +
250
+ "] in [" + dataPath + "] but expected [" + nodeId + "]" );
251
+ } else if (nodeId == null ) {
252
+ nodeId = thisNodeId ;
253
+ version = Version .fromId (Integer .parseInt (userData .get (NODE_VERSION_KEY )));
254
+ }
255
+ } catch (IndexNotFoundException e ) {
256
+ logger .debug (new ParameterizedMessage ("no on-disk state at {}" , indexPath ), e );
257
+ }
258
+ }
259
+ }
260
+ if (nodeId == null ) {
261
+ return null ;
262
+ }
263
+ return new NodeMetaData (nodeId , version );
264
+ }
265
+
266
+ /**
267
+ * Overrides the version field for the metadata in the given data path
268
+ */
269
+ public static void overrideVersion (Version newVersion , Path ... dataPaths ) throws IOException {
270
+ for (final Path dataPath : dataPaths ) {
271
+ final Path indexPath = dataPath .resolve (METADATA_DIRECTORY_NAME );
272
+ if (Files .exists (indexPath )) {
273
+ try (DirectoryReader reader = DirectoryReader .open (new SimpleFSDirectory (dataPath .resolve (METADATA_DIRECTORY_NAME )))) {
274
+ final Map <String , String > userData = reader .getIndexCommit ().getUserData ();
275
+ assert userData .get (NODE_VERSION_KEY ) != null ;
276
+
277
+ try (IndexWriter indexWriter =
278
+ createIndexWriter (new SimpleFSDirectory (dataPath .resolve (METADATA_DIRECTORY_NAME )), true )) {
279
+ final Map <String , String > commitData = new HashMap <>(userData );
280
+ commitData .put (NODE_VERSION_KEY , Integer .toString (newVersion .id ));
281
+ indexWriter .setLiveCommitData (commitData .entrySet ());
282
+ indexWriter .commit ();
283
+ }
284
+ } catch (IndexNotFoundException e ) {
285
+ logger .debug (new ParameterizedMessage ("no on-disk state at {}" , indexPath ), e );
286
+ }
287
+ }
288
+ }
289
+ }
290
+
221
291
/**
222
292
* Loads the best available on-disk cluster state. Returns {@link OnDiskState#NO_ON_DISK_STATE} if no such state was found.
223
293
*/
0 commit comments