Skip to content

Commit fa264fb

Browse files
authored
Introduce TransportGetFromTranslogAction (#95998)
This is a change broken off from the ongoing real-time GET PR (#93976). It is just an action that can be used to invoke the new `ShardGetService.getFromTranslog` in #95736. It will be used on the search shards as a first step to handle a real-time get. Relates #93976, ES-5537
1 parent 00afc5b commit fa264fb

File tree

4 files changed

+341
-0
lines changed

4 files changed

+341
-0
lines changed
Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0 and the Server Side Public License, v 1; you may not use this file except
5+
* in compliance with, at your election, the Elastic License 2.0 or the Server
6+
* Side Public License, v 1.
7+
*/
8+
9+
package org.elasticsearch.get;
10+
11+
import org.elasticsearch.action.ActionListenerResponseHandler;
12+
import org.elasticsearch.action.admin.indices.alias.Alias;
13+
import org.elasticsearch.action.get.TransportGetFromTranslogAction;
14+
import org.elasticsearch.action.get.TransportGetFromTranslogAction.Response;
15+
import org.elasticsearch.action.support.PlainActionFuture;
16+
import org.elasticsearch.cluster.metadata.IndexMetadata;
17+
import org.elasticsearch.common.settings.Settings;
18+
import org.elasticsearch.test.ESIntegTestCase;
19+
import org.elasticsearch.threadpool.ThreadPool;
20+
import org.elasticsearch.transport.TransportService;
21+
22+
import static org.elasticsearch.action.support.WriteRequest.RefreshPolicy;
23+
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
24+
import static org.hamcrest.Matchers.equalTo;
25+
import static org.hamcrest.Matchers.greaterThan;
26+
27+
public class GetFromTranslogActionIT extends ESIntegTestCase {
28+
public void testGetFromTranslog() throws Exception {
29+
assertAcked(
30+
prepareCreate("test").setMapping("field1", "type=keyword,store=true")
31+
.setSettings(
32+
Settings.builder()
33+
.put("index.refresh_interval", -1)
34+
// A GetFromTranslogAction runs only Stateless where there is only one active indexing shard.
35+
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
36+
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
37+
)
38+
.addAlias(new Alias("alias").writeIndex(randomFrom(true, false, null)))
39+
);
40+
ensureGreen();
41+
42+
var response = getFromTranslog(indexOrAlias(), "1");
43+
assertNull(response.getResult());
44+
// There hasn't been any switches from unsafe to safe map
45+
assertThat(response.segmentGeneration(), equalTo(-1L));
46+
47+
var indexResponse = client().prepareIndex("test")
48+
.setId("1")
49+
.setSource("field1", "value1")
50+
.setRefreshPolicy(RefreshPolicy.NONE)
51+
.get();
52+
response = getFromTranslog(indexOrAlias(), "1");
53+
assertNotNull(response.getResult());
54+
assertThat(response.getResult().isExists(), equalTo(true));
55+
assertThat(response.getResult().getVersion(), equalTo(indexResponse.getVersion()));
56+
assertThat(response.segmentGeneration(), equalTo(-1L));
57+
// Get followed by a delete should still return a result
58+
client().prepareDelete("test", "1").get();
59+
response = getFromTranslog(indexOrAlias(), "1");
60+
assertNotNull("get followed by a delete should still return a result", response.getResult());
61+
assertThat(response.getResult().isExists(), equalTo(false));
62+
assertThat(response.segmentGeneration(), equalTo(-1L));
63+
64+
indexResponse = client().prepareIndex("test").setSource("field1", "value2").get();
65+
response = getFromTranslog(indexOrAlias(), indexResponse.getId());
66+
assertNotNull(response.getResult());
67+
assertThat(response.getResult().isExists(), equalTo(true));
68+
assertThat(response.getResult().getVersion(), equalTo(indexResponse.getVersion()));
69+
assertThat(response.segmentGeneration(), equalTo(-1L));
70+
// After a refresh we should not be able to get from translog
71+
refresh("test");
72+
response = getFromTranslog(indexOrAlias(), indexResponse.getId());
73+
assertNull("after a refresh we should not be able to get from translog", response.getResult());
74+
assertThat(response.segmentGeneration(), equalTo(-1L));
75+
// After two refreshes the LiveVersionMap switches back to append-only and stops tracking IDs
76+
// Refreshing with empty LiveVersionMap doesn't cause the switch, see {@link LiveVersionMap.Maps#shouldInheritSafeAccess()}.
77+
client().prepareIndex("test").setSource("field1", "value3").get();
78+
refresh("test");
79+
refresh("test");
80+
// An optimized index operation marks the maps as unsafe
81+
client().prepareIndex("test").setSource("field1", "value4").get();
82+
response = getFromTranslog(indexOrAlias(), "non-existent");
83+
assertNull(response.getResult());
84+
assertThat(response.segmentGeneration(), greaterThan(0L));
85+
}
86+
87+
private Response getFromTranslog(String index, String id) throws Exception {
88+
var getRequest = client().prepareGet(index, id).request();
89+
var shardRouting = randomFrom(clusterService().state().routingTable().allShards("test"));
90+
var node = clusterService().state().nodes().get(shardRouting.currentNodeId());
91+
assertNotNull(node);
92+
TransportGetFromTranslogAction.Request request = new TransportGetFromTranslogAction.Request(getRequest, shardRouting.shardId());
93+
var transportService = internalCluster().getInstance(TransportService.class);
94+
PlainActionFuture<Response> response = new PlainActionFuture<>();
95+
transportService.sendRequest(
96+
node,
97+
TransportGetFromTranslogAction.NAME,
98+
request,
99+
new ActionListenerResponseHandler<>(response, Response::new, ThreadPool.Names.GET)
100+
);
101+
return response.get();
102+
}
103+
104+
private String indexOrAlias() {
105+
return randomBoolean() ? "test" : "alias";
106+
}
107+
}

server/src/main/java/org/elasticsearch/action/get/TransportGetAction.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,8 @@ public TransportGetAction(
5959
);
6060
this.indicesService = indicesService;
6161
this.executorSelector = executorSelector;
62+
// register the internal TransportGetFromTranslogAction
63+
new TransportGetFromTranslogAction(transportService, indicesService, actionFilters);
6264
}
6365

6466
@Override
Lines changed: 182 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,182 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0 and the Server Side Public License, v 1; you may not use this file except
5+
* in compliance with, at your election, the Elastic License 2.0 or the Server
6+
* Side Public License, v 1.
7+
*/
8+
9+
package org.elasticsearch.action.get;
10+
11+
import org.apache.lucene.store.AlreadyClosedException;
12+
import org.apache.lucene.util.BytesRef;
13+
import org.elasticsearch.action.ActionListener;
14+
import org.elasticsearch.action.ActionRequest;
15+
import org.elasticsearch.action.ActionRequestValidationException;
16+
import org.elasticsearch.action.ActionResponse;
17+
import org.elasticsearch.action.support.ActionFilters;
18+
import org.elasticsearch.action.support.HandledTransportAction;
19+
import org.elasticsearch.common.inject.Inject;
20+
import org.elasticsearch.common.io.stream.StreamInput;
21+
import org.elasticsearch.common.io.stream.StreamOutput;
22+
import org.elasticsearch.core.Nullable;
23+
import org.elasticsearch.index.IndexService;
24+
import org.elasticsearch.index.engine.Engine;
25+
import org.elasticsearch.index.engine.InternalEngine;
26+
import org.elasticsearch.index.get.GetResult;
27+
import org.elasticsearch.index.shard.IndexShard;
28+
import org.elasticsearch.index.shard.ShardId;
29+
import org.elasticsearch.indices.IndicesService;
30+
import org.elasticsearch.logging.LogManager;
31+
import org.elasticsearch.logging.Logger;
32+
import org.elasticsearch.tasks.Task;
33+
import org.elasticsearch.threadpool.ThreadPool;
34+
import org.elasticsearch.transport.TransportService;
35+
36+
import java.io.IOException;
37+
import java.util.Objects;
38+
39+
// TODO(ES-5727): add a retry mechanism to TransportGetFromTranslogAction
40+
public class TransportGetFromTranslogAction extends HandledTransportAction<
41+
TransportGetFromTranslogAction.Request,
42+
TransportGetFromTranslogAction.Response> {
43+
44+
public static final String NAME = "internal:data/read/get_from_translog";
45+
public static final Logger logger = LogManager.getLogger(TransportGetFromTranslogAction.class);
46+
47+
private final IndicesService indicesService;
48+
49+
@Inject
50+
public TransportGetFromTranslogAction(TransportService transportService, IndicesService indicesService, ActionFilters actionFilters) {
51+
super(NAME, transportService, actionFilters, Request::new, ThreadPool.Names.GET);
52+
this.indicesService = indicesService;
53+
}
54+
55+
@Override
56+
protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
57+
final GetRequest getRequest = request.getRequest();
58+
final ShardId shardId = request.shardId();
59+
IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
60+
IndexShard indexShard = indexService.getShard(shardId.id());
61+
assert indexShard.routingEntry().isPromotableToPrimary() : "not an indexing shard" + indexShard.routingEntry();
62+
assert getRequest.realtime();
63+
ActionListener.completeWith(listener, () -> {
64+
var result = indexShard.getService()
65+
.getFromTranslog(
66+
getRequest.id(),
67+
getRequest.storedFields(),
68+
getRequest.realtime(),
69+
getRequest.version(),
70+
getRequest.versionType(),
71+
getRequest.fetchSourceContext(),
72+
getRequest.isForceSyntheticSource()
73+
);
74+
long segmentGeneration = -1;
75+
if (result == null) {
76+
Engine engine = indexShard.getEngineOrNull();
77+
if (engine == null) {
78+
throw new AlreadyClosedException("engine closed");
79+
}
80+
segmentGeneration = ((InternalEngine) engine).getLastUnsafeSegmentGenerationForGets();
81+
}
82+
return new Response(result, segmentGeneration);
83+
});
84+
}
85+
86+
public static class Request extends ActionRequest {
87+
88+
private final GetRequest getRequest;
89+
private final ShardId shardId;
90+
91+
public Request(GetRequest getRequest, ShardId shardId) {
92+
this.getRequest = Objects.requireNonNull(getRequest);
93+
this.shardId = Objects.requireNonNull(shardId);
94+
}
95+
96+
public Request(StreamInput in) throws IOException {
97+
super(in);
98+
getRequest = new GetRequest(in);
99+
shardId = new ShardId(in);
100+
}
101+
102+
@Override
103+
public void writeTo(StreamOutput out) throws IOException {
104+
super.writeTo(out);
105+
getRequest.writeTo(out);
106+
shardId.writeTo(out);
107+
}
108+
109+
public GetRequest getRequest() {
110+
return getRequest;
111+
}
112+
113+
public ShardId shardId() {
114+
return shardId;
115+
}
116+
117+
@Override
118+
public ActionRequestValidationException validate() {
119+
return null;
120+
}
121+
122+
@Override
123+
public String toString() {
124+
return "GetFromTranslogRequest{" + "getRequest=" + getRequest + ", shardId=" + shardId + "}";
125+
}
126+
}
127+
128+
public static class Response extends ActionResponse {
129+
@Nullable
130+
private final GetResult getResult;
131+
private final long segmentGeneration;
132+
133+
public Response(GetResult getResult, long segmentGeneration) {
134+
this.getResult = getResult;
135+
this.segmentGeneration = segmentGeneration;
136+
}
137+
138+
public Response(StreamInput in) throws IOException {
139+
super(in);
140+
segmentGeneration = in.readZLong();
141+
getResult = in.readOptionalWriteable(GetResult::new);
142+
}
143+
144+
@Override
145+
public void writeTo(StreamOutput out) throws IOException {
146+
out.writeZLong(segmentGeneration);
147+
out.writeOptionalWriteable(getResult);
148+
}
149+
150+
@Nullable
151+
public GetResult getResult() {
152+
return getResult;
153+
}
154+
155+
/**
156+
* The segment generation that the search shard should wait for before handling the real-time GET request locally.
157+
* -1 if the result is not null (i.e., the result is served from the indexing shard), or there hasn't simply been
158+
* any switches from unsafe to safe map in the LiveVersionMap (see {@link InternalEngine#getVersionFromMap(BytesRef)}).
159+
*/
160+
public long segmentGeneration() {
161+
return segmentGeneration;
162+
}
163+
164+
@Override
165+
public boolean equals(Object o) {
166+
if (this == o) return true;
167+
if (o instanceof Response == false) return false;
168+
Response other = (Response) o;
169+
return segmentGeneration == other.segmentGeneration && Objects.equals(getResult, other.getResult);
170+
}
171+
172+
@Override
173+
public int hashCode() {
174+
return Objects.hash(segmentGeneration, getResult);
175+
}
176+
177+
@Override
178+
public String toString() {
179+
return "Response{" + "getResult=" + getResult + ", segmentGeneration=" + segmentGeneration + "}";
180+
}
181+
}
182+
}
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0 and the Server Side Public License, v 1; you may not use this file except
5+
* in compliance with, at your election, the Elastic License 2.0 or the Server
6+
* Side Public License, v 1.
7+
*/
8+
9+
package org.elasticsearch.action.get;
10+
11+
import org.elasticsearch.common.io.stream.Writeable;
12+
import org.elasticsearch.index.get.GetResult;
13+
import org.elasticsearch.index.get.GetResultTests;
14+
import org.elasticsearch.test.AbstractWireSerializingTestCase;
15+
import org.elasticsearch.xcontent.XContentType;
16+
17+
import java.io.IOException;
18+
19+
public class GetFromTranslogResponseSerializationTests extends AbstractWireSerializingTestCase<TransportGetFromTranslogAction.Response> {
20+
@Override
21+
protected Writeable.Reader<TransportGetFromTranslogAction.Response> instanceReader() {
22+
return TransportGetFromTranslogAction.Response::new;
23+
}
24+
25+
@Override
26+
protected TransportGetFromTranslogAction.Response createTestInstance() {
27+
return new TransportGetFromTranslogAction.Response(randomGetResult(), randomSegmentGeneration());
28+
}
29+
30+
@Override
31+
protected TransportGetFromTranslogAction.Response mutateInstance(TransportGetFromTranslogAction.Response instance) throws IOException {
32+
return randomBoolean()
33+
? new TransportGetFromTranslogAction.Response(
34+
instance.getResult(),
35+
randomValueOtherThan(instance.segmentGeneration(), this::randomSegmentGeneration)
36+
)
37+
: new TransportGetFromTranslogAction.Response(
38+
randomValueOtherThan(instance.getResult(), this::randomGetResult),
39+
instance.segmentGeneration()
40+
);
41+
}
42+
43+
private long randomSegmentGeneration() {
44+
return randomBoolean() ? -1L : randomNonNegativeLong();
45+
}
46+
47+
private GetResult randomGetResult() {
48+
return randomBoolean() ? null : GetResultTests.randomGetResult(randomFrom(XContentType.values())).v1();
49+
}
50+
}

0 commit comments

Comments
 (0)