Skip to content

Commit 08152b0

Browse files
committed
Update API enhancement - add support for scripted upserts.
In the case of inserts the UpdateHelper class will now allow the script used to apply updates to run on the upsert doc provided by clients. This allows the logic for managing the internal state of the data item to be managed by the script and is not reliant on clients performing the initialisation of data structures managed by the script. Closes #7143
1 parent 19b1dbd commit 08152b0

File tree

7 files changed

+192
-17
lines changed

7 files changed

+192
-17
lines changed

docs/reference/docs/update.asciidoc

+35-1
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,7 @@ curl -XPOST 'localhost:9200/test/type1/1/_update' -d '{
126126
If `name` was `new_name` before the request was sent then the entire update
127127
request is ignored.
128128

129+
=== Upserts
129130
There is also support for `upsert`. If the document does
130131
not already exists, the content of the `upsert` element will be used to
131132
index the fresh doc:
@@ -142,8 +143,38 @@ curl -XPOST 'localhost:9200/test/type1/1/_update' -d '{
142143
}
143144
}'
144145
--------------------------------------------------
146+
added[1.4.0]
145147

146-
Last it also supports `doc_as_upsert`. So that the
148+
If the document does not exist you may want your update script to
149+
run anyway in order to initialize the document contents using
150+
business logic unknown to the client. In this case pass the
151+
new `scripted_upsert` parameter with the value `true`.
152+
153+
[source,js]
154+
--------------------------------------------------
155+
curl -XPOST 'localhost:9200/sessions/session/dh3sgudg8gsrgl/_update' -d '{
156+
"script_id" : "my_web_session_summariser",
157+
"scripted_upsert":true,
158+
"params" : {
159+
"pageViewEvent" : {
160+
"url":"foo.com/bar",
161+
"response":404,
162+
"time":"2014-01-01 12:32"
163+
}
164+
},
165+
"upsert" : {
166+
}
167+
}'
168+
--------------------------------------------------
169+
The default `scripted_upsert` setting is `false` meaning the script is not executed for inserts.
170+
However, in scenarios like the one above we may be using a non-trivial script stored
171+
using the new "indexed scripts" feature. The script may be deriving properties
172+
like the duration of our web session based on observing multiple page view events so the
173+
client can supply a blank "upsert" document and allow the script to fill in most of the details
174+
using the events passed in the `params` element.
175+
176+
177+
Last, the upsert facility also supports `doc_as_upsert`. So that the
147178
provided document will be inserted if the document does not already
148179
exist. This will reduce the amount of data that needs to be sent to
149180
elasticsearch.
@@ -158,6 +189,9 @@ curl -XPOST 'localhost:9200/test/type1/1/_update' -d '{
158189
}'
159190
--------------------------------------------------
160191

192+
193+
=== Parameters
194+
161195
The update operation supports similar parameters as the index API,
162196
including:
163197

rest-api-spec/api/update.json

+7
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,13 @@
6161
"script": {
6262
"description": "The URL-encoded script definition (instead of using request body)"
6363
},
64+
"script_id": {
65+
"description": "The id of a stored script"
66+
},
67+
"scripted_upsert": {
68+
"type": "boolean",
69+
"description": "True if the script referenced in script or script_id should be called to perform inserts - defaults to false"
70+
},
6471
"timeout": {
6572
"type": "time",
6673
"description": "Explicit operation timeout"

rest-api-spec/test/update/25_script_upsert.yaml

+19
Original file line numberDiff line numberDiff line change
@@ -37,5 +37,24 @@
3737
id: 1
3838

3939
- match: { _source.foo: xxx }
40+
41+
- do:
42+
update:
43+
index: test_1
44+
type: test
45+
id: 2
46+
body:
47+
script: "ctx._source.foo = bar"
48+
params: { bar: 'xxx' }
49+
upsert: { foo: baz }
50+
scripted_upsert: true
51+
52+
- do:
53+
get:
54+
index: test_1
55+
type: test
56+
id: 2
57+
58+
- match: { _source.foo: xxx }
4059

4160

src/main/java/org/elasticsearch/action/update/UpdateHelper.java

+54-11
Original file line numberDiff line numberDiff line change
@@ -90,11 +90,49 @@ public Result prepare(UpdateRequest request, IndexShard indexShard) {
9090
if (request.upsertRequest() == null && !request.docAsUpsert()) {
9191
throw new DocumentMissingException(new ShardId(request.index(), request.shardId()), request.type(), request.id());
9292
}
93+
Long ttl = null;
9394
IndexRequest indexRequest = request.docAsUpsert() ? request.doc() : request.upsertRequest();
95+
if (request.scriptedUpsert() && (request.script() != null)) {
96+
// Run the script to perform the create logic
97+
IndexRequest upsert = request.upsertRequest();
98+
Map<String, Object> upsertDoc = upsert.sourceAsMap();
99+
Map<String, Object> ctx = new HashMap<>(2);
100+
// Tell the script that this is a create and not an update
101+
ctx.put("op", "create");
102+
ctx.put("_source", upsertDoc);
103+
try {
104+
ExecutableScript script = scriptService.executable(request.scriptLang, request.script, request.scriptType, request.scriptParams);
105+
script.setNextVar("ctx", ctx);
106+
script.run();
107+
// we need to unwrap the ctx...
108+
ctx = (Map<String, Object>) script.unwrap(ctx);
109+
} catch (Exception e) {
110+
throw new ElasticsearchIllegalArgumentException("failed to execute script", e);
111+
}
112+
//Allow the script to set TTL using ctx._ttl
113+
ttl = getTTLFromScriptContext(ctx);
114+
//Allow the script to abort the create by setting "op" to "none"
115+
String scriptOpChoice = (String) ctx.get("op");
116+
117+
// Only valid options for an upsert script are "create"
118+
// (the default) or "none", meaning abort upsert
119+
if (!"create".equals(scriptOpChoice)) {
120+
if (!"none".equals(scriptOpChoice)) {
121+
logger.warn("Used upsert operation [{}] for script [{}], doing nothing...", scriptOpChoice, request.script);
122+
}
123+
UpdateResponse update = new UpdateResponse(getResult.getIndex(), getResult.getType(), getResult.getId(),
124+
getResult.getVersion(), false);
125+
update.setGetResult(getResult);
126+
return new Result(update, Operation.NONE, upsertDoc, XContentType.JSON);
127+
}
128+
indexRequest.source((Map)ctx.get("_source"));
129+
}
130+
94131
indexRequest.index(request.index()).type(request.type()).id(request.id())
95132
// it has to be a "create!"
96-
.create(true)
133+
.create(true)
97134
.routing(request.routing())
135+
.ttl(ttl)
98136
.refresh(request.refresh())
99137
.replicationType(request.replicationType()).consistencyLevel(request.consistencyLevel());
100138
indexRequest.operationThreaded(false);
@@ -121,7 +159,6 @@ public Result prepare(UpdateRequest request, IndexShard indexShard) {
121159
String operation = null;
122160
String timestamp = null;
123161
Long ttl = null;
124-
Object fetchedTTL = null;
125162
final Map<String, Object> updatedSourceAsMap;
126163
final XContentType updateSourceContentType = sourceAndContent.v1();
127164
String routing = getResult.getFields().containsKey(RoutingFieldMapper.NAME) ? getResult.field(RoutingFieldMapper.NAME).getValue().toString() : null;
@@ -164,15 +201,8 @@ public Result prepare(UpdateRequest request, IndexShard indexShard) {
164201
operation = (String) ctx.get("op");
165202
timestamp = (String) ctx.get("_timestamp");
166203

167-
fetchedTTL = ctx.get("_ttl");
168-
if (fetchedTTL != null) {
169-
if (fetchedTTL instanceof Number) {
170-
ttl = ((Number) fetchedTTL).longValue();
171-
} else {
172-
ttl = TimeValue.parseTimeValue((String) fetchedTTL, null).millis();
173-
}
174-
}
175-
204+
ttl = getTTLFromScriptContext(ctx);
205+
176206
updatedSourceAsMap = (Map<String, Object>) ctx.get("_source");
177207
}
178208

@@ -211,6 +241,19 @@ public Result prepare(UpdateRequest request, IndexShard indexShard) {
211241
}
212242
}
213243

244+
private Long getTTLFromScriptContext(Map<String, Object> ctx) {
245+
Long ttl = null;
246+
Object fetchedTTL = ctx.get("_ttl");
247+
if (fetchedTTL != null) {
248+
if (fetchedTTL instanceof Number) {
249+
ttl = ((Number) fetchedTTL).longValue();
250+
} else {
251+
ttl = TimeValue.parseTimeValue((String) fetchedTTL, null).millis();
252+
}
253+
}
254+
return ttl;
255+
}
256+
214257
/**
215258
* Extracts the fields from the updated document to be returned in a update response
216259
*/

src/main/java/org/elasticsearch/action/update/UpdateRequest.java

+18
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ public class UpdateRequest extends InstanceShardOperationRequest<UpdateRequest>
7676

7777
private IndexRequest upsertRequest;
7878

79+
private boolean scriptedUpsert = false;
7980
private boolean docAsUpsert = false;
8081
private boolean detectNoop = false;
8182

@@ -596,6 +597,8 @@ public UpdateRequest source(BytesReference source) throws Exception {
596597
scriptParams = parser.map();
597598
} else if ("lang".equals(currentFieldName)) {
598599
scriptLang = parser.text();
600+
} else if ("scripted_upsert".equals(currentFieldName)) {
601+
scriptedUpsert = parser.booleanValue();
599602
} else if ("upsert".equals(currentFieldName)) {
600603
XContentBuilder builder = XContentFactory.contentBuilder(xContentType);
601604
builder.copyCurrentStructure(parser);
@@ -621,6 +624,15 @@ public boolean docAsUpsert() {
621624
public void docAsUpsert(boolean shouldUpsertDoc) {
622625
this.docAsUpsert = shouldUpsertDoc;
623626
}
627+
628+
public boolean scriptedUpsert(){
629+
return this.scriptedUpsert;
630+
}
631+
632+
public void scriptedUpsert(boolean scriptedUpsert) {
633+
this.scriptedUpsert = scriptedUpsert;
634+
}
635+
624636

625637
@Override
626638
public void readFrom(StreamInput in) throws IOException {
@@ -663,6 +675,9 @@ public void readFrom(StreamInput in) throws IOException {
663675
if (in.getVersion().onOrAfter(Version.V_1_3_0)) {
664676
detectNoop = in.readBoolean();
665677
}
678+
if (in.getVersion().onOrAfter(Version.V_1_4_0)) {
679+
scriptedUpsert = in.readBoolean();
680+
}
666681
}
667682

668683
@Override
@@ -715,6 +730,9 @@ public void writeTo(StreamOutput out) throws IOException {
715730
if (out.getVersion().onOrAfter(Version.V_1_3_0)) {
716731
out.writeBoolean(detectNoop);
717732
}
733+
if (out.getVersion().onOrAfter(Version.V_1_4_0)) {
734+
out.writeBoolean(scriptedUpsert);
735+
}
718736
}
719737

720738
}

src/main/java/org/elasticsearch/action/update/UpdateRequestBuilder.java

+9
Original file line numberDiff line numberDiff line change
@@ -353,6 +353,15 @@ public UpdateRequestBuilder setDetectNoop(boolean detectNoop) {
353353
request.detectNoop(detectNoop);
354354
return this;
355355
}
356+
357+
358+
/**
359+
* Sets whether the script should be run in the case of an insert
360+
*/
361+
public UpdateRequestBuilder setScriptedUpsert(boolean scriptedUpsert) {
362+
request.scriptedUpsert(scriptedUpsert);
363+
return this;
364+
}
356365

357366
@Override
358367
protected void doExecute(ActionListener<UpdateResponse> listener) {

src/test/java/org/elasticsearch/update/UpdateTests.java

+50-5
Original file line numberDiff line numberDiff line change
@@ -19,18 +19,17 @@
1919

2020
package org.elasticsearch.update;
2121

22-
import org.apache.lucene.document.Field;
2322
import org.apache.lucene.index.MergePolicy;
2423
import org.apache.lucene.index.NoMergePolicy;
2524
import org.apache.lucene.util.LuceneTestCase.Slow;
2625
import org.elasticsearch.ElasticsearchException;
2726
import org.elasticsearch.ElasticsearchTimeoutException;
2827
import org.elasticsearch.action.ActionListener;
2928
import org.elasticsearch.action.ActionRequestValidationException;
29+
import org.elasticsearch.action.delete.DeleteRequest;
3030
import org.elasticsearch.action.delete.DeleteResponse;
3131
import org.elasticsearch.action.get.GetResponse;
3232
import org.elasticsearch.action.update.UpdateRequest;
33-
import org.elasticsearch.action.delete.DeleteRequest;
3433
import org.elasticsearch.action.update.UpdateRequestBuilder;
3534
import org.elasticsearch.action.update.UpdateResponse;
3635
import org.elasticsearch.client.transport.NoNodeAvailableException;
@@ -44,14 +43,15 @@
4443
import org.elasticsearch.index.engine.VersionConflictEngineException;
4544
import org.elasticsearch.index.merge.policy.AbstractMergePolicyProvider;
4645
import org.elasticsearch.index.merge.policy.MergePolicyModule;
47-
import org.elasticsearch.index.merge.policy.TieredMergePolicyProvider;
48-
import org.elasticsearch.index.store.CorruptedFileTest;
4946
import org.elasticsearch.index.store.Store;
5047
import org.elasticsearch.script.ScriptService;
5148
import org.elasticsearch.test.ElasticsearchIntegrationTest;
5249
import org.junit.Test;
5350

54-
import java.util.*;
51+
import java.util.ArrayList;
52+
import java.util.HashMap;
53+
import java.util.List;
54+
import java.util.Map;
5555
import java.util.concurrent.CopyOnWriteArrayList;
5656
import java.util.concurrent.CountDownLatch;
5757
import java.util.concurrent.Semaphore;
@@ -179,6 +179,51 @@ public void testUpsert() throws Exception {
179179
assertThat(getResponse.getSourceAsMap().get("field").toString(), equalTo("2"));
180180
}
181181
}
182+
183+
@Test
184+
public void testScriptedUpsert() throws Exception {
185+
createIndex();
186+
ensureGreen();
187+
188+
// Script logic is
189+
// 1) New accounts take balance from "balance" in upsert doc and first payment is charged at 50%
190+
// 2) Existing accounts subtract full payment from balance stored in elasticsearch
191+
192+
String script="int oldBalance=ctx._source.balance;"+
193+
"int deduction=ctx.op == \"create\" ? (payment/2) : payment;"+
194+
"ctx._source.balance=oldBalance-deduction;";
195+
int openingBalance=10;
196+
197+
// Pay money from what will be a new account and opening balance comes from upsert doc
198+
// provided by client
199+
UpdateResponse updateResponse = client().prepareUpdate("test", "type1", "1")
200+
.setUpsert(XContentFactory.jsonBuilder().startObject().field("balance", openingBalance).endObject())
201+
.setScriptedUpsert(true)
202+
.addScriptParam("payment", 2)
203+
.setScript(script, ScriptService.ScriptType.INLINE)
204+
.execute().actionGet();
205+
assertTrue(updateResponse.isCreated());
206+
207+
for (int i = 0; i < 5; i++) {
208+
GetResponse getResponse = client().prepareGet("test", "type1", "1").execute().actionGet();
209+
assertThat(getResponse.getSourceAsMap().get("balance").toString(), equalTo("9"));
210+
}
211+
212+
// Now pay money for an existing account where balance is stored in es
213+
updateResponse = client().prepareUpdate("test", "type1", "1")
214+
.setUpsert(XContentFactory.jsonBuilder().startObject().field("balance", openingBalance).endObject())
215+
.setScriptedUpsert(true)
216+
.addScriptParam("payment", 2)
217+
.setScript(script, ScriptService.ScriptType.INLINE)
218+
.execute().actionGet();
219+
assertFalse(updateResponse.isCreated());
220+
221+
222+
for (int i = 0; i < 5; i++) {
223+
GetResponse getResponse = client().prepareGet("test", "type1", "1").execute().actionGet();
224+
assertThat(getResponse.getSourceAsMap().get("balance").toString(), equalTo("7"));
225+
}
226+
}
182227

183228
@Test
184229
public void testUpsertDoc() throws Exception {

0 commit comments

Comments
 (0)