Skip to content

Commit 8e9e2fb

Browse files
Merge pull request elastic#8 from henningandersen/spacetime_transactions_conflict_test
Added simple conflict test.
2 parents 84a09e1 + 94e894f commit 8e9e2fb

File tree

3 files changed

+78
-4
lines changed

3 files changed

+78
-4
lines changed

server/src/internalClusterTest/java/org/elasticsearch/action/bulk/BulkIntegrationIT.java

Lines changed: 76 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
package org.elasticsearch.action.bulk;
1010

1111
import org.elasticsearch.ElasticsearchException;
12+
import org.elasticsearch.action.ActionFuture;
1213
import org.elasticsearch.action.ActionRequestValidationException;
1314
import org.elasticsearch.action.admin.indices.alias.Alias;
1415
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
@@ -19,10 +20,20 @@
1920
import org.elasticsearch.action.support.replication.ReplicationRequest;
2021
import org.elasticsearch.cluster.metadata.IndexMetadata;
2122
import org.elasticsearch.common.bytes.BytesReference;
23+
import org.elasticsearch.common.settings.Settings;
24+
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
25+
import org.elasticsearch.core.Tuple;
2226
import org.elasticsearch.ingest.IngestTestPlugin;
2327
import org.elasticsearch.plugins.Plugin;
2428
import org.elasticsearch.rest.RestStatus;
2529
import org.elasticsearch.test.ESIntegTestCase;
30+
import org.elasticsearch.test.InternalSettingsPlugin;
31+
import org.elasticsearch.test.transport.MockTransportService;
32+
import org.elasticsearch.test.transport.StubbableTransport;
33+
import org.elasticsearch.transport.Transport;
34+
import org.elasticsearch.transport.TransportRequest;
35+
import org.elasticsearch.transport.TransportRequestOptions;
36+
import org.elasticsearch.transport.TransportService;
2637
import org.elasticsearch.xcontent.XContentBuilder;
2738
import org.elasticsearch.xcontent.XContentType;
2839

@@ -32,6 +43,10 @@
3243
import java.util.Collection;
3344
import java.util.Collections;
3445
import java.util.Map;
46+
import java.util.Set;
47+
import java.util.concurrent.BrokenBarrierException;
48+
import java.util.concurrent.CountDownLatch;
49+
import java.util.concurrent.CyclicBarrier;
3550
import java.util.concurrent.ExecutionException;
3651
import java.util.concurrent.atomic.AtomicBoolean;
3752
import java.util.concurrent.atomic.AtomicInteger;
@@ -51,7 +66,7 @@
5166
public class BulkIntegrationIT extends ESIntegTestCase {
5267
@Override
5368
protected Collection<Class<? extends Plugin>> nodePlugins() {
54-
return Arrays.asList(IngestTestPlugin.class);
69+
return Arrays.asList(IngestTestPlugin.class, MockTransportService.TestPlugin.class);
5570
}
5671

5772
public void testBulkIndexCreatesMapping() throws Exception {
@@ -197,4 +212,64 @@ public void testDeleteIndexWhileIndexing() throws Exception {
197212
}
198213
}
199214

215+
// tests that we abandon one of two conflicting transactions.
216+
public void testPrepareConflict() throws Exception {
217+
int shards = between(1, 5);
218+
createIndex("test", Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, shards).build());
219+
String coordinating = internalCluster().startCoordinatingOnlyNode(Settings.EMPTY);
220+
Iterable<TransportService> transportServiceIterable = internalCluster().getInstances(
221+
TransportService.class
222+
);
223+
CountDownLatch ready = new CountDownLatch(1);
224+
Set<TxID> txes = ConcurrentCollections.newConcurrentSet();
225+
// todo: only really need to do this on coordinator.
226+
transportServiceIterable.forEach(ts -> ((MockTransportService) ts).addSendBehavior(new StubbableTransport.SendRequestBehavior() {
227+
@Override
228+
public void sendRequest(Transport.Connection connection, long requestId, String action, TransportRequest request, TransportRequestOptions options) throws IOException {
229+
if (action.startsWith(ShardPrepareCommitAction.NAME)) {
230+
txes.add(((ShardPrepareCommitRequest) request).txid());
231+
new Thread(() -> {
232+
try {
233+
ready.await();
234+
} catch (InterruptedException e) {
235+
fail();
236+
}
237+
try {
238+
connection.sendRequest(requestId, action, request, options);
239+
} catch (IOException e) {
240+
fail();
241+
}
242+
}).start();
243+
} else {
244+
connection.sendRequest(requestId, action, request, options);
245+
}
246+
}
247+
}));
248+
249+
ActionFuture<IndexResponse> future1 = client(coordinating).prepareIndex("test")
250+
.setId("1")
251+
.setSource(Map.of("f" + randomIntBetween(1, 10), randomNonNegativeLong()), XContentType.JSON)
252+
.execute();
253+
ActionFuture<IndexResponse> future2 = client(coordinating).prepareIndex("test")
254+
.setId("1")
255+
.setSource(Map.of("g" + randomIntBetween(1, 10), randomNonNegativeLong()), XContentType.JSON)
256+
.execute();
257+
assertBusy(() -> assertThat(txes.size(), equalTo(2)));
258+
259+
ready.countDown();
260+
Tuple<IndexResponse, Exception> response1 = resultOrException(future1);
261+
Tuple<IndexResponse, Exception> response2 = resultOrException(future2);
262+
263+
// one failure
264+
assertThat(response1.v2() != null, is(response2.v2() == null));
265+
266+
}
267+
268+
Tuple<IndexResponse, Exception> resultOrException(ActionFuture<IndexResponse> future) {
269+
try {
270+
return Tuple.tuple(future.actionGet(), null);
271+
} catch (Exception e) {
272+
return Tuple.tuple(null, e);
273+
}
274+
}
200275
}

server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -779,6 +779,7 @@ private Void commitOrFail(Collection<ShardPrepareCommitResponse> responses) {
779779
for (ShardPrepareCommitResponse response : responses) {
780780
for (Map.Entry<TxID, Boolean> conflict : response.conflicts().entrySet()) {
781781
if (conflict.getValue() == false) {
782+
logger.info("aborting transaction due to other transaction " + conflict.getKey());
782783
throw new ElasticsearchException("conflicting transaction [{}] on shard", conflict.getKey());
783784
}
784785
}

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4139,9 +4139,7 @@ public void registerTransaction(TxID id, Set<String> keys) {
41394139
}
41404140

41414141
public Map<TxID, Boolean> prepareCommit(TxID txID) {
4142-
// todo: lookup in transaction table
4143-
transactionRegistry.prepare(txID);
4144-
return new HashMap<TxID, Boolean>();
4142+
return transactionRegistry.prepare(txID);
41454143
}
41464144

41474145

0 commit comments

Comments
 (0)