8
8
9
9
import org .apache .lucene .index .IndexCommit ;
10
10
import org .elasticsearch .Version ;
11
+ import org .elasticsearch .action .admin .cluster .state .ClusterStateResponse ;
12
+ import org .elasticsearch .action .support .PlainActionFuture ;
13
+ import org .elasticsearch .client .Client ;
11
14
import org .elasticsearch .cluster .metadata .IndexMetaData ;
12
15
import org .elasticsearch .cluster .metadata .MetaData ;
13
16
import org .elasticsearch .cluster .metadata .RepositoryMetaData ;
14
17
import org .elasticsearch .cluster .node .DiscoveryNode ;
18
+ import org .elasticsearch .common .Strings ;
19
+ import org .elasticsearch .common .collect .ImmutableOpenMap ;
15
20
import org .elasticsearch .common .component .AbstractLifecycleComponent ;
16
21
import org .elasticsearch .common .settings .Settings ;
22
+ import org .elasticsearch .index .Index ;
23
+ import org .elasticsearch .index .engine .EngineException ;
17
24
import org .elasticsearch .index .shard .IndexShard ;
25
+ import org .elasticsearch .index .shard .IndexShardRecoveryException ;
18
26
import org .elasticsearch .index .shard .ShardId ;
19
27
import org .elasticsearch .index .snapshots .IndexShardSnapshotStatus ;
20
28
import org .elasticsearch .index .store .Store ;
25
33
import org .elasticsearch .snapshots .SnapshotId ;
26
34
import org .elasticsearch .snapshots .SnapshotInfo ;
27
35
import org .elasticsearch .snapshots .SnapshotShardFailure ;
36
+ import org .elasticsearch .snapshots .SnapshotState ;
37
+ import org .elasticsearch .xpack .ccr .Ccr ;
38
+ import org .elasticsearch .xpack .ccr .CcrLicenseChecker ;
28
39
29
40
import java .io .IOException ;
41
+ import java .util .ArrayList ;
42
+ import java .util .Collections ;
43
+ import java .util .HashMap ;
30
44
import java .util .List ;
45
+ import java .util .Map ;
46
+ import java .util .Set ;
31
47
32
48
/**
33
49
* This repository relies on a remote cluster for Ccr restores. It is read-only so it can only be used to
34
50
* restore shards/indexes that exist on the remote cluster.
35
51
*/
36
52
public class CcrRepository extends AbstractLifecycleComponent implements Repository {
37
53
54
+ public static final String LATEST = "_latest_" ;
38
55
public static final String TYPE = "_ccr_" ;
39
56
public static final String NAME_PREFIX = "_ccr_" ;
57
+ private static final SnapshotId SNAPSHOT_ID = new SnapshotId (LATEST , LATEST );
40
58
41
59
private final RepositoryMetaData metadata ;
60
+ private final String remoteClusterAlias ;
61
+ private final Client client ;
62
+ private final CcrLicenseChecker ccrLicenseChecker ;
42
63
43
- public CcrRepository (RepositoryMetaData metadata , Settings settings ) {
64
+ public CcrRepository (RepositoryMetaData metadata , Client client , CcrLicenseChecker ccrLicenseChecker , Settings settings ) {
44
65
super (settings );
45
66
this .metadata = metadata ;
67
+ assert metadata .name ().startsWith (NAME_PREFIX ) : "CcrRepository metadata.name() must start with: " + NAME_PREFIX ;
68
+ this .remoteClusterAlias = Strings .split (metadata .name (), NAME_PREFIX )[1 ];
69
+ this .ccrLicenseChecker = ccrLicenseChecker ;
70
+ this .client = client ;
46
71
}
47
72
48
73
@ Override
@@ -67,22 +92,85 @@ public RepositoryMetaData getMetadata() {
67
92
68
93
@ Override
69
94
public SnapshotInfo getSnapshotInfo (SnapshotId snapshotId ) {
70
- throw new UnsupportedOperationException ("Unsupported for repository of type: " + TYPE );
95
+ assert SNAPSHOT_ID .equals (snapshotId ) : "RemoteClusterRepository only supports " + SNAPSHOT_ID + " as the SnapshotId" ;
96
+ Client remoteClient = client .getRemoteClusterClient (remoteClusterAlias );
97
+ ClusterStateResponse response = remoteClient .admin ().cluster ().prepareState ().clear ().setMetaData (true ).setNodes (true ).get ();
98
+ ImmutableOpenMap <String , IndexMetaData > indicesMap = response .getState ().metaData ().indices ();
99
+ ArrayList <String > indices = new ArrayList <>(indicesMap .size ());
100
+ indicesMap .keysIt ().forEachRemaining (indices ::add );
101
+
102
+ return new SnapshotInfo (snapshotId , indices , SnapshotState .SUCCESS , response .getState ().getNodes ().getMaxNodeVersion ());
71
103
}
72
104
73
105
@ Override
74
106
public MetaData getSnapshotGlobalMetaData (SnapshotId snapshotId ) {
75
- throw new UnsupportedOperationException ("Unsupported for repository of type: " + TYPE );
107
+ assert SNAPSHOT_ID .equals (snapshotId ) : "RemoteClusterRepository only supports " + SNAPSHOT_ID + " as the SnapshotId" ;
108
+ Client remoteClient = client .getRemoteClusterClient (remoteClusterAlias );
109
+ ClusterStateResponse response = remoteClient
110
+ .admin ()
111
+ .cluster ()
112
+ .prepareState ()
113
+ .clear ()
114
+ .setMetaData (true )
115
+ .setIndices ("dummy_index_name" ) // We set a single dummy index name to avoid fetching all the index data
116
+ .get ();
117
+ return response .getState ().metaData ();
76
118
}
77
119
78
120
@ Override
79
121
public IndexMetaData getSnapshotIndexMetaData (SnapshotId snapshotId , IndexId index ) throws IOException {
80
- throw new UnsupportedOperationException ("Unsupported for repository of type: " + TYPE );
122
+ assert SNAPSHOT_ID .equals (snapshotId ) : "RemoteClusterRepository only supports " + SNAPSHOT_ID + " as the SnapshotId" ;
123
+ String leaderIndex = index .getName ();
124
+ Client remoteClient = client .getRemoteClusterClient (remoteClusterAlias );
125
+
126
+ ClusterStateResponse response = remoteClient
127
+ .admin ()
128
+ .cluster ()
129
+ .prepareState ()
130
+ .clear ()
131
+ .setMetaData (true )
132
+ .setIndices (leaderIndex )
133
+ .get ();
134
+
135
+ // Validates whether the leader cluster has been configured properly:
136
+ PlainActionFuture <String []> future = PlainActionFuture .newFuture ();
137
+ IndexMetaData leaderIndexMetaData = response .getState ().metaData ().index (leaderIndex );
138
+ ccrLicenseChecker .fetchLeaderHistoryUUIDs (remoteClient , leaderIndexMetaData , future ::onFailure , future ::onResponse );
139
+ String [] leaderHistoryUUIDs = future .actionGet ();
140
+
141
+ IndexMetaData .Builder imdBuilder = IndexMetaData .builder (leaderIndexMetaData );
142
+ // Adding the leader index uuid for each shard as custom metadata:
143
+ Map <String , String > metadata = new HashMap <>();
144
+ metadata .put (Ccr .CCR_CUSTOM_METADATA_LEADER_INDEX_SHARD_HISTORY_UUIDS , String .join ("," , leaderHistoryUUIDs ));
145
+ metadata .put (Ccr .CCR_CUSTOM_METADATA_LEADER_INDEX_UUID_KEY , leaderIndexMetaData .getIndexUUID ());
146
+ metadata .put (Ccr .CCR_CUSTOM_METADATA_LEADER_INDEX_NAME_KEY , leaderIndexMetaData .getIndex ().getName ());
147
+ metadata .put (Ccr .CCR_CUSTOM_METADATA_REMOTE_CLUSTER_NAME_KEY , remoteClusterAlias );
148
+ imdBuilder .putCustom (Ccr .CCR_CUSTOM_METADATA_KEY , metadata );
149
+
150
+ return imdBuilder .build ();
81
151
}
82
152
83
153
@ Override
84
154
public RepositoryData getRepositoryData () {
85
- throw new UnsupportedOperationException ("Unsupported for repository of type: " + TYPE );
155
+ Client remoteClient = client .getRemoteClusterClient (remoteClusterAlias );
156
+ ClusterStateResponse response = remoteClient .admin ().cluster ().prepareState ().clear ().setMetaData (true ).get ();
157
+ MetaData remoteMetaData = response .getState ().getMetaData ();
158
+
159
+ Map <String , SnapshotId > copiedSnapshotIds = new HashMap <>();
160
+ Map <String , SnapshotState > snapshotStates = new HashMap <>(copiedSnapshotIds .size ());
161
+ Map <IndexId , Set <SnapshotId >> indexSnapshots = new HashMap <>(copiedSnapshotIds .size ());
162
+
163
+ ImmutableOpenMap <String , IndexMetaData > remoteIndices = remoteMetaData .getIndices ();
164
+ for (String indexName : remoteMetaData .getConcreteAllIndices ()) {
165
+ // Both the Snapshot name and UUID are set to _latest_
166
+ SnapshotId snapshotId = new SnapshotId (LATEST , LATEST );
167
+ copiedSnapshotIds .put (indexName , snapshotId );
168
+ snapshotStates .put (indexName , SnapshotState .SUCCESS );
169
+ Index index = remoteIndices .get (indexName ).getIndex ();
170
+ indexSnapshots .put (new IndexId (indexName , index .getUUID ()), Collections .singleton (snapshotId ));
171
+ }
172
+
173
+ return new RepositoryData (1 , copiedSnapshotIds , snapshotStates , indexSnapshots , Collections .emptyList ());
86
174
}
87
175
88
176
@ Override
@@ -137,9 +225,17 @@ public void snapshotShard(IndexShard shard, Store store, SnapshotId snapshotId,
137
225
}
138
226
139
227
@ Override
140
- public void restoreShard (IndexShard shard , SnapshotId snapshotId , Version version , IndexId indexId , ShardId snapshotShardId ,
228
+ public void restoreShard (IndexShard indexShard , SnapshotId snapshotId , Version version , IndexId indexId , ShardId shardId ,
141
229
RecoveryState recoveryState ) {
142
- throw new UnsupportedOperationException ("Unsupported for repository of type: " + TYPE );
230
+ final Store store = indexShard .store ();
231
+ store .incRef ();
232
+ try {
233
+ store .createEmpty (indexShard .indexSettings ().getIndexMetaData ().getCreationVersion ().luceneVersion );
234
+ } catch (EngineException | IOException e ) {
235
+ throw new IndexShardRecoveryException (shardId , "failed to recover from gateway" , e );
236
+ } finally {
237
+ store .decRef ();
238
+ }
143
239
}
144
240
145
241
@ Override
0 commit comments