Skip to content

Commit 6d9d61c

Browse files
Merge pull request elastic#6 from grcevski/spacetime_transactions
Chaining transaction ops
2 parents d8c046c + 816ab60 commit 6d9d61c

File tree

9 files changed

+239
-134
lines changed

9 files changed

+239
-134
lines changed

server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java

Lines changed: 23 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
import org.elasticsearch.index.mapper.SourceToParse;
5252
import org.elasticsearch.index.seqno.SequenceNumbers;
5353
import org.elasticsearch.index.shard.IndexShard;
54+
import org.elasticsearch.index.shard.ShardTransactionRegistry;
5455
import org.elasticsearch.index.translog.Translog;
5556
import org.elasticsearch.indices.ExecutorSelector;
5657
import org.elasticsearch.indices.IndicesService;
@@ -64,6 +65,7 @@
6465
import java.io.IOException;
6566
import java.util.Arrays;
6667
import java.util.Map;
68+
import java.util.Set;
6769
import java.util.concurrent.Executor;
6870
import java.util.function.Consumer;
6971
import java.util.function.LongSupplier;
@@ -79,6 +81,7 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
7981

8082
private final UpdateHelper updateHelper;
8183
private final MappingUpdatedAction mappingUpdatedAction;
84+
private static final ShardTransactionRegistry transactionRegistry = new ShardTransactionRegistry();
8285

8386
@Inject
8487
public TransportShardBulkAction(
@@ -186,10 +189,12 @@ public static void performOnPrimary(
186189

187190
@Override
188191
protected void doRun() throws Exception {
189-
String uid = UUIDs.base64UUID();
190-
long transactionId = -1L;
192+
TxID txID1 = TxID.create();
193+
Translog.Location[] transactionId = new Translog.Location[1];
191194
try {
192-
transactionId = primary.startTransaction(uid);
195+
transactionId[0] = primary.startTransaction(txID1.id());
196+
transactionRegistry.registerTransaction(txID1, Set.of(transactionId[0].id()));
197+
193198
while (context.hasMoreOperationsToExecute()) {
194199
if (executeBulkItemRequest(
195200
context,
@@ -207,12 +212,12 @@ protected void doRun() throws Exception {
207212
assert context.isInitial(); // either completed and moved to next or reset
208213
}
209214

210-
primary.commitTransaction(uid, transactionId);
215+
primary.commitTransaction(transactionId);
211216
} catch (Exception x) {
212217
logger.warn("Encountered an error while executing bulk transaction", x);
213-
primary.rollbackTransaction(uid, transactionId);
218+
primary.rollbackTransaction(transactionId);
214219
} finally {
215-
primary.closeTransaction(uid, transactionId);
220+
primary.closeTransaction(transactionId);
216221
}
217222
primary.getBulkOperationListener().afterBulk(request.totalSizeInBytes(), System.nanoTime() - startBulkTime);
218223
// We're done, there's no more operations to execute so we resolve the wrapped listener
@@ -282,7 +287,7 @@ static boolean executeBulkItemRequest(
282287
mappingUpdater,
283288
waitForMappingUpdate,
284289
itemDoneListener,
285-
IndexShard.NO_TRANSACTION_ID
290+
new Translog.Location[] {IndexShard.NO_TRANSACTION_ID}
286291
);
287292
}
288293

@@ -298,7 +303,7 @@ static boolean executeBulkItemRequest(
298303
MappingUpdatePerformer mappingUpdater,
299304
Consumer<ActionListener<Void>> waitForMappingUpdate,
300305
ActionListener<Void> itemDoneListener,
301-
long transactionId
306+
Translog.Location[] transactionId
302307
) throws Exception {
303308
final DocWriteRequest.OpType opType = context.getCurrent().opType();
304309

@@ -344,7 +349,7 @@ static boolean executeBulkItemRequest(
344349
request.versionType(),
345350
request.ifSeqNo(),
346351
request.ifPrimaryTerm(),
347-
transactionId
352+
transactionId[0]
348353
);
349354
} else {
350355
final IndexRequest request = context.getRequestToExecute();
@@ -363,9 +368,14 @@ static boolean executeBulkItemRequest(
363368
request.ifPrimaryTerm(),
364369
request.getAutoGeneratedTimestamp(),
365370
request.isRetry(),
366-
transactionId
371+
transactionId[0]
367372
);
368373
}
374+
375+
if (result.getTranslogLocation() != null) {
376+
transactionId[0] = result.getTranslogLocation();
377+
}
378+
369379
if (result.getResultType() == Engine.Result.Type.MAPPING_UPDATE_REQUIRED) {
370380

371381
try {
@@ -564,7 +574,8 @@ protected int replicaOperationCount(BulkShardRequest request) {
564574
return request.items().length;
565575
}
566576

567-
public static Translog.Location performOnReplica(BulkShardRequest request, IndexShard replica, long transactionId) throws Exception {
577+
public static Translog.Location performOnReplica(BulkShardRequest request, IndexShard replica, Translog.Location transactionId)
578+
throws Exception {
568579
Translog.Location location = null;
569580
for (int i = 0; i < request.items().length; i++) {
570581
final BulkItemRequest item = request.items()[i];
@@ -604,7 +615,7 @@ private static Engine.Result performOpOnReplica(
604615
DocWriteResponse primaryResponse,
605616
DocWriteRequest<?> docWriteRequest,
606617
IndexShard replica,
607-
long transactionId
618+
Translog.Location transactionId
608619
) throws Exception {
609620
final Engine.Result result;
610621
switch (docWriteRequest.opType()) {

server/src/main/java/org/elasticsearch/action/bulk/TxID.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,4 +59,8 @@ public int hashCode() {
5959
public String toString() {
6060
return "[tx=" + id + "]";
6161
}
62+
63+
public String id() {
64+
return id;
65+
}
6266
}

server/src/main/java/org/elasticsearch/index/engine/Engine.java

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -321,13 +321,13 @@ public Condition newCondition() {
321321
}
322322
}
323323

324-
public abstract long startTransaction(String id) throws IOException;
324+
public abstract Translog.Location startTransaction(String id) throws IOException;
325325

326-
public abstract boolean commitTransaction(String id, long transactionId) throws IOException;
326+
public abstract Translog.Location commitTransaction(Translog.Location prevId) throws IOException;
327327

328-
public abstract boolean rollbackTransaction(String id, long transactionId) throws IOException;
328+
public abstract Translog.Location rollbackTransaction(Translog.Location prevId) throws IOException;
329329

330-
public abstract boolean closeTransaction(String id, long transactionId) throws IOException;
330+
public abstract Translog.Location closeTransaction(Translog.Location prevId) throws IOException;
331331

332332
/**
333333
* Perform document index operation on the engine
@@ -1372,7 +1372,7 @@ public static class Index extends Operation {
13721372
private final boolean isRetry;
13731373
private final long ifSeqNo;
13741374
private final long ifPrimaryTerm;
1375-
private final long transactionId;
1375+
private final Translog.Location transactionId;
13761376

13771377
public Index(
13781378
Term uid,
@@ -1401,7 +1401,7 @@ public Index(
14011401
isRetry,
14021402
ifSeqNo,
14031403
ifPrimaryTerm,
1404-
-1L
1404+
new Translog.Location(0, 0, 0)
14051405
);
14061406
}
14071407

@@ -1418,7 +1418,7 @@ public Index(
14181418
boolean isRetry,
14191419
long ifSeqNo,
14201420
long ifPrimaryTerm,
1421-
long transactionId
1421+
Translog.Location transactionId
14221422
) {
14231423
super(uid, seqNo, primaryTerm, version, versionType, origin, startTime);
14241424
assert (origin == Origin.PRIMARY) == (versionType != null) : "invalid version_type=" + versionType + " for origin=" + origin;
@@ -1511,7 +1511,7 @@ public long getIfPrimaryTerm() {
15111511
return ifPrimaryTerm;
15121512
}
15131513

1514-
public long getTransactionId() {
1514+
public Translog.Location getTransactionId() {
15151515
return transactionId;
15161516
}
15171517
}
@@ -1521,7 +1521,7 @@ public static class Delete extends Operation {
15211521
private final String id;
15221522
private final long ifSeqNo;
15231523
private final long ifPrimaryTerm;
1524-
private final long transactionId;
1524+
private final Translog.Location transactionId;
15251525

15261526
public Delete(
15271527
String id,
@@ -1535,7 +1535,8 @@ public Delete(
15351535
long ifSeqNo,
15361536
long ifPrimaryTerm
15371537
) {
1538-
this(id, uid, seqNo, primaryTerm, version, versionType, origin, startTime, ifSeqNo, ifPrimaryTerm, -1L);
1538+
this(id, uid, seqNo, primaryTerm, version, versionType, origin,
1539+
startTime, ifSeqNo, ifPrimaryTerm, new Translog.Location(0, 0, 0));
15391540
}
15401541

15411542
public Delete(
@@ -1549,7 +1550,7 @@ public Delete(
15491550
long startTime,
15501551
long ifSeqNo,
15511552
long ifPrimaryTerm,
1552-
long transactionId
1553+
Translog.Location transactionId
15531554
) {
15541555
super(uid, seqNo, primaryTerm, version, versionType, origin, startTime);
15551556
assert (origin == Origin.PRIMARY) == (versionType != null) : "invalid version_type=" + versionType + " for origin=" + origin;
@@ -1616,7 +1617,7 @@ public long getIfPrimaryTerm() {
16161617
return ifPrimaryTerm;
16171618
}
16181619

1619-
public long getTransactionId() {
1620+
public Translog.Location getTransactionId() {
16201621
return transactionId;
16211622
}
16221623
}

server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 35 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -982,7 +982,8 @@ public IndexResult index(Index index) throws IOException {
982982
index.getAutoGeneratedIdTimestamp(),
983983
index.isRetry(),
984984
index.getIfSeqNo(),
985-
index.getIfPrimaryTerm()
985+
index.getIfPrimaryTerm(),
986+
index.getTransactionId()
986987
);
987988

988989
final boolean toAppend = plan.indexIntoLucene && plan.useLuceneUpdateDocument == false;
@@ -2078,27 +2079,48 @@ public void trimOperationsFromTranslog(long belowTerm, long aboveSeqNo) throws E
20782079
}
20792080

20802081
@Override
2081-
public long startTransaction(String id) throws IOException {
2082-
Translog.Location location = translog.add(new Translog.TxStart(id, doGenerateSeqNoForOperation(new TxOp(System.nanoTime()))));
2083-
return location.translogLocation;
2082+
public Translog.Location startTransaction(String id) throws IOException {
2083+
return translog.add(new Translog.TxStart(id, doGenerateSeqNoForOperation(new TxOp(System.nanoTime()))));
20842084
}
20852085

20862086
@Override
2087-
public boolean commitTransaction(String id, long transactionId) throws IOException {
2088-
translog.add(new Translog.TxCommit(id, doGenerateSeqNoForOperation(new TxOp(System.nanoTime())), transactionId));
2089-
return true;
2087+
public Translog.Location commitTransaction(Translog.Location prevId) throws IOException {
2088+
Translog.Location loc = prevId;
2089+
2090+
while (loc != null) {
2091+
Translog.Operation op = translog.readOperation(loc);
2092+
if (op == null) {
2093+
logger.error("Couldn't read translog location " + loc);
2094+
break;
2095+
}
2096+
2097+
logger.info("Committing op " + op);
2098+
2099+
if (op instanceof Translog.TransactionMember) {
2100+
loc = ((Translog.TransactionMember)op).getTransactionId();
2101+
} else if (op instanceof Translog.TxStart) {
2102+
break;
2103+
} else {
2104+
logger.error("Found op that doesn't have transaction loc?");
2105+
break;
2106+
}
2107+
}
2108+
2109+
return translog.add(
2110+
new Translog.TxCommit(doGenerateSeqNoForOperation(new TxOp(System.nanoTime())), prevId));
2111+
20902112
}
20912113

20922114
@Override
2093-
public boolean rollbackTransaction(String id, long transactionId) throws IOException {
2094-
translog.add(new Translog.TxRollback(id, doGenerateSeqNoForOperation(new TxOp(System.nanoTime())), transactionId));
2095-
return true;
2115+
public Translog.Location rollbackTransaction(Translog.Location prevId) throws IOException {
2116+
return translog.add(
2117+
new Translog.TxRollback(doGenerateSeqNoForOperation(new TxOp(System.nanoTime())), prevId));
20962118
}
20972119

20982120
@Override
2099-
public boolean closeTransaction(String id, long transactionId) throws IOException {
2100-
translog.add(new Translog.TxClose(id, doGenerateSeqNoForOperation(new TxOp(System.nanoTime())), transactionId));
2101-
return true;
2121+
public Translog.Location closeTransaction(Translog.Location prevId) throws IOException {
2122+
return translog.add(
2123+
new Translog.TxClose(doGenerateSeqNoForOperation(new TxOp(System.nanoTime())), prevId));
21022124
}
21032125

21042126
private void pruneDeletedTombstones() {

server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -528,23 +528,23 @@ public void skipTranslogRecovery() {}
528528
public void trimOperationsFromTranslog(long belowTerm, long aboveSeqNo) {}
529529

530530
@Override
531-
public long startTransaction(String id) throws IOException {
532-
return 0;
531+
public Translog.Location startTransaction(String id) throws IOException {
532+
return null;
533533
}
534534

535535
@Override
536-
public boolean commitTransaction(String id, long transactionId) throws IOException {
537-
return false;
536+
public Translog.Location commitTransaction(Translog.Location transactionId) throws IOException {
537+
return null;
538538
}
539539

540540
@Override
541-
public boolean rollbackTransaction(String id, long transactionId) throws IOException {
542-
return false;
541+
public Translog.Location rollbackTransaction(Translog.Location transactionId) throws IOException {
542+
return null;
543543
}
544544

545545
@Override
546-
public boolean closeTransaction(String id, long transactionId) throws IOException {
547-
return false;
546+
public Translog.Location closeTransaction(Translog.Location transactionId) throws IOException {
547+
return null;
548548
}
549549

550550
@Override

0 commit comments

Comments
 (0)