Skip to content

Expose ctx._now in update scripts #20835

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Oct 10, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,7 @@ <T extends ActionRequest> T request() {
}

private UpdateResult shardUpdateOperation(IndexMetaData metaData, BulkShardRequest bulkShardRequest, UpdateRequest updateRequest, IndexShard indexShard) {
UpdateHelper.Result translate = updateHelper.prepare(updateRequest, indexShard);
UpdateHelper.Result translate = updateHelper.prepare(updateRequest, indexShard, threadPool::estimatedTimeInMillis);
switch (translate.getResponseResult()) {
case CREATED:
case UPDATED:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ protected void shardOperation(final UpdateRequest request, final ActionListener<
final ShardId shardId = request.getShardId();
final IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
final IndexShard indexShard = indexService.getShard(shardId.getId());
final UpdateHelper.Result result = updateHelper.prepare(request, indexShard);
final UpdateHelper.Result result = updateHelper.prepare(request, indexShard, threadPool::estimatedTimeInMillis);
switch (result.getResponseResult()) {
case CREATED:
IndexRequest upsertRequest = result.action();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.function.LongSupplier;

/**
* Helper for translating an update request to an index, delete request or update response.
Expand All @@ -73,18 +74,18 @@ public UpdateHelper(Settings settings, ScriptService scriptService) {
/**
* Prepares an update request by converting it into an index or delete request or an update response (no action).
*/
public Result prepare(UpdateRequest request, IndexShard indexShard) {
public Result prepare(UpdateRequest request, IndexShard indexShard, LongSupplier nowInMillis) {
final GetResult getResult = indexShard.getService().get(request.type(), request.id(),
new String[]{RoutingFieldMapper.NAME, ParentFieldMapper.NAME, TTLFieldMapper.NAME, TimestampFieldMapper.NAME},
true, request.version(), request.versionType(), FetchSourceContext.FETCH_SOURCE);
return prepare(indexShard.shardId(), request, getResult);
return prepare(indexShard.shardId(), request, getResult, nowInMillis);
}

/**
* Prepares an update request by converting it into an index or delete request or an update response (no action).
*/
@SuppressWarnings("unchecked")
protected Result prepare(ShardId shardId, UpdateRequest request, final GetResult getResult) {
protected Result prepare(ShardId shardId, UpdateRequest request, final GetResult getResult, LongSupplier nowInMillis) {
long getDateNS = System.nanoTime();
if (!getResult.isExists()) {
if (request.upsertRequest() == null && !request.docAsUpsert()) {
Expand All @@ -100,6 +101,7 @@ protected Result prepare(ShardId shardId, UpdateRequest request, final GetResult
// Tell the script that this is a create and not an update
ctx.put("op", "create");
ctx.put("_source", upsertDoc);
ctx.put("_now", nowInMillis.getAsLong());
ctx = executeScript(request.script, ctx);
//Allow the script to set TTL using ctx._ttl
if (ttl == null) {
Expand Down Expand Up @@ -188,6 +190,7 @@ protected Result prepare(ShardId shardId, UpdateRequest request, final GetResult
ctx.put("_timestamp", originalTimestamp);
ctx.put("_ttl", originalTtl);
ctx.put("_source", sourceAndContent.v2());
ctx.put("_now", nowInMillis.getAsLong());

ctx = executeScript(request.script, ctx);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,25 @@
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.env.Environment;
import org.elasticsearch.index.get.GetResult;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.script.MockScriptEngine;
import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptContextRegistry;
import org.elasticsearch.script.ScriptEngineRegistry;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.script.ScriptService.ScriptType;
import org.elasticsearch.script.ScriptSettings;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.watcher.ResourceWatcherService;

import java.io.IOException;
import java.nio.file.Path;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.hamcrest.Matchers.arrayContaining;
Expand Down Expand Up @@ -184,9 +196,10 @@ public void testUpdateRequestWithTTL() throws Exception {
.doc(jsonBuilder().startObject().field("fooz", "baz").endObject())
.upsert(indexRequest);

long nowInMillis = randomPositiveLong();
// We simulate that the document is not existing yet
GetResult getResult = new GetResult("test", "type1", "1", 0, false, null, null);
UpdateHelper.Result result = updateHelper.prepare(new ShardId("test", "_na_", 0),updateRequest, getResult);
UpdateHelper.Result result = updateHelper.prepare(new ShardId("test", "_na_", 0),updateRequest, getResult, () -> nowInMillis);
Streamable action = result.action();
assertThat(action, instanceOf(IndexRequest.class));
IndexRequest indexAction = (IndexRequest) action;
Expand All @@ -203,7 +216,7 @@ public void testUpdateRequestWithTTL() throws Exception {

// We simulate that the document is not existing yet
getResult = new GetResult("test", "type1", "2", 0, false, null, null);
result = updateHelper.prepare(new ShardId("test", "_na_", 0), updateRequest, getResult);
result = updateHelper.prepare(new ShardId("test", "_na_", 0), updateRequest, getResult, () -> nowInMillis);
action = result.action();
assertThat(action, instanceOf(IndexRequest.class));
indexAction = (IndexRequest) action;
Expand Down Expand Up @@ -276,4 +289,70 @@ public void testFetchSourceParsing() throws Exception {
assertThat(request.fetchSource().includes()[0], equalTo("path.inner.*"));
assertThat(request.fetchSource().excludes()[0], equalTo("another.inner.*"));
}

public void testNowInScript() throws IOException {
Path genericConfigFolder = createTempDir();
Settings baseSettings = Settings.builder()
.put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString())
.put(Environment.PATH_CONF_SETTING.getKey(), genericConfigFolder)
.build();
Environment environment = new Environment(baseSettings);
Map<String, Function<Map<String, Object>, Object>> scripts = new HashMap<>();
scripts.put("ctx._source.update_timestamp = ctx._now",
(vars) -> {
Map<String, Object> ctx = (Map) vars.get("ctx");
Map<String, Object> source = (Map) ctx.get("_source");
source.put("update_timestamp", ctx.get("_now"));
return null;});
scripts.put("ctx._timestamp = ctx._now",
(vars) -> {
Map<String, Object> ctx = (Map) vars.get("ctx");
ctx.put("_timestamp", ctx.get("_now"));
return null;});
ScriptContextRegistry scriptContextRegistry = new ScriptContextRegistry(Collections.emptyList());
ScriptEngineRegistry scriptEngineRegistry = new ScriptEngineRegistry(Collections.singletonList(new MockScriptEngine("mock",
scripts)));

ScriptSettings scriptSettings = new ScriptSettings(scriptEngineRegistry, scriptContextRegistry);
ScriptService scriptService = new ScriptService(baseSettings, environment,
new ResourceWatcherService(baseSettings, null), scriptEngineRegistry, scriptContextRegistry, scriptSettings);
TimeValue providedTTLValue = TimeValue.parseTimeValue(randomTimeValue(), null, "ttl");
Settings settings = settings(Version.CURRENT).build();

UpdateHelper updateHelper = new UpdateHelper(settings, scriptService);

// We just upsert one document with now() using a script
IndexRequest indexRequest = new IndexRequest("test", "type1", "2")
.source(jsonBuilder().startObject().field("foo", "bar").endObject())
.ttl(providedTTLValue);

{
UpdateRequest updateRequest = new UpdateRequest("test", "type1", "2")
.upsert(indexRequest)
.script(new Script("ctx._source.update_timestamp = ctx._now", ScriptType.INLINE, "mock", Collections.emptyMap()))
.scriptedUpsert(true);
long nowInMillis = randomPositiveLong();
// We simulate that the document is not existing yet
GetResult getResult = new GetResult("test", "type1", "2", 0, false, null, null);
UpdateHelper.Result result = updateHelper.prepare(new ShardId("test", "_na_", 0), updateRequest, getResult, () -> nowInMillis);
Streamable action = result.action();
assertThat(action, instanceOf(IndexRequest.class));
IndexRequest indexAction = (IndexRequest) action;
assertEquals(indexAction.sourceAsMap().get("update_timestamp"), nowInMillis);
}
{
UpdateRequest updateRequest = new UpdateRequest("test", "type1", "2")
.upsert(indexRequest)
.script(new Script("ctx._timestamp = ctx._now", ScriptType.INLINE, "mock", Collections.emptyMap()))
.scriptedUpsert(true);
long nowInMillis = randomPositiveLong();
// We simulate that the document is not existing yet
GetResult getResult = new GetResult("test", "type1", "2", 0, true, new BytesArray("{}"), null);
UpdateHelper.Result result = updateHelper.prepare(new ShardId("test", "_na_", 0), updateRequest, getResult, () -> nowInMillis);
Streamable action = result.action();
assertThat(action, instanceOf(IndexRequest.class));
IndexRequest indexAction = (IndexRequest) action;
assertEquals(indexAction.timestamp(), Long.toString(nowInMillis));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,4 +63,24 @@

- match: { _source.foo: xxx }

- do:
update:
index: test_1
type: test
id: 3
body:
script:
inline: "ctx._source.has_now = ctx._now > 0"
lang: "painless"
upsert: { has_now: false }
scripted_upsert: true

- do:
get:
index: test_1
type: test
id: 3

- match: { _source.has_now: true }