Skip to content

Commit 31e0e46

Browse files
authored
mon page has been added for rd read actors (#12074)
1 parent 5c44bca commit 31e0e46

File tree

4 files changed

+87
-1
lines changed

4 files changed

+87
-1
lines changed

ydb/core/fq/libs/row_dispatcher/events/data_plane.h

+12
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ struct TEvRowDispatcher {
2929
EvCoordinatorResult,
3030
EvSessionStatistic,
3131
EvHeartbeat,
32+
EvGetInternalStateRequest,
33+
EvGetInternalStateResponse,
3234
EvEnd,
3335
};
3436

@@ -138,6 +140,16 @@ struct TEvRowDispatcher {
138140
Record.SetPartitionId(partitionId);
139141
}
140142
};
143+
144+
struct TEvGetInternalStateRequest : public NActors::TEventPB<TEvGetInternalStateRequest,
145+
NFq::NRowDispatcherProto::TEvGetInternalStateRequest, EEv::EvGetInternalStateRequest> {
146+
TEvGetInternalStateRequest() = default;
147+
};
148+
149+
struct TEvGetInternalStateResponse : public NActors::TEventPB<TEvGetInternalStateResponse,
150+
NFq::NRowDispatcherProto::TEvGetInternalStateResponse, EEv::EvGetInternalStateResponse> {
151+
TEvGetInternalStateResponse() = default;
152+
};
141153
};
142154

143155
} // namespace NFq

ydb/core/fq/libs/row_dispatcher/protos/events.proto

+7
Original file line numberDiff line numberDiff line change
@@ -81,3 +81,10 @@ message TEvHeartbeat {
8181
uint32 PartitionId = 1;
8282
optional NYql.NDqProto.TMessageTransportMeta TransportMeta = 100;
8383
}
84+
85+
message TEvGetInternalStateRequest {
86+
}
87+
88+
message TEvGetInternalStateResponse {
89+
string InternalState = 1;
90+
}

ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp

+54
Original file line numberDiff line numberDiff line change
@@ -316,9 +316,16 @@ class TRowDispatcher : public TActorBootstrapped<TRowDispatcher> {
316316
TMap<TActorId, SessionInfo> Sessions; // key - TopicSession actor id
317317
};
318318

319+
struct ReadActorInfo {
320+
TString InternalState;
321+
TInstant RequestTime;
322+
TInstant ResponseTime;
323+
};
324+
319325
THashMap<ConsumerSessionKey, TAtomicSharedPtr<ConsumerInfo>, ConsumerSessionKeyHash> Consumers;
320326
TMap<ui64, TAtomicSharedPtr<ConsumerInfo>> ConsumersByEventQueueId;
321327
THashMap<TopicSessionKey, TopicSessionInfo, TopicSessionKeyHash> TopicSessions;
328+
TMap<TActorId, ReadActorInfo> ReadActorsInternalState;
322329

323330
public:
324331
explicit TRowDispatcher(
@@ -352,6 +359,7 @@ class TRowDispatcher : public TActorBootstrapped<TRowDispatcher> {
352359
void Handle(NFq::TEvRowDispatcher::TEvSessionError::TPtr& ev);
353360
void Handle(NFq::TEvRowDispatcher::TEvStatistics::TPtr& ev);
354361
void Handle(NFq::TEvRowDispatcher::TEvSessionStatistic::TPtr& ev);
362+
void Handle(NFq::TEvRowDispatcher::TEvGetInternalStateResponse::TPtr& ev);
355363

356364
void Handle(NFq::TEvRowDispatcher::TEvHeartbeat::TPtr& ev);
357365
void Handle(const TEvPrivate::TEvTryConnect::TPtr&);
@@ -363,6 +371,8 @@ class TRowDispatcher : public TActorBootstrapped<TRowDispatcher> {
363371
void DeleteConsumer(const ConsumerSessionKey& key);
364372
void UpdateMetrics();
365373
TString GetInternalState();
374+
TString GetReadActorsInternalState();
375+
void UpdateReadActorsInternalState();
366376
template <class TEventPtr>
367377
bool CheckSession(TAtomicSharedPtr<ConsumerInfo>& consumer, const TEventPtr& ev);
368378
void SetQueryMetrics(const TQueryStatKey& queryKey, ui64 unreadBytesMax, ui64 unreadBytesAvg, i64 readLagMessagesMax);
@@ -384,6 +394,7 @@ class TRowDispatcher : public TActorBootstrapped<TRowDispatcher> {
384394
hFunc(NFq::TEvRowDispatcher::TEvSessionError, Handle);
385395
hFunc(NFq::TEvRowDispatcher::TEvStatistics, Handle);
386396
hFunc(NFq::TEvRowDispatcher::TEvSessionStatistic, Handle);
397+
hFunc(NFq::TEvRowDispatcher::TEvGetInternalStateResponse, Handle);
387398
hFunc(TEvPrivate::TEvTryConnect, Handle);
388399
hFunc(NYql::NDq::TEvRetryQueuePrivate::TEvEvHeartbeat, Handle);
389400
hFunc(NFq::TEvRowDispatcher::TEvHeartbeat, Handle);
@@ -656,6 +667,39 @@ TString TRowDispatcher::GetInternalState() {
656667
return str.Str();
657668
}
658669

670+
TString TRowDispatcher::GetReadActorsInternalState() {
671+
TStringStream str;
672+
for (const auto& [_, internalState]: ReadActorsInternalState) {
673+
str << "ResponseTime: " << internalState.ResponseTime << " " << internalState.InternalState << Endl;
674+
}
675+
return str.Str();
676+
}
677+
678+
void TRowDispatcher::UpdateReadActorsInternalState() {
679+
TSet<TActorId> ReadActors;
680+
for (const auto& [key, _]: Consumers) {
681+
ReadActors.insert(key.ReadActorId);
682+
}
683+
684+
for(auto it = ReadActorsInternalState.begin(); it != ReadActorsInternalState.end();) {
685+
if (!ReadActors.contains(it->first)) {
686+
it = ReadActorsInternalState.erase(it);
687+
} else {
688+
++it;
689+
}
690+
}
691+
692+
auto now = TInstant::Now();
693+
for (const auto& readActor: ReadActors) {
694+
auto& internalStateInfo = ReadActorsInternalState[readActor];
695+
if (now - internalStateInfo.RequestTime < TDuration::Seconds(30)) {
696+
continue;
697+
}
698+
internalStateInfo.RequestTime = now;
699+
Send(readActor, new NFq::TEvRowDispatcher::TEvGetInternalStateRequest{}, 0, 0);
700+
}
701+
}
702+
659703
void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev) {
660704
LOG_ROW_DISPATCHER_DEBUG("Received TEvStartSession from " << ev->Sender << ", topicPath " << ev->Get()->Record.GetSource().GetTopicPath() <<
661705
" part id " << ev->Get()->Record.GetPartitionId() << " query id " << ev->Get()->Record.GetQueryId() << " cookie " << ev->Cookie);
@@ -916,11 +960,15 @@ void TRowDispatcher::PrintStateToLog() {
916960
}
917961

918962
void TRowDispatcher::Handle(const NMon::TEvHttpInfo::TPtr& ev) {
963+
UpdateReadActorsInternalState();
919964
TStringStream str;
920965
HTML(str) {
921966
PRE() {
967+
str << "Current Time: " << TInstant::Now() << Endl;
922968
str << "Current state:" << Endl;
923969
str << GetInternalState() << Endl;
970+
str << "Read actors state: " << Endl;
971+
str << GetReadActorsInternalState() << Endl;
924972
str << Endl;
925973
}
926974
}
@@ -954,6 +1002,12 @@ void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvSessionStatistic::TPtr& ev
9541002
}
9551003
}
9561004

1005+
void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvGetInternalStateResponse::TPtr& ev) {
1006+
auto& readActorInternalState = ReadActorsInternalState[ev->Sender];
1007+
readActorInternalState.InternalState = ev->Get()->Record.GetInternalState();
1008+
readActorInternalState.ResponseTime = TInstant::Now();
1009+
}
1010+
9571011
} // namespace
9581012

9591013
////////////////////////////////////////////////////////////////////////////////

ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.cpp

+14-1
Original file line numberDiff line numberDiff line change
@@ -226,6 +226,7 @@ class TDqPqRdReadActor : public NActors::TActor<TDqPqRdReadActor>, public NYql::
226226
void Handle(NFq::TEvRowDispatcher::TEvNewDataArrived::TPtr& ev);
227227
void Handle(NFq::TEvRowDispatcher::TEvSessionError::TPtr& ev);
228228
void Handle(NFq::TEvRowDispatcher::TEvStatistics::TPtr& ev);
229+
void Handle(NFq::TEvRowDispatcher::TEvGetInternalStateRequest::TPtr& ev);
229230

230231
void HandleDisconnected(TEvInterconnect::TEvNodeDisconnected::TPtr& ev);
231232
void HandleConnected(TEvInterconnect::TEvNodeConnected::TPtr& ev);
@@ -245,6 +246,7 @@ class TDqPqRdReadActor : public NActors::TActor<TDqPqRdReadActor>, public NYql::
245246
hFunc(NFq::TEvRowDispatcher::TEvStartSessionAck, Handle);
246247
hFunc(NFq::TEvRowDispatcher::TEvSessionError, Handle);
247248
hFunc(NFq::TEvRowDispatcher::TEvStatistics, Handle);
249+
hFunc(NFq::TEvRowDispatcher::TEvGetInternalStateRequest, Handle);
248250

249251
hFunc(NActors::TEvents::TEvPong, Handle);
250252
hFunc(TEvInterconnect::TEvNodeConnected, HandleConnected);
@@ -269,6 +271,7 @@ class TDqPqRdReadActor : public NActors::TActor<TDqPqRdReadActor>, public NYql::
269271
void StopSessions();
270272
void ReInit(const TString& reason);
271273
void PrintInternalState();
274+
TString GetInternalState();
272275
void TrySendGetNextBatch(SessionInfo& sessionInfo);
273276
template <class TEventPtr>
274277
bool CheckSession(SessionInfo& session, const TEventPtr& ev, ui64 partitionId);
@@ -532,6 +535,12 @@ void TDqPqRdReadActor::Handle(NFq::TEvRowDispatcher::TEvStatistics::TPtr& ev) {
532535
}
533536
}
534537

538+
void TDqPqRdReadActor::Handle(NFq::TEvRowDispatcher::TEvGetInternalStateRequest::TPtr& ev) {
539+
auto response = std::make_unique<NFq::TEvRowDispatcher::TEvGetInternalStateResponse>();
540+
response->Record.SetInternalState(GetInternalState());
541+
Send(ev->Sender, response.release(), 0, ev->Cookie);
542+
}
543+
535544
void TDqPqRdReadActor::Handle(NFq::TEvRowDispatcher::TEvNewDataArrived::TPtr& ev) {
536545
const NYql::NDqProto::TMessageTransportMeta& meta = ev->Get()->Record.GetTransportMeta();
537546
SRC_LOG_T("TEvNewDataArrived from " << ev->Sender << ", part id " << ev->Get()->Record.GetPartitionId() << ", seqNo " << meta.GetSeqNo() << ", ConfirmedSeqNo " << meta.GetConfirmedSeqNo());
@@ -772,6 +781,10 @@ void TDqPqRdReadActor::Handle(TEvPrivate::TEvPrintState::TPtr&) {
772781
}
773782

774783
void TDqPqRdReadActor::PrintInternalState() {
784+
SRC_LOG_I(GetInternalState());
785+
}
786+
787+
TString TDqPqRdReadActor::GetInternalState() {
775788
TStringStream str;
776789
str << "State: used buffer size " << ReadyBufferSizeBytes << " ready buffer event size " << ReadyBuffer.size() << " state " << static_cast<ui64>(State) << " InFlyAsyncInputData " << InFlyAsyncInputData << "\n";
777790
str << "Counters: GetAsyncInputData " << Counters.GetAsyncInputData << " CoordinatorChanged " << Counters.CoordinatorChanged << " CoordinatorResult " << Counters.CoordinatorResult
@@ -789,7 +802,7 @@ void TDqPqRdReadActor::PrintInternalState() {
789802
<< " has pending data " << sessionInfo.HasPendingData << " connection id " << sessionInfo.Generation << " ";
790803
sessionInfo.EventsQueue.PrintInternalState(str);
791804
}
792-
SRC_LOG_I(str.Str());
805+
return str.Str();
793806
}
794807

795808
void TDqPqRdReadActor::Handle(TEvPrivate::TEvProcessState::TPtr&) {

0 commit comments

Comments
 (0)