Skip to content

Commit 360f954

Browse files
author
Hendrik Muhs
authored
[Transform] correctly retrieve checkpoints from remote indices (#50903)
Uses remote clients to correctly retrieve index checkpoints from remote clusters.
1 parent 3c6f649 commit 360f954

10 files changed

+211
-35
lines changed

x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/Transform.java

+7-1
Original file line numberDiff line numberDiff line change
@@ -244,7 +244,13 @@ public Collection<Object> createComponents(
244244

245245
TransformConfigManager configManager = new IndexBasedTransformConfigManager(client, xContentRegistry);
246246
TransformAuditor auditor = new TransformAuditor(client, clusterService.getNodeName());
247-
TransformCheckpointService checkpointService = new TransformCheckpointService(client, configManager, auditor);
247+
TransformCheckpointService checkpointService = new TransformCheckpointService(
248+
client,
249+
settings,
250+
clusterService,
251+
configManager,
252+
auditor
253+
);
248254
SchedulerEngine scheduler = new SchedulerEngine(settings, Clock.systemUTC());
249255

250256
transformServices.set(new TransformServices(configManager, checkpointService, auditor, scheduler));

x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/DefaultCheckpointProvider.java

+67-12
Original file line numberDiff line numberDiff line change
@@ -16,26 +16,32 @@
1616
import org.elasticsearch.action.admin.indices.stats.IndicesStatsAction;
1717
import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest;
1818
import org.elasticsearch.action.admin.indices.stats.ShardStats;
19+
import org.elasticsearch.action.support.GroupedActionListener;
1920
import org.elasticsearch.action.support.IndicesOptions;
2021
import org.elasticsearch.client.Client;
2122
import org.elasticsearch.common.util.set.Sets;
23+
import org.elasticsearch.transport.RemoteClusterService;
2224
import org.elasticsearch.xpack.core.ClientHelper;
2325
import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpoint;
2426
import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpointingInfo;
2527
import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpointingInfo.TransformCheckpointingInfoBuilder;
2628
import org.elasticsearch.xpack.core.transform.transforms.TransformConfig;
2729
import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerPosition;
2830
import org.elasticsearch.xpack.core.transform.transforms.TransformProgress;
31+
import org.elasticsearch.xpack.transform.checkpoint.RemoteClusterResolver.ResolvedIndices;
2932
import org.elasticsearch.xpack.transform.notifications.TransformAuditor;
3033
import org.elasticsearch.xpack.transform.persistence.TransformConfigManager;
3134

3235
import java.time.Instant;
3336
import java.util.Arrays;
37+
import java.util.Collection;
3438
import java.util.Collections;
3539
import java.util.HashSet;
40+
import java.util.List;
3641
import java.util.Map;
3742
import java.util.Set;
3843
import java.util.TreeMap;
44+
import java.util.stream.Collectors;
3945

4046
public class DefaultCheckpointProvider implements CheckpointProvider {
4147

@@ -45,17 +51,20 @@ public class DefaultCheckpointProvider implements CheckpointProvider {
4551
private static final Logger logger = LogManager.getLogger(DefaultCheckpointProvider.class);
4652

4753
protected final Client client;
54+
protected final RemoteClusterResolver remoteClusterResolver;
4855
protected final TransformConfigManager transformConfigManager;
4956
protected final TransformAuditor transformAuditor;
5057
protected final TransformConfig transformConfig;
5158

5259
public DefaultCheckpointProvider(
5360
final Client client,
61+
final RemoteClusterResolver remoteClusterResolver,
5462
final TransformConfigManager transformConfigManager,
5563
final TransformAuditor transformAuditor,
5664
final TransformConfig transformConfig
5765
) {
5866
this.client = client;
67+
this.remoteClusterResolver = remoteClusterResolver;
5968
this.transformConfigManager = transformConfigManager;
6069
this.transformAuditor = transformAuditor;
6170
this.transformConfig = transformConfig;
@@ -84,13 +93,61 @@ public void createNextCheckpoint(final TransformCheckpoint lastCheckpoint, final
8493
}
8594

8695
protected void getIndexCheckpoints(ActionListener<Map<String, long[]>> listener) {
96+
try {
97+
ResolvedIndices resolvedIndexes = remoteClusterResolver.resolve(transformConfig.getSource().getIndex());
98+
ActionListener<Map<String, long[]>> groupedListener = listener;
99+
100+
if (resolvedIndexes.numClusters() > 1) {
101+
ActionListener<Collection<Map<String, long[]>>> mergeMapsListener = ActionListener.wrap(indexCheckpoints -> {
102+
listener.onResponse(
103+
indexCheckpoints.stream()
104+
.flatMap(m -> m.entrySet().stream())
105+
.collect(Collectors.toMap(entry -> entry.getKey(), entry -> entry.getValue()))
106+
);
107+
}, listener::onFailure);
108+
109+
groupedListener = new GroupedActionListener<>(mergeMapsListener, resolvedIndexes.numClusters());
110+
}
111+
112+
if (resolvedIndexes.getLocalIndices().isEmpty() == false) {
113+
getCheckpointsFromOneCluster(
114+
client,
115+
transformConfig.getHeaders(),
116+
resolvedIndexes.getLocalIndices().toArray(new String[0]),
117+
RemoteClusterService.LOCAL_CLUSTER_GROUP_KEY,
118+
groupedListener
119+
);
120+
}
121+
122+
for (Map.Entry<String, List<String>> remoteIndex : resolvedIndexes.getRemoteIndicesPerClusterAlias().entrySet()) {
123+
Client remoteClient = client.getRemoteClusterClient(remoteIndex.getKey());
124+
getCheckpointsFromOneCluster(
125+
remoteClient,
126+
transformConfig.getHeaders(),
127+
remoteIndex.getValue().toArray(new String[0]),
128+
remoteIndex.getKey() + RemoteClusterService.REMOTE_CLUSTER_INDEX_SEPARATOR,
129+
groupedListener
130+
);
131+
}
132+
} catch (Exception e) {
133+
listener.onFailure(e);
134+
}
135+
}
136+
137+
private static void getCheckpointsFromOneCluster(
138+
Client client,
139+
Map<String, String> headers,
140+
String[] indices,
141+
String prefix,
142+
ActionListener<Map<String, long[]>> listener
143+
) {
87144
// 1st get index to see the indexes the user has access to
88-
GetIndexRequest getIndexRequest = new GetIndexRequest().indices(transformConfig.getSource().getIndex())
145+
GetIndexRequest getIndexRequest = new GetIndexRequest().indices(indices)
89146
.features(new GetIndexRequest.Feature[0])
90147
.indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN);
91148

92149
ClientHelper.executeWithHeadersAsync(
93-
transformConfig.getHeaders(),
150+
headers,
94151
ClientHelper.TRANSFORM_ORIGIN,
95152
client,
96153
GetIndexAction.INSTANCE,
@@ -104,23 +161,20 @@ protected void getIndexCheckpoints(ActionListener<Map<String, long[]>> listener)
104161
client,
105162
ClientHelper.TRANSFORM_ORIGIN,
106163
IndicesStatsAction.INSTANCE,
107-
new IndicesStatsRequest().indices(transformConfig.getSource().getIndex())
108-
.clear()
109-
.indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN),
164+
new IndicesStatsRequest().indices(indices).clear().indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN),
110165
ActionListener.wrap(response -> {
111166
if (response.getFailedShards() != 0) {
112167
listener.onFailure(new CheckpointException("Source has [" + response.getFailedShards() + "] failed shards"));
113168
return;
114169
}
115-
116-
listener.onResponse(extractIndexCheckPoints(response.getShards(), userIndices));
170+
listener.onResponse(extractIndexCheckPoints(response.getShards(), userIndices, prefix));
117171
}, e -> listener.onFailure(new CheckpointException("Failed to create checkpoint", e)))
118172
);
119173
}, e -> listener.onFailure(new CheckpointException("Failed to create checkpoint", e)))
120174
);
121175
}
122176

123-
static Map<String, long[]> extractIndexCheckPoints(ShardStats[] shards, Set<String> userIndices) {
177+
static Map<String, long[]> extractIndexCheckPoints(ShardStats[] shards, Set<String> userIndices, String prefix) {
124178
Map<String, TreeMap<Integer, Long>> checkpointsByIndex = new TreeMap<>();
125179

126180
for (ShardStats shard : shards) {
@@ -129,9 +183,10 @@ static Map<String, long[]> extractIndexCheckPoints(ShardStats[] shards, Set<Stri
129183
if (userIndices.contains(indexName)) {
130184
// SeqNoStats could be `null`, assume the global checkpoint to be -1 in this case
131185
long globalCheckpoint = shard.getSeqNoStats() == null ? -1L : shard.getSeqNoStats().getGlobalCheckpoint();
132-
if (checkpointsByIndex.containsKey(indexName)) {
186+
String fullIndexName = prefix + indexName;
187+
if (checkpointsByIndex.containsKey(fullIndexName)) {
133188
// we have already seen this index, just check/add shards
134-
TreeMap<Integer, Long> checkpoints = checkpointsByIndex.get(indexName);
189+
TreeMap<Integer, Long> checkpoints = checkpointsByIndex.get(fullIndexName);
135190
// 1st time we see this shard for this index, add the entry for the shard
136191
// or there is already a checkpoint entry for this index/shard combination
137192
// but with a higher global checkpoint. This is by design(not a problem) and
@@ -142,8 +197,8 @@ static Map<String, long[]> extractIndexCheckPoints(ShardStats[] shards, Set<Stri
142197
}
143198
} else {
144199
// 1st time we see this index, create an entry for the index and add the shard checkpoint
145-
checkpointsByIndex.put(indexName, new TreeMap<>());
146-
checkpointsByIndex.get(indexName).put(shard.getShardRouting().getId(), globalCheckpoint);
200+
checkpointsByIndex.put(fullIndexName, new TreeMap<>());
201+
checkpointsByIndex.get(fullIndexName).put(shard.getShardRouting().getId(), globalCheckpoint);
147202
}
148203
}
149204
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
7+
package org.elasticsearch.xpack.transform.checkpoint;
8+
9+
import org.elasticsearch.common.settings.ClusterSettings;
10+
import org.elasticsearch.common.settings.Settings;
11+
import org.elasticsearch.transport.RemoteClusterAware;
12+
import org.elasticsearch.transport.RemoteConnectionStrategy;
13+
14+
import java.util.Collections;
15+
import java.util.List;
16+
import java.util.Map;
17+
import java.util.concurrent.CopyOnWriteArraySet;
18+
19+
/**
20+
* Maintain a list of remote clusters (aliases) and provide the ability to resolve.
21+
*/
22+
class RemoteClusterResolver extends RemoteClusterAware {
23+
24+
private final CopyOnWriteArraySet<String> clusters;
25+
26+
class ResolvedIndices {
27+
private final Map<String, List<String>> remoteIndicesPerClusterAlias;
28+
private final List<String> localIndices;
29+
30+
ResolvedIndices(Map<String, List<String>> remoteIndicesPerClusterAlias, List<String> localIndices) {
31+
this.localIndices = localIndices;
32+
this.remoteIndicesPerClusterAlias = remoteIndicesPerClusterAlias;
33+
}
34+
35+
public Map<String, List<String>> getRemoteIndicesPerClusterAlias() {
36+
return remoteIndicesPerClusterAlias;
37+
}
38+
39+
public List<String> getLocalIndices() {
40+
return localIndices;
41+
}
42+
43+
public int numClusters() {
44+
return remoteIndicesPerClusterAlias.size() + (localIndices.isEmpty() ? 0 : 1);
45+
}
46+
}
47+
48+
RemoteClusterResolver(Settings settings, ClusterSettings clusterSettings) {
49+
super(settings);
50+
clusters = new CopyOnWriteArraySet<>(getEnabledRemoteClusters(settings));
51+
listenForUpdates(clusterSettings);
52+
}
53+
54+
@Override
55+
protected void updateRemoteCluster(String clusterAlias, Settings settings) {
56+
if (RemoteConnectionStrategy.isConnectionEnabled(clusterAlias, settings)) {
57+
clusters.add(clusterAlias);
58+
} else {
59+
clusters.remove(clusterAlias);
60+
}
61+
}
62+
63+
ResolvedIndices resolve(String... indices) {
64+
Map<String, List<String>> resolvedClusterIndices = groupClusterIndices(clusters, indices);
65+
List<String> localIndices = resolvedClusterIndices.getOrDefault(LOCAL_CLUSTER_GROUP_KEY, Collections.emptyList());
66+
resolvedClusterIndices.remove(LOCAL_CLUSTER_GROUP_KEY);
67+
return new ResolvedIndices(resolvedClusterIndices, localIndices);
68+
}
69+
}

x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/TimeBasedCheckpointProvider.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -32,11 +32,12 @@ public class TimeBasedCheckpointProvider extends DefaultCheckpointProvider {
3232

3333
TimeBasedCheckpointProvider(
3434
final Client client,
35+
final RemoteClusterResolver remoteClusterResolver,
3536
final TransformConfigManager transformConfigManager,
3637
final TransformAuditor transformAuditor,
3738
final TransformConfig transformConfig
3839
) {
39-
super(client, transformConfigManager, transformAuditor, transformConfig);
40+
super(client, remoteClusterResolver, transformConfigManager, transformAuditor, transformConfig);
4041
timeSyncConfig = (TimeSyncConfig) transformConfig.getSyncConfig();
4142
}
4243

x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/TransformCheckpointService.java

+14-3
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@
1010
import org.apache.logging.log4j.Logger;
1111
import org.elasticsearch.action.ActionListener;
1212
import org.elasticsearch.client.Client;
13+
import org.elasticsearch.cluster.service.ClusterService;
14+
import org.elasticsearch.common.settings.Settings;
1315
import org.elasticsearch.xpack.core.transform.transforms.TimeSyncConfig;
1416
import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpointingInfo.TransformCheckpointingInfoBuilder;
1517
import org.elasticsearch.xpack.core.transform.transforms.TransformConfig;
@@ -33,23 +35,33 @@ public class TransformCheckpointService {
3335
private final Client client;
3436
private final TransformConfigManager transformConfigManager;
3537
private final TransformAuditor transformAuditor;
38+
private final RemoteClusterResolver remoteClusterResolver;
3639

3740
/**
 * Creates the checkpoint service.
 *
 * @param client client stored for use by the checkpoint providers
 * @param settings settings handed to the {@link RemoteClusterResolver}
 * @param clusterService source of the cluster settings handed to the {@link RemoteClusterResolver}
 * @param transformConfigManager config manager stored for use by the checkpoint providers
 * @param transformAuditor auditor stored for use by the checkpoint providers
 */
public TransformCheckpointService(
    final Client client,
    final Settings settings,
    final ClusterService clusterService,
    final TransformConfigManager transformConfigManager,
    TransformAuditor transformAuditor
) {
    // a single resolver instance is created here and shared by every provider this service hands out
    this.remoteClusterResolver = new RemoteClusterResolver(settings, clusterService.getClusterSettings());

    this.transformAuditor = transformAuditor;
    this.transformConfigManager = transformConfigManager;
    this.client = client;
}
4652

4753
public CheckpointProvider getCheckpointProvider(final TransformConfig transformConfig) {
4854
if (transformConfig.getSyncConfig() instanceof TimeSyncConfig) {
49-
return new TimeBasedCheckpointProvider(client, transformConfigManager, transformAuditor, transformConfig);
55+
return new TimeBasedCheckpointProvider(
56+
client,
57+
remoteClusterResolver,
58+
transformConfigManager,
59+
transformAuditor,
60+
transformConfig
61+
);
5062
}
5163

52-
return new DefaultCheckpointProvider(client, transformConfigManager, transformAuditor, transformConfig);
64+
return new DefaultCheckpointProvider(client, remoteClusterResolver, transformConfigManager, transformAuditor, transformConfig);
5365
}
5466

5567
/**
@@ -82,5 +94,4 @@ public void getCheckpointingInfo(
8294
listener.onFailure(new CheckpointException("Failed to retrieve configuration", transformError));
8395
}));
8496
}
85-
8697
}

x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/checkpoint/DefaultCheckpointProviderTests.java

+5
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@
1111
import org.apache.logging.log4j.Logger;
1212
import org.elasticsearch.client.Client;
1313
import org.elasticsearch.common.logging.Loggers;
14+
import org.elasticsearch.common.settings.ClusterSettings;
15+
import org.elasticsearch.common.settings.Settings;
1416
import org.elasticsearch.common.util.set.Sets;
1517
import org.elasticsearch.test.ESTestCase;
1618
import org.elasticsearch.test.MockLogAppender;
@@ -48,6 +50,7 @@ public void testReportSourceIndexChangesRunsEmpty() throws Exception {
4850

4951
DefaultCheckpointProvider provider = new DefaultCheckpointProvider(
5052
client,
53+
new RemoteClusterResolver(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)),
5154
transformConfigManager,
5255
transformAuditor,
5356
transformConfig
@@ -92,6 +95,7 @@ public void testReportSourceIndexChangesAddDelete() throws Exception {
9295

9396
DefaultCheckpointProvider provider = new DefaultCheckpointProvider(
9497
client,
98+
new RemoteClusterResolver(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)),
9599
transformConfigManager,
96100
transformAuditor,
97101
transformConfig
@@ -151,6 +155,7 @@ public void testReportSourceIndexChangesAddDeleteMany() throws Exception {
151155

152156
DefaultCheckpointProvider provider = new DefaultCheckpointProvider(
153157
client,
158+
new RemoteClusterResolver(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)),
154159
transformConfigManager,
155160
transformAuditor,
156161
transformConfig

x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/checkpoint/TransformCheckpointServiceNodeTests.java

+10-1
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,10 @@
1919
import org.elasticsearch.cluster.routing.RecoverySource;
2020
import org.elasticsearch.cluster.routing.ShardRouting;
2121
import org.elasticsearch.cluster.routing.UnassignedInfo;
22+
import org.elasticsearch.cluster.service.ClusterService;
2223
import org.elasticsearch.common.UUIDs;
24+
import org.elasticsearch.common.settings.ClusterSettings;
25+
import org.elasticsearch.common.settings.Settings;
2326
import org.elasticsearch.index.Index;
2427
import org.elasticsearch.index.cache.query.QueryCacheStats;
2528
import org.elasticsearch.index.cache.request.RequestCacheStats;
@@ -138,7 +141,13 @@ public void createComponents() {
138141

139142
// use a mock for the checkpoint service
140143
TransformAuditor mockAuditor = mock(TransformAuditor.class);
141-
transformCheckpointService = new TransformCheckpointService(mockClientForCheckpointing, transformsConfigManager, mockAuditor);
144+
transformCheckpointService = new TransformCheckpointService(
145+
mockClientForCheckpointing,
146+
Settings.EMPTY,
147+
new ClusterService(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null),
148+
transformsConfigManager,
149+
mockAuditor
150+
);
142151
}
143152

144153
@AfterClass

0 commit comments

Comments
 (0)