diff --git a/README.md b/README.md index 487cf8ffc5..eed94ccf15 100644 --- a/README.md +++ b/README.md @@ -110,6 +110,7 @@ Use the `addBodyPart` method to add a multipart part to the request. This part can be of type: * `ByteArrayPart` * `FilePart` +* `InputStreamPart` * `StringPart` ### Dealing with Responses diff --git a/client/src/main/java/org/asynchttpclient/netty/request/body/NettyBodyBody.java b/client/src/main/java/org/asynchttpclient/netty/request/body/NettyBodyBody.java index 728e2ec896..1a7d50b3fd 100755 --- a/client/src/main/java/org/asynchttpclient/netty/request/body/NettyBodyBody.java +++ b/client/src/main/java/org/asynchttpclient/netty/request/body/NettyBodyBody.java @@ -53,7 +53,7 @@ public long getContentLength() { public void write(final Channel channel, NettyResponseFuture future) { Object msg; - if (body instanceof RandomAccessBody && !ChannelManager.isSslHandlerConfigured(channel.pipeline()) && !config.isDisableZeroCopy()) { + if (body instanceof RandomAccessBody && !ChannelManager.isSslHandlerConfigured(channel.pipeline()) && !config.isDisableZeroCopy() && getContentLength() > 0) { msg = new BodyFileRegion((RandomAccessBody) body); } else { diff --git a/client/src/main/java/org/asynchttpclient/request/body/multipart/InputStreamPart.java b/client/src/main/java/org/asynchttpclient/request/body/multipart/InputStreamPart.java new file mode 100644 index 0000000000..ca7d0db367 --- /dev/null +++ b/client/src/main/java/org/asynchttpclient/request/body/multipart/InputStreamPart.java @@ -0,0 +1,66 @@ +/* + * Copyright (c) 2018 AsyncHttpClient Project. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at + * http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. + */ +package org.asynchttpclient.request.body.multipart; + +import java.io.InputStream; +import java.nio.charset.Charset; + +import static org.asynchttpclient.util.Assertions.assertNotNull; + +public class InputStreamPart extends FileLikePart { + + private final InputStream inputStream; + private final long contentLength; + + public InputStreamPart(String name, InputStream inputStream, String fileName) { + this(name, inputStream, fileName, -1); + } + + public InputStreamPart(String name, InputStream inputStream, String fileName, long contentLength) { + this(name, inputStream, fileName, contentLength, null); + } + + public InputStreamPart(String name, InputStream inputStream, String fileName, long contentLength, String contentType) { + this(name, inputStream, fileName, contentLength, contentType, null); + } + + public InputStreamPart(String name, InputStream inputStream, String fileName, long contentLength, String contentType, Charset charset) { + this(name, inputStream, fileName, contentLength, contentType, charset, null); + } + + public InputStreamPart(String name, InputStream inputStream, String fileName, long contentLength, String contentType, Charset charset, + String contentId) { + this(name, inputStream, fileName, contentLength, contentType, charset, contentId, null); + } + + public InputStreamPart(String name, InputStream inputStream, String fileName, long contentLength, String contentType, Charset charset, + String contentId, String transferEncoding) { + super(name, + contentType, + charset, + fileName, + contentId, + transferEncoding); + this.inputStream = assertNotNull(inputStream, "inputStream"); + this.contentLength = contentLength; + } + + public InputStream getInputStream() { + return inputStream; + } + + public long getContentLength() { + return contentLength; + } +} diff --git a/client/src/main/java/org/asynchttpclient/request/body/multipart/MultipartUtils.java b/client/src/main/java/org/asynchttpclient/request/body/multipart/MultipartUtils.java index 94bcb295d5..78e2d130a4 100644 --- a/client/src/main/java/org/asynchttpclient/request/body/multipart/MultipartUtils.java +++ b/client/src/main/java/org/asynchttpclient/request/body/multipart/MultipartUtils.java @@ -75,6 +75,9 @@ public static List> generateMultipartParts(List { + + private long position = 0L; + private ByteBuffer buffer; + private ReadableByteChannel channel; + + public InputStreamMultipartPart(InputStreamPart part, byte[] boundary) { + super(part, boundary); + } + + private ByteBuffer getBuffer() { + if (buffer == null) { + buffer = ByteBuffer.allocateDirect(BodyChunkedInput.DEFAULT_CHUNK_SIZE); + } + return buffer; + } + + private ReadableByteChannel getChannel() { + if (channel == null) { + channel = Channels.newChannel(part.getInputStream()); + } + return channel; + } + + @Override + protected long getContentLength() { + return part.getContentLength(); + } + + @Override + protected long transferContentTo(ByteBuf target) throws IOException { + InputStream inputStream = part.getInputStream(); + int transferred = target.writeBytes(inputStream, target.writableBytes()); + if (transferred > 0) { + position += transferred; + } + if (position == getContentLength() || transferred < 0) { + state = MultipartState.POST_CONTENT; + inputStream.close(); + } + return transferred; + } + + @Override + protected long transferContentTo(WritableByteChannel target) throws IOException { + ReadableByteChannel channel = getChannel(); + ByteBuffer buffer = getBuffer(); + + int transferred = 0; + int read = channel.read(buffer); + + if (read > 0) { + buffer.flip(); + while (buffer.hasRemaining()) { + transferred += target.write(buffer); + } + buffer.compact(); + position += transferred; + } + if (position == getContentLength() || read < 0) { + state = MultipartState.POST_CONTENT; + if (channel.isOpen()) { + channel.close(); + } + } + + return transferred; + } + + @Override + public void close() { + super.close(); + closeSilently(part.getInputStream()); + closeSilently(channel); + } + +} diff --git a/client/src/main/java/org/asynchttpclient/request/body/multipart/part/MultipartPart.java b/client/src/main/java/org/asynchttpclient/request/body/multipart/part/MultipartPart.java index 38041338e8..b8c8622680 100644 --- a/client/src/main/java/org/asynchttpclient/request/body/multipart/part/MultipartPart.java +++ b/client/src/main/java/org/asynchttpclient/request/body/multipart/part/MultipartPart.java @@ -106,6 +106,10 @@ public abstract class MultipartPart implements Closeable { } public long length() { + long contentLength = getContentLength(); + if (contentLength < 0) { + return contentLength; + } return preContentLength + postContentLength + getContentLength(); } diff --git a/client/src/test/java/org/asynchttpclient/request/body/InputStreamPartLargeFileTest.java b/client/src/test/java/org/asynchttpclient/request/body/InputStreamPartLargeFileTest.java new file mode 100644 index 0000000000..48d45341b5 --- /dev/null +++ b/client/src/test/java/org/asynchttpclient/request/body/InputStreamPartLargeFileTest.java @@ -0,0 +1,104 @@ +/* + * Copyright (c) 2018 AsyncHttpClient Project. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at + * http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. + */ +package org.asynchttpclient.request.body; + +import org.asynchttpclient.AbstractBasicTest; +import org.asynchttpclient.AsyncHttpClient; +import org.asynchttpclient.Response; +import org.asynchttpclient.request.body.multipart.InputStreamPart; +import org.eclipse.jetty.server.Request; +import org.eclipse.jetty.server.handler.AbstractHandler; +import org.testng.annotations.Test; + +import javax.servlet.ServletInputStream; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import java.io.*; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.asynchttpclient.Dsl.asyncHttpClient; +import static org.asynchttpclient.Dsl.config; +import static org.asynchttpclient.test.TestUtils.LARGE_IMAGE_FILE; +import static org.asynchttpclient.test.TestUtils.createTempFile; +import static org.testng.Assert.assertEquals; + +public class InputStreamPartLargeFileTest extends AbstractBasicTest { + + @Override + public AbstractHandler configureHandler() throws Exception { + return new AbstractHandler() { + + public void handle(String target, Request baseRequest, HttpServletRequest req, HttpServletResponse resp) throws IOException { + + ServletInputStream in = req.getInputStream(); + byte[] b = new byte[8192]; + + int count; + int total = 0; + while ((count = in.read(b)) != -1) { + b = new byte[8192]; + total += count; + } + resp.setStatus(200); + resp.addHeader("X-TRANSFERRED", String.valueOf(total)); + resp.getOutputStream().flush(); + resp.getOutputStream().close(); + + baseRequest.setHandled(true); + } + }; + } + + @Test + public void testPutImageFile() throws Exception { + try (AsyncHttpClient client = asyncHttpClient(config().setRequestTimeout(100 * 6000))) { + InputStream inputStream = new BufferedInputStream(new FileInputStream(LARGE_IMAGE_FILE)); + Response response = client.preparePut(getTargetUrl()).addBodyPart(new InputStreamPart("test", inputStream, LARGE_IMAGE_FILE.getName(), LARGE_IMAGE_FILE.length(), "application/octet-stream", UTF_8)).execute().get(); + assertEquals(response.getStatusCode(), 200); + } + } + + @Test + public void testPutImageFileUnknownSize() throws Exception { + try (AsyncHttpClient client = asyncHttpClient(config().setRequestTimeout(100 * 6000))) { + InputStream inputStream = new BufferedInputStream(new FileInputStream(LARGE_IMAGE_FILE)); + Response response = client.preparePut(getTargetUrl()).addBodyPart(new InputStreamPart("test", inputStream, LARGE_IMAGE_FILE.getName(), -1, "application/octet-stream", UTF_8)).execute().get(); + assertEquals(response.getStatusCode(), 200); + } + } + + @Test + public void testPutLargeTextFile() throws Exception { + File file = createTempFile(1024 * 1024); + InputStream inputStream = new BufferedInputStream(new FileInputStream(file)); + + try (AsyncHttpClient client = asyncHttpClient(config().setRequestTimeout(100 * 6000))) { + Response response = client.preparePut(getTargetUrl()) + .addBodyPart(new InputStreamPart("test", inputStream, file.getName(), file.length(), "application/octet-stream", UTF_8)).execute().get(); + assertEquals(response.getStatusCode(), 200); + } + } + + @Test + public void testPutLargeTextFileUnknownSize() throws Exception { + File file = createTempFile(1024 * 1024); + InputStream inputStream = new BufferedInputStream(new FileInputStream(file)); + + try (AsyncHttpClient client = asyncHttpClient(config().setRequestTimeout(100 * 6000))) { + Response response = client.preparePut(getTargetUrl()) + .addBodyPart(new InputStreamPart("test", inputStream, file.getName(), -1, "application/octet-stream", UTF_8)).execute().get(); + assertEquals(response.getStatusCode(), 200); + } + } +} diff --git a/client/src/test/java/org/asynchttpclient/request/body/multipart/MultipartBodyTest.java b/client/src/test/java/org/asynchttpclient/request/body/multipart/MultipartBodyTest.java index 0b6c5fe6f2..fc54d396ac 100644 --- a/client/src/test/java/org/asynchttpclient/request/body/multipart/MultipartBodyTest.java +++ b/client/src/test/java/org/asynchttpclient/request/body/multipart/MultipartBodyTest.java @@ -19,8 +19,7 @@ import org.asynchttpclient.request.body.Body.BodyState; import org.testng.annotations.Test; -import java.io.File; -import java.io.IOException; +import java.io.*; import java.net.URISyntaxException; import java.net.URL; import java.nio.ByteBuffer; @@ -63,7 +62,15 @@ private static File getTestfile() throws URISyntaxException { } private static MultipartBody buildMultipart() { - return MultipartUtils.newMultipartBody(PARTS, EmptyHttpHeaders.INSTANCE); + List parts = new ArrayList<>(PARTS); + try { + File testFile = getTestfile(); + InputStream inputStream = new BufferedInputStream(new FileInputStream(testFile)); + parts.add(new InputStreamPart("isPart", inputStream, testFile.getName(), testFile.length())); + } catch (URISyntaxException | FileNotFoundException e) { + throw new ExceptionInInitializerError(e); + } + return MultipartUtils.newMultipartBody(parts, EmptyHttpHeaders.INSTANCE); } private static long transferWithCopy(MultipartBody multipartBody, int bufferSize) throws IOException { diff --git a/client/src/test/java/org/asynchttpclient/request/body/multipart/MultipartUploadTest.java b/client/src/test/java/org/asynchttpclient/request/body/multipart/MultipartUploadTest.java index 77584ecdf3..879a40a9d7 100644 --- a/client/src/test/java/org/asynchttpclient/request/body/multipart/MultipartUploadTest.java +++ b/client/src/test/java/org/asynchttpclient/request/body/multipart/MultipartUploadTest.java @@ -77,21 +77,33 @@ public void testSendingSmallFilesAndByteArray() throws Exception { File testResource1File = getClasspathFile(testResource1); File testResource2File = getClasspathFile(testResource2); File testResource3File = getClasspathFile(testResource3); + InputStream inputStreamFile1 = new BufferedInputStream(new FileInputStream(testResource1File)); + InputStream inputStreamFile2 = new BufferedInputStream(new FileInputStream(testResource2File)); + InputStream inputStreamFile3 = new BufferedInputStream(new FileInputStream(testResource3File)); List testFiles = new ArrayList<>(); testFiles.add(testResource1File); testFiles.add(testResource2File); testFiles.add(testResource3File); + testFiles.add(testResource3File); + testFiles.add(testResource2File); + testFiles.add(testResource1File); List expected = new ArrayList<>(); expected.add(expectedContents); expected.add(expectedContents2); expected.add(expectedContents3); + expected.add(expectedContents3); + expected.add(expectedContents2); + expected.add(expectedContents); List gzipped = new ArrayList<>(); gzipped.add(false); gzipped.add(true); gzipped.add(false); + gzipped.add(false); + gzipped.add(true); + gzipped.add(false); File tmpFile = File.createTempFile("textbytearray", ".txt"); try (OutputStream os = Files.newOutputStream(tmpFile.toPath())) { @@ -109,8 +121,11 @@ public void testSendingSmallFilesAndByteArray() throws Exception { .addBodyPart(new StringPart("Name", "Dominic")) .addBodyPart(new FilePart("file3", testResource3File, "text/plain", UTF_8)) .addBodyPart(new StringPart("Age", "3")).addBodyPart(new StringPart("Height", "shrimplike")) + .addBodyPart(new InputStreamPart("inputStream3", inputStreamFile3, testResource3File.getName(), testResource3File.length(), "text/plain", UTF_8)) + .addBodyPart(new InputStreamPart("inputStream2", inputStreamFile2, testResource2File.getName(), testResource2File.length(), "application/x-gzip", null)) .addBodyPart(new StringPart("Hair", "ridiculous")).addBodyPart(new ByteArrayPart("file4", expectedContents.getBytes(UTF_8), "text/plain", UTF_8, "bytearray.txt")) + .addBodyPart(new InputStreamPart("inputStream1", inputStreamFile1, testResource1File.getName(), testResource1File.length(), "text/plain", UTF_8)) .build(); Response res = c.executeRequest(r).get(); @@ -142,6 +157,65 @@ public void sendEmptyFileZeroCopy() throws Exception { sendEmptyFile0(false); } + private void sendEmptyFileInputStream(boolean disableZeroCopy) throws Exception { + File file = getClasspathFile("empty.txt"); + try (AsyncHttpClient c = asyncHttpClient(config().setDisableZeroCopy(disableZeroCopy))) { + InputStream inputStream = new BufferedInputStream(new FileInputStream(file)); + Request r = post("http://localhost" + ":" + port1 + "/upload") + .addBodyPart(new InputStreamPart("file", inputStream, file.getName(), file.length(), "text/plain", UTF_8)).build(); + + Response res = c.executeRequest(r).get(); + assertEquals(res.getStatusCode(), 200); + } + } + + @Test + public void testSendEmptyFileInputStream() throws Exception { + sendEmptyFileInputStream(true); + } + + @Test + public void testSendEmptyFileInputStreamZeroCopy() throws Exception { + sendEmptyFileInputStream(false); + } + + private void sendFileInputStream(boolean useContentLength, boolean disableZeroCopy) throws Exception { + File file = getClasspathFile("textfile.txt"); + try (AsyncHttpClient c = asyncHttpClient(config().setDisableZeroCopy(disableZeroCopy))) { + InputStream inputStream = new BufferedInputStream(new FileInputStream(file)); + InputStreamPart part; + if (useContentLength) { + part = new InputStreamPart("file", inputStream, file.getName(), file.length()); + } else { + part = new InputStreamPart("file", inputStream, file.getName()); + } + Request r = post("http://localhost" + ":" + port1 + "/upload").addBodyPart(part).build(); + + Response res = c.executeRequest(r).get(); + assertEquals(res.getStatusCode(), 200); + } + } + + @Test + public void testSendFileInputStreamUnknownContentLength() throws Exception { + sendFileInputStream(false, true); + } + + @Test + public void testSendFileInputStreamZeroCopyUnknownContentLength() throws Exception { + sendFileInputStream(false, false); + } + + @Test + public void testSendFileInputStreamKnownContentLength() throws Exception { + sendFileInputStream(true, true); + } + + @Test + public void testSendFileInputStreamZeroCopyKnownContentLength() throws Exception { + sendFileInputStream(true, false); + } + /** * Test that the files were sent, based on the response from the servlet */