@@ -46,6 +46,7 @@ class S3RetryingInputStream extends InputStream {
46
46
private final int maxAttempts ;
47
47
48
48
private InputStream currentStream ;
49
+ private int attempt = 1 ;
49
50
private long currentOffset ;
50
51
51
52
S3RetryingInputStream (S3BlobStore blobStore , String blobKey ) throws IOException {
@@ -75,24 +76,20 @@ private InputStream openStream() throws IOException {
75
76
76
77
@ Override
77
78
public int read () throws IOException {
78
- int attempt = 0 ;
79
79
while (true ) {
80
- attempt += 1 ;
81
80
try {
82
81
final int result = currentStream .read ();
83
82
currentOffset += 1 ;
84
83
return result ;
85
84
} catch (IOException e ) {
86
- reopenStreamOrFail (attempt , e );
85
+ reopenStreamOrFail (e );
87
86
}
88
87
}
89
88
}
90
89
91
90
@ Override
92
91
public int read (byte [] b , int off , int len ) throws IOException {
93
- int attempt = 0 ;
94
92
while (true ) {
95
- attempt += 1 ;
96
93
try {
97
94
final int bytesRead = currentStream .read (b , off , len );
98
95
if (bytesRead == -1 ) {
@@ -101,17 +98,18 @@ public int read(byte[] b, int off, int len) throws IOException {
101
98
currentOffset += bytesRead ;
102
99
return bytesRead ;
103
100
} catch (IOException e ) {
104
- reopenStreamOrFail (attempt , e );
101
+ reopenStreamOrFail (e );
105
102
}
106
103
}
107
104
}
108
105
109
- private void reopenStreamOrFail (int attempt , IOException e ) throws IOException {
106
+ private void reopenStreamOrFail (IOException e ) throws IOException {
110
107
if (attempt >= maxAttempts ) {
111
108
throw e ;
112
109
}
113
110
logger .debug (new ParameterizedMessage ("failed reading [{}/{}] at offset [{}], attempt [{}] of [{}], retrying" ,
114
111
blobStore .bucket (), blobKey , currentOffset , attempt , maxAttempts ), e );
112
+ attempt += 1 ;
115
113
IOUtils .closeWhileHandlingException (currentStream );
116
114
currentStream = openStream ();
117
115
}
0 commit comments