Skip to content

Make Metadata extend AbstractCollection #83791

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Feb 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.elasticsearch.xcontent.XContentParser;

import java.io.IOException;
import java.util.AbstractCollection;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Expand Down Expand Up @@ -86,7 +87,7 @@
* The details of how this is persisted are covered in {@link org.elasticsearch.gateway.PersistedClusterStateService}.
* </p>
*/
public class Metadata implements Iterable<IndexMetadata>, Diffable<Metadata>, ToXContentFragment {
public class Metadata extends AbstractCollection<IndexMetadata> implements Diffable<Metadata>, ToXContentFragment {

private static final Logger logger = LogManager.getLogger(Metadata.class);

Expand Down Expand Up @@ -898,6 +899,11 @@ public Iterator<IndexMetadata> iterator() {
return indices.valuesIt();
}

@Override
public int size() {
return indices.size();
}

public static boolean isGlobalStateEquals(Metadata metadata1, Metadata metadata2) {
if (metadata1.coordinationMetadata.equals(metadata2.coordinationMetadata) == false) {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

import static java.util.Arrays.asList;
import static org.elasticsearch.action.support.master.MasterNodeRequest.DEFAULT_MASTER_NODE_TIMEOUT;
Expand Down Expand Up @@ -229,10 +228,10 @@ public void onResponse(final Collection<ActionResponse> responses) {
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

ClusterStateResponse stateResponse = extractResponse(responses, ClusterStateResponse.class);
Map<String, IndexMetadata> indicesStates = StreamSupport.stream(
stateResponse.getState().getMetadata().spliterator(),
false
).collect(Collectors.toMap(indexMetadata -> indexMetadata.getIndex().getName(), Function.identity()));
Map<String, IndexMetadata> indicesStates = stateResponse.getState()
.getMetadata()
.stream()
.collect(Collectors.toMap(indexMetadata -> indexMetadata.getIndex().getName(), Function.identity()));

ClusterHealthResponse healthResponse = extractResponse(responses, ClusterHealthResponse.class);
Map<String, ClusterIndexHealth> indicesHealths = healthResponse.getIndices();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

/**
* This decider looks at all indices and ensures a minimum capacity is available if any indices are in the frozen ILM phase, since that
Expand All @@ -45,7 +44,9 @@ public String name() {

@Override
public AutoscalingDeciderResult scale(Settings configuration, AutoscalingDeciderContext context) {
List<String> indicesNeedingFrozen = StreamSupport.stream(context.state().metadata().spliterator(), false)
List<String> indicesNeedingFrozen = context.state()
.metadata()
.stream()
.filter(this::needsTier)
.map(imd -> imd.getIndex().getName())
.limit(10)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import java.io.IOException;
import java.util.List;
import java.util.Objects;
import java.util.stream.StreamSupport;

/**
* This decider enforces that on a 64GB memory node (31GB heap) we can max have 2000 shards. We arrive at 2000 because our current limit is
Expand Down Expand Up @@ -58,7 +57,7 @@ public AutoscalingDeciderResult scale(Settings configuration, AutoscalingDecider
}

static int countFrozenShards(Metadata metadata) {
return StreamSupport.stream(metadata.spliterator(), false)
return metadata.stream()
.filter(imd -> FrozenUtils.isFrozenIndex(imd.getSettings()))
.mapToInt(IndexMetadata::getTotalNumberOfShards)
.sum();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import java.io.IOException;
import java.util.List;
import java.util.Objects;
import java.util.stream.StreamSupport;

public class FrozenStorageDeciderService implements AutoscalingDeciderService {
public static final String NAME = "frozen_storage";
Expand All @@ -42,7 +41,7 @@ public String name() {
@Override
public AutoscalingDeciderResult scale(Settings configuration, AutoscalingDeciderContext context) {
Metadata metadata = context.state().metadata();
long dataSetSize = StreamSupport.stream(metadata.spliterator(), false)
long dataSetSize = metadata.stream()
.filter(imd -> FrozenUtils.isFrozenIndex(imd.getSettings()))
.mapToLong(imd -> estimateSize(imd, context.info()))
.sum();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@

import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.StreamSupport;

/**
* This class upgrades frozen indices to apply the index.shard_limit.group=frozen setting after all nodes have been upgraded to 7.13+
Expand Down Expand Up @@ -91,7 +90,8 @@ public void onFailure(Exception e) {
}

static boolean needsUpgrade(ClusterState state) {
return StreamSupport.stream(state.metadata().spliterator(), false)
return state.metadata()
.stream()
.filter(
imd -> imd.getCompatibilityVersion().onOrAfter(Version.V_7_12_0) && imd.getCompatibilityVersion().before(Version.V_8_0_0)
)
Expand All @@ -105,7 +105,8 @@ static ClusterState upgradeIndices(ClusterState currentState) {
return currentState;
}
Metadata.Builder builder = Metadata.builder(currentState.metadata());
StreamSupport.stream(currentState.metadata().spliterator(), false)
currentState.metadata()
.stream()
.filter(
imd -> imd.getCompatibilityVersion().onOrAfter(Version.V_7_12_0) && imd.getCompatibilityVersion().before(Version.V_8_0_0)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.VersionUtils;

import java.util.stream.StreamSupport;

import static org.elasticsearch.snapshots.SearchableSnapshotsSettings.SEARCHABLE_SNAPSHOT_STORE_TYPE;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
Expand Down Expand Up @@ -71,7 +69,7 @@ public void testUpgradeIndices() {
assertThat(upgradedState, not(sameInstance(originalState)));
assertThat(upgradedState.metadata().indices().size(), equalTo(originalState.metadata().indices().size()));

assertTrue(StreamSupport.stream(upgradedState.metadata().spliterator(), false).anyMatch(upgraded -> {
assertTrue(upgradedState.metadata().stream().anyMatch(upgraded -> {
IndexMetadata original = originalState.metadata().index(upgraded.getIndex());
assertThat(original, notNullValue());
if (upgraded.isPartialSearchableSnapshot() == false
Expand Down