Skip to content

Commit 59b14dc

Browse files
committed
Merge branch '6.x' into ccr-6.x
* 6.x: Allow engine to recover from translog upto a seqno (#33032) TEST: Skip assertSeqNos for closed shards (#33130) TEST: resync operation on replica should acquire shard permit (#33103) Add proxy support to RemoteClusterConnection (#33062) Build: Line up IDE detection logic Security index expands to a single replica (#33131) Suppress more tests HLRC: request/response homogeneity and JavaDoc improvements (#33133) [Rollup] Move toAggCap() methods out of rollup config objects (#32583) Muted all these tests due to #33128 Fix race condition in scheduler engine test
2 parents 41e4e16 + b5e93b5 commit 59b14dc

File tree

40 files changed

+785
-267
lines changed

40 files changed

+785
-267
lines changed

qa/multi-cluster-search/src/test/resources/rest-api-spec/test/multi_cluster/20_info.yml

+11-3
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
---
22
"Fetch remote cluster info for existing cluster":
3-
3+
- skip:
4+
version: "all"
5+
reason: https://github.com/elastic/elasticsearch/issues/33128
46
- do:
57
cluster.remote_info: {}
68
- match: { my_remote_cluster.connected: true }
@@ -11,6 +13,9 @@
1113

1214
---
1315
"Add transient remote cluster based on the preset cluster and check remote info":
16+
- skip:
17+
version: "all"
18+
reason: https://github.com/elastic/elasticsearch/issues/33128
1419
- do:
1520
cluster.get_settings:
1621
include_defaults: true
@@ -65,8 +70,11 @@
6570
---
6671
"skip_unavailable is returned as part of _remote/info response":
6772
- skip:
68-
version: " - 6.0.99"
69-
reason: "skip_unavailable is only returned from 6.1.0 on"
73+
version: "all"
74+
reason: https://github.com/elastic/elasticsearch/issues/33128
75+
#- skip:
76+
# version: " - 6.0.99"
77+
# reason: "skip_unavailable is only returned from 6.1.0 on"
7078

7179
- do:
7280
cluster.get_settings:

server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java

+1
Original file line numberDiff line numberDiff line change
@@ -274,6 +274,7 @@ public void apply(Settings value, Settings current, Settings previous) {
274274
ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING,
275275
TransportSearchAction.SHARD_COUNT_LIMIT_SETTING,
276276
RemoteClusterAware.REMOTE_CLUSTERS_SEEDS,
277+
RemoteClusterAware.REMOTE_CLUSTERS_PROXY,
277278
RemoteClusterService.REMOTE_CLUSTER_SKIP_UNAVAILABLE,
278279
RemoteClusterService.REMOTE_CONNECTIONS_PER_CLUSTER,
279280
RemoteClusterService.REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING,

server/src/main/java/org/elasticsearch/common/settings/Setting.java

+4
Original file line numberDiff line numberDiff line change
@@ -1009,6 +1009,10 @@ public static Setting<String> simpleString(String key, Property... properties) {
10091009
return new Setting<>(key, s -> "", Function.identity(), properties);
10101010
}
10111011

1012+
public static Setting<String> simpleString(String key, Function<String, String> parser, Property... properties) {
1013+
return new Setting<>(key, s -> "", parser, properties);
1014+
}
1015+
10121016
public static Setting<String> simpleString(String key, Setting<String> fallback, Property... properties) {
10131017
return new Setting<>(key, fallback, Function.identity(), properties);
10141018
}

server/src/main/java/org/elasticsearch/index/engine/Engine.java

+4-2
Original file line numberDiff line numberDiff line change
@@ -1648,10 +1648,12 @@ public interface Warmer {
16481648
public abstract int fillSeqNoGaps(long primaryTerm) throws IOException;
16491649

16501650
/**
1651-
* Performs recovery from the transaction log.
1651+
* Performs recovery from the transaction log up to {@code recoverUpToSeqNo} (inclusive).
16521652
* This operation will close the engine if the recovery fails.
1653+
*
1654+
* @param recoverUpToSeqNo the upper bound, inclusive, of sequence number to be recovered
16531655
*/
1654-
public abstract Engine recoverFromTranslog() throws IOException;
1656+
public abstract Engine recoverFromTranslog(long recoverUpToSeqNo) throws IOException;
16551657

16561658
/**
16571659
* Do not replay translog operations, but make the engine be ready.

server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

+6-5
Original file line numberDiff line numberDiff line change
@@ -398,15 +398,15 @@ private void bootstrapAppendOnlyInfoFromWriter(IndexWriter writer) {
398398
}
399399

400400
@Override
401-
public InternalEngine recoverFromTranslog() throws IOException {
401+
public InternalEngine recoverFromTranslog(long recoverUpToSeqNo) throws IOException {
402402
flushLock.lock();
403403
try (ReleasableLock lock = readLock.acquire()) {
404404
ensureOpen();
405405
if (pendingTranslogRecovery.get() == false) {
406406
throw new IllegalStateException("Engine has already been recovered");
407407
}
408408
try {
409-
recoverFromTranslogInternal();
409+
recoverFromTranslogInternal(recoverUpToSeqNo);
410410
} catch (Exception e) {
411411
try {
412412
pendingTranslogRecovery.set(true); // just play safe and never allow commits on this see #ensureCanFlush
@@ -428,11 +428,12 @@ public void skipTranslogRecovery() {
428428
pendingTranslogRecovery.set(false); // we are good - now we can commit
429429
}
430430

431-
private void recoverFromTranslogInternal() throws IOException {
431+
private void recoverFromTranslogInternal(long recoverUpToSeqNo) throws IOException {
432432
Translog.TranslogGeneration translogGeneration = translog.getGeneration();
433433
final int opsRecovered;
434-
final long translogGen = Long.parseLong(lastCommittedSegmentInfos.getUserData().get(Translog.TRANSLOG_GENERATION_KEY));
435-
try (Translog.Snapshot snapshot = translog.newSnapshotFromGen(translogGen)) {
434+
final long translogFileGen = Long.parseLong(lastCommittedSegmentInfos.getUserData().get(Translog.TRANSLOG_GENERATION_KEY));
435+
try (Translog.Snapshot snapshot = translog.newSnapshotFromGen(
436+
new Translog.TranslogGeneration(translog.getTranslogUUID(), translogFileGen), recoverUpToSeqNo)) {
436437
opsRecovered = config().getTranslogRecoveryRunner().run(this, snapshot);
437438
} catch (Exception e) {
438439
throw new EngineException(shardId, "failed to recover from translog", e);

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -1343,7 +1343,7 @@ int runTranslogRecovery(Engine engine, Translog.Snapshot snapshot) throws IOExce
13431343
**/
13441344
public void openEngineAndRecoverFromTranslog() throws IOException {
13451345
innerOpenEngineAndTranslog();
1346-
getEngine().recoverFromTranslog();
1346+
getEngine().recoverFromTranslog(Long.MAX_VALUE);
13471347
}
13481348

13491349
/**

server/src/main/java/org/elasticsearch/index/translog/Translog.java

+64-6
Original file line numberDiff line numberDiff line change
@@ -579,21 +579,27 @@ public long getLastSyncedGlobalCheckpoint() {
579579
*/
580580
public Snapshot newSnapshot() throws IOException {
581581
try (ReleasableLock ignored = readLock.acquire()) {
582-
return newSnapshotFromGen(getMinFileGeneration());
582+
return newSnapshotFromGen(new TranslogGeneration(translogUUID, getMinFileGeneration()), Long.MAX_VALUE);
583583
}
584584
}
585585

586-
public Snapshot newSnapshotFromGen(long minGeneration) throws IOException {
586+
public Snapshot newSnapshotFromGen(TranslogGeneration fromGeneration, long upToSeqNo) throws IOException {
587587
try (ReleasableLock ignored = readLock.acquire()) {
588588
ensureOpen();
589-
if (minGeneration < getMinFileGeneration()) {
590-
throw new IllegalArgumentException("requested snapshot generation [" + minGeneration + "] is not available. " +
589+
final long fromFileGen = fromGeneration.translogFileGeneration;
590+
if (fromFileGen < getMinFileGeneration()) {
591+
throw new IllegalArgumentException("requested snapshot generation [" + fromFileGen + "] is not available. " +
591592
"Min referenced generation is [" + getMinFileGeneration() + "]");
592593
}
593594
TranslogSnapshot[] snapshots = Stream.concat(readers.stream(), Stream.of(current))
594-
.filter(reader -> reader.getGeneration() >= minGeneration)
595+
.filter(reader -> reader.getGeneration() >= fromFileGen && reader.getCheckpoint().minSeqNo <= upToSeqNo)
595596
.map(BaseTranslogReader::newSnapshot).toArray(TranslogSnapshot[]::new);
596-
return newMultiSnapshot(snapshots);
597+
final Snapshot snapshot = newMultiSnapshot(snapshots);
598+
if (upToSeqNo == Long.MAX_VALUE) {
599+
return snapshot;
600+
} else {
601+
return new SeqNoFilterSnapshot(snapshot, Long.MIN_VALUE, upToSeqNo);
602+
}
597603
}
598604
}
599605

@@ -928,7 +934,59 @@ default int overriddenOperations() {
928934
* Returns the next operation in the snapshot or <code>null</code> if we reached the end.
929935
*/
930936
Translog.Operation next() throws IOException;
937+
}
938+
939+
/**
940+
* A filtered snapshot consisting of only operations whose sequence numbers are in the given range
941+
* between {@code fromSeqNo} (inclusive) and {@code toSeqNo} (inclusive). This filtered snapshot
942+
* shares the same underlying resources with the {@code delegate} snapshot, therefore we should not
943+
* use the {@code delegate} after passing it to this filtered snapshot.
944+
*/
945+
static final class SeqNoFilterSnapshot implements Snapshot {
946+
private final Snapshot delegate;
947+
private int filteredOpsCount;
948+
private final long fromSeqNo; // inclusive
949+
private final long toSeqNo; // inclusive
950+
951+
SeqNoFilterSnapshot(Snapshot delegate, long fromSeqNo, long toSeqNo) {
952+
assert fromSeqNo <= toSeqNo : "from_seq_no[" + fromSeqNo + "] > to_seq_no[" + toSeqNo + "]";
953+
this.delegate = delegate;
954+
this.fromSeqNo = fromSeqNo;
955+
this.toSeqNo = toSeqNo;
956+
}
957+
958+
@Override
959+
public int totalOperations() {
960+
return delegate.totalOperations();
961+
}
962+
963+
@Override
964+
public int skippedOperations() {
965+
return filteredOpsCount + delegate.skippedOperations();
966+
}
931967

968+
@Override
969+
public int overriddenOperations() {
970+
return delegate.overriddenOperations();
971+
}
972+
973+
@Override
974+
public Operation next() throws IOException {
975+
Translog.Operation op;
976+
while ((op = delegate.next()) != null) {
977+
if (fromSeqNo <= op.seqNo() && op.seqNo() <= toSeqNo) {
978+
return op;
979+
} else {
980+
filteredOpsCount++;
981+
}
982+
}
983+
return null;
984+
}
985+
986+
@Override
987+
public void close() throws IOException {
988+
delegate.close();
989+
}
932990
}
933991

934992
/**

server/src/main/java/org/elasticsearch/transport/RemoteClusterAware.java

+51-10
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,14 @@
1818
*/
1919
package org.elasticsearch.transport;
2020

21+
import java.util.EnumSet;
2122
import java.util.function.Supplier;
2223
import org.elasticsearch.Version;
2324
import org.elasticsearch.cluster.metadata.ClusterNameExpressionResolver;
2425
import org.elasticsearch.cluster.node.DiscoveryNode;
26+
import org.elasticsearch.common.Strings;
27+
import org.elasticsearch.common.UUIDs;
28+
import org.elasticsearch.common.collect.Tuple;
2529
import org.elasticsearch.common.component.AbstractComponent;
2630
import org.elasticsearch.common.settings.ClusterSettings;
2731
import org.elasticsearch.common.settings.Setting;
@@ -66,6 +70,22 @@ public abstract class RemoteClusterAware extends AbstractComponent {
6670
public static final char REMOTE_CLUSTER_INDEX_SEPARATOR = ':';
6771
public static final String LOCAL_CLUSTER_GROUP_KEY = "";
6872

73+
/**
74+
* A proxy address for the remote cluster.
75+
* NOTE: this settings is undocumented until we have at last one transport that supports passing
76+
* on the hostname via a mechanism like SNI.
77+
*/
78+
public static final Setting.AffixSetting<String> REMOTE_CLUSTERS_PROXY = Setting.affixKeySetting(
79+
"search.remote.",
80+
"proxy",
81+
key -> Setting.simpleString(key, s -> {
82+
if (Strings.hasLength(s)) {
83+
parsePort(s);
84+
}
85+
return s;
86+
}, Setting.Property.NodeScope, Setting.Property.Dynamic), REMOTE_CLUSTERS_SEEDS);
87+
88+
6989
protected final ClusterNameExpressionResolver clusterNameResolver;
7090

7191
/**
@@ -77,25 +97,42 @@ protected RemoteClusterAware(Settings settings) {
7797
this.clusterNameResolver = new ClusterNameExpressionResolver(settings);
7898
}
7999

80-
protected static Map<String, List<Supplier<DiscoveryNode>>> buildRemoteClustersSeeds(Settings settings) {
100+
/**
101+
* Builds the dynamic per-cluster config from the given settings. This is a map keyed by the cluster alias that points to a tuple
102+
* (ProxyAddresss, [SeedNodeSuppliers]). If a cluster is configured with a proxy address all seed nodes will point to
103+
* {@link TransportAddress#META_ADDRESS} and their configured address will be used as the hostname for the generated discovery node.
104+
*/
105+
protected static Map<String, Tuple<String, List<Supplier<DiscoveryNode>>>> buildRemoteClustersDynamicConfig(Settings settings) {
81106
Stream<Setting<List<String>>> allConcreteSettings = REMOTE_CLUSTERS_SEEDS.getAllConcreteSettings(settings);
82107
return allConcreteSettings.collect(
83108
Collectors.toMap(REMOTE_CLUSTERS_SEEDS::getNamespace, concreteSetting -> {
84109
String clusterName = REMOTE_CLUSTERS_SEEDS.getNamespace(concreteSetting);
85110
List<String> addresses = concreteSetting.get(settings);
111+
final boolean proxyMode = REMOTE_CLUSTERS_PROXY.getConcreteSettingForNamespace(clusterName).exists(settings);
86112
List<Supplier<DiscoveryNode>> nodes = new ArrayList<>(addresses.size());
87113
for (String address : addresses) {
88-
nodes.add(() -> {
89-
TransportAddress transportAddress = new TransportAddress(RemoteClusterAware.parseSeedAddress(address));
90-
return new DiscoveryNode(clusterName + "#" + transportAddress.toString(),
91-
transportAddress,
92-
Version.CURRENT.minimumCompatibilityVersion());
93-
});
114+
nodes.add(() -> buildSeedNode(clusterName, address, proxyMode));
94115
}
95-
return nodes;
116+
return new Tuple<>(REMOTE_CLUSTERS_PROXY.getConcreteSettingForNamespace(clusterName).get(settings), nodes);
96117
}));
97118
}
98119

120+
static DiscoveryNode buildSeedNode(String clusterName, String address, boolean proxyMode) {
121+
if (proxyMode) {
122+
TransportAddress transportAddress = new TransportAddress(TransportAddress.META_ADDRESS, 0);
123+
String hostName = address.substring(0, indexOfPortSeparator(address));
124+
return new DiscoveryNode("", clusterName + "#" + address, UUIDs.randomBase64UUID(), hostName, address,
125+
transportAddress, Collections
126+
.emptyMap(), EnumSet.allOf(DiscoveryNode.Role.class),
127+
Version.CURRENT.minimumCompatibilityVersion());
128+
} else {
129+
TransportAddress transportAddress = new TransportAddress(RemoteClusterAware.parseSeedAddress(address));
130+
return new DiscoveryNode(clusterName + "#" + transportAddress.toString(),
131+
transportAddress,
132+
Version.CURRENT.minimumCompatibilityVersion());
133+
}
134+
}
135+
99136
/**
100137
* Groups indices per cluster by splitting remote cluster-alias, index-name pairs on {@link #REMOTE_CLUSTER_INDEX_SEPARATOR}. All
101138
* indices per cluster are collected as a list in the returned map keyed by the cluster alias. Local indices are grouped under
@@ -138,20 +175,24 @@ public Map<String, List<String>> groupClusterIndices(String[] requestIndices, Pr
138175

139176
protected abstract Set<String> getRemoteClusterNames();
140177

178+
141179
/**
142180
* Subclasses must implement this to receive information about updated cluster aliases. If the given address list is
143181
* empty the cluster alias is unregistered and should be removed.
144182
*/
145-
protected abstract void updateRemoteCluster(String clusterAlias, List<String> addresses);
183+
protected abstract void updateRemoteCluster(String clusterAlias, List<String> addresses, String proxy);
146184

147185
/**
148186
* Registers this instance to listen to updates on the cluster settings.
149187
*/
150188
public void listenForUpdates(ClusterSettings clusterSettings) {
151-
clusterSettings.addAffixUpdateConsumer(RemoteClusterAware.REMOTE_CLUSTERS_SEEDS, this::updateRemoteCluster,
189+
clusterSettings.addAffixUpdateConsumer(RemoteClusterAware.REMOTE_CLUSTERS_PROXY,
190+
RemoteClusterAware.REMOTE_CLUSTERS_SEEDS,
191+
(key, value) -> updateRemoteCluster(key, value.v2(), value.v1()),
152192
(namespace, value) -> {});
153193
}
154194

195+
155196
protected static InetSocketAddress parseSeedAddress(String remoteHost) {
156197
String host = remoteHost.substring(0, indexOfPortSeparator(remoteHost));
157198
InetAddress hostAddress;

0 commit comments

Comments
 (0)