Skip to content

Add callback for publication of new cluster state #15494

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Dec 17, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,13 @@ default boolean runOnlyOnMaster() {
return true;
}

/**
* Callback invoked after new cluster state is published. Note that
* this method is not invoked if the cluster state was not updated.
*/
default void clusterStatePublished(ClusterState newClusterState) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm doubtful. I wonder if we should do this or follow the proven path we have in the listener interface where we have a processed callback (changed or not). I'm leaning towards the later as it gives more options (with the burden of an extra equality check).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@bleskes I went back and forth, but opted for this simpler interface since for now the use case we have in mind doesn't need the callback on no change.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok. I'm with you.

}

/**
* Represents the result of a batched execution of cluster state update tasks
* @param <T> the type of the cluster state update task
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -605,6 +605,8 @@ <T> void runTasksForExecutor(ClusterStateTaskExecutor<T> executor) {
task.listener.clusterStateProcessed(task.source, previousClusterState, newClusterState);
}

executor.clusterStatePublished(newClusterState);

TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(System.nanoTime() - startTimeNS)));
logger.debug("processing [{}]: took {} done applying updated cluster_state (version: {}, uuid: {})", source, executionTime, newClusterState.version(), newClusterState.stateUUID());
warnAboutSlowTaskIfNeeded(executionTime, source);
Expand Down
30 changes: 27 additions & 3 deletions core/src/test/java/org/elasticsearch/cluster/ClusterServiceIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,14 @@
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.*;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
Expand All @@ -53,7 +60,11 @@

import static org.elasticsearch.common.settings.Settings.settingsBuilder;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.*;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;

/**
*
Expand Down Expand Up @@ -753,18 +764,30 @@ public void execute() {

class TaskExecutor implements ClusterStateTaskExecutor<Task> {
private AtomicInteger counter = new AtomicInteger();
private AtomicInteger batches = new AtomicInteger();
private AtomicInteger published = new AtomicInteger();

@Override
public BatchResult<Task> execute(ClusterState currentState, List<Task> tasks) throws Exception {
tasks.forEach(task -> task.execute());
counter.addAndGet(tasks.size());
return BatchResult.<Task>builder().successes(tasks).build(currentState);
ClusterState maybeUpdatedClusterState = currentState;
if (randomBoolean()) {
maybeUpdatedClusterState = ClusterState.builder(currentState).build();
batches.incrementAndGet();
}
return BatchResult.<Task>builder().successes(tasks).build(maybeUpdatedClusterState);
}

@Override
public boolean runOnlyOnMaster() {
return false;
}

@Override
public void clusterStatePublished(ClusterState newClusterState) {
published.incrementAndGet();
}
}
int numberOfThreads = randomIntBetween(2, 8);
int tasksSubmittedPerThread = randomIntBetween(1, 1024);
Expand Down Expand Up @@ -838,6 +861,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
for (TaskExecutor executor : executors) {
if (counts.containsKey(executor)) {
assertEquals((int) counts.get(executor), executor.counter.get());
assertEquals(executor.batches.get(), executor.published.get());
}
}

Expand Down