Skip to content

Commit fd78967

Browse files
author
Olivier Favre
committed
Provide more fields in the script context
Added: _index, _uid, _type, _id, _version, _routing, _parent, _timestamp and _ttl.
1 parent 643bca3 commit fd78967

File tree

3 files changed

+138
-3
lines changed

3 files changed

+138
-3
lines changed

README.md

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,5 +113,35 @@ Additional general options in request body:
113113
* `routing`: Sets the routing that will be used to route the document to the relevant shard.
114114
* `timeout`: Timeout waiting for a shard to become available.
115115

116+
Context variables
117+
-----------------
118+
119+
The script has access to the following variables:
120+
121+
* `ctx`
122+
* `_index`
123+
* `_uid`
124+
* `_type`
125+
* `_id`
126+
* `_version`
127+
* `_source`
128+
* `_routing`
129+
* `_parent`
130+
* `_timestamp` (in milliseconds)
131+
* `_ttl` (in milliseconds)
132+
133+
### Output variables
134+
135+
You may update the following variables:
136+
137+
* `ctx`
138+
* `_timestamp`
139+
* `_ttl`
140+
141+
They are parsed as time values: either milliseconds since epoch, or a duration string like `"30m"`.
142+
If you wish to change the timestamp of your document, to make it more recent, either set a new value to `_timestamp` or set it to `null`.
143+
Otherwise the previous `_timestamp` is preserved.
144+
`_ttl` is preserved too if you don't change or remove it.
145+
116146

117147
[es#2230]: https://github.com/elasticsearch/elasticsearch/issues/2230

src/main/java/org/elasticsearch/action/updatebyquery/TransportShardUpdateByQueryAction.java

Lines changed: 29 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@
2020
package org.elasticsearch.action.updatebyquery;
2121

2222
import org.apache.lucene.index.AtomicReaderContext;
23+
import org.apache.lucene.index.IndexableField;
2324
import org.apache.lucene.index.ReaderUtil;
24-
import org.apache.lucene.index.StoredFieldVisitor;
2525
import org.apache.lucene.index.Term;
2626
import org.elasticsearch.common.collect.Maps;
2727
import org.apache.lucene.document.Document;
@@ -88,6 +88,7 @@ public class TransportShardUpdateByQueryAction extends TransportAction<ShardUpda
8888
fieldsToLoad.add(RoutingFieldMapper.NAME);
8989
fieldsToLoad.add(ParentFieldMapper.NAME);
9090
fieldsToLoad.add(TTLFieldMapper.NAME);
91+
fieldsToLoad.add(TimestampFieldMapper.NAME);
9192
}
9293

9394
private final TransportShardBulkAction bulkAction;
@@ -344,16 +345,33 @@ private void fillBatch(DocIdSetIterator iterator, IndexReader indexReader, Shard
344345
private ActionRequest createRequest(ShardUpdateByQueryRequest request, Document document, AtomicReaderContext subReaderContext) {
345346
Uid uid = Uid.createUid(document.get(UidFieldMapper.NAME));
346347
Term tUid = new Term(UidFieldMapper.NAME, uid.toString());
347-
long version = version = UidField.loadVersion(subReaderContext, tUid);
348+
long version = UidField.loadVersion(subReaderContext, tUid);
348349
BytesReference _source = new BytesArray(document.getBinaryValue(SourceFieldMapper.NAME));
349350
String routing = document.get(RoutingFieldMapper.NAME);
350351
String parent = document.get(ParentFieldMapper.NAME);
352+
IndexableField optionalField;
353+
optionalField = document.getField(TimestampFieldMapper.NAME);
354+
Number originTimestamp = optionalField == null ? null : optionalField.numericValue();
355+
optionalField = document.getField(TTLFieldMapper.NAME);
356+
Number originTtl = optionalField == null ? null : optionalField.numericValue();
357+
if (originTtl != null && originTimestamp != null)
358+
// Unshift TTL from timestamp
359+
originTtl = originTtl.longValue() - originTimestamp.longValue();
351360

352361
Tuple<XContentType, Map<String, Object>> sourceAndContent = XContentHelper.convertToMap(_source, true);
353362
final XContentType updateSourceContentType = sourceAndContent.v1();
354363

355364
updateByQueryContext.scriptContext.clear();
365+
updateByQueryContext.scriptContext.put("_index", request.index());
366+
updateByQueryContext.scriptContext.put("_uid", uid.toString());
367+
updateByQueryContext.scriptContext.put("_type", uid.type());
368+
updateByQueryContext.scriptContext.put("_id", uid.id());
369+
updateByQueryContext.scriptContext.put("_version", version);
356370
updateByQueryContext.scriptContext.put("_source", sourceAndContent.v2());
371+
updateByQueryContext.scriptContext.put("_routing", routing);
372+
updateByQueryContext.scriptContext.put("_parent", parent);
373+
updateByQueryContext.scriptContext.put("_timestamp", originTimestamp);
374+
updateByQueryContext.scriptContext.put("_ttl", originTtl);
357375

358376
try {
359377
updateByQueryContext.executableScript.setNextVar("ctx", updateByQueryContext.scriptContext);
@@ -365,7 +383,15 @@ private ActionRequest createRequest(ShardUpdateByQueryRequest request, Document
365383
}
366384

367385
String operation = (String) updateByQueryContext.scriptContext.get("op");
368-
String timestamp = (String) updateByQueryContext.scriptContext.get("_timestamp");
386+
Object fetchedTimestamp = updateByQueryContext.scriptContext.get("_timestamp");
387+
String timestamp = null;
388+
if (fetchedTimestamp != null) {
389+
if (fetchedTimestamp instanceof String) {
390+
timestamp = String.valueOf(TimeValue.parseTimeValue((String) fetchedTimestamp, null).millis());
391+
} else {
392+
timestamp = fetchedTimestamp.toString();
393+
}
394+
}
369395
Object fetchedTTL = updateByQueryContext.scriptContext.get("_ttl");
370396
Long ttl = null;
371397
if (fetchedTTL != null) {

src/test/java/org/elasticsearch/test/integration/updatebyquery/UpdateByQueryTests.java

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus;
2424
import org.elasticsearch.action.bulk.BulkItemResponse;
2525
import org.elasticsearch.action.count.CountResponse;
26+
import org.elasticsearch.action.search.SearchResponse;
2627
import org.elasticsearch.action.updatebyquery.BulkResponseOption;
2728
import org.elasticsearch.action.updatebyquery.IndexUpdateByQueryResponse;
2829
import org.elasticsearch.action.updatebyquery.UpdateByQueryResponse;
@@ -32,6 +33,7 @@
3233
import org.elasticsearch.common.settings.Settings;
3334
import org.elasticsearch.common.xcontent.XContentFactory;
3435
import org.elasticsearch.index.query.FilterBuilders;
36+
import org.elasticsearch.search.SearchHit;
3537
import org.elasticsearch.test.integration.AbstractNodesTests;
3638
import org.testng.annotations.AfterClass;
3739
import org.testng.annotations.BeforeClass;
@@ -314,4 +316,81 @@ public void testUpdateByQuery_usingAliases() {
314316
assertThat(client.prepareGet("alias1", "type1", "4").execute().actionGet().isExists(), equalTo(false));
315317
}
316318

319+
@Test
320+
public void testUpdateByQuery_fields() throws Exception {
321+
createIndex("test");
322+
ClusterHealthResponse clusterHealth = client.admin().cluster().prepareHealth().setWaitForGreenStatus().execute().actionGet();
323+
assertThat(clusterHealth.isTimedOut(), equalTo(false));
324+
assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.GREEN));
325+
326+
long timestamp = System.currentTimeMillis();
327+
client.prepareIndex()
328+
.setIndex("test")
329+
.setType("type1")
330+
.setId("id1")
331+
.setRouting("routing1")
332+
.setTimestamp(String.valueOf(timestamp))
333+
.setTTL(111211211)
334+
.setSource("field1", 1, "content", "foo")
335+
.execute().actionGet();
336+
client.admin().indices().prepareRefresh("test").setWaitForOperations(true).execute().actionGet();
337+
338+
CountResponse countResponse = client.prepareCount("test")
339+
.setQuery(termQuery("field1", 1).buildAsBytes())
340+
.execute()
341+
.actionGet();
342+
assertThat(countResponse.getCount(), equalTo(1L));
343+
344+
countResponse = client.prepareCount("test")
345+
.setQuery(termQuery("field1", 2).buildAsBytes())
346+
.execute()
347+
.actionGet();
348+
assertThat(countResponse.getCount(), equalTo(0L));
349+
350+
Map<String, Object> scriptParams = new HashMap<String, Object>();
351+
scriptParams.put("delim", "_");
352+
UpdateByQueryResponse response = updateByQueryClientWrapper.prepareUpdateByQuery()
353+
.setIndices("test")
354+
.setTypes("type1")
355+
.setIncludeBulkResponses(BulkResponseOption.ALL)
356+
.setScript("ctx._source.field1 += 1;\n"+
357+
"ctx._source.content = ctx._index" +
358+
" + delim + ctx._type" +
359+
" + delim + ctx._id" +
360+
" + delim + ctx._uid" +
361+
" + delim + ctx._parent" +
362+
" + delim + ctx._routing" +
363+
" + delim + ctx._timestamp" +
364+
" + delim + ctx._ttl" +
365+
" + delim + ctx._version" +
366+
" + delim + ctx._source.content;")
367+
.setScriptParams(scriptParams)
368+
.setQuery(matchAllQuery())
369+
.execute()
370+
.actionGet();
371+
372+
assertThat(response, notNullValue());
373+
assertThat(response.mainFailures().length, equalTo(0));
374+
assertThat(response.totalHits(), equalTo(1L));
375+
assertThat(response.updated(), equalTo(1L));
376+
assertThat(response.indexResponses().length, equalTo(1));
377+
assertThat(response.indexResponses()[0].countShardResponses(), equalTo(1L));
378+
assertThat(response.indexResponses()[0].failuresByShard().isEmpty(), equalTo(true));
379+
380+
client.admin().indices().prepareRefresh("test").execute().actionGet();
381+
countResponse = client.prepareCount("test")
382+
.setQuery(termQuery("field1", 2).buildAsBytes())
383+
.execute()
384+
.actionGet();
385+
assertThat(countResponse.getCount(), equalTo(1L));
386+
387+
SearchResponse searchResponse = client.prepareSearch("test").setQuery(matchAllQuery()).execute().actionGet();
388+
assertThat(searchResponse.getHits().getTotalHits(), equalTo(1L));
389+
for (SearchHit hit : searchResponse.getHits().getHits()) {
390+
assertThat(hit.getType(), equalTo("type1"));
391+
assertThat(hit.getId(), equalTo("id1"));
392+
assertThat((String)hit.getSource().get("content"), equalTo("test_type1_id1_type1#id1_null_routing1_"+timestamp+"_111211211_1_foo"));
393+
}
394+
}
395+
317396
}

0 commit comments

Comments
 (0)