diff --git a/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java index 7513d8889ecff..c1303c6b56cfd 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java @@ -402,7 +402,7 @@ T request() { } private UpdateResult shardUpdateOperation(IndexMetaData metaData, BulkShardRequest bulkShardRequest, UpdateRequest updateRequest, IndexShard indexShard) { - UpdateHelper.Result translate = updateHelper.prepare(updateRequest, indexShard); + UpdateHelper.Result translate = updateHelper.prepare(updateRequest, indexShard, threadPool::estimatedTimeInMillis); switch (translate.getResponseResult()) { case CREATED: case UPDATED: diff --git a/core/src/main/java/org/elasticsearch/action/update/TransportUpdateAction.java b/core/src/main/java/org/elasticsearch/action/update/TransportUpdateAction.java index e5322f51d50f3..2523496746ada 100644 --- a/core/src/main/java/org/elasticsearch/action/update/TransportUpdateAction.java +++ b/core/src/main/java/org/elasticsearch/action/update/TransportUpdateAction.java @@ -176,7 +176,7 @@ protected void shardOperation(final UpdateRequest request, final ActionListener< final ShardId shardId = request.getShardId(); final IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex()); final IndexShard indexShard = indexService.getShard(shardId.getId()); - final UpdateHelper.Result result = updateHelper.prepare(request, indexShard); + final UpdateHelper.Result result = updateHelper.prepare(request, indexShard, threadPool::estimatedTimeInMillis); switch (result.getResponseResult()) { case CREATED: IndexRequest upsertRequest = result.action(); diff --git a/core/src/main/java/org/elasticsearch/action/update/UpdateHelper.java b/core/src/main/java/org/elasticsearch/action/update/UpdateHelper.java index d8497d76c2923..3649f17c48d26 100644 --- a/core/src/main/java/org/elasticsearch/action/update/UpdateHelper.java +++ b/core/src/main/java/org/elasticsearch/action/update/UpdateHelper.java @@ -58,6 +58,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.function.LongSupplier; /** * Helper for translating an update request to an index, delete request or update response. @@ -73,18 +74,18 @@ public UpdateHelper(Settings settings, ScriptService scriptService) { /** * Prepares an update request by converting it into an index or delete request or an update response (no action). */ - public Result prepare(UpdateRequest request, IndexShard indexShard) { + public Result prepare(UpdateRequest request, IndexShard indexShard, LongSupplier nowInMillis) { final GetResult getResult = indexShard.getService().get(request.type(), request.id(), new String[]{RoutingFieldMapper.NAME, ParentFieldMapper.NAME, TTLFieldMapper.NAME, TimestampFieldMapper.NAME}, true, request.version(), request.versionType(), FetchSourceContext.FETCH_SOURCE); - return prepare(indexShard.shardId(), request, getResult); + return prepare(indexShard.shardId(), request, getResult, nowInMillis); } /** * Prepares an update request by converting it into an index or delete request or an update response (no action). */ @SuppressWarnings("unchecked") - protected Result prepare(ShardId shardId, UpdateRequest request, final GetResult getResult) { + protected Result prepare(ShardId shardId, UpdateRequest request, final GetResult getResult, LongSupplier nowInMillis) { long getDateNS = System.nanoTime(); if (!getResult.isExists()) { if (request.upsertRequest() == null && !request.docAsUpsert()) { @@ -100,6 +101,7 @@ protected Result prepare(ShardId shardId, UpdateRequest request, final GetResult // Tell the script that this is a create and not an update ctx.put("op", "create"); ctx.put("_source", upsertDoc); + ctx.put("_now", nowInMillis.getAsLong()); ctx = executeScript(request.script, ctx); //Allow the script to set TTL using ctx._ttl if (ttl == null) { @@ -188,6 +190,7 @@ protected Result prepare(ShardId shardId, UpdateRequest request, final GetResult ctx.put("_timestamp", originalTimestamp); ctx.put("_ttl", originalTtl); ctx.put("_source", sourceAndContent.v2()); + ctx.put("_now", nowInMillis.getAsLong()); ctx = executeScript(request.script, ctx); diff --git a/core/src/test/java/org/elasticsearch/action/update/UpdateRequestTests.java b/core/src/test/java/org/elasticsearch/action/update/UpdateRequestTests.java index cb27a527f6357..de0705dcab2e6 100644 --- a/core/src/test/java/org/elasticsearch/action/update/UpdateRequestTests.java +++ b/core/src/test/java/org/elasticsearch/action/update/UpdateRequestTests.java @@ -28,13 +28,25 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.env.Environment; import org.elasticsearch.index.get.GetResult; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.script.MockScriptEngine; import org.elasticsearch.script.Script; +import org.elasticsearch.script.ScriptContextRegistry; +import org.elasticsearch.script.ScriptEngineRegistry; +import org.elasticsearch.script.ScriptService; import org.elasticsearch.script.ScriptService.ScriptType; +import org.elasticsearch.script.ScriptSettings; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.watcher.ResourceWatcherService; +import java.io.IOException; +import java.nio.file.Path; +import java.util.Collections; +import java.util.HashMap; import java.util.Map; +import java.util.function.Function; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.hamcrest.Matchers.arrayContaining; @@ -184,9 +196,10 @@ public void testUpdateRequestWithTTL() throws Exception { .doc(jsonBuilder().startObject().field("fooz", "baz").endObject()) .upsert(indexRequest); + long nowInMillis = randomPositiveLong(); // We simulate that the document is not existing yet GetResult getResult = new GetResult("test", "type1", "1", 0, false, null, null); - UpdateHelper.Result result = updateHelper.prepare(new ShardId("test", "_na_", 0),updateRequest, getResult); + UpdateHelper.Result result = updateHelper.prepare(new ShardId("test", "_na_", 0),updateRequest, getResult, () -> nowInMillis); Streamable action = result.action(); assertThat(action, instanceOf(IndexRequest.class)); IndexRequest indexAction = (IndexRequest) action; @@ -203,7 +216,7 @@ public void testUpdateRequestWithTTL() throws Exception { // We simulate that the document is not existing yet getResult = new GetResult("test", "type1", "2", 0, false, null, null); - result = updateHelper.prepare(new ShardId("test", "_na_", 0), updateRequest, getResult); + result = updateHelper.prepare(new ShardId("test", "_na_", 0), updateRequest, getResult, () -> nowInMillis); action = result.action(); assertThat(action, instanceOf(IndexRequest.class)); indexAction = (IndexRequest) action; @@ -276,4 +289,70 @@ public void testFetchSourceParsing() throws Exception { assertThat(request.fetchSource().includes()[0], equalTo("path.inner.*")); assertThat(request.fetchSource().excludes()[0], equalTo("another.inner.*")); } + + public void testNowInScript() throws IOException { + Path genericConfigFolder = createTempDir(); + Settings baseSettings = Settings.builder() + .put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString()) + .put(Environment.PATH_CONF_SETTING.getKey(), genericConfigFolder) + .build(); + Environment environment = new Environment(baseSettings); + Map, Object>> scripts = new HashMap<>(); + scripts.put("ctx._source.update_timestamp = ctx._now", + (vars) -> { + Map ctx = (Map) vars.get("ctx"); + Map source = (Map) ctx.get("_source"); + source.put("update_timestamp", ctx.get("_now")); + return null;}); + scripts.put("ctx._timestamp = ctx._now", + (vars) -> { + Map ctx = (Map) vars.get("ctx"); + ctx.put("_timestamp", ctx.get("_now")); + return null;}); + ScriptContextRegistry scriptContextRegistry = new ScriptContextRegistry(Collections.emptyList()); + ScriptEngineRegistry scriptEngineRegistry = new ScriptEngineRegistry(Collections.singletonList(new MockScriptEngine("mock", + scripts))); + + ScriptSettings scriptSettings = new ScriptSettings(scriptEngineRegistry, scriptContextRegistry); + ScriptService scriptService = new ScriptService(baseSettings, environment, + new ResourceWatcherService(baseSettings, null), scriptEngineRegistry, scriptContextRegistry, scriptSettings); + TimeValue providedTTLValue = TimeValue.parseTimeValue(randomTimeValue(), null, "ttl"); + Settings settings = settings(Version.CURRENT).build(); + + UpdateHelper updateHelper = new UpdateHelper(settings, scriptService); + + // We just upsert one document with now() using a script + IndexRequest indexRequest = new IndexRequest("test", "type1", "2") + .source(jsonBuilder().startObject().field("foo", "bar").endObject()) + .ttl(providedTTLValue); + + { + UpdateRequest updateRequest = new UpdateRequest("test", "type1", "2") + .upsert(indexRequest) + .script(new Script("ctx._source.update_timestamp = ctx._now", ScriptType.INLINE, "mock", Collections.emptyMap())) + .scriptedUpsert(true); + long nowInMillis = randomPositiveLong(); + // We simulate that the document is not existing yet + GetResult getResult = new GetResult("test", "type1", "2", 0, false, null, null); + UpdateHelper.Result result = updateHelper.prepare(new ShardId("test", "_na_", 0), updateRequest, getResult, () -> nowInMillis); + Streamable action = result.action(); + assertThat(action, instanceOf(IndexRequest.class)); + IndexRequest indexAction = (IndexRequest) action; + assertEquals(indexAction.sourceAsMap().get("update_timestamp"), nowInMillis); + } + { + UpdateRequest updateRequest = new UpdateRequest("test", "type1", "2") + .upsert(indexRequest) + .script(new Script("ctx._timestamp = ctx._now", ScriptType.INLINE, "mock", Collections.emptyMap())) + .scriptedUpsert(true); + long nowInMillis = randomPositiveLong(); + // We simulate that the document is not existing yet + GetResult getResult = new GetResult("test", "type1", "2", 0, true, new BytesArray("{}"), null); + UpdateHelper.Result result = updateHelper.prepare(new ShardId("test", "_na_", 0), updateRequest, getResult, () -> nowInMillis); + Streamable action = result.action(); + assertThat(action, instanceOf(IndexRequest.class)); + IndexRequest indexAction = (IndexRequest) action; + assertEquals(indexAction.timestamp(), Long.toString(nowInMillis)); + } + } } diff --git a/modules/lang-painless/src/test/resources/rest-api-spec/test/plan_a/25_script_upsert.yaml b/modules/lang-painless/src/test/resources/rest-api-spec/test/plan_a/25_script_upsert.yaml index 2adf0de747ffc..3be567f2acbbb 100644 --- a/modules/lang-painless/src/test/resources/rest-api-spec/test/plan_a/25_script_upsert.yaml +++ b/modules/lang-painless/src/test/resources/rest-api-spec/test/plan_a/25_script_upsert.yaml @@ -63,4 +63,24 @@ - match: { _source.foo: xxx } + - do: + update: + index: test_1 + type: test + id: 3 + body: + script: + inline: "ctx._source.has_now = ctx._now > 0" + lang: "painless" + upsert: { has_now: false } + scripted_upsert: true + + - do: + get: + index: test_1 + type: test + id: 3 + + - match: { _source.has_now: true } +