Provide a way to create RefreshListeners at the tip of the Translog #36541

Closed
clandry94 opened this issue Dec 12, 2018 · 10 comments
Labels
:Distributed Indexing/Engine Anything around managing Lucene and the Translog in an open shard.

Comments

@clandry94

Describe the feature:

Currently, the addRefreshListener method in the IndexShard class is publicly exposed, but it expects a Translog.Location and a Consumer in order to add a new RefreshListener. This is fine in the context of indexation, where the Translog is readily available, but as a plugin developer there is no way to publicly retrieve the Translog, especially when extending an IndexModule to react to shard state changes such as void afterIndexShardStarted(IndexShard indexShard) and void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexShard, Settings indexSettings).

I'm proposing one of two changes within the IndexShard class:

  1. Publicly expose the Engine from within IndexShard. This used to be the case, but was changed in previous commits a while back. This would allow plugins to grab the Translog as well as the last write location and thus build their own RefreshListeners.

-or-

  2. Add a new public method within IndexShard which would look something like this
public void addRefreshListenerAtLatestWriteLocation(Consumer<Boolean> listener) {
  // read the most recent translog write location from the engine
  Translog.Location latestLocation = getEngine().getTranslogLastWriteLocation();
  addRefreshListener(latestLocation, listener);
}

This method would be safer than option 1 since it keeps the Engine accessor package private. Since it is read only, I don't think this would introduce any synchronization issues.

Making either of these changes would enable plugin developers to make use of RefreshListeners within a cluster and fulfill a couple of use-cases:

  1. Retrieve metrics on refresh rates at a shard level rather than an average of all shard refreshes at an index level. This is nice for those of us that maintain clusters with hundreds of shards and the average refresh rate is not particularly useful for us.

  2. Build plugins that make use of shard level refresh data and perform actions such as application level cache invalidation based on when data on a specific shard is available for search

@colings86 colings86 added the :Distributed Indexing/Engine Anything around managing Lucene and the Translog in an open shard. label Dec 12, 2018
@elasticmachine
Collaborator

Pinging @elastic/es-distributed

@s1monw
Contributor

s1monw commented Dec 12, 2018

hey @clandry94 thanks for opening this ticket.

I agree it's not ideal that this method is public, yet it has a very specific purpose: it's there to signal whether a certain change (a Translog.Location) is visible and whether we can return from the indexing request. That said, I am happy to expose more APIs if necessary, but we need to understand the purpose. The two points you listed can and should be solved differently. Let me elaborate:

Retrieve metrics on refresh rates at a shard level rather than an average of all shard refreshes at an index level. This is nice for those of us that maintain clusters with hundreds of shards and the average refresh rate is not particularly useful for us.

This one is already possible: you can call IndexShard#refreshStats.

Build plugins that make use of shard level refresh data and perform actions such as application level cache invalidation based on when data on a specific shard is available for search

Cache invalidation should be done based on the DirectoryReader you obtain from IndexShard#acquireSearcher. You should look closer into something like this:

Searcher searcher = indexShard.acquireSearcher("my_plugin"); // release the searcher when done
DirectoryReader r = searcher.getDirectoryReader();
CacheHelper helper = r.getReaderCacheHelper();
CacheKey key = helper.getKey(); // use this as a cache key

// lambda parameter renamed so it does not clash with the local variable `key`
helper.addClosedListener(closedKey -> {
  // invalidate caches
});

Also, there are two levels of refreshes that we differentiate between: internal and external. For caches you need to look into internal refreshes, and for visibility into external refreshes. Yet, I don't think caching should be done based on refresh callbacks.

Hope this helps, if you need more help or have other problems you need to discuss, don't hesitate.

@clandry94
Author

Thanks for the info @s1monw. I didn't notice that refresh stats are available per shard. I'm guessing that the IndexShard#refreshStats are external refreshes? If that's the case, I think that will fulfill my needs and be a cleaner way to do what I'm trying to accomplish.

Regarding the cache invalidation, I think we are speaking about different things. By cache invalidation, I meant invalidating caches on something external to ES like a web application. For example, larger clusters with a high indexation rate generally need higher refresh intervals for optimal performance (30s to 1m) and minimizing the time between a document being created in the web application and visible for search in ES is tough to do without some sort of signal to know when to bust caches within the web application. This could be done using the wait_for flag on indexation, but that amounts to a sad attempt of making ES synchronous and does not scale well. Large, multi-tenant ES clusters might have deterministic routing in ES based on some kind of key, so I've been experimenting with invalidating the web application's caches based on the latest refresh that happened on the shard which maps to the previously mentioned key.

Does your comment about caching based on refresh callbacks still seem like a bad idea in this case since the cache is external to ES?

@s1monw
Contributor

s1monw commented Dec 12, 2018

I didn't notice that refresh stats are available per shard. I'm guessing that the IndexShard#refreshStats are external refreshes? If that's the case, I think that will fulfill my needs and be a cleaner way to do what I'm trying to accomplish.

No, these stats are the actual refreshes done on the lower level. I don't think we have stats that are on the outer level. This doesn't mean we can't add them.

This could be done using the wait_for flag on indexation, but that amounts to a sad attempt of making ES synchronous and does not scale well.

I have a question about why you think that is the case. ES will asynchronously wait for the refresh to happen, and it's done in an efficient way. Your client can also wait without blocking. Maybe you mean you will have a higher latency because the response comes back later?

Does your comment about caching based on refresh callbacks still seem like a bad idea in this case since the cache is external to ES?

I can't say yes or no at this point. I would like to understand your use case better to give better advice. Can you elaborate a bit more on your problem and what the cache is used for?

@s1monw
Contributor

s1monw commented Dec 13, 2018

@clandry94 ping

@clandry94
Author

clandry94 commented Dec 13, 2018

No, these stats are the actual refreshes done on the lower level. I don't think we have stats that are on the outer level. This doesn't mean we can't add them.

I think that these could be very useful, especially for examining performance issues with hot shards e.g. examining the relationship of indexation rate to a specific shard, memory buffer size, and refresh rate. Or with the use-case for a greedy form of cache busting in an application using ES.

I have a question about why you think that is the case. ES will asynchronously wait for the refresh to happen, and it's done in an efficient way. Your client can also wait without blocking. Maybe you mean you will have a higher latency because the response comes back later?

My problem with wait_for is that it doesn't seem like it will scale well. Based on the docs here https://www.elastic.co/guide/en/elasticsearch/reference/master/docs-refresh.html, there are a few areas of concern:

Never start multiple refresh=wait_for requests in a row. Instead batch them into a single bulk request with refresh=wait_for and Elasticsearch will start them all in parallel and return only when they have all finished.

Not starting multiple wait_for requests in a row isn't a good option at a high indexation rate, because it extends the time it takes for a document to be indexed and searchable. For example, with a refresh interval of 30 seconds and an indexation rate of something like 50,000 docs/s to an index, the time to index a document and have it be searchable would be:

batch 1: up to 30 seconds for the first batch
batch 2: time waiting for batch 1 to index (up to 30 seconds) + up to 30 seconds for batch 2
batch 3: time waiting for batch 1 to index (up to 30 seconds) + time waiting for batch 2 to index (up to 30 seconds) + up to 30 seconds for batch 3

and so on.

At the same time, the doc also notes what can happen if more than one wait_for request is made at once:

If a refresh=wait_for request comes in when there are already index.max_refresh_listeners (defaults to 1000) requests waiting for a refresh on that shard then that request will behave just as though it had refresh set to true instead: it will force a refresh.

Assuming we keep the defaults set here and the conditions in the example before and we index documents in batches of 500 docs per batch at 50,000 docs per second, that would amount to 100 refresh listeners added to the index per second and a refresh would be forced every 10 seconds. One option is that those numbers could be tuned to minimize the number of refreshes, but those don't come without performance tradeoffs as well.
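The arithmetic in the paragraph above can be checked with a few lines of Java. This is only a sketch: the class and method names are hypothetical, and it takes on the same assumptions as the comment, namely that each 500-doc batch registers one wait_for listener, that all batches hit the same shard, and that no regular refresh drains the listeners before the limit is reached.

```java
// Hypothetical helper; the figures come from the comment above.
final class RefreshListenerMath {
    /** Bulk requests per second, each assumed to register one wait_for listener. */
    static double listenersPerSecond(double docsPerSecond, int docsPerBatch) {
        return docsPerSecond / docsPerBatch;
    }

    /** Seconds until the listener limit is hit, assuming no refresh drains the listeners first. */
    static double secondsUntilForcedRefresh(int maxRefreshListeners, double listenersPerSecond) {
        return maxRefreshListeners / listenersPerSecond;
    }

    public static void main(String[] args) {
        double rate = listenersPerSecond(50_000, 500);
        System.out.println(rate);                                  // 100.0 listeners per second
        System.out.println(secondsUntilForcedRefresh(1000, rate)); // 10.0 seconds
    }
}
```

With the default index.max_refresh_listeners of 1000, the limit is reached after 10 seconds, which is well inside a 30 second refresh interval.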

I'm still investigating if this approach would work well at high load with proper tuning, but I just want to keep my options open as well.

I can't say yes or now at this point. I would like to understand your usecase better to give better advice. Can you elaborate on your problem a bit more what the cache is used for?

The cache I'm referring to is used in a web application to load a large number of assets such as titles, metadata, descriptions, etc. for objects displayed on the web page. The objects are cached after being created so that a fetch to MySQL doesn't need to be performed, and to further avoid expensive queries to MySQL, the objects are retrieved from Elasticsearch whenever the cache is invalidated based on a time interval. However, since cache invalidations can happen while an object's corresponding ES document is still in the Elasticsearch memory buffer, a large number of the objects are out of date when a cache invalidation occurs and must wait until the next cache invalidation interval completes to retrieve the up-to-date objects from ES.

That is where my idea of invalidating the caches based on refresh events in individual shards is relevant. From the web application layer, I know which Elasticsearch shard each object routes to, so I can form a mapping between an object and its shard ID. By tracking the refresh events of an individual ES shard, I can invalidate the cache each time a refresh occurs on the corresponding shard and improve my chances of retrieving up-to-date data from Elasticsearch with this smarter cache invalidation mechanism.
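The object-to-shard mapping described above could look roughly like the following on the web application side. This is a minimal sketch, not something from the thread: ShardScopedCache and its methods are hypothetical names, the shard ID is assumed to be supplied by the caller, and how a refresh event would actually reach the application is left open.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical application-side cache whose entries are grouped by Elasticsearch shard ID. */
final class ShardScopedCache<K, V> {
    // shard ID -> cached entries for objects that route to that shard
    private final Map<Integer, Map<K, V>> byShard = new ConcurrentHashMap<>();

    void put(int shardId, K key, V value) {
        byShard.computeIfAbsent(shardId, id -> new ConcurrentHashMap<>()).put(key, value);
    }

    Optional<V> get(int shardId, K key) {
        Map<K, V> entries = byShard.get(shardId);
        return entries == null ? Optional.empty() : Optional.ofNullable(entries.get(key));
    }

    /** Call when a refresh is known to have happened on the shard; drops only that shard's entries. */
    void onShardRefreshed(int shardId) {
        byShard.remove(shardId);
    }
}
```

Grouping by shard means a refresh on one shard leaves the cached objects of every other shard untouched.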

Hopefully this clarifies things a bit, there are a lot of moving parts going on 😆

@s1monw
Contributor

s1monw commented Dec 14, 2018

@clandry94 thanks for the insights. I personally think building cache invalidation based on refreshes will be a very tricky and error-prone construct. I would try hard to not go down that path. One thing that I can see being tricky is that whenever you get a callback on a refresh, you can't tell if the change is visible on all replicas. This guarantee is different with wait_for. If you need to know when to invalidate a cache I would run a refresh manually ahead of the cache invalidation, is this something that you explored? I mean, calling occasional refreshes will not have a big impact on ingest performance?

Another option is to set the wait_for only on selected indexing requests because once it returns you can be sure that all docs acked before this document will be visible. That way you don't run into the issue of making things synchronous. Not sure if that helps?
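One way to picture "wait_for only on selected indexing requests" is a small client-side policy that decides the value of the refresh parameter per request. The sketch below is an assumption for illustration only: the thread does not say how requests should be selected, and the class name and the every-Nth rule are hypothetical. Only the parameter values wait_for and false come from the Elasticsearch refresh parameter.

```java
/** Hypothetical policy: only every Nth indexing request asks Elasticsearch to wait for a refresh. */
final class SelectiveWaitFor {
    private final int every;
    private long sent = 0;

    SelectiveWaitFor(int every) {
        if (every < 1) {
            throw new IllegalArgumentException("every must be >= 1");
        }
        this.every = every;
    }

    /** Returns the value to use for the refresh query parameter of the next indexing request. */
    synchronized String nextRefreshParam() {
        sent++;
        return sent % every == 0 ? "wait_for" : "false";
    }
}
```

A client would append the returned value as refresh=... to each indexing request, and treat the response of a wait_for request as the signal described in the comment above.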

@clandry94
Author

Another option is to set the wait_for only on selected indexing requests because once it returns you can be sure that all docs acked before this document will be visible.

I really like this approach, but we use Kafka to have a resilient indexation pipeline, and unfortunately this won't work for me.

On the other hand,

If you need to know when to invalidate a cache I would run a refresh manually ahead of the cache invalidation, is this something that you explored?

I'm going to explore this more. I think it could be a pretty resilient way to invalidate the caches, since the cluster will fall back to its original refresh interval if whatever job/system/etc. that forces refreshes fails.

Anyway, thank you for the help! I have a lot of new insight from this discussion. I still think that exposing external refresh stats at a shard level would be useful from a monitoring standpoint. I can take a look at implementing that if you think it's worthwhile?

@s1monw
Contributor

s1monw commented Dec 15, 2018

Anyway, thank you for the help! I have a lot of new insight from this discussion.

my pleasure! great conversation, happy it was helpful. Keep asking if you need more insights.

I still think that exposing external refresh stats at a shard level would be useful from a monitoring standpoint. I can take a look at implementing that if you think it's worthwhile?

I agree we should have dedicated stats. Yet, if you execute a refresh with scope EXTERNAL you will also see it in the current stats. The reason is that it delegates to the internal refresh code. What we don't have is how many EXTERNAL refreshes we have vs. INTERNAL. Having that number would totally be an improvement. I am happy to support you on the implementation side.
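The missing breakdown could be pictured as one counter per refresh scope. The following is a hedged sketch and not Elasticsearch code: RefreshScopeCounters and its Scope enum are hypothetical stand-ins for the two scopes named in this thread, and nothing here reflects how the real stats are implemented.

```java
import java.util.EnumMap;
import java.util.Map;
import java.util.concurrent.atomic.LongAdder;

final class RefreshScopeCounters {
    /** Stand-in for the two refresh scopes named in the thread; not an Elasticsearch type. */
    enum Scope { INTERNAL, EXTERNAL }

    // one counter per scope, created up front so the map is never modified afterwards
    private final Map<Scope, LongAdder> counts = new EnumMap<>(Scope.class);

    RefreshScopeCounters() {
        for (Scope scope : Scope.values()) {
            counts.put(scope, new LongAdder());
        }
    }

    /** Record one refresh of the given scope. */
    void onRefresh(Scope scope) {
        counts.get(scope).increment();
    }

    long count(Scope scope) {
        return counts.get(scope).sum();
    }

    /** Combined number of refreshes across both scopes. */
    long total() {
        return count(Scope.INTERNAL) + count(Scope.EXTERNAL);
    }
}
```

Keeping the combined total next to the per-scope counts would let a monitoring consumer compare EXTERNAL against INTERNAL refreshes for a single shard.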

If you are OK with it, can you close this issue? We can discuss further changes on dedicated issues, but we can still continue the conversation here if necessary. Reopening is always a possibility too.

@clandry94
Author

Sure, I'll make a new issue
