-
Notifications
You must be signed in to change notification settings - Fork 25.2k
add support for write index resolution when creating/updating documents #31520
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 15 commits
9e86921
df8fabd
7c75916
113ebc5
fb7c168
c1d2d4e
b585835
42893bf
54d0a3d
b5e5e81
d4a5ebd
2f8a004
9238cb5
8ae0967
e8cf8c8
2f2af95
3df26bd
5d47213
00eb394
7c753db
f1032e4
347149e
e58a3fb
222ae13
ae372f3
b54efe4
cda911f
eb097c2
cc969d1
284d3ee
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -103,7 +103,7 @@ public String[] concreteIndexNames(ClusterState state, IndicesOptions options, S | |
return concreteIndexNames(context, indexExpressions); | ||
} | ||
|
||
/** | ||
/** | ||
* Translates the provided index expression into actual concrete indices, properly deduplicated. | ||
* | ||
* @param state the cluster state containing all the data to resolve to expressions to concrete indices | ||
|
@@ -117,7 +117,7 @@ public String[] concreteIndexNames(ClusterState state, IndicesOptions options, S | |
* indices options in the context don't allow such a case. | ||
*/ | ||
public Index[] concreteIndices(ClusterState state, IndicesOptions options, String... indexExpressions) { | ||
Context context = new Context(state, options); | ||
Context context = new Context(state, options, false, false); | ||
return concreteIndices(context, indexExpressions); | ||
} | ||
|
||
|
@@ -194,29 +194,45 @@ Index[] concreteIndices(Context context, String... indexExpressions) { | |
} | ||
|
||
Collection<IndexMetaData> resolvedIndices = aliasOrIndex.getIndices(); | ||
if (resolvedIndices.size() > 1 && !options.allowAliasesToMultipleIndices()) { | ||
String[] indexNames = new String[resolvedIndices.size()]; | ||
int i = 0; | ||
for (IndexMetaData indexMetaData : resolvedIndices) { | ||
indexNames[i++] = indexMetaData.getIndex().getName(); | ||
|
||
if (aliasOrIndex.isAlias() && context.isResolveToWriteIndex()) { | ||
AliasOrIndex.Alias alias = (AliasOrIndex.Alias) aliasOrIndex; | ||
IndexMetaData writeIndex = alias.getWriteIndex(); | ||
if (writeIndex == null) { | ||
if (alias.getIndices().size() > 1) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think you can make this statement - it may be that all of them have their write index flag set to false. I think we can have a generic statement like "no write index is defined for alias X. The write index may be explicitly disabled using There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I will make more generic There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
saying it points to multiple indices with none set as a write-index [is_write_index=true] does not conflict with that statement? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. isn't this covered by "the alias points to multiple indices without one being designated as a write index"? |
||
throw new IllegalArgumentException("Alias [" + alias.getAliasName() + | ||
"] points to multiple indices with none set as a write-index [is_write_index=true]"); | ||
} else { | ||
throw new IllegalArgumentException("Alias [" + alias.getAliasName() + "] points to an index [" | ||
+ alias.getIndices().get(0).getIndex().getName() + "] with [is_write_index=false]"); | ||
} | ||
} | ||
throw new IllegalArgumentException("Alias [" + expression + "] has more than one indices associated with it [" + | ||
concreteIndices.add(writeIndex.getIndex()); | ||
} else { | ||
if (resolvedIndices.size() > 1 && !options.allowAliasesToMultipleIndices()) { | ||
String[] indexNames = new String[resolvedIndices.size()]; | ||
int i = 0; | ||
for (IndexMetaData indexMetaData : resolvedIndices) { | ||
indexNames[i++] = indexMetaData.getIndex().getName(); | ||
} | ||
throw new IllegalArgumentException("Alias [" + expression + "] has more than one indices associated with it [" + | ||
Arrays.toString(indexNames) + "], can't execute a single index op"); | ||
} | ||
} | ||
|
||
for (IndexMetaData index : resolvedIndices) { | ||
if (index.getState() == IndexMetaData.State.CLOSE) { | ||
if (failClosed) { | ||
throw new IndexClosedException(index.getIndex()); | ||
} else { | ||
if (options.forbidClosedIndices() == false) { | ||
concreteIndices.add(index.getIndex()); | ||
for (IndexMetaData index : resolvedIndices) { | ||
if (index.getState() == IndexMetaData.State.CLOSE) { | ||
if (failClosed) { | ||
throw new IndexClosedException(index.getIndex()); | ||
} else { | ||
if (options.forbidClosedIndices() == false) { | ||
concreteIndices.add(index.getIndex()); | ||
} | ||
} | ||
} else if (index.getState() == IndexMetaData.State.OPEN) { | ||
concreteIndices.add(index.getIndex()); | ||
} else { | ||
throw new IllegalStateException("index state [" + index.getState() + "] not supported"); | ||
} | ||
} else if (index.getState() == IndexMetaData.State.OPEN) { | ||
concreteIndices.add(index.getIndex()); | ||
} else { | ||
throw new IllegalStateException("index state [" + index.getState() + "] not supported"); | ||
} | ||
} | ||
} | ||
|
@@ -255,6 +271,24 @@ public Index concreteSingleIndex(ClusterState state, IndicesRequest request) { | |
return indices[0]; | ||
} | ||
|
||
/** | ||
* Utility method that allows to resolve an index expression to its corresponding single write index. | ||
* | ||
* @param state the cluster state containing all the data to resolve to expression to a concrete index | ||
* @param request The request that defines how the an alias or an index need to be resolved to a concrete index | ||
* and the expression that can be resolved to an alias or an index name. | ||
* @throws IllegalArgumentException if the index resolution does not lead to an index, or leads to more than one index | ||
* @return the write index obtained as a result of the index resolution | ||
*/ | ||
public Index concreteWriteIndex(ClusterState state, IndicesRequest request) { | ||
String indexExpression = request.indices() != null && request.indices().length > 0 ? request.indices()[0] : null; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think null is good hear (it translates to an all). There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why would this be different than concreteSingleIndex? |
||
Context context = new Context(state, request.indicesOptions(), false, true); | ||
Index[] indices = concreteIndices(context, indexExpression); | ||
// concreteIndices will throw its own exception when checking for a write index. Assert here for good measure. | ||
assert indices.length == 1 : "The index/alias and options provided did not point to a write-index"; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this needs to be stronger - the expression resolvers can in theory give you multiple expressions / indices. This needs to be a hard exception (and we should test it) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. you're right, need more tests for multiple index expressions EDIT: comment below clarifies why it is not possible to reach this branch of code There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. after looking at this for a second... looks like the intent of this method is to be called against DocWriteRequests, which, whether they actually have multiple expressions or not, the above lines filter for the first index expression. So conreteIndices will never be called with multiple index expressions, even if the request did have them. That is why I could never reach this situation. With write index being requested, concreteIndices will throw an exception if nothing is found, otherwise it will add the one write index and continue on There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I will add the tests for good measure since the interface technically allows for it. This might be worth cleaning up in the future to make it clear that it is only used by DocWriteRequests There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Even though DocWriteRequest makes sense here, it is not up to the DocWriteRequest to specify that it would
ReplicationRequest is responsible for specifying (1) and (2). So, leaving it as IndicesRequest is just as good it seems |
||
return indices[0]; | ||
} | ||
|
||
/** | ||
* @return whether the specified alias or index exists. If the alias or index contains datemath then that is resolved too. | ||
*/ | ||
|
@@ -292,7 +326,7 @@ public String[] indexAliases(ClusterState state, String index, Predicate<AliasMe | |
String... expressions) { | ||
// expand the aliases wildcard | ||
List<String> resolvedExpressions = expressions != null ? Arrays.asList(expressions) : Collections.emptyList(); | ||
Context context = new Context(state, IndicesOptions.lenientExpandOpen(), true); | ||
Context context = new Context(state, IndicesOptions.lenientExpandOpen(), true, false); | ||
for (ExpressionResolver expressionResolver : expressionResolvers) { | ||
resolvedExpressions = expressionResolver.resolve(context, resolvedExpressions); | ||
} | ||
|
@@ -512,24 +546,26 @@ static final class Context { | |
private final IndicesOptions options; | ||
private final long startTime; | ||
private final boolean preserveAliases; | ||
private final boolean resolveToWriteIndex; | ||
|
||
Context(ClusterState state, IndicesOptions options) { | ||
this(state, options, System.currentTimeMillis()); | ||
} | ||
|
||
Context(ClusterState state, IndicesOptions options, boolean preserveAliases) { | ||
this(state, options, System.currentTimeMillis(), preserveAliases); | ||
Context(ClusterState state, IndicesOptions options, boolean preserveAliases, boolean resolveToWriteIndex) { | ||
this(state, options, System.currentTimeMillis(), preserveAliases, resolveToWriteIndex); | ||
} | ||
|
||
Context(ClusterState state, IndicesOptions options, long startTime) { | ||
this(state, options, startTime, false); | ||
this(state, options, startTime, false, false); | ||
} | ||
|
||
Context(ClusterState state, IndicesOptions options, long startTime, boolean preserveAliases) { | ||
Context(ClusterState state, IndicesOptions options, long startTime, boolean preserveAliases, boolean resolveToWriteIndex) { | ||
this.state = state; | ||
this.options = options; | ||
this.startTime = startTime; | ||
this.preserveAliases = preserveAliases; | ||
this.resolveToWriteIndex = resolveToWriteIndex; | ||
} | ||
|
||
public ClusterState getState() { | ||
|
@@ -552,6 +588,14 @@ public long getStartTime() { | |
boolean isPreserveAliases() { | ||
return preserveAliases; | ||
} | ||
|
||
/** | ||
* This is used to require that aliases resolve to their write-index. It is currently not used in conjunction | ||
* with <code>preserveAliases</code>. | ||
*/ | ||
boolean isResolveToWriteIndex() { | ||
return resolveToWriteIndex; | ||
} | ||
} | ||
|
||
private interface ExpressionResolver { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -38,6 +38,7 @@ | |
import org.elasticsearch.common.UUIDs; | ||
import org.elasticsearch.common.collect.HppcMaps; | ||
import org.elasticsearch.common.collect.ImmutableOpenMap; | ||
import org.elasticsearch.common.collect.Tuple; | ||
import org.elasticsearch.common.io.stream.StreamInput; | ||
import org.elasticsearch.common.io.stream.StreamOutput; | ||
import org.elasticsearch.common.logging.Loggers; | ||
|
@@ -61,7 +62,6 @@ | |
|
||
import java.io.IOException; | ||
import java.util.ArrayList; | ||
import java.util.Arrays; | ||
import java.util.Collections; | ||
import java.util.Comparator; | ||
import java.util.EnumSet; | ||
|
@@ -473,7 +473,8 @@ public String[] getConcreteAllClosedIndices() { | |
} | ||
|
||
/** | ||
* Returns indexing routing for the given index. | ||
* Returns indexing routing for the given index. If <code>aliasOrIndex</code> points to | ||
* multiple indices and any of those indices define index routing, an exception is thrown. | ||
*/ | ||
// TODO: This can be moved to IndexNameExpressionResolver too, but this means that we will support wildcards and other expressions | ||
// in the index,bulk,update and delete apis. | ||
|
@@ -487,32 +488,34 @@ public String resolveIndexRouting(@Nullable String routing, String aliasOrIndex) | |
return routing; | ||
} | ||
AliasOrIndex.Alias alias = (AliasOrIndex.Alias) result; | ||
|
||
List<String> indexReferencesWithRouting = new ArrayList<>(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can we separate this into a different change, so we can discuss it? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I will try to split it out. I think it may require a few mini changes to make things pass in the meantime. |
||
for (Tuple<String, AliasMetaData> tuple : alias.getConcreteIndexAndAliasMetaDatas()) { | ||
if (tuple.v2().indexRouting() != null) { | ||
indexReferencesWithRouting.add(tuple.v1()); | ||
} | ||
} | ||
|
||
if (indexReferencesWithRouting.isEmpty()) { | ||
return routing; | ||
} | ||
|
||
if (result.getIndices().size() > 1) { | ||
rejectSingleIndexOperation(aliasOrIndex, result); | ||
throw new IllegalArgumentException("Alias [" + alias.getAliasName() + "] references multiple indices and provides an index" + | ||
" routing for index [" + Strings.collectionToDelimitedString(indexReferencesWithRouting, ",") + "]"); | ||
} | ||
|
||
AliasMetaData aliasMd = alias.getFirstAliasMetaData(); | ||
if (aliasMd.indexRouting() != null) { | ||
if (aliasMd.indexRouting().indexOf(',') != -1) { | ||
throw new IllegalArgumentException("index/alias [" + aliasOrIndex + "] provided with routing value [" + aliasMd.getIndexRouting() + "] that resolved to several routing values, rejecting operation"); | ||
} | ||
if (routing != null) { | ||
if (!routing.equals(aliasMd.indexRouting())) { | ||
throw new IllegalArgumentException("Alias [" + aliasOrIndex + "] has index routing associated with it [" + aliasMd.indexRouting() + "], and was provided with routing value [" + routing + "], rejecting operation"); | ||
} | ||
} | ||
// Alias routing overrides the parent routing (if any). | ||
return aliasMd.indexRouting(); | ||
if (aliasMd.indexRouting().indexOf(',') != -1) { | ||
throw new IllegalArgumentException("index/alias [" + aliasOrIndex + "] provided with routing value [" + aliasMd.getIndexRouting() + "] that resolved to several routing values, rejecting operation"); | ||
} | ||
return routing; | ||
} | ||
|
||
private void rejectSingleIndexOperation(String aliasOrIndex, AliasOrIndex result) { | ||
String[] indexNames = new String[result.getIndices().size()]; | ||
int i = 0; | ||
for (IndexMetaData indexMetaData : result.getIndices()) { | ||
indexNames[i++] = indexMetaData.getIndex().getName(); | ||
if (routing != null) { | ||
if (!routing.equals(aliasMd.indexRouting())) { | ||
throw new IllegalArgumentException("Alias [" + aliasOrIndex + "] has index routing associated with it [" + aliasMd.indexRouting() + "], and was provided with routing value [" + routing + "], rejecting operation"); | ||
} | ||
} | ||
throw new IllegalArgumentException("Alias [" + aliasOrIndex + "] has more than one index associated with it [" + Arrays.toString(indexNames) + "], can't execute a single index op"); | ||
// Alias routing overrides the parent routing (if any). | ||
return aliasMd.indexRouting(); | ||
} | ||
|
||
public boolean hasIndex(String index) { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -25,6 +25,7 @@ | |
import org.elasticsearch.action.admin.indices.alias.exists.AliasesExistResponse; | ||
import org.elasticsearch.action.admin.indices.alias.get.GetAliasesResponse; | ||
import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder; | ||
import org.elasticsearch.action.delete.DeleteResponse; | ||
import org.elasticsearch.action.index.IndexResponse; | ||
import org.elasticsearch.action.search.SearchResponse; | ||
import org.elasticsearch.action.support.WriteRequest.RefreshPolicy; | ||
|
@@ -57,6 +58,7 @@ | |
import java.util.concurrent.TimeUnit; | ||
|
||
import static org.elasticsearch.client.Requests.createIndexRequest; | ||
import static org.elasticsearch.client.Requests.deleteRequest; | ||
import static org.elasticsearch.client.Requests.indexRequest; | ||
import static org.elasticsearch.cluster.metadata.IndexMetaData.INDEX_METADATA_BLOCK; | ||
import static org.elasticsearch.cluster.metadata.IndexMetaData.INDEX_READ_ONLY_BLOCK; | ||
|
@@ -85,6 +87,15 @@ public void testAliases() throws Exception { | |
|
||
ensureGreen(); | ||
|
||
logger.info("--> aliasing index [test] with [alias1]"); | ||
assertAcked(admin().indices().prepareAliases().addAlias("test", "alias1", false)); | ||
|
||
logger.info("--> indexing against [alias1], should fail now"); | ||
IllegalArgumentException exception = expectThrows(IllegalArgumentException.class, | ||
() -> client().index(indexRequest("alias1").type("type1").id("1").source(source("2", "test"), | ||
XContentType.JSON)).actionGet()); | ||
assertThat(exception.getMessage(), equalTo("Alias [alias1] points to an index [test] with [is_write_index=false]")); | ||
|
||
logger.info("--> aliasing index [test] with [alias1]"); | ||
assertAcked(admin().indices().prepareAliases().addAlias("test", "alias1")); | ||
|
||
|
@@ -98,6 +109,34 @@ public void testAliases() throws Exception { | |
|
||
ensureGreen(); | ||
|
||
logger.info("--> add index [test_x] with [alias1]"); | ||
assertAcked(admin().indices().prepareAliases().addAlias("test_x", "alias1")); | ||
|
||
logger.info("--> indexing against [alias1], should fail now"); | ||
exception = expectThrows(IllegalArgumentException.class, | ||
() -> client().index(indexRequest("alias1").type("type1").id("1").source(source("2", "test"), | ||
XContentType.JSON)).actionGet()); | ||
assertThat(exception.getMessage(), | ||
equalTo("Alias [alias1] points to multiple indices with none set as a write-index [is_write_index=true]")); | ||
|
||
logger.info("--> deleting against [alias1], should fail now"); | ||
exception = expectThrows(IllegalArgumentException.class, | ||
() -> client().delete(deleteRequest("alias1").type("type1").id("1")).actionGet()); | ||
assertThat(exception.getMessage(), | ||
equalTo("Alias [alias1] points to multiple indices with none set as a write-index [is_write_index=true]")); | ||
|
||
logger.info("--> add index [test_x] with [alias1] as write-index"); | ||
assertAcked(admin().indices().prepareAliases().addAlias("test_x", "alias1", true)); | ||
|
||
logger.info("--> indexing against [alias1], should work now"); | ||
indexResponse = client().index(indexRequest("alias1").type("type1").id("1") | ||
.source(source("1", "test"), XContentType.JSON)).actionGet(); | ||
assertThat(indexResponse.getIndex(), equalTo("test_x")); | ||
|
||
logger.info("--> deleting against [alias1], should fail now"); | ||
DeleteResponse deleteResponse = client().delete(deleteRequest("alias1").type("type1").id("1")).actionGet(); | ||
assertThat(deleteResponse.getIndex(), equalTo("test_x")); | ||
|
||
logger.info("--> remove [alias1], Aliasing index [test_x] with [alias1]"); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we now lost the part of the test that goes from two aliases without a flag to one without a flag. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. you're right, re-added |
||
assertAcked(admin().indices().prepareAliases().removeAlias("test", "alias1").addAlias("test_x", "alias1")); | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we make if clause have this structure and inline this variable in the second else (see marker):
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ping about inlining this variable.