Skip to content

Commit 45c1ab0

Browse files
committed
Versioning, closes elastic#594.
1 parent 9335b3a commit 45c1ab0

File tree

47 files changed

+1528
-202
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

47 files changed

+1528
-202
lines changed

.idea/dictionaries/kimchy.xml

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

modules/benchmark/micro/src/main/java/org/elasticsearch/benchmark/common/lucene/uidscan/LuceneUidScanBenchmark.java

Lines changed: 6 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -19,24 +19,20 @@
1919

2020
package org.elasticsearch.benchmark.common.lucene.uidscan;
2121

22-
import org.apache.lucene.analysis.TokenFilter;
23-
import org.apache.lucene.analysis.TokenStream;
24-
import org.apache.lucene.analysis.tokenattributes.PayloadAttribute;
25-
import org.apache.lucene.document.AbstractField;
2622
import org.apache.lucene.document.Document;
27-
import org.apache.lucene.document.Field;
28-
import org.apache.lucene.index.*;
23+
import org.apache.lucene.index.IndexReader;
24+
import org.apache.lucene.index.IndexWriter;
25+
import org.apache.lucene.index.Term;
26+
import org.apache.lucene.index.TermPositions;
2927
import org.apache.lucene.store.FSDirectory;
3028
import org.elasticsearch.common.Numbers;
3129
import org.elasticsearch.common.StopWatch;
32-
import org.elasticsearch.common.io.FastStringReader;
3330
import org.elasticsearch.common.lucene.Lucene;
31+
import org.elasticsearch.common.lucene.uid.UidField;
3432
import org.elasticsearch.common.unit.SizeValue;
3533
import org.elasticsearch.common.util.concurrent.jsr166y.ThreadLocalRandom;
3634

3735
import java.io.File;
38-
import java.io.IOException;
39-
import java.io.Reader;
4036
import java.util.concurrent.CountDownLatch;
4137

4238
/**
@@ -59,7 +55,7 @@ public static void main(String[] args) throws Exception {
5955
System.out.println("Indexing " + INDEX_COUNT + " docs...");
6056
for (long i = startUid; i < LIMIT; i++) {
6157
Document doc = new Document();
62-
doc.add(new UidField(Long.toString(i), i));
58+
doc.add(new UidField("_uid", Long.toString(i), i));
6359
writer.addDocument(doc);
6460
}
6561
System.out.println("Done indexing, took " + watch.stop().lastTaskTime());
@@ -104,55 +100,4 @@ public static void main(String[] args) throws Exception {
104100
watch.stop();
105101
System.out.println("Scanned in " + watch.totalTime() + " TP Seconds " + ((SCAN_COUNT * NUMBER_OF_THREADS) / watch.totalTime().secondsFrac()));
106102
}
107-
108-
109-
public static class UidField extends AbstractField {
110-
111-
private final String uid;
112-
113-
private final long version;
114-
115-
public UidField(String uid, long version) {
116-
super("_uid", Field.Store.YES, Field.Index.ANALYZED, Field.TermVector.NO);
117-
this.uid = uid;
118-
this.version = version;
119-
}
120-
121-
@Override public String stringValue() {
122-
return uid;
123-
}
124-
125-
@Override public Reader readerValue() {
126-
return null;
127-
}
128-
129-
@Override public TokenStream tokenStreamValue() {
130-
try {
131-
return new UidPayloadTokenStream(Lucene.KEYWORD_ANALYZER.reusableTokenStream("_uid", new FastStringReader(uid)), version);
132-
} catch (IOException e) {
133-
throw new RuntimeException("failed to create token stream", e);
134-
}
135-
}
136-
}
137-
138-
public static class UidPayloadTokenStream extends TokenFilter {
139-
140-
private final PayloadAttribute payloadAttribute;
141-
142-
private final long version;
143-
144-
public UidPayloadTokenStream(TokenStream input, long version) {
145-
super(input);
146-
this.version = version;
147-
payloadAttribute = addAttribute(PayloadAttribute.class);
148-
}
149-
150-
@Override public boolean incrementToken() throws IOException {
151-
if (!input.incrementToken()) {
152-
return false;
153-
}
154-
payloadAttribute.setPayload(new Payload(Numbers.longToBytes(version)));
155-
return true;
156-
}
157-
}
158103
}

modules/benchmark/micro/src/main/java/org/elasticsearch/benchmark/index/engine/SimpleEngineBenchmark.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,7 @@ public SimpleEngineBenchmark build() {
166166
.add(field("content", contentItem)).build();
167167
ParsedDocument pDoc = new ParsedDocument(sId, sId, "type", null, doc, Lucene.STANDARD_ANALYZER, TRANSLOG_PAYLOAD, false);
168168
if (create) {
169-
engine.create(new Engine.Create(pDoc));
169+
engine.create(new Engine.Create(new Term("_id", sId), pDoc));
170170
} else {
171171
engine.index(new Engine.Index(new Term("_id", sId), pDoc));
172172
}
@@ -280,7 +280,7 @@ private class WriterThread implements Runnable {
280280
.add(field("content", content(id))).build();
281281
ParsedDocument pDoc = new ParsedDocument(sId, sId, "type", null, doc, Lucene.STANDARD_ANALYZER, TRANSLOG_PAYLOAD, false);
282282
if (create) {
283-
engine.create(new Engine.Create(pDoc));
283+
engine.create(new Engine.Create(new Term("_id", sId), pDoc));
284284
} else {
285285
engine.index(new Engine.Index(new Term("_id", sId), pDoc));
286286
}

modules/elasticsearch/src/main/java/org/elasticsearch/action/bulk/BulkItemResponse.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,21 @@ public String getId() {
213213
return id();
214214
}
215215

216+
/**
217+
* The version of the action.
218+
*/
219+
public long version() {
220+
if (failure != null) {
221+
return -1;
222+
}
223+
if (response instanceof IndexResponse) {
224+
return ((IndexResponse) response).version();
225+
} else if (response instanceof DeleteResponse) {
226+
return ((DeleteResponse) response).version();
227+
}
228+
return -1;
229+
}
230+
216231
/**
217232
* The actual response ({@link IndexResponse} or {@link DeleteResponse}). <tt>null</tt> in
218233
* case of failure.

modules/elasticsearch/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,7 @@ public BulkRequest add(byte[] data, int from, int length, boolean contentUnsafe)
112112
String routing = null;
113113
String parent = null;
114114
String opType = null;
115+
long version = 0;
115116

116117
String currentFieldName = null;
117118
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
@@ -130,6 +131,8 @@ public BulkRequest add(byte[] data, int from, int length, boolean contentUnsafe)
130131
parent = parser.text();
131132
} else if ("op_type".equals(currentFieldName) || "opType".equals(currentFieldName)) {
132133
opType = parser.text();
134+
} else if ("_version".equals(currentFieldName)) {
135+
version = parser.longValue();
133136
}
134137
}
135138
}
@@ -144,15 +147,15 @@ public BulkRequest add(byte[] data, int from, int length, boolean contentUnsafe)
144147
// order is important, we set parent after routing, so routing will be set to parent if not set explicitly
145148
if ("index".equals(action)) {
146149
if (opType == null) {
147-
add(new IndexRequest(index, type, id).routing(routing).parent(parent)
150+
add(new IndexRequest(index, type, id).routing(routing).parent(parent).version(version)
148151
.source(data, from, nextMarker - from, contentUnsafe));
149152
} else {
150-
add(new IndexRequest(index, type, id).routing(routing).parent(parent)
153+
add(new IndexRequest(index, type, id).routing(routing).parent(parent).version(version)
151154
.create("create".equals(opType))
152155
.source(data, from, nextMarker - from, contentUnsafe));
153156
}
154157
} else if ("create".equals(action)) {
155-
add(new IndexRequest(index, type, id).routing(routing).parent(parent)
158+
add(new IndexRequest(index, type, id).routing(routing).parent(parent).version(version)
156159
.create(true)
157160
.source(data, from, nextMarker - from, contentUnsafe));
158161
}

modules/elasticsearch/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -120,9 +120,9 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
120120
SourceToParse sourceToParse = SourceToParse.source(indexRequest.source()).type(indexRequest.type()).id(indexRequest.id())
121121
.routing(indexRequest.routing()).parent(indexRequest.parent());
122122
if (indexRequest.opType() == IndexRequest.OpType.INDEX) {
123-
ops[i] = indexShard.prepareIndex(sourceToParse);
123+
ops[i] = indexShard.prepareIndex(sourceToParse).version(indexRequest.version()).origin(Engine.Operation.Origin.PRIMARY);
124124
} else {
125-
ops[i] = indexShard.prepareCreate(sourceToParse);
125+
ops[i] = indexShard.prepareCreate(sourceToParse).version(indexRequest.version()).origin(Engine.Operation.Origin.PRIMARY);
126126
}
127127
} catch (Exception e) {
128128
if (logger.isDebugEnabled()) {
@@ -134,7 +134,7 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
134134
} else if (item.request() instanceof DeleteRequest) {
135135
DeleteRequest deleteRequest = (DeleteRequest) item.request();
136136
try {
137-
ops[i] = indexShard.prepareDelete(deleteRequest.type(), deleteRequest.id());
137+
ops[i] = indexShard.prepareDelete(deleteRequest.type(), deleteRequest.id(), deleteRequest.version()).origin(Engine.Operation.Origin.PRIMARY);
138138
} catch (Exception e) {
139139
if (logger.isDebugEnabled()) {
140140
logger.debug("[" + shardRequest.request.index() + "][" + shardRequest.shardId + "]" + ": Failed to execute bulk item (delete) [" + deleteRequest + "]", e);
@@ -157,8 +157,10 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
157157
BulkItemRequest item = request.items()[i];
158158
if (item.request() instanceof IndexRequest) {
159159
IndexRequest indexRequest = (IndexRequest) item.request();
160+
long version;
160161
if (indexRequest.opType() == IndexRequest.OpType.INDEX) {
161162
Engine.Index engineIndex = (Engine.Index) ops[i];
163+
version = engineIndex.version();
162164
if (!processedTypes.contains(engineIndex.type())) {
163165
processedTypes.add(engineIndex.type());
164166
ParsedDocument doc = engineIndex.parsedDoc();
@@ -168,6 +170,7 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
168170
}
169171
} else {
170172
Engine.Create engineCreate = (Engine.Create) ops[i];
173+
version = engineCreate.version();
171174
if (!processedTypes.contains(engineCreate.type())) {
172175
processedTypes.add(engineCreate.type());
173176
ParsedDocument doc = engineCreate.parsedDoc();
@@ -176,21 +179,26 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
176179
}
177180
}
178181
}
182+
// update the version on request so it will happen on the replicas
183+
indexRequest.version(version);
179184
if (failures != null && failures[i] != null) {
180185
responses[i] = new BulkItemResponse(item.id(), indexRequest.opType().toString().toLowerCase(),
181186
new BulkItemResponse.Failure(indexRequest.index(), indexRequest.type(), indexRequest.id(), ExceptionsHelper.detailedMessage(failures[i])));
182187
} else {
183188
responses[i] = new BulkItemResponse(item.id(), indexRequest.opType().toString().toLowerCase(),
184-
new IndexResponse(indexRequest.index(), indexRequest.type(), indexRequest.id()));
189+
new IndexResponse(indexRequest.index(), indexRequest.type(), indexRequest.id(), version));
185190
}
186191
} else if (item.request() instanceof DeleteRequest) {
187192
DeleteRequest deleteRequest = (DeleteRequest) item.request();
193+
Engine.Delete engineDelete = (Engine.Delete) ops[i];
194+
// update the version on request so it will happen on the replicas
195+
deleteRequest.version(engineDelete.version());
188196
if (failures != null && failures[i] != null) {
189197
responses[i] = new BulkItemResponse(item.id(), "delete",
190198
new BulkItemResponse.Failure(deleteRequest.index(), deleteRequest.type(), deleteRequest.id(), ExceptionsHelper.detailedMessage(failures[i])));
191199
} else {
192200
responses[i] = new BulkItemResponse(item.id(), "delete",
193-
new DeleteResponse(deleteRequest.index(), deleteRequest.type(), deleteRequest.id()));
201+
new DeleteResponse(deleteRequest.index(), deleteRequest.type(), deleteRequest.id(), engineDelete.version(), engineDelete.notFound()));
194202
}
195203
}
196204
}
@@ -209,17 +217,17 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
209217
SourceToParse sourceToParse = SourceToParse.source(indexRequest.source()).type(indexRequest.type()).id(indexRequest.id())
210218
.routing(indexRequest.routing()).parent(indexRequest.parent());
211219
if (indexRequest.opType() == IndexRequest.OpType.INDEX) {
212-
ops[i] = indexShard.prepareIndex(sourceToParse);
220+
ops[i] = indexShard.prepareIndex(sourceToParse).version(indexRequest.version()).origin(Engine.Operation.Origin.REPLICA);
213221
} else {
214-
ops[i] = indexShard.prepareCreate(sourceToParse);
222+
ops[i] = indexShard.prepareCreate(sourceToParse).version(indexRequest.version()).origin(Engine.Operation.Origin.REPLICA);
215223
}
216224
} catch (Exception e) {
217225
// ignore, we are on backup
218226
}
219227
} else if (item.request() instanceof DeleteRequest) {
220228
DeleteRequest deleteRequest = (DeleteRequest) item.request();
221229
try {
222-
ops[i] = indexShard.prepareDelete(deleteRequest.type(), deleteRequest.id());
230+
ops[i] = indexShard.prepareDelete(deleteRequest.type(), deleteRequest.id(), deleteRequest.version()).origin(Engine.Operation.Origin.REPLICA);
223231
} catch (Exception e) {
224232
// ignore, we are on backup
225233
}

modules/elasticsearch/src/main/java/org/elasticsearch/action/delete/DeleteRequest.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ public class DeleteRequest extends ShardReplicationOperationRequest {
5151
private String id;
5252
@Nullable private String routing;
5353
private boolean refresh;
54+
private long version;
5455

5556
/**
5657
* Constructs a new delete request against the specified index. The {@link #type(String)} and {@link #id(String)}
@@ -197,6 +198,19 @@ public boolean refresh() {
197198
return this.refresh;
198199
}
199200

201+
/**
202+
* Sets the version, which will cause the delete operation to only be performed if a matching
203+
* version exists and no changes happened on the doc since then.
204+
*/
205+
public DeleteRequest version(long version) {
206+
this.version = version;
207+
return this;
208+
}
209+
210+
public long version() {
211+
return this.version;
212+
}
213+
200214
@Override public void readFrom(StreamInput in) throws IOException {
201215
super.readFrom(in);
202216
type = in.readUTF();
@@ -205,6 +219,7 @@ public boolean refresh() {
205219
routing = in.readUTF();
206220
}
207221
refresh = in.readBoolean();
222+
version = in.readLong();
208223
}
209224

210225
@Override public void writeTo(StreamOutput out) throws IOException {
@@ -218,6 +233,7 @@ public boolean refresh() {
218233
out.writeUTF(routing);
219234
}
220235
out.writeBoolean(refresh);
236+
out.writeLong(version);
221237
}
222238

223239
@Override public String toString() {

modules/elasticsearch/src/main/java/org/elasticsearch/action/delete/DeleteResponse.java

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,14 +41,20 @@ public class DeleteResponse implements ActionResponse, Streamable {
4141

4242
private String type;
4343

44+
private long version;
45+
46+
private boolean notFound;
47+
4448
public DeleteResponse() {
4549

4650
}
4751

48-
public DeleteResponse(String index, String type, String id) {
52+
public DeleteResponse(String index, String type, String id, long version, boolean notFound) {
4953
this.index = index;
5054
this.id = id;
5155
this.type = type;
56+
this.version = version;
57+
this.notFound = notFound;
5258
}
5359

5460
/**
@@ -93,15 +99,47 @@ public String getId() {
9399
return id;
94100
}
95101

102+
/**
103+
* The version of the delete operation.
104+
*/
105+
public long version() {
106+
return this.version;
107+
}
108+
109+
/**
110+
* The version of the delete operation.
111+
*/
112+
public long getVersion() {
113+
return this.version;
114+
}
115+
116+
/**
117+
* Returns <tt>true</tt> if there was no doc found to delete.
118+
*/
119+
public boolean notFound() {
120+
return notFound;
121+
}
122+
123+
/**
124+
* Returns <tt>true</tt> if there was no doc found to delete.
125+
*/
126+
public boolean isNotFound() {
127+
return notFound;
128+
}
129+
96130
@Override public void readFrom(StreamInput in) throws IOException {
97131
index = in.readUTF();
98132
id = in.readUTF();
99133
type = in.readUTF();
134+
version = in.readLong();
135+
notFound = in.readBoolean();
100136
}
101137

102138
@Override public void writeTo(StreamOutput out) throws IOException {
103139
out.writeUTF(index);
104140
out.writeUTF(id);
105141
out.writeUTF(type);
142+
out.writeLong(version);
143+
out.writeBoolean(notFound);
106144
}
107145
}

0 commit comments

Comments
 (0)