18
18
*/
19
19
package org .elasticsearch .repositories .s3 ;
20
20
21
- import com .amazonaws .services .s3 .AmazonS3 ;
22
- import com .amazonaws .services .s3 .model .CannedAccessControlList ;
23
- import com .amazonaws .services .s3 .model .StorageClass ;
21
+ import com .sun .net .httpserver .HttpExchange ;
22
+ import com .sun .net .httpserver .HttpHandler ;
23
+ import com .sun .net .httpserver .HttpServer ;
24
+ import org .elasticsearch .common .SuppressForbidden ;
25
+ import org .elasticsearch .common .bytes .BytesReference ;
26
+ import org .elasticsearch .common .io .Streams ;
27
+ import org .elasticsearch .common .network .InetAddresses ;
28
+ import org .elasticsearch .common .regex .Regex ;
29
+ import org .elasticsearch .common .settings .MockSecureSettings ;
30
+ import org .elasticsearch .common .settings .Setting ;
24
31
import org .elasticsearch .common .settings .Settings ;
25
- import org .elasticsearch .common .unit .ByteSizeUnit ;
26
- import org .elasticsearch .common .unit .ByteSizeValue ;
27
- import org .elasticsearch .common .xcontent .NamedXContentRegistry ;
28
- import org .elasticsearch .env .Environment ;
32
+ import org .elasticsearch .mocksocket .MockHttpServer ;
29
33
import org .elasticsearch .plugins .Plugin ;
30
- import org .elasticsearch .repositories .Repository ;
31
34
import org .elasticsearch .repositories .blobstore .ESBlobStoreRepositoryIntegTestCase ;
32
- import org .elasticsearch .threadpool .ThreadPool ;
35
+ import org .elasticsearch .rest .RestStatus ;
36
+ import org .elasticsearch .rest .RestUtils ;
33
37
import org .junit .After ;
38
+ import org .junit .AfterClass ;
39
+ import org .junit .Before ;
34
40
import org .junit .BeforeClass ;
35
41
42
+ import java .io .IOException ;
43
+ import java .io .InputStreamReader ;
44
+ import java .net .InetAddress ;
45
+ import java .net .InetSocketAddress ;
46
+ import java .nio .charset .StandardCharsets ;
47
+ import java .util .ArrayList ;
36
48
import java .util .Collection ;
37
49
import java .util .Collections ;
38
- import java .util .Locale ;
50
+ import java .util .HashMap ;
51
+ import java .util .Iterator ;
52
+ import java .util .List ;
39
53
import java .util .Map ;
40
54
import java .util .concurrent .ConcurrentHashMap ;
41
55
import java .util .concurrent .ConcurrentMap ;
42
56
57
+ import static java .nio .charset .StandardCharsets .UTF_8 ;
58
+ import static org .hamcrest .Matchers .nullValue ;
59
+
60
+ @ SuppressForbidden (reason = "this test uses a HttpServer to emulate an S3 endpoint" )
43
61
public class S3BlobStoreRepositoryTests extends ESBlobStoreRepositoryIntegTestCase {
44
62
45
- private static final ConcurrentMap <String , byte []> blobs = new ConcurrentHashMap <>();
46
- private static String bucket ;
47
- private static ByteSizeValue bufferSize ;
48
- private static boolean serverSideEncryption ;
49
- private static String cannedACL ;
50
- private static String storageClass ;
63
+ private static HttpServer httpServer ;
51
64
52
65
@ BeforeClass
53
- public static void setUpRepositorySettings () {
54
- bucket = randomAlphaOfLength (randomIntBetween (1 , 10 )).toLowerCase (Locale .ROOT );
55
- bufferSize = new ByteSizeValue (randomIntBetween (5 , 50 ), ByteSizeUnit .MB );
56
- serverSideEncryption = randomBoolean ();
57
- if (randomBoolean ()) {
58
- cannedACL = randomFrom (CannedAccessControlList .values ()).toString ();
59
- }
60
- if (randomBoolean ()) {
61
- storageClass = randomValueOtherThan (StorageClass .Glacier , () -> randomFrom (StorageClass .values ())).toString ();
62
- }
66
+ public static void startHttpServer () throws Exception {
67
+ httpServer = MockHttpServer .createHttp (new InetSocketAddress (InetAddress .getLoopbackAddress (), 0 ), 0 );
68
+ httpServer .start ();
69
+ }
70
+
71
+ @ Before
72
+ public void setUpHttpServer () {
73
+ httpServer .createContext ("/bucket" , new InternalHttpHandler ());
74
+ }
75
+
76
+ @ AfterClass
77
+ public static void stopHttpServer () {
78
+ httpServer .stop (0 );
79
+ httpServer = null ;
63
80
}
64
81
65
82
@ After
66
- public void wipeRepository () {
67
- blobs . clear ( );
83
+ public void tearDownHttpServer () {
84
+ httpServer . removeContext ( "/bucket" );
68
85
}
69
86
70
87
@ Override
@@ -75,11 +92,8 @@ protected String repositoryType() {
75
92
@ Override
76
93
protected Settings repositorySettings () {
77
94
return Settings .builder ()
78
- .put (S3Repository .BUCKET_SETTING .getKey (), bucket )
79
- .put (S3Repository .BUFFER_SIZE_SETTING .getKey (), bufferSize )
80
- .put (S3Repository .SERVER_SIDE_ENCRYPTION_SETTING .getKey (), serverSideEncryption )
81
- .put (S3Repository .CANNED_ACL_SETTING .getKey (), cannedACL )
82
- .put (S3Repository .STORAGE_CLASS_SETTING .getKey (), storageClass )
95
+ .put (S3Repository .BUCKET_SETTING .getKey (), "bucket" )
96
+ .put (S3Repository .CLIENT_NAME .getKey (), "test" )
83
97
.build ();
84
98
}
85
99
@@ -88,22 +102,131 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
88
102
return Collections .singletonList (TestS3RepositoryPlugin .class );
89
103
}
90
104
105
+ @ Override
106
+ protected Settings nodeSettings (int nodeOrdinal ) {
107
+ final MockSecureSettings secureSettings = new MockSecureSettings ();
108
+ secureSettings .setString (S3ClientSettings .ACCESS_KEY_SETTING .getConcreteSettingForNamespace ("test" ).getKey (), "access" );
109
+ secureSettings .setString (S3ClientSettings .SECRET_KEY_SETTING .getConcreteSettingForNamespace ("test" ).getKey (), "secret" );
110
+
111
+ final InetSocketAddress address = httpServer .getAddress ();
112
+ final String endpoint = "http://" + InetAddresses .toUriString (address .getAddress ()) + ":" + address .getPort ();
113
+
114
+ return Settings .builder ()
115
+ .put (Settings .builder ()
116
+ .put (S3ClientSettings .ENDPOINT_SETTING .getConcreteSettingForNamespace ("test" ).getKey (), endpoint )
117
+ .put (S3ClientSettings .DISABLE_CHUNKED_ENCODING .getConcreteSettingForNamespace ("test" ).getKey (), true )
118
+ .build ())
119
+ .put (super .nodeSettings (nodeOrdinal ))
120
+ .setSecureSettings (secureSettings )
121
+ .build ();
122
+ }
123
+
91
124
public static class TestS3RepositoryPlugin extends S3RepositoryPlugin {
92
125
93
126
public TestS3RepositoryPlugin (final Settings settings ) {
94
127
super (settings );
95
128
}
96
129
97
130
@ Override
98
- public Map <String , Repository .Factory > getRepositories (final Environment env , final NamedXContentRegistry registry ,
99
- final ThreadPool threadPool ) {
100
- return Collections .singletonMap (S3Repository .TYPE ,
101
- metadata -> new S3Repository (metadata , registry , new S3Service () {
102
- @ Override
103
- AmazonS3 buildClient (S3ClientSettings clientSettings ) {
104
- return new MockAmazonS3 (blobs , bucket , serverSideEncryption , cannedACL , storageClass );
131
+ public List <Setting <?>> getSettings () {
132
+ final List <Setting <?>> settings = new ArrayList <>(super .getSettings ());
133
+ // Disable chunked encoding as it simplifies a lot the request parsing on the httpServer side
134
+ settings .add (S3ClientSettings .DISABLE_CHUNKED_ENCODING );
135
+ return settings ;
136
+ }
137
+ }
138
+
139
+ /**
140
+ * Minimal HTTP handler that acts as a S3 compliant server
141
+ */
142
+ @ SuppressForbidden (reason = "this test uses a HttpServer to emulate an S3 endpoint" )
143
+ private static class InternalHttpHandler implements HttpHandler {
144
+
145
+ private final ConcurrentMap <String , BytesReference > blobs = new ConcurrentHashMap <>();
146
+
147
+ @ Override
148
+ public void handle (final HttpExchange exchange ) throws IOException {
149
+ final String request = exchange .getRequestMethod () + " " + exchange .getRequestURI ().toString ();
150
+ try {
151
+ if (Regex .simpleMatch ("PUT /bucket/*" , request )) {
152
+ blobs .put (exchange .getRequestURI ().toString (), Streams .readFully (exchange .getRequestBody ()));
153
+ exchange .sendResponseHeaders (RestStatus .OK .getStatus (), -1 );
154
+
155
+ } else if (Regex .simpleMatch ("GET /bucket/?prefix=*" , request )) {
156
+ final Map <String , String > params = new HashMap <>();
157
+ RestUtils .decodeQueryString (exchange .getRequestURI ().getQuery (), 0 , params );
158
+ assertThat ("Test must be adapted for GET Bucket (List Objects) Version 2" , params .get ("list-type" ), nullValue ());
159
+
160
+ final StringBuilder list = new StringBuilder ();
161
+ list .append ("<?xml version=\" 1.0\" encoding=\" UTF-8\" ?>" );
162
+ list .append ("<ListBucketResult>" );
163
+ final String prefix = params .get ("prefix" );
164
+ if (prefix != null ) {
165
+ list .append ("<Prefix>" ).append (prefix ).append ("</Prefix>" );
166
+ }
167
+ for (Map .Entry <String , BytesReference > blob : blobs .entrySet ()) {
168
+ if (prefix == null || blob .getKey ().startsWith ("/bucket/" + prefix )) {
169
+ list .append ("<Contents>" );
170
+ list .append ("<Key>" ).append (blob .getKey ().replace ("/bucket/" , "" )).append ("</Key>" );
171
+ list .append ("<Size>" ).append (blob .getValue ().length ()).append ("</Size>" );
172
+ list .append ("</Contents>" );
105
173
}
106
- }, threadPool ));
174
+ }
175
+ list .append ("</ListBucketResult>" );
176
+
177
+ byte [] response = list .toString ().getBytes (StandardCharsets .UTF_8 );
178
+ exchange .getResponseHeaders ().add ("Content-Type" , "application/xml" );
179
+ exchange .sendResponseHeaders (RestStatus .OK .getStatus (), response .length );
180
+ exchange .getResponseBody ().write (response );
181
+
182
+ } else if (Regex .simpleMatch ("GET /bucket/*" , request )) {
183
+ final BytesReference blob = blobs .get (exchange .getRequestURI ().toString ());
184
+ if (blob != null ) {
185
+ exchange .getResponseHeaders ().add ("Content-Type" , "application/octet-stream" );
186
+ exchange .sendResponseHeaders (RestStatus .OK .getStatus (), blob .length ());
187
+ blob .writeTo (exchange .getResponseBody ());
188
+ } else {
189
+ exchange .sendResponseHeaders (RestStatus .NOT_FOUND .getStatus (), -1 );
190
+ }
191
+
192
+ } else if (Regex .simpleMatch ("DELETE /bucket/*" , request )) {
193
+ int deletions = 0 ;
194
+ for (Iterator <Map .Entry <String , BytesReference >> iterator = blobs .entrySet ().iterator (); iterator .hasNext (); ) {
195
+ Map .Entry <String , BytesReference > blob = iterator .next ();
196
+ if (blob .getKey ().startsWith (exchange .getRequestURI ().toString ())) {
197
+ iterator .remove ();
198
+ deletions ++;
199
+ }
200
+ }
201
+ exchange .sendResponseHeaders ((deletions > 0 ? RestStatus .OK : RestStatus .NO_CONTENT ).getStatus (), -1 );
202
+
203
+ } else if (Regex .simpleMatch ("POST /bucket/?delete" , request )) {
204
+ final String requestBody = Streams .copyToString (new InputStreamReader (exchange .getRequestBody (), UTF_8 ));
205
+
206
+ final StringBuilder deletes = new StringBuilder ();
207
+ deletes .append ("<?xml version=\" 1.0\" encoding=\" UTF-8\" ?>" );
208
+ deletes .append ("<DeleteResult>" );
209
+ for (Iterator <Map .Entry <String , BytesReference >> iterator = blobs .entrySet ().iterator (); iterator .hasNext (); ) {
210
+ Map .Entry <String , BytesReference > blob = iterator .next ();
211
+ String key = blob .getKey ().replace ("/bucket/" , "" );
212
+ if (requestBody .contains ("<Key>" + key + "</Key>" )) {
213
+ deletes .append ("<Deleted><Key>" ).append (key ).append ("</Key></Deleted>" );
214
+ iterator .remove ();
215
+ }
216
+ }
217
+ deletes .append ("</DeleteResult>" );
218
+
219
+ byte [] response = deletes .toString ().getBytes (StandardCharsets .UTF_8 );
220
+ exchange .getResponseHeaders ().add ("Content-Type" , "application/xml" );
221
+ exchange .sendResponseHeaders (RestStatus .OK .getStatus (), response .length );
222
+ exchange .getResponseBody ().write (response );
223
+
224
+ } else {
225
+ exchange .sendResponseHeaders (RestStatus .INTERNAL_SERVER_ERROR .getStatus (), -1 );
226
+ }
227
+ } finally {
228
+ exchange .close ();
229
+ }
107
230
}
108
231
}
109
232
}
0 commit comments