Skip to content

Commit e83c728

Browse files
committed
Merge pull request #15475 from s1monw/beef_up_translog_tests
Beef up TranslogTests with concurrent fatal exceptions test
2 parents e084e50 + e5dc124 commit e83c728

File tree

2 files changed

+193
-55
lines changed

2 files changed

+193
-55
lines changed

core/src/main/java/org/elasticsearch/index/translog/Translog.java

+21-15
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,7 @@ public Translog(TranslogConfig config) throws IOException {
158158

159159
try {
160160
if (translogGeneration != null) {
161-
final Checkpoint checkpoint = Checkpoint.read(location.resolve(CHECKPOINT_FILE_NAME));
161+
final Checkpoint checkpoint = readCheckpoint();
162162
this.recoveredTranslogs = recoverFromFiles(translogGeneration, checkpoint);
163163
if (recoveredTranslogs.isEmpty()) {
164164
throw new IllegalStateException("at least one reader must be recovered");
@@ -421,13 +421,7 @@ public Location add(Operation operation) throws IOException {
421421
return location;
422422
}
423423
} catch (AlreadyClosedException | IOException ex) {
424-
if (current.getTragicException() != null) {
425-
try {
426-
close();
427-
} catch (Exception inner) {
428-
ex.addSuppressed(inner);
429-
}
430-
}
424+
closeOnTragicEvent(ex);
431425
throw ex;
432426
} catch (Throwable e) {
433427
throw new TranslogException(shardId, "Failed to write operation [" + operation + "]", e);
@@ -507,13 +501,7 @@ public void sync() throws IOException {
507501
current.sync();
508502
}
509503
} catch (AlreadyClosedException | IOException ex) {
510-
if (current.getTragicException() != null) {
511-
try {
512-
close();
513-
} catch (Exception inner) {
514-
ex.addSuppressed(inner);
515-
}
516-
}
504+
closeOnTragicEvent(ex);
517505
throw ex;
518506
}
519507
}
@@ -545,10 +533,23 @@ public boolean ensureSynced(Location location) throws IOException {
545533
ensureOpen();
546534
return current.syncUpTo(location.translogLocation + location.size);
547535
}
536+
} catch (AlreadyClosedException | IOException ex) {
537+
closeOnTragicEvent(ex);
538+
throw ex;
548539
}
549540
return false;
550541
}
551542

543+
private void closeOnTragicEvent(Throwable ex) {
544+
if (current.getTragicException() != null) {
545+
try {
546+
close();
547+
} catch (Exception inner) {
548+
ex.addSuppressed(inner);
549+
}
550+
}
551+
}
552+
552553
/**
553554
* return stats
554555
*/
@@ -1433,4 +1434,9 @@ public Throwable getTragicException() {
14331434
return current.getTragicException();
14341435
}
14351436

1437+
/** Reads and returns the current checkpoint */
1438+
final Checkpoint readCheckpoint() throws IOException {
1439+
return Checkpoint.read(location.resolve(CHECKPOINT_FILE_NAME));
1440+
}
1441+
14361442
}

core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java

+172-40
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.lucene.mockfile.FilterFileChannel;
2626
import org.apache.lucene.store.AlreadyClosedException;
2727
import org.apache.lucene.store.ByteArrayDataOutput;
28+
import org.apache.lucene.store.MockDirectoryWrapper;
2829
import org.apache.lucene.util.IOUtils;
2930
import org.apache.lucene.util.LineFileDocs;
3031
import org.apache.lucene.util.LuceneTestCase;
@@ -62,6 +63,7 @@
6263
import java.util.concurrent.atomic.AtomicInteger;
6364
import java.util.concurrent.atomic.AtomicLong;
6465
import java.util.concurrent.atomic.AtomicReference;
66+
import java.util.function.Predicate;
6567

6668
import static org.hamcrest.Matchers.*;
6769

@@ -1242,11 +1244,11 @@ private static class TranslogThread extends Thread {
12421244
private final CountDownLatch downLatch;
12431245
private final int opsPerThread;
12441246
private final int threadId;
1245-
private final BlockingQueue<LocationOperation> writtenOperations;
1247+
private final Collection<LocationOperation> writtenOperations;
12461248
private final Throwable[] threadExceptions;
12471249
private final Translog translog;
12481250

1249-
public TranslogThread(Translog translog, CountDownLatch downLatch, int opsPerThread, int threadId, BlockingQueue<LocationOperation> writtenOperations, Throwable[] threadExceptions) {
1251+
public TranslogThread(Translog translog, CountDownLatch downLatch, int opsPerThread, int threadId, Collection<LocationOperation> writtenOperations, Throwable[] threadExceptions) {
12501252
this.translog = translog;
12511253
this.downLatch = downLatch;
12521254
this.opsPerThread = opsPerThread;
@@ -1276,76 +1278,58 @@ public void run() {
12761278
throw new ElasticsearchException("not supported op type");
12771279
}
12781280

1279-
Translog.Location loc = translog.add(op);
1281+
Translog.Location loc = add(op);
12801282
writtenOperations.add(new LocationOperation(op, loc));
1283+
afterAdd();
12811284
}
12821285
} catch (Throwable t) {
12831286
threadExceptions[threadId] = t;
12841287
}
12851288
}
1289+
1290+
protected Translog.Location add(Translog.Operation op) throws IOException {
1291+
return translog.add(op);
1292+
}
1293+
1294+
protected void afterAdd() throws IOException {}
12861295
}
12871296

12881297
public void testFailFlush() throws IOException {
12891298
Path tempDir = createTempDir();
1290-
final AtomicBoolean simulateDiskFull = new AtomicBoolean();
1299+
final AtomicBoolean fail = new AtomicBoolean();
12911300
TranslogConfig config = getTranslogConfig(tempDir);
1292-
Translog translog = new Translog(config) {
1293-
@Override
1294-
TranslogWriter.ChannelFactory getChannelFactory() {
1295-
final TranslogWriter.ChannelFactory factory = super.getChannelFactory();
1296-
1297-
return new TranslogWriter.ChannelFactory() {
1298-
@Override
1299-
public FileChannel open(Path file) throws IOException {
1300-
FileChannel channel = factory.open(file);
1301-
return new FilterFileChannel(channel) {
1302-
1303-
@Override
1304-
public int write(ByteBuffer src) throws IOException {
1305-
if (simulateDiskFull.get()) {
1306-
if (src.limit() > 1) {
1307-
final int pos = src.position();
1308-
final int limit = src.limit();
1309-
src.limit(limit / 2);
1310-
super.write(src);
1311-
src.position(pos);
1312-
src.limit(limit);
1313-
throw new IOException("__FAKE__ no space left on device");
1314-
}
1315-
}
1316-
return super.write(src);
1317-
}
1318-
};
1319-
}
1320-
};
1321-
}
1322-
};
1301+
Translog translog = getFailableTranslog(fail, config);
13231302

13241303
List<Translog.Location> locations = new ArrayList<>();
13251304
int opsSynced = 0;
1326-
int opsAdded = 0;
13271305
boolean failed = false;
13281306
while(failed == false) {
13291307
try {
13301308
locations.add(translog.add(new Translog.Index("test", "" + opsSynced, Integer.toString(opsSynced).getBytes(Charset.forName("UTF-8")))));
1331-
opsAdded++;
13321309
translog.sync();
13331310
opsSynced++;
1311+
} catch (MockDirectoryWrapper.FakeIOException ex) {
1312+
failed = true;
1313+
assertFalse(translog.isOpen());
13341314
} catch (IOException ex) {
13351315
failed = true;
13361316
assertFalse(translog.isOpen());
13371317
assertEquals("__FAKE__ no space left on device", ex.getMessage());
13381318
}
1339-
simulateDiskFull.set(randomBoolean());
1319+
fail.set(randomBoolean());
13401320
}
1341-
simulateDiskFull.set(false);
1321+
fail.set(false);
13421322
if (randomBoolean()) {
13431323
try {
13441324
locations.add(translog.add(new Translog.Index("test", "" + opsSynced, Integer.toString(opsSynced).getBytes(Charset.forName("UTF-8")))));
13451325
fail("we are already closed");
13461326
} catch (AlreadyClosedException ex) {
13471327
assertNotNull(ex.getCause());
1348-
assertEquals(ex.getCause().getMessage(), "__FAKE__ no space left on device");
1328+
if (ex.getCause() instanceof MockDirectoryWrapper.FakeIOException) {
1329+
assertNull(ex.getCause().getMessage());
1330+
} else {
1331+
assertEquals(ex.getCause().getMessage(), "__FAKE__ no space left on device");
1332+
}
13491333
}
13501334

13511335
}
@@ -1402,4 +1386,152 @@ public void testTranslogOpsCountIsCorrect() throws IOException {
14021386
}
14031387
}
14041388
}
1389+
1390+
public void testFatalIOExceptionsWhileWritingConcurrently() throws IOException, InterruptedException {
1391+
Path tempDir = createTempDir();
1392+
final AtomicBoolean fail = new AtomicBoolean(false);
1393+
1394+
TranslogConfig config = getTranslogConfig(tempDir);
1395+
Translog translog = getFailableTranslog(fail, config);
1396+
1397+
final int threadCount = randomIntBetween(1, 5);
1398+
Thread[] threads = new Thread[threadCount];
1399+
final Throwable[] threadExceptions = new Throwable[threadCount];
1400+
final CountDownLatch downLatch = new CountDownLatch(1);
1401+
final CountDownLatch added = new CountDownLatch(randomIntBetween(10, 100));
1402+
List<LocationOperation> writtenOperations = Collections.synchronizedList(new ArrayList<>());
1403+
for (int i = 0; i < threadCount; i++) {
1404+
final int threadId = i;
1405+
threads[i] = new TranslogThread(translog, downLatch, 200, threadId, writtenOperations, threadExceptions) {
1406+
@Override
1407+
protected Translog.Location add(Translog.Operation op) throws IOException {
1408+
Translog.Location add = super.add(op);
1409+
added.countDown();
1410+
return add;
1411+
}
1412+
1413+
@Override
1414+
protected void afterAdd() throws IOException {
1415+
if (randomBoolean()) {
1416+
translog.sync();
1417+
}
1418+
}
1419+
};
1420+
threads[i].setDaemon(true);
1421+
threads[i].start();
1422+
}
1423+
downLatch.countDown();
1424+
added.await();
1425+
try (Translog.View view = translog.newView()) {
1426+
// this holds a reference to the current tlog channel such that it's not closed
1427+
// if we hit a tragic event. this is important to ensure that asserts inside the Translog#add doesn't trip
1428+
// otherwise our assertions here are off by one sometimes.
1429+
fail.set(true);
1430+
for (int i = 0; i < threadCount; i++) {
1431+
threads[i].join();
1432+
}
1433+
boolean atLeastOneFailed = false;
1434+
for (Throwable ex : threadExceptions) {
1435+
if (ex != null) {
1436+
atLeastOneFailed = true;
1437+
break;
1438+
}
1439+
}
1440+
if (atLeastOneFailed == false) {
1441+
try {
1442+
boolean syncNeeded = translog.syncNeeded();
1443+
translog.close();
1444+
assertFalse("should have failed if sync was needed", syncNeeded);
1445+
} catch (IOException ex) {
1446+
// boom now we failed
1447+
}
1448+
}
1449+
Collections.sort(writtenOperations, (a, b) -> a.location.compareTo(b.location));
1450+
assertFalse(translog.isOpen());
1451+
final Checkpoint checkpoint = Checkpoint.read(config.getTranslogPath().resolve(Translog.CHECKPOINT_FILE_NAME));
1452+
Iterator<LocationOperation> iterator = writtenOperations.iterator();
1453+
while (iterator.hasNext()) {
1454+
LocationOperation next = iterator.next();
1455+
if (checkpoint.offset < (next.location.translogLocation + next.location.size)) {
1456+
// drop all that haven't been synced
1457+
iterator.remove();
1458+
}
1459+
}
1460+
config.setTranslogGeneration(translog.getGeneration());
1461+
try (Translog tlog = new Translog(config)) {
1462+
try (Translog.Snapshot snapshot = tlog.newSnapshot()) {
1463+
if (writtenOperations.size() != snapshot.estimatedTotalOperations()) {
1464+
for (int i = 0; i < threadCount; i++) {
1465+
if (threadExceptions[i] != null)
1466+
threadExceptions[i].printStackTrace();
1467+
}
1468+
}
1469+
assertEquals(writtenOperations.size(), snapshot.estimatedTotalOperations());
1470+
for (int i = 0; i < writtenOperations.size(); i++) {
1471+
assertEquals("expected operation" + i + " to be in the previous translog but wasn't", tlog.currentFileGeneration() - 1, writtenOperations.get(i).location.generation);
1472+
Translog.Operation next = snapshot.next();
1473+
assertNotNull("operation " + i + " must be non-null", next);
1474+
assertEquals(next, writtenOperations.get(i).operation);
1475+
}
1476+
}
1477+
}
1478+
}
1479+
}
1480+
1481+
private Translog getFailableTranslog(final AtomicBoolean fail, final TranslogConfig config) throws IOException {
1482+
return new Translog(config) {
1483+
@Override
1484+
TranslogWriter.ChannelFactory getChannelFactory() {
1485+
final TranslogWriter.ChannelFactory factory = super.getChannelFactory();
1486+
1487+
return new TranslogWriter.ChannelFactory() {
1488+
@Override
1489+
public FileChannel open(Path file) throws IOException {
1490+
FileChannel channel = factory.open(file);
1491+
return new ThrowingFileChannel(fail, randomBoolean(), channel);
1492+
}
1493+
};
1494+
}
1495+
};
1496+
}
1497+
1498+
public static class ThrowingFileChannel extends FilterFileChannel {
1499+
private final AtomicBoolean fail;
1500+
private final boolean partialWrite;
1501+
1502+
public ThrowingFileChannel(AtomicBoolean fail, boolean partialWrite, FileChannel delegate) {
1503+
super(delegate);
1504+
this.fail = fail;
1505+
this.partialWrite = partialWrite;
1506+
}
1507+
1508+
@Override
1509+
public long write(ByteBuffer[] srcs, int offset, int length) throws IOException {
1510+
throw new UnsupportedOperationException();
1511+
}
1512+
1513+
@Override
1514+
public int write(ByteBuffer src, long position) throws IOException {
1515+
throw new UnsupportedOperationException();
1516+
}
1517+
1518+
1519+
public int write(ByteBuffer src) throws IOException {
1520+
if (fail.get()) {
1521+
if (partialWrite) {
1522+
if (src.limit() > 1) {
1523+
final int pos = src.position();
1524+
final int limit = src.limit();
1525+
src.limit(limit / 2);
1526+
super.write(src);
1527+
src.position(pos);
1528+
src.limit(limit);
1529+
throw new IOException("__FAKE__ no space left on device");
1530+
}
1531+
}
1532+
throw new MockDirectoryWrapper.FakeIOException();
1533+
}
1534+
return super.write(src);
1535+
}
1536+
}
14051537
}

0 commit comments

Comments
 (0)