26
26
import com .google .common .cache .RemovalListener ;
27
27
import com .google .common .cache .RemovalNotification ;
28
28
import org .apache .lucene .search .DocIdSet ;
29
- import org .elasticsearch .cache .recycler .CacheRecycler ;
30
29
import org .elasticsearch .common .component .AbstractComponent ;
31
30
import org .elasticsearch .common .inject .Inject ;
32
- import org .elasticsearch .common .recycler .Recycler ;
33
31
import org .elasticsearch .common .settings .Settings ;
34
32
import org .elasticsearch .common .unit .ByteSizeValue ;
35
33
import org .elasticsearch .common .unit .MemorySizeValue ;
47
45
public class IndicesFilterCache extends AbstractComponent implements RemovalListener <WeightedFilterCache .FilterCacheKey , DocIdSet > {
48
46
49
47
private final ThreadPool threadPool ;
50
- private final CacheRecycler cacheRecycler ;
51
48
52
49
private Cache <WeightedFilterCache .FilterCacheKey , DocIdSet > cache ;
53
50
@@ -91,10 +88,9 @@ public void onRefreshSettings(Settings settings) {
91
88
}
92
89
93
90
@ Inject
94
- public IndicesFilterCache (Settings settings , ThreadPool threadPool , CacheRecycler cacheRecycler , NodeSettingsService nodeSettingsService ) {
91
+ public IndicesFilterCache (Settings settings , ThreadPool threadPool , NodeSettingsService nodeSettingsService ) {
95
92
super (settings );
96
93
this .threadPool = threadPool ;
97
- this .cacheRecycler = cacheRecycler ;
98
94
this .size = componentSettings .get ("size" , "10%" );
99
95
this .expire = componentSettings .getAsTime ("expire" , null );
100
96
this .cleanInterval = componentSettings .getAsTime ("clean_interval" , TimeValue .timeValueSeconds (60 ));
@@ -167,6 +163,10 @@ public void onRemoval(RemovalNotification<WeightedFilterCache.FilterCacheKey, Do
167
163
*/
168
164
class ReaderCleaner implements Runnable {
169
165
166
+ // this is thread safe since we only schedule the next cleanup once the current one is
167
+ // done, so no concurrent execution
168
+ private final ObjectOpenHashSet <Object > keys = ObjectOpenHashSet .newInstance ();
169
+
170
170
@ Override
171
171
public void run () {
172
172
if (closed ) {
@@ -180,33 +180,30 @@ public void run() {
180
180
threadPool .executor (ThreadPool .Names .GENERIC ).execute (new Runnable () {
181
181
@ Override
182
182
public void run () {
183
- Recycler .V <ObjectOpenHashSet <Object >> keys = cacheRecycler .hashSet (-1 );
184
- try {
185
- for (Iterator <Object > it = readersKeysToClean .iterator (); it .hasNext (); ) {
186
- keys .v ().add (it .next ());
187
- it .remove ();
188
- }
189
- cache .cleanUp ();
190
- if (!keys .v ().isEmpty ()) {
191
- for (Iterator <WeightedFilterCache .FilterCacheKey > it = cache .asMap ().keySet ().iterator (); it .hasNext (); ) {
192
- WeightedFilterCache .FilterCacheKey filterCacheKey = it .next ();
193
- if (keys .v ().contains (filterCacheKey .readerKey ())) {
194
- // same as invalidate
195
- it .remove ();
196
- }
183
+ keys .clear ();
184
+ for (Iterator <Object > it = readersKeysToClean .iterator (); it .hasNext (); ) {
185
+ keys .add (it .next ());
186
+ it .remove ();
187
+ }
188
+ cache .cleanUp ();
189
+ if (!keys .isEmpty ()) {
190
+ for (Iterator <WeightedFilterCache .FilterCacheKey > it = cache .asMap ().keySet ().iterator (); it .hasNext (); ) {
191
+ WeightedFilterCache .FilterCacheKey filterCacheKey = it .next ();
192
+ if (keys .contains (filterCacheKey .readerKey ())) {
193
+ // same as invalidate
194
+ it .remove ();
197
195
}
198
196
}
199
- schedule ();
200
- } finally {
201
- keys .close ();
202
197
}
198
+ schedule ();
199
+ keys .clear ();
203
200
}
204
201
});
205
202
} catch (EsRejectedExecutionException ex ) {
206
203
logger .debug ("Can not run ReaderCleaner - execution rejected" , ex );
207
- }
204
+ }
208
205
}
209
-
206
+
210
207
private void schedule () {
211
208
try {
212
209
threadPool .schedule (cleanInterval , ThreadPool .Names .SAME , this );
0 commit comments