Skip to content

Commit 7febcbd

Browse files
authored
[Stable-25-1-1] StreamLookup fixes (#15994)
1 parent 836fd4f commit 7febcbd

File tree

3 files changed

+157
-24
lines changed

3 files changed

+157
-24
lines changed

ydb/core/kqp/runtime/kqp_read_actor.cpp

Lines changed: 41 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -723,25 +723,38 @@ class TKqpReadActor : public TActorBootstrapped<TKqpReadActor>, public NYql::NDq
723723
}
724724
}
725725

726+
bool CheckTotalRetriesExeeded() {
727+
const auto limit = MaxTotalRetries();
728+
return limit && TotalRetries + 1 > *limit;
729+
}
730+
731+
bool CheckShardRetriesExeeded(ui64 id) {
732+
if (!Reads[id] || Reads[id].Finished) {
733+
return false;
734+
}
735+
736+
const auto& state = Reads[id].Shard;
737+
return state->RetryAttempt + 1 > MaxShardRetries();
738+
}
739+
726740
void RetryRead(ui64 id, bool allowInstantRetry = true) {
727741
if (!Reads[id] || Reads[id].Finished) {
728742
return;
729743
}
730744

731745
auto state = Reads[id].Shard;
732746

733-
TotalRetries += 1;
734-
auto limit = MaxTotalRetries();
735-
if (limit && TotalRetries > *limit) {
747+
if (CheckTotalRetriesExeeded()) {
736748
return RuntimeError(TStringBuilder() << "Table '" << Settings->GetTable().GetTablePath() << "' retry limit exceeded",
737749
NDqProto::StatusIds::UNAVAILABLE);
738750
}
751+
++TotalRetries;
739752

740-
state->RetryAttempt += 1;
741-
if (state->RetryAttempt > MaxShardRetries()) {
753+
if (CheckShardRetriesExeeded(id)) {
742754
ResetRead(id);
743755
return ResolveShard(state);
744756
}
757+
++state->RetryAttempt;
745758

746759
auto delay = CalcDelay(state->RetryAttempt, allowInstantRetry);
747760
if (delay == TDuration::Zero()) {
@@ -954,12 +967,16 @@ class TKqpReadActor : public TActorBootstrapped<TKqpReadActor>, public NYql::NDq
954967
Reads[id].Shard->Issues.push_back(issue);
955968
}
956969

970+
auto replyError = [&](auto message, auto status) {
971+
NYql::TIssues issues;
972+
NYql::IssuesFromMessage(record.GetStatus().GetIssues(), issues);
973+
return RuntimeError(message, status, issues);
974+
};
975+
957976
if (UseFollowers && record.GetStatus().GetCode() != Ydb::StatusIds::SUCCESS && Reads[id].Shard->SuccessBatches > 0) {
958977
// read from follower is interrupted with error after several successful responses.
959978
// in this case read is not safe because we can return inconsistent data.
960-
NYql::TIssues issues;
961-
NYql::IssuesFromMessage(record.GetStatus().GetIssues(), issues);
962-
return RuntimeError("Failed to read from follower", NYql::NDqProto::StatusIds::UNAVAILABLE, issues);
979+
return replyError("Failed to read from follower", NYql::NDqProto::StatusIds::UNAVAILABLE);
963980
}
964981

965982
switch (record.GetStatus().GetCode()) {
@@ -968,20 +985,33 @@ class TKqpReadActor : public TActorBootstrapped<TKqpReadActor>, public NYql::NDq
968985
break;
969986
}
970987
case Ydb::StatusIds::OVERLOADED: {
988+
if (CheckTotalRetriesExeeded() || CheckShardRetriesExeeded(id)) {
989+
return replyError(
990+
TStringBuilder() << "Table '" << Settings->GetTable().GetTablePath() << "' retry limit exceeded.",
991+
NYql::NDqProto::StatusIds::OVERLOADED);
992+
}
971993
return RetryRead(id, false);
972994
}
973995
case Ydb::StatusIds::INTERNAL_ERROR: {
996+
if (CheckTotalRetriesExeeded() || CheckShardRetriesExeeded(id)) {
997+
return replyError(
998+
TStringBuilder() << "Table '" << Settings->GetTable().GetTablePath() << "' retry limit exceeded.",
999+
NYql::NDqProto::StatusIds::INTERNAL_ERROR);
1000+
}
9741001
return RetryRead(id);
9751002
}
9761003
case Ydb::StatusIds::NOT_FOUND: {
1004+
if (CheckTotalRetriesExeeded() || CheckShardRetriesExeeded(id)) {
1005+
return replyError(
1006+
TStringBuilder() << "Table '" << Settings->GetTable().GetTablePath() << "' retry limit exceeded.",
1007+
NYql::NDqProto::StatusIds::UNAVAILABLE);
1008+
}
9771009
auto shard = Reads[id].Shard;
9781010
ResetRead(id);
9791011
return ResolveShard(shard);
9801012
}
9811013
default: {
982-
NYql::TIssues issues;
983-
NYql::IssuesFromMessage(record.GetStatus().GetIssues(), issues);
984-
return RuntimeError("Read request aborted", NYql::NDqProto::StatusIds::ABORTED, issues);
1014+
return replyError("Read request aborted", NYql::NDqProto::StatusIds::ABORTED);
9851015
}
9861016
}
9871017

ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp

Lines changed: 70 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -118,13 +118,15 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
118118
enum class EReadState {
119119
Initial,
120120
Running,
121+
Blocked, // Read can't accept new data, but not finished yet
121122
Finished,
122123
};
123124

124125
std::string_view ReadStateToString(EReadState state) {
125126
switch (state) {
126127
case EReadState::Initial: return "Initial"sv;
127128
case EReadState::Running: return "Running"sv;
129+
case EReadState::Blocked: return "Blocked"sv;
128130
case EReadState::Finished: return "Finished"sv;
129131
}
130132
}
@@ -143,6 +145,10 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
143145
return (State == EReadState::Finished);
144146
}
145147

148+
void SetBlocked() {
149+
State = EReadState::Blocked;
150+
}
151+
146152
const ui64 Id;
147153
const ui64 ShardId;
148154
EReadState State;
@@ -277,6 +283,7 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
277283
}
278284

279285
void Handle(TEvTxProxySchemeCache::TEvResolveKeySetResult::TPtr& ev) {
286+
ResoleShardsInProgress = false;
280287
CA_LOG_D("TEvResolveKeySetResult was received for table: " << StreamLookupWorker->GetTablePath());
281288
if (ev->Get()->Request->ErrorCount > 0) {
282289
TString errorMsg = TStringBuilder() << "Failed to get partitioning for table: "
@@ -301,15 +308,16 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
301308

302309
auto readIt = Reads.find(record.GetReadId());
303310
if (readIt == Reads.end() || readIt->second.State != EReadState::Running) {
304-
CA_LOG_D("Drop read with readId: " << record.GetReadId() << ", because it's already completed");
311+
CA_LOG_D("Drop read with readId: " << record.GetReadId() << ", because it's already completed or blocked");
305312
return;
306313
}
307314

308315
auto& read = readIt->second;
309316

310317
CA_LOG_D("Recv TEvReadResult (stream lookup) from ShardID=" << read.ShardId
311318
<< ", Table = " << StreamLookupWorker->GetTablePath()
312-
<< ", ReadId=" << record.GetReadId()
319+
<< ", ReadId=" << record.GetReadId() << " (current ReadId=" << ReadId << ")"
320+
<< ", SeqNo=" << record.GetSeqNo()
313321
<< ", Status=" << Ydb::StatusIds::StatusCode_Name(record.GetStatus().GetCode())
314322
<< ", Finished=" << record.GetFinished()
315323
<< ", RowCount=" << record.GetRowCount()
@@ -345,27 +353,55 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
345353
Counters->DataShardIteratorFails->Inc();
346354
}
347355

356+
auto getIssues = [&record]() {
357+
NYql::TIssues issues;
358+
NYql::IssuesFromMessage(record.GetStatus().GetIssues(), issues);
359+
return issues;
360+
};
361+
362+
auto replyError = [&](auto message, auto status) {
363+
return RuntimeError(message, status, getIssues());
364+
};
365+
348366
switch (record.GetStatus().GetCode()) {
349367
case Ydb::StatusIds::SUCCESS:
350368
break;
351-
case Ydb::StatusIds::NOT_FOUND: {
369+
case Ydb::StatusIds::NOT_FOUND:
370+
{
352371
StreamLookupWorker->ResetRowsProcessing(read.Id, read.FirstUnprocessedQuery, read.LastProcessedKey);
353372
read.SetFinished();
373+
CA_LOG_D("NOT_FOUND was received from tablet: " << read.ShardId << ". "
374+
<< getIssues().ToOneLineString());
354375
return ResolveTableShards();
355376
}
356377
case Ydb::StatusIds::OVERLOADED: {
378+
if (CheckTotalRetriesExeeded() || CheckShardRetriesExeeded(read)) {
379+
return replyError(
380+
TStringBuilder() << "Table '" << StreamLookupWorker->GetTablePath() << "' retry limit exceeded.",
381+
NYql::NDqProto::StatusIds::OVERLOADED);
382+
}
383+
CA_LOG_D("OVERLOADED was received from tablet: " << read.ShardId << "."
384+
<< getIssues().ToOneLineString());
385+
read.SetBlocked();
357386
return RetryTableRead(read, /*allowInstantRetry = */false);
358387
}
359388
case Ydb::StatusIds::INTERNAL_ERROR: {
389+
if (CheckTotalRetriesExeeded() || CheckShardRetriesExeeded(read)) {
390+
return replyError(
391+
TStringBuilder() << "Table '" << StreamLookupWorker->GetTablePath() << "' retry limit exceeded.",
392+
NYql::NDqProto::StatusIds::INTERNAL_ERROR);
393+
}
394+
CA_LOG_D("INTERNAL_ERROR was received from tablet: " << read.ShardId << "."
395+
<< getIssues().ToOneLineString());
396+
read.SetBlocked();
360397
return RetryTableRead(read);
361398
}
362399
default: {
363-
NYql::TIssues issues;
364-
NYql::IssuesFromMessage(record.GetStatus().GetIssues(), issues);
365-
return RuntimeError("Read request aborted", NYql::NDqProto::StatusIds::ABORTED, issues);
400+
return replyError("Read request aborted", NYql::NDqProto::StatusIds::ABORTED);
366401
}
367402
}
368403

404+
YQL_ENSURE(read.LastSeqNo < record.GetSeqNo());
369405
read.LastSeqNo = record.GetSeqNo();
370406

371407
if (record.GetFinished()) {
@@ -380,6 +416,8 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
380416
if (continuationToken.HasLastProcessedKey()) {
381417
TSerializedCellVec lastKey(continuationToken.GetLastProcessedKey());
382418
read.LastProcessedKey = TOwnedCellVec(lastKey.GetCells());
419+
} else {
420+
read.LastProcessedKey.Clear();
383421
}
384422

385423
Counters->SentIteratorAcks->Inc();
@@ -425,6 +463,7 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
425463
}
426464
}
427465
for (auto* read : toRetry) {
466+
read->SetBlocked();
428467
RetryTableRead(*read);
429468
}
430469
}
@@ -436,6 +475,7 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
436475
if (!Partitioning) {
437476
LookupActorStateSpan.EndError("timeout exceeded");
438477
CA_LOG_D("Retry attempt to resolve shards for table: " << StreamLookupWorker->GetTablePath());
478+
ResoleShardsInProgress = false;
439479
ResolveTableShards();
440480
}
441481
}
@@ -445,7 +485,9 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
445485
YQL_ENSURE(readIt != Reads.end(), "Unexpected readId: " << ev->Get()->ReadId);
446486
auto& read = readIt->second;
447487

448-
if (read.State == EReadState::Running && read.LastSeqNo <= ev->Get()->LastSeqNo) {
488+
YQL_ENSURE(read.State != EReadState::Blocked || read.LastSeqNo <= ev->Get()->LastSeqNo);
489+
490+
if ((read.State == EReadState::Running && read.LastSeqNo <= ev->Get()->LastSeqNo) || read.State == EReadState::Blocked) {
449491
if (ev->Get()->InstantStart) {
450492
read.SetFinished();
451493
auto requests = StreamLookupWorker->RebuildRequest(read.Id, read.FirstUnprocessedQuery, read.LastProcessedKey, ReadId);
@@ -538,24 +580,33 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
538580
}
539581
}
540582

583+
bool CheckTotalRetriesExeeded() {
584+
const auto limit = MaxTotalRetries();
585+
return limit && TotalRetryAttempts + 1 > *limit;
586+
}
587+
588+
bool CheckShardRetriesExeeded(TReadState& failedRead) {
589+
const auto& shardState = ReadsPerShard[failedRead.ShardId];
590+
return shardState.RetryAttempts + 1 > MaxShardRetries();
591+
}
592+
541593
void RetryTableRead(TReadState& failedRead, bool allowInstantRetry = true) {
542594
CA_LOG_D("Retry reading of table: " << StreamLookupWorker->GetTablePath() << ", readId: " << failedRead.Id
543595
<< ", shardId: " << failedRead.ShardId);
544596

545-
++TotalRetryAttempts;
546-
auto totalRetriesLimit = MaxTotalRetries();
547-
if (totalRetriesLimit && TotalRetryAttempts > *totalRetriesLimit) {
597+
if (CheckTotalRetriesExeeded()) {
548598
return RuntimeError(TStringBuilder() << "Table '" << StreamLookupWorker->GetTablePath() << "' retry limit exceeded",
549599
NYql::NDqProto::StatusIds::UNAVAILABLE);
550600
}
601+
++TotalRetryAttempts;
551602

552-
auto& shardState = ReadsPerShard[failedRead.ShardId];
553-
++shardState.RetryAttempts;
554-
if (shardState.RetryAttempts > MaxShardRetries()) {
603+
if (CheckShardRetriesExeeded(failedRead)) {
555604
StreamLookupWorker->ResetRowsProcessing(failedRead.Id, failedRead.FirstUnprocessedQuery, failedRead.LastProcessedKey);
556605
failedRead.SetFinished();
557606
return ResolveTableShards();
558607
}
608+
auto& shardState = ReadsPerShard[failedRead.ShardId];
609+
++shardState.RetryAttempts;
559610

560611
auto delay = CalcDelay(shardState.RetryAttempts, allowInstantRetry);
561612
if (delay == TDuration::Zero()) {
@@ -573,12 +624,17 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
573624
}
574625

575626
void ResolveTableShards() {
627+
if (ResoleShardsInProgress) {
628+
return;
629+
}
630+
576631
if (++TotalResolveShardsAttempts > MaxShardResolves()) {
577632
return RuntimeError(TStringBuilder() << "Table '" << StreamLookupWorker->GetTablePath() << "' resolve attempts limit exceeded",
578633
NYql::NDqProto::StatusIds::UNAVAILABLE);
579634
}
580635

581636
CA_LOG_D("Resolve shards for table: " << StreamLookupWorker->GetTablePath());
637+
ResoleShardsInProgress = true;
582638

583639
Partitioning.reset();
584640

@@ -658,6 +714,7 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
658714
ui64 ReadId = 0;
659715
size_t TotalRetryAttempts = 0;
660716
size_t TotalResolveShardsAttempts = 0;
717+
bool ResoleShardsInProgress = false;
661718

662719
// stats
663720
ui64 ReadRowsCount = 0;

ydb/core/kqp/ut/query/kqp_query_ut.cpp

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
#include <ydb/core/kqp/ut/common/kqp_ut_common.h>
22

33
#include <ydb/core/tx/datashard/datashard_failpoints.h>
4+
#include <ydb/core/tx/datashard/datashard.h>
45
#include <ydb/core/testlib/common_helper.h>
56
#include <ydb/core/kqp/provider/yql_kikimr_expr_nodes.h>
67
#include <ydb/core/kqp/counters/kqp_counters.h>
@@ -2142,6 +2143,51 @@ Y_UNIT_TEST_SUITE(KqpQuery) {
21422143
CompareYson(R"([[3u]])", FormatResultSetYson(result.GetResultSet(0)));
21432144
}
21442145
}
2146+
2147+
Y_UNIT_TEST_TWIN(ReadOverloaded, StreamLookup) {
2148+
NKikimrConfig::TAppConfig appConfig;
2149+
auto setting = NKikimrKqp::TKqpSetting();
2150+
TKikimrSettings settings;
2151+
settings.SetAppConfig(appConfig);
2152+
settings.SetUseRealThreads(false);
2153+
TKikimrRunner kikimr(settings);
2154+
auto db = kikimr.GetTableClient();
2155+
auto session = kikimr.RunCall([&] { return db.CreateSession().GetValueSync().GetSession(); });
2156+
auto writeSession = kikimr.RunCall([&] { return db.CreateSession().GetValueSync().GetSession(); });
2157+
2158+
auto& runtime = *kikimr.GetTestServer().GetRuntime();
2159+
2160+
kikimr.RunCall([&]{ CreateSampleTablesWithIndex(session, false /* no need in table data */); return true; });
2161+
2162+
{
2163+
const TString query(StreamLookup
2164+
? Q1_(R"(
2165+
SELECT Value FROM `/Root/SecondaryKeys` VIEW Index WHERE Fk = 1
2166+
)")
2167+
: Q1_(R"(
2168+
SELECT COUNT(a.Key) FROM `/Root/SecondaryKeys` as a;
2169+
)"));
2170+
2171+
auto grab = [&](TAutoPtr<IEventHandle> &ev) -> auto {
2172+
if (ev->GetTypeRewrite() == TEvDataShard::TEvReadResult::EventType) {
2173+
auto* msg = ev->Get<TEvDataShard::TEvReadResult>();
2174+
msg->Record.MutableStatus()->SetCode(::Ydb::StatusIds::OVERLOADED);
2175+
}
2176+
2177+
return TTestActorRuntime::EEventAction::PROCESS;
2178+
};
2179+
2180+
runtime.SetObserverFunc(grab);
2181+
auto future = kikimr.RunInThreadPool([&]{
2182+
auto txc = TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx();
2183+
return session.ExecuteDataQuery(query, txc).ExtractValueSync();
2184+
});
2185+
2186+
auto result = runtime.WaitFuture(future);
2187+
UNIT_ASSERT_C(!result.IsSuccess(), result.GetIssues().ToString());
2188+
UNIT_ASSERT(result.GetStatus() == NYdb::EStatus::OVERLOADED);
2189+
}
2190+
}
21452191
}
21462192

21472193
} // namespace NKqp

0 commit comments

Comments
 (0)