19
19
20
20
package org .elasticsearch .repositories .gcs ;
21
21
22
- import com .google .cloud .storage .Storage ;
22
+ import com .sun .net .httpserver .HttpExchange ;
23
+ import com .sun .net .httpserver .HttpHandler ;
24
+ import com .sun .net .httpserver .HttpServer ;
25
+ import org .apache .http .HttpStatus ;
23
26
import org .elasticsearch .cluster .metadata .RepositoryMetaData ;
27
+ import org .elasticsearch .common .Strings ;
28
+ import org .elasticsearch .common .SuppressForbidden ;
29
+ import org .elasticsearch .common .bytes .BytesArray ;
30
+ import org .elasticsearch .common .bytes .BytesReference ;
31
+ import org .elasticsearch .common .io .Streams ;
32
+ import org .elasticsearch .common .network .InetAddresses ;
33
+ import org .elasticsearch .common .regex .Regex ;
34
+ import org .elasticsearch .common .settings .MockSecureSettings ;
24
35
import org .elasticsearch .common .settings .Settings ;
25
36
import org .elasticsearch .common .unit .ByteSizeUnit ;
26
37
import org .elasticsearch .common .unit .ByteSizeValue ;
38
+ import org .elasticsearch .common .xcontent .XContentBuilder ;
39
+ import org .elasticsearch .common .xcontent .XContentType ;
40
+ import org .elasticsearch .mocksocket .MockHttpServer ;
27
41
import org .elasticsearch .plugins .Plugin ;
28
42
import org .elasticsearch .repositories .blobstore .ESBlobStoreRepositoryIntegTestCase ;
43
+ import org .elasticsearch .rest .RestStatus ;
44
+ import org .elasticsearch .rest .RestUtils ;
29
45
import org .junit .After ;
46
+ import org .junit .AfterClass ;
47
+ import org .junit .Before ;
48
+ import org .junit .BeforeClass ;
30
49
50
+ import java .io .BufferedInputStream ;
51
+ import java .io .ByteArrayOutputStream ;
52
+ import java .io .IOException ;
53
+ import java .io .UnsupportedEncodingException ;
54
+ import java .net .InetAddress ;
55
+ import java .net .InetSocketAddress ;
56
+ import java .net .URLDecoder ;
57
+ import java .security .KeyPairGenerator ;
58
+ import java .util .Arrays ;
59
+ import java .util .Base64 ;
31
60
import java .util .Collection ;
32
61
import java .util .Collections ;
62
+ import java .util .HashMap ;
63
+ import java .util .Iterator ;
64
+ import java .util .List ;
65
+ import java .util .Locale ;
66
+ import java .util .Map ;
67
+ import java .util .UUID ;
33
68
import java .util .concurrent .ConcurrentHashMap ;
34
69
import java .util .concurrent .ConcurrentMap ;
70
+ import java .util .regex .Matcher ;
71
+ import java .util .regex .Pattern ;
72
+ import java .util .stream .Collectors ;
73
+ import java .util .zip .GZIPInputStream ;
35
74
75
+ import static java .nio .charset .StandardCharsets .UTF_8 ;
76
+ import static org .elasticsearch .repositories .gcs .GoogleCloudStorageClientSettings .CREDENTIALS_FILE_SETTING ;
77
+ import static org .elasticsearch .repositories .gcs .GoogleCloudStorageClientSettings .ENDPOINT_SETTING ;
78
+ import static org .elasticsearch .repositories .gcs .GoogleCloudStorageClientSettings .TOKEN_URI_SETTING ;
79
+ import static org .elasticsearch .repositories .gcs .GoogleCloudStorageRepository .BUCKET ;
80
+ import static org .elasticsearch .repositories .gcs .GoogleCloudStorageRepository .CLIENT_NAME ;
81
+
82
+ @ SuppressForbidden (reason = "this test uses a HttpServer to emulate a Google Cloud Storage endpoint" )
36
83
public class GoogleCloudStorageBlobStoreRepositoryTests extends ESBlobStoreRepositoryIntegTestCase {
37
84
38
- private static final String BUCKET = "gcs-repository-test" ;
85
+ private static HttpServer httpServer ;
86
+ private static byte [] serviceAccount ;
87
+
88
+ @ BeforeClass
89
+ public static void startHttpServer () throws Exception {
90
+ httpServer = MockHttpServer .createHttp (new InetSocketAddress (InetAddress .getLoopbackAddress (), 0 ), 0 );
91
+ httpServer .start ();
92
+ serviceAccount = createServiceAccount ();
93
+ }
94
+
95
+ @ Before
96
+ public void setUpHttpServer () {
97
+ httpServer .createContext ("/" , new InternalHttpHandler ());
98
+ httpServer .createContext ("/token" , new FakeOAuth2HttpHandler ());
99
+ }
39
100
40
- // Static list of blobs shared among all nodes in order to act like a remote repository service:
41
- // all nodes must see the same content
42
- private static final ConcurrentMap <String , byte []> blobs = new ConcurrentHashMap <>();
101
+ @ AfterClass
102
+ public static void stopHttpServer () {
103
+ httpServer .stop (0 );
104
+ httpServer = null ;
105
+ }
106
+
107
+ @ After
108
+ public void tearDownHttpServer () {
109
+ httpServer .removeContext ("/" );
110
+ httpServer .removeContext ("/token" );
111
+ }
43
112
44
113
@ Override
45
114
protected String repositoryType () {
@@ -50,38 +119,31 @@ protected String repositoryType() {
50
119
protected Settings repositorySettings () {
51
120
return Settings .builder ()
52
121
.put (super .repositorySettings ())
53
- .put ("bucket" , BUCKET )
54
- .put ("base_path" , GoogleCloudStorageBlobStoreRepositoryTests . class . getSimpleName () )
122
+ .put (BUCKET . getKey (), "bucket" )
123
+ .put (CLIENT_NAME . getKey (), "test" )
55
124
.build ();
56
125
}
57
126
58
127
@ Override
59
128
protected Collection <Class <? extends Plugin >> nodePlugins () {
60
- return Collections .singletonList (MockGoogleCloudStoragePlugin .class );
61
- }
62
-
63
- @ After
64
- public void wipeRepository () {
65
- blobs .clear ();
129
+ return Collections .singletonList (GoogleCloudStoragePlugin .class );
66
130
}
67
131
68
- public static class MockGoogleCloudStoragePlugin extends GoogleCloudStoragePlugin {
132
+ @ Override
133
+ protected Settings nodeSettings (int nodeOrdinal ) {
134
+ final Settings .Builder settings = Settings .builder ();
135
+ settings .put (super .nodeSettings (nodeOrdinal ));
69
136
70
- public MockGoogleCloudStoragePlugin (final Settings settings ) {
71
- super (settings );
72
- }
137
+ final InetSocketAddress address = httpServer .getAddress ();
138
+ final String endpoint = "http://" + InetAddresses .toUriString (address .getAddress ()) + ":" + address .getPort ();
139
+ settings .put (ENDPOINT_SETTING .getConcreteSettingForNamespace ("test" ).getKey (), endpoint );
140
+ settings .put (TOKEN_URI_SETTING .getConcreteSettingForNamespace ("test" ).getKey (), endpoint + "/token" );
73
141
74
- @ Override
75
- protected GoogleCloudStorageService createStorageService () {
76
- return new MockGoogleCloudStorageService ();
77
- }
78
- }
142
+ final MockSecureSettings secureSettings = new MockSecureSettings ();
143
+ secureSettings .setFile (CREDENTIALS_FILE_SETTING .getConcreteSettingForNamespace ("test" ).getKey (), serviceAccount );
144
+ settings .setSecureSettings (secureSettings );
79
145
80
- public static class MockGoogleCloudStorageService extends GoogleCloudStorageService {
81
- @ Override
82
- public Storage client (String clientName ) {
83
- return new MockStorage (BUCKET , blobs );
84
- }
146
+ return settings .build ();
85
147
}
86
148
87
149
public void testChunkSize () {
@@ -121,4 +183,204 @@ public void testChunkSize() {
121
183
});
122
184
assertEquals ("failed to parse value [101mb] for setting [chunk_size], must be <= [100mb]" , e .getMessage ());
123
185
}
186
+
187
+ private static byte [] createServiceAccount () throws Exception {
188
+ final KeyPairGenerator keyPairGenerator = KeyPairGenerator .getInstance ("RSA" );
189
+ keyPairGenerator .initialize (1024 );
190
+ final String privateKey = Base64 .getEncoder ().encodeToString (keyPairGenerator .generateKeyPair ().getPrivate ().getEncoded ());
191
+
192
+ final ByteArrayOutputStream out = new ByteArrayOutputStream ();
193
+ try (XContentBuilder builder = new XContentBuilder (XContentType .JSON .xContent (), out )) {
194
+ builder .startObject ();
195
+ {
196
+ builder .field ("type" , "service_account" );
197
+ builder .field ("project_id" , getTestClass ().getName ().toLowerCase (Locale .ROOT ));
198
+ builder .field ("private_key_id" , UUID .randomUUID ().toString ());
199
+ builder .field ("private_key" , "-----BEGIN PRIVATE KEY-----\n " + privateKey + "\n -----END PRIVATE KEY-----\n " );
200
+ builder .
field (
"client_email" ,
"[email protected] " );
201
+ builder .field ("client_id" , String .valueOf (randomNonNegativeLong ()));
202
+ }
203
+ builder .endObject ();
204
+ }
205
+ return out .toByteArray ();
206
+ }
207
+
208
+ /**
209
+ * Minimal HTTP handler that acts as a Google Cloud Storage compliant server
210
+ *
211
+ * Note: it does not support resumable uploads
212
+ */
213
+ @ SuppressForbidden (reason = "this test uses a HttpServer to emulate a Google Cloud Storage endpoint" )
214
+ private static class InternalHttpHandler implements HttpHandler {
215
+
216
+ private final ConcurrentMap <String , BytesReference > blobs = new ConcurrentHashMap <>();
217
+
218
+ @ Override
219
+ public void handle (final HttpExchange exchange ) throws IOException {
220
+ final String request = exchange .getRequestMethod () + " " + exchange .getRequestURI ().toString ();
221
+ try {
222
+ if (Regex .simpleMatch ("GET /storage/v1/b/bucket/o*" , request )) {
223
+ final Map <String , String > params = new HashMap <>();
224
+ RestUtils .decodeQueryString (exchange .getRequestURI ().getQuery (), 0 , params );
225
+ final String prefix = params .get ("prefix" );
226
+
227
+ final List <Map .Entry <String , BytesReference >> listOfBlobs = blobs .entrySet ().stream ()
228
+ .filter (blob -> prefix == null || blob .getKey ().startsWith (prefix )).collect (Collectors .toList ());
229
+
230
+ final StringBuilder list = new StringBuilder ();
231
+ list .append ("{\" kind\" :\" storage#objects\" ,\" items\" :[" );
232
+ for (Iterator <Map .Entry <String , BytesReference >> it = listOfBlobs .iterator (); it .hasNext (); ) {
233
+ Map .Entry <String , BytesReference > blob = it .next ();
234
+ list .append ("{\" kind\" :\" storage#object\" ," );
235
+ list .append ("\" bucket\" :\" bucket\" ," );
236
+ list .append ("\" name\" :\" " ).append (blob .getKey ()).append ("\" ," );
237
+ list .append ("\" id\" :\" " ).append (blob .getKey ()).append ("\" ," );
238
+ list .append ("\" size\" :\" " ).append (blob .getValue ().length ()).append ("\" " );
239
+ list .append ('}' );
240
+
241
+ if (it .hasNext ()) {
242
+ list .append (',' );
243
+ }
244
+ }
245
+ list .append ("]}" );
246
+
247
+ byte [] response = list .toString ().getBytes (UTF_8 );
248
+ exchange .getResponseHeaders ().add ("Content-Type" , "application/json; charset=utf-8" );
249
+ exchange .sendResponseHeaders (RestStatus .OK .getStatus (), response .length );
250
+ exchange .getResponseBody ().write (response );
251
+
252
+ } else if (Regex .simpleMatch ("GET /storage/v1/b/bucket*" , request )) {
253
+ byte [] response = ("{\" kind\" :\" storage#bucket\" ,\" name\" :\" bucket\" ,\" id\" :\" 0\" }" ).getBytes (UTF_8 );
254
+ exchange .getResponseHeaders ().add ("Content-Type" , "application/json; charset=utf-8" );
255
+ exchange .sendResponseHeaders (HttpStatus .SC_OK , response .length );
256
+ exchange .getResponseBody ().write (response );
257
+
258
+ } else if (Regex .simpleMatch ("GET /download/storage/v1/b/bucket/o/*" , request )) {
259
+ BytesReference blob = blobs .get (exchange .getRequestURI ().getPath ().replace ("/download/storage/v1/b/bucket/o/" , "" ));
260
+ if (blob != null ) {
261
+ exchange .getResponseHeaders ().add ("Content-Type" , "application/octet-stream" );
262
+ exchange .sendResponseHeaders (RestStatus .OK .getStatus (), blob .length ());
263
+ exchange .getResponseBody ().write (blob .toBytesRef ().bytes );
264
+ } else {
265
+ exchange .sendResponseHeaders (RestStatus .NOT_FOUND .getStatus (), -1 );
266
+ }
267
+
268
+ } else if (Regex .simpleMatch ("DELETE /storage/v1/b/bucket/o/*" , request )) {
269
+ int deletions = 0 ;
270
+ for (Iterator <Map .Entry <String , BytesReference >> iterator = blobs .entrySet ().iterator (); iterator .hasNext (); ) {
271
+ Map .Entry <String , BytesReference > blob = iterator .next ();
272
+ if (blob .getKey ().equals (exchange .getRequestURI ().toString ())) {
273
+ iterator .remove ();
274
+ deletions ++;
275
+ }
276
+ }
277
+ exchange .sendResponseHeaders ((deletions > 0 ? RestStatus .OK : RestStatus .NO_CONTENT ).getStatus (), -1 );
278
+
279
+ } else if (Regex .simpleMatch ("POST /batch/storage/v1" , request )) {
280
+ final String uri = "/storage/v1/b/bucket/o/" ;
281
+ final StringBuilder batch = new StringBuilder ();
282
+ for (String line : Streams .readAllLines (new BufferedInputStream (exchange .getRequestBody ()))) {
283
+ if (line .length () == 0 || line .startsWith ("--" ) || line .toLowerCase (Locale .ROOT ).startsWith ("content" )) {
284
+ batch .append (line ).append ('\n' );
285
+ } else if (line .startsWith ("DELETE" )) {
286
+ final String name = line .substring (line .indexOf (uri ) + uri .length (), line .lastIndexOf (" HTTP" ));
287
+ if (Strings .hasText (name )) {
288
+ try {
289
+ final String blobName = URLDecoder .decode (name , UTF_8 .name ());
290
+ if (blobs .entrySet ().removeIf (blob -> blob .getKey ().equals (blobName ))) {
291
+ batch .append ("HTTP/1.1 204 NO_CONTENT" ).append ('\n' );
292
+ batch .append ('\n' );
293
+ }
294
+ } catch (UnsupportedEncodingException e ) {
295
+ batch .append ("HTTP/1.1 404 NOT_FOUND" ).append ('\n' );
296
+ batch .append ('\n' );
297
+ }
298
+ }
299
+ }
300
+ }
301
+ byte [] response = batch .toString ().getBytes (UTF_8 );
302
+ exchange .getResponseHeaders ().add ("Content-Type" , exchange .getRequestHeaders ().getFirst ("Content-Type" ));
303
+ exchange .sendResponseHeaders (RestStatus .OK .getStatus (), response .length );
304
+ exchange .getResponseBody ().write (response );
305
+
306
+ } else if (Regex .simpleMatch ("POST /upload/storage/v1/b/bucket/*uploadType=multipart*" , request )) {
307
+ byte [] response = new byte [0 ];
308
+ try (BufferedInputStream in = new BufferedInputStream (new GZIPInputStream (exchange .getRequestBody ()))) {
309
+ String blob = null ;
310
+ int read ;
311
+ while ((read = in .read ()) != -1 ) {
312
+ boolean markAndContinue = false ;
313
+ try (ByteArrayOutputStream out = new ByteArrayOutputStream ()) {
314
+ do { // search next consecutive {carriage return, new line} chars and stop
315
+ if ((char ) read == '\r' ) {
316
+ int next = in .read ();
317
+ if (next != -1 ) {
318
+ if (next == '\n' ) {
319
+ break ;
320
+ }
321
+ out .write (read );
322
+ out .write (next );
323
+ continue ;
324
+ }
325
+ }
326
+ out .write (read );
327
+ } while ((read = in .read ()) != -1 );
328
+
329
+ final String line = new String (out .toByteArray (), UTF_8 );
330
+ if (line .length () == 0 || line .equals ("\r \n " ) || line .startsWith ("--" )
331
+ || line .toLowerCase (Locale .ROOT ).startsWith ("content" )) {
332
+ markAndContinue = true ;
333
+ } else if (line .startsWith ("{\" bucket\" :\" bucket\" " )) {
334
+ markAndContinue = true ;
335
+ Matcher matcher = Pattern .compile ("\" name\" :\" ([^\" ]*)\" " ).matcher (line );
336
+ if (matcher .find ()) {
337
+ blob = matcher .group (1 );
338
+ response = line .getBytes (UTF_8 );
339
+ }
340
+ }
341
+ if (markAndContinue ) {
342
+ in .mark (Integer .MAX_VALUE );
343
+ continue ;
344
+ }
345
+ }
346
+ if (blob != null ) {
347
+ in .reset ();
348
+ try (ByteArrayOutputStream binary = new ByteArrayOutputStream ()) {
349
+ while ((read = in .read ()) != -1 ) {
350
+ binary .write (read );
351
+ }
352
+ binary .flush ();
353
+ byte [] tmp = binary .toByteArray ();
354
+ // removes the trailing end "\r\n--__END_OF_PART__--\r\n" which is 23 bytes long
355
+ blobs .put (blob , new BytesArray (Arrays .copyOf (tmp , tmp .length - 23 )));
356
+ } finally {
357
+ blob = null ;
358
+ }
359
+ }
360
+ }
361
+ }
362
+ exchange .getResponseHeaders ().add ("Content-Type" , "application/json" );
363
+ exchange .sendResponseHeaders (RestStatus .OK .getStatus (), response .length );
364
+ exchange .getResponseBody ().write (response );
365
+
366
+ } else {
367
+ exchange .sendResponseHeaders (RestStatus .INTERNAL_SERVER_ERROR .getStatus (), -1 );
368
+ }
369
+ } finally {
370
+ exchange .close ();
371
+ }
372
+ }
373
+ }
374
+
375
+ @ SuppressForbidden (reason = "this test uses a HttpServer to emulate a fake OAuth2 authentication service" )
376
+ private static class FakeOAuth2HttpHandler implements HttpHandler {
377
+ @ Override
378
+ public void handle (final HttpExchange exchange ) throws IOException {
379
+ byte [] response = ("{\" access_token\" :\" foo\" ,\" token_type\" :\" Bearer\" ,\" expires_in\" :3600}" ).getBytes (UTF_8 );
380
+ exchange .getResponseHeaders ().add ("Content-Type" , "application/json" );
381
+ exchange .sendResponseHeaders (HttpStatus .SC_OK , response .length );
382
+ exchange .getResponseBody ().write (response );
383
+ exchange .close ();
384
+ }
385
+ }
124
386
}
0 commit comments