Skip to content

Commit f753fa2

Browse files
committed
HttpHandlers should return correct list of objects (#49283)
This commit fixes the server side logic of "List Objects" operations of Azure and S3 fixtures. Until today, the fixtures were returning a " flat" view of stored objects and were not correctly handling the delimiter parameter. This causes some objects listing to be wrongly interpreted by the snapshot deletion logic in Elasticsearch which relies on the ability to list child containers of BlobContainer (#42653) to correctly delete stale indices. As a consequence, the blobs were not correctly deleted from the emulated storage service and stayed in heap until they got garbage collected, causing CI failures like #48978. This commit fixes the server side logic of Azure and S3 fixture when listing objects so that it now return correct common blob prefixes as expected by the snapshot deletion process. It also adds an after-test check to ensure that tests leave the repository empty (besides the root index files). Closes #48978
1 parent 4d6e037 commit f753fa2

File tree

8 files changed

+131
-27
lines changed

8 files changed

+131
-27
lines changed

plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureBlobStoreRepositoryTests.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
6464

6565
@Override
6666
protected Map<String, HttpHandler> createHttpHandlers() {
67-
return Collections.singletonMap("/container", new AzureHttpHandler("container"));
67+
return Collections.singletonMap("/container", new AzureBlobStoreHttpHandler("container"));
6868
}
6969

7070
@Override
@@ -115,6 +115,14 @@ BlobRequestOptions getBlobRequestOptionsForWriteBlob() {
115115
}
116116
}
117117

118+
@SuppressForbidden(reason = "this test uses a HttpHandler to emulate an Azure endpoint")
119+
private static class AzureBlobStoreHttpHandler extends AzureHttpHandler implements BlobStoreHttpHandler {
120+
121+
AzureBlobStoreHttpHandler(final String container) {
122+
super(container);
123+
}
124+
}
125+
118126
/**
119127
* HTTP handler that injects random Azure service errors
120128
*

plugins/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreRepositoryTests.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import com.google.cloud.storage.StorageOptions;
2525
import com.sun.net.httpserver.HttpExchange;
2626
import com.sun.net.httpserver.HttpHandler;
27+
import fixture.gcs.FakeOAuth2HttpHandler;
2728
import fixture.gcs.GoogleCloudStorageHttpHandler;
2829
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
2930
import org.elasticsearch.common.SuppressForbidden;
@@ -77,8 +78,8 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
7778
@Override
7879
protected Map<String, HttpHandler> createHttpHandlers() {
7980
final Map<String, HttpHandler> handlers = new HashMap<>(2);
80-
handlers.put("/", new GoogleCloudStorageHttpHandler("bucket"));
81-
handlers.put("/token", new fixture.gcs.FakeOAuth2HttpHandler());
81+
handlers.put("/", new GoogleCloudStorageBlobStoreHttpHandler("bucket"));
82+
handlers.put("/token", new FakeOAuth2HttpHandler());
8283
return Collections.unmodifiableMap(handlers);
8384
}
8485

@@ -186,6 +187,14 @@ long getLargeBlobThresholdInBytes() {
186187
}
187188
}
188189

190+
@SuppressForbidden(reason = "this test uses a HttpHandler to emulate a Google Cloud Storage endpoint")
191+
private static class GoogleCloudStorageBlobStoreHttpHandler extends GoogleCloudStorageHttpHandler implements BlobStoreHttpHandler {
192+
193+
GoogleCloudStorageBlobStoreHttpHandler(final String bucket) {
194+
super(bucket);
195+
}
196+
}
197+
189198
/**
190199
* HTTP handler that injects random Google Cloud Storage service errors
191200
*

plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
6767

6868
@Override
6969
protected Map<String, HttpHandler> createHttpHandlers() {
70-
return Collections.singletonMap("/bucket", new S3HttpHandler("bucket"));
70+
return Collections.singletonMap("/bucket", new S3BlobStoreHttpHandler("bucket"));
7171
}
7272

7373
@Override
@@ -134,6 +134,14 @@ void ensureMultiPartUploadSize(long blobSize) {
134134
}
135135
}
136136

137+
@SuppressForbidden(reason = "this test uses a HttpHandler to emulate an S3 endpoint")
138+
private static class S3BlobStoreHttpHandler extends S3HttpHandler implements BlobStoreHttpHandler {
139+
140+
S3BlobStoreHttpHandler(final String bucket) {
141+
super(bucket);
142+
}
143+
}
144+
137145
/**
138146
* HTTP handler that injects random S3 service errors
139147
*

test/fixtures/azure-fixture/src/main/java/fixture/azure/AzureHttpHandler.java

Lines changed: 29 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,11 @@
3636
import java.nio.charset.StandardCharsets;
3737
import java.util.Arrays;
3838
import java.util.HashMap;
39+
import java.util.HashSet;
3940
import java.util.List;
4041
import java.util.Map;
4142
import java.util.Objects;
43+
import java.util.Set;
4244
import java.util.concurrent.ConcurrentHashMap;
4345
import java.util.regex.Matcher;
4446
import java.util.regex.Pattern;
@@ -153,13 +155,32 @@ public void handle(final HttpExchange exchange) throws IOException {
153155
list.append("<?xml version=\"1.0\" encoding=\"UTF-8\"?>");
154156
list.append("<EnumerationResults>");
155157
final String prefix = params.get("prefix");
158+
final Set<String> blobPrefixes = new HashSet<>();
159+
final String delimiter = params.get("delimiter");
160+
if (delimiter != null) {
161+
list.append("<Delimiter>").append(delimiter).append("</Delimiter>");
162+
}
156163
list.append("<Blobs>");
157164
for (Map.Entry<String, BytesReference> blob : blobs.entrySet()) {
158-
if (prefix == null || blob.getKey().startsWith("/" + container + "/" + prefix)) {
159-
list.append("<Blob><Name>").append(blob.getKey().replace("/" + container + "/", "")).append("</Name>");
160-
list.append("<Properties><Content-Length>").append(blob.getValue().length()).append("</Content-Length>");
161-
list.append("<BlobType>BlockBlob</BlobType></Properties></Blob>");
165+
if (prefix != null && blob.getKey().startsWith("/" + container + "/" + prefix) == false) {
166+
continue;
167+
}
168+
String blobPath = blob.getKey().replace("/" + container + "/", "");
169+
if (delimiter != null) {
170+
int fromIndex = (prefix != null ? prefix.length() : 0);
171+
int delimiterPosition = blobPath.indexOf(delimiter, fromIndex);
172+
if (delimiterPosition > 0) {
173+
blobPrefixes.add(blobPath.substring(0, delimiterPosition) + delimiter);
174+
continue;
175+
}
162176
}
177+
list.append("<Blob><Name>").append(blobPath).append("</Name>");
178+
list.append("<Properties><Content-Length>").append(blob.getValue().length()).append("</Content-Length>");
179+
list.append("<BlobType>BlockBlob</BlobType></Properties></Blob>");
180+
}
181+
if (blobPrefixes.isEmpty() == false) {
182+
blobPrefixes.forEach(p -> list.append("<BlobPrefix><Name>").append(p).append("</Name></BlobPrefix>"));
183+
163184
}
164185
list.append("</Blobs>");
165186
list.append("</EnumerationResults>");
@@ -177,6 +198,10 @@ public void handle(final HttpExchange exchange) throws IOException {
177198
}
178199
}
179200

201+
public Map<String, BytesReference> blobs() {
202+
return blobs;
203+
}
204+
180205
public static void sendError(final HttpExchange exchange, final RestStatus status) throws IOException {
181206
final Headers headers = exchange.getResponseHeaders();
182207
headers.add("Content-Type", "application/xml");

test/fixtures/gcs-fixture/src/main/java/fixture/gcs/GoogleCloudStorageHttpHandler.java

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.elasticsearch.common.SuppressForbidden;
2626
import org.elasticsearch.common.UUIDs;
2727
import org.elasticsearch.common.bytes.BytesArray;
28+
import org.elasticsearch.common.bytes.BytesReference;
2829
import org.elasticsearch.common.collect.Tuple;
2930
import org.elasticsearch.common.io.Streams;
3031
import org.elasticsearch.common.network.InetAddresses;
@@ -64,7 +65,7 @@
6465
@SuppressForbidden(reason = "Uses a HttpServer to emulate a Google Cloud Storage endpoint")
6566
public class GoogleCloudStorageHttpHandler implements HttpHandler {
6667

67-
private final ConcurrentMap<String, BytesArray> blobs;
68+
private final ConcurrentMap<String, BytesReference> blobs;
6869
private final String bucket;
6970

7071
public GoogleCloudStorageHttpHandler(final String bucket) {
@@ -86,7 +87,7 @@ public void handle(final HttpExchange exchange) throws IOException {
8687
final Set<String> prefixes = new HashSet<>();
8788
final List<String> listOfBlobs = new ArrayList<>();
8889

89-
for (final Map.Entry<String, BytesArray> blob : blobs.entrySet()) {
90+
for (final Map.Entry<String, BytesReference> blob : blobs.entrySet()) {
9091
final String blobName = blob.getKey();
9192
if (prefix.isEmpty() || blobName.startsWith(prefix)) {
9293
int delimiterPos = (delimiter != null) ? blobName.substring(prefix.length()).indexOf(delimiter) : -1;
@@ -122,15 +123,15 @@ public void handle(final HttpExchange exchange) throws IOException {
122123

123124
} else if (Regex.simpleMatch("GET /download/storage/v1/b/" + bucket + "/o/*", request)) {
124125
// Download Object https://cloud.google.com/storage/docs/request-body
125-
BytesArray blob = blobs.get(exchange.getRequestURI().getPath().replace("/download/storage/v1/b/" + bucket + "/o/", ""));
126+
BytesReference blob = blobs.get(exchange.getRequestURI().getPath().replace("/download/storage/v1/b/" + bucket + "/o/", ""));
126127
if (blob != null) {
127128
final String range = exchange.getRequestHeaders().getFirst("Range");
128129
Matcher matcher = Pattern.compile("bytes=([0-9]*)-([0-9]*)").matcher(range);
129130
if (matcher.find() == false) {
130131
throw new AssertionError("Range bytes header does not match expected format: " + range);
131132
}
132133

133-
byte[] response = Integer.parseInt(matcher.group(1)) == 0 ? blob.array() : new byte[0];
134+
byte[] response = Integer.parseInt(matcher.group(1)) == 0 ? BytesReference.toBytes(blob) : new byte[0];
134135
exchange.getResponseHeaders().add("Content-Type", "application/octet-stream");
135136
exchange.sendResponseHeaders(RestStatus.OK.getStatus(), response.length);
136137
exchange.getResponseBody().write(response);
@@ -141,8 +142,8 @@ public void handle(final HttpExchange exchange) throws IOException {
141142
} else if (Regex.simpleMatch("DELETE /storage/v1/b/" + bucket + "/o/*", request)) {
142143
// Delete Object https://cloud.google.com/storage/docs/json_api/v1/objects/delete
143144
int deletions = 0;
144-
for (Iterator<Map.Entry<String, BytesArray>> iterator = blobs.entrySet().iterator(); iterator.hasNext(); ) {
145-
Map.Entry<String, BytesArray> blob = iterator.next();
145+
for (Iterator<Map.Entry<String, BytesReference>> iterator = blobs.entrySet().iterator(); iterator.hasNext(); ) {
146+
Map.Entry<String, BytesReference> blob = iterator.next();
146147
if (blob.getKey().equals(exchange.getRequestURI().toString())) {
147148
iterator.remove();
148149
deletions++;
@@ -209,12 +210,11 @@ public void handle(final HttpExchange exchange) throws IOException {
209210
RestUtils.decodeQueryString(exchange.getRequestURI().getQuery(), 0, params);
210211

211212
final String blobName = params.get("test_blob_name");
212-
byte[] blob = blobs.get(blobName).array();
213-
if (blob == null) {
213+
if (blobs.containsKey(blobName) == false) {
214214
exchange.sendResponseHeaders(RestStatus.NOT_FOUND.getStatus(), -1);
215215
return;
216216
}
217-
217+
byte[] blob = BytesReference.toBytes(blobs.get(blobName));
218218
final String range = exchange.getRequestHeaders().getFirst("Content-Range");
219219
final Integer limit = getContentRangeLimit(range);
220220
final int start = getContentRangeStart(range);
@@ -250,6 +250,10 @@ public void handle(final HttpExchange exchange) throws IOException {
250250
}
251251
}
252252

253+
public Map<String, BytesReference> blobs() {
254+
return blobs;
255+
}
256+
253257
private String httpServerUrl(final HttpExchange exchange) {
254258
final InetSocketAddress address = exchange.getLocalAddress();
255259
return "http://" + InetAddresses.toUriString(address.getAddress()) + ":" + address.getPort();

test/fixtures/s3-fixture/src/main/java/fixture/s3/S3HttpHandler.java

Lines changed: 32 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -41,10 +41,12 @@
4141
import java.nio.charset.StandardCharsets;
4242
import java.security.MessageDigest;
4343
import java.util.HashMap;
44+
import java.util.HashSet;
4445
import java.util.Iterator;
4546
import java.util.Locale;
4647
import java.util.Map;
4748
import java.util.Objects;
49+
import java.util.Set;
4850
import java.util.concurrent.ConcurrentHashMap;
4951
import java.util.concurrent.ConcurrentMap;
5052
import java.util.regex.Matcher;
@@ -158,13 +160,34 @@ public void handle(final HttpExchange exchange) throws IOException {
158160
if (prefix != null) {
159161
list.append("<Prefix>").append(prefix).append("</Prefix>");
160162
}
163+
final Set<String> commonPrefixes = new HashSet<>();
164+
final String delimiter = params.get("delimiter");
165+
if (delimiter != null) {
166+
list.append("<Delimiter>").append(delimiter).append("</Delimiter>");
167+
}
161168
for (Map.Entry<String, BytesReference> blob : blobs.entrySet()) {
162-
if (prefix == null || blob.getKey().startsWith("/" + bucket + "/" + prefix)) {
163-
list.append("<Contents>");
164-
list.append("<Key>").append(blob.getKey().replace("/" + bucket + "/", "")).append("</Key>");
165-
list.append("<Size>").append(blob.getValue().length()).append("</Size>");
166-
list.append("</Contents>");
169+
if (prefix != null && blob.getKey().startsWith("/" + bucket + "/" + prefix) == false) {
170+
continue;
171+
}
172+
String blobPath = blob.getKey().replace("/" + bucket + "/", "");
173+
if (delimiter != null) {
174+
int fromIndex = (prefix != null ? prefix.length() : 0);
175+
int delimiterPosition = blobPath.indexOf(delimiter, fromIndex);
176+
if (delimiterPosition > 0) {
177+
commonPrefixes.add(blobPath.substring(0, delimiterPosition) + delimiter);
178+
continue;
179+
}
167180
}
181+
list.append("<Contents>");
182+
list.append("<Key>").append(blobPath).append("</Key>");
183+
list.append("<Size>").append(blob.getValue().length()).append("</Size>");
184+
list.append("</Contents>");
185+
}
186+
if (commonPrefixes.isEmpty() == false) {
187+
list.append("<CommonPrefixes>");
188+
commonPrefixes.forEach(commonPrefix -> list.append("<Prefix>").append(commonPrefix).append("</Prefix>"));
189+
list.append("</CommonPrefixes>");
190+
168191
}
169192
list.append("</ListBucketResult>");
170193

@@ -241,6 +264,10 @@ public void handle(final HttpExchange exchange) throws IOException {
241264
}
242265
}
243266

267+
public Map<String, BytesReference> blobs() {
268+
return blobs;
269+
}
270+
244271
private static String multipartKey(final String uploadId, int partNumber) {
245272
return uploadId + "\n" + partNumber;
246273
}

test/framework/src/main/java/org/elasticsearch/repositories/blobstore/ESBlobStoreRepositoryIntegTestCase.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,6 @@
4343
import java.util.List;
4444
import java.util.Locale;
4545
import java.util.Set;
46-
import java.util.concurrent.ExecutionException;
4746

4847
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
4948
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
@@ -272,9 +271,11 @@ public void testIndicesDeletedFromRepository() throws Exception {
272271
assertFalse(BlobStoreTestUtil.blobExists(indicesBlobContainer.get(), indexId.getId())); // deleted index
273272
}
274273
}
274+
275+
assertAcked(client().admin().cluster().prepareDeleteSnapshot(repoName, "test-snap2").get());
275276
}
276277

277-
protected void addRandomDocuments(String name, int numDocs) throws ExecutionException, InterruptedException {
278+
protected void addRandomDocuments(String name, int numDocs) throws InterruptedException {
278279
IndexRequestBuilder[] indexRequestBuilders = new IndexRequestBuilder[numDocs];
279280
for (int i = 0; i < numDocs; i++) {
280281
indexRequestBuilders[i] = client().prepareIndex(name, name, Integer.toString(i))

test/framework/src/main/java/org/elasticsearch/repositories/blobstore/ESMockAPIBasedRepositoryIntegTestCase.java

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.elasticsearch.cluster.metadata.IndexMetaData;
2929
import org.elasticsearch.common.Strings;
3030
import org.elasticsearch.common.SuppressForbidden;
31+
import org.elasticsearch.common.bytes.BytesReference;
3132
import org.elasticsearch.common.network.InetAddresses;
3233
import org.elasticsearch.common.settings.Settings;
3334
import org.elasticsearch.mocksocket.MockHttpServer;
@@ -41,20 +42,31 @@
4142
import java.io.InputStream;
4243
import java.net.InetAddress;
4344
import java.net.InetSocketAddress;
45+
import java.util.List;
4446
import java.util.Map;
4547
import java.util.concurrent.ConcurrentHashMap;
4648
import java.util.concurrent.atomic.AtomicInteger;
49+
import java.util.stream.Collectors;
4750

4851
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
4952
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
5053
import static org.hamcrest.Matchers.equalTo;
54+
import static org.hamcrest.Matchers.hasSize;
5155

5256
/**
5357
* Integration tests for {@link BlobStoreRepository} implementations rely on mock APIs that emulate cloud-based services.
5458
*/
5559
@SuppressForbidden(reason = "this test uses a HttpServer to emulate a cloud-based storage service")
5660
public abstract class ESMockAPIBasedRepositoryIntegTestCase extends ESBlobStoreRepositoryIntegTestCase {
5761

62+
/**
63+
* A {@link HttpHandler} that allows to list stored blobs
64+
*/
65+
@SuppressForbidden(reason = "Uses a HttpServer to emulate a cloud-based storage service")
66+
protected interface BlobStoreHttpHandler extends HttpHandler {
67+
Map<String, BytesReference> blobs();
68+
}
69+
5870
private static final byte[] BUFFER = new byte[1024];
5971

6072
private static HttpServer httpServer;
@@ -81,7 +93,14 @@ public static void stopHttpServer() {
8193
@After
8294
public void tearDownHttpServer() {
8395
if (handlers != null) {
84-
handlers.keySet().forEach(context -> httpServer.removeContext(context));
96+
for(Map.Entry<String, HttpHandler> handler : handlers.entrySet()) {
97+
httpServer.removeContext(handler.getKey());
98+
if (handler.getValue() instanceof BlobStoreHttpHandler) {
99+
List<String> blobs = ((BlobStoreHttpHandler) handler.getValue()).blobs().keySet().stream()
100+
.filter(blob -> blob.contains("index") == false).collect(Collectors.toList());
101+
assertThat("Only index blobs should remain in repository but found " + blobs, blobs, hasSize(0));
102+
}
103+
}
85104
}
86105
}
87106

@@ -110,14 +129,17 @@ public final void testSnapshotWithLargeSegmentFiles() throws Exception {
110129
assertThat(forceMerge.getSuccessfulShards(), equalTo(1));
111130
assertHitCount(client().prepareSearch(index).setSize(0).setTrackTotalHits(true).get(), nbDocs);
112131

113-
assertSuccessfulSnapshot(client().admin().cluster().prepareCreateSnapshot(repository, "snapshot")
132+
final String snapshot = "snapshot";
133+
assertSuccessfulSnapshot(client().admin().cluster().prepareCreateSnapshot(repository, snapshot)
114134
.setWaitForCompletion(true).setIndices(index));
115135

116136
assertAcked(client().admin().indices().prepareDelete(index));
117137

118-
assertSuccessfulRestore(client().admin().cluster().prepareRestoreSnapshot(repository, "snapshot").setWaitForCompletion(true));
138+
assertSuccessfulRestore(client().admin().cluster().prepareRestoreSnapshot(repository, snapshot).setWaitForCompletion(true));
119139
ensureGreen(index);
120140
assertHitCount(client().prepareSearch(index).setSize(0).setTrackTotalHits(true).get(), nbDocs);
141+
142+
assertAcked(client().admin().cluster().prepareDeleteSnapshot(repository, snapshot).get());
121143
}
122144

123145
protected static String httpServerUrl() {

0 commit comments

Comments
 (0)