Skip to content

Commit 1f1826c

Browse files
committed
Fix multipart zero-copy thread usage fairness, close #1018
1 parent 5d79c27 commit 1f1826c

27 files changed

+703
-717
lines changed

Diff for: client/src/main/java/org/asynchttpclient/netty/request/body/BodyChunkedInput.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
*/
2929
public class BodyChunkedInput implements ChunkedInput<ByteBuf> {
3030

31-
private static final int DEFAULT_CHUNK_SIZE = 8 * 1024;
31+
public static final int DEFAULT_CHUNK_SIZE = 8 * 1024;
3232

3333
private final Body body;
3434
private final int contentLength;
@@ -54,7 +54,7 @@ public ByteBuf readChunk(ChannelHandlerContext ctx) throws Exception {
5454

5555
// FIXME pass a visitor so we can directly pass a pooled ByteBuf
5656
ByteBuffer buffer = ByteBuffer.allocate(chunkSize);
57-
Body.BodyState state = body.read(buffer);
57+
Body.BodyState state = body.transferTo(buffer);
5858
switch (state) {
5959
case STOP:
6060
endOfInput = true;

Diff for: client/src/main/java/org/asynchttpclient/request/body/Body.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -55,5 +55,5 @@ enum BodyState {
5555
* @throws IOException If the chunk could not be read.
5656
*/
5757
// FIXME introduce a visitor pattern so that Netty can pass a pooled buffer
58-
BodyState read(ByteBuffer buffer) throws IOException;
58+
BodyState transferTo(ByteBuffer buffer) throws IOException;
5959
}

Diff for: client/src/main/java/org/asynchttpclient/request/body/generator/ByteArrayBodyGenerator.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ public long getContentLength() {
3636
return bytes.length;
3737
}
3838

39-
public BodyState read(ByteBuffer byteBuffer) throws IOException {
39+
public BodyState transferTo(ByteBuffer byteBuffer) throws IOException {
4040

4141
if (eof) {
4242
return BodyState.STOP;

Diff for: client/src/main/java/org/asynchttpclient/request/body/generator/InputStreamBodyGenerator.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ public long getContentLength() {
6161
return -1L;
6262
}
6363

64-
public BodyState read(ByteBuffer buffer) throws IOException {
64+
public BodyState transferTo(ByteBuffer buffer) throws IOException {
6565

6666
// To be safe.
6767
chunk = new byte[buffer.remaining() - 10];

Diff for: client/src/main/java/org/asynchttpclient/request/body/generator/ReactiveStreamsBodyGenerator.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -78,11 +78,11 @@ public long getContentLength() {
7878
}
7979

8080
@Override
81-
public BodyState read(ByteBuffer buffer) throws IOException {
81+
public BodyState transferTo(ByteBuffer buffer) throws IOException {
8282
if(initialized.compareAndSet(false, true))
8383
publisher.subscribe(subscriber);
8484

85-
return body.read(buffer);
85+
return body.transferTo(buffer);
8686
}
8787
}
8888

Diff for: client/src/main/java/org/asynchttpclient/request/body/generator/SimpleFeedableBodyGenerator.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ public long getContentLength() {
5252
}
5353

5454
@Override
55-
public BodyState read(final ByteBuffer buffer) throws IOException {
55+
public BodyState transferTo(final ByteBuffer buffer) throws IOException {
5656
switch (state) {
5757
case CONTINUE:
5858
return readNextPart(buffer);

Diff for: client/src/main/java/org/asynchttpclient/request/body/multipart/ByteArrayPart.java

+2-26
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,11 @@
1212
*/
1313
package org.asynchttpclient.request.body.multipart;
1414

15-
import static org.asynchttpclient.util.Assertions.*;
15+
import static org.asynchttpclient.util.Assertions.assertNotNull;
1616

17-
import java.io.IOException;
18-
import java.nio.channels.WritableByteChannel;
1917
import java.nio.charset.Charset;
2018

21-
public class ByteArrayPart extends AbstractFilePart {
19+
public class ByteArrayPart extends FileLikePart {
2220

2321
private final byte[] bytes;
2422

@@ -48,30 +46,8 @@ public ByteArrayPart(String name, byte[] bytes, String contentType, Charset char
4846
this.bytes = bytes;
4947
setFileName(fileName);
5048
}
51-
52-
@Override
53-
protected long getDataLength() {
54-
return bytes.length;
55-
}
5649

5750
public byte[] getBytes() {
5851
return bytes;
5952
}
60-
61-
@Override
62-
public long write(WritableByteChannel target, byte[] boundary) throws IOException {
63-
FilePartStallHandler handler = new FilePartStallHandler(getStalledTime(), this);
64-
65-
try {
66-
handler.start();
67-
68-
long length = MultipartUtils.writeBytesToChannel(target, generateFileStart(boundary));
69-
length += MultipartUtils.writeBytesToChannel(target, bytes);
70-
length += MultipartUtils.writeBytesToChannel(target, generateFileEnd());
71-
72-
return length;
73-
} finally {
74-
handler.completed();
75-
}
76-
}
7753
}

Diff for: client/src/main/java/org/asynchttpclient/request/body/multipart/CounterPartVisitor.java

-34
This file was deleted.

Diff for: client/src/main/java/org/asynchttpclient/request/body/multipart/AbstractFilePart.java renamed to client/src/main/java/org/asynchttpclient/request/body/multipart/FileLikePart.java

+2-41
Original file line numberDiff line numberDiff line change
@@ -12,16 +12,12 @@
1212
*/
1313
package org.asynchttpclient.request.body.multipart;
1414

15-
import static java.nio.charset.StandardCharsets.*;
16-
17-
import java.io.ByteArrayOutputStream;
18-
import java.io.IOException;
1915
import java.nio.charset.Charset;
2016

2117
/**
2218
* This class is an adaptation of the Apache HttpClient implementation
2319
*/
24-
public abstract class AbstractFilePart extends PartBase {
20+
public abstract class FileLikePart extends PartBase {
2521

2622
/**
2723
* Default content encoding of file attachments.
@@ -33,11 +29,6 @@ public abstract class AbstractFilePart extends PartBase {
3329
*/
3430
public static final String DEFAULT_TRANSFER_ENCODING = "binary";
3531

36-
/**
37-
* Attachment's file name as a byte array
38-
*/
39-
private static final byte[] FILE_NAME_BYTES = "; filename=".getBytes(US_ASCII);
40-
4132
private long stalledTime = -1L;
4233

4334
private String fileName;
@@ -51,44 +42,14 @@ public abstract class AbstractFilePart extends PartBase {
5142
* @param contentId the content id
5243
* @param transfertEncoding the transfer encoding
5344
*/
54-
public AbstractFilePart(String name, String contentType, Charset charset, String contentId, String transfertEncoding) {
45+
public FileLikePart(String name, String contentType, Charset charset, String contentId, String transfertEncoding) {
5546
super(name,//
5647
contentType == null ? DEFAULT_CONTENT_TYPE : contentType,//
5748
charset,//
5849
contentId,//
5950
transfertEncoding == null ? DEFAULT_TRANSFER_ENCODING : transfertEncoding);
6051
}
6152

62-
protected void visitDispositionHeader(PartVisitor visitor) throws IOException {
63-
super.visitDispositionHeader(visitor);
64-
if (fileName != null) {
65-
visitor.withBytes(FILE_NAME_BYTES);
66-
visitor.withByte(QUOTE_BYTE);
67-
visitor.withBytes(fileName.getBytes(getCharset() != null ? getCharset() : US_ASCII));
68-
visitor.withByte(QUOTE_BYTE);
69-
}
70-
}
71-
72-
protected byte[] generateFileStart(byte[] boundary) throws IOException {
73-
ByteArrayOutputStream out = new ByteArrayOutputStream();
74-
OutputStreamPartVisitor visitor = new OutputStreamPartVisitor(out);
75-
visitStart(visitor, boundary);
76-
visitDispositionHeader(visitor);
77-
visitContentTypeHeader(visitor);
78-
visitTransferEncodingHeader(visitor);
79-
visitContentIdHeader(visitor);
80-
visitCustomHeaders(visitor);
81-
visitEndOfHeaders(visitor);
82-
return out.toByteArray();
83-
}
84-
85-
protected byte[] generateFileEnd() throws IOException {
86-
ByteArrayOutputStream out = new ByteArrayOutputStream();
87-
OutputStreamPartVisitor visitor = new OutputStreamPartVisitor(out);
88-
visitEnd(visitor);
89-
return out.toByteArray();
90-
}
91-
9253
public void setStalledTime(long ms) {
9354
stalledTime = ms;
9455
}

Diff for: client/src/main/java/org/asynchttpclient/request/body/multipart/FilePart.java

+2-66
Original file line numberDiff line numberDiff line change
@@ -12,21 +12,12 @@
1212
*/
1313
package org.asynchttpclient.request.body.multipart;
1414

15-
import static org.asynchttpclient.util.Assertions.*;
15+
import static org.asynchttpclient.util.Assertions.assertNotNull;
1616

1717
import java.io.File;
18-
import java.io.IOException;
19-
import java.io.RandomAccessFile;
20-
import java.nio.channels.FileChannel;
21-
import java.nio.channels.WritableByteChannel;
2218
import java.nio.charset.Charset;
2319

24-
import org.slf4j.Logger;
25-
import org.slf4j.LoggerFactory;
26-
27-
public class FilePart extends AbstractFilePart {
28-
29-
private static final Logger LOGGER = LoggerFactory.getLogger(FilePart.class);
20+
public class FilePart extends FileLikePart {
3021

3122
private final File file;
3223

@@ -60,63 +51,8 @@ public FilePart(String name, File file, String contentType, Charset charset, Str
6051
this.file = file;
6152
setFileName(fileName != null ? fileName : file.getName());
6253
}
63-
64-
@Override
65-
protected long getDataLength() {
66-
return file.length();
67-
}
6854

6955
public File getFile() {
7056
return file;
7157
}
72-
73-
@Override
74-
public long write(WritableByteChannel target, byte[] boundary) throws IOException {
75-
FilePartStallHandler handler = new FilePartStallHandler(getStalledTime(), this);
76-
77-
handler.start();
78-
79-
int length = 0;
80-
81-
length += MultipartUtils.writeBytesToChannel(target, generateFileStart(boundary));
82-
83-
RandomAccessFile raf = new RandomAccessFile(file, "r");
84-
FileChannel fc = raf.getChannel();
85-
86-
final long fileLength = file.length();
87-
int position = 0;
88-
long nWrite = 0;
89-
try {
90-
// FIXME why sync?
91-
synchronized (fc) {
92-
while (position != fileLength) {
93-
if (handler.isFailed()) {
94-
LOGGER.debug("Stalled error");
95-
throw new FileUploadStalledException();
96-
}
97-
nWrite = fc.transferTo(position, fileLength, target);
98-
99-
if (nWrite == 0) {
100-
LOGGER.info("Waiting for writing...");
101-
try {
102-
fc.wait(50);
103-
} catch (InterruptedException e) {
104-
LOGGER.trace(e.getMessage(), e);
105-
}
106-
} else {
107-
handler.writeHappened();
108-
}
109-
position += nWrite;
110-
}
111-
}
112-
} finally {
113-
handler.completed();
114-
raf.close();
115-
}
116-
117-
length += fileLength;
118-
length += MultipartUtils.writeBytesToChannel(target, generateFileEnd());
119-
120-
return length;
121-
}
12258
}

Diff for: client/src/main/java/org/asynchttpclient/request/body/multipart/FilePartStallHandler.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ public class FilePartStallHandler extends TimerTask {
2525
private volatile boolean failed;
2626
private volatile boolean written;
2727

28-
public FilePartStallHandler(long waitTime, AbstractFilePart filePart) {
28+
public FilePartStallHandler(long waitTime, FileLikePart filePart) {
2929
this.waitTime = waitTime;
3030
failed = false;
3131
written = false;

0 commit comments

Comments
 (0)