Skip to content

Batch Index Settings Update Requests #82896

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 13 commits into from
Jan 25, 2022
Merged
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateTaskExecutor;
import org.elasticsearch.cluster.ClusterStateTaskExecutor.ClusterTasksResult;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.routing.RoutingTable;
Expand Down Expand Up @@ -50,13 +51,12 @@ public class MetadataUpdateSettingsService {
private static final Logger logger = LogManager.getLogger(MetadataUpdateSettingsService.class);

private final ClusterService clusterService;

private final AllocationService allocationService;

private final IndexScopedSettings indexScopedSettings;
private final IndicesService indicesService;
private final ShardLimitValidator shardLimitValidator;
private final ThreadPool threadPool;
private final ClusterStateTaskExecutor<AckedClusterStateUpdateTask> executor;

public MetadataUpdateSettingsService(
ClusterService clusterService,
Expand All @@ -67,11 +67,28 @@ public MetadataUpdateSettingsService(
ThreadPool threadPool
) {
this.clusterService = clusterService;
this.threadPool = threadPool;
this.allocationService = allocationService;
this.indexScopedSettings = indexScopedSettings;
this.indicesService = indicesService;
this.shardLimitValidator = shardLimitValidator;
this.threadPool = threadPool;
this.executor = (currentState, tasks) -> {
ClusterTasksResult.Builder<AckedClusterStateUpdateTask> builder = ClusterTasksResult.builder();
ClusterState state = currentState;
for (AckedClusterStateUpdateTask task : tasks) {
try {
state = task.execute(state);
builder.success(task);
} catch (Exception e) {
builder.failure(task, e);
}
}
if (state != currentState) {
// reroute in case things change that require it (like number of replicas)
state = allocationService.reroute(state, "settings update");
}
return builder.build(state);
};
}

public void updateSettings(final UpdateSettingsClusterStateUpdateRequest request, final ActionListener<AcknowledgedResponse> listener) {
Expand Down Expand Up @@ -105,149 +122,153 @@ public void updateSettings(final UpdateSettingsClusterStateUpdateRequest request
final Settings openSettings = settingsForOpenIndices.build();
final boolean preserveExisting = request.isPreserveExisting();

clusterService.submitStateUpdateTask(
"update-settings " + Arrays.toString(request.indices()),
new AckedClusterStateUpdateTask(Priority.URGENT, request, wrapPreservingContext(listener, threadPool.getThreadContext())) {

@Override
public ClusterState execute(ClusterState currentState) {
// TODO: move this to custom class instead of AckedClusterStateUpdateTask
AckedClusterStateUpdateTask clusterTask = new AckedClusterStateUpdateTask(
Priority.URGENT,
request,
wrapPreservingContext(listener, threadPool.getThreadContext())
) {
@Override
public ClusterState execute(ClusterState currentState) {
RoutingTable.Builder routingTableBuilder = null;
Metadata.Builder metadataBuilder = Metadata.builder(currentState.metadata());

RoutingTable.Builder routingTableBuilder = null;
Metadata.Builder metadataBuilder = Metadata.builder(currentState.metadata());

// allow to change any settings to a close index, and only allow dynamic settings to be changed
// on an open index
Set<Index> openIndices = new HashSet<>();
Set<Index> closeIndices = new HashSet<>();
final String[] actualIndices = new String[request.indices().length];
for (int i = 0; i < request.indices().length; i++) {
Index index = request.indices()[i];
actualIndices[i] = index.getName();
final IndexMetadata metadata = currentState.metadata().getIndexSafe(index);
if (metadata.getState() == IndexMetadata.State.OPEN) {
openIndices.add(index);
} else {
closeIndices.add(index);
}
// allow to change any settings to a closed index, and only allow dynamic settings to be changed
// on an open index
Set<Index> openIndices = new HashSet<>();
Set<Index> closedIndices = new HashSet<>();
final String[] actualIndices = new String[request.indices().length];
for (int i = 0; i < request.indices().length; i++) {
Index index = request.indices()[i];
actualIndices[i] = index.getName();
final IndexMetadata metadata = currentState.metadata().getIndexSafe(index);
if (metadata.getState() == IndexMetadata.State.OPEN) {
openIndices.add(index);
} else {
closedIndices.add(index);
}
}

if (skippedSettings.isEmpty() == false && openIndices.isEmpty() == false) {
throw new IllegalArgumentException(
String.format(
Locale.ROOT,
"Can't update non dynamic settings [%s] for open indices %s",
skippedSettings,
openIndices
)
);
}
if (skippedSettings.isEmpty() == false && openIndices.isEmpty() == false) {
throw new IllegalArgumentException(
String.format(
Locale.ROOT,
"Can't update non dynamic settings [%s] for open indices %s",
skippedSettings,
openIndices
)
);
}

if (IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.exists(openSettings)) {
final int updatedNumberOfReplicas = IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.get(openSettings);
if (preserveExisting == false) {
// Verify that this won't take us over the cluster shard limit.
shardLimitValidator.validateShardLimitOnReplicaUpdate(currentState, request.indices(), updatedNumberOfReplicas);
if (IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.exists(openSettings)) {
final int updatedNumberOfReplicas = IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.get(openSettings);
if (preserveExisting == false) {
// Verify that this won't take us over the cluster shard limit.
shardLimitValidator.validateShardLimitOnReplicaUpdate(currentState, request.indices(), updatedNumberOfReplicas);

/*
* We do not update the in-sync allocation IDs as they will be removed upon the first index operation
* which makes these copies stale.
*
* TODO: should we update the in-sync allocation IDs once the data is deleted by the node?
*/
routingTableBuilder = RoutingTable.builder(currentState.routingTable());
routingTableBuilder.updateNumberOfReplicas(updatedNumberOfReplicas, actualIndices);
metadataBuilder.updateNumberOfReplicas(updatedNumberOfReplicas, actualIndices);
logger.info("updating number_of_replicas to [{}] for indices {}", updatedNumberOfReplicas, actualIndices);
}
/*
* We do not update the in-sync allocation IDs as they will be removed upon the first index operation
* which makes these copies stale.
*
* TODO: should we update the in-sync allocation IDs once the data is deleted by the node?
*/
routingTableBuilder = RoutingTable.builder(currentState.routingTable());
routingTableBuilder.updateNumberOfReplicas(updatedNumberOfReplicas, actualIndices);
metadataBuilder.updateNumberOfReplicas(updatedNumberOfReplicas, actualIndices);
logger.info("updating number_of_replicas to [{}] for indices {}", updatedNumberOfReplicas, actualIndices);
}
}

updateIndexSettings(
openIndices,
metadataBuilder,
(index, indexSettings) -> indexScopedSettings.updateDynamicSettings(
openSettings,
indexSettings,
Settings.builder(),
index.getName()
),
preserveExisting,
indexScopedSettings
);
updateIndexSettings(
openIndices,
metadataBuilder,
(index, indexSettings) -> indexScopedSettings.updateDynamicSettings(
openSettings,
indexSettings,
Settings.builder(),
index.getName()
),
preserveExisting,
indexScopedSettings
);

updateIndexSettings(
closeIndices,
metadataBuilder,
(index, indexSettings) -> indexScopedSettings.updateSettings(
closedSettings,
indexSettings,
Settings.builder(),
index.getName()
),
preserveExisting,
indexScopedSettings
);
updateIndexSettings(
closedIndices,
metadataBuilder,
(index, indexSettings) -> indexScopedSettings.updateSettings(
closedSettings,
indexSettings,
Settings.builder(),
index.getName()
),
preserveExisting,
indexScopedSettings
);

if (IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING.exists(normalizedSettings)
|| IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING.exists(normalizedSettings)) {
for (String index : actualIndices) {
final Settings settings = metadataBuilder.get(index).getSettings();
MetadataCreateIndexService.validateTranslogRetentionSettings(settings);
MetadataCreateIndexService.validateStoreTypeSetting(settings);
}
if (IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING.exists(normalizedSettings)
|| IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING.exists(normalizedSettings)) {
for (String index : actualIndices) {
final Settings settings = metadataBuilder.get(index).getSettings();
MetadataCreateIndexService.validateTranslogRetentionSettings(settings);
MetadataCreateIndexService.validateStoreTypeSetting(settings);
}
boolean changed = false;
// increment settings versions
for (final String index : actualIndices) {
if (same(currentState.metadata().index(index).getSettings(), metadataBuilder.get(index).getSettings()) == false) {
changed = true;
final IndexMetadata.Builder builder = IndexMetadata.builder(metadataBuilder.get(index));
builder.settingsVersion(1 + builder.settingsVersion());
metadataBuilder.put(builder);
}
}
boolean changed = false;
// increment settings versions
for (final String index : actualIndices) {
if (same(currentState.metadata().index(index).getSettings(), metadataBuilder.get(index).getSettings()) == false) {
changed = true;
final IndexMetadata.Builder builder = IndexMetadata.builder(metadataBuilder.get(index));
builder.settingsVersion(1 + builder.settingsVersion());
metadataBuilder.put(builder);
}
}

final ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks());
boolean changedBlocks = false;
for (IndexMetadata.APIBlock block : IndexMetadata.APIBlock.values()) {
changedBlocks |= maybeUpdateClusterBlock(actualIndices, blocks, block.block, block.setting, openSettings);
}
changed |= changedBlocks;
final ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks());
boolean changedBlocks = false;
for (IndexMetadata.APIBlock block : IndexMetadata.APIBlock.values()) {
changedBlocks |= maybeUpdateClusterBlock(actualIndices, blocks, block.block, block.setting, openSettings);
}
changed |= changedBlocks;

if (changed == false) {
return currentState;
}
if (changed == false) {
return currentState;
}

ClusterState updatedState = ClusterState.builder(currentState)
.metadata(metadataBuilder)
.routingTable(routingTableBuilder == null ? currentState.routingTable() : routingTableBuilder.build())
.blocks(changedBlocks ? blocks.build() : currentState.blocks())
.build();

ClusterState updatedState = ClusterState.builder(currentState)
.metadata(metadataBuilder)
.routingTable(routingTableBuilder == null ? currentState.routingTable() : routingTableBuilder.build())
.blocks(changedBlocks ? blocks.build() : currentState.blocks())
.build();
// we need to tweak auto expand replicas in order to avoid tripping assertions in
// AllocationService.reroute(RoutingAllocation allocation) -- this is far from ideal
updatedState = allocationService.adaptAutoExpandReplicas(updatedState);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to share the discussion we had on this:
This may be something we want to fix in some form. It's somewhat weird that reroute forces an assertion on us that ensure that this was called. Maybe this logic should be part of the settings update itself somehow so we don't even have to grind through all the indices here to check if any of them require an update for auto-expand-replicas.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the problem is here:

private ClusterState executeClusterStateUpdateTask(ClusterState state, Runnable runnable) {
ClusterState[] result = new ClusterState[1];
doAnswer(invocationOnMock -> {
ClusterStateUpdateTask task = (ClusterStateUpdateTask) invocationOnMock.getArguments()[1];
result[0] = task.execute(state);
return null;
}).when(clusterService).submitStateUpdateTask(anyString(), any(ClusterStateUpdateTask.class), any());
runnable.run();
assertThat(result[0], notNullValue());
return result[0];
}

In these tests we run each individual ClusterStateUpdateTask but with this change we moved the tidy-up reroute() call outside the individual tasks so it's not running.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great catch! I think 92bc39d should do the trick then, yes?


// now, reroute in case things change that require it (like number of replicas)
updatedState = allocationService.reroute(updatedState, "settings update");
try {
for (Index index : openIndices) {
final IndexMetadata currentMetadata = currentState.getMetadata().getIndexSafe(index);
final IndexMetadata updatedMetadata = updatedState.metadata().getIndexSafe(index);
indicesService.verifyIndexMetadata(currentMetadata, updatedMetadata);
}
for (Index index : closeIndices) {
final IndexMetadata currentMetadata = currentState.getMetadata().getIndexSafe(index);
final IndexMetadata updatedMetadata = updatedState.metadata().getIndexSafe(index);
// Verifies that the current index settings can be updated with the updated dynamic settings.
indicesService.verifyIndexMetadata(currentMetadata, updatedMetadata);
// Now check that we can create the index with the updated settings (dynamic and non-dynamic).
// This step is mandatory since we allow to update non-dynamic settings on closed indices.
indicesService.verifyIndexMetadata(updatedMetadata, updatedMetadata);
}
} catch (IOException ex) {
throw ExceptionsHelper.convertToElastic(ex);
try {
for (Index index : openIndices) {
final IndexMetadata currentMetadata = currentState.metadata().getIndexSafe(index);
final IndexMetadata updatedMetadata = updatedState.metadata().getIndexSafe(index);
indicesService.verifyIndexMetadata(currentMetadata, updatedMetadata);
}
return updatedState;
for (Index index : closedIndices) {
final IndexMetadata currentMetadata = currentState.metadata().getIndexSafe(index);
final IndexMetadata updatedMetadata = updatedState.metadata().getIndexSafe(index);
// Verifies that the current index settings can be updated with the updated dynamic settings.
indicesService.verifyIndexMetadata(currentMetadata, updatedMetadata);
// Now check that we can create the index with the updated settings (dynamic and non-dynamic).
// This step is mandatory since we allow to update non-dynamic settings on closed indices.
indicesService.verifyIndexMetadata(updatedMetadata, updatedMetadata);
}
} catch (IOException ex) {
throw ExceptionsHelper.convertToElastic(ex);
}
},
ClusterStateTaskExecutor.unbatched()
);

return updatedState;
}
};

clusterService.submitStateUpdateTask("update-settings " + Arrays.toString(request.indices()), clusterTask, this.executor);
}

public static void updateIndexSettings(
Expand All @@ -256,7 +277,6 @@ public static void updateIndexSettings(
BiFunction<Index, Settings.Builder, Boolean> settingUpdater,
Boolean preserveExisting,
IndexScopedSettings indexScopedSettings

) {
for (Index index : indices) {
IndexMetadata indexMetadata = metadataBuilder.getSafe(index);
Expand Down