Skip to content

Commit 8d3347b

Browse files
committed
Memory Monitor: Remove explicit GC call, clear cached instead, closes #179.
1 parent 45ae8f6 commit 8d3347b

File tree

6 files changed

+56
-53
lines changed

6 files changed

+56
-53
lines changed

modules/elasticsearch/src/main/java/org/elasticsearch/index/cache/filter/soft/SoftFilterCache.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
import org.elasticsearch.index.Index;
2626
import org.elasticsearch.index.cache.filter.support.AbstractConcurrentMapFilterCache;
2727
import org.elasticsearch.index.settings.IndexSettings;
28-
import org.elasticsearch.threadpool.ThreadPool;
2928
import org.elasticsearch.util.collect.MapMaker;
3029
import org.elasticsearch.util.inject.Inject;
3130
import org.elasticsearch.util.settings.Settings;
@@ -39,8 +38,8 @@
3938
*/
4039
public class SoftFilterCache extends AbstractConcurrentMapFilterCache {
4140

42-
@Inject public SoftFilterCache(Index index, @IndexSettings Settings indexSettings, ThreadPool threadPool) {
43-
super(index, indexSettings, threadPool, new MapMaker().softKeys().<IndexReader, ConcurrentMap<Filter, DocIdSet>>makeMap());
41+
@Inject public SoftFilterCache(Index index, @IndexSettings Settings indexSettings) {
42+
super(index, indexSettings, new MapMaker().softKeys().<IndexReader, ConcurrentMap<Filter, DocIdSet>>makeMap());
4443
}
4544

4645
@Override public String type() {

modules/elasticsearch/src/main/java/org/elasticsearch/index/cache/filter/support/AbstractConcurrentMapFilterCache.java

Lines changed: 8 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -26,14 +26,11 @@
2626
import org.elasticsearch.index.Index;
2727
import org.elasticsearch.index.cache.filter.FilterCache;
2828
import org.elasticsearch.index.settings.IndexSettings;
29-
import org.elasticsearch.threadpool.ThreadPool;
30-
import org.elasticsearch.util.TimeValue;
3129
import org.elasticsearch.util.settings.Settings;
3230

3331
import java.io.IOException;
3432
import java.util.Iterator;
3533
import java.util.concurrent.ConcurrentMap;
36-
import java.util.concurrent.Future;
3734

3835
import static org.elasticsearch.util.concurrent.ConcurrentCollections.*;
3936
import static org.elasticsearch.util.lucene.docidset.DocIdSets.*;
@@ -47,24 +44,13 @@ public abstract class AbstractConcurrentMapFilterCache extends AbstractIndexComp
4744

4845
private final ConcurrentMap<IndexReader, ConcurrentMap<Filter, DocIdSet>> cache;
4946

50-
private final TimeValue readerCleanerSchedule;
51-
52-
private final Future scheduleFuture;
53-
54-
protected AbstractConcurrentMapFilterCache(Index index, @IndexSettings Settings indexSettings, ThreadPool threadPool,
47+
protected AbstractConcurrentMapFilterCache(Index index, @IndexSettings Settings indexSettings,
5548
ConcurrentMap<IndexReader, ConcurrentMap<Filter, DocIdSet>> cache) {
5649
super(index, indexSettings);
5750
this.cache = cache;
58-
59-
this.readerCleanerSchedule = componentSettings.getAsTime("reader_cleaner_schedule", TimeValue.timeValueSeconds(10));
60-
61-
logger.debug("Using [" + type() + "] filter cache with reader_cleaner_schedule [{}]", readerCleanerSchedule);
62-
63-
this.scheduleFuture = threadPool.scheduleWithFixedDelay(new IndexReaderCleaner(), readerCleanerSchedule);
6451
}
6552

6653
@Override public void close() {
67-
scheduleFuture.cancel(false);
6854
cache.clear();
6955
}
7056

@@ -82,7 +68,13 @@ protected AbstractConcurrentMapFilterCache(Index index, @IndexSettings Settings
8268
cleaned++;
8369
}
8470
}
85-
logger.trace("Cleaned [{}] out of estimated total [{}]", cleaned, totalCount);
71+
if (logger.isDebugEnabled()) {
72+
if (cleaned > 0) {
73+
logger.debug("Cleaned [{}] out of estimated total [{}]", cleaned, totalCount);
74+
}
75+
} else if (logger.isTraceEnabled()) {
76+
logger.trace("Cleaned [{}] out of estimated total [{}]", cleaned, totalCount);
77+
}
8678
}
8779

8880
@Override public Filter cache(Filter filterToCache) {

modules/elasticsearch/src/main/java/org/elasticsearch/index/cache/filter/weak/WeakFilterCache.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
import org.elasticsearch.index.Index;
2626
import org.elasticsearch.index.cache.filter.support.AbstractConcurrentMapFilterCache;
2727
import org.elasticsearch.index.settings.IndexSettings;
28-
import org.elasticsearch.threadpool.ThreadPool;
2928
import org.elasticsearch.util.collect.MapMaker;
3029
import org.elasticsearch.util.inject.Inject;
3130
import org.elasticsearch.util.settings.Settings;
@@ -39,8 +38,8 @@
3938
*/
4039
public class WeakFilterCache extends AbstractConcurrentMapFilterCache {
4140

42-
@Inject public WeakFilterCache(Index index, @IndexSettings Settings indexSettings, ThreadPool threadPool) {
43-
super(index, indexSettings, threadPool, new MapMaker().weakKeys().<IndexReader, ConcurrentMap<Filter, DocIdSet>>makeMap());
41+
@Inject public WeakFilterCache(Index index, @IndexSettings Settings indexSettings) {
42+
super(index, indexSettings, new MapMaker().weakKeys().<IndexReader, ConcurrentMap<Filter, DocIdSet>>makeMap());
4443
}
4544

4645
@Override public String type() {

modules/elasticsearch/src/main/java/org/elasticsearch/indices/IndicesMemoryCleaner.java

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919

2020
package org.elasticsearch.indices;
2121

22-
import org.elasticsearch.util.inject.Inject;
2322
import org.elasticsearch.index.engine.Engine;
2423
import org.elasticsearch.index.engine.FlushNotAllowedEngineException;
2524
import org.elasticsearch.index.service.IndexService;
@@ -32,6 +31,7 @@
3231
import org.elasticsearch.util.SizeValue;
3332
import org.elasticsearch.util.Tuple;
3433
import org.elasticsearch.util.component.AbstractComponent;
34+
import org.elasticsearch.util.inject.Inject;
3535
import org.elasticsearch.util.settings.Settings;
3636

3737
import java.util.ArrayList;
@@ -71,6 +71,18 @@ public TranslogCleanResult cleanTranslog(int translogNumberOfOperationsThreshold
7171
return new TranslogCleanResult(totalShards, cleanedShards, new SizeValue(cleaned, SizeUnit.BYTES));
7272
}
7373

74+
public void cacheClearUnreferenced() {
75+
for (IndexService indexService : indicesService) {
76+
indexService.cache().clearUnreferenced();
77+
}
78+
}
79+
80+
public void cacheClear() {
81+
for (IndexService indexService : indicesService) {
82+
indexService.cache().clear();
83+
}
84+
}
85+
7486
/**
7587
* Checks if memory needs to be cleaned and cleans it. Returns the amount of memory cleaned.
7688
*/
@@ -145,7 +157,7 @@ public SizeValue cleaned() {
145157
}
146158

147159
@Override public String toString() {
148-
return "cleaned[" + cleaned + "], cleaned_shards[" + cleanedShards + "], total_shards[" + totalShards + "]";
160+
return "cleaned [" + cleaned + "], cleaned_shards [" + cleanedShards + "], total_shards [" + totalShards + "]";
149161
}
150162
}
151163

@@ -179,7 +191,7 @@ public SizeValue cleaned() {
179191
}
180192

181193
@Override public String toString() {
182-
return "cleaned[" + cleaned + "], estimated_flushable_size[" + estimatedFlushableSize + "], cleaned_shards[" + cleanedShards + "], total_shards[" + totalShards + "]";
194+
return "cleaned [" + cleaned + "], estimated_flushable_size [" + estimatedFlushableSize + "], cleaned_shards [" + cleanedShards + "], total_shards [" + totalShards + "]";
183195
}
184196
}
185197
}

modules/elasticsearch/src/main/java/org/elasticsearch/monitor/memory/alpha/AlphaMemoryMonitor.java

Lines changed: 28 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,7 @@
2323
import org.elasticsearch.indices.IndicesMemoryCleaner;
2424
import org.elasticsearch.monitor.memory.MemoryMonitor;
2525
import org.elasticsearch.threadpool.ThreadPool;
26-
import org.elasticsearch.util.SizeUnit;
27-
import org.elasticsearch.util.SizeValue;
28-
import org.elasticsearch.util.StopWatch;
29-
import org.elasticsearch.util.TimeValue;
26+
import org.elasticsearch.util.*;
3027
import org.elasticsearch.util.component.AbstractLifecycleComponent;
3128
import org.elasticsearch.util.inject.Inject;
3229
import org.elasticsearch.util.settings.Settings;
@@ -47,7 +44,7 @@ public class AlphaMemoryMonitor extends AbstractLifecycleComponent<MemoryMonitor
4744

4845
private final TimeValue interval;
4946

50-
private final int gcThreshold;
47+
private final int clearCacheThreshold;
5148

5249
private final int cleanThreshold;
5350

@@ -68,7 +65,7 @@ public class AlphaMemoryMonitor extends AbstractLifecycleComponent<MemoryMonitor
6865
private volatile ScheduledFuture scheduledFuture;
6966

7067
private AtomicLong totalCleans = new AtomicLong();
71-
private AtomicLong totalGCs = new AtomicLong();
68+
private AtomicLong totalClearCache = new AtomicLong();
7269

7370
@Inject public AlphaMemoryMonitor(Settings settings, ThreadPool threadPool, IndicesMemoryCleaner indicesMemoryCleaner) {
7471
super(settings);
@@ -78,12 +75,12 @@ public class AlphaMemoryMonitor extends AbstractLifecycleComponent<MemoryMonitor
7875
this.upperMemoryThreshold = componentSettings.getAsDouble("upper_memory_threshold", 0.8);
7976
this.lowerMemoryThreshold = componentSettings.getAsDouble("lower_memory_threshold", 0.5);
8077
this.interval = componentSettings.getAsTime("interval", timeValueMillis(500));
81-
this.gcThreshold = componentSettings.getAsInt("gc_threshold", 5);
78+
this.clearCacheThreshold = componentSettings.getAsInt("clear_cache_threshold", 2);
8279
this.cleanThreshold = componentSettings.getAsInt("clean_threshold", 10);
8380
this.minimumFlushableSizeToClean = componentSettings.getAsSize("minimum_flushable_size_to_clean", new SizeValue(5, SizeUnit.MB));
8481
this.translogNumberOfOperationsThreshold = componentSettings.getAsInt("translog_number_of_operations_threshold", 5000);
8582

86-
logger.debug("interval[" + interval + "], upper_memory_threshold[" + upperMemoryThreshold + "], lower_memory_threshold[" + lowerMemoryThreshold + "], translog_number_of_operations_threshold[" + translogNumberOfOperationsThreshold + "]");
83+
logger.debug("interval [" + interval + "], upper_memory_threshold [" + upperMemoryThreshold + "], lower_memory_threshold [" + lowerMemoryThreshold + "], translog_number_of_operations_threshold [" + translogNumberOfOperationsThreshold + "]");
8784

8885
this.runtime = Runtime.getRuntime();
8986
this.maxMemory = new SizeValue(runtime.maxMemory());
@@ -111,7 +108,7 @@ private long totalMemory() {
111108

112109
private class MemoryCleaner implements Runnable {
113110

114-
private int gcCounter;
111+
private int clearCacheCounter;
115112

116113
private boolean performedClean;
117114

@@ -120,12 +117,15 @@ private class MemoryCleaner implements Runnable {
120117
private StopWatch stopWatch = new StopWatch().keepTaskList(false);
121118

122119
@Override public void run() {
120+
// clear unreferenced in the cache
121+
indicesMemoryCleaner.cacheClearUnreferenced();
122+
123123
// try and clean translog based on a threshold, since we don't want to get a very large transaction log
124-
// which means recovery it will take a long time (since the target reindex all this data)
124+
// which means recovery it will take a long time (since the target re-index all this data)
125125
IndicesMemoryCleaner.TranslogCleanResult translogCleanResult = indicesMemoryCleaner.cleanTranslog(translogNumberOfOperationsThreshold);
126126
if (translogCleanResult.cleanedShards() > 0) {
127127
long totalClean = totalCleans.incrementAndGet();
128-
logger.debug("[" + totalClean + "] Translog Clean: " + translogCleanResult);
128+
logger.debug("[" + totalClean + "] [Translog] " + translogCleanResult);
129129
}
130130

131131
// the logic is simple, if the used memory is above the upper threshold, we need to clean
@@ -142,7 +142,7 @@ private class MemoryCleaner implements Runnable {
142142
long upperThresholdMemory = (long) (upperMemory * upperMemoryThreshold);
143143

144144
if (usedMemory - upperThresholdMemory <= 0) {
145-
gcCounter = 0;
145+
clearCacheCounter = 0;
146146
performedClean = false;
147147
cleanCounter = 0;
148148
return;
@@ -160,30 +160,31 @@ private class MemoryCleaner implements Runnable {
160160
long memoryToClean = usedMemory - lowerThresholdMemory;
161161
if (logger.isDebugEnabled()) {
162162
StringBuilder sb = new StringBuilder();
163-
sb.append('[').append(totalClean).append("]: ");
164-
sb.append("Cleaning, memoryToClean[").append(new SizeValue(memoryToClean)).append(']');
165-
sb.append(", lowerMemoryThreshold[").append(new SizeValue(lowerThresholdMemory)).append(']');
166-
sb.append(", upperMemoryThreshold[").append(new SizeValue(upperThresholdMemory)).append(']');
167-
sb.append(", usedMemory[").append(new SizeValue(usedMemory)).append(']');
168-
sb.append(", totalMemory[").append(new SizeValue(totalMemory)).append(']');
169-
sb.append(", maxMemory[").append(maxMemory).append(']');
163+
sb.append('[').append(totalClean).append("] ");
164+
sb.append("[Cleaning] memory_to_clean [").append(new SizeValue(memoryToClean)).append(']');
165+
sb.append(", lower_memory_threshold [").append(new SizeValue(lowerThresholdMemory)).append(']');
166+
sb.append(", upper_memory_threshold [").append(new SizeValue(upperThresholdMemory)).append(']');
167+
sb.append(", used_memory [").append(new SizeValue(usedMemory)).append(']');
168+
sb.append(", total_memory[").append(new SizeValue(totalMemory)).append(']');
169+
sb.append(", max_memory[").append(maxMemory).append(']');
170170
logger.debug(sb.toString());
171171
}
172172

173173
IndicesMemoryCleaner.MemoryCleanResult memoryCleanResult = indicesMemoryCleaner.cleanMemory(memoryToClean, minimumFlushableSizeToClean);
174174
if (logger.isDebugEnabled()) {
175-
logger.debug("[" + totalClean + "] Memory Clean: " + memoryCleanResult);
175+
logger.debug("[" + totalClean + "] [Cleaned ] " + memoryCleanResult);
176176
}
177-
performedClean = true;
178-
cleanCounter = 0;
179177

180-
if (++gcCounter >= gcThreshold) {
181-
long totalGc = totalGCs.incrementAndGet();
182-
logger.debug("[" + totalGc + "]: Running GC after [" + gcCounter + "] memory clean swipes");
183-
System.gc();
184-
gcCounter = 0;
178+
if (++clearCacheCounter >= clearCacheThreshold) {
179+
long totalClear = totalClearCache.incrementAndGet();
180+
logger.debug("[" + totalClear + "] [Cache ] cleared after [" + (cleanCounter / cleanThreshold) + "] memory clean swipes");
181+
indicesMemoryCleaner.cacheClear();
182+
ThreadLocals.clearReferencesThreadLocals();
183+
clearCacheCounter = 0;
185184
}
186185

186+
performedClean = true;
187+
cleanCounter = 0;
187188
}
188189
}
189190
}

modules/elasticsearch/src/main/java/org/elasticsearch/util/ThreadLocals.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ private static void clearThreadLocalMap(Object map, Field internalTableField) th
118118
args[2] = value.getClass().getCanonicalName();
119119
args[3] = value.toString();
120120
if (logger.isDebugEnabled()) {
121-
logger.debug("ThreadLocal with key of type [{0}] (value [{1}]) and a value of type [{2}] (value [{3}]): The ThreadLocal has been forcibly removed.", args);
121+
logger.trace("ThreadLocal with key of type [{0}] (value [{1}]) and a value of type [{2}] (value [{3}]): The ThreadLocal has been forcibly removed.", args);
122122
}
123123
if (key == null) {
124124
staleEntriesCount++;

0 commit comments

Comments
 (0)