Skip to content

Commit 0628f08

Browse files
authored
Added a timeout waiting for a response from the tablet (#8260)
1 parent 69d0b8f commit 0628f08

File tree

4 files changed

+108
-5
lines changed

4 files changed

+108
-5
lines changed

ydb/core/statistics/service/service.cpp

+2
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ namespace NStat {
2828
static constexpr TDuration DefaultAggregateKeepAlivePeriod = TDuration::MilliSeconds(500);
2929
static constexpr TDuration DefaultAggregateKeepAliveTimeout = TDuration::Seconds(3);
3030
static constexpr TDuration DefaultAggregateKeepAliveAckTimeout = TDuration::Seconds(3);
31+
static constexpr TDuration DefaultStatisticsRequestTimeout = TDuration::Seconds(5);
3132
static constexpr size_t DefaultMaxInFlightTabletRequests = 5;
3233
static constexpr size_t DefaultFanOutFactor = 5;
3334

@@ -37,6 +38,7 @@ TStatServiceSettings::TStatServiceSettings()
3738
: AggregateKeepAlivePeriod(DefaultAggregateKeepAlivePeriod)
3839
, AggregateKeepAliveTimeout(DefaultAggregateKeepAliveTimeout)
3940
, AggregateKeepAliveAckTimeout(DefaultAggregateKeepAliveAckTimeout)
41+
, StatisticsRequestTimeout(DefaultStatisticsRequestTimeout)
4042
, MaxInFlightTabletRequests(DefaultMaxInFlightTabletRequests)
4143
, FanOutFactor(DefaultFanOutFactor)
4244
{}

ydb/core/statistics/service/service.h

+6
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ struct TStatServiceSettings {
99
TDuration AggregateKeepAlivePeriod;
1010
TDuration AggregateKeepAliveTimeout;
1111
TDuration AggregateKeepAliveAckTimeout;
12+
TDuration StatisticsRequestTimeout;
1213
size_t MaxInFlightTabletRequests;
1314
size_t FanOutFactor;
1415

@@ -29,6 +30,11 @@ struct TStatServiceSettings {
2930
return *this;
3031
}
3132

33+
TStatServiceSettings& SetStatisticsRequestTimeout(const TDuration& val) {
34+
StatisticsRequestTimeout = val;
35+
return *this;
36+
}
37+
3238
TStatServiceSettings& SetMaxInFlightTabletRequests(size_t val) {
3339
MaxInFlightTabletRequests = val;
3440
return *this;

ydb/core/statistics/service/service_impl.cpp

+39-5
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
namespace NKikimr {
2626
namespace NStat {
2727

28-
2928
struct TAggregationStatistics {
3029
using TColumnsStatistics = ::google::protobuf::RepeatedPtrField<::NKikimrStat::TColumnStatistics>;
3130

@@ -128,6 +127,7 @@ class TStatService : public TActorBootstrapped<TStatService> {
128127
EvDispatchKeepAlive,
129128
EvKeepAliveTimeout,
130129
EvKeepAliveAckTimeout,
130+
EvStatisticsRequestTimeout,
131131

132132
EvEnd
133133
};
@@ -155,6 +155,13 @@ class TStatService : public TActorBootstrapped<TStatService> {
155155
ui64 Round;
156156
ui32 NodeId;
157157
};
158+
159+
struct TEvStatisticsRequestTimeout: public NActors::TEventLocal<TEvStatisticsRequestTimeout, EvStatisticsRequestTimeout> {
160+
TEvStatisticsRequestTimeout(ui64 round, ui64 tabletId): Round(round), TabletId(tabletId) {}
161+
162+
ui64 Round;
163+
ui64 TabletId;
164+
};
158165
};
159166

160167
void Bootstrap() {
@@ -195,6 +202,7 @@ class TStatService : public TActorBootstrapped<TStatService> {
195202
hFunc(TEvStatistics::TEvAggregateKeepAlive, Handle);
196203
hFunc(TEvPrivate::TEvDispatchKeepAlive, Handle);
197204
hFunc(TEvPrivate::TEvKeepAliveTimeout, Handle);
205+
hFunc(TEvPrivate::TEvStatisticsRequestTimeout, Handle);
198206
hFunc(TEvStatistics::TEvStatisticsResponse, Handle);
199207
hFunc(TEvStatistics::TEvAggregateStatisticsResponse, Handle);
200208

@@ -902,7 +910,31 @@ class TStatService : public TActorBootstrapped<TStatService> {
902910
}
903911
}
904912

905-
void SendStatisticsRequest(const TActorId& clientId) {
913+
void Handle(TEvPrivate::TEvStatisticsRequestTimeout::TPtr& ev) {
914+
const auto round = ev->Get()->Round;
915+
if (IsNotCurrentRound(round)) {
916+
LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS,
917+
"Skip TEvStatisticsRequestTimeout");
918+
return;
919+
}
920+
921+
const auto tabletId = ev->Get()->TabletId;
922+
auto tabletPipe = AggregationStatistics.LocalTablets.TabletsPipes.find(tabletId);
923+
if (tabletPipe == AggregationStatistics.LocalTablets.TabletsPipes.end()) {
924+
LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS,
925+
"Tablet " << tabletId << " has already been processed");
926+
return;
927+
}
928+
929+
LOG_ERROR_S(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS,
930+
"No result was received from the tablet " << tabletId);
931+
932+
auto clientId = tabletPipe->second;
933+
OnTabletError(tabletId);
934+
NTabletPipe::CloseClient(SelfId(), clientId);
935+
}
936+
937+
void SendStatisticsRequest(const TActorId& clientId, ui64 tabletId) {
906938
auto request = std::make_unique<TEvStatistics::TEvStatisticsRequest>();
907939
auto& record = request->Record;
908940
record.MutableTypes()->Add(NKikimrStat::TYPE_COUNT_MIN_SKETCH);
@@ -916,7 +948,9 @@ class TStatService : public TActorBootstrapped<TStatService> {
916948
columnTags->Add(tag);
917949
}
918950

919-
NTabletPipe::SendData(SelfId(), clientId, request.release(), AggregationStatistics.Round);
951+
const auto round = AggregationStatistics.Round;
952+
NTabletPipe::SendData(SelfId(), clientId, request.release(), round);
953+
Schedule(Settings.StatisticsRequestTimeout, new TEvPrivate::TEvStatisticsRequestTimeout(round, tabletId));
920954

921955
LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS,
922956
"TEvStatisticsRequest send"
@@ -928,7 +962,7 @@ class TStatService : public TActorBootstrapped<TStatService> {
928962
LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS,
929963
"Tablet " << tabletId << " is not local.");
930964

931-
constexpr auto error = NKikimrStat::TEvAggregateStatisticsResponse::TYPE_NON_LOCAL_TABLET;
965+
const auto error = NKikimrStat::TEvAggregateStatisticsResponse::TYPE_NON_LOCAL_TABLET;
932966
AggregationStatistics.FailedTablets.emplace_back(tabletId, 0, error);
933967

934968
AggregationStatistics.LocalTablets.TabletsPipes.erase(tabletId);
@@ -966,7 +1000,7 @@ class TStatService : public TActorBootstrapped<TStatService> {
9661000

9671001
if (tabletPipe != tabletsPipes.end() && clientId == tabletPipe->second) {
9681002
if (ev->Get()->Status == NKikimrProto::OK) {
969-
SendStatisticsRequest(clientId);
1003+
SendStatisticsRequest(clientId, tabletId);
9701004
} else {
9711005
OnTabletError(tabletId);
9721006
}

ydb/core/statistics/service/ut/ut_service.cpp

+61
Original file line numberDiff line numberDiff line change
@@ -532,6 +532,67 @@ Y_UNIT_TEST_SUITE(StatisticsService) {
532532
UNIT_ASSERT_VALUES_EQUAL(expectedError, actualError);
533533
}
534534
}
535+
536+
Y_UNIT_TEST(ShouldBeCcorrectProcessingTabletTimeout) {
537+
size_t nodeCount = 1;
538+
auto runtime = TTestActorRuntime(nodeCount, 1, false);
539+
auto settings = GetDefaultSettings()
540+
.SetStatisticsRequestTimeout(TDuration::MilliSeconds(10));
541+
auto indexToActorMap = InitializeRuntime(runtime, nodeCount, settings);
542+
auto nodeIdToIndexMap = GetNodeIdToIndexMap(indexToActorMap);
543+
std::vector<ui64> localTabletsIds = {1, 2, 3, 4, 5, 6, 7};
544+
std::vector<TAggregateStatisticsRequest::TTablets> nodesTablets = {{.NodeId = indexToActorMap[0].NodeId(), .Ids{localTabletsIds}}};
545+
546+
std::unordered_map<TActorId, ui64> pipeToTablet;
547+
std::vector<TTestActorRuntimeBase::TEventObserverHolder> observers;
548+
observers.emplace_back(runtime.AddObserver<TEvTabletResolver::TEvForward>([&](TEvTabletResolver::TEvForward::TPtr& ev) {
549+
auto tabletId = ev->Get()->TabletID;
550+
auto recipient = indexToActorMap[nodeIdToIndexMap[ev->Sender.NodeId()]];
551+
pipeToTablet[ev->Sender] = tabletId;
552+
553+
runtime.Send(new IEventHandle(recipient, ev->Sender,
554+
new TEvTabletPipe::TEvClientConnected(tabletId, NKikimrProto::OK, ev->Sender, ev->Sender,
555+
true, false, 0), 0, ev->Cookie), nodeIdToIndexMap[ev->Sender.NodeId()], true);
556+
ev.Reset();
557+
}));
558+
observers.emplace_back(runtime.AddObserver([&](TAutoPtr<IEventHandle>& ev) {
559+
switch (ev->GetTypeRewrite()) {
560+
case TEvTabletPipe::EvSend:
561+
auto msg = ev->Get<TEvStatistics::TEvStatisticsRequest>();
562+
if (msg != nullptr) {
563+
auto tabletId = pipeToTablet[ev->Recipient];
564+
if (tabletId % 2 != 0) {
565+
auto senderNodeIndex = nodeIdToIndexMap[ev->Sender.NodeId()];
566+
runtime.Send(new IEventHandle(ev->Sender, ev->Sender,
567+
CreateStatisticsResponse(TStatisticsResponse{
568+
.TabletId = tabletId,
569+
.Status = NKikimrStat::TEvStatisticsResponse::STATUS_SUCCESS
570+
}).release(), 0, ev->Cookie), senderNodeIndex, true);
571+
}
572+
}
573+
break;
574+
}
575+
}));
576+
577+
auto sender = runtime.AllocateEdgeActor();
578+
runtime.Send(indexToActorMap[0], sender, CreateStatisticsRequest(TAggregateStatisticsRequest{
579+
.Round = 1,
580+
.PathId{3, 3},
581+
.Nodes{ nodesTablets },
582+
.ColumnTags{1}
583+
}).release());
584+
585+
auto res = runtime.GrabEdgeEvent<TEvStatistics::TEvAggregateStatisticsResponse>(sender);
586+
const auto& record = res->Get()->Record;
587+
size_t expectedFailedTabletsCount = 3;
588+
UNIT_ASSERT_VALUES_EQUAL(expectedFailedTabletsCount, record.GetFailedTablets().size());
589+
590+
ui32 expectedError = NKikimrStat::TEvAggregateStatisticsResponse::TYPE_NON_LOCAL_TABLET;
591+
for (const auto& fail : record.GetFailedTablets()) {
592+
ui32 actualError = fail.GetError();
593+
UNIT_ASSERT_VALUES_EQUAL(expectedError, actualError);
594+
}
595+
}
535596
}
536597

537598
} // NSysView

0 commit comments

Comments
 (0)