Skip to content

Add stats for time spent fetching data while searching snapshots #51866

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.repositories.IndexId;
Expand Down Expand Up @@ -134,15 +135,15 @@ public static class CacheIndexInputStats implements Writeable, ToXContentObject
private final Counter contiguousReads;
private final Counter nonContiguousReads;
private final Counter cachedBytesRead;
private final Counter cachedBytesWritten;
private final Counter directBytesRead;
private final TimedCounter cachedBytesWritten;
private final TimedCounter directBytesRead;

public CacheIndexInputStats(String fileName, long fileLength, long openCount, long innerCount, long closeCount,
Counter forwardSmallSeeks, Counter backwardSmallSeeks,
Counter forwardLargeSeeks, Counter backwardLargeSeeks,
Counter contiguousReads, Counter nonContiguousReads,
Counter cachedBytesRead, Counter cachedBytesWritten,
Counter directBytesRead) {
Counter cachedBytesRead, TimedCounter cachedBytesWritten,
TimedCounter directBytesRead) {
this.fileName = fileName;
this.fileLength = fileLength;
this.openCount = openCount;
Expand Down Expand Up @@ -172,8 +173,8 @@ public CacheIndexInputStats(String fileName, long fileLength, long openCount, lo
this.contiguousReads = new Counter(in);
this.nonContiguousReads = new Counter(in);
this.cachedBytesRead = new Counter(in);
this.cachedBytesWritten = new Counter(in);
this.directBytesRead = new Counter(in);
this.cachedBytesWritten = new TimedCounter(in);
this.directBytesRead = new TimedCounter(in);
}

@Override
Expand Down Expand Up @@ -243,11 +244,11 @@ public Counter getCachedBytesRead() {
return cachedBytesRead;
}

public Counter getCachedBytesWritten() {
public TimedCounter getCachedBytesWritten() {
return cachedBytesWritten;
}

public Counter getDirectBytesRead() {
public TimedCounter getDirectBytesRead() {
return directBytesRead;
}

Expand Down Expand Up @@ -347,18 +348,22 @@ public void writeTo(final StreamOutput out) throws IOException {
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
public final XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
{
builder.field("count", count);
builder.field("sum", total);
builder.field("min", min);
builder.field("max", max);
innerToXContent(builder, params);
}
builder.endObject();
return builder;
}

void innerToXContent(XContentBuilder builder, Params params) throws IOException {
}

public long getCount() {
return count;
}
Expand Down Expand Up @@ -395,4 +400,54 @@ public int hashCode() {
return Objects.hash(count, total, min, max);
}
}

public static class TimedCounter extends Counter {

private final long totalNanoseconds;

public TimedCounter(long count, long total, long min, long max, long totalNanoseconds) {
super(count, total, min, max);
this.totalNanoseconds = totalNanoseconds;
}

TimedCounter(StreamInput in) throws IOException {
super(in);
totalNanoseconds = in.readZLong();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeZLong(totalNanoseconds);
}

@Override
void innerToXContent(XContentBuilder builder, Params params) throws IOException {
if (builder.humanReadable()) {
builder.field("time", TimeValue.timeValueNanos(totalNanoseconds).toString());
}
builder.field("time_in_nanos", totalNanoseconds);
}

@Override
public boolean equals(Object other) {
if (this == other) {
return true;
}
if (other == null || getClass() != other.getClass()) {
return false;
}
if (super.equals(other) == false) {
return false;
}
TimedCounter that = (TimedCounter) other;
return totalNanoseconds == that.totalNanoseconds;
}

@Override
public int hashCode() {
return Objects.hash(super.hashCode(), totalNanoseconds);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.elasticsearch.test.AbstractWireSerializingTestCase;
import org.elasticsearch.xpack.core.searchablesnapshots.SearchableSnapshotShardStats.CacheIndexInputStats;
import org.elasticsearch.xpack.core.searchablesnapshots.SearchableSnapshotShardStats.Counter;
import org.elasticsearch.xpack.core.searchablesnapshots.SearchableSnapshotShardStats.TimedCounter;

import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -45,11 +46,15 @@ private CacheIndexInputStats randomCacheIndexInputStats() {
randomCounter(), randomCounter(),
randomCounter(), randomCounter(),
randomCounter(), randomCounter(),
randomCounter(), randomCounter(),
randomCounter());
randomCounter(), randomTimedCounter(),
randomTimedCounter());
}

private Counter randomCounter() {
return new Counter(randomLong(), randomLong(), randomLong(), randomLong());
}

private TimedCounter randomTimedCounter() {
return new TimedCounter(randomLong(), randomLong(), randomLong(), randomLong(), randomLong());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -167,11 +167,15 @@ teardown:
- gte: { indices.docs.shards.0.0.files.0.cached_bytes_written.sum: 0 }
- gte: { indices.docs.shards.0.0.files.0.cached_bytes_written.min: 0 }
- gte: { indices.docs.shards.0.0.files.0.cached_bytes_written.max: 0 }
- gte: { indices.docs.shards.0.0.files.0.cached_bytes_written.time_in_nanos: 0 }
- is_false: indices.docs.shards.0.0.files.0.cached_bytes_written.time

- gte: { indices.docs.shards.0.0.files.0.direct_bytes_read.count: 0 }
- gte: { indices.docs.shards.0.0.files.0.direct_bytes_read.sum: 0 }
- gte: { indices.docs.shards.0.0.files.0.direct_bytes_read.min: 0 }
- gte: { indices.docs.shards.0.0.files.0.direct_bytes_read.max: 0 }
- gte: { indices.docs.shards.0.0.files.0.direct_bytes_read.time_in_nanos: 0 }
- is_false: indices.docs.shards.0.0.files.0.direct_bytes_read.time

- gte: { indices.docs.shards.0.0.files.0.forward_seeks.small.count: 0 }
- gte: { indices.docs.shards.0.0.files.0.forward_seeks.small.sum: 0 }
Expand All @@ -190,3 +194,11 @@ teardown:
- gte: { indices.docs.shards.0.0.files.0.backward_seeks.large.sum: 0 }
- gte: { indices.docs.shards.0.0.files.0.backward_seeks.large.min: 0 }
- gte: { indices.docs.shards.0.0.files.0.backward_seeks.large.max: 0 }

- do:
searchable_snapshots.stats:
index: "d*"
human: true

- is_true: indices.docs.shards.0.0.files.0.cached_bytes_written.time
- is_true: indices.docs.shards.0.0.files.0.direct_bytes_read.time
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

import static org.elasticsearch.index.IndexModule.INDEX_STORE_TYPE_SETTING;
Expand Down Expand Up @@ -74,7 +75,8 @@ public SearchableSnapshotRepository(Repository in) {
blobStoreRepository = (BlobStoreRepository) in;
}

private Directory makeDirectory(IndexSettings indexSettings, ShardPath shardPath, CacheService cacheService) throws IOException {
private Directory makeDirectory(IndexSettings indexSettings, ShardPath shardPath, CacheService cacheService,
LongSupplier currentTimeNanosSupplier) throws IOException {

IndexId indexId = new IndexId(indexSettings.getIndex().getName(), SNAPSHOT_INDEX_ID_SETTING.get(indexSettings.getSettings()));
BlobContainer blobContainer = blobStoreRepository.shardContainer(indexId, shardPath.getShardId().id());
Expand All @@ -86,7 +88,8 @@ private Directory makeDirectory(IndexSettings indexSettings, ShardPath shardPath
Directory directory = new SearchableSnapshotDirectory(snapshot, blobContainer);
if (SNAPSHOT_CACHE_ENABLED_SETTING.get(indexSettings.getSettings())) {
final Path cacheDir = shardPath.getDataPath().resolve("snapshots").resolve(snapshotId.getUUID());
directory = new CacheDirectory(directory, cacheService, cacheDir, snapshotId, indexId, shardPath.getShardId());
directory = new CacheDirectory(directory, cacheService, cacheDir, snapshotId, indexId, shardPath.getShardId(),
currentTimeNanosSupplier);
}
directory = new InMemoryNoOpCommitDirectory(directory);

Expand Down Expand Up @@ -146,7 +149,8 @@ public Repository create(RepositoryMetaData metaData, Function<String, Factory>
}

public static IndexStorePlugin.DirectoryFactory newDirectoryFactory(final Supplier<RepositoriesService> repositoriesService,
final Supplier<CacheService> cacheService) {
final Supplier<CacheService> cacheService,
final LongSupplier currentTimeNanosSupplier) {
return (indexSettings, shardPath) -> {
final RepositoriesService repositories = repositoriesService.get();
assert repositories != null;
Expand All @@ -159,7 +163,7 @@ public static IndexStorePlugin.DirectoryFactory newDirectoryFactory(final Suppli
final CacheService cache = cacheService.get();
assert cache != null;

return ((SearchableSnapshotRepository) repository).makeDirectory(indexSettings, shardPath, cache);
return ((SearchableSnapshotRepository) repository).makeDirectory(indexSettings, shardPath, cache, currentTimeNanosSupplier);
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ public void onRepositoriesModule(RepositoriesModule repositoriesModule) {
@Override
public Map<String, DirectoryFactory> getDirectoryFactories() {
return Map.of(SearchableSnapshotRepository.SNAPSHOT_DIRECTORY_FACTORY_KEY,
SearchableSnapshotRepository.newDirectoryFactory(repositoriesService::get, cacheService::get));
SearchableSnapshotRepository.newDirectoryFactory(repositoriesService::get, cacheService::get, System::nanoTime));
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using System::nanoTime since we need finer resolution than ThreadPool::relativeTimeInNanos offers.

}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.elasticsearch.xpack.core.searchablesnapshots.SearchableSnapshotShardStats;
import org.elasticsearch.xpack.core.searchablesnapshots.SearchableSnapshotShardStats.CacheIndexInputStats;
import org.elasticsearch.xpack.core.searchablesnapshots.SearchableSnapshotShardStats.Counter;
import org.elasticsearch.xpack.core.searchablesnapshots.SearchableSnapshotShardStats.TimedCounter;
import org.elasticsearch.xpack.searchablesnapshots.InMemoryNoOpCommitDirectory;
import org.elasticsearch.xpack.searchablesnapshots.cache.CacheDirectory;
import org.elasticsearch.xpack.searchablesnapshots.cache.IndexInputStats;
Expand Down Expand Up @@ -122,14 +123,18 @@ private static CacheIndexInputStats toCacheIndexInputStats(final String fileName
toCounter(inputStats.getForwardSmallSeeks()), toCounter(inputStats.getBackwardSmallSeeks()),
toCounter(inputStats.getForwardLargeSeeks()), toCounter(inputStats.getBackwardLargeSeeks()),
toCounter(inputStats.getContiguousReads()), toCounter(inputStats.getNonContiguousReads()),
toCounter(inputStats.getCachedBytesRead()), toCounter(inputStats.getCachedBytesWritten()),
toCounter(inputStats.getDirectBytesRead()));
toCounter(inputStats.getCachedBytesRead()), toTimedCounter(inputStats.getCachedBytesWritten()),
toTimedCounter(inputStats.getDirectBytesRead()));
}

private static Counter toCounter(final IndexInputStats.Counter counter) {
return new Counter(counter.count(), counter.total(), counter.min(), counter.max());
}

private static TimedCounter toTimedCounter(final IndexInputStats.TimedCounter counter) {
return new TimedCounter(counter.count(), counter.total(), counter.min(), counter.max(), counter.totalNanoseconds());
}

@Nullable
private static CacheDirectory unwrap(Directory dir) {
while (dir != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.LongSupplier;

/**
* {@link CacheDirectory} uses a {@link CacheService} to cache Lucene files provided by another {@link Directory}.
Expand All @@ -49,8 +50,10 @@ public class CacheDirectory extends FilterDirectory {
private final IndexId indexId;
private final ShardId shardId;
private final Path cacheDir;
private final LongSupplier currentTimeNanosSupplier;

public CacheDirectory(Directory in, CacheService cacheService, Path cacheDir, SnapshotId snapshotId, IndexId indexId, ShardId shardId)
public CacheDirectory(Directory in, CacheService cacheService, Path cacheDir, SnapshotId snapshotId, IndexId indexId, ShardId shardId,
LongSupplier currentTimeNanosSupplier)
throws IOException {
super(in);
this.stats = ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency();
Expand All @@ -59,6 +62,7 @@ public CacheDirectory(Directory in, CacheService cacheService, Path cacheDir, Sn
this.snapshotId = Objects.requireNonNull(snapshotId);
this.indexId = Objects.requireNonNull(indexId);
this.shardId = Objects.requireNonNull(shardId);
this.currentTimeNanosSupplier = Objects.requireNonNull(currentTimeNanosSupplier);
}

private CacheKey createCacheKey(String fileName) {
Expand Down Expand Up @@ -274,6 +278,7 @@ void writeCacheFile(FileChannel fc, long start, long end) throws IOException {
int bytesCopied = 0;
try (IndexInput input = in.openInput(cacheFileReference.getFileName(), ioContext)) {
stats.incrementInnerOpenCount();
final long startTimeNanos = currentTimeNanosSupplier.getAsLong();
if (start > 0) {
input.seek(start);
}
Expand All @@ -285,7 +290,8 @@ void writeCacheFile(FileChannel fc, long start, long end) throws IOException {
bytesCopied += size;
remaining -= size;
}
stats.addCachedBytesWritten(bytesCopied);
final long endTimeNanos = currentTimeNanosSupplier.getAsLong();
stats.addCachedBytesWritten(bytesCopied, endTimeNanos - startTimeNanos);
}
}

Expand Down Expand Up @@ -336,6 +342,7 @@ private int readDirectly(long start, long end, byte[] buffer, int offset) throws
int bytesCopied = 0;
try (IndexInput input = in.openInput(cacheFileReference.getFileName(), ioContext)) {
stats.incrementInnerOpenCount();
final long startTimeNanos = currentTimeNanosSupplier.getAsLong();
if (start > 0) {
input.seek(start);
}
Expand All @@ -347,7 +354,8 @@ private int readDirectly(long start, long end, byte[] buffer, int offset) throws
bytesCopied += len;
remaining -= len;
}
stats.addDirectBytesRead(bytesCopied);
final long endTimeNanos = currentTimeNanosSupplier.getAsLong();
stats.addDirectBytesRead(bytesCopied, endTimeNanos - startTimeNanos);
}
return bytesCopied;
}
Expand Down
Loading