Skip to content

[7.x][Transform] correctly retrieve checkpoints from remote indices (#50903) #50969

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jan 14, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,13 @@ public Collection<Object> createComponents(

TransformConfigManager configManager = new IndexBasedTransformConfigManager(client, xContentRegistry);
TransformAuditor auditor = new TransformAuditor(client, clusterService.getNodeName());
TransformCheckpointService checkpointService = new TransformCheckpointService(client, configManager, auditor);
TransformCheckpointService checkpointService = new TransformCheckpointService(
client,
settings,
clusterService,
configManager,
auditor
);
SchedulerEngine scheduler = new SchedulerEngine(settings, Clock.systemUTC());

transformServices.set(new TransformServices(configManager, checkpointService, auditor, scheduler));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,26 +16,32 @@
import org.elasticsearch.action.admin.indices.stats.IndicesStatsAction;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest;
import org.elasticsearch.action.admin.indices.stats.ShardStats;
import org.elasticsearch.action.support.GroupedActionListener;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.transport.RemoteClusterService;
import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpoint;
import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpointingInfo;
import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpointingInfo.TransformCheckpointingInfoBuilder;
import org.elasticsearch.xpack.core.transform.transforms.TransformConfig;
import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerPosition;
import org.elasticsearch.xpack.core.transform.transforms.TransformProgress;
import org.elasticsearch.xpack.transform.checkpoint.RemoteClusterResolver.ResolvedIndices;
import org.elasticsearch.xpack.transform.notifications.TransformAuditor;
import org.elasticsearch.xpack.transform.persistence.TransformConfigManager;

import java.time.Instant;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.stream.Collectors;

public class DefaultCheckpointProvider implements CheckpointProvider {

Expand All @@ -45,17 +51,20 @@ public class DefaultCheckpointProvider implements CheckpointProvider {
private static final Logger logger = LogManager.getLogger(DefaultCheckpointProvider.class);

protected final Client client;
protected final RemoteClusterResolver remoteClusterResolver;
protected final TransformConfigManager transformConfigManager;
protected final TransformAuditor transformAuditor;
protected final TransformConfig transformConfig;

public DefaultCheckpointProvider(
final Client client,
final RemoteClusterResolver remoteClusterResolver,
final TransformConfigManager transformConfigManager,
final TransformAuditor transformAuditor,
final TransformConfig transformConfig
) {
this.client = client;
this.remoteClusterResolver = remoteClusterResolver;
this.transformConfigManager = transformConfigManager;
this.transformAuditor = transformAuditor;
this.transformConfig = transformConfig;
Expand Down Expand Up @@ -84,13 +93,61 @@ public void createNextCheckpoint(final TransformCheckpoint lastCheckpoint, final
}

protected void getIndexCheckpoints(ActionListener<Map<String, long[]>> listener) {
try {
ResolvedIndices resolvedIndexes = remoteClusterResolver.resolve(transformConfig.getSource().getIndex());
ActionListener<Map<String, long[]>> groupedListener = listener;

if (resolvedIndexes.numClusters() > 1) {
ActionListener<Collection<Map<String, long[]>>> mergeMapsListener = ActionListener.wrap(indexCheckpoints -> {
listener.onResponse(
indexCheckpoints.stream()
.flatMap(m -> m.entrySet().stream())
.collect(Collectors.toMap(entry -> entry.getKey(), entry -> entry.getValue()))
);
}, listener::onFailure);

groupedListener = new GroupedActionListener<>(mergeMapsListener, resolvedIndexes.numClusters());
}

if (resolvedIndexes.getLocalIndices().isEmpty() == false) {
getCheckpointsFromOneCluster(
client,
transformConfig.getHeaders(),
resolvedIndexes.getLocalIndices().toArray(new String[0]),
RemoteClusterService.LOCAL_CLUSTER_GROUP_KEY,
groupedListener
);
}

for (Map.Entry<String, List<String>> remoteIndex : resolvedIndexes.getRemoteIndicesPerClusterAlias().entrySet()) {
Client remoteClient = client.getRemoteClusterClient(remoteIndex.getKey());
getCheckpointsFromOneCluster(
remoteClient,
transformConfig.getHeaders(),
remoteIndex.getValue().toArray(new String[0]),
remoteIndex.getKey() + RemoteClusterService.REMOTE_CLUSTER_INDEX_SEPARATOR,
groupedListener
);
}
} catch (Exception e) {
listener.onFailure(e);
}
}

private static void getCheckpointsFromOneCluster(
Client client,
Map<String, String> headers,
String[] indices,
String prefix,
ActionListener<Map<String, long[]>> listener
) {
// 1st get index to see the indexes the user has access to
GetIndexRequest getIndexRequest = new GetIndexRequest().indices(transformConfig.getSource().getIndex())
GetIndexRequest getIndexRequest = new GetIndexRequest().indices(indices)
.features(new GetIndexRequest.Feature[0])
.indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN);

ClientHelper.executeWithHeadersAsync(
transformConfig.getHeaders(),
headers,
ClientHelper.TRANSFORM_ORIGIN,
client,
GetIndexAction.INSTANCE,
Expand All @@ -104,23 +161,20 @@ protected void getIndexCheckpoints(ActionListener<Map<String, long[]>> listener)
client,
ClientHelper.TRANSFORM_ORIGIN,
IndicesStatsAction.INSTANCE,
new IndicesStatsRequest().indices(transformConfig.getSource().getIndex())
.clear()
.indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN),
new IndicesStatsRequest().indices(indices).clear().indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN),
ActionListener.wrap(response -> {
if (response.getFailedShards() != 0) {
listener.onFailure(new CheckpointException("Source has [" + response.getFailedShards() + "] failed shards"));
return;
}

listener.onResponse(extractIndexCheckPoints(response.getShards(), userIndices));
listener.onResponse(extractIndexCheckPoints(response.getShards(), userIndices, prefix));
}, e -> listener.onFailure(new CheckpointException("Failed to create checkpoint", e)))
);
}, e -> listener.onFailure(new CheckpointException("Failed to create checkpoint", e)))
);
}

static Map<String, long[]> extractIndexCheckPoints(ShardStats[] shards, Set<String> userIndices) {
static Map<String, long[]> extractIndexCheckPoints(ShardStats[] shards, Set<String> userIndices, String prefix) {
Map<String, TreeMap<Integer, Long>> checkpointsByIndex = new TreeMap<>();

for (ShardStats shard : shards) {
Expand All @@ -129,9 +183,10 @@ static Map<String, long[]> extractIndexCheckPoints(ShardStats[] shards, Set<Stri
if (userIndices.contains(indexName)) {
// SeqNoStats could be `null`, assume the global checkpoint to be -1 in this case
long globalCheckpoint = shard.getSeqNoStats() == null ? -1L : shard.getSeqNoStats().getGlobalCheckpoint();
if (checkpointsByIndex.containsKey(indexName)) {
String fullIndexName = prefix + indexName;
if (checkpointsByIndex.containsKey(fullIndexName)) {
// we have already seen this index, just check/add shards
TreeMap<Integer, Long> checkpoints = checkpointsByIndex.get(indexName);
TreeMap<Integer, Long> checkpoints = checkpointsByIndex.get(fullIndexName);
// 1st time we see this shard for this index, add the entry for the shard
// or there is already a checkpoint entry for this index/shard combination
// but with a higher global checkpoint. This is by design(not a problem) and
Expand All @@ -142,8 +197,8 @@ static Map<String, long[]> extractIndexCheckPoints(ShardStats[] shards, Set<Stri
}
} else {
// 1st time we see this index, create an entry for the index and add the shard checkpoint
checkpointsByIndex.put(indexName, new TreeMap<>());
checkpointsByIndex.get(indexName).put(shard.getShardRouting().getId(), globalCheckpoint);
checkpointsByIndex.put(fullIndexName, new TreeMap<>());
checkpointsByIndex.get(fullIndexName).put(shard.getShardRouting().getId(), globalCheckpoint);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.transform.checkpoint;

import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.transport.RemoteClusterAware;
import org.elasticsearch.transport.RemoteConnectionStrategy;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArraySet;

/**
* Maintain a list of remote clusters (aliases) and provide the ability to resolve.
*/
class RemoteClusterResolver extends RemoteClusterAware {

private final CopyOnWriteArraySet<String> clusters;

class ResolvedIndices {
private final Map<String, List<String>> remoteIndicesPerClusterAlias;
private final List<String> localIndices;

ResolvedIndices(Map<String, List<String>> remoteIndicesPerClusterAlias, List<String> localIndices) {
this.localIndices = localIndices;
this.remoteIndicesPerClusterAlias = remoteIndicesPerClusterAlias;
}

public Map<String, List<String>> getRemoteIndicesPerClusterAlias() {
return remoteIndicesPerClusterAlias;
}

public List<String> getLocalIndices() {
return localIndices;
}

public int numClusters() {
return remoteIndicesPerClusterAlias.size() + (localIndices.isEmpty() ? 0 : 1);
}
}

RemoteClusterResolver(Settings settings, ClusterSettings clusterSettings) {
super(settings);
clusters = new CopyOnWriteArraySet<>(getEnabledRemoteClusters(settings));
listenForUpdates(clusterSettings);
}

@Override
protected void updateRemoteCluster(String clusterAlias, Settings settings) {
if (RemoteConnectionStrategy.isConnectionEnabled(clusterAlias, settings)) {
clusters.add(clusterAlias);
} else {
clusters.remove(clusterAlias);
}
}

ResolvedIndices resolve(String... indices) {
// 7.x workaround, see gh#40419
Map<String, List<String>> resolvedClusterIndices = groupClusterIndices(clusters, indices, i -> false);
List<String> localIndices = resolvedClusterIndices.getOrDefault(LOCAL_CLUSTER_GROUP_KEY, Collections.emptyList());
resolvedClusterIndices.remove(LOCAL_CLUSTER_GROUP_KEY);
return new ResolvedIndices(resolvedClusterIndices, localIndices);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,12 @@ public class TimeBasedCheckpointProvider extends DefaultCheckpointProvider {

TimeBasedCheckpointProvider(
final Client client,
final RemoteClusterResolver remoteClusterResolver,
final TransformConfigManager transformConfigManager,
final TransformAuditor transformAuditor,
final TransformConfig transformConfig
) {
super(client, transformConfigManager, transformAuditor, transformConfig);
super(client, remoteClusterResolver, transformConfigManager, transformAuditor, transformConfig);
timeSyncConfig = (TimeSyncConfig) transformConfig.getSyncConfig();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.xpack.core.transform.transforms.TimeSyncConfig;
import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpointingInfo.TransformCheckpointingInfoBuilder;
import org.elasticsearch.xpack.core.transform.transforms.TransformConfig;
Expand All @@ -33,23 +35,33 @@ public class TransformCheckpointService {
private final Client client;
private final TransformConfigManager transformConfigManager;
private final TransformAuditor transformAuditor;
private final RemoteClusterResolver remoteClusterResolver;

public TransformCheckpointService(
final Client client,
final Settings settings,
final ClusterService clusterService,
final TransformConfigManager transformConfigManager,
TransformAuditor transformAuditor
) {
this.client = client;
this.transformConfigManager = transformConfigManager;
this.transformAuditor = transformAuditor;
this.remoteClusterResolver = new RemoteClusterResolver(settings, clusterService.getClusterSettings());
}

public CheckpointProvider getCheckpointProvider(final TransformConfig transformConfig) {
if (transformConfig.getSyncConfig() instanceof TimeSyncConfig) {
return new TimeBasedCheckpointProvider(client, transformConfigManager, transformAuditor, transformConfig);
return new TimeBasedCheckpointProvider(
client,
remoteClusterResolver,
transformConfigManager,
transformAuditor,
transformConfig
);
}

return new DefaultCheckpointProvider(client, transformConfigManager, transformAuditor, transformConfig);
return new DefaultCheckpointProvider(client, remoteClusterResolver, transformConfigManager, transformAuditor, transformConfig);
}

/**
Expand Down Expand Up @@ -82,5 +94,4 @@ public void getCheckpointingInfo(
listener.onFailure(new CheckpointException("Failed to retrieve configuration", transformError));
}));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
import org.apache.logging.log4j.Logger;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.MockLogAppender;
Expand Down Expand Up @@ -48,6 +50,7 @@ public void testReportSourceIndexChangesRunsEmpty() throws Exception {

DefaultCheckpointProvider provider = new DefaultCheckpointProvider(
client,
new RemoteClusterResolver(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)),
transformConfigManager,
transformAuditor,
transformConfig
Expand Down Expand Up @@ -92,6 +95,7 @@ public void testReportSourceIndexChangesAddDelete() throws Exception {

DefaultCheckpointProvider provider = new DefaultCheckpointProvider(
client,
new RemoteClusterResolver(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)),
transformConfigManager,
transformAuditor,
transformConfig
Expand Down Expand Up @@ -151,6 +155,7 @@ public void testReportSourceIndexChangesAddDeleteMany() throws Exception {

DefaultCheckpointProvider provider = new DefaultCheckpointProvider(
client,
new RemoteClusterResolver(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)),
transformConfigManager,
transformAuditor,
transformConfig
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@
import org.elasticsearch.cluster.routing.RecoverySource;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.cache.query.QueryCacheStats;
import org.elasticsearch.index.cache.request.RequestCacheStats;
Expand Down Expand Up @@ -138,7 +141,13 @@ public void createComponents() {

// use a mock for the checkpoint service
TransformAuditor mockAuditor = mock(TransformAuditor.class);
transformCheckpointService = new TransformCheckpointService(mockClientForCheckpointing, transformsConfigManager, mockAuditor);
transformCheckpointService = new TransformCheckpointService(
mockClientForCheckpointing,
Settings.EMPTY,
new ClusterService(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null),
transformsConfigManager,
mockAuditor
);
}

@AfterClass
Expand Down
Loading