19
19
20
20
package org .elasticsearch .repositories .gcs ;
21
21
22
- import com .google .cloud .storage .Storage ;
22
+ import com .sun .net .httpserver .HttpExchange ;
23
+ import com .sun .net .httpserver .HttpHandler ;
24
+ import com .sun .net .httpserver .HttpServer ;
25
+ import org .apache .http .HttpStatus ;
23
26
import org .elasticsearch .cluster .metadata .RepositoryMetaData ;
27
+ import org .elasticsearch .common .Strings ;
28
+ import org .elasticsearch .common .SuppressForbidden ;
29
+ import org .elasticsearch .common .bytes .BytesArray ;
30
+ import org .elasticsearch .common .bytes .BytesReference ;
31
+ import org .elasticsearch .common .io .Streams ;
32
+ import org .elasticsearch .common .network .InetAddresses ;
33
+ import org .elasticsearch .common .regex .Regex ;
34
+ import org .elasticsearch .common .settings .MockSecureSettings ;
24
35
import org .elasticsearch .common .settings .Settings ;
25
36
import org .elasticsearch .common .unit .ByteSizeUnit ;
26
37
import org .elasticsearch .common .unit .ByteSizeValue ;
38
+ import org .elasticsearch .common .xcontent .XContentBuilder ;
39
+ import org .elasticsearch .common .xcontent .XContentType ;
40
+ import org .elasticsearch .mocksocket .MockHttpServer ;
27
41
import org .elasticsearch .plugins .Plugin ;
28
42
import org .elasticsearch .repositories .blobstore .ESBlobStoreRepositoryIntegTestCase ;
43
+ import org .elasticsearch .rest .RestStatus ;
44
+ import org .elasticsearch .rest .RestUtils ;
29
45
import org .junit .After ;
46
+ import org .junit .AfterClass ;
47
+ import org .junit .Before ;
48
+ import org .junit .BeforeClass ;
30
49
50
+ import java .io .BufferedInputStream ;
51
+ import java .io .ByteArrayOutputStream ;
52
+ import java .io .IOException ;
53
+ import java .net .InetAddress ;
54
+ import java .net .InetSocketAddress ;
55
+ import java .net .URLDecoder ;
56
+ import java .security .KeyPairGenerator ;
57
+ import java .util .Arrays ;
58
+ import java .util .Base64 ;
31
59
import java .util .Collection ;
32
60
import java .util .Collections ;
61
+ import java .util .HashMap ;
62
+ import java .util .Iterator ;
63
+ import java .util .List ;
64
+ import java .util .Locale ;
65
+ import java .util .Map ;
66
+ import java .util .UUID ;
33
67
import java .util .concurrent .ConcurrentHashMap ;
34
68
import java .util .concurrent .ConcurrentMap ;
69
+ import java .util .regex .Matcher ;
70
+ import java .util .regex .Pattern ;
71
+ import java .util .stream .Collectors ;
72
+ import java .util .zip .GZIPInputStream ;
35
73
74
+ import static java .nio .charset .StandardCharsets .UTF_8 ;
75
+ import static org .elasticsearch .repositories .gcs .GoogleCloudStorageClientSettings .CREDENTIALS_FILE_SETTING ;
76
+ import static org .elasticsearch .repositories .gcs .GoogleCloudStorageClientSettings .ENDPOINT_SETTING ;
77
+ import static org .elasticsearch .repositories .gcs .GoogleCloudStorageClientSettings .TOKEN_URI_SETTING ;
78
+ import static org .elasticsearch .repositories .gcs .GoogleCloudStorageRepository .BUCKET ;
79
+ import static org .elasticsearch .repositories .gcs .GoogleCloudStorageRepository .CLIENT_NAME ;
80
+
81
+ @ SuppressForbidden (reason = "this test uses a HttpServer to emulate a Google Cloud Storage endpoint" )
36
82
public class GoogleCloudStorageBlobStoreRepositoryTests extends ESBlobStoreRepositoryIntegTestCase {
37
83
38
- private static final String BUCKET = "gcs-repository-test" ;
84
+ private static HttpServer httpServer ;
85
+ private static byte [] serviceAccount ;
86
+
87
+ @ BeforeClass
88
+ public static void startHttpServer () throws Exception {
89
+ httpServer = MockHttpServer .createHttp (new InetSocketAddress (InetAddress .getLoopbackAddress (), 0 ), 0 );
90
+ httpServer .start ();
91
+ serviceAccount = createServiceAccount ();
92
+ }
93
+
94
+ @ Before
95
+ public void setUpHttpServer () {
96
+ httpServer .createContext ("/" , new InternalHttpHandler ());
97
+ httpServer .createContext ("/token" , new FakeOAuth2HttpHandler ());
98
+ }
39
99
40
- // Static list of blobs shared among all nodes in order to act like a remote repository service:
41
- // all nodes must see the same content
42
- private static final ConcurrentMap <String , byte []> blobs = new ConcurrentHashMap <>();
100
+ @ AfterClass
101
+ public static void stopHttpServer () {
102
+ httpServer .stop (0 );
103
+ httpServer = null ;
104
+ }
105
+
106
+ @ After
107
+ public void tearDownHttpServer () {
108
+ httpServer .removeContext ("/" );
109
+ httpServer .removeContext ("/token" );
110
+ }
43
111
44
112
@ Override
45
113
protected String repositoryType () {
@@ -50,38 +118,31 @@ protected String repositoryType() {
50
118
protected Settings repositorySettings () {
51
119
return Settings .builder ()
52
120
.put (super .repositorySettings ())
53
- .put ("bucket" , BUCKET )
54
- .put ("base_path" , GoogleCloudStorageBlobStoreRepositoryTests . class . getSimpleName () )
121
+ .put (BUCKET . getKey (), "bucket" )
122
+ .put (CLIENT_NAME . getKey (), "test" )
55
123
.build ();
56
124
}
57
125
58
126
@ Override
59
127
protected Collection <Class <? extends Plugin >> nodePlugins () {
60
- return Collections .singletonList (MockGoogleCloudStoragePlugin .class );
61
- }
62
-
63
- @ After
64
- public void wipeRepository () {
65
- blobs .clear ();
128
+ return Collections .singletonList (GoogleCloudStoragePlugin .class );
66
129
}
67
130
68
- public static class MockGoogleCloudStoragePlugin extends GoogleCloudStoragePlugin {
131
+ @ Override
132
+ protected Settings nodeSettings (int nodeOrdinal ) {
133
+ final Settings .Builder settings = Settings .builder ();
134
+ settings .put (super .nodeSettings (nodeOrdinal ));
69
135
70
- public MockGoogleCloudStoragePlugin (final Settings settings ) {
71
- super (settings );
72
- }
136
+ final InetSocketAddress address = httpServer .getAddress ();
137
+ final String endpoint = "http://" + InetAddresses .toUriString (address .getAddress ()) + ":" + address .getPort ();
138
+ settings .put (ENDPOINT_SETTING .getConcreteSettingForNamespace ("test" ).getKey (), endpoint );
139
+ settings .put (TOKEN_URI_SETTING .getConcreteSettingForNamespace ("test" ).getKey (), endpoint + "/token" );
73
140
74
- @ Override
75
- protected GoogleCloudStorageService createStorageService () {
76
- return new MockGoogleCloudStorageService ();
77
- }
78
- }
141
+ final MockSecureSettings secureSettings = new MockSecureSettings ();
142
+ secureSettings .setFile (CREDENTIALS_FILE_SETTING .getConcreteSettingForNamespace ("test" ).getKey (), serviceAccount );
143
+ settings .setSecureSettings (secureSettings );
79
144
80
- public static class MockGoogleCloudStorageService extends GoogleCloudStorageService {
81
- @ Override
82
- public Storage client (String clientName ) {
83
- return new MockStorage (BUCKET , blobs );
84
- }
145
+ return settings .build ();
85
146
}
86
147
87
148
public void testChunkSize () {
@@ -121,4 +182,198 @@ public void testChunkSize() {
121
182
});
122
183
assertEquals ("failed to parse value [101mb] for setting [chunk_size], must be <= [100mb]" , e .getMessage ());
123
184
}
185
+
186
+ private static byte [] createServiceAccount () throws Exception {
187
+ final KeyPairGenerator keyPairGenerator = KeyPairGenerator .getInstance ("RSA" );
188
+ keyPairGenerator .initialize (1024 );
189
+ final String privateKey = Base64 .getEncoder ().encodeToString (keyPairGenerator .generateKeyPair ().getPrivate ().getEncoded ());
190
+
191
+ final ByteArrayOutputStream out = new ByteArrayOutputStream ();
192
+ try (XContentBuilder builder = new XContentBuilder (XContentType .JSON .xContent (), out )) {
193
+ builder .startObject ();
194
+ {
195
+ builder .field ("type" , "service_account" );
196
+ builder .field ("project_id" , getTestClass ().getName ().toLowerCase (Locale .ROOT ));
197
+ builder .field ("private_key_id" , UUID .randomUUID ().toString ());
198
+ builder .field ("private_key" , "-----BEGIN PRIVATE KEY-----\n " + privateKey + "\n -----END PRIVATE KEY-----\n " );
199
+ builder .
field (
"client_email" ,
"[email protected] " );
200
+ builder .field ("client_id" , String .valueOf (randomNonNegativeLong ()));
201
+ }
202
+ builder .endObject ();
203
+ }
204
+ return out .toByteArray ();
205
+ }
206
+
207
+ /**
208
+ * Minimal HTTP handler that acts as a Google Cloud Storage compliant server
209
+ *
210
+ * Note: it does not support resumable uploads
211
+ */
212
+ @ SuppressForbidden (reason = "this test uses a HttpServer to emulate a Google Cloud Storage endpoint" )
213
+ private static class InternalHttpHandler implements HttpHandler {
214
+
215
+ private final ConcurrentMap <String , BytesReference > blobs = new ConcurrentHashMap <>();
216
+
217
+ @ Override
218
+ public void handle (final HttpExchange exchange ) throws IOException {
219
+ final String request = exchange .getRequestMethod () + " " + exchange .getRequestURI ().toString ();
220
+ try {
221
+ if (Regex .simpleMatch ("GET /storage/v1/b/bucket/o*" , request )) {
222
+ final Map <String , String > params = new HashMap <>();
223
+ RestUtils .decodeQueryString (exchange .getRequestURI ().getQuery (), 0 , params );
224
+ final String prefix = params .get ("prefix" );
225
+
226
+ final List <Map .Entry <String , BytesReference >> listOfBlobs = blobs .entrySet ().stream ()
227
+ .filter (blob -> prefix == null || blob .getKey ().startsWith (prefix )).collect (Collectors .toList ());
228
+
229
+ final StringBuilder list = new StringBuilder ();
230
+ list .append ("{\" kind\" :\" storage#objects\" ,\" items\" :[" );
231
+ for (Iterator <Map .Entry <String , BytesReference >> it = listOfBlobs .iterator (); it .hasNext (); ) {
232
+ Map .Entry <String , BytesReference > blob = it .next ();
233
+ list .append ("{\" kind\" :\" storage#object\" ," );
234
+ list .append ("\" bucket\" :\" bucket\" ," );
235
+ list .append ("\" name\" :\" " ).append (blob .getKey ()).append ("\" ," );
236
+ list .append ("\" id\" :\" " ).append (blob .getKey ()).append ("\" ," );
237
+ list .append ("\" size\" :\" " ).append (blob .getValue ().length ()).append ("\" " );
238
+ list .append ('}' );
239
+
240
+ if (it .hasNext ()) {
241
+ list .append (',' );
242
+ }
243
+ }
244
+ list .append ("]}" );
245
+
246
+ byte [] response = list .toString ().getBytes (UTF_8 );
247
+ exchange .getResponseHeaders ().add ("Content-Type" , "application/json; charset=utf-8" );
248
+ exchange .sendResponseHeaders (RestStatus .OK .getStatus (), response .length );
249
+ exchange .getResponseBody ().write (response );
250
+
251
+ } else if (Regex .simpleMatch ("GET /storage/v1/b/bucket*" , request )) {
252
+ byte [] response = ("{\" kind\" :\" storage#bucket\" ,\" name\" :\" bucket\" ,\" id\" :\" 0\" }" ).getBytes (UTF_8 );
253
+ exchange .getResponseHeaders ().add ("Content-Type" , "application/json; charset=utf-8" );
254
+ exchange .sendResponseHeaders (HttpStatus .SC_OK , response .length );
255
+ exchange .getResponseBody ().write (response );
256
+
257
+ } else if (Regex .simpleMatch ("GET /download/storage/v1/b/bucket/o/*" , request )) {
258
+ BytesReference blob = blobs .get (exchange .getRequestURI ().getPath ().replace ("/download/storage/v1/b/bucket/o/" , "" ));
259
+ if (blob != null ) {
260
+ exchange .getResponseHeaders ().add ("Content-Type" , "application/octet-stream" );
261
+ exchange .sendResponseHeaders (RestStatus .OK .getStatus (), blob .length ());
262
+ exchange .getResponseBody ().write (blob .toBytesRef ().bytes );
263
+ } else {
264
+ exchange .sendResponseHeaders (RestStatus .NOT_FOUND .getStatus (), -1 );
265
+ }
266
+
267
+ } else if (Regex .simpleMatch ("DELETE /storage/v1/b/bucket/o/*" , request )) {
268
+ int deletions = 0 ;
269
+ for (Iterator <Map .Entry <String , BytesReference >> iterator = blobs .entrySet ().iterator (); iterator .hasNext (); ) {
270
+ Map .Entry <String , BytesReference > blob = iterator .next ();
271
+ if (blob .getKey ().equals (exchange .getRequestURI ().toString ())) {
272
+ iterator .remove ();
273
+ deletions ++;
274
+ }
275
+ }
276
+ exchange .sendResponseHeaders ((deletions > 0 ? RestStatus .OK : RestStatus .NO_CONTENT ).getStatus (), -1 );
277
+
278
+ } else if (Regex .simpleMatch ("POST /batch/storage/v1" , request )) {
279
+ final String uri = "/storage/v1/b/bucket/o/" ;
280
+ final StringBuilder batch = new StringBuilder ();
281
+ for (String line : Streams .readAllLines (new BufferedInputStream (exchange .getRequestBody ()))) {
282
+ if (line .length () == 0 || line .startsWith ("--" ) || line .toLowerCase (Locale .ROOT ).startsWith ("content" )) {
283
+ batch .append (line ).append ('\n' );
284
+ } else if (line .startsWith ("DELETE" )) {
285
+ final String name = line .substring (line .indexOf (uri ) + uri .length (), line .lastIndexOf (" HTTP" ));
286
+ if (Strings .hasText (name )) {
287
+ if (blobs .entrySet ().removeIf (blob -> blob .getKey ().equals (URLDecoder .decode (name , UTF_8 )))) {
288
+ batch .append ("HTTP/1.1 204 NO_CONTENT" ).append ('\n' );
289
+ batch .append ('\n' );
290
+ }
291
+ }
292
+ }
293
+ }
294
+ byte [] response = batch .toString ().getBytes (UTF_8 );
295
+ exchange .getResponseHeaders ().add ("Content-Type" , exchange .getRequestHeaders ().getFirst ("Content-Type" ));
296
+ exchange .sendResponseHeaders (RestStatus .OK .getStatus (), response .length );
297
+ exchange .getResponseBody ().write (response );
298
+
299
+ } else if (Regex .simpleMatch ("POST /upload/storage/v1/b/bucket/*uploadType=multipart*" , request )) {
300
+ byte [] response = new byte [0 ];
301
+ try (BufferedInputStream in = new BufferedInputStream (new GZIPInputStream (exchange .getRequestBody ()))) {
302
+ String blob = null ;
303
+ int read ;
304
+ while ((read = in .read ()) != -1 ) {
305
+ boolean markAndContinue = false ;
306
+ try (ByteArrayOutputStream out = new ByteArrayOutputStream ()) {
307
+ do { // search next consecutive {carriage return, new line} chars and stop
308
+ if ((char ) read == '\r' ) {
309
+ int next = in .read ();
310
+ if (next != -1 ) {
311
+ if (next == '\n' ) {
312
+ break ;
313
+ }
314
+ out .write (read );
315
+ out .write (next );
316
+ continue ;
317
+ }
318
+ }
319
+ out .write (read );
320
+ } while ((read = in .read ()) != -1 );
321
+
322
+ final String line = new String (out .toByteArray (), UTF_8 );
323
+ if (line .length () == 0 || line .equals ("\r \n " ) || line .startsWith ("--" )
324
+ || line .toLowerCase (Locale .ROOT ).startsWith ("content" )) {
325
+ markAndContinue = true ;
326
+ } else if (line .startsWith ("{\" bucket\" :\" bucket\" " )) {
327
+ markAndContinue = true ;
328
+ Matcher matcher = Pattern .compile ("\" name\" :\" ([^\" ]*)\" " ).matcher (line );
329
+ if (matcher .find ()) {
330
+ blob = matcher .group (1 );
331
+ response = line .getBytes (UTF_8 );
332
+ }
333
+ }
334
+ if (markAndContinue ) {
335
+ in .mark (Integer .MAX_VALUE );
336
+ continue ;
337
+ }
338
+ }
339
+ if (blob != null ) {
340
+ in .reset ();
341
+ try (ByteArrayOutputStream binary = new ByteArrayOutputStream ()) {
342
+ while ((read = in .read ()) != -1 ) {
343
+ binary .write (read );
344
+ }
345
+ binary .flush ();
346
+ byte [] tmp = binary .toByteArray ();
347
+ // removes the trailing end "\r\n--__END_OF_PART__--\r\n" which is 23 bytes long
348
+ blobs .put (blob , new BytesArray (Arrays .copyOf (tmp , tmp .length - 23 )));
349
+ } finally {
350
+ blob = null ;
351
+ }
352
+ }
353
+ }
354
+ }
355
+ exchange .getResponseHeaders ().add ("Content-Type" , "application/json" );
356
+ exchange .sendResponseHeaders (RestStatus .OK .getStatus (), response .length );
357
+ exchange .getResponseBody ().write (response );
358
+
359
+ } else {
360
+ exchange .sendResponseHeaders (RestStatus .INTERNAL_SERVER_ERROR .getStatus (), -1 );
361
+ }
362
+ } finally {
363
+ exchange .close ();
364
+ }
365
+ }
366
+ }
367
+
368
+ @ SuppressForbidden (reason = "this test uses a HttpServer to emulate a fake OAuth2 authentication service" )
369
+ private static class FakeOAuth2HttpHandler implements HttpHandler {
370
+ @ Override
371
+ public void handle (final HttpExchange exchange ) throws IOException {
372
+ byte [] response = ("{\" access_token\" :\" foo\" ,\" token_type\" :\" Bearer\" ,\" expires_in\" :3600}" ).getBytes (UTF_8 );
373
+ exchange .getResponseHeaders ().add ("Content-Type" , "application/json" );
374
+ exchange .sendResponseHeaders (HttpStatus .SC_OK , response .length );
375
+ exchange .getResponseBody ().write (response );
376
+ exchange .close ();
377
+ }
378
+ }
124
379
}
0 commit comments