Skip to content

Commit 1623fb3

Browse files
committed
1 parent 5c53ee8 commit 1623fb3

File tree

3 files changed

+119
-20
lines changed

3 files changed

+119
-20
lines changed

ydb/core/kqp/runtime/kqp_read_actor.cpp

Lines changed: 41 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -723,25 +723,38 @@ class TKqpReadActor : public TActorBootstrapped<TKqpReadActor>, public NYql::NDq
723723
}
724724
}
725725

726+
bool CheckTotalRetriesExeeded() {
727+
const auto limit = MaxTotalRetries();
728+
return limit && TotalRetries + 1 > *limit;
729+
}
730+
731+
// Reports whether one more retry of read `id` would exceed MaxShardRetries()
// for the shard attached to that read.
// A read whose slot evaluates to false, or that is already marked Finished,
// is never reported as exceeded.
// Only reads the shard's RetryAttempt; the counter is not modified here.
bool CheckShardRetriesExeeded(ui64 id) {
    const bool readIsActive = Reads[id] && !Reads[id].Finished;
    if (!readIsActive) {
        return false;
    }

    const auto& shard = Reads[id].Shard;
    const auto nextAttempt = shard->RetryAttempt + 1;
    return nextAttempt > MaxShardRetries();
}
739+
726740
void RetryRead(ui64 id, bool allowInstantRetry = true) {
727741
if (!Reads[id] || Reads[id].Finished) {
728742
return;
729743
}
730744

731745
auto state = Reads[id].Shard;
732746

733-
TotalRetries += 1;
734-
auto limit = MaxTotalRetries();
735-
if (limit && TotalRetries > *limit) {
747+
if (CheckTotalRetriesExeeded()) {
736748
return RuntimeError(TStringBuilder() << "Table '" << Settings->GetTable().GetTablePath() << "' retry limit exceeded",
737749
NDqProto::StatusIds::UNAVAILABLE);
738750
}
751+
++TotalRetries;
739752

740-
state->RetryAttempt += 1;
741-
if (state->RetryAttempt > MaxShardRetries()) {
753+
if (CheckShardRetriesExeeded(id)) {
742754
ResetRead(id);
743755
return ResolveShard(state);
744756
}
757+
++state->RetryAttempt;
745758

746759
auto delay = CalcDelay(state->RetryAttempt, allowInstantRetry);
747760
if (delay == TDuration::Zero()) {
@@ -954,12 +967,16 @@ class TKqpReadActor : public TActorBootstrapped<TKqpReadActor>, public NYql::NDq
954967
Reads[id].Shard->Issues.push_back(issue);
955968
}
956969

970+
auto replyError = [&](auto message, auto status) {
971+
NYql::TIssues issues;
972+
NYql::IssuesFromMessage(record.GetStatus().GetIssues(), issues);
973+
return RuntimeError(message, status, issues);
974+
};
975+
957976
if (UseFollowers && record.GetStatus().GetCode() != Ydb::StatusIds::SUCCESS && Reads[id].Shard->SuccessBatches > 0) {
958977
// read from follower is interrupted with error after several successful responses.
959978
// in this case read is not safe because we can return inconsistent data.
960-
NYql::TIssues issues;
961-
NYql::IssuesFromMessage(record.GetStatus().GetIssues(), issues);
962-
return RuntimeError("Failed to read from follower", NYql::NDqProto::StatusIds::UNAVAILABLE, issues);
979+
return replyError("Failed to read from follower", NYql::NDqProto::StatusIds::UNAVAILABLE);
963980
}
964981

965982
switch (record.GetStatus().GetCode()) {
@@ -968,20 +985,33 @@ class TKqpReadActor : public TActorBootstrapped<TKqpReadActor>, public NYql::NDq
968985
break;
969986
}
970987
case Ydb::StatusIds::OVERLOADED: {
988+
if (CheckTotalRetriesExeeded() || CheckShardRetriesExeeded(id)) {
989+
return replyError(
990+
TStringBuilder() << "Table '" << Settings->GetTable().GetTablePath() << "' retry limit exceeded.",
991+
NYql::NDqProto::StatusIds::OVERLOADED);
992+
}
971993
return RetryRead(id, false);
972994
}
973995
case Ydb::StatusIds::INTERNAL_ERROR: {
996+
if (CheckTotalRetriesExeeded() || CheckShardRetriesExeeded(id)) {
997+
return replyError(
998+
TStringBuilder() << "Table '" << Settings->GetTable().GetTablePath() << "' retry limit exceeded.",
999+
NYql::NDqProto::StatusIds::INTERNAL_ERROR);
1000+
}
9741001
return RetryRead(id);
9751002
}
9761003
case Ydb::StatusIds::NOT_FOUND: {
1004+
if (CheckTotalRetriesExeeded() || CheckShardRetriesExeeded(id)) {
1005+
return replyError(
1006+
TStringBuilder() << "Table '" << Settings->GetTable().GetTablePath() << "' retry limit exceeded.",
1007+
NYql::NDqProto::StatusIds::UNAVAILABLE);
1008+
}
9771009
auto shard = Reads[id].Shard;
9781010
ResetRead(id);
9791011
return ResolveShard(shard);
9801012
}
9811013
default: {
982-
NYql::TIssues issues;
983-
NYql::IssuesFromMessage(record.GetStatus().GetIssues(), issues);
984-
return RuntimeError("Read request aborted", NYql::NDqProto::StatusIds::ABORTED, issues);
1014+
return replyError("Read request aborted", NYql::NDqProto::StatusIds::ABORTED);
9851015
}
9861016
}
9871017

ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp

Lines changed: 32 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -345,6 +345,12 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
345345
Counters->DataShardIteratorFails->Inc();
346346
}
347347

348+
auto replyError = [&](auto message, auto status) {
349+
NYql::TIssues issues;
350+
NYql::IssuesFromMessage(record.GetStatus().GetIssues(), issues);
351+
return RuntimeError(message, status, issues);
352+
};
353+
348354
switch (record.GetStatus().GetCode()) {
349355
case Ydb::StatusIds::SUCCESS:
350356
break;
@@ -354,15 +360,23 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
354360
return ResolveTableShards();
355361
}
356362
case Ydb::StatusIds::OVERLOADED: {
363+
if (CheckTotalRetriesExeeded() || CheckShardRetriesExeeded(read)) {
364+
return replyError(
365+
TStringBuilder() << "Table '" << StreamLookupWorker->GetTablePath() << "' retry limit exceeded.",
366+
NYql::NDqProto::StatusIds::OVERLOADED);
367+
}
357368
return RetryTableRead(read, /*allowInstantRetry = */false);
358369
}
359370
case Ydb::StatusIds::INTERNAL_ERROR: {
371+
if (CheckTotalRetriesExeeded() || CheckShardRetriesExeeded(read)) {
372+
return replyError(
373+
TStringBuilder() << "Table '" << StreamLookupWorker->GetTablePath() << "' retry limit exceeded.",
374+
NYql::NDqProto::StatusIds::INTERNAL_ERROR);
375+
}
360376
return RetryTableRead(read);
361377
}
362378
default: {
363-
NYql::TIssues issues;
364-
NYql::IssuesFromMessage(record.GetStatus().GetIssues(), issues);
365-
return RuntimeError("Read request aborted", NYql::NDqProto::StatusIds::ABORTED, issues);
379+
return replyError("Read request aborted", NYql::NDqProto::StatusIds::ABORTED);
366380
}
367381
}
368382

@@ -538,24 +552,33 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
538552
}
539553
}
540554

555+
bool CheckTotalRetriesExeeded() {
556+
const auto limit = MaxTotalRetries();
557+
return limit && TotalRetryAttempts + 1 > *limit;
558+
}
559+
560+
// Reports whether one more retry against the shard of `failedRead` would
// exceed MaxShardRetries(). The per-shard counter is looked up in
// ReadsPerShard by failedRead.ShardId and is only read, not incremented.
// NOTE(review): if ReadsPerShard is a map-like container, operator[] may
// create a default entry for an unknown shard id; confirm this is intended.
bool CheckShardRetriesExeeded(TReadState& failedRead) {
    const auto nextAttempt = ReadsPerShard[failedRead.ShardId].RetryAttempts + 1;
    return nextAttempt > MaxShardRetries();
}
564+
541565
void RetryTableRead(TReadState& failedRead, bool allowInstantRetry = true) {
542566
CA_LOG_D("Retry reading of table: " << StreamLookupWorker->GetTablePath() << ", readId: " << failedRead.Id
543567
<< ", shardId: " << failedRead.ShardId);
544568

545-
++TotalRetryAttempts;
546-
auto totalRetriesLimit = MaxTotalRetries();
547-
if (totalRetriesLimit && TotalRetryAttempts > *totalRetriesLimit) {
569+
if (CheckTotalRetriesExeeded()) {
548570
return RuntimeError(TStringBuilder() << "Table '" << StreamLookupWorker->GetTablePath() << "' retry limit exceeded",
549571
NYql::NDqProto::StatusIds::UNAVAILABLE);
550572
}
573+
++TotalRetryAttempts;
551574

552-
auto& shardState = ReadsPerShard[failedRead.ShardId];
553-
++shardState.RetryAttempts;
554-
if (shardState.RetryAttempts > MaxShardRetries()) {
575+
if (CheckShardRetriesExeeded(failedRead)) {
555576
StreamLookupWorker->ResetRowsProcessing(failedRead.Id, failedRead.FirstUnprocessedQuery, failedRead.LastProcessedKey);
556577
failedRead.SetFinished();
557578
return ResolveTableShards();
558579
}
580+
auto& shardState = ReadsPerShard[failedRead.ShardId];
581+
++shardState.RetryAttempts;
559582

560583
auto delay = CalcDelay(shardState.RetryAttempts, allowInstantRetry);
561584
if (delay == TDuration::Zero()) {

ydb/core/kqp/ut/query/kqp_query_ut.cpp

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
#include <ydb/core/kqp/ut/common/kqp_ut_common.h>
22

33
#include <ydb/core/tx/datashard/datashard_failpoints.h>
4+
#include <ydb/core/tx/datashard/datashard.h>
45
#include <ydb/core/testlib/common_helper.h>
56
#include <ydb/core/kqp/provider/yql_kikimr_expr_nodes.h>
67
#include <ydb/core/kqp/counters/kqp_counters.h>
@@ -2142,6 +2143,51 @@ Y_UNIT_TEST_SUITE(KqpQuery) {
21422143
CompareYson(R"([[3u]])", FormatResultSetYson(result.GetResultSet(0)));
21432144
}
21442145
}
2146+
2147+
// Checks that a data query fails with OVERLOADED when every datashard read
// result reports OVERLOADED.
//
// The twin parameter StreamLookup selects the query: an index lookup through
// `VIEW Index` when true, a COUNT over the whole table when false.
// NOTE(review): presumably these two variants exercise the stream lookup actor
// and the read actor respectively; confirm against the query plans.
Y_UNIT_TEST_TWIN(ReadOverloaded, StreamLookup) {
    NKikimrConfig::TAppConfig appConfig;
    TKikimrSettings settings;
    settings.SetAppConfig(appConfig);
    // The test drives the runtime itself (RunCall / WaitFuture below).
    settings.SetUseRealThreads(false);
    TKikimrRunner kikimr(settings);
    auto db = kikimr.GetTableClient();
    auto session = kikimr.RunCall([&] { return db.CreateSession().GetValueSync().GetSession(); });
    // NOTE(review): writeSession is not used below; kept so the sequence of
    // calls made through the test runtime stays the same.
    auto writeSession = kikimr.RunCall([&] { return db.CreateSession().GetValueSync().GetSession(); });

    auto& runtime = *kikimr.GetTestServer().GetRuntime();

    kikimr.RunCall([&]{ CreateSampleTablesWithIndex(session, false /* no need in table data */); return true; });

    {
        const TString query(StreamLookup
            ? Q1_(R"(
                SELECT Value FROM `/Root/SecondaryKeys` VIEW Index WHERE Fk = 1
            )")
            : Q1_(R"(
                SELECT COUNT(a.Key) FROM `/Root/SecondaryKeys` as a;
            )"));

        // Observer: overwrite the status code of every TEvReadResult with
        // OVERLOADED and let the event continue to its recipient.
        auto grab = [&](TAutoPtr<IEventHandle> &ev) {
            if (ev->GetTypeRewrite() == TEvDataShard::TEvReadResult::EventType) {
                auto* msg = ev->Get<TEvDataShard::TEvReadResult>();
                msg->Record.MutableStatus()->SetCode(::Ydb::StatusIds::OVERLOADED);
            }

            return TTestActorRuntime::EEventAction::PROCESS;
        };

        runtime.SetObserverFunc(grab);
        auto future = kikimr.RunInThreadPool([&]{
            auto txc = TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx();
            return session.ExecuteDataQuery(query, txc).ExtractValueSync();
        });

        auto result = runtime.WaitFuture(future);
        UNIT_ASSERT_C(!result.IsSuccess(), result.GetIssues().ToString());
        // Attach the issues so an unexpected status is diagnosable from the log.
        UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::OVERLOADED, result.GetIssues().ToString());
    }
}
21452191
}
21462192

21472193
} // namespace NKqp

0 commit comments

Comments
 (0)