Skip to content

Commit 74ed298

Browse files
nikvas0blinkov
authored andcommitted
Fix error status for Oltp Sink (#15364)
1 parent f05d21e commit 74ed298

File tree

5 files changed

+34
-14
lines changed

5 files changed

+34
-14
lines changed

ydb/core/kqp/executer_actor/kqp_data_executer.cpp

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -301,6 +301,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
301301
try {
302302
switch(ev->GetTypeRewrite()) {
303303
hFunc(TEvKqp::TEvAbortExecution, HandleFinalize);
304+
hFunc(TEvKqpBuffer::TEvError, Handle);
304305
hFunc(TEvKqpBuffer::TEvResult, HandleFinalize);
305306
hFunc(TEvents::TEvUndelivered, HandleFinalize);
306307

@@ -405,6 +406,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
405406
hFunc(TEvSaveScriptExternalEffectResponse, HandleResolve);
406407
hFunc(TEvDescribeSecretsResponse, HandleResolve);
407408
hFunc(TEvKqp::TEvAbortExecution, HandleAbortExecution);
409+
hFunc(TEvKqpBuffer::TEvError, Handle);
408410
default:
409411
UnexpectedEvent("WaitResolveState", ev->GetTypeRewrite());
410412
}
@@ -456,6 +458,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
456458
hFunc(TEvKqpExecuter::TEvStreamDataAck, HandleStreamAck);
457459
hFunc(TEvPipeCache::TEvDeliveryProblem, HandlePrepare);
458460
hFunc(TEvKqp::TEvAbortExecution, HandlePrepare);
461+
hFunc(TEvKqpBuffer::TEvError, Handle);
459462
hFunc(TEvents::TEvUndelivered, HandleUndelivered);
460463
hFunc(TEvInterconnect::TEvNodeDisconnected, HandleDisconnected);
461464
hFunc(TEvKqpNode::TEvStartKqpTasksResponse, HandleStartKqpTasksResponse);
@@ -1136,6 +1139,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
11361139
hFunc(NYql::NDq::TEvDqCompute::TEvChannelData, HandleChannelData);
11371140
hFunc(TEvKqpExecuter::TEvStreamDataAck, HandleStreamAck);
11381141
hFunc(TEvKqp::TEvAbortExecution, HandleExecute);
1142+
hFunc(TEvKqpBuffer::TEvError, Handle);
11391143
IgnoreFunc(TEvInterconnect::TEvNodeConnected);
11401144
default:
11411145
UnexpectedEvent("ExecuteState", ev->GetTypeRewrite());
@@ -1192,6 +1196,11 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
11921196
}
11931197
}
11941198

1199+
void Handle(TEvKqpBuffer::TEvError::TPtr& ev) {
1200+
auto& msg = *ev->Get();
1201+
TBase::HandleAbortExecution(msg.StatusCode, msg.Issues, false);
1202+
}
1203+
11951204
void HandleExecute(TEvColumnShard::TEvProposeTransactionResult::TPtr& ev) {
11961205
TEvColumnShard::TEvProposeTransactionResult* res = ev->Get();
11971206
const ui64 shardId = res->Record.GetOrigin();
@@ -2295,6 +2304,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
22952304
switch (ev->GetTypeRewrite()) {
22962305
hFunc(NLongTxService::TEvLongTxService::TEvAcquireReadSnapshotResult, Handle);
22972306
hFunc(TEvKqp::TEvAbortExecution, HandleAbortExecution);
2307+
hFunc(TEvKqpBuffer::TEvError, Handle);
22982308
default:
22992309
UnexpectedEvent("WaitSnapshotState", ev->GetTypeRewrite());
23002310
}

ydb/core/kqp/executer_actor/kqp_executer_impl.h

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -760,15 +760,22 @@ class TKqpExecuterBase : public TActor<TDerived> {
760760
void HandleAbortExecution(TEvKqp::TEvAbortExecution::TPtr& ev) {
761761
auto& msg = ev->Get()->Record;
762762
NYql::TIssues issues = ev->Get()->GetIssues();
763-
LOG_D("Got EvAbortExecution, status: " << NYql::NDqProto::StatusIds_StatusCode_Name(msg.GetStatusCode())
763+
HandleAbortExecution(msg.GetStatusCode(), ev->Get()->GetIssues(), ev->Sender != Target);
764+
}
765+
766+
void HandleAbortExecution(
767+
NYql::NDqProto::StatusIds::StatusCode statusCode,
768+
const NYql::TIssues& issues,
769+
const bool sessionSender) {
770+
LOG_D("Got EvAbortExecution, status: " << NYql::NDqProto::StatusIds_StatusCode_Name(statusCode)
764771
<< ", message: " << issues.ToOneLineString());
765-
auto statusCode = NYql::NDq::DqStatusToYdbStatus(msg.GetStatusCode());
766-
if (statusCode == Ydb::StatusIds::INTERNAL_ERROR) {
772+
auto ydbStatusCode = NYql::NDq::DqStatusToYdbStatus(statusCode);
773+
if (ydbStatusCode == Ydb::StatusIds::INTERNAL_ERROR) {
767774
InternalError(issues);
768-
} else if (statusCode == Ydb::StatusIds::TIMEOUT) {
769-
TimeoutError(ev->Sender, issues);
775+
} else if (ydbStatusCode == Ydb::StatusIds::TIMEOUT) {
776+
TimeoutError(sessionSender, issues);
770777
} else {
771-
RuntimeError(NYql::NDq::DqStatusToYdbStatus(msg.GetStatusCode()), issues);
778+
RuntimeError(NYql::NDq::DqStatusToYdbStatus(statusCode), issues);
772779
}
773780
}
774781

@@ -1891,7 +1898,7 @@ class TKqpExecuterBase : public TActor<TDerived> {
18911898
ReplyErrorAndDie(status, &issues);
18921899
}
18931900

1894-
void TimeoutError(TActorId abortSender, NYql::TIssues issues) {
1901+
void TimeoutError(bool sessionSender, NYql::TIssues issues) {
18951902
if (AlreadyReplied) {
18961903
LOG_E("Timeout when we already replied - not good" << Endl << TBackTrace().PrintToString() << Endl);
18971904
return;
@@ -1915,7 +1922,7 @@ class TKqpExecuterBase : public TActor<TDerived> {
19151922
NYql::IssuesToMessage(issues, ResponseEv->Record.MutableResponse()->MutableIssues());
19161923

19171924
// TEvAbortExecution can come from either ComputeActor or SessionActor (== Target).
1918-
if (abortSender != Target) {
1925+
if (!sessionSender) {
19191926
auto abortEv = MakeHolder<TEvKqp::TEvAbortExecution>(status, issues);
19201927
this->Send(Target, abortEv.Release());
19211928
}

ydb/core/kqp/session_actor/kqp_session_actor.cpp

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1745,7 +1745,7 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
17451745
}
17461746

17471747
void Handle(TEvKqpBuffer::TEvError::TPtr& ev) {
1748-
const auto& msg = *ev->Get();
1748+
auto& msg = *ev->Get();
17491749

17501750
TString logMsg = TStringBuilder() << "got TEvKqpBuffer::TEvError in " << CurrentStateFuncName()
17511751
<< ", status: " << NYql::NDqProto::StatusIds_StatusCode_Name(msg.StatusCode) << " send to: " << ExecuterId << " from: " << ev->Sender;
@@ -1758,8 +1758,7 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
17581758
}
17591759

17601760
if (ExecuterId) {
1761-
auto abortEv = MakeHolder<TEvKqp::TEvAbortExecution>(msg.StatusCode, msg.Issues);
1762-
Send(ExecuterId, abortEv.Release(), IEventHandle::FlagTrackDelivery);
1761+
Send(ExecuterId, new TEvKqpBuffer::TEvError{msg.StatusCode, std::move(msg.Issues)}, IEventHandle::FlagTrackDelivery);
17631762
} else {
17641763
ReplyQueryError(NYql::NDq::DqStatusToYdbStatus(msg.StatusCode), logMsg, MessageFromIssues(msg.Issues));
17651764
}

ydb/services/ydb/ydb_common_ut.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -139,10 +139,12 @@ class TBasicKikimrWithGrpcAndRootSchema {
139139

140140
//Server_->GetRuntime()->SetLogPriority(NKikimrServices::TX_PROXY_SCHEME_CACHE, NActors::NLog::PRI_DEBUG);
141141
//Server_->GetRuntime()->SetLogPriority(NKikimrServices::SCHEME_BOARD_REPLICA, NActors::NLog::PRI_DEBUG);
142-
Server_->GetRuntime()->SetLogPriority(NKikimrServices::FLAT_TX_SCHEMESHARD, NActors::NLog::PRI_INFO);
142+
//Server_->GetRuntime()->SetLogPriority(NKikimrServices::FLAT_TX_SCHEMESHARD, NActors::NLog::PRI_INFO);
143143
//Server_->GetRuntime()->SetLogPriority(NKikimrServices::TX_PROXY, NActors::NLog::PRI_DEBUG);
144144
//Server_->GetRuntime()->SetLogPriority(NKikimrServices::TX_OLAPSHARD, NActors::NLog::PRI_DEBUG);
145145
//Server_->GetRuntime()->SetLogPriority(NKikimrServices::TX_COLUMNSHARD, NActors::NLog::PRI_DEBUG);
146+
//Server_->GetRuntime()->SetLogPriority(NKikimrServices::KQP_SESSION, NActors::NLog::PRI_DEBUG);
147+
//Server_->GetRuntime()->SetLogPriority(NKikimrServices::KQP_EXECUTER, NActors::NLog::PRI_DEBUG);
146148
if (enableYq) {
147149
Server_->GetRuntime()->SetLogPriority(NKikimrServices::YQL_PROXY, NActors::NLog::PRI_DEBUG);
148150
Server_->GetRuntime()->SetLogPriority(NKikimrServices::KQP_COMPUTE, NActors::NLog::PRI_DEBUG);

ydb/services/ydb/ydb_ut.cpp

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5703,8 +5703,10 @@ Y_UNIT_TEST_SUITE(TYqlDateTimeTests) {
57035703
#endif
57045704

57055705
Y_UNIT_TEST_SUITE(LocalityOperation) {
5706-
Y_UNIT_TEST(LocksFromAnotherTenants) {
5707-
TKikimrWithGrpcAndRootSchema server;
5706+
Y_UNIT_TEST_TWIN(LocksFromAnotherTenants, UseSink) {
5707+
NKikimrConfig::TAppConfig appConfig;
5708+
appConfig.MutableTableServiceConfig()->SetEnableOltpSink(UseSink);
5709+
TKikimrWithGrpcAndRootSchema server(appConfig);
57085710
//server.Server_->SetupLogging(
57095711

57105712
auto connection = NYdb::TDriver(

0 commit comments

Comments
 (0)