23
23
import org .apache .lucene .index .IndexFormatTooNewException ;
24
24
import org .apache .lucene .index .IndexFormatTooOldException ;
25
25
import org .apache .lucene .store .OutputStreamIndexOutput ;
26
+ import org .elasticsearch .common .CheckedConsumer ;
26
27
import org .elasticsearch .common .CheckedFunction ;
27
28
import org .elasticsearch .common .blobstore .BlobContainer ;
28
29
import org .elasticsearch .common .bytes .BytesArray ;
52
53
*/
53
54
public class ChecksumBlobStoreFormat <T extends ToXContent > extends BlobStoreFormat <T > {
54
55
55
- private static final String TEMP_FILE_PREFIX = "pending-" ;
56
-
57
56
private static final XContentType DEFAULT_X_CONTENT_TYPE = XContentType .SMILE ;
58
57
59
58
// The format version
@@ -120,7 +119,7 @@ public T readBlob(BlobContainer blobContainer, String blobName) throws IOExcepti
120
119
}
121
120
122
121
/**
123
- * Writes blob in atomic manner with resolving the blob name using {@link #blobName} and {@link #tempBlobName} methods .
122
+ * Writes blob in atomic manner with resolving the blob name using {@link #blobName} method .
124
123
* <p>
125
124
* The blob will be compressed and checksum will be written if required.
126
125
*
@@ -131,20 +130,12 @@ public T readBlob(BlobContainer blobContainer, String blobName) throws IOExcepti
131
130
* @param name blob name
132
131
*/
133
132
public void writeAtomic (T obj , BlobContainer blobContainer , String name ) throws IOException {
134
- String blobName = blobName (name );
135
- String tempBlobName = tempBlobName (name );
136
- writeBlob (obj , blobContainer , tempBlobName );
137
- try {
138
- blobContainer .move (tempBlobName , blobName );
139
- } catch (IOException ex ) {
140
- // Move failed - try cleaning up
141
- try {
142
- blobContainer .deleteBlob (tempBlobName );
143
- } catch (Exception e ) {
144
- ex .addSuppressed (e );
133
+ final String blobName = blobName (name );
134
+ writeTo (obj , blobName , bytesArray -> {
135
+ try (InputStream stream = bytesArray .streamInput ()) {
136
+ blobContainer .writeBlobAtomic (blobName , stream , bytesArray .length ());
145
137
}
146
- throw ex ;
147
- }
138
+ });
148
139
}
149
140
150
141
/**
@@ -157,51 +148,35 @@ public void writeAtomic(T obj, BlobContainer blobContainer, String name) throws
157
148
* @param name blob name
158
149
*/
159
150
public void write (T obj , BlobContainer blobContainer , String name ) throws IOException {
160
- String blobName = blobName (name );
161
- writeBlob (obj , blobContainer , blobName );
151
+ final String blobName = blobName (name );
152
+ writeTo (obj , blobName , bytesArray -> {
153
+ try (InputStream stream = bytesArray .streamInput ()) {
154
+ blobContainer .writeBlob (blobName , stream , bytesArray .length ());
155
+ }
156
+ });
162
157
}
163
158
164
- /**
165
- * Writes blob in atomic manner without resolving the blobName using using {@link #blobName} method.
166
- * <p>
167
- * The blob will be compressed and checksum will be written if required.
168
- *
169
- * @param obj object to be serialized
170
- * @param blobContainer blob container
171
- * @param blobName blob name
172
- */
173
- protected void writeBlob (T obj , BlobContainer blobContainer , String blobName ) throws IOException {
174
- BytesReference bytes = write (obj );
175
- try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream ()) {
159
+ private void writeTo (final T obj , final String blobName , final CheckedConsumer <BytesArray , IOException > consumer ) throws IOException {
160
+ final BytesReference bytes = write (obj );
161
+ try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream ()) {
176
162
final String resourceDesc = "ChecksumBlobStoreFormat.writeBlob(blob=\" " + blobName + "\" )" ;
177
- try (OutputStreamIndexOutput indexOutput = new OutputStreamIndexOutput (resourceDesc , blobName , byteArrayOutputStream , BUFFER_SIZE )) {
163
+ try (OutputStreamIndexOutput indexOutput = new OutputStreamIndexOutput (resourceDesc , blobName , outputStream , BUFFER_SIZE )) {
178
164
CodecUtil .writeHeader (indexOutput , codec , VERSION );
179
165
try (OutputStream indexOutputOutputStream = new IndexOutputOutputStream (indexOutput ) {
180
166
@ Override
181
167
public void close () throws IOException {
182
168
// this is important since some of the XContentBuilders write bytes on close.
183
169
// in order to write the footer we need to prevent closing the actual index input.
184
- } }) {
170
+ }
171
+ }) {
185
172
bytes .writeTo (indexOutputOutputStream );
186
173
}
187
174
CodecUtil .writeFooter (indexOutput );
188
175
}
189
- BytesArray bytesArray = new BytesArray (byteArrayOutputStream .toByteArray ());
190
- try (InputStream stream = bytesArray .streamInput ()) {
191
- blobContainer .writeBlob (blobName , stream , bytesArray .length ());
192
- }
176
+ consumer .accept (new BytesArray (outputStream .toByteArray ()));
193
177
}
194
178
}
195
179
196
- /**
197
- * Returns true if the blob is a leftover temporary blob.
198
- *
199
- * The temporary blobs might be left after failed atomic write operation.
200
- */
201
- public boolean isTempBlobName (String blobName ) {
202
- return blobName .startsWith (ChecksumBlobStoreFormat .TEMP_FILE_PREFIX );
203
- }
204
-
205
180
protected BytesReference write (T obj ) throws IOException {
206
181
try (BytesStreamOutput bytesStreamOutput = new BytesStreamOutput ()) {
207
182
if (compress ) {
@@ -222,10 +197,4 @@ protected void write(T obj, StreamOutput streamOutput) throws IOException {
222
197
builder .endObject ();
223
198
}
224
199
}
225
-
226
-
227
- protected String tempBlobName (String name ) {
228
- return TEMP_FILE_PREFIX + String .format (Locale .ROOT , blobNameFormat , name );
229
- }
230
-
231
200
}
0 commit comments