Skip to content

Commit 5fa284c

Browse files
nikvas0dorooleg
authored andcommitted
Fix tests for default EvWrite (ydb-platform#15319)
1 parent bdab698 commit 5fa284c

File tree

10 files changed

+173
-104
lines changed

10 files changed

+173
-104
lines changed

.github/config/muted_ya.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@ ydb/core/kqp/ut/olap [*/*] chunk chunk
4747
ydb/core/kqp/ut/query KqpAnalyze.AnalyzeTable+ColumnStore
4848
ydb/core/kqp/ut/query KqpAnalyze.AnalyzeTable-ColumnStore
4949
ydb/core/kqp/ut/query KqpStats.SysViewClientLost
50+
ydb/core/kqp/ut/query KqpLimits.OutOfSpaceYQLUpsertFail+useSink
51+
ydb/core/kqp/ut/query KqpLimits.QSReplySizeEnsureMemoryLimits+useSink
5052
ydb/core/kqp/ut/scheme KqpOlapScheme.TenThousandColumns
5153
ydb/core/kqp/ut/scheme KqpScheme.AlterAsyncReplication
5254
ydb/core/kqp/ut/scheme [*/*] chunk chunk

ydb/core/kqp/query_data/kqp_query_data.cpp

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -276,12 +276,16 @@ void TQueryData::ValidateParameter(const TString& name, const NKikimrMiniKQL::TT
276276
void TQueryData::PrepareParameters(const TKqpPhyTxHolder::TConstPtr& tx, const TPreparedQueryHolder::TConstPtr& preparedQuery,
277277
NMiniKQL::TTypeEnvironment& txTypeEnv)
278278
{
279-
for (const auto& paramDesc : preparedQuery->GetParameters()) {
280-
ValidateParameter(paramDesc.GetName(), paramDesc.GetType(), txTypeEnv);
279+
if (preparedQuery) {
280+
for (const auto& paramDesc : preparedQuery->GetParameters()) {
281+
ValidateParameter(paramDesc.GetName(), paramDesc.GetType(), txTypeEnv);
282+
}
281283
}
282284

283-
for(const auto& paramBinding: tx->GetParamBindings()) {
284-
MaterializeParamValue(true, paramBinding);
285+
if (tx) {
286+
for(const auto& paramBinding: tx->GetParamBindings()) {
287+
MaterializeParamValue(true, paramBinding);
288+
}
285289
}
286290
}
287291

ydb/core/kqp/runtime/kqp_write_actor.cpp

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -589,7 +589,7 @@ class TKqpTableWriteActor : public TActorBootstrapped<TKqpTableWriteActor> {
589589
}
590590
case NKikimrDataEvents::TEvWriteResult::STATUS_WRONG_SHARD_STATE:
591591
CA_LOG_E("Got WRONG SHARD STATE for table `"
592-
<< SchemeEntry->TableId.PathId.ToString() << "`."
592+
<< TableId.PathId.ToString() << "`."
593593
<< " ShardID=" << ev->Get()->Record.GetOrigin() << ","
594594
<< " Sink=" << this->SelfId() << "."
595595
<< getIssues().ToOneLineString());
@@ -600,7 +600,7 @@ class TKqpTableWriteActor : public TActorBootstrapped<TKqpTableWriteActor> {
600600
RetryResolve();
601601
} else {
602602
RuntimeError(
603-
NYql::NDqProto::StatusIds::PRECONDITION_FAILED,
603+
NYql::NDqProto::StatusIds::UNAVAILABLE,
604604
NYql::TIssuesIds::KIKIMR_TEMPORARILY_UNAVAILABLE,
605605
TStringBuilder() << "Wrong shard state for table `"
606606
<< TablePath << "`.",
@@ -997,13 +997,13 @@ class TKqpTableWriteActor : public TActorBootstrapped<TKqpTableWriteActor> {
997997
Schedule(reattachState.ReattachInfo.Delay, new TEvPrivate::TEvReattachToShard(ev->Get()->TabletId));
998998
} else {
999999
TxManager->SetError(ev->Get()->TabletId);
1000-
if (Mode == EMode::IMMEDIATE_COMMIT || Mode == EMode::COMMIT) {
1000+
if (TxManager->GetState(ev->Get()->TabletId) == IKqpTransactionManager::EXECUTING) {
10011001
RuntimeError(
10021002
NYql::NDqProto::StatusIds::UNDETERMINED,
10031003
NYql::TIssuesIds::KIKIMR_OPERATION_STATE_UNKNOWN,
10041004
TStringBuilder()
10051005
<< "Error writing to table `" << TableId.PathId.ToString() << "`"
1006-
<< ". Transaction state unknown for shard " << ev->Get()->TabletId << ".");
1006+
<< ". Transaction state unknown for tablet " << ev->Get()->TabletId << ".");
10071007
} else {
10081008
RuntimeError(
10091009
NYql::NDqProto::StatusIds::UNAVAILABLE,
@@ -1787,12 +1787,13 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
17871787
|| State == EState::COMMITTING
17881788
|| State == EState::ROLLINGBACK;
17891789

1790-
if (EnableStreamWrite && outOfMemory) {
1790+
if (!EnableStreamWrite && outOfMemory) {
17911791
ReplyErrorAndDie(
17921792
NYql::NDqProto::StatusIds::PRECONDITION_FAILED,
17931793
NYql::TIssuesIds::KIKIMR_PRECONDITION_FAILED,
17941794
TStringBuilder() << "Stream write queries aren't allowed.",
17951795
{});
1796+
return;
17961797
}
17971798

17981799
if (needToFlush) {
@@ -2363,7 +2364,7 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
23632364
<< getIssues().ToOneLineString());
23642365
TxManager->SetError(ev->Get()->Record.GetOrigin());
23652366
ReplyErrorAndDie(
2366-
NYql::NDqProto::StatusIds::PRECONDITION_FAILED,
2367+
NYql::NDqProto::StatusIds::UNAVAILABLE,
23672368
NYql::TIssuesIds::KIKIMR_TEMPORARILY_UNAVAILABLE,
23682369
TStringBuilder() << "Wrong shard state for tables " << getPathes() << ".",
23692370
getIssues());

ydb/core/kqp/session_actor/kqp_session_actor.cpp

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1259,6 +1259,7 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
12591259
return true;
12601260
}
12611261

1262+
YQL_ENSURE(tx || commit);
12621263
if (tx) {
12631264
switch (tx->GetType()) {
12641265
case NKqpProto::TKqpPhyTx::TYPE_COMPUTE:
@@ -1285,15 +1286,16 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
12851286

12861287
QueryState->QueryData->AddUVParam(paramDesc.GetName(), paramType, value);
12871288
}
1289+
}
12881290

1289-
try {
1290-
QueryState->QueryData->PrepareParameters(tx, QueryState->PreparedQuery, txCtx.TxAlloc->TypeEnv);
1291-
} catch (const yexception& ex) {
1292-
ythrow TRequestFail(Ydb::StatusIds::BAD_REQUEST) << ex.what();
1293-
}
1291+
try {
1292+
QueryState->QueryData->PrepareParameters(tx, QueryState->PreparedQuery, txCtx.TxAlloc->TypeEnv);
1293+
} catch (const yexception& ex) {
1294+
ythrow TRequestFail(Ydb::StatusIds::BAD_REQUEST) << ex.what();
1295+
}
1296+
1297+
if (tx) {
12941298
request.Transactions.emplace_back(tx, QueryState->QueryData);
1295-
} else {
1296-
YQL_ENSURE(commit);
12971299
}
12981300

12991301
QueryState->TxCtx->OnNewExecutor(literal);
@@ -1468,7 +1470,7 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
14681470
AsyncIoFactory, QueryState ? QueryState->PreparedQuery : nullptr, SelfId(),
14691471
QueryState ? QueryState->UserRequestContext : MakeIntrusive<TUserRequestContext>("", Settings.Database, SessionId),
14701472
QueryState ? QueryState->StatementResultIndex : 0, FederatedQuerySetup,
1471-
(!Settings.TableService.GetEnableOltpSink() && QueryState && QueryState->RequestEv->GetSyntax() == Ydb::Query::Syntax::SYNTAX_PG)
1473+
(QueryState && QueryState->RequestEv->GetSyntax() == Ydb::Query::Syntax::SYNTAX_PG)
14721474
? GUCSettings : nullptr,
14731475
txCtx->ShardIdToTableInfo, txCtx->TxManager, txCtx->BufferActorId);
14741476

ydb/core/kqp/ut/data_integrity/kqp_data_integrity_trails_ut.cpp

Lines changed: 32 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,13 @@ namespace {
1919
}
2020

2121
Y_UNIT_TEST_SUITE(KqpDataIntegrityTrails) {
22-
Y_UNIT_TEST_TWIN(Upsert, LogEnabled) {
22+
Y_UNIT_TEST_QUAD(Upsert, LogEnabled, UseSink) {
2323
TStringStream ss;
2424
{
25+
NKikimrConfig::TAppConfig appConfig;
26+
appConfig.MutableTableServiceConfig()->SetEnableOltpSink(UseSink);
2527
TKikimrSettings serverSettings;
28+
serverSettings.SetAppConfig(appConfig);
2629
serverSettings.LogStream = &ss;
2730
TKikimrRunner kikimr(serverSettings);
2831

@@ -44,8 +47,13 @@ Y_UNIT_TEST_SUITE(KqpDataIntegrityTrails) {
4447
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
4548
}
4649

47-
// check executer logs
48-
UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY INFO: Component: Executer"), LogEnabled ? 2 : 0);
50+
if (UseSink) {
51+
// check write actor logs
52+
UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY INFO: Component: WriteActor"), LogEnabled ? 1 : 0);
53+
} else {
54+
// check executer logs
55+
UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY INFO: Component: Executer"), LogEnabled ? 2 : 0);
56+
}
4957
// check session actor logs
5058
UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY DEBUG: Component: SessionActor"), LogEnabled ? 2 : 0);
5159
// check grpc logs
@@ -54,49 +62,13 @@ Y_UNIT_TEST_SUITE(KqpDataIntegrityTrails) {
5462
UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY INFO: Component: DataShard"), LogEnabled ? 2 : 0);
5563
}
5664

57-
Y_UNIT_TEST(UpsertEvWrite) {
65+
Y_UNIT_TEST_QUAD(UpsertEvWriteQueryService, isOlap, useOltpSink) {
5866
TStringStream ss;
5967
{
6068
NKikimrConfig::TAppConfig AppConfig;
61-
AppConfig.MutableTableServiceConfig()->SetEnableOltpSink(true);
62-
TKikimrSettings serverSettings = TKikimrSettings().SetAppConfig(AppConfig);
63-
serverSettings.LogStream = &ss;
64-
TKikimrRunner kikimr(serverSettings);
65-
kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::DATA_INTEGRITY, NLog::PRI_TRACE);
66-
67-
auto db = kikimr.GetTableClient();
68-
auto session = db.CreateSession().GetValueSync().GetSession();
69-
70-
auto result = session.ExecuteDataQuery(R"(
71-
--!syntax_v1
72-
73-
UPSERT INTO `/Root/KeyValue` (Key, Value) VALUES
74-
(3u, "Value3"),
75-
(101u, "Value101"),
76-
(201u, "Value201");
77-
)", TTxControl::BeginTx().CommitTx()).ExtractValueSync();
78-
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
79-
}
69+
AppConfig.MutableTableServiceConfig()->SetEnableOltpSink(useOltpSink);
70+
AppConfig.MutableTableServiceConfig()->SetEnableOlapSink(isOlap);
8071

81-
// check write actor logs
82-
UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY INFO: Component: WriteActor"), 1);
83-
// check session actor logs
84-
UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY DEBUG: Component: SessionActor"), 2);
85-
// check grpc logs
86-
UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY TRACE: Component: Grpc"), 2);
87-
// check datashard logs
88-
UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY INFO: Component: DataShard"), 2);
89-
}
90-
91-
Y_UNIT_TEST_TWIN(UpsertEvWriteQueryService, isOlap) {
92-
TStringStream ss;
93-
{
94-
NKikimrConfig::TAppConfig AppConfig;
95-
if (!isOlap) {
96-
AppConfig.MutableTableServiceConfig()->SetEnableOltpSink(true);
97-
} else {
98-
AppConfig.MutableTableServiceConfig()->SetEnableOlapSink(true);
99-
}
10072
TKikimrSettings serverSettings = TKikimrSettings().SetAppConfig(AppConfig);
10173
serverSettings.LogStream = &ss;
10274
TKikimrRunner kikimr(serverSettings);
@@ -132,8 +104,13 @@ Y_UNIT_TEST_SUITE(KqpDataIntegrityTrails) {
132104
}
133105

134106
if (!isOlap) {
135-
// check write actor logs
136-
UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY INFO: Component: WriteActor"), 1);
107+
if (useOltpSink) {
108+
// check write actor logs
109+
UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY INFO: Component: WriteActor"), 1);
110+
} else {
111+
// check executer logs
112+
UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY INFO: Component: Executer"), 2);
113+
}
137114
// check session actor logs
138115
UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY DEBUG: Component: SessionActor"), 2);
139116
// check grpc logs
@@ -143,8 +120,13 @@ Y_UNIT_TEST_SUITE(KqpDataIntegrityTrails) {
143120
} else {
144121
// check write actor logs
145122
UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY INFO: Component: WriteActor"), 3);
146-
// check executer logs
147-
UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY INFO: Component: Executer"), 11);
123+
if (useOltpSink) {
124+
// check executer logs
125+
UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY INFO: Component: Executer"), 1);
126+
} else {
127+
// check executer logs
128+
UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY INFO: Component: Executer"), 11);
129+
}
148130
// check session actor logs
149131
UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY DEBUG: Component: SessionActor"), 2);
150132
// check grpc logs
@@ -215,10 +197,13 @@ Y_UNIT_TEST_SUITE(KqpDataIntegrityTrails) {
215197
UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY INFO: Component: DataShard"), 0);
216198
}
217199

218-
Y_UNIT_TEST(BrokenReadLock) {
200+
Y_UNIT_TEST_TWIN(BrokenReadLock, UseSink) {
219201
TStringStream ss;
220202
{
203+
NKikimrConfig::TAppConfig AppConfig;
204+
AppConfig.MutableTableServiceConfig()->SetEnableOltpSink(UseSink);
221205
TKikimrSettings serverSettings;
206+
serverSettings.SetAppConfig(AppConfig);
222207
serverSettings.LogStream = &ss;
223208
TKikimrRunner kikimr(serverSettings);
224209
kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::DATA_INTEGRITY, NLog::PRI_TRACE);

ydb/core/kqp/ut/opt/kqp_not_null_ut.cpp

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,9 @@ Y_UNIT_TEST_SUITE(KqpNotNullColumns) {
159159
}
160160
}
161161

162-
Y_UNIT_TEST(InsertNotNullPkPg) {
162+
Y_UNIT_TEST_TWIN(InsertNotNullPkPg, useSink) {
163+
NKikimrConfig::TAppConfig appConfig;
164+
appConfig.MutableTableServiceConfig()->SetEnableOltpSink(useSink);
163165
TKikimrRunner kikimr(NKqp::TKikimrSettings().SetWithSampleTables(false));
164166
auto client = kikimr.GetTableClient();
165167
auto session = client.CreateSession().GetValueSync().GetSession();
@@ -609,10 +611,13 @@ Y_UNIT_TEST_SUITE(KqpNotNullColumns) {
609611
}
610612
}
611613

612-
Y_UNIT_TEST(InsertNotNullPg) {
614+
Y_UNIT_TEST_TWIN(InsertNotNullPg, useSink) {
615+
NKikimrConfig::TAppConfig appConfig;
616+
appConfig.MutableTableServiceConfig()->SetEnableOltpSink(useSink);
613617
auto settings = TKikimrSettings()
614618
.SetWithSampleTables(false)
615-
.SetEnableNotNullDataColumns(true);
619+
.SetEnableNotNullDataColumns(true)
620+
.SetAppConfig(appConfig);
616621

617622
TKikimrRunner kikimr(settings);
618623
auto client = kikimr.GetTableClient();
@@ -648,8 +653,13 @@ Y_UNIT_TEST_SUITE(KqpNotNullColumns) {
648653
auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).ExtractValueSync();
649654
UNIT_ASSERT(!result.IsSuccess());
650655
UNIT_ASSERT_C(HasIssue(result.GetIssues(), NYql::TIssuesIds::KIKIMR_BAD_COLUMN_TYPE), result.GetIssues().ToString());
651-
UNIT_ASSERT_NO_DIFF(result.GetIssues().ToString(), "<main>: Error: Execution, code: 1060\n"
652-
" <main>: Error: Tried to insert NULL value into NOT NULL column: Value, code: 2031\n");
656+
if (useSink) {
657+
UNIT_ASSERT_NO_DIFF(result.GetIssues().ToString(),
658+
"<main>: Error: Tried to insert NULL value into NOT NULL column: Value, code: 2031\n");
659+
} else {
660+
UNIT_ASSERT_NO_DIFF(result.GetIssues().ToString(), "<main>: Error: Execution, code: 1060\n"
661+
" <main>: Error: Tried to insert NULL value into NOT NULL column: Value, code: 2031\n");
662+
}
653663
}
654664
}
655665

ydb/core/kqp/ut/pg/pg_catalog_ut.cpp

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -352,8 +352,10 @@ Y_UNIT_TEST_SUITE(PgCatalog) {
352352
}
353353
}
354354

355-
Y_UNIT_TEST(PgDatabase) {
356-
TKikimrRunner kikimr(NKqp::TKikimrSettings().SetWithSampleTables(false));
355+
Y_UNIT_TEST_TWIN(PgDatabase, useSink) {
356+
NKikimrConfig::TAppConfig appConfig;
357+
appConfig.MutableTableServiceConfig()->SetEnableOltpSink(useSink);
358+
TKikimrRunner kikimr(NKqp::TKikimrSettings().SetWithSampleTables(false).SetAppConfig(appConfig));
357359
auto db = kikimr.GetQueryClient();
358360
auto settings = NYdb::NQuery::TExecuteQuerySettings().Syntax(NYdb::NQuery::ESyntax::Pg);
359361
{

0 commit comments

Comments
 (0)