19
19
20
20
package org .elasticsearch .repositories .azure ;
21
21
22
+ import com .microsoft .azure .storage .AccessCondition ;
23
+ import com .microsoft .azure .storage .CloudStorageAccount ;
22
24
import com .microsoft .azure .storage .OperationContext ;
25
+ import com .microsoft .azure .storage .RetryExponentialRetry ;
26
+ import com .microsoft .azure .storage .RetryPolicy ;
27
+ import com .microsoft .azure .storage .StorageErrorCodeStrings ;
23
28
import com .microsoft .azure .storage .StorageException ;
29
+ import com .microsoft .azure .storage .blob .BlobInputStream ;
30
+ import com .microsoft .azure .storage .blob .BlobListingDetails ;
31
+ import com .microsoft .azure .storage .blob .BlobProperties ;
24
32
import com .microsoft .azure .storage .blob .CloudBlobClient ;
25
-
33
+ import com .microsoft .azure .storage .blob .CloudBlobContainer ;
34
+ import com .microsoft .azure .storage .blob .CloudBlockBlob ;
35
+ import com .microsoft .azure .storage .blob .DeleteSnapshotsOption ;
36
+ import com .microsoft .azure .storage .blob .ListBlobItem ;
37
+ import org .apache .logging .log4j .message .ParameterizedMessage ;
26
38
import org .elasticsearch .common .blobstore .BlobMetaData ;
39
+ import org .elasticsearch .common .blobstore .support .PlainBlobMetaData ;
40
+ import org .elasticsearch .common .collect .MapBuilder ;
27
41
import org .elasticsearch .common .collect .Tuple ;
42
+ import org .elasticsearch .common .component .AbstractComponent ;
43
+ import org .elasticsearch .common .settings .Settings ;
44
+ import org .elasticsearch .common .settings .SettingsException ;
28
45
import org .elasticsearch .common .unit .ByteSizeUnit ;
29
46
import org .elasticsearch .common .unit .ByteSizeValue ;
30
47
31
48
import java .io .IOException ;
32
49
import java .io .InputStream ;
50
+ import java .net .HttpURLConnection ;
51
+ import java .net .URI ;
33
52
import java .net .URISyntaxException ;
34
53
import java .nio .file .FileAlreadyExistsException ;
54
+ import java .security .InvalidKeyException ;
55
+ import java .util .EnumSet ;
35
56
import java .util .Map ;
36
57
import java .util .function .Supplier ;
37
58
38
- /**
39
- * Azure Storage Service interface
40
- * @see AzureStorageServiceImpl for Azure REST API implementation
41
- */
42
- public interface AzureStorageService {
59
+ import static java .util .Collections .emptyMap ;
60
+
61
+ public class AzureStorageService extends AbstractComponent {
62
+
63
+ public static ByteSizeValue MIN_CHUNK_SIZE = new ByteSizeValue (1 , ByteSizeUnit .BYTES );
64
+ public static ByteSizeValue MAX_CHUNK_SIZE = new ByteSizeValue (64 , ByteSizeUnit .MB );
65
+
66
+ // 'package' for testing
67
+ volatile Map <String , AzureStorageSettings > storageSettings = emptyMap ();
68
+
69
+ public AzureStorageService (Settings settings ) {
70
+ super (settings );
71
+ // eagerly load client settings so that secure settings are read
72
+ final Map <String , AzureStorageSettings > clientsSettings = AzureStorageSettings .load (settings );
73
+ refreshAndClearCache (clientsSettings );
74
+ }
43
75
44
76
/**
45
77
* Creates a {@code CloudBlobClient} on each invocation using the current client
@@ -48,7 +80,46 @@ public interface AzureStorageService {
48
80
* thread for logically coupled ops. The {@code OperationContext} is used to
49
81
* specify the proxy, but a new context is *required* for each call.
50
82
*/
51
- Tuple <CloudBlobClient , Supplier <OperationContext >> client (String clientName );
83
+ public Tuple <CloudBlobClient , Supplier <OperationContext >> client (String clientName ) {
84
+ final AzureStorageSettings azureStorageSettings = this .storageSettings .get (clientName );
85
+ if (azureStorageSettings == null ) {
86
+ throw new SettingsException ("Unable to find client with name [" + clientName + "]" );
87
+ }
88
+ try {
89
+ return new Tuple <>(buildClient (azureStorageSettings ), () -> buildOperationContext (azureStorageSettings ));
90
+ } catch (InvalidKeyException | URISyntaxException | IllegalArgumentException e ) {
91
+ throw new SettingsException ("Invalid azure client settings with name [" + clientName + "]" , e );
92
+ }
93
+ }
94
+
95
+ protected CloudBlobClient buildClient (AzureStorageSettings azureStorageSettings ) throws InvalidKeyException , URISyntaxException {
96
+ final CloudBlobClient client = createClient (azureStorageSettings );
97
+ // Set timeout option if the user sets cloud.azure.storage.timeout or
98
+ // cloud.azure.storage.xxx.timeout (it's negative by default)
99
+ final long timeout = azureStorageSettings .getTimeout ().getMillis ();
100
+ if (timeout > 0 ) {
101
+ if (timeout > Integer .MAX_VALUE ) {
102
+ throw new IllegalArgumentException ("Timeout [" + azureStorageSettings .getTimeout () + "] exceeds 2,147,483,647ms." );
103
+ }
104
+ client .getDefaultRequestOptions ().setTimeoutIntervalInMs ((int ) timeout );
105
+ }
106
+ // We define a default exponential retry policy
107
+ client .getDefaultRequestOptions ()
108
+ .setRetryPolicyFactory (new RetryExponentialRetry (RetryPolicy .DEFAULT_CLIENT_BACKOFF , azureStorageSettings .getMaxRetries ()));
109
+ client .getDefaultRequestOptions ().setLocationMode (azureStorageSettings .getLocationMode ());
110
+ return client ;
111
+ }
112
+
113
+ protected CloudBlobClient createClient (AzureStorageSettings azureStorageSettings ) throws InvalidKeyException , URISyntaxException {
114
+ final String connectionString = azureStorageSettings .buildConnectionString ();
115
+ return CloudStorageAccount .parse (connectionString ).createCloudBlobClient ();
116
+ }
117
+
118
+ protected OperationContext buildOperationContext (AzureStorageSettings azureStorageSettings ) {
119
+ final OperationContext context = new OperationContext ();
120
+ context .setProxy (azureStorageSettings .getProxy ());
121
+ return context ;
122
+ }
52
123
53
124
/**
54
125
* Updates settings for building clients. Any client cache is cleared. Future
@@ -57,32 +128,134 @@ public interface AzureStorageService {
57
128
* @param clientsSettings the settings for new clients
58
129
* @return the old settings
59
130
*/
60
- Map <String , AzureStorageSettings > refreshAndClearCache (Map <String , AzureStorageSettings > clientsSettings );
61
-
62
- ByteSizeValue MIN_CHUNK_SIZE = new ByteSizeValue ( 1 , ByteSizeUnit . BYTES );
63
- ByteSizeValue MAX_CHUNK_SIZE = new ByteSizeValue ( 64 , ByteSizeUnit . MB );
64
-
65
- boolean doesContainerExist ( String account , String container ) throws URISyntaxException , StorageException ;
131
+ public Map <String , AzureStorageSettings > refreshAndClearCache (Map <String , AzureStorageSettings > clientsSettings ) {
132
+ final Map < String , AzureStorageSettings > prevSettings = this . storageSettings ;
133
+ this . storageSettings = MapBuilder . newMapBuilder ( clientsSettings ). immutableMap ( );
134
+ // clients are built lazily by {@link client(String)}
135
+ return prevSettings ;
136
+ }
66
137
67
- void removeContainer (String account , String container ) throws URISyntaxException , StorageException ;
138
+ public boolean doesContainerExist (String account , String container ) throws URISyntaxException , StorageException {
139
+ final Tuple <CloudBlobClient , Supplier <OperationContext >> client = client (account );
140
+ final CloudBlobContainer blobContainer = client .v1 ().getContainerReference (container );
141
+ return SocketAccess .doPrivilegedException (() -> blobContainer .exists (null , null , client .v2 ().get ()));
142
+ }
68
143
69
- void createContainer (String account , String container ) throws URISyntaxException , StorageException ;
144
+ public void deleteFiles (String account , String container , String path ) throws URISyntaxException , StorageException {
145
+ final Tuple <CloudBlobClient , Supplier <OperationContext >> client = client (account );
146
+ // container name must be lower case.
147
+ logger .trace (() -> new ParameterizedMessage ("delete files container [{}], path [{}]" , container , path ));
148
+ SocketAccess .doPrivilegedVoidException (() -> {
149
+ // list the blobs using a flat blob listing mode
150
+ final CloudBlobContainer blobContainer = client .v1 ().getContainerReference (container );
151
+ for (final ListBlobItem blobItem : blobContainer .listBlobs (path , true , EnumSet .noneOf (BlobListingDetails .class ), null ,
152
+ client .v2 ().get ())) {
153
+ final String blobName = blobNameFromUri (blobItem .getUri ());
154
+ logger .trace (() -> new ParameterizedMessage ("removing blob [{}] full URI was [{}]" , blobName , blobItem .getUri ()));
155
+ // don't call {@code #deleteBlob}, use the same client
156
+ final CloudBlockBlob azureBlob = blobContainer .getBlockBlobReference (blobName );
157
+ azureBlob .delete (DeleteSnapshotsOption .NONE , null , null , client .v2 ().get ());
158
+ }
159
+ });
160
+ }
70
161
71
- void deleteFiles (String account , String container , String path ) throws URISyntaxException , StorageException ;
162
+ /**
163
+ * Extract the blob name from a URI like https://myservice.azure.net/container/path/to/myfile
164
+ * It should remove the container part (first part of the path) and gives path/to/myfile
165
+ * @param uri URI to parse
166
+ * @return The blob name relative to the container
167
+ */
168
+ static String blobNameFromUri (URI uri ) {
169
+ final String path = uri .getPath ();
170
+ // We remove the container name from the path
171
+ // The 3 magic number cames from the fact if path is /container/path/to/myfile
172
+ // First occurrence is empty "/"
173
+ // Second occurrence is "container
174
+ // Last part contains "path/to/myfile" which is what we want to get
175
+ final String [] splits = path .split ("/" , 3 );
176
+ // We return the remaining end of the string
177
+ return splits [2 ];
178
+ }
72
179
73
- boolean blobExists (String account , String container , String blob ) throws URISyntaxException , StorageException ;
180
+ public boolean blobExists (String account , String container , String blob ) throws URISyntaxException , StorageException {
181
+ // Container name must be lower case.
182
+ final Tuple <CloudBlobClient , Supplier <OperationContext >> client = client (account );
183
+ final CloudBlobContainer blobContainer = client .v1 ().getContainerReference (container );
184
+ return SocketAccess .doPrivilegedException (() -> {
185
+ final CloudBlockBlob azureBlob = blobContainer .getBlockBlobReference (blob );
186
+ return azureBlob .exists (null , null , client .v2 ().get ());
187
+ });
188
+ }
74
189
75
- void deleteBlob (String account , String container , String blob ) throws URISyntaxException , StorageException ;
190
+ public void deleteBlob (String account , String container , String blob ) throws URISyntaxException , StorageException {
191
+ final Tuple <CloudBlobClient , Supplier <OperationContext >> client = client (account );
192
+ // Container name must be lower case.
193
+ final CloudBlobContainer blobContainer = client .v1 ().getContainerReference (container );
194
+ logger .trace (() -> new ParameterizedMessage ("delete blob for container [{}], blob [{}]" , container , blob ));
195
+ SocketAccess .doPrivilegedVoidException (() -> {
196
+ final CloudBlockBlob azureBlob = blobContainer .getBlockBlobReference (blob );
197
+ logger .trace (() -> new ParameterizedMessage ("container [{}]: blob [{}] found. removing." , container , blob ));
198
+ azureBlob .delete (DeleteSnapshotsOption .NONE , null , null , client .v2 ().get ());
199
+ });
200
+ }
76
201
77
- InputStream getInputStream (String account , String container , String blob ) throws URISyntaxException , StorageException , IOException ;
202
+ public InputStream getInputStream (String account , String container , String blob )
203
+ throws URISyntaxException , StorageException , IOException {
204
+ final Tuple <CloudBlobClient , Supplier <OperationContext >> client = client (account );
205
+ final CloudBlockBlob blockBlobReference = client .v1 ().getContainerReference (container ).getBlockBlobReference (blob );
206
+ logger .trace (() -> new ParameterizedMessage ("reading container [{}], blob [{}]" , container , blob ));
207
+ final BlobInputStream is = SocketAccess .doPrivilegedException (() ->
208
+ blockBlobReference .openInputStream (null , null , client .v2 ().get ()));
209
+ return giveSocketPermissionsToStream (is );
210
+ }
78
211
79
- Map <String , BlobMetaData > listBlobsByPrefix (String account , String container , String keyPath , String prefix )
80
- throws URISyntaxException , StorageException ;
212
+ public Map <String , BlobMetaData > listBlobsByPrefix (String account , String container , String keyPath , String prefix )
213
+ throws URISyntaxException , StorageException {
214
+ // NOTE: this should be here: if (prefix == null) prefix = "";
215
+ // however, this is really inefficient since deleteBlobsByPrefix enumerates everything and
216
+ // then does a prefix match on the result; it should just call listBlobsByPrefix with the prefix!
217
+ final MapBuilder <String , BlobMetaData > blobsBuilder = MapBuilder .newMapBuilder ();
218
+ final EnumSet <BlobListingDetails > enumBlobListingDetails = EnumSet .of (BlobListingDetails .METADATA );
219
+ final Tuple <CloudBlobClient , Supplier <OperationContext >> client = client (account );
220
+ final CloudBlobContainer blobContainer = client .v1 ().getContainerReference (container );
221
+ logger .trace (() -> new ParameterizedMessage ("listing container [{}], keyPath [{}], prefix [{}]" , container , keyPath , prefix ));
222
+ SocketAccess .doPrivilegedVoidException (() -> {
223
+ for (final ListBlobItem blobItem : blobContainer .listBlobs (keyPath + (prefix == null ? "" : prefix ), false ,
224
+ enumBlobListingDetails , null , client .v2 ().get ())) {
225
+ final URI uri = blobItem .getUri ();
226
+ logger .trace (() -> new ParameterizedMessage ("blob url [{}]" , uri ));
227
+ // uri.getPath is of the form /container/keyPath.* and we want to strip off the /container/
228
+ // this requires 1 + container.length() + 1, with each 1 corresponding to one of the /
229
+ final String blobPath = uri .getPath ().substring (1 + container .length () + 1 );
230
+ final BlobProperties properties = ((CloudBlockBlob ) blobItem ).getProperties ();
231
+ final String name = blobPath .substring (keyPath .length ());
232
+ logger .trace (() -> new ParameterizedMessage ("blob url [{}], name [{}], size [{}]" , uri , name , properties .getLength ()));
233
+ blobsBuilder .put (name , new PlainBlobMetaData (name , properties .getLength ()));
234
+ }
235
+ });
236
+ return blobsBuilder .immutableMap ();
237
+ }
81
238
82
- void writeBlob (String account , String container , String blobName , InputStream inputStream , long blobSize )
83
- throws URISyntaxException , StorageException , FileAlreadyExistsException ;
239
+ public void writeBlob (String account , String container , String blobName , InputStream inputStream , long blobSize )
240
+ throws URISyntaxException , StorageException , FileAlreadyExistsException {
241
+ logger .trace (() -> new ParameterizedMessage ("writeBlob({}, stream, {})" , blobName , blobSize ));
242
+ final Tuple <CloudBlobClient , Supplier <OperationContext >> client = client (account );
243
+ final CloudBlobContainer blobContainer = client .v1 ().getContainerReference (container );
244
+ final CloudBlockBlob blob = blobContainer .getBlockBlobReference (blobName );
245
+ try {
246
+ SocketAccess .doPrivilegedVoidException (() ->
247
+ blob .upload (inputStream , blobSize , AccessCondition .generateIfNotExistsCondition (), null , client .v2 ().get ()));
248
+ } catch (final StorageException se ) {
249
+ if (se .getHttpStatusCode () == HttpURLConnection .HTTP_CONFLICT &&
250
+ StorageErrorCodeStrings .BLOB_ALREADY_EXISTS .equals (se .getErrorCode ())) {
251
+ throw new FileAlreadyExistsException (blobName , null , se .getMessage ());
252
+ }
253
+ throw se ;
254
+ }
255
+ logger .trace (() -> new ParameterizedMessage ("writeBlob({}, stream, {}) - done" , blobName , blobSize ));
256
+ }
84
257
85
- static InputStream giveSocketPermissionsToStream (InputStream stream ) {
258
+ static InputStream giveSocketPermissionsToStream (final InputStream stream ) {
86
259
return new InputStream () {
87
260
@ Override
88
261
public int read () throws IOException {
0 commit comments