Skip to content

Commit 9ae6905

Browse files
authored
add support for write index resolution when creating/updating documents (#31520)
Now write operations like Index, Delete, Update rely on the write-index associated with an alias to operate against. This means writes will be accepted even when an alias points to multiple indices, so long as one is the write index. Routing values will be used from the AliasMetaData for the alias in the write-index. All read operations are left untouched.
1 parent 7c0fc20 commit 9ae6905

File tree

12 files changed

+531
-35
lines changed

12 files changed

+531
-35
lines changed

server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -295,7 +295,7 @@ protected void doRun() throws Exception {
295295
TransportUpdateAction.resolveAndValidateRouting(metaData, concreteIndex.getName(), (UpdateRequest) docWriteRequest);
296296
break;
297297
case DELETE:
298-
docWriteRequest.routing(metaData.resolveIndexRouting(docWriteRequest.routing(), docWriteRequest.index()));
298+
docWriteRequest.routing(metaData.resolveWriteIndexRouting(docWriteRequest.routing(), docWriteRequest.index()));
299299
// check if routing is required, if so, throw error if routing wasn't specified
300300
if (docWriteRequest.routing() == null && metaData.routingRequired(concreteIndex.getName(), docWriteRequest.type())) {
301301
throw new RoutingMissingException(concreteIndex.getName(), docWriteRequest.type(), docWriteRequest.id());
@@ -474,7 +474,7 @@ Index getConcreteIndex(String indexOrAlias) {
474474
Index resolveIfAbsent(DocWriteRequest<?> request) {
475475
Index concreteIndex = indices.get(request.index());
476476
if (concreteIndex == null) {
477-
concreteIndex = indexNameExpressionResolver.concreteSingleIndex(state, request);
477+
concreteIndex = indexNameExpressionResolver.concreteWriteIndex(state, request);
478478
indices.put(request.index(), concreteIndex);
479479
}
480480
return concreteIndex;

server/src/main/java/org/elasticsearch/action/index/IndexRequest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -496,7 +496,7 @@ public void process(Version indexCreatedVersion, @Nullable MappingMetaData mappi
496496

497497
/* resolve the routing if needed */
498498
public void resolveRouting(MetaData metaData) {
499-
routing(metaData.resolveIndexRouting(routing, index));
499+
routing(metaData.resolveWriteIndexRouting(routing, index));
500500
}
501501

502502
@Override

server/src/main/java/org/elasticsearch/action/update/TransportUpdateAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ protected void resolveRequest(ClusterState state, UpdateRequest request) {
104104
}
105105

106106
public static void resolveAndValidateRouting(MetaData metaData, String concreteIndex, UpdateRequest request) {
107-
request.routing((metaData.resolveIndexRouting(request.routing(), request.index())));
107+
request.routing((metaData.resolveWriteIndexRouting(request.routing(), request.index())));
108108
// Fail fast on the node that received the request, rather than failing when translating on the index or delete request.
109109
if (request.routing() == null && metaData.routingRequired(concreteIndex, request.type())) {
110110
throw new RoutingMissingException(concreteIndex, request.type(), request.id());

server/src/main/java/org/elasticsearch/cluster/metadata/IndexNameExpressionResolver.java

Lines changed: 68 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,6 @@
4242

4343
import java.util.ArrayList;
4444
import java.util.Arrays;
45-
import java.util.Collection;
4645
import java.util.Collections;
4746
import java.util.HashMap;
4847
import java.util.HashSet;
@@ -103,7 +102,7 @@ public String[] concreteIndexNames(ClusterState state, IndicesOptions options, S
103102
return concreteIndexNames(context, indexExpressions);
104103
}
105104

106-
/**
105+
/**
107106
* Translates the provided index expression into actual concrete indices, properly deduplicated.
108107
*
109108
* @param state the cluster state containing all the data to resolve to expressions to concrete indices
@@ -117,7 +116,7 @@ public String[] concreteIndexNames(ClusterState state, IndicesOptions options, S
117116
* indices options in the context don't allow such a case.
118117
*/
119118
public Index[] concreteIndices(ClusterState state, IndicesOptions options, String... indexExpressions) {
120-
Context context = new Context(state, options);
119+
Context context = new Context(state, options, false, false);
121120
return concreteIndices(context, indexExpressions);
122121
}
123122

@@ -193,30 +192,40 @@ Index[] concreteIndices(Context context, String... indexExpressions) {
193192
}
194193
}
195194

196-
Collection<IndexMetaData> resolvedIndices = aliasOrIndex.getIndices();
197-
if (resolvedIndices.size() > 1 && !options.allowAliasesToMultipleIndices()) {
198-
String[] indexNames = new String[resolvedIndices.size()];
199-
int i = 0;
200-
for (IndexMetaData indexMetaData : resolvedIndices) {
201-
indexNames[i++] = indexMetaData.getIndex().getName();
195+
if (aliasOrIndex.isAlias() && context.isResolveToWriteIndex()) {
196+
AliasOrIndex.Alias alias = (AliasOrIndex.Alias) aliasOrIndex;
197+
IndexMetaData writeIndex = alias.getWriteIndex();
198+
if (writeIndex == null) {
199+
throw new IllegalArgumentException("no write index is defined for alias [" + alias.getAliasName() + "]." +
200+
" The write index may be explicitly disabled using is_write_index=false or the alias points to multiple" +
201+
" indices without one being designated as a write index");
202202
}
203-
throw new IllegalArgumentException("Alias [" + expression + "] has more than one indices associated with it [" +
203+
concreteIndices.add(writeIndex.getIndex());
204+
} else {
205+
if (aliasOrIndex.getIndices().size() > 1 && !options.allowAliasesToMultipleIndices()) {
206+
String[] indexNames = new String[aliasOrIndex.getIndices().size()];
207+
int i = 0;
208+
for (IndexMetaData indexMetaData : aliasOrIndex.getIndices()) {
209+
indexNames[i++] = indexMetaData.getIndex().getName();
210+
}
211+
throw new IllegalArgumentException("Alias [" + expression + "] has more than one indices associated with it [" +
204212
Arrays.toString(indexNames) + "], can't execute a single index op");
205-
}
213+
}
206214

207-
for (IndexMetaData index : resolvedIndices) {
208-
if (index.getState() == IndexMetaData.State.CLOSE) {
209-
if (failClosed) {
210-
throw new IndexClosedException(index.getIndex());
211-
} else {
212-
if (options.forbidClosedIndices() == false) {
213-
concreteIndices.add(index.getIndex());
215+
for (IndexMetaData index : aliasOrIndex.getIndices()) {
216+
if (index.getState() == IndexMetaData.State.CLOSE) {
217+
if (failClosed) {
218+
throw new IndexClosedException(index.getIndex());
219+
} else {
220+
if (options.forbidClosedIndices() == false) {
221+
concreteIndices.add(index.getIndex());
222+
}
214223
}
224+
} else if (index.getState() == IndexMetaData.State.OPEN) {
225+
concreteIndices.add(index.getIndex());
226+
} else {
227+
throw new IllegalStateException("index state [" + index.getState() + "] not supported");
215228
}
216-
} else if (index.getState() == IndexMetaData.State.OPEN) {
217-
concreteIndices.add(index.getIndex());
218-
} else {
219-
throw new IllegalStateException("index state [" + index.getState() + "] not supported");
220229
}
221230
}
222231
}
@@ -255,6 +264,28 @@ public Index concreteSingleIndex(ClusterState state, IndicesRequest request) {
255264
return indices[0];
256265
}
257266

267+
/**
268+
* Utility method that allows to resolve an index expression to its corresponding single write index.
269+
*
270+
* @param state the cluster state containing all the data to resolve to expression to a concrete index
271+
* @param request The request that defines how the an alias or an index need to be resolved to a concrete index
272+
* and the expression that can be resolved to an alias or an index name.
273+
* @throws IllegalArgumentException if the index resolution does not lead to an index, or leads to more than one index
274+
* @return the write index obtained as a result of the index resolution
275+
*/
276+
public Index concreteWriteIndex(ClusterState state, IndicesRequest request) {
277+
if (request.indices() == null || (request.indices() != null && request.indices().length != 1)) {
278+
throw new IllegalArgumentException("indices request must specify a single index expression");
279+
}
280+
Context context = new Context(state, request.indicesOptions(), false, true);
281+
Index[] indices = concreteIndices(context, request.indices()[0]);
282+
if (indices.length != 1) {
283+
throw new IllegalArgumentException("The index expression [" + request.indices()[0] +
284+
"] and options provided did not point to a single write-index");
285+
}
286+
return indices[0];
287+
}
288+
258289
/**
259290
* @return whether the specified alias or index exists. If the alias or index contains datemath then that is resolved too.
260291
*/
@@ -292,7 +323,7 @@ public String[] indexAliases(ClusterState state, String index, Predicate<AliasMe
292323
String... expressions) {
293324
// expand the aliases wildcard
294325
List<String> resolvedExpressions = expressions != null ? Arrays.asList(expressions) : Collections.emptyList();
295-
Context context = new Context(state, IndicesOptions.lenientExpandOpen(), true);
326+
Context context = new Context(state, IndicesOptions.lenientExpandOpen(), true, false);
296327
for (ExpressionResolver expressionResolver : expressionResolvers) {
297328
resolvedExpressions = expressionResolver.resolve(context, resolvedExpressions);
298329
}
@@ -512,24 +543,26 @@ static final class Context {
512543
private final IndicesOptions options;
513544
private final long startTime;
514545
private final boolean preserveAliases;
546+
private final boolean resolveToWriteIndex;
515547

516548
Context(ClusterState state, IndicesOptions options) {
517549
this(state, options, System.currentTimeMillis());
518550
}
519551

520-
Context(ClusterState state, IndicesOptions options, boolean preserveAliases) {
521-
this(state, options, System.currentTimeMillis(), preserveAliases);
552+
Context(ClusterState state, IndicesOptions options, boolean preserveAliases, boolean resolveToWriteIndex) {
553+
this(state, options, System.currentTimeMillis(), preserveAliases, resolveToWriteIndex);
522554
}
523555

524556
Context(ClusterState state, IndicesOptions options, long startTime) {
525-
this(state, options, startTime, false);
557+
this(state, options, startTime, false, false);
526558
}
527559

528-
Context(ClusterState state, IndicesOptions options, long startTime, boolean preserveAliases) {
560+
Context(ClusterState state, IndicesOptions options, long startTime, boolean preserveAliases, boolean resolveToWriteIndex) {
529561
this.state = state;
530562
this.options = options;
531563
this.startTime = startTime;
532564
this.preserveAliases = preserveAliases;
565+
this.resolveToWriteIndex = resolveToWriteIndex;
533566
}
534567

535568
public ClusterState getState() {
@@ -552,6 +585,14 @@ public long getStartTime() {
552585
boolean isPreserveAliases() {
553586
return preserveAliases;
554587
}
588+
589+
/**
590+
* This is used to require that aliases resolve to their write-index. It is currently not used in conjunction
591+
* with <code>preserveAliases</code>.
592+
*/
593+
boolean isResolveToWriteIndex() {
594+
return resolveToWriteIndex;
595+
}
555596
}
556597

557598
private interface ExpressionResolver {

server/src/main/java/org/elasticsearch/cluster/metadata/MetaData.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -471,6 +471,42 @@ public String[] getConcreteAllClosedIndices() {
471471
return allClosedIndices;
472472
}
473473

474+
/**
475+
* Returns indexing routing for the given <code>aliasOrIndex</code>. Resolves routing from the alias metadata used
476+
* in the write index.
477+
*/
478+
public String resolveWriteIndexRouting(@Nullable String routing, String aliasOrIndex) {
479+
if (aliasOrIndex == null) {
480+
return routing;
481+
}
482+
483+
AliasOrIndex result = getAliasAndIndexLookup().get(aliasOrIndex);
484+
if (result == null || result.isAlias() == false) {
485+
return routing;
486+
}
487+
AliasOrIndex.Alias alias = (AliasOrIndex.Alias) result;
488+
IndexMetaData writeIndex = alias.getWriteIndex();
489+
if (writeIndex == null) {
490+
throw new IllegalArgumentException("alias [" + aliasOrIndex + "] does not have a write index");
491+
}
492+
AliasMetaData aliasMd = writeIndex.getAliases().get(alias.getAliasName());
493+
if (aliasMd.indexRouting() != null) {
494+
if (aliasMd.indexRouting().indexOf(',') != -1) {
495+
throw new IllegalArgumentException("index/alias [" + aliasOrIndex + "] provided with routing value ["
496+
+ aliasMd.getIndexRouting() + "] that resolved to several routing values, rejecting operation");
497+
}
498+
if (routing != null) {
499+
if (!routing.equals(aliasMd.indexRouting())) {
500+
throw new IllegalArgumentException("Alias [" + aliasOrIndex + "] has index routing associated with it ["
501+
+ aliasMd.indexRouting() + "], and was provided with routing value [" + routing + "], rejecting operation");
502+
}
503+
}
504+
// Alias routing overrides the parent routing (if any).
505+
return aliasMd.indexRouting();
506+
}
507+
return routing;
508+
}
509+
474510
/**
475511
* Returns indexing routing for the given index.
476512
*/

server/src/test/java/org/elasticsearch/action/bulk/BulkIntegrationIT.java

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,20 @@
2020

2121
package org.elasticsearch.action.bulk;
2222

23+
import org.elasticsearch.action.admin.indices.alias.Alias;
2324
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
25+
import org.elasticsearch.action.index.IndexRequest;
26+
import org.elasticsearch.cluster.metadata.IndexMetaData;
2427
import org.elasticsearch.common.xcontent.XContentType;
28+
import org.elasticsearch.rest.RestStatus;
2529
import org.elasticsearch.test.ESIntegTestCase;
2630

2731
import java.nio.charset.StandardCharsets;
32+
import java.util.Collections;
33+
import java.util.Map;
2834

2935
import static org.elasticsearch.test.StreamsUtils.copyToStringFromClasspath;
36+
import static org.hamcrest.Matchers.equalTo;
3037

3138
public class BulkIntegrationIT extends ESIntegTestCase {
3239
public void testBulkIndexCreatesMapping() throws Exception {
@@ -40,4 +47,38 @@ public void testBulkIndexCreatesMapping() throws Exception {
4047
assertTrue(mappingsResponse.getMappings().get("logstash-2014.03.30").containsKey("logs"));
4148
});
4249
}
50+
51+
/**
52+
* This tests that the {@link TransportBulkAction} evaluates alias routing values correctly when dealing with
53+
* an alias pointing to multiple indices, while a write index exits.
54+
*/
55+
public void testBulkWithWriteIndexAndRouting() {
56+
Map<String, Integer> twoShardsSettings = Collections.singletonMap(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 2);
57+
client().admin().indices().prepareCreate("index1")
58+
.addAlias(new Alias("alias1").indexRouting("0")).setSettings(twoShardsSettings).get();
59+
client().admin().indices().prepareCreate("index2")
60+
.addAlias(new Alias("alias1").indexRouting("0").writeIndex(randomFrom(false, null)))
61+
.setSettings(twoShardsSettings).get();
62+
client().admin().indices().prepareCreate("index3")
63+
.addAlias(new Alias("alias1").indexRouting("1").writeIndex(true)).setSettings(twoShardsSettings).get();
64+
65+
IndexRequest indexRequestWithAlias = new IndexRequest("alias1", "type", "id");
66+
if (randomBoolean()) {
67+
indexRequestWithAlias.routing("1");
68+
}
69+
indexRequestWithAlias.source(Collections.singletonMap("foo", "baz"));
70+
BulkResponse bulkResponse = client().prepareBulk().add(indexRequestWithAlias).get();
71+
assertThat(bulkResponse.getItems()[0].getResponse().getIndex(), equalTo("index3"));
72+
assertThat(bulkResponse.getItems()[0].getResponse().getShardId().getId(), equalTo(0));
73+
assertThat(bulkResponse.getItems()[0].getResponse().getVersion(), equalTo(1L));
74+
assertThat(bulkResponse.getItems()[0].getResponse().status(), equalTo(RestStatus.CREATED));
75+
assertThat(client().prepareGet("index3", "type", "id").setRouting("1").get().getSource().get("foo"), equalTo("baz"));
76+
77+
bulkResponse = client().prepareBulk().add(client().prepareUpdate("alias1", "type", "id").setDoc("foo", "updated")).get();
78+
assertFalse(bulkResponse.hasFailures());
79+
assertThat(client().prepareGet("index3", "type", "id").setRouting("1").get().getSource().get("foo"), equalTo("updated"));
80+
bulkResponse = client().prepareBulk().add(client().prepareDelete("alias1", "type", "id")).get();
81+
assertFalse(bulkResponse.hasFailures());
82+
assertFalse(client().prepareGet("index3", "type", "id").setRouting("1").get().isExists());
83+
}
4384
}

0 commit comments

Comments
 (0)