Skip to content

Commit c4c3c8b

Browse files
authored
Add data stream support to CCR (#61993)
This commit adds support data stream support to CCR's auto following by making the following changes: * When the auto follow coordinator iterates over the candidate indices to follow, the auto follow coordinator also checks whether the index is part of a data stream and if the name of data stream also matches with the auto follow pattern then the index will be auto followed. * When following an index, the put follow api also checks whether that index is part of a data stream and if so then also replicates the data stream definition to the local cluster. * In order for the follow index api to determine whether an index is part of a data stream, the cluster state api was modified to also fetch the data stream definition of the cluster state if only the state is queried for specific indices. When a data stream is auto followed, only new backing indices are auto followed. This is in line with how time based indices patterns are replicated today. This means that the data stream isn't copied 1 to 1 into the local cluster. The local cluster's data stream definition contains the same name, timestamp field and generation, but the list of backing indices may be different (depending on when a data stream was auto followed). Closes #56259
1 parent 08c0a87 commit c4c3c8b

File tree

14 files changed

+621
-26
lines changed

14 files changed

+621
-26
lines changed

docs/reference/ccr/auto-follow.asciidoc

+7
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,13 @@ each new index in the series is replicated automatically. Whenever the name of
77
a new index on the remote cluster matches the auto-follow pattern, a
88
corresponding follower index is added to the local cluster.
99

10+
You can also create auto-follow patterns for data streams. When a new backing
11+
index is generated on a remote cluster, that index and its data stream are
12+
automatically followed if the data stream name matches an auto-follow
13+
pattern. If you create a data stream after creating the auto-follow pattern,
14+
all backing indices are followed automatically.
15+
16+
1017
Auto-follow patterns are especially useful with
1118
<<index-lifecycle-management,{ilm-cap}>>, which might continually create
1219
new indices on the cluster containing the leader index.

server/src/main/java/org/elasticsearch/action/admin/cluster/state/TransportClusterStateAction.java

+18-3
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@
2929
import org.elasticsearch.cluster.ClusterStateObserver;
3030
import org.elasticsearch.cluster.NotMasterException;
3131
import org.elasticsearch.cluster.block.ClusterBlockException;
32+
import org.elasticsearch.cluster.metadata.DataStream;
33+
import org.elasticsearch.cluster.metadata.IndexAbstraction;
3234
import org.elasticsearch.cluster.metadata.IndexMetadata;
3335
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
3436
import org.elasticsearch.cluster.metadata.Metadata;
@@ -37,6 +39,7 @@
3739
import org.elasticsearch.cluster.service.ClusterService;
3840
import org.elasticsearch.common.inject.Inject;
3941
import org.elasticsearch.common.unit.TimeValue;
42+
import org.elasticsearch.index.Index;
4043
import org.elasticsearch.node.NodeClosedException;
4144
import org.elasticsearch.tasks.Task;
4245
import org.elasticsearch.threadpool.ThreadPool;
@@ -149,9 +152,21 @@ private ClusterStateResponse buildResponse(final ClusterStateRequest request,
149152
mdBuilder.version(currentState.metadata().version());
150153
String[] indices = indexNameExpressionResolver.concreteIndexNames(currentState, request);
151154
for (String filteredIndex : indices) {
152-
IndexMetadata indexMetadata = currentState.metadata().index(filteredIndex);
153-
if (indexMetadata != null) {
154-
mdBuilder.put(indexMetadata, false);
155+
// If the requested index is part of a data stream then that data stream should also be included:
156+
IndexAbstraction indexAbstraction = currentState.metadata().getIndicesLookup().get(filteredIndex);
157+
if (indexAbstraction.getParentDataStream() != null) {
158+
DataStream dataStream = indexAbstraction.getParentDataStream().getDataStream();
159+
mdBuilder.put(dataStream);
160+
// Also the IMD of other backing indices need to be included, otherwise the cluster state api
161+
// can't create a valid cluster state instance:
162+
for (Index backingIndex : dataStream.getIndices()) {
163+
mdBuilder.put(currentState.metadata().index(backingIndex), false);
164+
}
165+
} else {
166+
IndexMetadata indexMetadata = currentState.metadata().index(filteredIndex);
167+
if (indexMetadata != null) {
168+
mdBuilder.put(indexMetadata, false);
169+
}
155170
}
156171
}
157172
} else {

server/src/main/java/org/elasticsearch/snapshots/RestoreService.java

+16
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@
8686
import java.util.Map;
8787
import java.util.Optional;
8888
import java.util.Set;
89+
import java.util.function.BiConsumer;
8990
import java.util.function.Function;
9091
import java.util.function.Predicate;
9192
import java.util.stream.Collectors;
@@ -184,6 +185,20 @@ public RestoreService(ClusterService clusterService, RepositoriesService reposit
184185
* @param listener restore listener
185186
*/
186187
public void restoreSnapshot(final RestoreSnapshotRequest request, final ActionListener<RestoreCompletionResponse> listener) {
188+
restoreSnapshot(request, listener, (clusterState, builder) -> {});
189+
}
190+
191+
/**
192+
* Restores snapshot specified in the restore request.
193+
*
194+
* @param request restore request
195+
* @param listener restore listener
196+
* @param updater handler that allows callers to make modifications to {@link Metadata}
197+
* in the same cluster state update as the restore operation
198+
*/
199+
public void restoreSnapshot(final RestoreSnapshotRequest request,
200+
final ActionListener<RestoreCompletionResponse> listener,
201+
final BiConsumer<ClusterState, Metadata.Builder> updater) {
187202
try {
188203
// Read snapshot info and metadata from the repository
189204
final String repositoryName = request.repository();
@@ -455,6 +470,7 @@ restoreUUID, snapshot, overallState(RestoreInProgress.State.INIT, shards),
455470
}
456471

457472
RoutingTable rt = rtBuilder.build();
473+
updater.accept(currentState, mdBuilder);
458474
ClusterState updatedState = builder.metadata(mdBuilder).blocks(blocks).routingTable(rt).build();
459475
return allocationService.reroute(updatedState, "restored snapshot [" + snapshot + "]");
460476
}

x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowIT.java

+217
Original file line numberDiff line numberDiff line change
@@ -13,18 +13,27 @@
1313
import org.elasticsearch.common.Strings;
1414
import org.elasticsearch.common.xcontent.XContentBuilder;
1515
import org.elasticsearch.common.xcontent.json.JsonXContent;
16+
import org.elasticsearch.common.xcontent.support.XContentMapValues;
1617

1718
import java.io.IOException;
19+
import java.text.SimpleDateFormat;
20+
import java.util.Date;
21+
import java.util.List;
22+
import java.util.Locale;
1823
import java.util.Map;
1924
import java.util.concurrent.TimeUnit;
2025

26+
import static org.elasticsearch.rest.action.search.RestSearchAction.TOTAL_HITS_AS_INT_PARAM;
2127
import static org.hamcrest.Matchers.equalTo;
2228
import static org.hamcrest.Matchers.hasEntry;
2329
import static org.hamcrest.Matchers.hasKey;
30+
import static org.hamcrest.Matchers.hasSize;
2431
import static org.hamcrest.Matchers.instanceOf;
2532

2633
public class AutoFollowIT extends ESCCRRestTestCase {
2734

35+
private static final SimpleDateFormat DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd'T'hh:mm:ss", Locale.ROOT);
36+
2837
public void testMultipleAutoFollowPatternsDifferentClusters() throws Exception {
2938
if ("follow".equals(targetCluster) == false) {
3039
logger.info("skipping test, waiting for target cluster [follow]" );
@@ -64,6 +73,7 @@ public void testMultipleAutoFollowPatternsDifferentClusters() throws Exception {
6473
verifyDocuments("logs-20190101", 5, "filtered_field:true");
6574
verifyDocuments("logs-20200101", 5, "filtered_field:true");
6675
});
76+
deleteAutoFollowPattern("leader_cluster_pattern");
6777
}
6878

6979
public void testAutoFollowPatterns() throws Exception {
@@ -122,6 +132,7 @@ public void testAutoFollowPatterns() throws Exception {
122132
verifyCcrMonitoring("metrics-20210101", "metrics-20210101");
123133
verifyAutoFollowMonitoring();
124134
}, 30, TimeUnit.SECONDS);
135+
deleteAutoFollowPattern("test_pattern");
125136
}
126137

127138
public void testPutAutoFollowPatternThatOverridesRequiredLeaderSetting() throws IOException {
@@ -163,12 +174,218 @@ public void testPutAutoFollowPatternThatOverridesRequiredLeaderSetting() throws
163174
);
164175
}
165176

177+
public void testDataStreams() throws Exception {
178+
if ("follow".equals(targetCluster) == false) {
179+
return;
180+
}
181+
182+
final int numDocs = 64;
183+
final String dataStreamName = "logs-mysql-error";
184+
185+
int initialNumberOfSuccessfulFollowedIndices = getNumberOfSuccessfulFollowedIndices();
186+
187+
// Create auto follow pattern
188+
Request request = new Request("PUT", "/_ccr/auto_follow/test_pattern");
189+
try (XContentBuilder bodyBuilder = JsonXContent.contentBuilder()) {
190+
bodyBuilder.startObject();
191+
{
192+
bodyBuilder.startArray("leader_index_patterns");
193+
{
194+
bodyBuilder.value("logs-*");
195+
}
196+
bodyBuilder.endArray();
197+
bodyBuilder.field("remote_cluster", "leader_cluster");
198+
}
199+
bodyBuilder.endObject();
200+
request.setJsonEntity(Strings.toString(bodyBuilder));
201+
}
202+
assertOK(client().performRequest(request));
203+
204+
// Create data stream and ensure that is is auto followed
205+
{
206+
try (RestClient leaderClient = buildLeaderClient()) {
207+
for (int i = 0; i < numDocs; i++) {
208+
Request indexRequest = new Request("POST", "/" + dataStreamName + "/_doc");
209+
indexRequest.addParameter("refresh", "true");
210+
indexRequest.setJsonEntity("{\"@timestamp\": \"" + DATE_FORMAT.format(new Date()) + "\",\"message\":\"abc\"}");
211+
assertOK(leaderClient.performRequest(indexRequest));
212+
}
213+
verifyDataStream(leaderClient, dataStreamName, ".ds-logs-mysql-error-000001");
214+
verifyDocuments(leaderClient, dataStreamName, numDocs);
215+
}
216+
assertBusy(() -> {
217+
assertThat(getNumberOfSuccessfulFollowedIndices(), equalTo(initialNumberOfSuccessfulFollowedIndices + 1));
218+
verifyDataStream(client(), dataStreamName, ".ds-logs-mysql-error-000001");
219+
ensureYellow(dataStreamName);
220+
verifyDocuments(client(), dataStreamName, numDocs);
221+
});
222+
}
223+
224+
// First rollover and ensure second backing index is replicated:
225+
{
226+
try (RestClient leaderClient = buildLeaderClient()) {
227+
Request rolloverRequest = new Request("POST", "/" + dataStreamName + "/_rollover");
228+
assertOK(leaderClient.performRequest(rolloverRequest));
229+
verifyDataStream(leaderClient, dataStreamName, ".ds-logs-mysql-error-000001", ".ds-logs-mysql-error-000002");
230+
231+
Request indexRequest = new Request("POST", "/" + dataStreamName + "/_doc");
232+
indexRequest.addParameter("refresh", "true");
233+
indexRequest.setJsonEntity("{\"@timestamp\": \"" + DATE_FORMAT.format(new Date()) + "\",\"message\":\"abc\"}");
234+
assertOK(leaderClient.performRequest(indexRequest));
235+
verifyDocuments(leaderClient, dataStreamName, numDocs + 1);
236+
}
237+
assertBusy(() -> {
238+
assertThat(getNumberOfSuccessfulFollowedIndices(), equalTo(initialNumberOfSuccessfulFollowedIndices + 2));
239+
verifyDataStream(client(), dataStreamName, ".ds-logs-mysql-error-000001", ".ds-logs-mysql-error-000002");
240+
ensureYellow(dataStreamName);
241+
verifyDocuments(client(), dataStreamName, numDocs + 1);
242+
});
243+
}
244+
245+
// Second rollover and ensure third backing index is replicated:
246+
{
247+
try (RestClient leaderClient = buildLeaderClient()) {
248+
Request rolloverRequest = new Request("POST", "/" + dataStreamName + "/_rollover");
249+
assertOK(leaderClient.performRequest(rolloverRequest));
250+
verifyDataStream(leaderClient, dataStreamName, ".ds-logs-mysql-error-000001", ".ds-logs-mysql-error-000002", "" +
251+
".ds-logs-mysql-error-000003");
252+
253+
Request indexRequest = new Request("POST", "/" + dataStreamName + "/_doc");
254+
indexRequest.addParameter("refresh", "true");
255+
indexRequest.setJsonEntity("{\"@timestamp\": \"" + DATE_FORMAT.format(new Date()) + "\",\"message\":\"abc\"}");
256+
assertOK(leaderClient.performRequest(indexRequest));
257+
verifyDocuments(leaderClient, dataStreamName, numDocs + 2);
258+
}
259+
assertBusy(() -> {
260+
assertThat(getNumberOfSuccessfulFollowedIndices(), equalTo(initialNumberOfSuccessfulFollowedIndices + 3));
261+
verifyDataStream(client(), dataStreamName, ".ds-logs-mysql-error-000001", ".ds-logs-mysql-error-000002",
262+
".ds-logs-mysql-error-000003");
263+
ensureYellow(dataStreamName);
264+
verifyDocuments(client(), dataStreamName, numDocs + 2);
265+
});
266+
}
267+
// Cleanup:
268+
{
269+
deleteAutoFollowPattern("test_pattern");
270+
deleteDataStream(dataStreamName);
271+
}
272+
}
273+
274+
public void testDataStreams_autoFollowAfterDataStreamCreated() throws Exception {
275+
if ("follow".equals(targetCluster) == false) {
276+
return;
277+
}
278+
279+
final int initialNumDocs = 16;
280+
final String dataStreamName = "logs-syslog-prod";
281+
int initialNumberOfSuccessfulFollowedIndices = getNumberOfSuccessfulFollowedIndices();
282+
// Initialize data stream prior to auto following
283+
{
284+
try (RestClient leaderClient = buildLeaderClient()) {
285+
for (int i = 0; i < initialNumDocs; i++) {
286+
Request indexRequest = new Request("POST", "/" + dataStreamName + "/_doc");
287+
indexRequest.addParameter("refresh", "true");
288+
indexRequest.setJsonEntity("{\"@timestamp\": \"" + DATE_FORMAT.format(new Date()) + "\",\"message\":\"abc\"}");
289+
assertOK(leaderClient.performRequest(indexRequest));
290+
}
291+
verifyDataStream(leaderClient, dataStreamName, ".ds-logs-syslog-prod-000001");
292+
verifyDocuments(leaderClient, dataStreamName, initialNumDocs);
293+
}
294+
}
295+
// Create auto follow pattern
296+
{
297+
Request request = new Request("PUT", "/_ccr/auto_follow/test_pattern");
298+
try (XContentBuilder bodyBuilder = JsonXContent.contentBuilder()) {
299+
bodyBuilder.startObject();
300+
{
301+
bodyBuilder.startArray("leader_index_patterns");
302+
{
303+
bodyBuilder.value("logs-*");
304+
}
305+
bodyBuilder.endArray();
306+
bodyBuilder.field("remote_cluster", "leader_cluster");
307+
}
308+
bodyBuilder.endObject();
309+
request.setJsonEntity(Strings.toString(bodyBuilder));
310+
}
311+
assertOK(client().performRequest(request));
312+
}
313+
// Rollover and ensure only second backing index is replicated:
314+
{
315+
try (RestClient leaderClient = buildLeaderClient()) {
316+
Request rolloverRequest = new Request("POST", "/" + dataStreamName + "/_rollover");
317+
assertOK(leaderClient.performRequest(rolloverRequest));
318+
verifyDataStream(leaderClient, dataStreamName, ".ds-logs-syslog-prod-000001", ".ds-logs-syslog-prod-000002");
319+
320+
Request indexRequest = new Request("POST", "/" + dataStreamName + "/_doc");
321+
indexRequest.addParameter("refresh", "true");
322+
indexRequest.setJsonEntity("{\"@timestamp\": \"" + DATE_FORMAT.format(new Date()) + "\",\"message\":\"abc\"}");
323+
assertOK(leaderClient.performRequest(indexRequest));
324+
verifyDocuments(leaderClient, dataStreamName, initialNumDocs + 1);
325+
}
326+
assertBusy(() -> {
327+
assertThat(getNumberOfSuccessfulFollowedIndices(), equalTo(initialNumberOfSuccessfulFollowedIndices + 1));
328+
verifyDataStream(client(), dataStreamName, ".ds-logs-syslog-prod-000002");
329+
ensureYellow(dataStreamName);
330+
verifyDocuments(client(), dataStreamName, 1);
331+
});
332+
}
333+
// Explicitly follow the first backing index and check that the data stream in follow cluster is updated correctly:
334+
{
335+
followIndex(".ds-logs-syslog-prod-000001", ".ds-logs-syslog-prod-000001");
336+
assertBusy(() -> {
337+
assertThat(getNumberOfSuccessfulFollowedIndices(), equalTo(initialNumberOfSuccessfulFollowedIndices + 1));
338+
verifyDataStream(client(), dataStreamName, ".ds-logs-syslog-prod-000001", ".ds-logs-syslog-prod-000002");
339+
ensureYellow(dataStreamName);
340+
verifyDocuments(client(), dataStreamName, initialNumDocs + 1);
341+
});
342+
}
343+
// Cleanup:
344+
{
345+
deleteAutoFollowPattern("test_pattern");
346+
deleteDataStream(dataStreamName);
347+
}
348+
}
349+
166350
private int getNumberOfSuccessfulFollowedIndices() throws IOException {
167351
Request statsRequest = new Request("GET", "/_ccr/stats");
168352
Map<?, ?> response = toMap(client().performRequest(statsRequest));
169353
response = (Map<?, ?>) response.get("auto_follow_stats");
170354
return (Integer) response.get("number_of_successful_follow_indices");
171355
}
172356

357+
private static void verifyDocuments(final RestClient client,
358+
final String index,
359+
final int expectedNumDocs) throws IOException {
360+
final Request request = new Request("GET", "/" + index + "/_search");
361+
request.addParameter(TOTAL_HITS_AS_INT_PARAM, "true");
362+
Map<String, ?> response = toMap(client.performRequest(request));
363+
364+
int numDocs = (int) XContentMapValues.extractValue("hits.total", response);
365+
assertThat(index, numDocs, equalTo(expectedNumDocs));
366+
}
367+
368+
static void verifyDataStream(final RestClient client,
369+
final String name,
370+
final String... expectedBackingIndices) throws IOException {
371+
Request request = new Request("GET", "/_data_stream/" + name);
372+
Map<String, ?> response = toMap(client.performRequest(request));
373+
List<?> retrievedDataStreams = (List<?>) response.get("data_streams");
374+
assertThat(retrievedDataStreams, hasSize(1));
375+
List<?> actualBackingIndices = (List<?>) ((Map<?, ?>) retrievedDataStreams.get(0)).get("indices");
376+
assertThat(actualBackingIndices, hasSize(expectedBackingIndices.length));
377+
for (int i = 0; i < expectedBackingIndices.length; i++) {
378+
Map<?, ?> actualBackingIndex = (Map<?, ?>) actualBackingIndices.get(i);
379+
String expectedBackingIndex = expectedBackingIndices[i];
380+
assertThat(actualBackingIndex.get("index_name"), equalTo(expectedBackingIndex));
381+
}
382+
}
383+
384+
private void deleteDataStream(String name) throws IOException {
385+
try (RestClient leaderClient = buildLeaderClient()) {
386+
Request deleteTemplateRequest = new Request("DELETE", "/_data_stream/" + name);
387+
assertOK(leaderClient.performRequest(deleteTemplateRequest));
388+
}
389+
}
173390

174391
}

0 commit comments

Comments
 (0)