Skip to content

#857 Support multipart files using InputStream from source file #1593

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Oct 27, 2018
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ Use the `addBodyPart` method to add a multipart part to the request.
This part can be of type:
* `ByteArrayPart`
* `FilePart`
* `InputStreamPart`
* `StringPart`

### Dealing with Responses
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package org.asynchttpclient.request.body.multipart;

import java.io.InputStream;
import java.nio.charset.Charset;

import static org.asynchttpclient.util.Assertions.assertNotNull;

public class InputStreamPart extends FileLikePart {

private final InputStream inputStream;
private final long contentLength;

public InputStreamPart(String name, InputStream inputStream, String fileName) {
this(name, inputStream, fileName, -1);
}

public InputStreamPart(String name, InputStream inputStream, String fileName, long contentLength) {
this(name, inputStream, fileName, contentLength, null);
}

public InputStreamPart(String name, InputStream inputStream, String fileName, long contentLength, String contentType) {
this(name, inputStream, fileName, contentLength, contentType, null);
}

public InputStreamPart(String name, InputStream inputStream, String fileName, long contentLength, String contentType, Charset charset) {
this(name, inputStream, fileName, contentLength, contentType, charset, null);
}

public InputStreamPart(String name, InputStream inputStream, String fileName, long contentLength, String contentType, Charset charset,
String contentId) {
this(name, inputStream, fileName, contentLength, contentType, charset, contentId, null);
}

public InputStreamPart(String name, InputStream inputStream, String fileName, long contentLength, String contentType, Charset charset,
String contentId, String transferEncoding) {
super(name,
contentType,
charset,
fileName,
contentId,
transferEncoding);
this.inputStream = assertNotNull(inputStream, "inputStream");
this.contentLength = contentLength;
}

public InputStream getInputStream() {
return inputStream;
}

public long getContentLength() {
return contentLength;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ public static List<MultipartPart<? extends Part>> generateMultipartParts(List<Pa
} else if (part instanceof StringPart) {
multipartParts.add(new StringMultipartPart((StringPart) part, boundary));

} else if (part instanceof InputStreamPart) {
multipartParts.add(new InputStreamMultipartPart((InputStreamPart) part, boundary));

} else {
throw new IllegalArgumentException("Unknown part type: " + part);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package org.asynchttpclient.request.body.multipart.part;

import io.netty.buffer.ByteBuf;
import org.asynchttpclient.request.body.multipart.InputStreamPart;

import java.io.IOException;
import java.io.InputStream;
import java.nio.channels.WritableByteChannel;

import static org.asynchttpclient.util.MiscUtils.closeSilently;

public class InputStreamMultipartPart extends FileLikeMultipartPart<InputStreamPart> {

private long position = 0L;

public InputStreamMultipartPart(InputStreamPart part, byte[] boundary) {
super(part, boundary);
}

@Override
protected long getContentLength() {
return part.getContentLength();
}

@Override
protected long transferContentTo(ByteBuf target) throws IOException {
InputStream inputStream = part.getInputStream();
int transferred = target.writeBytes(inputStream, target.writableBytes());
if (transferred > 0) {
position += transferred;
}
if (position == getContentLength() || transferred < 0) {
state = MultipartState.POST_CONTENT;
inputStream.close();
}
return transferred;
}

@Override
protected long transferContentTo(WritableByteChannel target) throws IOException {
throw new UnsupportedOperationException("InputStreamPart does not support zero-copy transfers");
}

@Override
public void close() {
super.close();
closeSilently(part.getInputStream());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,10 @@ public abstract class MultipartPart<T extends PartBase> implements Closeable {
}

public long length() {
long contentLength = getContentLength();
if (contentLength < 0) {
return contentLength;
}
return preContentLength + postContentLength + getContentLength();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
/*
* Copyright (c) 2010-2012 Sonatype, Inc. All rights reserved.
*
* This program is licensed to you under the Apache License Version 2.0,
* and you may not use this file except in compliance with the Apache License Version 2.0.
* You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the Apache License Version 2.0 is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
*/
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please fix header: Sonatype is no longer involved in this project and new code should be donated to AHC.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed

package org.asynchttpclient.request.body;

import org.asynchttpclient.AbstractBasicTest;
import org.asynchttpclient.AsyncHttpClient;
import org.asynchttpclient.Response;
import org.asynchttpclient.request.body.multipart.InputStreamPart;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.testng.annotations.Test;

import javax.servlet.ServletInputStream;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.*;
import java.util.concurrent.ExecutionException;

import static java.nio.charset.StandardCharsets.UTF_8;
import static org.asynchttpclient.Dsl.asyncHttpClient;
import static org.asynchttpclient.Dsl.config;
import static org.asynchttpclient.test.TestUtils.LARGE_IMAGE_FILE;
import static org.asynchttpclient.test.TestUtils.createTempFile;
import static org.testng.Assert.assertEquals;

public class InputStreamPartLargeFileTest extends AbstractBasicTest {

@Override
public AbstractHandler configureHandler() throws Exception {
return new AbstractHandler() {

public void handle(String target, Request baseRequest, HttpServletRequest req, HttpServletResponse resp) throws IOException {

ServletInputStream in = req.getInputStream();
byte[] b = new byte[8192];

int count;
int total = 0;
while ((count = in.read(b)) != -1) {
b = new byte[8192];
total += count;
}
resp.setStatus(200);
resp.addHeader("X-TRANSFERRED", String.valueOf(total));
resp.getOutputStream().flush();
resp.getOutputStream().close();

baseRequest.setHandled(true);
}
};
}

@Test(expectedExceptions = ExecutionException.class)
public void testPutImageFileThrowsExecutionException() throws Exception {
// Should throw ExecutionException when zero-copy is enabled
try (AsyncHttpClient client = asyncHttpClient(config().setRequestTimeout(100 * 6000))) {
InputStream inputStream = new BufferedInputStream(new FileInputStream(LARGE_IMAGE_FILE));
client.preparePut(getTargetUrl()).addBodyPart(new InputStreamPart("test", inputStream, LARGE_IMAGE_FILE.getName(), LARGE_IMAGE_FILE.length(), "application/octet-stream", UTF_8)).execute().get();
}
}

@Test(expectedExceptions = ExecutionException.class)
public void testPutImageFileUnknownSizeThrowsExecutionException() throws Exception {
// Should throw ExecutionException when zero-copy is enabled
try (AsyncHttpClient client = asyncHttpClient(config().setRequestTimeout(100 * 6000))) {
InputStream inputStream = new BufferedInputStream(new FileInputStream(LARGE_IMAGE_FILE));
client.preparePut(getTargetUrl()).addBodyPart(new InputStreamPart("test", inputStream, LARGE_IMAGE_FILE.getName(), -1, "application/octet-stream", UTF_8)).execute().get();
}
}

@Test
public void testPutImageFile() throws Exception {
try (AsyncHttpClient client = asyncHttpClient(config().setRequestTimeout(100 * 6000).setDisableZeroCopy(true))) {
InputStream inputStream = new BufferedInputStream(new FileInputStream(LARGE_IMAGE_FILE));
client.preparePut(getTargetUrl()).addBodyPart(new InputStreamPart("test", inputStream, LARGE_IMAGE_FILE.getName(), LARGE_IMAGE_FILE.length(), "application/octet-stream", UTF_8)).execute().get();
}
}

@Test
public void testPutImageFileUnknownSize() throws Exception {
try (AsyncHttpClient client = asyncHttpClient(config().setRequestTimeout(100 * 6000).setDisableZeroCopy(true))) {
InputStream inputStream = new BufferedInputStream(new FileInputStream(LARGE_IMAGE_FILE));
Response response = client.preparePut(getTargetUrl()).addBodyPart(new InputStreamPart("test", inputStream, LARGE_IMAGE_FILE.getName(), -1, "application/octet-stream", UTF_8)).execute().get();
assertEquals(response.getStatusCode(), 200);
}
}

@Test(expectedExceptions = ExecutionException.class)
public void testPutLargeTextFileThrowsExecutionException() throws Exception {
File file = createTempFile(1024 * 1024);
InputStream inputStream = new BufferedInputStream(new FileInputStream(file));

try (AsyncHttpClient client = asyncHttpClient(config().setRequestTimeout(100 * 6000))) {
Response response = client.preparePut(getTargetUrl())
.addBodyPart(new InputStreamPart("test", inputStream, file.getName(), file.length(), "application/octet-stream", UTF_8)).execute().get();
assertEquals(response.getStatusCode(), 200);
}
}

@Test(expectedExceptions = ExecutionException.class)
public void testPutLargeTextFileUnknownSizeThrowsExecutionException() throws Exception {
File file = createTempFile(1024 * 1024);
InputStream inputStream = new BufferedInputStream(new FileInputStream(file));

try (AsyncHttpClient client = asyncHttpClient(config().setRequestTimeout(100 * 6000))) {
Response response = client.preparePut(getTargetUrl())
.addBodyPart(new InputStreamPart("test", inputStream, file.getName(), -1, "application/octet-stream", UTF_8)).execute().get();
assertEquals(response.getStatusCode(), 200);
}
}

@Test
public void testPutLargeTextFile() throws Exception {
File file = createTempFile(1024 * 1024);
InputStream inputStream = new BufferedInputStream(new FileInputStream(file));

try (AsyncHttpClient client = asyncHttpClient(config().setRequestTimeout(100 * 6000).setDisableZeroCopy(true))) {
Response response = client.preparePut(getTargetUrl())
.addBodyPart(new InputStreamPart("test", inputStream, file.getName(), file.length(), "application/octet-stream", UTF_8)).execute().get();
assertEquals(response.getStatusCode(), 200);
}
}

@Test
public void testPutLargeTextFileUnknownSize() throws Exception {
File file = createTempFile(1024 * 1024);
InputStream inputStream = new BufferedInputStream(new FileInputStream(file));

try (AsyncHttpClient client = asyncHttpClient(config().setRequestTimeout(100 * 6000).setDisableZeroCopy(true))) {
Response response = client.preparePut(getTargetUrl())
.addBodyPart(new InputStreamPart("test", inputStream, file.getName(), -1, "application/octet-stream", UTF_8)).execute().get();
assertEquals(response.getStatusCode(), 200);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@
import org.asynchttpclient.request.body.Body.BodyState;
import org.testng.annotations.Test;

import java.io.File;
import java.io.IOException;
import java.io.*;
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.ByteBuffer;
Expand All @@ -37,6 +36,7 @@ public class MultipartBodyTest {

private static final List<Part> PARTS = new ArrayList<>();
private static long MAX_MULTIPART_CONTENT_LENGTH_ESTIMATE;
private static long MAX_MULTIPART_CONTENT_LENGTH_WITH_INPUT_STREAM_PART_ESTIMATE;

static {
try {
Expand All @@ -55,6 +55,13 @@ public class MultipartBodyTest {
}
}

static {
try (MultipartBody dummyBody = buildMultipartWithInputStreamPart()) {
// separator is random
MAX_MULTIPART_CONTENT_LENGTH_WITH_INPUT_STREAM_PART_ESTIMATE = dummyBody.getContentLength() + 100;
}
}

private static File getTestfile() throws URISyntaxException {
final ClassLoader cl = MultipartBodyTest.class.getClassLoader();
final URL url = cl.getResource("textfile.txt");
Expand All @@ -66,6 +73,18 @@ private static MultipartBody buildMultipart() {
return MultipartUtils.newMultipartBody(PARTS, EmptyHttpHeaders.INSTANCE);
}

private static MultipartBody buildMultipartWithInputStreamPart() {
List<Part> parts = new ArrayList<>(PARTS);
try {
File testFile = getTestfile();
InputStream inputStream = new BufferedInputStream(new FileInputStream(testFile));
parts.add(new InputStreamPart("isPart", inputStream, testFile.getName(), testFile.length()));
} catch (URISyntaxException | FileNotFoundException e) {
throw new ExceptionInInitializerError(e);
}
return MultipartUtils.newMultipartBody(parts, EmptyHttpHeaders.INSTANCE);
}

private static long transferWithCopy(MultipartBody multipartBody, int bufferSize) throws IOException {
long transferred = 0;
final ByteBuf buffer = Unpooled.buffer(bufferSize);
Expand Down Expand Up @@ -121,6 +140,16 @@ public void transferWithCopy() throws Exception {
}
}

@Test
public void transferWithCopyAndInputStreamPart() throws Exception {
for (int bufferLength = 1; bufferLength < MAX_MULTIPART_CONTENT_LENGTH_WITH_INPUT_STREAM_PART_ESTIMATE + 1; bufferLength++) {
try (MultipartBody multipartBody = buildMultipartWithInputStreamPart()) {
long transferred = transferWithCopy(multipartBody, bufferLength);
assertEquals(transferred, multipartBody.getContentLength());
}
}
}

@Test
public void transferZeroCopy() throws Exception {
for (int bufferLength = 1; bufferLength < MAX_MULTIPART_CONTENT_LENGTH_ESTIMATE + 1; bufferLength++) {
Expand All @@ -130,4 +159,13 @@ public void transferZeroCopy() throws Exception {
}
}
}

@Test(expectedExceptions = UnsupportedOperationException.class)
public void transferZeroCopyWithInputStreamPart() throws Exception {
for (int bufferLength = 1; bufferLength < MAX_MULTIPART_CONTENT_LENGTH_WITH_INPUT_STREAM_PART_ESTIMATE + 1; bufferLength++) {
try (MultipartBody multipartBody = buildMultipartWithInputStreamPart()) {
transferZeroCopy(multipartBody, bufferLength);
}
}
}
}
Loading