diff --git a/src/main/java/org/elasticsearch/action/ActionModule.java b/src/main/java/org/elasticsearch/action/ActionModule.java index d41a6de0c2f80..4b6627572ce08 100644 --- a/src/main/java/org/elasticsearch/action/ActionModule.java +++ b/src/main/java/org/elasticsearch/action/ActionModule.java @@ -150,6 +150,9 @@ import org.elasticsearch.action.termvector.*; import org.elasticsearch.action.update.TransportUpdateAction; import org.elasticsearch.action.update.UpdateAction; +import org.elasticsearch.action.updatebyquery.TransportUpdateByQueryAction; +import org.elasticsearch.action.updatebyquery.TransportShardUpdateByQueryAction; +import org.elasticsearch.action.updatebyquery.UpdateByQueryAction; import org.elasticsearch.common.inject.AbstractModule; import org.elasticsearch.common.inject.multibindings.MapBinder; @@ -262,6 +265,8 @@ protected void configure() { registerAction(CountAction.INSTANCE, TransportCountAction.class); registerAction(SuggestAction.INSTANCE, TransportSuggestAction.class); registerAction(UpdateAction.INSTANCE, TransportUpdateAction.class); + registerAction(UpdateByQueryAction.INSTANCE, TransportUpdateByQueryAction.class, + TransportShardUpdateByQueryAction.class); registerAction(MultiGetAction.INSTANCE, TransportMultiGetAction.class, TransportShardMultiGetAction.class); registerAction(BulkAction.INSTANCE, TransportBulkAction.class, diff --git a/src/main/java/org/elasticsearch/action/bulk/BulkShardRequest.java b/src/main/java/org/elasticsearch/action/bulk/BulkShardRequest.java index f79128868ce8a..8f3939dbbee35 100644 --- a/src/main/java/org/elasticsearch/action/bulk/BulkShardRequest.java +++ b/src/main/java/org/elasticsearch/action/bulk/BulkShardRequest.java @@ -40,7 +40,7 @@ public class BulkShardRequest extends ShardReplicationOperationRequest routing = Sets.newHashSet(); + + private BytesReference source; + private boolean sourceUnsafe; + + IndexUpdateByQueryRequest() { + } + + IndexUpdateByQueryRequest(UpdateByQueryRequest request, String index, String[] filteringAliases, Set routing) { + this.replicationType = request.replicationType(); + this.consistencyLevel = request.consistencyLevel(); + this.timeout = request.timeout(); + this.listenerThreaded(request.listenerThreaded()); + this.index = index; + this.types = request.types(); + this.bulkResponseOption = request.bulkResponseOptions(); + this.source = request.source(); + this.sourceUnsafe = request.sourceUnsafe(); + if (filteringAliases != null) { + this.filteringAliases = filteringAliases; + } + if (routing != null) { + this.routing = routing; + } + } + + public String[] types() { + return types; + } + + public String[] filteringAliases() { + return filteringAliases; + } + + public BulkResponseOption bulkResponseOptions() { + return bulkResponseOption; + } + + public Set routing() { + return routing; + } + + public BytesReference source() { + return source; + } + + public boolean sourceUnsafe() { + return sourceUnsafe; + } + + public void beforeLocalFork() { + if (sourceUnsafe) { + source = source.copyBytesArray(); + } + } + + @Override + public ActionRequestValidationException validate() { + ActionRequestValidationException validationException = super.validate(); + if (source == null) { + validationException = addValidationError("Source is missing", validationException); + } + return validationException; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + types = in.readStringArray(); + bulkResponseOption = BulkResponseOption.fromId(in.readByte()); + filteringAliases = in.readStringArray(); + routing = Sets.newHashSet(in.readStringArray()); + source = in.readBytesReference(); + sourceUnsafe = false; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeStringArray(types); + out.writeByte(bulkResponseOption.id()); + out.writeStringArray(filteringAliases); + out.writeStringArray(routing.toArray(new String[routing.size()])); + out.writeBytesReference(source); + } + +} diff --git a/src/main/java/org/elasticsearch/action/updatebyquery/IndexUpdateByQueryResponse.java b/src/main/java/org/elasticsearch/action/updatebyquery/IndexUpdateByQueryResponse.java new file mode 100644 index 0000000000000..f37e74f792fb5 --- /dev/null +++ b/src/main/java/org/elasticsearch/action/updatebyquery/IndexUpdateByQueryResponse.java @@ -0,0 +1,156 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.updatebyquery; + +import com.google.common.collect.Maps; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.bulk.BulkItemResponse; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; + +import java.io.IOException; +import java.util.Map; + +/** + * Encapsulates the result of an update by query request by bundling all bulk item responses. + * Each bulk item response holds the result of an individual update. + */ +// TODO: Add Iterable for shard responses +public class IndexUpdateByQueryResponse extends ActionResponse { + + private String index; + private long totalHits; + private long updated; + private Map responsesByShard = Maps.newHashMap(); + private Map failuresByShard = Maps.newHashMap(); + + IndexUpdateByQueryResponse() { + } + + public IndexUpdateByQueryResponse(String index, ShardUpdateByQueryResponse[] shardResponses) { + this.index = index; + shardResponses(shardResponses); + } + + public String index() { + return index; + } + + public String getIndex() { + return index(); + } + + public IndexUpdateByQueryResponse index(String index) { + this.index = index; + return this; + } + + public IndexUpdateByQueryResponse shardResponses(ShardUpdateByQueryResponse[] responses) { + for (ShardUpdateByQueryResponse response : responses) { + totalHits += response.totalHits(); + updated += response.updated(); + if (response.failedShardExceptionMessage() != null) { + failuresByShard.put(response.shardId(), response.failedShardExceptionMessage()); + } + if (response.bulkResponses().length != 0) { + responsesByShard.put(response.shardId(), response.bulkResponses()); + } + } + return this; + } + + public Map responsesByShard() { + return responsesByShard; + } + + public Map failuresByShard() { + return failuresByShard; + } + + public long totalHits() { + return totalHits; + } + + public long updated() { + return updated; + } + + public long countShardResponses() { + long count = 0; + for (BulkItemResponse[] bulkItemResponses : responsesByShard.values()) { + count += bulkItemResponses.length; + } + return count; + } + + public boolean hasFailures() { + return !failuresByShard.isEmpty(); + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + index = in.readString(); + totalHits = in.readVLong(); + updated = in.readVLong(); + int size = in.readVInt(); + for (int i = 0; i < size; i++) { + int shardId = in.readVInt(); + BulkItemResponse[] responses = new BulkItemResponse[in.readVInt()]; + for (int j = 0; j < responses.length; j++) { + responses[j] = BulkItemResponse.readBulkItem(in); + } + responsesByShard.put(shardId, responses); + } + + size = in.readVInt(); + for (int i = 0; i < size; i++) { + int shardId = in.readVInt(); + failuresByShard.put(shardId, in.readString()); + } + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(index); + out.writeVLong(totalHits); + out.writeVLong(updated); + out.writeVInt(responsesByShard.size()); + for (Map.Entry entry : responsesByShard.entrySet()) { + out.writeVInt(entry.getKey()); + out.writeVInt(entry.getValue().length); + for (BulkItemResponse bulkItemResponse : entry.getValue()) { + bulkItemResponse.writeTo(out); + } + } + out.writeVInt(failuresByShard.size()); + for (Map.Entry entry : failuresByShard.entrySet()) { + out.writeVInt(entry.getKey()); + out.writeString(entry.getValue()); + } + } + + public static IndexUpdateByQueryResponse readResponseItem(StreamInput in) throws IOException { + IndexUpdateByQueryResponse response = new IndexUpdateByQueryResponse(); + response.readFrom(in); + return response; + } +} diff --git a/src/main/java/org/elasticsearch/action/updatebyquery/ShardUpdateByQueryRequest.java b/src/main/java/org/elasticsearch/action/updatebyquery/ShardUpdateByQueryRequest.java new file mode 100644 index 0000000000000..a58cba493a2ea --- /dev/null +++ b/src/main/java/org/elasticsearch/action/updatebyquery/ShardUpdateByQueryRequest.java @@ -0,0 +1,132 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.updatebyquery; + +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.support.replication.ShardReplicationOperationRequest; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; + +import java.io.IOException; + +import static org.elasticsearch.action.ValidateActions.addValidationError; + +/** + * Represents a shard update by query request, that will be performed on the targeted shard. + */ +public class ShardUpdateByQueryRequest extends ShardReplicationOperationRequest { + + private String[] types; + private BulkResponseOption bulkResponseOption; + private String[] filteringAliases = Strings.EMPTY_ARRAY; + + private BytesReference source; + private boolean sourceUnsafe; + + private int shardId = -1; + private String targetNodeId; + + ShardUpdateByQueryRequest() { + } + + ShardUpdateByQueryRequest(IndexUpdateByQueryRequest request, int shardId, String targetNodeId) { + index(request.index()); + replicationType(request.replicationType()); + consistencyLevel(request.consistencyLevel()); + timeout = request.timeout(); + listenerThreaded(request.listenerThreaded()); + types = request.types(); + source = request.source(); + sourceUnsafe = request.sourceUnsafe(); + bulkResponseOption = request.bulkResponseOptions(); + filteringAliases = request.filteringAliases(); + this.shardId = shardId; + this.targetNodeId = targetNodeId; + } + + public String[] types() { + return types; + } + + public BytesReference source() { + return source; + } + + public boolean sourceUnsafe() { + return sourceUnsafe; + } + + public String[] filteringAliases() { + return filteringAliases; + } + + public int shardId() { + return shardId; + } + + public String targetNodeId() { + return targetNodeId; + } + + public BulkResponseOption bulkResponseOptions() { + return bulkResponseOption; + } + + @Override + public void beforeLocalFork() { + if (sourceUnsafe) { + source = source.copyBytesArray(); + } + } + + @Override + public ActionRequestValidationException validate() { + ActionRequestValidationException validationException = super.validate(); + if (source == null) { + validationException = addValidationError("Source is missing", validationException); + } + return validationException; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + types = in.readStringArray(); + bulkResponseOption = BulkResponseOption.fromId(in.readByte()); + filteringAliases = in.readStringArray(); + shardId = in.readVInt(); + targetNodeId = in.readString(); + source = in.readBytesReference(); + sourceUnsafe = false; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeStringArray(types); + out.writeByte(bulkResponseOption.id()); + out.writeStringArray(filteringAliases); + out.writeVInt(shardId); + out.writeString(targetNodeId); + out.writeBytesReference(source); + } +} diff --git a/src/main/java/org/elasticsearch/action/updatebyquery/ShardUpdateByQueryResponse.java b/src/main/java/org/elasticsearch/action/updatebyquery/ShardUpdateByQueryResponse.java new file mode 100644 index 0000000000000..62b47818dbd2d --- /dev/null +++ b/src/main/java/org/elasticsearch/action/updatebyquery/ShardUpdateByQueryResponse.java @@ -0,0 +1,119 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.updatebyquery; + +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.bulk.BulkItemResponse; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; + +import java.io.IOException; + +/** + * Update by query response from a single shard. + */ +// TODO: implement Iterabel for bulkResponses +public class ShardUpdateByQueryResponse extends ActionResponse { + + private int shardId; + private int totalHits; + private int updated; + private BulkItemResponse[] bulkResponses = new BulkItemResponse[0]; + private String failedShardExceptionMessage; + + ShardUpdateByQueryResponse() { + } + + public ShardUpdateByQueryResponse(int shardId) { + this(shardId, 0, 0, new BulkItemResponse[0]); + } + + public ShardUpdateByQueryResponse(int shardId, int totalHits, int updated, BulkItemResponse[] bulkResponses) { + this.shardId = shardId; + this.totalHits = totalHits; + this.updated = updated; + this.bulkResponses = bulkResponses; + } + + public ShardUpdateByQueryResponse(int shardId, String failure) { + this.shardId = shardId; + this.failedShardExceptionMessage = failure; + } + + public int shardId() { + return shardId; + } + + public void shardId(int shardId) { + this.shardId = shardId; + } + + public int updated() { + return updated; + } + + public String failedShardExceptionMessage() { + return failedShardExceptionMessage; + } + + public ShardUpdateByQueryResponse failedShardExceptionMessage(String failedShardExceptionMessage) { + this.failedShardExceptionMessage = failedShardExceptionMessage; + return this; + } + + public BulkItemResponse[] bulkResponses() { + return bulkResponses; + } + + public ShardUpdateByQueryResponse responses(BulkItemResponse[] responses) { + this.bulkResponses = responses; + return this; + } + + public int totalHits() { + return totalHits; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeVInt(shardId); + out.writeVInt(totalHits); + out.writeVInt(updated); + out.writeVInt(bulkResponses.length); + for (BulkItemResponse response : bulkResponses) { + response.writeTo(out); + } + out.writeOptionalString(failedShardExceptionMessage); + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + shardId = in.readVInt(); + totalHits = in.readVInt(); + updated = in.readVInt(); + bulkResponses = new BulkItemResponse[in.readVInt()]; + for (int i = 0; i < bulkResponses.length; i++) { + bulkResponses[i] = BulkItemResponse.readBulkItem(in); + } + failedShardExceptionMessage = in.readOptionalString(); + } +} diff --git a/src/main/java/org/elasticsearch/action/updatebyquery/TransportShardUpdateByQueryAction.java b/src/main/java/org/elasticsearch/action/updatebyquery/TransportShardUpdateByQueryAction.java new file mode 100644 index 0000000000000..89a4e81855334 --- /dev/null +++ b/src/main/java/org/elasticsearch/action/updatebyquery/TransportShardUpdateByQueryAction.java @@ -0,0 +1,398 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.updatebyquery; + +import com.google.common.collect.Maps; +import org.apache.lucene.index.IndexReader; +import org.apache.lucene.search.DocIdSetIterator; +import org.apache.lucene.util.FixedBitSet; +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.bulk.*; +import org.elasticsearch.action.support.TransportAction; +import org.elasticsearch.action.update.UpdateRequest; +import org.elasticsearch.cache.recycler.CacheRecycler; +import org.elasticsearch.cache.recycler.PageCacheRecycler; +import org.elasticsearch.cluster.ClusterService; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.lucene.TopLevelFixedBitSetCollector; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.index.fieldvisitor.JustUidFieldsVisitor; +import org.elasticsearch.index.mapper.Uid; +import org.elasticsearch.index.query.ParsedQuery; +import org.elasticsearch.index.service.IndexService; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.shard.service.IndexShard; +import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.script.ScriptService; +import org.elasticsearch.search.internal.DefaultSearchContext; +import org.elasticsearch.search.internal.SearchContext; +import org.elasticsearch.search.internal.ShardSearchRequest; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.BaseTransportRequestHandler; +import org.elasticsearch.transport.TransportChannel; +import org.elasticsearch.transport.TransportService; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +/** + * Transport action that translates the shard update by query request into a bulk request. All actions are performed + * locally and the bulk requests are then forwarded to the replica shards (this logic is done inside + * {@link TransportShardBulkAction} which this transport action uses). + */ +public class TransportShardUpdateByQueryAction extends TransportAction { + + public final static String ACTION_NAME = UpdateByQueryAction.NAME + "/shard"; + + private final TransportShardBulkAction bulkAction; + private final IndicesService indicesService; + private final ClusterService clusterService; + private final ScriptService scriptService; + private final int batchSize; + private final CacheRecycler cacheRecycler; + private final PageCacheRecycler pageCacheRecycler; + private final BigArrays bigArrays; + + @Inject + public TransportShardUpdateByQueryAction(Settings settings, + ThreadPool threadPool, + TransportShardBulkAction bulkAction, + TransportService transportService, + CacheRecycler cacheRecycler, IndicesService indicesService, + ClusterService clusterService, + ScriptService scriptService, + PageCacheRecycler pageCacheRecycler, + BigArrays bigArrays) { + super(settings, threadPool); + this.bulkAction = bulkAction; + this.cacheRecycler = cacheRecycler; + this.indicesService = indicesService; + this.clusterService = clusterService; + this.scriptService = scriptService; + this.pageCacheRecycler = pageCacheRecycler; + this.bigArrays = bigArrays; + this.batchSize = componentSettings.getAsInt("bulk_size", 1000); + transportService.registerHandler(ACTION_NAME, new TransportHandler()); + } + + protected void doExecute(final ShardUpdateByQueryRequest request, final ActionListener listener) { + String localNodeId = clusterService.state().nodes().localNodeId(); + if (!localNodeId.equals(request.targetNodeId())) { + throw new ElasticsearchException("Request arrived on the wrong node. This shouldn't happen!"); + } + + if (request.operationThreaded()) { + request.beforeLocalFork(); + threadPool.executor(ThreadPool.Names.BULK).execute(new Runnable() { + + public void run() { + doExecuteInternal(request, listener); + } + + }); + } else { + doExecuteInternal(request, listener); + } + } + + private void doExecuteInternal(ShardUpdateByQueryRequest request, ActionListener listener) { + IndexService indexService = indicesService.indexServiceSafe(request.index()); + IndexShard indexShard = indexService.shardSafe(request.shardId()); + ShardSearchRequest shardSearchRequest = new ShardSearchRequest(); + shardSearchRequest.types(request.types()); + shardSearchRequest.filteringAliases(request.filteringAliases()); + SearchContext searchContext = new DefaultSearchContext( + 0, + shardSearchRequest, + null, indexShard.acquireSearcher("update_by_query"), indexService, indexShard, + scriptService, cacheRecycler, pageCacheRecycler, bigArrays + ); + SearchContext.setCurrent(searchContext); + try { + UpdateByQueryContext ubqContext = parseRequestSource(indexService, request, searchContext); + searchContext.preProcess(); + // TODO: Work per segment. The collector should collect docs per segment instead of one big set of top level ids + TopLevelFixedBitSetCollector bitSetCollector = new TopLevelFixedBitSetCollector(searchContext.searcher().getIndexReader().maxDoc()); + searchContext.searcher().search(searchContext.query(), searchContext.aliasFilter(), bitSetCollector); + FixedBitSet docsToUpdate = bitSetCollector.getBitSet(); + + int docsToUpdateCount = docsToUpdate.cardinality(); + logger.trace("[{}][{}] {} docs to update", request.index(), request.shardId(), docsToUpdateCount); + + if (docsToUpdateCount == 0) { + ShardUpdateByQueryResponse response = new ShardUpdateByQueryResponse(request.shardId()); + listener.onResponse(response); + searchContext.clearAndRelease(); + return; + } + BatchedShardUpdateByQueryExecutor bulkExecutor = new BatchedShardUpdateByQueryExecutor( + listener, docsToUpdate, request, ubqContext + ); + bulkExecutor.executeBulkIndex(); + } catch (Throwable t) { + // If we end up here then BatchedShardUpdateByQueryExecutor#finalizeBulkActions isn't invoked + // so we need to release the search context. + searchContext.clearAndRelease(); + listener.onFailure(t); + } finally { + SearchContext.removeCurrent(); + } + } + + private UpdateByQueryContext parseRequestSource(IndexService indexService, ShardUpdateByQueryRequest request, SearchContext context) { + ParsedQuery parsedQuery = null; + String script = null; + String scriptLang = null; + Map params = Maps.newHashMap(); + try { + XContentParser parser = XContentHelper.createParser(request.source()); + for (XContentParser.Token token = parser.nextToken(); token != XContentParser.Token.END_OBJECT; token = parser.nextToken()) { + if (token == XContentParser.Token.FIELD_NAME) { + String fieldName = parser.currentName(); + if ("query".equals(fieldName)) { + parsedQuery = indexService.queryParserService().parse(parser); + } else if ("query_binary".equals(fieldName)) { + parser.nextToken(); + byte[] querySource = parser.binaryValue(); + XContentParser qSourceParser = XContentFactory.xContent(querySource).createParser(querySource); + parsedQuery = indexService.queryParserService().parse(qSourceParser); + } else if ("script".equals(fieldName)) { + parser.nextToken(); + script = parser.text(); + } else if ("lang".equals(fieldName)) { + parser.nextToken(); + scriptLang = parser.text(); + } else if ("params".equals(fieldName)) { + parser.nextToken(); + params = parser.map(); + } + } + } + } catch (Exception e) { + throw new ElasticsearchException("Couldn't parse query from source.", e); + } + + if (parsedQuery == null) { + throw new ElasticsearchException("Query is required"); + } + if (script == null) { + throw new ElasticsearchException("Script is required"); + } + context.parsedQuery(parsedQuery); + return new UpdateByQueryContext(context, batchSize, clusterService.state(), script, scriptLang, params); + } + + + class BatchedShardUpdateByQueryExecutor implements ActionListener { + + private final ActionListener finalResponseListener; + private final DocIdSetIterator iterator; + private final int matches; + private final ShardUpdateByQueryRequest request; + private final List receivedBulkItemResponses; + private final UpdateByQueryContext updateByQueryContext; + + // Counter for keeping tracker number of docs that have been updated. + // No need for sync now since onResponse method in synchronized + private int updated; + + BatchedShardUpdateByQueryExecutor(ActionListener finalResponseListener, + FixedBitSet docsToUpdate, + ShardUpdateByQueryRequest request, + UpdateByQueryContext updateByQueryContext) { + this.iterator = docsToUpdate.iterator(); + this.matches = docsToUpdate.cardinality(); + this.request = request; + this.finalResponseListener = finalResponseListener; + this.receivedBulkItemResponses = new ArrayList(); + this.updateByQueryContext = updateByQueryContext; + } + + // Call can be invoked with a Network thread. Replica isn't on the same node... Therefore when + // need to continue with the bulk do it in a new thread. One thread will enter at the time. + public synchronized void onResponse(BulkShardResponse bulkShardResponse) { + try { + for (BulkItemResponse itemResponse : bulkShardResponse.getResponses()) { + if (!itemResponse.isFailed()) { + updated++; + } + switch (request.bulkResponseOptions()) { + case ALL: + receivedBulkItemResponses.add(itemResponse); + break; + case FAILED: + if (itemResponse.isFailed()) { + receivedBulkItemResponses.add(itemResponse); + } + break; + case NONE: + break; + } + } + if (iterator.docID() == DocIdSetIterator.NO_MORE_DOCS) { + finalizeBulkActions(null); + } else { + threadPool.executor(ThreadPool.Names.BULK).execute(new Runnable() { + public void run() { + try { + executeBulkIndex(); + } catch (Throwable e) { + onFailure(e); + } + } + }); + } + } catch (Throwable t) { + onFailure(t); + } + } + + public synchronized void onFailure(Throwable e) { + try { + logger.debug("error while executing bulk operations for an update by query action, sending partial response...", e); + finalizeBulkActions(e); + } catch (Throwable t) { + finalResponseListener.onFailure(t); + } + } + + public void executeBulkIndex() throws IOException { + fillBatch(iterator, updateByQueryContext.searchContext.searcher().getIndexReader(), request, updateByQueryContext.bulkItemRequestsBulkList); + logger.trace("[{}][{}] executing bulk request with size {}", request.index(), request.shardId(), updateByQueryContext.bulkItemRequestsBulkList.size()); + if (updateByQueryContext.bulkItemRequestsBulkList.isEmpty()) { + onResponse(new BulkShardResponse(new ShardId(request.index(), request.shardId()), new BulkItemResponse[0])); + } else { + // We are already on the primary shard. Only have network traffic for replica shards + // Also no need for threadpool b/c TransUpdateAction uses it already for local requests. + BulkItemRequest[] bulkItemRequests = + updateByQueryContext.bulkItemRequestsBulkList.toArray(new BulkItemRequest[updateByQueryContext.bulkItemRequestsBulkList.size()]); + // We clear the list, since the array is already created + updateByQueryContext.bulkItemRequestsBulkList.clear(); + final BulkShardRequest bulkShardRequest = new BulkShardRequest( + request.index(), request.shardId(), false, bulkItemRequests + ); + // The batches are already threaded... No need for new thread + bulkShardRequest.operationThreaded(false); + bulkAction.execute(bulkShardRequest, this); + } + } + + private void finalizeBulkActions(Throwable e) { + updateByQueryContext.searchContext.clearAndRelease(); + BulkItemResponse[] bulkResponses = receivedBulkItemResponses.toArray(new BulkItemResponse[receivedBulkItemResponses.size()]); + receivedBulkItemResponses.clear(); + ShardUpdateByQueryResponse finalResponse = new ShardUpdateByQueryResponse( + request.shardId(), matches, updated, bulkResponses + ); + + if (e != null) { + finalResponse.failedShardExceptionMessage(ExceptionsHelper.detailedMessage(e)); + } + finalResponseListener.onResponse(finalResponse); + } + + // TODO: Work per segment. The collector should collect docs per segment instead of one big set of top level ids + private void fillBatch(DocIdSetIterator iterator, IndexReader indexReader, ShardUpdateByQueryRequest request, + List bulkItemRequests) throws IOException { + int counter = 0; + for (int docID = iterator.nextDoc(); docID != DocIdSetIterator.NO_MORE_DOCS; docID = iterator.nextDoc()) { + JustUidFieldsVisitor fieldVisitor = new JustUidFieldsVisitor(); + indexReader.document(docID, fieldVisitor); + Uid uid = fieldVisitor.uid(); + UpdateRequest updateRequest = new UpdateRequest(request.index(), uid.type(), uid.id()) + .scriptLang(updateByQueryContext.scriptLang) + .scriptParams(updateByQueryContext.scriptParams) + .script(updateByQueryContext.scriptString); + + bulkItemRequests.add(new BulkItemRequest(counter, updateRequest)); + + if (++counter == batchSize) { + break; + } + } + } + + } + + class TransportHandler extends BaseTransportRequestHandler { + + public ShardUpdateByQueryRequest newInstance() { + return new ShardUpdateByQueryRequest(); + } + + public String executor() { + return ThreadPool.Names.SAME; + } + + public void messageReceived(final ShardUpdateByQueryRequest request, final TransportChannel channel) throws Exception { + // no need to have a threaded listener since we just send back a response + request.listenerThreaded(false); + execute(request, new ActionListener() { + + public void onResponse(ShardUpdateByQueryResponse result) { + try { + channel.sendResponse(result); + } catch (Exception e) { + onFailure(e); + } + } + + public void onFailure(Throwable e) { + try { + channel.sendResponse(e); + } catch (Exception e1) { + logger.warn("Failed to send response for get", e1); + } + } + + }); + } + } + +} + +class UpdateByQueryContext { + + final SearchContext searchContext; + final List bulkItemRequestsBulkList; + final ClusterState clusterState; + + final String scriptString; + final String scriptLang; + final Map scriptParams; + + UpdateByQueryContext(SearchContext searchContext, int batchSize, ClusterState clusterState, String scriptString, String scriptLang, Map scriptParams) { + this.searchContext = searchContext; + this.clusterState = clusterState; + this.bulkItemRequestsBulkList = new ArrayList(batchSize); + this.scriptString = scriptString; + this.scriptLang = scriptLang; + this.scriptParams = scriptParams; + } +} \ No newline at end of file diff --git a/src/main/java/org/elasticsearch/action/updatebyquery/TransportUpdateByQueryAction.java b/src/main/java/org/elasticsearch/action/updatebyquery/TransportUpdateByQueryAction.java new file mode 100644 index 0000000000000..e455411078a68 --- /dev/null +++ b/src/main/java/org/elasticsearch/action/updatebyquery/TransportUpdateByQueryAction.java @@ -0,0 +1,436 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.updatebyquery; + +import com.google.common.collect.Lists; +import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.UnavailableShardsException; +import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.action.support.TransportAction; +import org.elasticsearch.cluster.ClusterChangedEvent; +import org.elasticsearch.cluster.ClusterService; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.TimeoutClusterStateListener; +import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.routing.GroupShardsIterator; +import org.elasticsearch.cluster.routing.ShardIterator; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.node.NodeClosedException; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.*; + +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReferenceArray; + +/** + * Delegates a {@link IndexUpdateByQueryRequest} to the primary shards of the index this request is targeted to. + * The requests is transformed into {@link ShardUpdateByQueryRequest} and send to each primary shard. Afterwards + * the responses from the shards are merged into a final response which is send to the client. + */ +// Perhaps create a base transport update action that sends shard requests to primary shards only +public class TransportUpdateByQueryAction extends TransportAction { + + private final TransportService transportService; + + private final ClusterService clusterService; + + private final TransportShardUpdateByQueryAction transportShardUpdateByQueryAction; + + @Inject + public TransportUpdateByQueryAction(Settings settings, + ThreadPool threadPool, + TransportService transportService, + ClusterService clusterService, + TransportShardUpdateByQueryAction transportShardUpdateByQueryAction) { + super(settings, threadPool); + this.transportService = transportService; + this.clusterService = clusterService; + this.transportShardUpdateByQueryAction = transportShardUpdateByQueryAction; + transportService.registerHandler(UpdateByQueryAction.NAME, new TransportHandler()); + } + + protected void doExecute(UpdateByQueryRequest request, ActionListener listener) { + long startTime = System.currentTimeMillis(); + MetaData metaData = clusterService.state().metaData(); + String[] concreteIndices = metaData.concreteIndices(request.indices(), IndicesOptions.lenient()); + Map> routingMap = metaData.resolveSearchRouting(request.routing(), request.indices()); + if (concreteIndices.length == 1) { + doExecuteIndexRequest(request, metaData, concreteIndices[0], routingMap, new SingleIndexUpdateByQueryActionListener(startTime, listener)); + } else { + MultipleIndexUpdateByQueryActionListener indexActionListener = + new MultipleIndexUpdateByQueryActionListener(startTime, listener, concreteIndices.length); + for (String concreteIndex : concreteIndices) { + doExecuteIndexRequest(request, metaData, concreteIndex, routingMap, indexActionListener); + } + } + + } + + private static class MultipleIndexUpdateByQueryActionListener implements ActionListener { + + private final long startTime; + private final ActionListener listener; + private final int expectedNumberOfResponses; + private final AtomicReferenceArray successFullIndexResponses; + private final AtomicReferenceArray failedIndexResponses; + private final AtomicInteger indexCounter; + + private MultipleIndexUpdateByQueryActionListener(long startTime, ActionListener listener, int expectedNumberOfResponses) { + this.startTime = startTime; + this.listener = listener; + successFullIndexResponses = new AtomicReferenceArray(expectedNumberOfResponses); + failedIndexResponses = new AtomicReferenceArray(expectedNumberOfResponses); + this.expectedNumberOfResponses = expectedNumberOfResponses; + indexCounter = new AtomicInteger(); + } + + public void onResponse(IndexUpdateByQueryResponse indexUpdateByQueryResponse) { + successFullIndexResponses.set(indexCounter.getAndIncrement(), indexUpdateByQueryResponse); + if (indexCounter.get() == expectedNumberOfResponses) { + finishHim(); + + } + } + + public void onFailure(Throwable e) { + failedIndexResponses.set(indexCounter.getAndIncrement(), e); + if (indexCounter.get() == expectedNumberOfResponses) { + finishHim(); + } + } + + private void finishHim() { + long tookInMillis = System.currentTimeMillis() - startTime; + UpdateByQueryResponse response = new UpdateByQueryResponse(tookInMillis); + List indexResponses = Lists.newArrayList(); + List indexFailures = Lists.newArrayList(); + for (int i = 0; i < expectedNumberOfResponses; i++) { + IndexUpdateByQueryResponse indexResponse = successFullIndexResponses.get(i); + if (indexResponse != null) { + indexResponses.add(indexResponse); + } else { + indexFailures.add(ExceptionsHelper.detailedMessage(failedIndexResponses.get(i))); + } + } + response.indexResponses(indexResponses.toArray(new IndexUpdateByQueryResponse[indexResponses.size()])); + response.mainFailures(indexFailures.toArray(new String[indexFailures.size()])); + listener.onResponse(response); + } + } + + private static class SingleIndexUpdateByQueryActionListener implements ActionListener { + + private final long startTime; + private final ActionListener listener; + + private SingleIndexUpdateByQueryActionListener(long startTime, ActionListener listener) { + this.listener = listener; + this.startTime = startTime; + } + + public void onResponse(IndexUpdateByQueryResponse indexUpdateByQueryResponse) { + long tookInMillis = System.currentTimeMillis() - startTime; + UpdateByQueryResponse finalResponse = new UpdateByQueryResponse(tookInMillis, indexUpdateByQueryResponse); + listener.onResponse(finalResponse); + } + + public void onFailure(Throwable e) { + long tookInMillis = System.currentTimeMillis() - startTime; + UpdateByQueryResponse finalResponse = new UpdateByQueryResponse(tookInMillis); + finalResponse.mainFailures(new String[]{ExceptionsHelper.detailedMessage(e)}); + listener.onResponse(finalResponse); + } + + } + + + // Index operations happen below here: + + protected void doExecuteIndexRequest(UpdateByQueryRequest request, MetaData metaData, String concreteIndex, + @Nullable Map> routingMap, ActionListener listener) { + String[] filteringAliases = metaData.filteringAliases(concreteIndex, request.indices()); + Set routing = null; + if (routingMap != null) { + routing = routingMap.get(concreteIndex); + } + IndexUpdateByQueryRequest indexRequest = new IndexUpdateByQueryRequest(request, concreteIndex, filteringAliases, routing); + new UpdateByQueryIndexOperationAction(indexRequest, listener).startExecution(); + } + + private class UpdateByQueryIndexOperationAction { + + final IndexUpdateByQueryRequest request; + final ActionListener indexActionListener; + + private UpdateByQueryIndexOperationAction(IndexUpdateByQueryRequest request, ActionListener listener) { + this.request = request; + this.indexActionListener = listener; + } + + void startExecution() { + startExecution(false); + } + + boolean startExecution(boolean fromClusterEvent) { + ClusterState state = clusterService.state(); + ClusterBlockException blockException = state.blocks().globalBlockedException(ClusterBlockLevel.WRITE); + if (blockException != null) { + logger.trace("[{}] global block exception, retrying...", request.index()); + overallRetry(fromClusterEvent, null, blockException); + return false; + } + blockException = state.blocks().indexBlockedException(ClusterBlockLevel.WRITE, request.index()); + if (blockException != null) { + logger.trace("[{}] index block exception, retrying...", request.index()); + overallRetry(fromClusterEvent, null, blockException); + return false; + } + + List primaryShards = Lists.newArrayList(); + GroupShardsIterator groupShardsIterator = + clusterService.operationRouting().deleteByQueryShards(state, request.index(), request.routing()); + for (ShardIterator shardIt : groupShardsIterator) { + for (ShardRouting shardRouting = shardIt.nextOrNull(); shardRouting != null; shardRouting = shardIt.nextOrNull()) { + if (shardRouting.primary()) { + if (shardRouting.started()) { + primaryShards.add(shardRouting); + } else { + logger.trace( + "[{}][{}] required primary shard isn't available, retrying...", + request.index(), shardRouting.id() + ); + overallRetry(fromClusterEvent, shardRouting, null); + return false; + } + } + } + } + + if (primaryShards.size() != groupShardsIterator.size()) { + logger.trace( + "[{}] not all required primary shards[{}/{}] are available for index[{}], retrying...", + request.index(), primaryShards.size(), groupShardsIterator.size() + ); + overallRetry(fromClusterEvent, null, null); + return false; + } + + logger.trace("[{}] executing IndexUpdateByQueryRequest", request.index()); + DiscoveryNodes nodes = state.nodes(); + final ShardResponseListener responseListener = new ShardResponseListener(primaryShards.size(), indexActionListener); + for (ShardRouting shard : primaryShards) { + logger.trace("[{}][{}] executing ShardUpdateByQueryRequest", request.index(), shard.id()); + if (shard.currentNodeId().equals(nodes.localNodeId())) { + executeLocally(shard, responseListener); + } else { + executeRemotely(shard, nodes, responseListener); + } + } + + return true; + } + + void overallRetry(boolean fromClusterEvent, final ShardRouting shardRouting, @Nullable final Throwable failure) { + if (!fromClusterEvent) { + clusterService.add(request.timeout(), new TimeoutClusterStateListener() { + + public void postAdded() { + logger.trace("[{}] post added, retrying update by query", request.index()); + if (startExecution(true)) { + // if we managed to start and perform the operation on the primary, we can remove this listener + clusterService.remove(this); + } + } + + public void onClose() { + logger.trace("[{}] update by query for, node closed", request.index()); + clusterService.remove(this); + indexActionListener.onFailure(new NodeClosedException(clusterService.localNode())); + } + + public void clusterChanged(ClusterChangedEvent event) { + logger.trace("[{}] cluster changed, retrying update by query", request.index()); + if (startExecution(true)) { + // if we managed to start and perform the operation on the primary, we can remove this listener + clusterService.remove(this); + } + } + + public void onTimeout(TimeValue timeValue) { + // just to be on the safe side, see if we can start it now? + logger.trace("[{}] timeout, retrying update by query", request.index()); + if (startExecution(true)) { + clusterService.remove(this); + return; + } + clusterService.remove(this); + Throwable listenerFailure = failure; + if (listenerFailure == null) { + if (shardRouting == null) { + listenerFailure = new UnavailableShardsException(null, "no available shards: Timeout waiting for [" + timeValue + "], request: " + request.toString()); + } else { + listenerFailure = new UnavailableShardsException(shardRouting.shardId(), "[" + shardRouting.shardId() + "] not started, Timeout waiting for [" + timeValue + "], request: " + request.toString()); + } + } + indexActionListener.onFailure(listenerFailure); + } + }); + } + } + + void executeLocally(final ShardRouting shard, final ShardResponseListener responseListener) { + final ShardUpdateByQueryRequest localShardRequest = new ShardUpdateByQueryRequest(request, shard.id(), shard.currentNodeId()); + transportShardUpdateByQueryAction.execute(localShardRequest, new ActionListener() { + + public void onResponse(ShardUpdateByQueryResponse shardUpdateByQueryResponse) { + responseListener.handleResponse(shardUpdateByQueryResponse); + } + + public void onFailure(Throwable e) { + responseListener.handleException(e, shard); + } + }); + } + + void executeRemotely(final ShardRouting shard, DiscoveryNodes nodes, final ShardResponseListener responseListener) { + final DiscoveryNode discoveryNode = nodes.get(shard.currentNodeId()); + if (discoveryNode == null) { + responseListener.handleException(new RuntimeException("No node for shard"), shard); + return; + } + + ShardUpdateByQueryRequest localShardRequest = new ShardUpdateByQueryRequest(request, shard.id(), shard.currentNodeId()); + transportService.sendRequest(discoveryNode, TransportShardUpdateByQueryAction.ACTION_NAME, + localShardRequest, new BaseTransportResponseHandler() { + + public ShardUpdateByQueryResponse newInstance() { + return new ShardUpdateByQueryResponse(); + } + + public void handleResponse(ShardUpdateByQueryResponse response) { + responseListener.handleResponse(response); + } + + public void handleException(TransportException e) { + responseListener.handleException(e, shard); + } + + public String executor() { + return ThreadPool.Names.SAME; + } + }); + } + + private class ShardResponseListener { + + final AtomicReferenceArray shardResponses; + + final AtomicInteger indexCounter; + + final ActionListener finalListener; + + final int numberOfExpectedShardResponses; + + private ShardResponseListener(int numberOfPrimaryShards, ActionListener finalListener) { + shardResponses = new AtomicReferenceArray(numberOfPrimaryShards); + numberOfExpectedShardResponses = numberOfPrimaryShards; + indexCounter = new AtomicInteger(); + this.finalListener = finalListener; + } + + void handleResponse(ShardUpdateByQueryResponse response) { + shardResponses.set(indexCounter.getAndIncrement(), response); + if (indexCounter.get() == numberOfExpectedShardResponses) { + finalizeAction(); + } + } + + void handleException(Throwable e, ShardRouting shard) { + logger.error("[{}][{}] error while executing update by query shard request", e, request.index(), shard.id()); + String failure = ExceptionsHelper.detailedMessage(e); + shardResponses.set(indexCounter.getAndIncrement(), new ShardUpdateByQueryResponse(shard.id(), failure)); + if (indexCounter.get() == numberOfExpectedShardResponses) { + finalizeAction(); + } + } + + void finalizeAction() { + ShardUpdateByQueryResponse[] responses = new ShardUpdateByQueryResponse[shardResponses.length()]; + for (int i = 0; i < shardResponses.length(); i++) { + responses[i] = shardResponses.get(i); + } + IndexUpdateByQueryResponse finalResponse = new IndexUpdateByQueryResponse( + request.index(), + responses + ); + finalListener.onResponse(finalResponse); + } + + } + + } + + private class TransportHandler extends BaseTransportRequestHandler { + + public UpdateByQueryRequest newInstance() { + return new UpdateByQueryRequest(); + } + + public String executor() { + return ThreadPool.Names.SAME; + } + + public void messageReceived(UpdateByQueryRequest request, final TransportChannel channel) throws Exception { + // no need to have a threaded listener since we just send back a response + request.listenerThreaded(false); + doExecute(request, new ActionListener() { + + public void onResponse(UpdateByQueryResponse result) { + try { + channel.sendResponse(result); + } catch (Exception e) { + onFailure(e); + } + } + + public void onFailure(Throwable e) { + try { + channel.sendResponse(e); + } catch (Exception e1) { + logger.warn("Failed to send response for get", e1); + } + } + }); + } + } + +} diff --git a/src/main/java/org/elasticsearch/action/updatebyquery/UpdateByQueryAction.java b/src/main/java/org/elasticsearch/action/updatebyquery/UpdateByQueryAction.java new file mode 100644 index 0000000000000..a5532b3099455 --- /dev/null +++ b/src/main/java/org/elasticsearch/action/updatebyquery/UpdateByQueryAction.java @@ -0,0 +1,45 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.updatebyquery; + +import org.elasticsearch.action.Action; +import org.elasticsearch.client.Client; + +/** + * Entry point from client to transport actions. + */ +public class UpdateByQueryAction extends Action { + + public static final UpdateByQueryAction INSTANCE = new UpdateByQueryAction(); + public static final String NAME = "updateByQuery"; + + private UpdateByQueryAction() { + super(NAME); + } + + public UpdateByQueryResponse newResponse() { + return new UpdateByQueryResponse(); + } + + public UpdateByQueryRequestBuilder newRequestBuilder(Client client) { + return new UpdateByQueryRequestBuilder(client); + } + +} diff --git a/src/main/java/org/elasticsearch/action/updatebyquery/UpdateByQueryRequest.java b/src/main/java/org/elasticsearch/action/updatebyquery/UpdateByQueryRequest.java new file mode 100644 index 0000000000000..02be5f07e57d0 --- /dev/null +++ b/src/main/java/org/elasticsearch/action/updatebyquery/UpdateByQueryRequest.java @@ -0,0 +1,138 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.updatebyquery; + +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.support.replication.IndicesReplicationOperationRequest; +import org.elasticsearch.client.Requests; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.xcontent.XContentType; + +import java.io.IOException; + +import static org.elasticsearch.action.ValidateActions.addValidationError; + +/** + * Represents an update by query request. + */ +public class UpdateByQueryRequest extends IndicesReplicationOperationRequest { + + private static final XContentType contentType = Requests.CONTENT_TYPE; + + private String[] types = Strings.EMPTY_ARRAY; + private BulkResponseOption bulkResponseOption = BulkResponseOption.NONE; + private String routing; + private BytesReference source; + private boolean sourceUnsafe; + + UpdateByQueryRequest() { + + } + + public UpdateByQueryRequest(String[] indices, String[] types) { + this.indices = indices; + this.types = types; + } + + public String[] types() { + return types; + } + + public UpdateByQueryRequest types(String... types) { + this.types = types; + return this; + } + + public UpdateByQueryRequest source(BytesReference source, boolean sourceUnsafe) { + this.source = source; + this.sourceUnsafe = sourceUnsafe; + return this; + } + + public UpdateByQueryRequest source(UpdateByQuerySourceBuilder sourceBuilder) { + this.source = sourceBuilder.buildAsBytes(contentType); + this.sourceUnsafe = false; + return this; + } + + public BytesReference source() { + return source; + } + + public boolean sourceUnsafe() { + return sourceUnsafe; + } + + public BulkResponseOption bulkResponseOptions() { + return bulkResponseOption; + } + + public UpdateByQueryRequest bulkResponseOptions(BulkResponseOption bulkResponseOption) { + this.bulkResponseOption = bulkResponseOption; + return this; + } + + public String routing() { + return routing; + } + + public UpdateByQueryRequest routing(String routing) { + this.routing = routing; + return this; + } + + public void beforeLocalFork() { + if (sourceUnsafe) { + source = source.copyBytesArray(); + } + } + + @Override + public ActionRequestValidationException validate() { + ActionRequestValidationException validationException = super.validate(); + if (source == null) { + validationException = addValidationError("Source is missing", validationException); + } + return validationException; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + types = in.readStringArray(); + bulkResponseOption = BulkResponseOption.fromId(in.readByte()); + routing = in.readOptionalString(); + source = in.readBytesReference(); + sourceUnsafe = false; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeStringArray(types); + out.writeByte(bulkResponseOption.id()); + out.writeOptionalString(routing); + out.writeBytesReference(source); + } + +} diff --git a/src/main/java/org/elasticsearch/action/updatebyquery/UpdateByQueryRequestBuilder.java b/src/main/java/org/elasticsearch/action/updatebyquery/UpdateByQueryRequestBuilder.java new file mode 100644 index 0000000000000..0f01eb4b2823d --- /dev/null +++ b/src/main/java/org/elasticsearch/action/updatebyquery/UpdateByQueryRequestBuilder.java @@ -0,0 +1,118 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.action.updatebyquery; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.WriteConsistencyLevel; +import org.elasticsearch.action.support.replication.IndicesReplicationOperationRequestBuilder; +import org.elasticsearch.action.support.replication.ReplicationType; +import org.elasticsearch.client.Client; +import org.elasticsearch.client.internal.InternalClient; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.index.query.QueryBuilder; + +import java.util.Map; + +/** + * A request builder that produces {@link IndexUpdateByQueryRequest} instances. + */ +public class UpdateByQueryRequestBuilder extends IndicesReplicationOperationRequestBuilder { + + private UpdateByQuerySourceBuilder sourceBuilder; + + public UpdateByQueryRequestBuilder(Client client) { + super((InternalClient) client, new UpdateByQueryRequest()); + } + + public UpdateByQueryRequestBuilder setTypes(String... types) { + request().types(types); + return this; + } + + public UpdateByQueryRequestBuilder setIncludeBulkResponses(BulkResponseOption option) { + request().bulkResponseOptions(option); + return this; + } + + public UpdateByQueryRequestBuilder setReplicationType(ReplicationType replicationType) { + request().replicationType(replicationType); + return this; + } + + public UpdateByQueryRequestBuilder setConsistencyLevel(WriteConsistencyLevel writeConsistencyLevel) { + request().consistencyLevel(writeConsistencyLevel); + return this; + } + + /** + * Constructs a new search source builder with a search query. + * + * @see org.elasticsearch.index.query.QueryBuilders + */ + public UpdateByQueryRequestBuilder setQuery(QueryBuilder queryBuilder) { + sourceBuilder().query(queryBuilder); + return this; + } + + /** + * Constructs a new search source builder with a search query. + */ + public UpdateByQueryRequestBuilder setQuery(BytesReference query) { + sourceBuilder().query(query); + return this; + } + + public UpdateByQueryRequestBuilder setScriptLang(String lang) { + sourceBuilder().scriptLang(lang); + return this; + } + + public UpdateByQueryRequestBuilder setScript(String script) { + sourceBuilder().script(script); + return this; + } + + public UpdateByQueryRequestBuilder setScriptParams(Map scriptParams) { + if (scriptParams != null) { + sourceBuilder().scriptParams(scriptParams); + } + return this; + } + + public UpdateByQueryRequestBuilder addScriptParam(String name, String value) { + sourceBuilder().addScriptParam(name, value); + return this; + } + + protected void doExecute(ActionListener listener) { + if (sourceBuilder != null) { + request.source(sourceBuilder); + } + + ((Client) client).updateByQuery(request, listener); + } + + private UpdateByQuerySourceBuilder sourceBuilder() { + if (sourceBuilder == null) { + sourceBuilder = new UpdateByQuerySourceBuilder(); + } + return sourceBuilder; + } + +} diff --git a/src/main/java/org/elasticsearch/action/updatebyquery/UpdateByQueryResponse.java b/src/main/java/org/elasticsearch/action/updatebyquery/UpdateByQueryResponse.java new file mode 100644 index 0000000000000..7aa732491fc72 --- /dev/null +++ b/src/main/java/org/elasticsearch/action/updatebyquery/UpdateByQueryResponse.java @@ -0,0 +1,142 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.updatebyquery; + +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; + +import java.io.IOException; + +/** + * Encapsulates the result of an update by query request by bundling all bulk item responses. + * Each bulk item response holds the result of an individual update. + */ +// TODO: implements Iterable for index responses +public class UpdateByQueryResponse extends ActionResponse { + + private long tookInMillis; + private long totalHits; + private long updated; + private IndexUpdateByQueryResponse[] indexResponses = new IndexUpdateByQueryResponse[0]; + private String[] mainFailures = Strings.EMPTY_ARRAY; + + UpdateByQueryResponse() { + } + + public UpdateByQueryResponse(long tookInMillis, IndexUpdateByQueryResponse... indexResponses) { + this.tookInMillis = tookInMillis; + indexResponses(indexResponses); + } + + public long tookInMillis() { + return tookInMillis; + } + + public long getTookInMillis() { + return tookInMillis(); + } + + public IndexUpdateByQueryResponse[] indexResponses() { + return indexResponses; + } + + public IndexUpdateByQueryResponse[] getIndexResponses() { + return indexResponses(); + } + + public UpdateByQueryResponse indexResponses(IndexUpdateByQueryResponse[] responses) { + for (IndexUpdateByQueryResponse response : responses) { + totalHits += response.totalHits(); + updated += response.updated(); + } + this.indexResponses = responses; + return this; + } + + /** + * @return The main index level failures + */ + public String[] mainFailures() { + return mainFailures; + } + + public String[] getMainFailures() { + return mainFailures(); + } + + public UpdateByQueryResponse mainFailures(String[] mainFailures) { + this.mainFailures = mainFailures; + return this; + } + + /** + * @return the number of documents that have matched with the update query + */ + public long totalHits() { + return totalHits; + } + + public long getTotalHits() { + return totalHits(); + } + + /** + * @return the number of documents that are actually updated + */ + public long updated() { + return updated; + } + + public long getUpdated() { + return updated(); + } + + public boolean hasFailures() { + return mainFailures().length != 0; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + tookInMillis = in.readVLong(); + totalHits = in.readVLong(); + updated = in.readVLong(); + indexResponses = new IndexUpdateByQueryResponse[in.readVInt()]; + for (int i = 0; i < indexResponses.length; i++) { + indexResponses[i] = IndexUpdateByQueryResponse.readResponseItem(in); + } + mainFailures = in.readStringArray(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeVLong(tookInMillis); + out.writeVLong(totalHits); + out.writeVLong(updated); + out.writeVInt(indexResponses.length); + for (IndexUpdateByQueryResponse response : indexResponses) { + response.writeTo(out); + } + out.writeStringArray(mainFailures); + } +} diff --git a/src/main/java/org/elasticsearch/action/updatebyquery/UpdateByQuerySourceBuilder.java b/src/main/java/org/elasticsearch/action/updatebyquery/UpdateByQuerySourceBuilder.java new file mode 100644 index 0000000000000..332e0c30688a2 --- /dev/null +++ b/src/main/java/org/elasticsearch/action/updatebyquery/UpdateByQuerySourceBuilder.java @@ -0,0 +1,116 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.updatebyquery; + +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.search.builder.SearchSourceBuilderException; + +import java.io.IOException; +import java.util.Map; + +import static com.google.common.collect.Maps.newHashMap; + +/** + * Source builder of the script, lang, params and query for a update by query request. + */ +public class UpdateByQuerySourceBuilder implements ToXContent { + + private QueryBuilder queryBuilder; + private BytesReference queryBinary; + private String script; + private String scriptLang; + private Map scriptParams = newHashMap(); + + public UpdateByQuerySourceBuilder query(QueryBuilder query) { + this.queryBuilder = query; + return this; + } + + public UpdateByQuerySourceBuilder query(BytesReference queryBinary) { + this.queryBinary = queryBinary; + return this; + } + + public UpdateByQuerySourceBuilder script(String script) { + this.script = script; + return this; + } + + public UpdateByQuerySourceBuilder scriptLang(String scriptLang) { + this.scriptLang = scriptLang; + return this; + } + + public UpdateByQuerySourceBuilder scriptParams(Map scriptParams) { + this.scriptParams = scriptParams; + return this; + } + + public UpdateByQuerySourceBuilder addScriptParam(String name, String value) { + scriptParams.put(name, value); + return this; + } + + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + if (queryBuilder != null) { + builder.field("query"); + queryBuilder.toXContent(builder, params); + } + + if (queryBinary != null) { + if (XContentFactory.xContentType(queryBinary) == builder.contentType()) { + builder.rawField("query", queryBinary); + } else { + builder.field("query_binary", queryBinary); + } + } + + if (script != null) { + builder.field("script", script); + } + + if (scriptLang != null) { + builder.field("lang", scriptLang); + } + + if (!scriptParams.isEmpty()) { + builder.field("params", scriptParams); + } + + builder.endObject(); + return builder; + } + + public BytesReference buildAsBytes(XContentType contentType) throws SearchSourceBuilderException { + try { + XContentBuilder builder = XContentFactory.contentBuilder(contentType); + toXContent(builder, ToXContent.EMPTY_PARAMS); + return builder.bytes(); + } catch (Exception e) { + throw new SearchSourceBuilderException("Failed to build search source", e); + } + } +} diff --git a/src/main/java/org/elasticsearch/action/updatebyquery/package-info.java b/src/main/java/org/elasticsearch/action/updatebyquery/package-info.java new file mode 100644 index 0000000000000..5f07f0c3745a4 --- /dev/null +++ b/src/main/java/org/elasticsearch/action/updatebyquery/package-info.java @@ -0,0 +1,23 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * Update by query action. + */ +package org.elasticsearch.action.updatebyquery; \ No newline at end of file diff --git a/src/main/java/org/elasticsearch/client/Client.java b/src/main/java/org/elasticsearch/client/Client.java index b108db8da7b2c..995b7460268bf 100644 --- a/src/main/java/org/elasticsearch/client/Client.java +++ b/src/main/java/org/elasticsearch/client/Client.java @@ -50,6 +50,7 @@ import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.action.update.UpdateRequestBuilder; import org.elasticsearch.action.update.UpdateResponse; +import org.elasticsearch.action.updatebyquery.*; import org.elasticsearch.common.Nullable; /** @@ -168,6 +169,30 @@ public interface Client { */ UpdateRequestBuilder prepareUpdate(String index, String type, String id); + /** + * Updates documents that match a query specified in the request. The update is based on a script. + * + * @param request The update by query request. + * @param listener A listener that notifies the caller when the update by query operation has completed + */ + void updateByQuery(UpdateByQueryRequest request, ActionListener listener); + + /** + * Performs the same action as in {@link #updateByQuery(org.elasticsearch.action.updatebyquery.UpdateByQueryRequest, + * org.elasticsearch.action.ActionListener)}, but works with an {@link ActionFuture} instead of a {@link ActionListener}. + * + * @param request The update query request + * @return The result future + */ + ActionFuture updateByQuery(UpdateByQueryRequest request); + + /** + * Prepares a update for documents matching a query using a script. + * + * @return a builder instance + */ + UpdateByQueryRequestBuilder prepareUpdateByQuery(); + /** * Index a document associated with a given index and type. *

diff --git a/src/main/java/org/elasticsearch/client/support/AbstractClient.java b/src/main/java/org/elasticsearch/client/support/AbstractClient.java index 53e108e4155a8..ba37b88b40292 100644 --- a/src/main/java/org/elasticsearch/client/support/AbstractClient.java +++ b/src/main/java/org/elasticsearch/client/support/AbstractClient.java @@ -59,6 +59,7 @@ import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.action.update.UpdateRequestBuilder; import org.elasticsearch.action.update.UpdateResponse; +import org.elasticsearch.action.updatebyquery.*; import org.elasticsearch.client.internal.InternalClient; import org.elasticsearch.common.Nullable; @@ -117,6 +118,21 @@ public UpdateRequestBuilder prepareUpdate(String index, String type, String id) return new UpdateRequestBuilder(this, index, type, id); } + @Override + public void updateByQuery(UpdateByQueryRequest request, ActionListener listener) { + execute(UpdateByQueryAction.INSTANCE, request, listener); + } + + @Override + public ActionFuture updateByQuery(UpdateByQueryRequest request) { + return execute(UpdateByQueryAction.INSTANCE, request); + } + + @Override + public UpdateByQueryRequestBuilder prepareUpdateByQuery() { + return new UpdateByQueryRequestBuilder(this); + } + @Override public ActionFuture delete(final DeleteRequest request) { return execute(DeleteAction.INSTANCE, request); diff --git a/src/main/java/org/elasticsearch/client/transport/TransportClient.java b/src/main/java/org/elasticsearch/client/transport/TransportClient.java index efaea63e32fc7..5acedd52e7098 100644 --- a/src/main/java/org/elasticsearch/client/transport/TransportClient.java +++ b/src/main/java/org/elasticsearch/client/transport/TransportClient.java @@ -54,6 +54,10 @@ import org.elasticsearch.cache.recycler.CacheRecycler; import org.elasticsearch.cache.recycler.CacheRecyclerModule; import org.elasticsearch.cache.recycler.PageCacheRecycler; +import org.elasticsearch.action.updatebyquery.IndexUpdateByQueryRequest; +import org.elasticsearch.action.updatebyquery.IndexUpdateByQueryResponse; +import org.elasticsearch.action.updatebyquery.UpdateByQueryRequest; +import org.elasticsearch.action.updatebyquery.UpdateByQueryResponse; import org.elasticsearch.client.AdminClient; import org.elasticsearch.client.support.AbstractClient; import org.elasticsearch.client.transport.support.InternalTransportClient; @@ -340,6 +344,16 @@ public void update(UpdateRequest request, ActionListener listene internalClient.update(request, listener); } + @Override + public void updateByQuery(UpdateByQueryRequest request, ActionListener listener) { + internalClient.updateByQuery(request, listener); + } + + @Override + public ActionFuture updateByQuery(UpdateByQueryRequest request) { + return internalClient.updateByQuery(request); + } + @Override public ActionFuture delete(DeleteRequest request) { return internalClient.delete(request); diff --git a/src/main/java/org/elasticsearch/common/lucene/TopLevelFixedBitSetCollector.java b/src/main/java/org/elasticsearch/common/lucene/TopLevelFixedBitSetCollector.java new file mode 100644 index 0000000000000..59fc4804ace26 --- /dev/null +++ b/src/main/java/org/elasticsearch/common/lucene/TopLevelFixedBitSetCollector.java @@ -0,0 +1,66 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.common.lucene; + +import org.apache.lucene.index.AtomicReaderContext; +import org.apache.lucene.search.Collector; +import org.apache.lucene.search.Scorer; +import org.apache.lucene.util.FixedBitSet; + +import java.io.IOException; + +/** + * Collector that collects the hits in a {@link FixedBitSet} instance. + */ +public class TopLevelFixedBitSetCollector extends Collector { + + private final FixedBitSet bitSet; + private int docBase; + + /** + * @param maxDoc The higest Lucene docid + 1 from a toplevel IndexReader / IndexSearcher. + */ + public TopLevelFixedBitSetCollector(int maxDoc) { + this.bitSet = new FixedBitSet(maxDoc); + } + + public void setScorer(Scorer scorer) throws IOException { + } + + public void collect(int doc) throws IOException { + bitSet.set(docBase + doc); + } + + @Override + public void setNextReader(AtomicReaderContext context) throws IOException { + this.docBase = context.docBase; + } + + public boolean acceptsDocsOutOfOrder() { + return true; + } + + /** + * @return The matched Lucene doc ids as {@link FixedBitSet} + */ + public FixedBitSet getBitSet() { + return bitSet; + } +} \ No newline at end of file diff --git a/src/main/java/org/elasticsearch/rest/action/RestActionModule.java b/src/main/java/org/elasticsearch/rest/action/RestActionModule.java index 9b9a24e424519..64812c444ef56 100644 --- a/src/main/java/org/elasticsearch/rest/action/RestActionModule.java +++ b/src/main/java/org/elasticsearch/rest/action/RestActionModule.java @@ -102,6 +102,7 @@ import org.elasticsearch.rest.action.termvector.RestMultiTermVectorsAction; import org.elasticsearch.rest.action.termvector.RestTermVectorAction; import org.elasticsearch.rest.action.update.RestUpdateAction; +import org.elasticsearch.rest.action.updatebyquery.RestUpdateByQueryAction; import java.util.List; @@ -199,6 +200,7 @@ protected void configure() { bind(RestMultiTermVectorsAction.class).asEagerSingleton(); bind(RestBulkAction.class).asEagerSingleton(); bind(RestUpdateAction.class).asEagerSingleton(); + bind(RestUpdateByQueryAction.class).asEagerSingleton(); bind(RestPercolateAction.class).asEagerSingleton(); bind(RestMultiPercolateAction.class).asEagerSingleton(); diff --git a/src/main/java/org/elasticsearch/rest/action/support/RestActions.java b/src/main/java/org/elasticsearch/rest/action/support/RestActions.java index 8cc900284eb61..e806780412808 100644 --- a/src/main/java/org/elasticsearch/rest/action/support/RestActions.java +++ b/src/main/java/org/elasticsearch/rest/action/support/RestActions.java @@ -91,6 +91,9 @@ public static QuerySourceBuilder parseQuerySource(RestRequest request) { QueryStringQueryBuilder queryBuilder = QueryBuilders.queryString(queryString); queryBuilder.defaultField(request.param("df")); queryBuilder.analyzer(request.param("analyzer")); + queryBuilder.analyzeWildcard(request.paramAsBoolean("analyze_wildcard", false)); + queryBuilder.lowercaseExpandedTerms(request.paramAsBoolean("lowercase_expanded_terms", true)); + queryBuilder.lenient(request.paramAsBooleanOptional("lenient", null)); String defaultOperator = request.param("default_operator"); if (defaultOperator != null) { if ("OR".equals(defaultOperator)) { diff --git a/src/main/java/org/elasticsearch/rest/action/updatebyquery/RestUpdateByQueryAction.java b/src/main/java/org/elasticsearch/rest/action/updatebyquery/RestUpdateByQueryAction.java new file mode 100644 index 0000000000000..408ed2b33cd60 --- /dev/null +++ b/src/main/java/org/elasticsearch/rest/action/updatebyquery/RestUpdateByQueryAction.java @@ -0,0 +1,197 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.rest.action.updatebyquery; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.WriteConsistencyLevel; +import org.elasticsearch.action.bulk.BulkItemResponse; +import org.elasticsearch.action.support.replication.ReplicationType; +import org.elasticsearch.action.updatebyquery.*; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentBuilderString; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.rest.*; +import org.elasticsearch.rest.action.support.RestActions; +import org.elasticsearch.rest.action.support.RestXContentBuilder; + +import java.io.IOException; +import java.util.Map; + +import static org.elasticsearch.rest.RestRequest.Method.POST; +import static org.elasticsearch.rest.RestStatus.OK; + +/** + * Rest handler for update by query requests. + */ +public class RestUpdateByQueryAction extends BaseRestHandler { + + @Inject + public RestUpdateByQueryAction(Settings settings, Client client, RestController controller) { + super(settings, client); + controller.registerHandler(POST, "/{index}/_update_by_query", this); + controller.registerHandler(POST, "/{index}/{type}/_update_by_query", this); + } + + public void handleRequest(final RestRequest request, final RestChannel channel) { + UpdateByQueryRequest udqRequest = new UpdateByQueryRequest( + Strings.splitStringByCommaToArray(request.param("index")), + Strings.splitStringByCommaToArray(request.param("type")) + ); + udqRequest.listenerThreaded(false); + String replicationType = request.param("replication"); + if (replicationType != null) { + udqRequest.replicationType(ReplicationType.fromString(replicationType)); + } + String consistencyLevel = request.param("consistency"); + if (consistencyLevel != null) { + udqRequest.consistencyLevel(WriteConsistencyLevel.fromString(consistencyLevel)); + } + String responseType = request.param("response"); + if (responseType != null) { + udqRequest.bulkResponseOptions(BulkResponseOption.fromString(responseType)); + } + udqRequest.routing(request.param("routing")); + String timeout = request.param("timeout"); + if (timeout != null) { + udqRequest.timeout(TimeValue.parseTimeValue(timeout, null)); + } + + // see if we have it in the body + if (request.hasContent()) { + udqRequest.source(request.content(), request.contentUnsafe()); + } else if (request.hasParam("source")) { + udqRequest.source(new BytesArray(request.param("source")), false); + } else if (request.hasParam("q")) { + UpdateByQuerySourceBuilder sourceBuilder = new UpdateByQuerySourceBuilder(); + sourceBuilder.script(request.param("script")); + sourceBuilder.scriptLang(request.param("lang")); + for (Map.Entry entry : request.params().entrySet()) { + if (entry.getKey().startsWith("sp_")) { + sourceBuilder.addScriptParam(entry.getKey().substring(3), entry.getValue()); + } + } + + sourceBuilder.query(RestActions.parseQuerySource(request).buildAsBytes(XContentType.JSON)); + udqRequest.source(sourceBuilder); + } + + client.updateByQuery(udqRequest, new ActionListener() { + + public void onResponse(UpdateByQueryResponse response) { + try { + XContentBuilder builder = RestXContentBuilder.restContentBuilder(request); + + builder.startObject(); + builder.field(Fields.OK, !response.hasFailures()); + builder.field(Fields.TOOK, response.tookInMillis()); + builder.field(Fields.TOTAL, response.totalHits()); + builder.field(Fields.UPDATED, response.updated()); + + if (response.hasFailures()) { + builder.startObject(Fields.ERRORS); + builder.startArray(); + for (String failure : response.mainFailures()) { + builder.field(Fields.ERROR, failure); + } + builder.endArray(); + builder.endObject(); + } + + if (response.indexResponses().length != 0) { + builder.startArray(Fields.INDICES); + for (IndexUpdateByQueryResponse indexResponse : response.indexResponses()) { + builder.startObject(); + builder.field(indexResponse.index()); + builder.startObject(); + for (Map.Entry shard : indexResponse.responsesByShard().entrySet()) { + builder.startObject(shard.getKey().toString()); + if (indexResponse.failuresByShard().containsKey(shard.getKey())) { + builder.field(Fields.ERROR, indexResponse.failuresByShard().get(shard.getKey())); + } + builder.startArray(Fields.ITEMS); + for (BulkItemResponse itemResponse : shard.getValue()) { + builder.startObject(); + builder.startObject(itemResponse.getOpType()); + builder.field(Fields._INDEX, itemResponse.getIndex()); + builder.field(Fields._TYPE, itemResponse.getType()); + builder.field(Fields._ID, itemResponse.getId()); + long version = itemResponse.getVersion(); + if (version != -1) { + builder.field(Fields._VERSION, itemResponse.getVersion()); + } + if (itemResponse.isFailed()) { + builder.field(Fields.ERROR, itemResponse.getFailure().getMessage()); + } else { + builder.field(Fields.OK, true); + } + builder.endObject(); + builder.endObject(); + } + builder.endArray(); + builder.endObject(); + } + for (Map.Entry shard : indexResponse.failuresByShard().entrySet()) { + builder.startObject(shard.getKey().toString()); + builder.field(Fields.ERROR, shard.getValue()); + builder.endObject(); + } + builder.endObject(); + builder.endObject(); + } + builder.endArray(); + } + builder.endObject(); + channel.sendResponse(new XContentRestResponse(request, OK, builder)); + } catch (Exception e) { + onFailure(e); + } + } + + public void onFailure(Throwable e) { + try { + channel.sendResponse(new XContentThrowableRestResponse(request, e)); + } catch (IOException e1) { + logger.error("Failed to send failure response", e1); + } + } + }); + } + + static final class Fields { + static final XContentBuilderString OK = new XContentBuilderString("ok"); + static final XContentBuilderString ERRORS = new XContentBuilderString("errors"); + static final XContentBuilderString ERROR = new XContentBuilderString("error"); + static final XContentBuilderString TOOK = new XContentBuilderString("took"); + static final XContentBuilderString _INDEX = new XContentBuilderString("_index"); + static final XContentBuilderString _TYPE = new XContentBuilderString("_type"); + static final XContentBuilderString _ID = new XContentBuilderString("_id"); + static final XContentBuilderString _VERSION = new XContentBuilderString("_version"); + static final XContentBuilderString TOTAL = new XContentBuilderString("total"); + static final XContentBuilderString UPDATED = new XContentBuilderString("updated"); + static final XContentBuilderString ITEMS = new XContentBuilderString("items"); + static final XContentBuilderString INDICES = new XContentBuilderString("indices"); + } +} diff --git a/src/test/java/org/elasticsearch/test/client/RandomizingClient.java b/src/test/java/org/elasticsearch/test/client/RandomizingClient.java index cbdf36e9e0ae8..7c0b1005f76be 100644 --- a/src/test/java/org/elasticsearch/test/client/RandomizingClient.java +++ b/src/test/java/org/elasticsearch/test/client/RandomizingClient.java @@ -51,6 +51,9 @@ import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.action.update.UpdateRequestBuilder; import org.elasticsearch.action.update.UpdateResponse; +import org.elasticsearch.action.updatebyquery.UpdateByQueryRequest; +import org.elasticsearch.action.updatebyquery.UpdateByQueryRequestBuilder; +import org.elasticsearch.action.updatebyquery.UpdateByQueryResponse; import org.elasticsearch.client.AdminClient; import org.elasticsearch.client.Client; import org.elasticsearch.client.internal.InternalClient; @@ -413,6 +416,21 @@ public void clearScroll(ClearScrollRequest request, ActionListener listener) { + delegate.updateByQuery(request, listener); + } + + @Override + public ActionFuture updateByQuery(UpdateByQueryRequest request) { + return delegate.updateByQuery(request); + } + + @Override + public UpdateByQueryRequestBuilder prepareUpdateByQuery() { + return delegate.prepareUpdateByQuery(); + } + @Override public ThreadPool threadPool() { return delegate.threadPool(); diff --git a/src/test/java/org/elasticsearch/test/integration/updatebyquery/UpdateByQueryTests.java b/src/test/java/org/elasticsearch/test/integration/updatebyquery/UpdateByQueryTests.java new file mode 100644 index 0000000000000..d69325520c975 --- /dev/null +++ b/src/test/java/org/elasticsearch/test/integration/updatebyquery/UpdateByQueryTests.java @@ -0,0 +1,305 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.test.integration.updatebyquery; + +import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; +import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus; +import org.elasticsearch.action.bulk.BulkItemResponse; +import org.elasticsearch.action.count.CountResponse; +import org.elasticsearch.action.updatebyquery.BulkResponseOption; +import org.elasticsearch.action.updatebyquery.IndexUpdateByQueryResponse; +import org.elasticsearch.action.updatebyquery.UpdateByQueryResponse; +import org.elasticsearch.common.settings.ImmutableSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.index.query.FilterBuilders; +import org.elasticsearch.test.ElasticsearchIntegrationTest; +import org.junit.Test; + +import java.util.Arrays; +import java.util.Comparator; +import java.util.HashMap; +import java.util.Map; + +import static org.elasticsearch.cluster.metadata.AliasAction.newAddAliasAction; +import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery; +import static org.elasticsearch.index.query.QueryBuilders.termQuery; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; +import static org.hamcrest.Matchers.*; + +public class UpdateByQueryTests extends ElasticsearchIntegrationTest { + + protected void createIndex(String indexName) throws Exception { + logger.info("--> creating index test"); + prepareCreate(indexName).addMapping("type1", XContentFactory.jsonBuilder() + .startObject() + .startObject("type1") + .startObject("_timestamp").field("enabled", true).field("store", "yes").endObject() + .startObject("_ttl").field("enabled", true).field("store", "yes").endObject() + .endObject() + .endObject()) + .execute().actionGet(); + } + + @Override + protected Settings nodeSettings(int nodeOrdinal) { + return ImmutableSettings.settingsBuilder() + .put("action.updatebyquery.bulk_size", 5) + .put(super.nodeSettings(nodeOrdinal)) + .build(); + } + + @Test + public void testUpdateByQuery() throws Exception { + createIndex("test"); + ClusterHealthResponse clusterHealth = client().admin().cluster().prepareHealth().setWaitForGreenStatus().execute().actionGet(); + assertThat(clusterHealth.isTimedOut(), equalTo(false)); + assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.GREEN)); + + final long numDocs = 25; + for (int i = 1; i <= numDocs; i++) { + client().prepareIndex("test", "type1", Integer.toString(i)).setSource("field1", 1).execute().actionGet(); + if (i % 10 == 0) { + client().admin().indices().prepareFlush("test").execute().actionGet(); + } + } + // Add one doc with a different type. + client().prepareIndex("test", "type2", "1").setSource("field1", 1).execute().actionGet(); + client().admin().indices().prepareRefresh("test").execute().actionGet(); + + CountResponse countResponse = client().prepareCount("test") + .setQuery(termQuery("field1", 2)).get(); + assertThat(countResponse.getCount(), equalTo(0L)); + + Map scriptParams = new HashMap(); + UpdateByQueryResponse response = client().prepareUpdateByQuery() + .setIndices("test") + .setTypes("type1") + .setIncludeBulkResponses(BulkResponseOption.ALL) + .setScript("ctx._source.field1 += 1").setScriptParams(scriptParams) + .setQuery(matchAllQuery()) + .execute() + .actionGet(); + + assertThat(response, notNullValue()); + assertThat(response.mainFailures().length, equalTo(0)); + assertThat(response.totalHits(), equalTo(numDocs)); + assertThat(response.updated(), equalTo(numDocs)); + assertThat(response.indexResponses().length, equalTo(1)); + assertThat(response.indexResponses()[0].countShardResponses(), equalTo(numDocs)); + + assertThat(response.indexResponses()[0].failuresByShard().isEmpty(), equalTo(true)); + for (BulkItemResponse[] shardResponses : response.indexResponses()[0].responsesByShard().values()) { + for (BulkItemResponse shardResponse : shardResponses) { + assertThat(shardResponse.getVersion(), equalTo(2L)); + assertThat(shardResponse.isFailed(), equalTo(false)); + assertThat(shardResponse.getFailure(), nullValue()); + assertThat(shardResponse.getFailureMessage(), nullValue()); + } + } + + client().admin().indices().prepareRefresh("test").execute().actionGet(); + countResponse = client().prepareCount("test") + .setQuery(termQuery("field1", 2)) + .execute() + .actionGet(); + assertThat(countResponse.getCount(), equalTo(numDocs)); + + response = client().prepareUpdateByQuery() + .setIndices("test") + .setTypes("type1") + .setScript("ctx._source.field1 += 1").setScriptParams(scriptParams) + .setQuery(matchAllQuery()) + .execute() + .actionGet(); + + assertThat(response, notNullValue()); + assertThat(response.totalHits(), equalTo(numDocs)); + assertThat(response.updated(), equalTo(numDocs)); + assertThat(response.indexResponses().length, equalTo(1)); + assertThat(response.indexResponses()[0].totalHits(), equalTo(numDocs)); + assertThat(response.indexResponses()[0].updated(), equalTo(numDocs)); + assertThat(response.indexResponses()[0].failuresByShard().size(), equalTo(0)); + assertThat(response.indexResponses()[0].responsesByShard().size(), equalTo(0)); + + client().admin().indices().prepareRefresh("test").execute().actionGet(); + countResponse = client().prepareCount("test") + .setQuery(termQuery("field1", 3)) + .execute() + .actionGet(); + assertThat(countResponse.getCount(), equalTo(numDocs)); + } + + @Test + public void testUpdateByQuery_multipleIndices() throws Exception { + createIndex("test1"); + createIndex("test2"); + ClusterHealthResponse clusterHealth = client().admin().cluster().prepareHealth().setWaitForGreenStatus().execute().actionGet(); + assertThat(clusterHealth.isTimedOut(), equalTo(false)); + assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.GREEN)); + + final long numDocs = 100; + final long docsPerIndex = 10; + String current = "test0"; + int id = 1; + for (int i = 0; i < numDocs; i++) { + if (i % docsPerIndex == 0) { + current = "test" + (i / docsPerIndex); + id = 1; + } + client().prepareIndex(current, "type1", Integer.toString(id++)).setSource("field1", 1).execute().actionGet(); + if (i % 5 == 0) { + client().admin().indices().prepareFlush(current).execute().actionGet(); + } + } + // Add one doc with a different type. + client().admin().indices().prepareRefresh("*").execute().actionGet(); + + CountResponse countResponse = client().prepareCount("*") + .setQuery(termQuery("field1", 2)) + .execute() + .actionGet(); + assertThat(countResponse.getCount(), equalTo(0L)); + + Map scriptParams = new HashMap(); + UpdateByQueryResponse response = client().prepareUpdateByQuery() + .setIndices("*") + .setTypes("type1") + .setIncludeBulkResponses(BulkResponseOption.ALL) + .setScript("ctx._source.field1 += 1").setScriptParams(scriptParams) + .setQuery(matchAllQuery()) + .execute() + .actionGet(); + + assertThat(response, notNullValue()); + assertThat(response.totalHits(), equalTo(numDocs)); + assertThat(response.updated(), equalTo(numDocs)); + assertThat(response.indexResponses().length, equalTo(10)); + Arrays.sort(response.indexResponses(), new Comparator() { + + public int compare(IndexUpdateByQueryResponse res1, IndexUpdateByQueryResponse res2) { + int index1 = res1.index().charAt(res1.index().length() - 1); + int index2 = res2.index().charAt(res2.index().length() - 1); + return index1 - index2; + } + + }); + + for (int i = 0; i < response.indexResponses().length; i++) { + String index = "test" + i; + assertThat(response.indexResponses()[i].index(), equalTo(index)); + assertThat(response.indexResponses()[i].countShardResponses(), equalTo(docsPerIndex)); + + assertThat(response.indexResponses()[i].failuresByShard().isEmpty(), equalTo(true)); + for (BulkItemResponse[] shardResponses : response.indexResponses()[i].responsesByShard().values()) { + for (BulkItemResponse shardResponse : shardResponses) { + assertThat(shardResponse.getVersion(), equalTo(2L)); + assertThat(shardResponse.isFailed(), equalTo(false)); + assertThat(shardResponse.getFailure(), nullValue()); + assertThat(shardResponse.getFailureMessage(), nullValue()); + } + } + } + + assertThat(response.mainFailures().length, equalTo(0)); + + client().admin().indices().prepareRefresh("*").execute().actionGet(); + countResponse = client().prepareCount("*") + .setQuery(termQuery("field1", 2)) + .execute() + .actionGet(); + assertThat(countResponse.getCount(), equalTo(numDocs)); + } + + @Test + public void testUpdateByQuery_usingAliases() { + client().admin().indices().prepareCreate("test").execute().actionGet(); + client().admin().cluster().prepareHealth().setWaitForGreenStatus().execute().actionGet(); + + client().admin().indices().prepareAliases().addAliasAction( + newAddAliasAction("test", "alias0").routing("0") + ).execute().actionGet(); + + client().admin().indices().prepareAliases().addAliasAction( + newAddAliasAction("test", "alias1").filter(FilterBuilders.termFilter("field", "value2")).routing("1") + ).execute().actionGet(); + + client().prepareIndex("alias0", "type1", "1").setSource("field", "value1").setRefresh(true).execute().actionGet(); + client().prepareIndex("alias0", "type1", "2").setSource("field", "value2").setRefresh(true).execute().actionGet(); + client().admin().indices().prepareFlush("test").execute().actionGet(); + client().prepareIndex("alias1", "type1", "3").setSource("field", "value1").setRefresh(true).execute().actionGet(); + client().prepareIndex("alias1", "type1", "4").setSource("field", "value2").setRefresh(true).execute().actionGet(); + + assertThat(client().prepareGet("alias0", "type1", "1").execute().actionGet().isExists(), equalTo(true)); + assertThat(client().prepareGet("alias0", "type1", "2").execute().actionGet().isExists(), equalTo(true)); + assertThat(client().prepareGet("alias1", "type1", "3").execute().actionGet().isExists(), equalTo(true)); + assertThat(client().prepareGet("alias1", "type1", "4").execute().actionGet().isExists(), equalTo(true)); + + UpdateByQueryResponse response = client().prepareUpdateByQuery() + .setIndices("alias1") + .setQuery(matchAllQuery()) + .setScript("ctx.op = \"delete\"") + .execute().actionGet(); + assertThat(response.totalHits(), equalTo(1L)); + assertThat(response.updated(), equalTo(1L)); + + response = client().prepareUpdateByQuery() + .setIndices("alias0") + .setQuery(matchAllQuery()) + .setScript("ctx.op = \"delete\"") + .execute().actionGet(); + assertThat(response.totalHits(), equalTo(2L)); + assertThat(response.updated(), equalTo(2L)); + + assertThat(client().prepareGet("alias0", "type1", "1").execute().actionGet().isExists(), equalTo(false)); + assertThat(client().prepareGet("alias0", "type1", "2").execute().actionGet().isExists(), equalTo(false)); + assertThat(client().prepareGet("alias1", "type1", "3").execute().actionGet().isExists(), equalTo(true)); + assertThat(client().prepareGet("alias1", "type1", "4").execute().actionGet().isExists(), equalTo(false)); + } + + @Test + public void testUpdateByQuery_noMatches() throws Exception { + createIndex("test"); + client().prepareIndex("test", "type1", "1").setSource("field1", 1).execute().actionGet(); + client().admin().indices().prepareRefresh("test").execute().actionGet(); + + CountResponse countResponse = client().prepareCount("test") + .setQuery(termQuery("field2", 1)).get(); + assertHitCount(countResponse, 0); + + Map scriptParams = new HashMap(); + UpdateByQueryResponse response = client().prepareUpdateByQuery() + .setIndices("test") + .setTypes("type1") + .setIncludeBulkResponses(BulkResponseOption.ALL) + .setScript("ctx._source.field1 += 1").setScriptParams(scriptParams) + .setQuery(termQuery("field2", 1)) + .execute() + .actionGet(); + + assertThat(response, notNullValue()); + assertThat(response.mainFailures().length, equalTo(0)); + assertThat(response.totalHits(), equalTo(0l)); + assertThat(response.updated(), equalTo(0l)); + assertThat(response.indexResponses(), arrayWithSize(1)); + assertThat(response.indexResponses()[0].responsesByShard().isEmpty(), is(true)); + } + +} diff --git a/src/test/java/org/elasticsearch/test/stress/updatebyquery/UpdateByQueryStressTest.java b/src/test/java/org/elasticsearch/test/stress/updatebyquery/UpdateByQueryStressTest.java new file mode 100644 index 0000000000000..ee8cf2d8cd33a --- /dev/null +++ b/src/test/java/org/elasticsearch/test/stress/updatebyquery/UpdateByQueryStressTest.java @@ -0,0 +1,140 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.test.stress.updatebyquery; + +import org.elasticsearch.action.bulk.BulkItemResponse; +import org.elasticsearch.action.bulk.BulkRequestBuilder; +import org.elasticsearch.action.index.IndexResponse; +import org.elasticsearch.action.updatebyquery.IndexUpdateByQueryResponse; +import org.elasticsearch.action.updatebyquery.UpdateByQueryResponse; +import org.elasticsearch.client.Client; +import org.elasticsearch.client.Requests; +import org.elasticsearch.common.settings.ImmutableSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.node.Node; +import org.elasticsearch.node.NodeBuilder; + +import java.util.Locale; +import java.util.Map; + +/** + */ +public class UpdateByQueryStressTest { + + public static void main(String[] args) { + final int NUMBER_OF_NODES = 4; + final int NUMBER_OF_INDICES = 5; + final int BATCH = 300000; + + final Settings nodeSettings = ImmutableSettings.settingsBuilder() + .put("index.number_of_shards", 2) +// .put("action.updatebyquery.bulk_size", 5) + .put("index.number_of_replicas", 0).build(); + final Node[] nodes = new Node[NUMBER_OF_NODES]; + for (int i = 0; i < nodes.length; i++) { + nodes[i] = NodeBuilder.nodeBuilder().settings(nodeSettings).node(); + } + + Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { + public void run() { + for (Node node : nodes) { + node.close(); + } + } + })); + + try { + Client client = nodes.length == 1 ? nodes[0].client() : nodes[1].client(); + try { + client.admin().indices().prepareDelete().execute().actionGet(); + } catch (Exception e) { + // ignore + } + + client.admin().indices().prepareUpdateSettings("*").setSettings(ImmutableSettings.settingsBuilder().put("index.refresh_interval", -1)); + for (int i = 0; i < NUMBER_OF_INDICES; i++) { + BulkRequestBuilder bulkRequest = client.prepareBulk(); + for (int j = 0; j < BATCH; j++) { + bulkRequest.add(Requests.indexRequest("test" + i).id(Integer.toString(j)).type("type").source("field", "1")); + if (bulkRequest.numberOfActions() % 10000 == 0) { + bulkRequest.execute().actionGet(); + bulkRequest = client.prepareBulk(); + } + } + if (bulkRequest.numberOfActions() > 0) { + bulkRequest.execute().actionGet(); + } + } + client.admin().indices().prepareRefresh("*").execute().actionGet(); + client.admin().indices().prepareUpdateSettings("*").setSettings(ImmutableSettings.settingsBuilder().put("index.refresh_interval", 1)); + client.admin().cluster().prepareHealth("*").setWaitForGreenStatus().execute().actionGet(); + + UpdateByQueryResponse response = client.prepareUpdateByQuery() + .setIndices("*") + .setQuery(QueryBuilders.matchAllQuery()) + .setScript("ctx._source.field += 1") + .execute() + .actionGet(); + + System.out.printf(Locale.ENGLISH, "Update by query took: %d ms and matches with %d documents\n", response.tookInMillis(), response.totalHits()); + System.out.printf(Locale.ENGLISH, "and %d documents have actually successfully been updated.\n", response.updated()); + if (response.totalHits() != BATCH * NUMBER_OF_INDICES) { + System.err.printf( + Locale.ENGLISH, + "Number of matches is incorrect! Expected %d but was %d\n", + BATCH * NUMBER_OF_INDICES, + response.totalHits() + ); + } + + if (response.indexResponses().length != NUMBER_OF_INDICES) { + System.err.printf( + Locale.ENGLISH, + "Number of index sub responses is incorrect! Expected %d but was %d\n", + BATCH, + response.indexResponses().length + ); + } + + for (IndexUpdateByQueryResponse indexResponse : response.indexResponses()) { + for (Map.Entry bulkItemResponses : indexResponse.responsesByShard().entrySet()) { + for (BulkItemResponse bulkItemResponse : bulkItemResponses.getValue()) { + IndexResponse indexRes = bulkItemResponse.getResponse(); + if (indexRes.getVersion() != 2) { + System.out.printf( + Locale.ENGLISH, + "Version doesn't match for id[%s] expected version 2, but was %d\n", + indexRes.getId(), + indexRes.getVersion() + ); + } + } + } + } + + } finally { + for (Node node : nodes) { + node.close(); + } + } + + } +}