Skip to content

#857 Support multipart files using InputStream from source file #1593

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Oct 27, 2018
Merged
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ Use the `addBodyPart` method to add a multipart part to the request.
This part can be of type:
* `ByteArrayPart`
* `FilePart`
* `InputStreamPart`
* `StringPart`

### Dealing with Responses
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public long getContentLength() {
public void write(final Channel channel, NettyResponseFuture<?> future) {

Object msg;
if (body instanceof RandomAccessBody && !ChannelManager.isSslHandlerConfigured(channel.pipeline()) && !config.isDisableZeroCopy()) {
if (body instanceof RandomAccessBody && !ChannelManager.isSslHandlerConfigured(channel.pipeline()) && !config.isDisableZeroCopy() && getContentLength() > 0) {
msg = new BodyFileRegion((RandomAccessBody) body);

} else {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Copyright (c) 2018 AsyncHttpClient Project. All rights reserved.
*
* This program is licensed to you under the Apache License Version 2.0,
* and you may not use this file except in compliance with the Apache License Version 2.0.
* You may obtain a copy of the Apache License Version 2.0 at
* http://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the Apache License Version 2.0 is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
*/
package org.asynchttpclient.request.body.multipart;

import java.io.InputStream;
import java.nio.charset.Charset;

import static org.asynchttpclient.util.Assertions.assertNotNull;

public class InputStreamPart extends FileLikePart {

private final InputStream inputStream;
private final long contentLength;

public InputStreamPart(String name, InputStream inputStream, String fileName) {
this(name, inputStream, fileName, -1);
}

public InputStreamPart(String name, InputStream inputStream, String fileName, long contentLength) {
this(name, inputStream, fileName, contentLength, null);
}

public InputStreamPart(String name, InputStream inputStream, String fileName, long contentLength, String contentType) {
this(name, inputStream, fileName, contentLength, contentType, null);
}

public InputStreamPart(String name, InputStream inputStream, String fileName, long contentLength, String contentType, Charset charset) {
this(name, inputStream, fileName, contentLength, contentType, charset, null);
}

public InputStreamPart(String name, InputStream inputStream, String fileName, long contentLength, String contentType, Charset charset,
String contentId) {
this(name, inputStream, fileName, contentLength, contentType, charset, contentId, null);
}

public InputStreamPart(String name, InputStream inputStream, String fileName, long contentLength, String contentType, Charset charset,
String contentId, String transferEncoding) {
super(name,
contentType,
charset,
fileName,
contentId,
transferEncoding);
this.inputStream = assertNotNull(inputStream, "inputStream");
this.contentLength = contentLength;
}

public InputStream getInputStream() {
return inputStream;
}

public long getContentLength() {
return contentLength;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ public static List<MultipartPart<? extends Part>> generateMultipartParts(List<Pa
} else if (part instanceof StringPart) {
multipartParts.add(new StringMultipartPart((StringPart) part, boundary));

} else if (part instanceof InputStreamPart) {
multipartParts.add(new InputStreamMultipartPart((InputStreamPart) part, boundary));

} else {
throw new IllegalArgumentException("Unknown part type: " + part);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/*
* Copyright (c) 2018 AsyncHttpClient Project. All rights reserved.
*
* This program is licensed to you under the Apache License Version 2.0,
* and you may not use this file except in compliance with the Apache License Version 2.0.
* You may obtain a copy of the Apache License Version 2.0 at
* http://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the Apache License Version 2.0 is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
*/
package org.asynchttpclient.request.body.multipart.part;

import io.netty.buffer.ByteBuf;
import org.asynchttpclient.netty.request.body.BodyChunkedInput;
import org.asynchttpclient.request.body.multipart.InputStreamPart;

import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;

import static org.asynchttpclient.util.MiscUtils.closeSilently;

public class InputStreamMultipartPart extends FileLikeMultipartPart<InputStreamPart> {

private long position = 0L;
private ByteBuffer buffer;
private ReadableByteChannel channel;

public InputStreamMultipartPart(InputStreamPart part, byte[] boundary) {
super(part, boundary);
}

private ByteBuffer getBuffer() {
if (buffer == null) {
buffer = ByteBuffer.allocateDirect(BodyChunkedInput.DEFAULT_CHUNK_SIZE);
}
return buffer;
}

private ReadableByteChannel getChannel() {
if (channel == null) {
channel = Channels.newChannel(part.getInputStream());
}
return channel;
}

@Override
protected long getContentLength() {
return part.getContentLength();
}

@Override
protected long transferContentTo(ByteBuf target) throws IOException {
InputStream inputStream = part.getInputStream();
int transferred = target.writeBytes(inputStream, target.writableBytes());
if (transferred > 0) {
position += transferred;
}
if (position == getContentLength() || transferred < 0) {
state = MultipartState.POST_CONTENT;
inputStream.close();
}
return transferred;
}

@Override
protected long transferContentTo(WritableByteChannel target) throws IOException {
ReadableByteChannel channel = getChannel();
ByteBuffer buffer = getBuffer();

int transferred = 0;
int read = channel.read(buffer);

if (read > 0) {
buffer.flip();
while (buffer.hasRemaining()) {
transferred += target.write(buffer);
}
buffer.compact();
position += transferred;
}
if (position == getContentLength() || read < 0) {
state = MultipartState.POST_CONTENT;
if (channel.isOpen()) {
channel.close();
}
}

return transferred;
}

@Override
public void close() {
super.close();
closeSilently(part.getInputStream());
closeSilently(channel);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,10 @@ public abstract class MultipartPart<T extends PartBase> implements Closeable {
}

public long length() {
long contentLength = getContentLength();
if (contentLength < 0) {
return contentLength;
}
return preContentLength + postContentLength + getContentLength();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
* Copyright (c) 2018 AsyncHttpClient Project. All rights reserved.
*
* This program is licensed to you under the Apache License Version 2.0,
* and you may not use this file except in compliance with the Apache License Version 2.0.
* You may obtain a copy of the Apache License Version 2.0 at
* http://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the Apache License Version 2.0 is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
*/
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please fix header: Sonatype is no longer involved in this project and new code should be donated to AHC.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed

package org.asynchttpclient.request.body;

import org.asynchttpclient.AbstractBasicTest;
import org.asynchttpclient.AsyncHttpClient;
import org.asynchttpclient.Response;
import org.asynchttpclient.request.body.multipart.InputStreamPart;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.testng.annotations.Test;

import javax.servlet.ServletInputStream;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.*;

import static java.nio.charset.StandardCharsets.UTF_8;
import static org.asynchttpclient.Dsl.asyncHttpClient;
import static org.asynchttpclient.Dsl.config;
import static org.asynchttpclient.test.TestUtils.LARGE_IMAGE_FILE;
import static org.asynchttpclient.test.TestUtils.createTempFile;
import static org.testng.Assert.assertEquals;

public class InputStreamPartLargeFileTest extends AbstractBasicTest {

@Override
public AbstractHandler configureHandler() throws Exception {
return new AbstractHandler() {

public void handle(String target, Request baseRequest, HttpServletRequest req, HttpServletResponse resp) throws IOException {

ServletInputStream in = req.getInputStream();
byte[] b = new byte[8192];

int count;
int total = 0;
while ((count = in.read(b)) != -1) {
b = new byte[8192];
total += count;
}
resp.setStatus(200);
resp.addHeader("X-TRANSFERRED", String.valueOf(total));
resp.getOutputStream().flush();
resp.getOutputStream().close();

baseRequest.setHandled(true);
}
};
}

@Test
public void testPutImageFile() throws Exception {
try (AsyncHttpClient client = asyncHttpClient(config().setRequestTimeout(100 * 6000))) {
InputStream inputStream = new BufferedInputStream(new FileInputStream(LARGE_IMAGE_FILE));
Response response = client.preparePut(getTargetUrl()).addBodyPart(new InputStreamPart("test", inputStream, LARGE_IMAGE_FILE.getName(), LARGE_IMAGE_FILE.length(), "application/octet-stream", UTF_8)).execute().get();
assertEquals(response.getStatusCode(), 200);
}
}

@Test
public void testPutImageFileUnknownSize() throws Exception {
try (AsyncHttpClient client = asyncHttpClient(config().setRequestTimeout(100 * 6000))) {
InputStream inputStream = new BufferedInputStream(new FileInputStream(LARGE_IMAGE_FILE));
Response response = client.preparePut(getTargetUrl()).addBodyPart(new InputStreamPart("test", inputStream, LARGE_IMAGE_FILE.getName(), -1, "application/octet-stream", UTF_8)).execute().get();
assertEquals(response.getStatusCode(), 200);
}
}

@Test
public void testPutLargeTextFile() throws Exception {
File file = createTempFile(1024 * 1024);
InputStream inputStream = new BufferedInputStream(new FileInputStream(file));

try (AsyncHttpClient client = asyncHttpClient(config().setRequestTimeout(100 * 6000))) {
Response response = client.preparePut(getTargetUrl())
.addBodyPart(new InputStreamPart("test", inputStream, file.getName(), file.length(), "application/octet-stream", UTF_8)).execute().get();
assertEquals(response.getStatusCode(), 200);
}
}

@Test
public void testPutLargeTextFileUnknownSize() throws Exception {
File file = createTempFile(1024 * 1024);
InputStream inputStream = new BufferedInputStream(new FileInputStream(file));

try (AsyncHttpClient client = asyncHttpClient(config().setRequestTimeout(100 * 6000))) {
Response response = client.preparePut(getTargetUrl())
.addBodyPart(new InputStreamPart("test", inputStream, file.getName(), -1, "application/octet-stream", UTF_8)).execute().get();
assertEquals(response.getStatusCode(), 200);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@
import org.asynchttpclient.request.body.Body.BodyState;
import org.testng.annotations.Test;

import java.io.File;
import java.io.IOException;
import java.io.*;
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.ByteBuffer;
Expand Down Expand Up @@ -63,7 +62,15 @@ private static File getTestfile() throws URISyntaxException {
}

private static MultipartBody buildMultipart() {
return MultipartUtils.newMultipartBody(PARTS, EmptyHttpHeaders.INSTANCE);
List<Part> parts = new ArrayList<>(PARTS);
try {
File testFile = getTestfile();
InputStream inputStream = new BufferedInputStream(new FileInputStream(testFile));
parts.add(new InputStreamPart("isPart", inputStream, testFile.getName(), testFile.length()));
} catch (URISyntaxException | FileNotFoundException e) {
throw new ExceptionInInitializerError(e);
}
return MultipartUtils.newMultipartBody(parts, EmptyHttpHeaders.INSTANCE);
}

private static long transferWithCopy(MultipartBody multipartBody, int bufferSize) throws IOException {
Expand Down
Loading