Add the ability to partition a scroll in multiple slices. #18237
Conversation
Relates to #13494 (comment)

@s1monw: hey @jimferenczi this is awesome. Yet, I have concerns about the field data. Loading all the stuff into fielddata only has a real benefit if it's reused, but I am not sure that is the common case. I wonder if we should use something like a dedicated query that walks the ...
@s1monw I've pushed another change that uses a BitSet per sliced scroll instead of building a specialized fieldcache for the _uid field: f391a59

@jpountz I needed a way to tell the query cache that certain queries should never be cached. For instance the SliceTermQuery builds a bitset for the top-level reader associated with a sliced scroll. This bitset is valid during the lifetime of the scroll but should be freed when the scroll is cleared.
String field = null;
int id = -1;
int max = -1;
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
we should use org.elasticsearch.common.xcontent.ObjectParser for this instead of parsing all the stuff ourselves
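As an illustration of that suggestion (not code from this PR), a declarative parser for the slice object might look roughly like this; the SliceBuilder setter names, its no-arg constructor, and the Void context type are assumptions and may differ by Elasticsearch version:

```
// Hypothetical sketch of ObjectParser-based parsing for the "slice" section.
// Setter names on SliceBuilder are assumed; the parser context type may need to
// be something other than Void depending on the Elasticsearch version.
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.xcontent.ObjectParser;

final class SliceParsingSketch {
    static final ObjectParser<SliceBuilder, Void> PARSER =
        new ObjectParser<>("slice", SliceBuilder::new);

    static {
        PARSER.declareString(SliceBuilder::setField, new ParseField("field"));
        PARSER.declareInt(SliceBuilder::setId, new ParseField("id"));
        PARSER.declareInt(SliceBuilder::setMax, new ParseField("max"));
    }

    // Usage: SliceBuilder slice = PARSER.parse(xContentParser, null);
}
```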
@jimferenczi I left a bunch of comments
@jimferenczi For caching I think we have two options: we can either make the top-level index reader part of the query's equals/hashCode definition, so that the cache would only be used if the query is reused on the same top-level reader. This means that the query would only be cached in practice if the index is pretty static. In case we really never want to cache (e.g. if these queries are cheaper than a cached DocIdSet, like the MatchAllDocsQuery iterator for instance), then your QueryCachingPolicy wrapper looks good to me. I would just move it to its own class instead of an anonymous class in IndexShard?
@@ -1380,6 +1408,7 @@ public boolean equals(Object obj) {
                && Objects.equals(size, other.size)
                && Objects.equals(sorts, other.sorts)
                && Objects.equals(searchAfterBuilder, other.searchAfterBuilder)
                && Objects.equals(sliceBuilder, other.sliceBuilder)
it should be used in the hashcode too
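For instance, a minimal sketch of that fix, with the field list abridged to the ones visible in the diff above (the actual SearchSourceBuilder has more fields):

```
// Sketch only: sliceBuilder folded into hashCode so it stays in sync with equals.
@Override
public int hashCode() {
    return Objects.hash(size, sorts, searchAfterBuilder, sliceBuilder /* , ...remaining fields */);
}
```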
@jpountz the TermsSliceQuery is just a way to avoid using fielddata on the _uid field.
They are not cheaper: the DocValuesSliceQuery is not faster but avoids the bitset entirely.
The TermsSliceQuery is supposed to be a cached DocIdSet, but I wanted to avoid the QueryCachingPolicy. I agree that it should use the query cache, but only if we can ensure that the entry will reliably stay around during the lifespan of the scroll. Currently this is achieved by keeping the query (and the bitset built during the initial scroll query) in the SearchContext.
Forcing the query cache to hold on to an entry for a certain amount of time would require adding more APIs, which I'd like to avoid. So I think the current approach of caching at the query/SearchContext level is better?
}
cachingPolicy = new QueryCachingPolicy() {
can you move it to its own class, e.g. ElasticsearchQueryCachingPolicy?
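For context, a hedged sketch of what such a dedicated policy could look like: it delegates to Lucene's usage-tracking policy but refuses to cache slice queries, which are pinned in the SearchContext instead. The class name is invented here, and the QueryCachingPolicy signatures follow recent Lucene and may differ in the version this PR targets; this is not the actual ElasticsearchQueryCachingPolicy.

```
// Sketch only: never cache slice queries, otherwise defer to the default
// usage-tracking policy. Signatures assume a recent Lucene QueryCachingPolicy.
import java.io.IOException;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.QueryCachingPolicy;
import org.apache.lucene.search.UsageTrackingQueryCachingPolicy;

public final class SliceAwareCachingPolicy implements QueryCachingPolicy {
    private final QueryCachingPolicy delegate = new UsageTrackingQueryCachingPolicy();

    @Override
    public void onUse(Query query) {
        delegate.onUse(query);
    }

    @Override
    public boolean shouldCache(Query query) throws IOException {
        if (query instanceof TermsSliceQuery) {
            // the slice bitset lives in the SearchContext for the scroll's lifetime
            return false;
        }
        return delegate.shouldCache(query);
    }
}
```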
@jimferenczi I had a conversation with @jpountz and I think we should go without the sorting altogether in the first iteration of this feature. I think we should have another conversation about how we can implement this feature really just as a parallel version of scroll that only guarantees that we partition all docs within a single scroll ID, i.e. we somehow need to maintain a context per slice on the same scroll context. I am not sure about the implementation yet but let's have a chat about it first. I also talked to @costin and we are good with that.
heya @jimferenczi - I've just thought of a potential problem with the current API: each scroll request no longer represents the same snapshot-in-time if indexing continues. In practice I don't know if this really is an issue...
@clintongormley this is exactly what we wanted when this issue started. It means that the slices are independent and can be processed completely independently. The snapshot should only be valid during the lifespan of a single slice. The nice thing about slicing on _uid (or any pseudo-random field that is never updated) is that we cannot miss any document. You can have new updates and deletes between two slices, but that doesn't change the fact that each slice is consistent.
@jimferenczi yeah, I think that the fact that the point-in-time may be slightly different per slice doesn't make any real difference. The same already applies to multiple shards.
@s1monw I pushed another commit to address the memory explosion. The slices are now split across shards as you suggested, so for instance if the number of shards is equal to 2 and the user requested 4 slices, then slices 0 and 2 are assigned to the first shard and slices 1 and 3 are assigned to the second shard. This means that the total number of bitsets needed is bounded by the number of slices instead of (numShards * numSlices).
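To illustrate that assignment, here is a standalone sketch (not the actual Elasticsearch code) using the same arithmetic as the diff below:

```
// Illustration only: map global slice ids to shards and to shard-local slice ids.
// With numShards = 2 and numSlices = 4: slices 0 and 2 land on shard 0, slices 1 and 3 on shard 1.
public class SliceAssignmentDemo {
    public static void main(String[] args) {
        int numShards = 2;
        int numSlices = 4;
        for (int id = 0; id < numSlices; id++) {
            int targetShard = id % numShards; // shard that executes this slice
            int shardSlice = id / numShards;  // slice id local to that shard
            System.out.println("slice " + id + " -> shard " + targetShard + " (local slice " + shardSlice + ")");
        }
    }
}
```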
// in such case we can reduce the number of requested shards by slice

// first we check if the slice is responsible of this shard
int targetShard = id % numShards;
should we use Math.floorMod here just to be sure?
If id or numShards are negative we're screwed anyway, so using floorMod would just hide the problem? We always check that id is positive so it should not be a problem.
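For reference, the behavioural difference between the two on negative inputs (plain Java semantics, not code from this PR):

```
public class FloorModDemo {
    public static void main(String[] args) {
        System.out.println(-3 % 2);               // -1: % keeps the sign of the dividend
        System.out.println(Math.floorMod(-3, 2)); //  1: floorMod follows the sign of the divisor
    }
}
```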
I left minor comments but I love it! Thanks for doing it.
@@ -66,7 +74,7 @@ public int hashCode() {
     }

     @Override
-    public String toString(String field) {
+    public String toString(String f) {
lol :)
Wonderful, LGTM!
}
// get the new slice id for this shard
int shardSlice = id / numShards;
if (useTermQuery) {
Minor nitpick: return useTermQuery ? new TermsSliceQuery(field, shardSlice, numSlicesInShard) : new DocValuesSliceQuery(field, shardSlice, numSlicesInShard);
LGTM, no need for another review!
API:

```
curl -XGET 'localhost:9200/twitter/tweet/_search?scroll=1m' -d '{
    "slice": {
        "field": "_uid", <1>
        "id": 0, <2>
        "max": 10 <3>
    },
    "query": {
        "match" : {
            "title" : "elasticsearch"
        }
    }
}'
```

<1> (optional) The field name used to do the slicing (_uid by default)
<2> The id of the slice
<3> The maximum number of slices

By default the splitting is done on the shards first and then locally on each shard using the _uid field with the following formula:
`slice(doc) = floorMod(hashCode(doc._uid), max)`

For instance if the number of shards is equal to 2 and the user requested 4 slices then the slices 0 and 2 are assigned to the first shard and the slices 1 and 3 are assigned to the second shard.

Each scroll is independent and can be processed in parallel like any scroll request.

Closes #13494
public boolean get(int doc) {
    values.setDocument(doc);
    for (int i = 0; i < values.count(); i++) {
        return contains(Long.hashCode(values.valueAt(i)));
s/Long.hashCode/BitMixer.mix64/? Otherwise we might still have the issue with doubles, given that Long.hashCode is a bit naive.
Thanks, I was naive too, I pushed another commit.
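As a rough illustration of the suggested substitution (a sketch under assumptions, not the actual commit): the class and its contains(long) helper are invented here for self-containment, and BitMixer comes from the hppc dependency.

```
// Sketch only: mix the full 64-bit doc value before the slice check instead of using
// Long.hashCode, which just xors the two 32-bit halves and can distribute doubles
// (encoded as sortable longs) poorly across slices.
import com.carrotsearch.hppc.BitMixer;

final class SliceHashSketch {
    private final int id;  // slice id
    private final int max; // total number of slices

    SliceHashSketch(int id, int max) {
        this.id = id;
        this.max = max;
    }

    // Assumed helper: true if the mixed hash falls into this slice.
    boolean contains(long hash) {
        return Math.floorMod(hash, (long) max) == id;
    }

    boolean matches(long docValue) {
        return contains(BitMixer.mix64(docValue));
    }
}
```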