Skip to content

Commit 68ea245

Browse files
authored
Merge a633036 into 1fa5b08
2 parents 1fa5b08 + a633036 commit 68ea245

File tree

6 files changed

+75
-9
lines changed

6 files changed

+75
-9
lines changed

ydb/core/fq/libs/checkpoint_storage/ydb_checkpoint_storage.cpp

+17-5
Original file line numberDiff line numberDiff line change
@@ -805,10 +805,16 @@ TFuture<ICheckpointStorage::TCreateCheckpointResult> TCheckpointStorage::CreateC
805805

806806
return future.Apply(
807807
[checkpointContext](const TFuture<NYdb::TStatus>& future) {
808-
if (NYql::TIssues issues = StatusToIssues(future.GetValue())) {
809-
return TCreateCheckpointResult(TString(), std::move(issues));
810-
} else {
811-
return TCreateCheckpointResult(checkpointContext->CheckpointGraphDescriptionContext->GraphDescId, NYql::TIssues());
808+
try {
809+
if (NYql::TIssues issues = StatusToIssues(future.GetValue())) {
810+
return TCreateCheckpointResult(TString(), std::move(issues));
811+
} else {
812+
return TCreateCheckpointResult(checkpointContext->CheckpointGraphDescriptionContext->GraphDescId, NYql::TIssues());
813+
}
814+
} catch (...) {
815+
TIssues issues;
816+
issues.AddIssue(CurrentExceptionMessage());
817+
return TCreateCheckpointResult(TString(), issues);
812818
}
813819
});
814820
}
@@ -1099,7 +1105,13 @@ TFuture<ICheckpointStorage::TGetTotalCheckpointsStateSizeResult> TCheckpointStor
10991105
});
11001106
return future.Apply(
11011107
[result](const TFuture<TStatus>& status) {
1102-
return std::make_pair(std::move(result->Size), std::move(status.GetValue().GetIssues()));
1108+
try {
1109+
return std::make_pair(std::move(result->Size), std::move(status.GetValue().GetIssues()));
1110+
} catch (...) {
1111+
TIssues issues;
1112+
issues.AddIssue(CurrentExceptionMessage());
1113+
return ICheckpointStorage::TGetTotalCheckpointsStateSizeResult(0, issues);
1114+
}
11031115
});
11041116
}
11051117

ydb/core/fq/libs/checkpoint_storage/ydb_state_storage.cpp

+12-3
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,7 @@ TFuture<TStatus> ProcessState(
115115
processed[taskIndex] = true;
116116
}
117117
LoadState(context->States[taskIndex], *parser.ColumnParser("blob").GetOptionalString());
118+
// throw std::runtime_error("ddddd");
118119
}
119120
} else {
120121
errorMessage << "Not all states exist in database";
@@ -286,6 +287,7 @@ TFuture<IStateStorage::TGetStateResult> TStateStorage::GetState(
286287
return MakeFuture<IStateStorage::TGetStateResult>(result);
287288
}
288289

290+
289291
auto context = MakeIntrusive<TContext>(
290292
YdbConnection->TablePathPrefix,
291293
taskIds,
@@ -366,11 +368,18 @@ TFuture<IStateStorage::TCountStatesResult> TStateStorage::CountStates(
366368
[context] (const TFuture<TStatus>& future) {
367369
TCountStatesResult countResult;
368370
countResult.first = context->Count;
369-
const auto& status = future.GetValue();
370-
if (!status.IsSuccess()) {
371-
countResult.second = status.GetIssues();
371+
try {
372+
const auto& status = future.GetValue();
373+
if (!status.IsSuccess()) {
374+
countResult.second = status.GetIssues();
375+
}
376+
} catch (...) {
377+
TIssues issues;
378+
issues.AddIssue(CurrentExceptionMessage());
379+
countResult.second = issues;
372380
}
373381
return countResult;
382+
374383
});
375384
}
376385
TExecDataQuerySettings TStateStorage::DefaultExecDataQuerySettings() {

ydb/core/fq/libs/ydb/ut/ya.make

+12
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
UNITTEST_FOR(ydb/core/fq/libs/ydb)
2+
3+
SRCS(
4+
ydb_ut.cpp
5+
)
6+
7+
PEERDIR(
8+
ydb/core/fq/libs/ydb
9+
)
10+
11+
END()
12+

ydb/core/fq/libs/ydb/ut/ydb_ut.cpp

+23
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
#include <ydb/core/fq/libs/ydb/ydb.h>
2+
3+
#include <library/cpp/testing/unittest/registar.h>
4+
5+
namespace NFq {
6+
7+
Y_UNIT_TEST_SUITE(TFqYdbTest) {
8+
9+
Y_UNIT_TEST(ShouldStatusToIssuesProcessExceptions)
10+
{
11+
auto promise = NThreading::NewPromise<NYdb::TStatus>();
12+
auto future = promise.GetFuture();
13+
TString text("Test exception");
14+
promise.SetException(text);
15+
NThreading::TFuture<NYql::TIssues> future2 = NFq::StatusToIssues(future);
16+
17+
NYql::TIssues issues = future2.GetValueSync();
18+
UNIT_ASSERT(issues.Size() == 1);
19+
UNIT_ASSERT(issues.ToString().Contains(text));
20+
}
21+
};
22+
23+
} // namespace NFq

ydb/core/fq/libs/ydb/ya.make

+4
Original file line numberDiff line numberDiff line change
@@ -24,3 +24,7 @@ GENERATE_ENUM_SERIALIZATION(ydb.h)
2424
YQL_LAST_ABI_VERSION()
2525

2626
END()
27+
28+
RECURSE_FOR_TESTS(
29+
ut
30+
)

ydb/core/fq/libs/ydb/ydb.cpp

+7-1
Original file line numberDiff line numberDiff line change
@@ -210,7 +210,13 @@ NYql::TIssues StatusToIssues(const NYdb::TStatus& status) {
210210
TFuture<TIssues> StatusToIssues(const TFuture<TStatus>& future) {
211211
return future.Apply(
212212
[] (const TFuture<TStatus>& future) {
213-
return StatusToIssues(future.GetValue());
213+
try {
214+
return StatusToIssues(future.GetValue());
215+
} catch (...) {
216+
TIssues issues;
217+
issues.AddIssue(CurrentExceptionMessage());
218+
return issues;
219+
}
214220
});
215221
}
216222

0 commit comments

Comments
 (0)