Skip to content

Commit ad84190

Browse files
nikvas0blinkov
authored andcommitted
Fix index errors (sink) (#15409)
1 parent b463698 commit ad84190

File tree

10 files changed

+109
-27
lines changed

10 files changed

+109
-27
lines changed

ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ void TKqpComputeActor::DoBootstrap() {
8888
try {
8989
PrepareTaskRunner(TKqpTaskRunnerExecutionContext(std::get<ui64>(TxId), RuntimeSettings.UseSpilling, ArrayBufferMinFillPercentage, std::move(wakeupCallback), std::move(errorCallback)));
9090
} catch (const NMiniKQL::TKqpEnsureFail& e) {
91-
InternalError((TIssuesIds::EIssueCode) e.GetCode(), e.GetMessage());
91+
ErrorFromIssue((TIssuesIds::EIssueCode) e.GetCode(), e.GetMessage());
9292
return;
9393
}
9494

@@ -124,7 +124,7 @@ void TKqpComputeActor::DoBootstrap() {
124124
auto scanActor = NSysView::CreateSystemViewScan(SelfId(), 0, ScanData->TableId, ScanData->TablePath, ranges, columns, UserToken, Database, reverse);
125125

126126
if (!scanActor) {
127-
InternalError(TIssuesIds::DEFAULT_ERROR, TStringBuilder()
127+
ErrorFromIssue(TIssuesIds::DEFAULT_ERROR, TStringBuilder()
128128
<< "Failed to create system view scan, table id: " << ScanData->TableId);
129129
return;
130130
}
@@ -152,9 +152,9 @@ STFUNC(TKqpComputeActor::StateFunc) {
152152
} catch (const TMemoryLimitExceededException& e) {
153153
TBase::OnMemoryLimitExceptionHandler();
154154
} catch (const NMiniKQL::TKqpEnsureFail& e) {
155-
InternalError((TIssuesIds::EIssueCode) e.GetCode(), e.GetMessage());
155+
ErrorFromIssue((TIssuesIds::EIssueCode) e.GetCode(), e.GetMessage());
156156
} catch (const std::exception& e) {
157-
InternalError(TIssuesIds::DEFAULT_ERROR, e.what());
157+
ErrorFromIssue(TIssuesIds::DEFAULT_ERROR, e.what());
158158
}
159159

160160
ReportEventElapsedTime();

ydb/core/kqp/opt/kqp_opt_kql.cpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -214,19 +214,20 @@ TCoAtomList BuildUpsertInputColumns(const TCoAtomList& inputColumns,
214214
}
215215

216216
std::pair<TExprBase, TCoAtomList> BuildWriteInput(const TKiWriteTable& write, const TKikimrTableDescription& table,
217-
const TCoAtomList& inputColumns, const TCoAtomList& autoIncrement, const bool isSink,
217+
const TCoAtomList& inputColumns, const TCoAtomList& autoIncrement, const bool /*isSink*/,
218218
TPositionHandle pos, TExprContext& ctx)
219219
{
220220
auto input = write.Input();
221-
const bool isWriteReplace = (GetTableOp(write) == TYdbOperation::Replace) && !isSink;
222221

223222
TCoAtomList inputCols = BuildUpsertInputColumns(inputColumns, autoIncrement, pos, ctx);
224223

225224
if (autoIncrement.Ref().ChildrenSize() > 0) {
226225
input = BuildKqlSequencer(input, table, inputCols, autoIncrement, pos, ctx);
227226
}
228227

228+
const bool isWriteReplace = (GetTableOp(write) == TYdbOperation::Replace);
229229
if (isWriteReplace) {
230+
// TODO: don't need it for sinks (can be disabled when secondary indexes are supported inside write actor)
230231
std::tie(input, inputCols) = CreateRowsToReplace(input, inputCols, table, write.Pos(), ctx);
231232
}
232233

ydb/core/kqp/ut/indexes/kqp_indexes_multishard_ut.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1767,8 +1767,9 @@ Y_UNIT_TEST_SUITE(KqpMultishardIndex) {
17671767
}
17681768
}
17691769

1770-
Y_UNIT_TEST(DataColumnWrite) {
1770+
Y_UNIT_TEST_TWIN(DataColumnWrite, UseSink) {
17711771
NKikimrConfig::TAppConfig appConfig;
1772+
appConfig.MutableTableServiceConfig()->SetEnableOltpSink(UseSink);
17721773

17731774
auto setting = NKikimrKqp::TKqpSetting();
17741775
auto serverSettings = TKikimrSettings()

ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2873,10 +2873,13 @@ Y_UNIT_TEST_SUITE(KqpIndexes) {
28732873
}
28742874
}
28752875

2876-
Y_UNIT_TEST(SecondaryIndexReplace) {
2876+
Y_UNIT_TEST_TWIN(SecondaryIndexReplace, UseSink) {
2877+
NKikimrConfig::TAppConfig app;
2878+
app.MutableTableServiceConfig()->SetEnableOltpSink(UseSink);
28772879
auto setting = NKikimrKqp::TKqpSetting();
28782880
auto serverSettings = TKikimrSettings()
2879-
.SetKqpSettings({setting});
2881+
.SetKqpSettings({setting})
2882+
.SetAppConfig(app);
28802883
TKikimrRunner kikimr(serverSettings);
28812884
auto db = kikimr.GetTableClient();
28822885
auto session = db.CreateSession().GetValueSync().GetSession();
@@ -3370,10 +3373,13 @@ R"([[#;#;["Primary1"];[41u]];[["Secondary2"];[2u];["Primary2"];[42u]];[["Seconda
33703373
}
33713374

33723375

3373-
Y_UNIT_TEST(MultipleSecondaryIndexWithSameComulns) {
3376+
Y_UNIT_TEST_TWIN(MultipleSecondaryIndexWithSameComulns, UseSink) {
3377+
NKikimrConfig::TAppConfig app;
3378+
app.MutableTableServiceConfig()->SetEnableOltpSink(UseSink);
33743379
auto setting = NKikimrKqp::TKqpSetting();
33753380
auto serverSettings = TKikimrSettings()
3376-
.SetKqpSettings({setting});
3381+
.SetKqpSettings({setting})
3382+
.SetAppConfig(app);
33773383
TKikimrRunner kikimr(serverSettings);
33783384
auto db = kikimr.GetTableClient();
33793385
auto session = db.CreateSession().GetValueSync().GetSession();
@@ -3706,10 +3712,13 @@ R"([[#;#;["Primary1"];[41u]];[["Secondary2"];[2u];["Primary2"];[42u]];[["Seconda
37063712
}
37073713
}
37083714

3709-
Y_UNIT_TEST(SecondaryIndexWithPrimaryKeySameComulns) {
3715+
Y_UNIT_TEST_TWIN(SecondaryIndexWithPrimaryKeySameComulns, UseSink) {
3716+
NKikimrConfig::TAppConfig app;
3717+
app.MutableTableServiceConfig()->SetEnableOltpSink(UseSink);
37103718
auto setting = NKikimrKqp::TKqpSetting();
37113719
auto serverSettings = TKikimrSettings()
3712-
.SetKqpSettings({setting});
3720+
.SetKqpSettings({setting})
3721+
.SetAppConfig(app);
37133722
TKikimrRunner kikimr(serverSettings);
37143723
auto db = kikimr.GetTableClient();
37153724
auto session = db.CreateSession().GetValueSync().GetSession();

ydb/core/kqp/ut/olap/json_ut.cpp

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -768,27 +768,27 @@ Y_UNIT_TEST_SUITE(KqpOlapJson) {
768768
------
769769
READ: SELECT * FROM `/Root/ColumnTable` WHERE JSON_VALUE(Col2, "$.\"a.b.c\"") = "a1" ORDER BY Col1;
770770
EXPECTED: [[1u;["{\"a.b.c\":\"a1\"}"]]]
771-
IDX_ND_SKIP_APPROVE: 0, 3, 1
771+
IDX_ND_SKIP_APPROVE: 0, 4, 1
772772
------
773773
READ: SELECT * FROM `/Root/ColumnTable` WHERE JSON_VALUE(Col2, "$.\"a.b.c\"") = "1a1" ORDER BY Col1;
774774
EXPECTED: [[11u;["{\"a.b.c\":\"1a1\"}"]]]
775-
IDX_ND_SKIP_APPROVE: 0, 3, 1
775+
IDX_ND_SKIP_APPROVE: 0, 4, 1
776776
------
777777
READ: SELECT * FROM `/Root/ColumnTable` WHERE JSON_VALUE(Col2, "$.\"b.c.d\"") = "1b4" ORDER BY Col1;
778778
EXPECTED: [[14u;["{\"a\":\"a4\",\"b.c.d\":\"1b4\"}"]]]
779-
IDX_ND_SKIP_APPROVE: 0, 3, 1
779+
IDX_ND_SKIP_APPROVE: 0, 4, 1
780780
------
781781
READ: SELECT * FROM `/Root/ColumnTable` WHERE JSON_VALUE(Col2, "$.\"b.c.d\"") = "1b5" ORDER BY Col1;
782782
EXPECTED: []
783-
IDX_ND_SKIP_APPROVE: 0, 4, 0
783+
IDX_ND_SKIP_APPROVE: 0, 5, 0
784784
------
785785
READ: SELECT * FROM `/Root/ColumnTable` WHERE JSON_VALUE(Col2, "$.\"b.c.d\"") like "1b3" ORDER BY Col1;
786786
EXPECTED: [[13u;["{\"b.c.d\":\"1b3\"}"]]]
787-
IDX_ND_SKIP_APPROVE: 0, 3, 1
787+
IDX_ND_SKIP_APPROVE: 0, 4, 1
788788
------
789789
READ: SELECT * FROM `/Root/ColumnTable` WHERE JSON_VALUE(Col2, "$.\"b.c.d\"") like "1b5" ORDER BY Col1;
790790
EXPECTED: []
791-
IDX_ND_SKIP_APPROVE: 0, 4, 0
791+
IDX_ND_SKIP_APPROVE: 0, 5, 0
792792
793793
)";
794794
TScriptVariator(script).Execute();

ydb/core/kqp/ut/opt/kqp_not_null_ut.cpp

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,7 @@ Y_UNIT_TEST_SUITE(KqpNotNullColumns) {
162162
Y_UNIT_TEST_TWIN(InsertNotNullPkPg, useSink) {
163163
NKikimrConfig::TAppConfig appConfig;
164164
appConfig.MutableTableServiceConfig()->SetEnableOltpSink(useSink);
165-
TKikimrRunner kikimr(NKqp::TKikimrSettings().SetWithSampleTables(false));
165+
TKikimrRunner kikimr(NKqp::TKikimrSettings().SetWithSampleTables(false).SetAppConfig(appConfig));
166166
auto client = kikimr.GetTableClient();
167167
auto session = client.CreateSession().GetValueSync().GetSession();
168168
{
@@ -195,8 +195,12 @@ Y_UNIT_TEST_SUITE(KqpNotNullColumns) {
195195
auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).ExtractValueSync();
196196
UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::BAD_REQUEST);
197197
UNIT_ASSERT_C(HasIssue(result.GetIssues(), NYql::TIssuesIds::KIKIMR_BAD_COLUMN_TYPE), result.GetIssues().ToString());
198-
UNIT_ASSERT_NO_DIFF(result.GetIssues().ToString(), "<main>: Error: Execution, code: 1060\n"
199-
" <main>: Error: Tried to insert NULL value into NOT NULL column: key, code: 2031\n");
198+
if (useSink) {
199+
UNIT_ASSERT_NO_DIFF(result.GetIssues().ToString(), "<main>: Error: Tried to insert NULL value into NOT NULL column: key, code: 2031\n");
200+
} else {
201+
UNIT_ASSERT_NO_DIFF(result.GetIssues().ToString(), "<main>: Error: Execution, code: 1060\n"
202+
" <main>: Error: Tried to insert NULL value into NOT NULL column: key, code: 2031\n");
203+
}
200204
}
201205

202206
{ /* set NULL to not null pk column */
@@ -208,8 +212,12 @@ Y_UNIT_TEST_SUITE(KqpNotNullColumns) {
208212
auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).ExtractValueSync();
209213
UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::BAD_REQUEST);
210214
UNIT_ASSERT_C(HasIssue(result.GetIssues(), NYql::TIssuesIds::KIKIMR_BAD_COLUMN_TYPE), result.GetIssues().ToString());
211-
UNIT_ASSERT_NO_DIFF(result.GetIssues().ToString(), "<main>: Error: Execution, code: 1060\n"
212-
" <main>: Error: Tried to insert NULL value into NOT NULL column: key, code: 2031\n");
215+
if (useSink) {
216+
UNIT_ASSERT_NO_DIFF(result.GetIssues().ToString(), "<main>: Error: Tried to insert NULL value into NOT NULL column: key, code: 2031\n");
217+
} else {
218+
UNIT_ASSERT_NO_DIFF(result.GetIssues().ToString(), "<main>: Error: Execution, code: 1060\n"
219+
" <main>: Error: Tried to insert NULL value into NOT NULL column: key, code: 2031\n");
220+
}
213221
}
214222

215223
{ /* set NULL to nullable column */

ydb/core/kqp/ut/tx/kqp_tx_ut.cpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,10 @@ Y_UNIT_TEST_SUITE(KqpTx) {
8282
}
8383

8484
Y_UNIT_TEST(LocksAbortOnCommit) {
85-
auto kikimr = DefaultKikimrRunner();
85+
NKikimrConfig::TAppConfig app;
86+
// See KqpSinkTx::LocksAbortOnCommit for sink version of this test
87+
app.MutableTableServiceConfig()->SetEnableOltpSink(false);
88+
auto kikimr = DefaultKikimrRunner({}, app);
8689
auto db = kikimr.GetTableClient();
8790
auto session = db.CreateSession().GetValueSync().GetSession();
8891
{

ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -653,8 +653,11 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
653653
Terminate(State == NDqProto::COMPUTE_STATE_FINISHED, NDqProto::EComputeState_Name(State));
654654
}
655655

656-
void InternalError(TIssuesIds::EIssueCode issueCode, const TString& message) {
657-
InternalError(NYql::NDqProto::StatusIds::PRECONDITION_FAILED, issueCode, message);
656+
void ErrorFromIssue(TIssuesIds::EIssueCode issueCode, const TString& message) {
657+
TIssue issue(message);
658+
SetIssueCode(issueCode, issue);
659+
const auto statusCode = GetDqStatus(issue).GetOrElse(NYql::NDqProto::StatusIds::PRECONDITION_FAILED);
660+
InternalError(statusCode, std::move(issue));
658661
}
659662

660663
void InternalError(NYql::NDqProto::StatusIds::StatusCode statusCode, TIssuesIds::EIssueCode issueCode, const TString& message) {

ydb/library/yql/dq/actors/dq.cpp

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
#include "dq.h"
22

3+
#include <yql/essentials/core/issue/yql_issue.h>
4+
35
namespace NYql::NDq {
46

57
Ydb::StatusIds::StatusCode DqStatusToYdbStatus(NYql::NDqProto::StatusIds::StatusCode statusCode) {
@@ -84,4 +86,58 @@ NYql::NDqProto::StatusIds::StatusCode YdbStatusToDqStatus(Ydb::StatusIds::Status
8486
}
8587
}
8688

89+
TMaybe<NYql::NDqProto::StatusIds::StatusCode> GetDqStatus(const TIssue& issue) {
90+
if (issue.GetSeverity() == TSeverityIds::S_FATAL) {
91+
return NYql::NDqProto::StatusIds::INTERNAL_ERROR;
92+
}
93+
94+
switch (issue.GetCode()) {
95+
case NYql::TIssuesIds::KIKIMR_LOCKS_INVALIDATED:
96+
case NYql::TIssuesIds::KIKIMR_LOCKS_ACQUIRE_FAILURE:
97+
case NYql::TIssuesIds::KIKIMR_OPERATION_ABORTED:
98+
case NYql::TIssuesIds::KIKIMR_SCHEME_MISMATCH:
99+
return NYql::NDqProto::StatusIds::ABORTED;
100+
101+
case NYql::TIssuesIds::KIKIMR_SCHEME_ERROR:
102+
return NYql::NDqProto::StatusIds::SCHEME_ERROR;
103+
104+
case NYql::TIssuesIds::KIKIMR_TEMPORARILY_UNAVAILABLE:
105+
return NYql::NDqProto::StatusIds::UNAVAILABLE;
106+
107+
case NYql::TIssuesIds::KIKIMR_OVERLOADED:
108+
case NYql::TIssuesIds::KIKIMR_MULTIPLE_SCHEME_MODIFICATIONS:
109+
return NYql::NDqProto::StatusIds::OVERLOADED;
110+
111+
case NYql::TIssuesIds::KIKIMR_CONSTRAINT_VIOLATION:
112+
case NYql::TIssuesIds::KIKIMR_PRECONDITION_FAILED:
113+
return NYql::NDqProto::StatusIds::PRECONDITION_FAILED;
114+
115+
case NYql::TIssuesIds::KIKIMR_BAD_REQUEST:
116+
case NYql::TIssuesIds::KIKIMR_BAD_COLUMN_TYPE:
117+
case NYql::TIssuesIds::KIKIMR_NO_COLUMN_DEFAULT_VALUE:
118+
return NYql::NDqProto::StatusIds::BAD_REQUEST;
119+
120+
case NYql::TIssuesIds::KIKIMR_ACCESS_DENIED:
121+
return NYql::NDqProto::StatusIds::UNAUTHORIZED;
122+
123+
case NYql::TIssuesIds::KIKIMR_TIMEOUT:
124+
return NYql::NDqProto::StatusIds::TIMEOUT;
125+
126+
case NYql::TIssuesIds::KIKIMR_OPERATION_CANCELLED:
127+
return NYql::NDqProto::StatusIds::CANCELLED;
128+
129+
case NYql::TIssuesIds::KIKIMR_RESULT_UNAVAILABLE:
130+
case NYql::TIssuesIds::KIKIMR_OPERATION_STATE_UNKNOWN:
131+
return NYql::NDqProto::StatusIds::UNDETERMINED;
132+
133+
case NYql::TIssuesIds::KIKIMR_UNSUPPORTED:
134+
return NYql::NDqProto::StatusIds::UNSUPPORTED;
135+
136+
default:
137+
break;
138+
}
139+
140+
return Nothing();
141+
}
142+
87143
} // namespace NYql::NDq

ydb/library/yql/dq/actors/dq.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ enum class EStatusCompatibilityLevel {
1818

1919
Ydb::StatusIds::StatusCode DqStatusToYdbStatus(NYql::NDqProto::StatusIds::StatusCode statusCode);
2020
NYql::NDqProto::StatusIds::StatusCode YdbStatusToDqStatus(Ydb::StatusIds::StatusCode statusCode, EStatusCompatibilityLevel compatibility = EStatusCompatibilityLevel::Basic);
21+
TMaybe<NYql::NDqProto::StatusIds::StatusCode> GetDqStatus(const TIssue& issue);
2122

2223
struct TEvDq {
2324

0 commit comments

Comments
 (0)