Shard Search Scroll failures consistency #62061

Merged: 4 commits, Sep 9, 2020

Changes from all commits

RetryTests.java
@@ -53,7 +53,7 @@
 import static org.hamcrest.Matchers.hasSize;
 
 /**
- * Integration test for retry behavior. Useful because retrying relies on the way that the
+ * Integration test for bulk retry behavior. Useful because retrying relies on the way that the
  * rest of Elasticsearch throws exceptions and unit tests won't verify that.
  */
 public class RetryTests extends ESIntegTestCase {
@@ -77,7 +77,7 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
     }
 
     /**
-     * Lower the queue sizes to be small enough that both bulk and searches will time out and have to be retried.
+     * Lower the queue sizes to be small enough that bulk will time out and have to be retried.
      */
     @Override
     protected Settings nodeSettings(int nodeOrdinal) {
@@ -145,22 +145,15 @@ private void testCase(
             BulkIndexByScrollResponseMatcher matcher)
             throws Exception {
         /*
-         * These test cases work by stuffing the search and bulk queues of a single node and
-         * making sure that we read and write from that node. Because of some "fun" with the
-         * way that searches work, we need at least one more node to act as the coordinating
-         * node for the search request. If we didn't do this then the searches would get stuck
-         * in the queue anyway because we force queue portions of the coordinating node's
-         * actions. This is not a big deal in normal operations but a real pain when you are
-         * intentionally stuffing queues hoping for a failure.
+         * These test cases work by stuffing the bulk queue of a single node and
+         * making sure that we read and write from that node.
          */
 
         final Settings nodeSettings = Settings.builder()
                 // use pools of size 1 so we can block them
                 .put("thread_pool.write.size", 1)
-                .put("thread_pool.search.size", 1)
-                // use queues of size 1 because size 0 is broken and because search requests need the queue to function
+                // use queues of size 1 because size 0 is broken and because bulk requests need the queue to function
                 .put("thread_pool.write.queue_size", 1)
-                .put("thread_pool.search.queue_size", 1)
                 .put("node.attr.color", "blue")
                 .build();
         final String node = internalCluster().startDataOnlyNode(nodeSettings);
@@ -186,45 +179,25 @@ private void testCase(
         assertFalse(initialBulkResponse.buildFailureMessage(), initialBulkResponse.hasFailures());
         client().admin().indices().prepareRefresh("source").get();
 
-        logger.info("Blocking search");
-        CyclicBarrier initialSearchBlock = blockExecutor(ThreadPool.Names.SEARCH, node);
-
         AbstractBulkByScrollRequestBuilder<?, ?> builder = request.apply(internalCluster().masterClient());
         // Make sure we use more than one batch so we have to scroll
         builder.source().setSize(DOC_COUNT / randomIntBetween(2, 10));
 
+        logger.info("Blocking bulk so we start to get bulk rejections");
+        CyclicBarrier bulkBlock = blockExecutor(ThreadPool.Names.WRITE, node);
+
         logger.info("Starting request");
         ActionFuture<BulkByScrollResponse> responseListener = builder.execute();
 
         try {
-            logger.info("Waiting for search rejections on the initial search");
-            assertBusy(() -> assertThat(taskStatus(action).getSearchRetries(), greaterThan(0L)));
-
-            logger.info("Blocking bulk and unblocking search so we start to get bulk rejections");
-            CyclicBarrier bulkBlock = blockExecutor(ThreadPool.Names.WRITE, node);
-            initialSearchBlock.await();
-
             logger.info("Waiting for bulk rejections");
             assertBusy(() -> assertThat(taskStatus(action).getBulkRetries(), greaterThan(0L)));
-
-            // Keep a copy of the current number of search rejections so we can assert that we get more when we block the scroll
-            long initialSearchRejections = taskStatus(action).getSearchRetries();
-
-            logger.info("Blocking search and unblocking bulk so we should get search rejections for the scroll");
-            CyclicBarrier scrollBlock = blockExecutor(ThreadPool.Names.SEARCH, node);
             bulkBlock.await();
 
-            logger.info("Waiting for search rejections for the scroll");
-            assertBusy(() -> assertThat(taskStatus(action).getSearchRetries(), greaterThan(initialSearchRejections)));
-
-            logger.info("Unblocking the scroll");
-            scrollBlock.await();
-
             logger.info("Waiting for the request to finish");
             BulkByScrollResponse response = responseListener.get();
             assertThat(response, matcher);
             assertThat(response.getBulkRetries(), greaterThan(0L));
-            assertThat(response.getSearchRetries(), greaterThan(initialSearchRejections));
         } finally {
             // Fetch the response just in case we blew up half way through. This will make sure the failure is thrown up to the top level.
             BulkByScrollResponse response = responseListener.get();
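
The test relies on a write pool of size 1 backed by a queue of size 1: once the single worker is parked on a barrier by blockExecutor and the lone queue slot is occupied, every further bulk task is rejected, which is what drives the bulk-retry counter that the assertBusy calls poll. The standalone sketch below (plain java.util.concurrent, not the test's actual blockExecutor helper; the class and variable names are invented) illustrates that queue-stuffing mechanic.

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class QueueStuffingDemo {
    public static void main(String[] args) throws Exception {
        // Mirror the test's node settings: one worker thread and a queue of capacity one.
        ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));

        // Park the single worker on a barrier, the same trick blockExecutor plays
        // with the node's write thread pool.
        CyclicBarrier barrier = new CyclicBarrier(2);
        executor.execute(() -> {
            try {
                barrier.await(); // held here until the main thread "unblocks" the pool
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        });

        executor.execute(() -> { }); // occupies the single queue slot

        try {
            executor.execute(() -> { }); // no free thread, no queue room -> rejected
        } catch (RejectedExecutionException e) {
            System.out.println("rejected; a client would back off and retry here");
        }

        barrier.await(); // release the worker so the queued task can drain
        executor.shutdown();
        executor.awaitTermination(5, TimeUnit.SECONDS);
    }
}

The thread_pool.write.size and thread_pool.write.queue_size settings in the diff above set up exactly this situation on the data node, and awaiting the returned barrier is how the test later lets the blocked pool drain so the request can finish.
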
SearchScrollAsyncAction.java
@@ -53,21 +53,6 @@
  * run separate fetch phases etc.
  */
 abstract class SearchScrollAsyncAction<T extends SearchPhaseResult> implements Runnable {
-    /*
-     * Some random TODO:
-     * Today we still have a dedicated executing mode for scrolls while we could simplify this by implementing
-     * scroll like functionality (mainly syntactic sugar) as an ordinary search with search_after. We could even go further and
-     * make the scroll entirely stateless and encode the state per shard in the scroll ID.
-     *
-     * Today we also hold a context per shard but maybe
-     * we want the context per coordinating node such that we route the scroll to the same coordinator all the time and hold the context
-     * here? This would have the advantage that if we loose that node the entire scroll is deal not just one shard.
-     *
-     * Additionally there is the possibility to associate the scroll with a seq. id. such that we can talk to any replica as long as
-     * the shards engine hasn't advanced that seq. id yet. Such a resume is possible and best effort, it could be even a safety net since
-     * if you rely on indices being read-only things can change in-between without notification or it's hard to detect if there where any
-     * changes while scrolling. These are all options to improve the current situation which we can look into down the road
-     */
     protected final Logger logger;
     protected final ActionListener<SearchResponse> listener;
     protected final ParsedScrollId scrollId;
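
The TODO removed above floated the idea of re-implementing scroll as syntactic sugar over an ordinary search with search_after, which would move the cursor out of per-shard server-side contexts and into the client: the "state" becomes nothing more than the sort values of the last hit of the previous page. Purely as an illustration of that idea (this is not code from this change; the index name, sort fields, and page size are invented), such a paging loop looks roughly like this:

import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortOrder;

public class SearchAfterSketch {
    /** Pages through a hypothetical "my-index" without holding a scroll context. */
    static void pageThrough(Client client) {
        SearchSourceBuilder source = new SearchSourceBuilder()
            .size(500)
            .sort("timestamp", SortOrder.ASC)
            .sort("serial", SortOrder.ASC); // tie-breaker so the sort order is total

        Object[] lastSortValues = null;
        while (true) {
            if (lastSortValues != null) {
                // The "cursor" is just the sort values of the last hit of the previous page.
                source.searchAfter(lastSortValues);
            }
            SearchResponse response = client.search(new SearchRequest("my-index").source(source)).actionGet();
            SearchHit[] hits = response.getHits().getHits();
            if (hits.length == 0) {
                break; // no more pages
            }
            for (SearchHit hit : hits) {
                // process hit ...
            }
            lastSortValues = hits[hits.length - 1].getSortValues();
        }
    }
}

Because the only state is the sort-value tuple carried by the client, losing a node would not invalidate the cursor the way losing a shard's scroll context does, which is the trade-off the removed comment was weighing.
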
Engine.java
@@ -108,7 +108,8 @@ public abstract class Engine implements Closeable {
     public static final String FORCE_MERGE_UUID_KEY = "force_merge_uuid";
     public static final String MIN_RETAINED_SEQNO = "min_retained_seq_no";
     public static final String MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID = "max_unsafe_auto_id_timestamp";
-    public static final String CAN_MATCH_SEARCH_SOURCE = "can_match"; // TODO: Make source of search enum?
+    public static final String SEARCH_SOURCE = "search"; // TODO: Make source of search enum?
+    public static final String CAN_MATCH_SEARCH_SOURCE = "can_match";
 
     protected final ShardId shardId;
     protected final String allocationId;
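
The TODO carried over onto the new SEARCH_SOURCE constant asks whether these searcher-source strings should become an enum. As a hedged sketch of what that could look like (the enum and its member names are invented here and are not part of this change):

// Hypothetical sketch only; none of these names come from the change above.
public enum SearcherSource {
    SEARCH("search"),
    CAN_MATCH("can_match");

    private final String source;

    SearcherSource(String source) {
        this.source = source;
    }

    public String source() {
        return source;
    }
}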