diff --git a/plugins/repository-azure/src/main/java/org/elasticsearch/cloud/azure/blobstore/util/SocketAccess.java b/plugins/repository-azure/src/main/java/org/elasticsearch/cloud/azure/blobstore/util/SocketAccess.java index b35ad53b84e22..51f884713420e 100644 --- a/plugins/repository-azure/src/main/java/org/elasticsearch/cloud/azure/blobstore/util/SocketAccess.java +++ b/plugins/repository-azure/src/main/java/org/elasticsearch/cloud/azure/blobstore/util/SocketAccess.java @@ -39,6 +39,15 @@ public final class SocketAccess { private SocketAccess() {} + public static T doPrivilegedIOException(PrivilegedExceptionAction operation) throws IOException { + SpecialPermission.check(); + try { + return AccessController.doPrivileged(operation); + } catch (PrivilegedActionException e) { + throw (IOException) e.getCause(); + } + } + public static T doPrivilegedException(PrivilegedExceptionAction operation) throws StorageException, URISyntaxException { SpecialPermission.check(); try { diff --git a/plugins/repository-azure/src/main/java/org/elasticsearch/cloud/azure/storage/AzureStorageService.java b/plugins/repository-azure/src/main/java/org/elasticsearch/cloud/azure/storage/AzureStorageService.java index ec2eec18deb65..9b334ae85bc21 100644 --- a/plugins/repository-azure/src/main/java/org/elasticsearch/cloud/azure/storage/AzureStorageService.java +++ b/plugins/repository-azure/src/main/java/org/elasticsearch/cloud/azure/storage/AzureStorageService.java @@ -21,6 +21,7 @@ import com.microsoft.azure.storage.LocationMode; import com.microsoft.azure.storage.StorageException; +import org.elasticsearch.cloud.azure.blobstore.util.SocketAccess; import org.elasticsearch.common.blobstore.BlobMetaData; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting.Property; @@ -82,4 +83,23 @@ void moveBlob(String account, LocationMode mode, String container, String source void writeBlob(String account, LocationMode mode, String container, String blobName, InputStream inputStream, long blobSize) throws URISyntaxException, StorageException; + + static InputStream giveSocketPermissionsToStream(InputStream stream) { + return new InputStream() { + @Override + public int read() throws IOException { + return SocketAccess.doPrivilegedIOException(stream::read); + } + + @Override + public int read(byte[] b) throws IOException { + return SocketAccess.doPrivilegedIOException(() -> stream.read(b)); + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + return SocketAccess.doPrivilegedIOException(() -> stream.read(b, off, len)); + } + }; + } } diff --git a/plugins/repository-azure/src/main/java/org/elasticsearch/cloud/azure/storage/AzureStorageServiceImpl.java b/plugins/repository-azure/src/main/java/org/elasticsearch/cloud/azure/storage/AzureStorageServiceImpl.java index 70a03ae22aeeb..b4cdfe39ccc39 100644 --- a/plugins/repository-azure/src/main/java/org/elasticsearch/cloud/azure/storage/AzureStorageServiceImpl.java +++ b/plugins/repository-azure/src/main/java/org/elasticsearch/cloud/azure/storage/AzureStorageServiceImpl.java @@ -25,6 +25,7 @@ import com.microsoft.azure.storage.RetryExponentialRetry; import com.microsoft.azure.storage.RetryPolicy; import com.microsoft.azure.storage.StorageException; +import com.microsoft.azure.storage.blob.BlobInputStream; import com.microsoft.azure.storage.blob.BlobListingDetails; import com.microsoft.azure.storage.blob.BlobProperties; import com.microsoft.azure.storage.blob.CloudBlobClient; @@ -292,7 +293,9 @@ public InputStream getInputStream(String account, LocationMode mode, String cont logger.trace("reading container [{}], blob [{}]", container, blob); CloudBlobClient client = this.getSelectedClient(account, mode); CloudBlockBlob blockBlobReference = client.getContainerReference(container).getBlockBlobReference(blob); - return SocketAccess.doPrivilegedException(() -> blockBlobReference.openInputStream(null, null, generateOperationContext(account))); + BlobInputStream stream = SocketAccess.doPrivilegedException(() -> + blockBlobReference.openInputStream(null, null, generateOperationContext(account))); + return AzureStorageService.giveSocketPermissionsToStream(stream); } @Override diff --git a/plugins/repository-azure/src/test/java/org/elasticsearch/cloud/azure/storage/AzureStorageServiceMock.java b/plugins/repository-azure/src/test/java/org/elasticsearch/cloud/azure/storage/AzureStorageServiceMock.java index 7aa37bd5e303c..950284b4c4b2b 100644 --- a/plugins/repository-azure/src/test/java/org/elasticsearch/cloud/azure/storage/AzureStorageServiceMock.java +++ b/plugins/repository-azure/src/test/java/org/elasticsearch/cloud/azure/storage/AzureStorageServiceMock.java @@ -32,8 +32,10 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; +import java.net.SocketPermission; import java.net.URISyntaxException; import java.nio.file.NoSuchFileException; +import java.security.AccessController; import java.util.Locale; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -81,7 +83,7 @@ public InputStream getInputStream(String account, LocationMode mode, String cont if (!blobExists(account, mode, container, blob)) { throw new NoSuchFileException("missing blob [" + blob + "]"); } - return new ByteArrayInputStream(blobs.get(blob).toByteArray()); + return AzureStorageService.giveSocketPermissionsToStream(new PermissionRequiringInputStream(blobs.get(blob).toByteArray())); } @Override @@ -169,4 +171,29 @@ public static boolean endsWithIgnoreCase(String str, String suffix) { String lcPrefix = suffix.toLowerCase(Locale.ROOT); return lcStr.equals(lcPrefix); } + + private static class PermissionRequiringInputStream extends ByteArrayInputStream { + + private PermissionRequiringInputStream(byte[] buf) { + super(buf); + } + + @Override + public synchronized int read() { + AccessController.checkPermission(new SocketPermission("*", "connect")); + return super.read(); + } + + @Override + public int read(byte[] b) throws IOException { + AccessController.checkPermission(new SocketPermission("*", "connect")); + return super.read(b); + } + + @Override + public synchronized int read(byte[] b, int off, int len) { + AccessController.checkPermission(new SocketPermission("*", "connect")); + return super.read(b, off, len); + } + } }