Skip to content

Commit 1ca7ca1

Browse files
committed
RequestUnits for read by Kafka protocol
1 parent 8a1c47b commit 1ca7ca1

11 files changed

+90
-14
lines changed

ydb/core/kafka_proxy/actors/kafka_fetch_actor.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ void TKafkaFetchActor::SendFetchRequests(const TActorContext& ctx) {
3535
TVector<NKikimr::NPQ::TPartitionFetchRequest> partPQRequests;
3636
PrepareFetchRequestData(topicIndex, partPQRequests);
3737

38-
NKikimr::NPQ::TFetchRequestSettings request(Context->DatabasePath, partPQRequests, FetchRequestData->MaxWaitMs, FetchRequestData->MaxBytes, *Context->UserToken);
38+
NKikimr::NPQ::TFetchRequestSettings request(Context->DatabasePath, partPQRequests, FetchRequestData->MaxWaitMs, FetchRequestData->MaxBytes, Context->RlContext, *Context->UserToken);
3939

4040
auto fetchActor = NKikimr::NPQ::CreatePQFetchRequestActor(request, NKikimr::MakeSchemeCacheID(), ctx.SelfID);
4141
auto actorId = ctx.Register(fetchActor);

ydb/core/kafka_proxy/actors/kafka_read_session_actor.cpp

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -466,7 +466,6 @@ void TKafkaReadSessionActor::HandleAuthOk(NGRpcProxy::V1::TEvPQProxy::TEvAuthRes
466466
TopicsInfo[internalName] = NGRpcProxy::TTopicHolder::FromTopicInfo(t);
467467
FullPathToConverter[t.TopicNameConverter->GetPrimaryPath()] = t.TopicNameConverter;
468468
FullPathToConverter[t.TopicNameConverter->GetSecondaryPath()] = t.TopicNameConverter;
469-
// savnik: metering mode
470469
}
471470

472471
Send(Context->ConnectionId, new TEvKafka::TEvReadSessionInfo(GroupId));

ydb/core/persqueue/fetch_request_actor.cpp

Lines changed: 72 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,9 @@
66

77
#include <ydb/core/base/tablet_pipe.h>
88
#include <ydb/core/client/server/msgbus_server_pq_metacache.h>
9+
#include <ydb/core/persqueue/pq_rl_helpers.h>
910
#include <ydb/core/persqueue/user_info.h>
11+
#include <ydb/core/persqueue/write_meta.h>
1012

1113
#include <ydb/public/lib/base/msgbus_status.h>
1214

@@ -21,7 +23,7 @@ using namespace NSchemeCache;
2123

2224

2325
namespace {
24-
const ui32 DefaultTimeout = 30000;
26+
static constexpr TDuration DefaultTimeout = TDuration::MilliSeconds(30000);
2527
}
2628

2729
struct TTabletInfo { // ToDo !! remove
@@ -52,7 +54,22 @@ struct TTopicInfo {
5254

5355
using namespace NActors;
5456

55-
class TPQFetchRequestActor : public TActorBootstrapped<TPQFetchRequestActor> {
57+
class TPQFetchRequestActor : public TActorBootstrapped<TPQFetchRequestActor>
58+
, private TRlHelpers {
59+
60+
struct TEvPrivate {
61+
enum EEv {
62+
EvTimeout = EventSpaceBegin(NActors::TEvents::ES_PRIVATE),
63+
EvEnd
64+
};
65+
66+
static_assert(EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE), "expect EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE)");
67+
68+
struct TEvTimeout : NActors::TEventLocal<TEvTimeout, EvTimeout> {
69+
};
70+
71+
};
72+
5673
private:
5774
TFetchRequestSettings Settings;
5875

@@ -74,14 +91,16 @@ class TPQFetchRequestActor : public TActorBootstrapped<TPQFetchRequestActor> {
7491
ui32 PartTabletsRequested;
7592
TString ErrorReason;
7693
TActorId RequesterId;
94+
ui64 PendingQuotaAmount;
7795

7896
public:
7997
static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
8098
return NKikimrServices::TActivity::PQ_FETCH_REQUEST;
8199
}
82100

83101
TPQFetchRequestActor(const TFetchRequestSettings& settings, const TActorId& schemeCacheId, const TActorId& requesterId)
84-
: Settings(settings)
102+
: TRlHelpers({}, settings.RlCtx, 8_KB, false, TDuration::Seconds(1)) //savnik: check duration
103+
, Settings(settings)
85104
, CanProcessFetchRequest(false)
86105
, FetchRequestReadsDone(0)
87106
, FetchRequestCurrentReadTablet(0)
@@ -115,7 +134,25 @@ class TPQFetchRequestActor : public TActorBootstrapped<TPQFetchRequestActor> {
115134
TopicInfo[path].FetchInfo[p.Partition] = fetchInfo;
116135
}
117136
}
118-
// FIXME(savnik) handle request timeout
137+
138+
void Handle(TEvents::TEvWakeup::TPtr& ev, const TActorContext& ctx) {
139+
const auto tag = static_cast<EWakeupTag>(ev->Get()->Tag);
140+
OnWakeup(tag);
141+
switch (tag) {
142+
case EWakeupTag::RlAllowed:
143+
ProceedFetchRequest(ctx);
144+
PendingQuotaAmount = 0;
145+
break;
146+
147+
case EWakeupTag::RlNoResource:
148+
// Re-requesting the quota. We do this until we get a quota.
149+
RequestDataQuota(PendingQuotaAmount, ctx);
150+
break;
151+
152+
default:
153+
Y_VERIFY_DEBUG_S(false, "Unsupported tag: " << static_cast<ui64>(tag));
154+
}
155+
}
119156

120157
void Bootstrap(const TActorContext& ctx) {
121158
LOG_INFO_S(ctx, NKikimrServices::PQ_FETCH_REQUEST, "Fetch request actor boostrapped. Request is valid: " << (!Response));
@@ -128,7 +165,8 @@ class TPQFetchRequestActor : public TActorBootstrapped<TPQFetchRequestActor> {
128165
ctx.Schedule(TDuration::MilliSeconds(Min<ui32>(Settings.MaxWaitTimeMs, 30000)), new TEvPersQueue::TEvHasDataInfoResponse);
129166

130167
SendSchemeCacheRequest(ctx);
131-
Become(&TPQFetchRequestActor::StateFunc, ctx, TDuration::MilliSeconds(DefaultTimeout), new TEvents::TEvWakeup());
168+
Schedule(DefaultTimeout, new TEvPrivate::TEvTimeout());
169+
Become(&TPQFetchRequestActor::StateFunc);
132170
}
133171

134172
void SendSchemeCacheRequest(const TActorContext& ctx) {
@@ -332,6 +370,7 @@ class TPQFetchRequestActor : public TActorBootstrapped<TPQFetchRequestActor> {
332370
for (auto& actor: PQClient) {
333371
NTabletPipe::CloseClient(ctx, actor);
334372
}
373+
TRlHelpers::PassAway(SelfId());
335374
TActorBootstrapped<TPQFetchRequestActor>::Die(ctx);
336375
}
337376

@@ -433,7 +472,32 @@ class TPQFetchRequestActor : public TActorBootstrapped<TPQFetchRequestActor> {
433472
read->SetErrorReason(record.GetErrorReason());
434473

435474
++FetchRequestReadsDone;
436-
ProceedFetchRequest(ctx);
475+
476+
auto it = TopicInfo.find(CanonizePath(topic));
477+
Y_ABORT_UNLESS(it != TopicInfo.end());
478+
479+
SetMeteringMode(it->second.PQInfo->Description.GetPQTabletConfig().GetMeteringMode());
480+
481+
if (IsQuotaRequired()) {
482+
PendingQuotaAmount = CalcRuConsumption(GetPayloadSize(record));
483+
RequestDataQuota(PendingQuotaAmount, ctx);
484+
} else {
485+
ProceedFetchRequest(ctx);
486+
}
487+
488+
}
489+
490+
ui64 GetPayloadSize(const NKikimrClient::TResponse& record) const {
491+
ui64 readBytesSize = 0;
492+
const auto& response = record.GetPartitionResponse();
493+
if (response.HasCmdReadResult()) {
494+
const auto& results = response.GetCmdReadResult().GetResult();
495+
for (auto& r : results) {
496+
auto proto(NKikimr::GetDeserializedData(r.GetData()));
497+
readBytesSize += proto.GetData().Size();
498+
}
499+
}
500+
return readBytesSize;
437501
}
438502

439503
bool CheckAccess(const TSecurityObject& access) {
@@ -466,9 +530,10 @@ class TPQFetchRequestActor : public TActorBootstrapped<TPQFetchRequestActor> {
466530
HFunc(TEvTabletPipe::TEvClientDestroyed, Handle);
467531
HFunc(TEvTabletPipe::TEvClientConnected, Handle);
468532
HFunc(TEvPersQueue::TEvResponse, Handle);
533+
HFunc(TEvents::TEvWakeup, Handle);
469534
HFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, HandleSchemeCacheResponse);
470535
HFunc(TEvPersQueue::TEvHasDataInfoResponse, Handle);
471-
CFunc(TEvents::TSystem::Wakeup, HandleTimeout);
536+
CFunc(TEvPrivate::EvTimeout, HandleTimeout);
472537
CFunc(NActors::TEvents::TSystem::PoisonPill, Die);
473538
)
474539
};

ydb/core/persqueue/fetch_request_actor.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
#pragma once
22

3+
#include <ydb/core/persqueue/pq_rl_helpers.h>
34
#include <ydb/library/actors/core/actor.h>
45
#include <ydb/library/aclib/aclib.h>
56

@@ -31,17 +32,19 @@ struct TFetchRequestSettings {
3132
TMaybe<NACLib::TUserToken> User;
3233
ui64 MaxWaitTimeMs;
3334
ui64 TotalMaxBytes;
35+
TRlContext RlCtx;
3436

3537
ui64 RequestId = 0;
3638
TFetchRequestSettings(
37-
const TString& database, const TVector<TPartitionFetchRequest>& partitions, ui64 maxWaitTimeMs, ui64 totalMaxBytes,
39+
const TString& database, const TVector<TPartitionFetchRequest>& partitions, ui64 maxWaitTimeMs, ui64 totalMaxBytes, TRlContext rlCtx,
3840
const TMaybe<NACLib::TUserToken>& user = {}, ui64 requestId = 0
3941
)
4042
: Database(database)
4143
, Partitions(partitions)
4244
, User(user)
4345
, MaxWaitTimeMs(maxWaitTimeMs)
4446
, TotalMaxBytes(totalMaxBytes)
47+
, RlCtx(rlCtx)
4548
, RequestId(requestId)
4649
{}
4750
};

ydb/core/persqueue/pq_rl_helpers.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ class TRlHelpers: public NMetering::TStreamRequestUnitsCalculator {
5353
};
5454

5555
void Bootstrap(const TActorId selfId, const NActors::TActorContext& ctx);
56-
void PassAway(const TActorId selfId);
56+
void PassAway(const TActorId selfId); //savnik а где он вызывается в текущих акторах, которые наследуют этот класс?
5757

5858
bool IsQuotaRequired() const;
5959
bool IsQuotaInflight() const;

ydb/core/persqueue/writer/writer.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -735,6 +735,7 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter>, private TRl
735735
NTabletPipe::CloseAndForgetClient(SelfId(), PipeClient);
736736
}
737737
SendError("Unexpected termination");
738+
TRlHelpers::PassAway(SelfId());
738739
TActorBootstrapped::PassAway();
739740
}
740741

ydb/services/datastreams/datastreams_proxy.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1560,6 +1560,7 @@ namespace NKikimr::NDataStreams::V1 {
15601560

15611561
void TGetRecordsActor::Die(const TActorContext& ctx) {
15621562
NTabletPipe::CloseClient(ctx, PipeClient);
1563+
TRlHelpers::PassAway(SelfId());
15631564
TBase::Die(ctx);
15641565
}
15651566

ydb/services/datastreams/put_records_actor.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -224,6 +224,7 @@ namespace NKikimr::NDataStreams::V1 {
224224
void Bootstrap(const NActors::TActorContext &ctx);
225225
void PreparePartitionActors(const NActors::TActorContext& ctx);
226226
void HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev);
227+
void Die(const TActorContext& ctx) override;
227228

228229
protected:
229230
void Write(const TActorContext& ctx);
@@ -258,6 +259,12 @@ namespace NKikimr::NDataStreams::V1 {
258259
}
259260
};
260261

262+
template<class TDerived, class TProto>
263+
void TPutRecordsActorBase<TDerived, TProto>::Die(const TActorContext& ctx) {
264+
TRlHelpers::PassAway(TDerived::SelfId());
265+
TBase::Die(ctx);
266+
}
267+
261268
template<class TDerived, class TProto>
262269
TPutRecordsActorBase<TDerived, TProto>::TPutRecordsActorBase(NGRpcService::IRequestOpCtx* request)
263270
: TBase(request, dynamic_cast<const typename TProto::TRequest*>(request->GetRequest())->stream_name())

ydb/services/persqueue_v1/actors/direct_read_actor.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,7 @@ void TDirectReadSessionActor::Die(const TActorContext& ctx) {
154154
LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, LOG_PREFIX << " proxy is DEAD");
155155
ctx.Send(GetPQReadServiceActorID(), new TEvPQProxy::TEvSessionDead(Cookie));
156156
ctx.Send(NPQ::MakePQDReadCacheServiceActorId(), new TEvPQProxy::TEvDirectReadDataSessionDead(Session));
157-
157+
TRlHelpers::PassAway(SelfId());
158158
TActorBootstrapped<TDirectReadSessionActor>::Die(ctx);
159159
}
160160

ydb/services/persqueue_v1/actors/read_session_actor.ipp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -369,7 +369,7 @@ void TReadSessionActor<UseMigrationProtocol>::Die(const TActorContext& ctx) {
369369

370370
LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " is DEAD");
371371
ctx.Send(GetPQReadServiceActorID(), new TEvPQProxy::TEvSessionDead(Cookie));
372-
372+
TRlHelpers::PassAway(TActorBootstrapped<TReadSessionActor>::SelfId());
373373
TActorBootstrapped<TReadSessionActor>::Die(ctx);
374374
}
375375

ydb/services/persqueue_v1/actors/write_session_actor.ipp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -299,7 +299,7 @@ void TWriteSessionActor<UseMigrationProtocol>::Die(const TActorContext& ctx) {
299299
}
300300

301301
State = ES_DYING;
302-
302+
TRlHelpers::PassAway(TActorBootstrapped<TWriteSessionActor>::SelfId());
303303
TActorBootstrapped<TWriteSessionActor>::Die(ctx);
304304
}
305305

0 commit comments

Comments
 (0)