diff --git a/docs/reference/docs/update.asciidoc b/docs/reference/docs/update.asciidoc index 8cb0f86f1d8cb..f9416c069f3c7 100644 --- a/docs/reference/docs/update.asciidoc +++ b/docs/reference/docs/update.asciidoc @@ -126,6 +126,7 @@ curl -XPOST 'localhost:9200/test/type1/1/_update' -d '{ If `name` was `new_name` before the request was sent then the entire update request is ignored. +=== Upserts There is also support for `upsert`. If the document does not already exists, the content of the `upsert` element will be used to index the fresh doc: @@ -142,8 +143,38 @@ curl -XPOST 'localhost:9200/test/type1/1/_update' -d '{ } }' -------------------------------------------------- +added[1.4.0] -Last it also supports `doc_as_upsert`. So that the +If the document does not exist you may want your update script to +run anyway in order to initialize the document contents using +business logic unknown to the client. In this case pass the +new `scripted_upsert` parameter with the value `true`. + +[source,js] +-------------------------------------------------- +curl -XPOST 'localhost:9200/sessions/session/dh3sgudg8gsrgl/_update' -d '{ + "script_id" : "my_web_session_summariser", + "scripted_upsert":true, + "params" : { + "pageViewEvent" : { + "url":"foo.com/bar", + "response":404, + "time":"2014-01-01 12:32" + } + }, + "upsert" : { + } +}' +-------------------------------------------------- +The default `scripted_upsert` setting is `false` meaning the script is not executed for inserts. +However, in scenarios like the one above we may be using a non-trivial script stored +using the new "indexed scripts" feature. The script may be deriving properties +like the duration of our web session based on observing multiple page view events so the +client can supply a blank "upsert" document and allow the script to fill in most of the details +using the events passed in the `params` element. + + +Last, the upsert facility also supports `doc_as_upsert`. So that the provided document will be inserted if the document does not already exist. This will reduce the amount of data that needs to be sent to elasticsearch. @@ -158,6 +189,9 @@ curl -XPOST 'localhost:9200/test/type1/1/_update' -d '{ }' -------------------------------------------------- + +=== Parameters + The update operation supports similar parameters as the index API, including: diff --git a/rest-api-spec/api/update.json b/rest-api-spec/api/update.json index 85ce81ea1f57e..6196067f63cc8 100644 --- a/rest-api-spec/api/update.json +++ b/rest-api-spec/api/update.json @@ -61,6 +61,13 @@ "script": { "description": "The URL-encoded script definition (instead of using request body)" }, + "script_id": { + "description": "The id of a stored script" + }, + "scripted_upsert": { + "type": "boolean", + "description": "True if the script referenced in script or script_id should be called to perform inserts - defaults to false" + }, "timeout": { "type": "time", "description": "Explicit operation timeout" diff --git a/rest-api-spec/test/update/25_script_upsert.yaml b/rest-api-spec/test/update/25_script_upsert.yaml index 64226b7e89945..c91992affaa7e 100644 --- a/rest-api-spec/test/update/25_script_upsert.yaml +++ b/rest-api-spec/test/update/25_script_upsert.yaml @@ -37,5 +37,24 @@ id: 1 - match: { _source.foo: xxx } + + - do: + update: + index: test_1 + type: test + id: 2 + body: + script: "ctx._source.foo = bar" + params: { bar: 'xxx' } + upsert: { foo: baz } + scripted_upsert: true + + - do: + get: + index: test_1 + type: test + id: 2 + + - match: { _source.foo: xxx } diff --git a/src/main/java/org/elasticsearch/action/update/UpdateHelper.java b/src/main/java/org/elasticsearch/action/update/UpdateHelper.java index 0b177e2d7ff57..a1bad51557d96 100644 --- a/src/main/java/org/elasticsearch/action/update/UpdateHelper.java +++ b/src/main/java/org/elasticsearch/action/update/UpdateHelper.java @@ -90,11 +90,49 @@ public Result prepare(UpdateRequest request, IndexShard indexShard) { if (request.upsertRequest() == null && !request.docAsUpsert()) { throw new DocumentMissingException(new ShardId(request.index(), request.shardId()), request.type(), request.id()); } + Long ttl = null; IndexRequest indexRequest = request.docAsUpsert() ? request.doc() : request.upsertRequest(); + if (request.scriptedUpsert() && (request.script() != null)) { + // Run the script to perform the create logic + IndexRequest upsert = request.upsertRequest(); + Map upsertDoc = upsert.sourceAsMap(); + Map ctx = new HashMap<>(2); + // Tell the script that this is a create and not an update + ctx.put("op", "create"); + ctx.put("_source", upsertDoc); + try { + ExecutableScript script = scriptService.executable(request.scriptLang, request.script, request.scriptType, request.scriptParams); + script.setNextVar("ctx", ctx); + script.run(); + // we need to unwrap the ctx... + ctx = (Map) script.unwrap(ctx); + } catch (Exception e) { + throw new ElasticsearchIllegalArgumentException("failed to execute script", e); + } + //Allow the script to set TTL using ctx._ttl + ttl = getTTLFromScriptContext(ctx); + //Allow the script to abort the create by setting "op" to "none" + String scriptOpChoice = (String) ctx.get("op"); + + // Only valid options for an upsert script are "create" + // (the default) or "none", meaning abort upsert + if (!"create".equals(scriptOpChoice)) { + if (!"none".equals(scriptOpChoice)) { + logger.warn("Used upsert operation [{}] for script [{}], doing nothing...", scriptOpChoice, request.script); + } + UpdateResponse update = new UpdateResponse(getResult.getIndex(), getResult.getType(), getResult.getId(), + getResult.getVersion(), false); + update.setGetResult(getResult); + return new Result(update, Operation.NONE, upsertDoc, XContentType.JSON); + } + indexRequest.source((Map)ctx.get("_source")); + } + indexRequest.index(request.index()).type(request.type()).id(request.id()) // it has to be a "create!" - .create(true) + .create(true) .routing(request.routing()) + .ttl(ttl) .refresh(request.refresh()) .replicationType(request.replicationType()).consistencyLevel(request.consistencyLevel()); indexRequest.operationThreaded(false); @@ -121,7 +159,6 @@ public Result prepare(UpdateRequest request, IndexShard indexShard) { String operation = null; String timestamp = null; Long ttl = null; - Object fetchedTTL = null; final Map updatedSourceAsMap; final XContentType updateSourceContentType = sourceAndContent.v1(); String routing = getResult.getFields().containsKey(RoutingFieldMapper.NAME) ? getResult.field(RoutingFieldMapper.NAME).getValue().toString() : null; @@ -164,15 +201,8 @@ public Result prepare(UpdateRequest request, IndexShard indexShard) { operation = (String) ctx.get("op"); timestamp = (String) ctx.get("_timestamp"); - fetchedTTL = ctx.get("_ttl"); - if (fetchedTTL != null) { - if (fetchedTTL instanceof Number) { - ttl = ((Number) fetchedTTL).longValue(); - } else { - ttl = TimeValue.parseTimeValue((String) fetchedTTL, null).millis(); - } - } - + ttl = getTTLFromScriptContext(ctx); + updatedSourceAsMap = (Map) ctx.get("_source"); } @@ -211,6 +241,19 @@ public Result prepare(UpdateRequest request, IndexShard indexShard) { } } + private Long getTTLFromScriptContext(Map ctx) { + Long ttl = null; + Object fetchedTTL = ctx.get("_ttl"); + if (fetchedTTL != null) { + if (fetchedTTL instanceof Number) { + ttl = ((Number) fetchedTTL).longValue(); + } else { + ttl = TimeValue.parseTimeValue((String) fetchedTTL, null).millis(); + } + } + return ttl; + } + /** * Extracts the fields from the updated document to be returned in a update response */ diff --git a/src/main/java/org/elasticsearch/action/update/UpdateRequest.java b/src/main/java/org/elasticsearch/action/update/UpdateRequest.java index 529b27742c774..94a197550730c 100644 --- a/src/main/java/org/elasticsearch/action/update/UpdateRequest.java +++ b/src/main/java/org/elasticsearch/action/update/UpdateRequest.java @@ -76,6 +76,7 @@ public class UpdateRequest extends InstanceShardOperationRequest private IndexRequest upsertRequest; + private boolean scriptedUpsert = false; private boolean docAsUpsert = false; private boolean detectNoop = false; @@ -596,6 +597,8 @@ public UpdateRequest source(BytesReference source) throws Exception { scriptParams = parser.map(); } else if ("lang".equals(currentFieldName)) { scriptLang = parser.text(); + } else if ("scripted_upsert".equals(currentFieldName)) { + scriptedUpsert = parser.booleanValue(); } else if ("upsert".equals(currentFieldName)) { XContentBuilder builder = XContentFactory.contentBuilder(xContentType); builder.copyCurrentStructure(parser); @@ -621,6 +624,15 @@ public boolean docAsUpsert() { public void docAsUpsert(boolean shouldUpsertDoc) { this.docAsUpsert = shouldUpsertDoc; } + + public boolean scriptedUpsert(){ + return this.scriptedUpsert; + } + + public void scriptedUpsert(boolean scriptedUpsert) { + this.scriptedUpsert = scriptedUpsert; + } + @Override public void readFrom(StreamInput in) throws IOException { @@ -663,6 +675,9 @@ public void readFrom(StreamInput in) throws IOException { if (in.getVersion().onOrAfter(Version.V_1_3_0)) { detectNoop = in.readBoolean(); } + if (in.getVersion().onOrAfter(Version.V_1_4_0)) { + scriptedUpsert = in.readBoolean(); + } } @Override @@ -715,6 +730,9 @@ public void writeTo(StreamOutput out) throws IOException { if (out.getVersion().onOrAfter(Version.V_1_3_0)) { out.writeBoolean(detectNoop); } + if (out.getVersion().onOrAfter(Version.V_1_4_0)) { + out.writeBoolean(scriptedUpsert); + } } } diff --git a/src/main/java/org/elasticsearch/action/update/UpdateRequestBuilder.java b/src/main/java/org/elasticsearch/action/update/UpdateRequestBuilder.java index 1235439cd95ed..640369288eae9 100644 --- a/src/main/java/org/elasticsearch/action/update/UpdateRequestBuilder.java +++ b/src/main/java/org/elasticsearch/action/update/UpdateRequestBuilder.java @@ -353,6 +353,15 @@ public UpdateRequestBuilder setDetectNoop(boolean detectNoop) { request.detectNoop(detectNoop); return this; } + + + /** + * Sets whether the script should be run in the case of an insert + */ + public UpdateRequestBuilder setScriptedUpsert(boolean scriptedUpsert) { + request.scriptedUpsert(scriptedUpsert); + return this; + } @Override protected void doExecute(ActionListener listener) { diff --git a/src/test/java/org/elasticsearch/update/UpdateTests.java b/src/test/java/org/elasticsearch/update/UpdateTests.java index e60db0afc12f9..9fcd5149a7807 100644 --- a/src/test/java/org/elasticsearch/update/UpdateTests.java +++ b/src/test/java/org/elasticsearch/update/UpdateTests.java @@ -19,7 +19,6 @@ package org.elasticsearch.update; -import org.apache.lucene.document.Field; import org.apache.lucene.index.MergePolicy; import org.apache.lucene.index.NoMergePolicy; import org.apache.lucene.util.LuceneTestCase.Slow; @@ -27,10 +26,10 @@ import org.elasticsearch.ElasticsearchTimeoutException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.delete.DeleteResponse; import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.update.UpdateRequest; -import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.update.UpdateRequestBuilder; import org.elasticsearch.action.update.UpdateResponse; import org.elasticsearch.client.transport.NoNodeAvailableException; @@ -44,14 +43,15 @@ import org.elasticsearch.index.engine.VersionConflictEngineException; import org.elasticsearch.index.merge.policy.AbstractMergePolicyProvider; import org.elasticsearch.index.merge.policy.MergePolicyModule; -import org.elasticsearch.index.merge.policy.TieredMergePolicyProvider; -import org.elasticsearch.index.store.CorruptedFileTest; import org.elasticsearch.index.store.Store; import org.elasticsearch.script.ScriptService; import org.elasticsearch.test.ElasticsearchIntegrationTest; import org.junit.Test; -import java.util.*; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Semaphore; @@ -179,6 +179,51 @@ public void testUpsert() throws Exception { assertThat(getResponse.getSourceAsMap().get("field").toString(), equalTo("2")); } } + + @Test + public void testScriptedUpsert() throws Exception { + createIndex(); + ensureGreen(); + + // Script logic is + // 1) New accounts take balance from "balance" in upsert doc and first payment is charged at 50% + // 2) Existing accounts subtract full payment from balance stored in elasticsearch + + String script="int oldBalance=ctx._source.balance;"+ + "int deduction=ctx.op == \"create\" ? (payment/2) : payment;"+ + "ctx._source.balance=oldBalance-deduction;"; + int openingBalance=10; + + // Pay money from what will be a new account and opening balance comes from upsert doc + // provided by client + UpdateResponse updateResponse = client().prepareUpdate("test", "type1", "1") + .setUpsert(XContentFactory.jsonBuilder().startObject().field("balance", openingBalance).endObject()) + .setScriptedUpsert(true) + .addScriptParam("payment", 2) + .setScript(script, ScriptService.ScriptType.INLINE) + .execute().actionGet(); + assertTrue(updateResponse.isCreated()); + + for (int i = 0; i < 5; i++) { + GetResponse getResponse = client().prepareGet("test", "type1", "1").execute().actionGet(); + assertThat(getResponse.getSourceAsMap().get("balance").toString(), equalTo("9")); + } + + // Now pay money for an existing account where balance is stored in es + updateResponse = client().prepareUpdate("test", "type1", "1") + .setUpsert(XContentFactory.jsonBuilder().startObject().field("balance", openingBalance).endObject()) + .setScriptedUpsert(true) + .addScriptParam("payment", 2) + .setScript(script, ScriptService.ScriptType.INLINE) + .execute().actionGet(); + assertFalse(updateResponse.isCreated()); + + + for (int i = 0; i < 5; i++) { + GetResponse getResponse = client().prepareGet("test", "type1", "1").execute().actionGet(); + assertThat(getResponse.getSourceAsMap().get("balance").toString(), equalTo("7")); + } + } @Test public void testUpsertDoc() throws Exception {