@@ -148,29 +148,37 @@ class TS3FileWriteActor : public TActorBootstrapped<TS3FileWriteActor> {
148
148
, Parts(MakeCompressorQueue(compression))
149
149
, DirtyWrite(dirtyWrite)
150
150
, Token(token)
151
- , AuthInfo(Credentials.GetAuthInfo())
152
151
{
153
152
YQL_ENSURE (Parts, " Compression '" << compression << " ' is not supported." );
154
153
}
155
154
156
155
void Bootstrap (const TActorId& parentId) {
157
156
ParentId = parentId;
158
157
LOG_D (" TS3FileWriteActor" , " Bootstrap by " << ParentId << " for Key: [" << Key << " ], Url: [" << Url << " ], request id: [" << RequestId << " ]" );
158
+ try {
159
+ BeginPartsUpload (Credentials.GetAuthInfo ());
160
+ }
161
+ catch (const yexception& ex) {
162
+ FailOnException ();
163
+ }
164
+ }
165
+
166
+ void BeginPartsUpload (const TS3Credentials::TAuthInfo& authInfo) {
159
167
if (DirtyWrite && Parts->IsSealed () && Parts->Size () <= 1 ) {
160
- Become (&TS3FileWriteActor::SinglepartWorkingStateFunc);
168
+ Become (&TS3FileWriteActor::StateFuncWrapper<&TS3FileWriteActor:: SinglepartWorkingStateFunc> );
161
169
const size_t size = Max<size_t >(Parts->Volume (), 1 );
162
170
InFlight += size;
163
171
SentSize += size;
164
172
Gateway->Upload (Url,
165
- IHTTPGateway::MakeYcHeaders (RequestId, AuthInfo .GetToken (), {}, AuthInfo .GetAwsUserPwd (), AuthInfo .GetAwsSigV4 ()),
173
+ IHTTPGateway::MakeYcHeaders (RequestId, authInfo .GetToken (), {}, authInfo .GetAwsUserPwd (), authInfo .GetAwsSigV4 ()),
166
174
Parts->Pop (),
167
175
std::bind (&TS3FileWriteActor::OnUploadFinish, ActorSystem, SelfId (), ParentId, Key, Url, RequestId, size, std::placeholders::_1),
168
176
true ,
169
177
RetryPolicy);
170
178
} else {
171
- Become (&TS3FileWriteActor::MultipartInitialStateFunc);
179
+ Become (&TS3FileWriteActor::StateFuncWrapper<&TS3FileWriteActor:: MultipartInitialStateFunc> );
172
180
Gateway->Upload (Url + " ?uploads" ,
173
- IHTTPGateway::MakeYcHeaders (RequestId, AuthInfo .GetToken (), {}, AuthInfo .GetAwsUserPwd (), AuthInfo .GetAwsSigV4 ()),
181
+ IHTTPGateway::MakeYcHeaders (RequestId, authInfo .GetToken (), {}, authInfo .GetAwsUserPwd (), authInfo .GetAwsSigV4 ()),
174
182
0 ,
175
183
std::bind (&TS3FileWriteActor::OnUploadsCreated, ActorSystem, SelfId (), ParentId, RequestId, std::placeholders::_1),
176
184
false ,
@@ -186,7 +194,7 @@ class TS3FileWriteActor : public TActorBootstrapped<TS3FileWriteActor> {
186
194
187
195
void PassAway () override {
188
196
if (InFlight || !Parts->Empty ()) {
189
- AbortMultipartUpload ();
197
+ SafeAbortMultipartUpload ();
190
198
LOG_W (" TS3FileWriteActor" , " PassAway: but NOT finished, InFlight: " << InFlight << " , Parts: " << Parts->Size () << " , Sealed: " << Parts->IsSealed () << " , request id: [" << RequestId << " ]" );
191
199
} else {
192
200
LOG_D (" TS3FileWriteActor" , " PassAway: request id: [" << RequestId << " ]" );
@@ -236,6 +244,15 @@ class TS3FileWriteActor : public TActorBootstrapped<TS3FileWriteActor> {
236
244
return InFlight + Parts->Volume ();
237
245
}
238
246
private:
247
+ template <void (TS3FileWriteActor::* DelegatedStateFunc)(STFUNC_SIG)>
248
+ STFUNC (StateFuncWrapper) {
249
+ try {
250
+ (this ->*DelegatedStateFunc)(ev);
251
+ } catch (...) {
252
+ FailOnException ();
253
+ }
254
+ }
255
+
239
256
STRICT_STFUNC (MultipartInitialStateFunc,
240
257
hFunc (TEvPrivate::TEvUploadStarted, Handle );
241
258
)
@@ -347,7 +364,7 @@ class TS3FileWriteActor : public TActorBootstrapped<TS3FileWriteActor> {
347
364
348
365
void Handle (TEvPrivate::TEvUploadStarted::TPtr& result) {
349
366
UploadId = result->Get ()->UploadId ;
350
- Become (&TS3FileWriteActor::MultipartWorkingStateFunc);
367
+ Become (&TS3FileWriteActor::StateFuncWrapper<&TS3FileWriteActor:: MultipartWorkingStateFunc> );
351
368
StartUploadParts ();
352
369
}
353
370
@@ -360,29 +377,16 @@ class TS3FileWriteActor : public TActorBootstrapped<TS3FileWriteActor> {
360
377
}
361
378
}
362
379
363
- bool UpdateAuthInfo () {
364
- try {
365
- AuthInfo = Credentials.GetAuthInfo ();
366
- return true ;
367
- }
368
- catch (const yexception& ex) {
369
- Send (ParentId, new TEvPrivate::TEvUploadError (NYql::NDqProto::StatusIds::BAD_REQUEST, TStringBuilder () << " Failed to get auth info: " << ex.what ()));
370
- return false ;
371
- }
372
- }
373
-
374
380
void StartUploadParts () {
375
381
while (auto part = Parts->Pop ()) {
376
382
const auto size = part.size ();
377
383
const auto index = Tags.size ();
378
384
Tags.emplace_back ();
379
385
InFlight += size;
380
386
SentSize += size;
381
- if (!UpdateAuthInfo ()) {
382
- return ;
383
- }
387
+ auto authInfo = Credentials.GetAuthInfo ();
384
388
Gateway->Upload (Url + " ?partNumber=" + std::to_string (index + 1 ) + " &uploadId=" + UploadId,
385
- IHTTPGateway::MakeYcHeaders (RequestId, AuthInfo .GetToken (), {}, AuthInfo .GetAwsUserPwd (), AuthInfo .GetAwsSigV4 ()),
389
+ IHTTPGateway::MakeYcHeaders (RequestId, authInfo .GetToken (), {}, authInfo .GetAwsUserPwd (), authInfo .GetAwsSigV4 ()),
386
390
std::move (part),
387
391
std::bind (&TS3FileWriteActor::OnPartUploadFinish, ActorSystem, SelfId (), ParentId, size, index , RequestId, std::placeholders::_1),
388
392
true ,
@@ -408,35 +412,44 @@ class TS3FileWriteActor : public TActorBootstrapped<TS3FileWriteActor> {
408
412
for (const auto & tag : Tags)
409
413
xml << " <Part><PartNumber>" << ++i << " </PartNumber><ETag>" << tag << " </ETag></Part>" << Endl;
410
414
xml << " </CompleteMultipartUpload>" << Endl;
411
- if (!UpdateAuthInfo ()) {
412
- return ;
413
- }
415
+ auto authInfo = Credentials.GetAuthInfo ();
414
416
Gateway->Upload (Url + " ?uploadId=" + UploadId,
415
- IHTTPGateway::MakeYcHeaders (RequestId, AuthInfo .GetToken (), " application/xml" , AuthInfo .GetAwsUserPwd (), AuthInfo .GetAwsSigV4 ()),
417
+ IHTTPGateway::MakeYcHeaders (RequestId, authInfo .GetToken (), " application/xml" , authInfo .GetAwsUserPwd (), authInfo .GetAwsSigV4 ()),
416
418
xml,
417
419
std::bind (&TS3FileWriteActor::OnMultipartUploadFinish, ActorSystem, SelfId (), ParentId, Key, Url, RequestId, SentSize, std::placeholders::_1),
418
420
false ,
419
421
RetryPolicy);
420
422
}
421
423
422
- void AbortMultipartUpload () {
424
+ void SafeAbortMultipartUpload () {
425
+ try {
426
+ AbortMultipartUpload (Credentials.GetAuthInfo ());
427
+ }
428
+ catch (const yexception& ex) {
429
+ LOG_W (" TS3FileWriteActor" , " Failed to abort multipart upload, error: " << CurrentExceptionMessage ());
430
+ }
431
+ }
432
+
433
+ void AbortMultipartUpload (const TS3Credentials::TAuthInfo& authInfo) {
423
434
// Try to abort multipart upload in case of unexpected termination.
424
435
// In case of error just logs warning.
425
436
426
437
if (!UploadId) {
427
438
return ;
428
439
}
429
440
430
- if (!UpdateAuthInfo ()) {
431
- return ;
432
- }
433
441
Gateway->Delete (Url + " ?uploadId=" + UploadId,
434
- IHTTPGateway::MakeYcHeaders (RequestId, AuthInfo .GetToken (), " application/xml" , AuthInfo .GetAwsUserPwd (), AuthInfo .GetAwsSigV4 ()),
442
+ IHTTPGateway::MakeYcHeaders (RequestId, authInfo .GetToken (), " application/xml" , authInfo .GetAwsUserPwd (), authInfo .GetAwsSigV4 ()),
435
443
std::bind (&TS3FileWriteActor::OnMultipartUploadAbort, ActorSystem, SelfId (), TxId, RequestId, std::placeholders::_1),
436
444
RetryPolicy);
437
445
UploadId.clear ();
438
446
}
439
447
448
+ void FailOnException () {
449
+ Send (ParentId, new TEvPrivate::TEvUploadError (NYql::NDqProto::StatusIds::BAD_REQUEST, CurrentExceptionMessage ()));
450
+ SafeAbortMultipartUpload ();
451
+ }
452
+
440
453
size_t InFlight = 0ULL ;
441
454
size_t SentSize = 0ULL ;
442
455
@@ -458,7 +471,6 @@ class TS3FileWriteActor : public TActorBootstrapped<TS3FileWriteActor> {
458
471
TString UploadId;
459
472
bool DirtyWrite;
460
473
TString Token;
461
- TS3Credentials::TAuthInfo AuthInfo;
462
474
};
463
475
464
476
class TS3WriteActor : public TActorBootstrapped <TS3WriteActor>, public IDqComputeActorAsyncOutput {
0 commit comments