 import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.cache.Cache;
 import org.elasticsearch.common.cache.CacheBuilder;
+import org.elasticsearch.common.cache.RemovalNotification;
 import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.settings.Setting.Property;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.ByteSizeUnit;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.common.util.concurrent.ReleasableLock;
 import org.elasticsearch.common.util.set.Sets;
+import org.elasticsearch.threadpool.ThreadPool;
 
 import java.io.Closeable;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 /**
  * This is a cache for {@link BitSet} instances that are used with the {@link DocumentSubsetReader}.
@@ -64,17 +69,48 @@ public final class DocumentSubsetBitsetCache implements IndexReader.ClosedListen
     private static final BitSet NULL_MARKER = new FixedBitSet(0);
 
     private final Logger logger;
+
+    /**
+     * When a {@link BitSet} is evicted from {@link #bitsetCache}, we need to also remove it from {@link #keysByIndex}.
+     * We use a {@link ReentrantReadWriteLock} to control atomicity here - the "read" side represents potential insertions to the
+     * {@link #bitsetCache}, the "write" side represents removals from {@link #keysByIndex}.
+     * The risk (that {@link Cache} does not provide protection for) is that an entry is removed from the cache, and then immediately
+     * re-populated, before we process the removal event. To protect against that we need to check the state of the {@link #bitsetCache}
+     * but we need exclusive ("write") access while performing that check and updating the values in {@link #keysByIndex}.
+     */
+    private final ReleasableLock cacheEvictionLock;
+    private final ReleasableLock cacheModificationLock;
+    private final ExecutorService cleanupExecutor;
+
     private final Cache<BitsetCacheKey, BitSet> bitsetCache;
     private final Map<IndexReader.CacheKey, Set<BitsetCacheKey>> keysByIndex;
 
-    public DocumentSubsetBitsetCache(Settings settings) {
+    public DocumentSubsetBitsetCache(Settings settings, ThreadPool threadPool) {
+        this(settings, threadPool.executor(ThreadPool.Names.GENERIC));
+    }
+
+    /**
+     * @param settings The global settings object for this node
+     * @param cleanupExecutor An executor on which the cache cleanup tasks can be run. Due to the way the cache is structured internally,
+     *                        it is sometimes necessary to run an asynchronous task to synchronize the internal state.
+     */
+    protected DocumentSubsetBitsetCache(Settings settings, ExecutorService cleanupExecutor) {
         this.logger = LogManager.getLogger(getClass());
+
+        final ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
+        this.cacheEvictionLock = new ReleasableLock(readWriteLock.writeLock());
+        this.cacheModificationLock = new ReleasableLock(readWriteLock.readLock());
+        this.cleanupExecutor = cleanupExecutor;
+
         final TimeValue ttl = CACHE_TTL_SETTING.get(settings);
         final ByteSizeValue size = CACHE_SIZE_SETTING.get(settings);
         this.bitsetCache = CacheBuilder.<BitsetCacheKey, BitSet>builder()
             .setExpireAfterAccess(ttl)
             .setMaximumWeight(size.getBytes())
-            .weigher((key, bitSet) -> bitSet == NULL_MARKER ? 0 : bitSet.ramBytesUsed()).build();
+            .weigher((key, bitSet) -> bitSet == NULL_MARKER ? 0 : bitSet.ramBytesUsed())
+            .removalListener(this::onCacheEviction)
+            .build();
+
         this.keysByIndex = new ConcurrentHashMap<>();
     }
 
@@ -88,6 +124,31 @@ public void onClose(IndexReader.CacheKey ownerCoreCacheKey) {
         }
     }
 
+    /**
+     * Cleanup (synchronize) the internal state when an object is removed from the primary cache
+     */
+    private void onCacheEviction(RemovalNotification<BitsetCacheKey, BitSet> notification) {
+        final BitsetCacheKey bitsetKey = notification.getKey();
+        final IndexReader.CacheKey indexKey = bitsetKey.index;
+        if (keysByIndex.getOrDefault(indexKey, Set.of()).contains(bitsetKey) == false) {
+            // If the bitsetKey isn't in the lookup map, then there's nothing to synchronize
+            return;
+        }
+        // We push this to a background thread, so that it reduces the risk of blocking searches, but also so that the lock management is
+        // simpler - this callback is likely to take place on a thread that is actively adding something to the cache, and is therefore
+        // holding the read ("update") side of the lock. It is not possible to upgrade a read lock to a write ("eviction") lock, but we
+        // need to acquire that lock here.
+        cleanupExecutor.submit(() -> {
+            try (ReleasableLock ignored = cacheEvictionLock.acquire()) {
+                // it's possible for the key to be back in the cache if it was immediately repopulated after it was evicted, so check
+                if (bitsetCache.get(bitsetKey) == null) {
+                    // key is no longer in the cache, make sure it is no longer in the lookup map either.
+                    keysByIndex.getOrDefault(indexKey, Set.of()).remove(bitsetKey);
+                }
+            }
+        });
+    }
+
     @Override
     public void close() {
         clear("close");
@@ -96,7 +157,8 @@ public void close() {
     public void clear(String reason) {
         logger.debug("clearing all DLS bitsets because [{}]", reason);
         // Due to the order here, it is possible than a new entry could be added _after_ the keysByIndex map is cleared
-        // but _before_ the cache is cleared. This would mean it sits orphaned in keysByIndex, but this is not a issue.
+        // but _before_ the cache is cleared. This should get fixed up in the "onCacheEviction" callback, but if anything slips through
+        // and sits orphaned in keysByIndex, it will not be a significant issue.
         // When the index is closed, the key will be removed from the map, and there will not be a corresponding item
         // in the cache, which will make the cache-invalidate a no-op.
         // Since the entry is not in the cache, if #getBitSet is called, it will be loaded, and the new key will be added
@@ -130,31 +192,33 @@ public BitSet getBitSet(final Query query, final LeafReaderContext context) thro
         final IndexReader.CacheKey indexKey = coreCacheHelper.getKey();
         final BitsetCacheKey cacheKey = new BitsetCacheKey(indexKey, query);
 
-        final BitSet bitSet = bitsetCache.computeIfAbsent(cacheKey, ignore1 -> {
-            // This ensures all insertions into the set are guarded by ConcurrentHashMap's atomicity guarantees.
-            keysByIndex.compute(indexKey, (ignore2, set) -> {
-                if (set == null) {
-                    set = Sets.newConcurrentHashSet();
+        try (ReleasableLock ignored = cacheModificationLock.acquire()) {
+            final BitSet bitSet = bitsetCache.computeIfAbsent(cacheKey, ignore1 -> {
+                // This ensures all insertions into the set are guarded by ConcurrentHashMap's atomicity guarantees.
+                keysByIndex.compute(indexKey, (ignore2, set) -> {
+                    if (set == null) {
+                        set = Sets.newConcurrentHashSet();
+                    }
+                    set.add(cacheKey);
+                    return set;
+                });
+                final IndexReaderContext topLevelContext = ReaderUtil.getTopLevelContext(context);
+                final IndexSearcher searcher = new IndexSearcher(topLevelContext);
+                searcher.setQueryCache(null);
+                final Weight weight = searcher.createWeight(searcher.rewrite(query), ScoreMode.COMPLETE_NO_SCORES, 1f);
+                Scorer s = weight.scorer(context);
+                if (s == null) {
+                    // A cache loader is not allowed to return null, return a marker object instead.
+                    return NULL_MARKER;
+                } else {
+                    return BitSet.of(s.iterator(), context.reader().maxDoc());
                 }
-                set.add(cacheKey);
-                return set;
             });
-            final IndexReaderContext topLevelContext = ReaderUtil.getTopLevelContext(context);
-            final IndexSearcher searcher = new IndexSearcher(topLevelContext);
-            searcher.setQueryCache(null);
-            final Weight weight = searcher.createWeight(searcher.rewrite(query), ScoreMode.COMPLETE_NO_SCORES, 1f);
-            Scorer s = weight.scorer(context);
-            if (s == null) {
-                // A cache loader is not allowed to return null, return a marker object instead.
-                return NULL_MARKER;
+            if (bitSet == NULL_MARKER) {
+                return null;
             } else {
-                return BitSet.of(s.iterator(), context.reader().maxDoc());
+                return bitSet;
             }
-        });
-        if (bitSet == NULL_MARKER) {
-            return null;
-        } else {
-            return bitSet;
         }
     }
 
@@ -203,4 +267,27 @@ public String toString() {
             return getClass().getSimpleName() + "(" + index + "," + query + ")";
         }
     }
+
+    /**
+     * This method verifies that the two internal data structures ({@link #bitsetCache} and {@link #keysByIndex}) are consistent with one
+     * another. This method is only called by tests.
+     */
+    void verifyInternalConsistency() {
+        this.bitsetCache.keys().forEach(bck -> {
+            final Set<BitsetCacheKey> set = this.keysByIndex.get(bck.index);
+            if (set == null) {
+                throw new IllegalStateException("Key [" + bck + "] is in the cache, but there is no entry for [" + bck.index +
+                    "] in the lookup map");
+            }
+            if (set.contains(bck) == false) {
+                throw new IllegalStateException("Key [" + bck + "] is in the cache, but the lookup entry for [" + bck.index +
+                    "] does not contain that key");
+            }
+        });
+        this.keysByIndex.values().stream().flatMap(Set::stream).forEach(bck -> {
+            if (this.bitsetCache.get(bck) == null) {
+                throw new IllegalStateException("Key [" + bck + "] is in the lookup map, but is not in the cache");
+            }
+        });
+    }
 }
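
The change above pairs a cache removal listener with a read/write lock: insertions into the cache hold the read ("modification") side, while the asynchronous cleanup task takes the write ("eviction") side so it can check the cache and purge the lookup map atomically. The following is a minimal sketch of that pattern using only JDK types, not the Elasticsearch classes; the class, field, and method names (EvictionSyncSketch, primary, lookup, put, onEviction) are hypothetical stand-ins, and plain ReentrantReadWriteLock calls replace the ReleasableLock wrapper used in the real code.

import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical sketch of the insert/evict synchronization pattern; not the real cache classes.
class EvictionSyncSketch {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private final Map<String, Object> primary = new ConcurrentHashMap<>();      // plays the role of bitsetCache
    private final Map<String, Set<String>> lookup = new ConcurrentHashMap<>();  // plays the role of keysByIndex
    private final ExecutorService cleanupExecutor = Executors.newSingleThreadExecutor();

    // Insertions hold the read ("modification") side: many threads may insert concurrently,
    // but none can interleave with the exclusive cleanup below.
    void put(String index, String key, Object value) {
        lock.readLock().lock();
        try {
            primary.put(key, value);
            lookup.computeIfAbsent(index, i -> ConcurrentHashMap.newKeySet()).add(key);
        } finally {
            lock.readLock().unlock();
        }
    }

    // Invoked when the primary map evicts a key. Runs asynchronously because the evicting thread
    // may already hold the read lock, and a read lock cannot be upgraded to a write lock.
    void onEviction(String index, String key) {
        cleanupExecutor.submit(() -> {
            lock.writeLock().lock();
            try {
                // The key may have been re-inserted between the eviction and this task running,
                // so only purge the lookup entry if the key is really gone from the primary map.
                if (primary.containsKey(key) == false) {
                    Set<String> keys = lookup.get(index);
                    if (keys != null) {
                        keys.remove(key);
                    }
                }
            } finally {
                lock.writeLock().unlock();
            }
        });
    }
}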