Skip to content

Commit 0723267

Browse files
committed
Kafka new metering (#4485)
1 parent 7fd9771 commit 0723267

File tree

7 files changed

+16
-16
lines changed

7 files changed

+16
-16
lines changed

ydb/core/kafka_proxy/actors/kafka_fetch_actor.cpp

+2-2
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,8 @@ void TKafkaFetchActor::SendFetchRequests(const TActorContext& ctx) {
3434
for (size_t topicIndex = 0; topicIndex < Response->Responses.size(); topicIndex++) {
3535
TVector<NKikimr::NPQ::TPartitionFetchRequest> partPQRequests;
3636
PrepareFetchRequestData(topicIndex, partPQRequests);
37-
auto chargeExtraRU = topicIndex == 0 && Context->Config.GetChargeExtraRUOnRequest();
38-
NKikimr::NPQ::TFetchRequestSettings request(Context->DatabasePath, partPQRequests, FetchRequestData->MaxWaitMs, FetchRequestData->MaxBytes, Context->RlContext, *Context->UserToken, chargeExtraRU);
37+
auto ruPerRequest = topicIndex == 0 && Context->Config.GetMeteringV2Enabled();
38+
NKikimr::NPQ::TFetchRequestSettings request(Context->DatabasePath, partPQRequests, FetchRequestData->MaxWaitMs, FetchRequestData->MaxBytes, Context->RlContext, *Context->UserToken, ruPerRequest);
3939
auto fetchActor = NKikimr::NPQ::CreatePQFetchRequestActor(request, NKikimr::MakeSchemeCacheID(), ctx.SelfID);
4040
auto actorId = ctx.Register(fetchActor);
4141
PendingResponses++;

ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp

+6-6
Original file line numberDiff line numberDiff line change
@@ -243,7 +243,7 @@ THolder<TEvPartitionWriter::TEvWriteRequest> Convert(const TProduceRequestData::
243243
const TString& topicName,
244244
ui64 cookie,
245245
const TString& clientDC,
246-
bool chargeExtraRU) {
246+
bool ruPerRequest) {
247247
auto ev = MakeHolder<TEvPartitionWriter::TEvWriteRequest>();
248248
auto& request = ev->Record;
249249

@@ -255,8 +255,8 @@ THolder<TEvPartitionWriter::TEvWriteRequest> Convert(const TProduceRequestData::
255255
partitionRequest->SetPartition(data.Index);
256256
// partitionRequest->SetCmdWriteOffset();
257257
partitionRequest->SetCookie(cookie);
258-
if (chargeExtraRU) {
259-
partitionRequest->SetChargeExtraRU(true);
258+
if (ruPerRequest) {
259+
partitionRequest->SetMeteringV2Enabled(true);
260260
}
261261

262262
ui64 totalSize = 0;
@@ -321,7 +321,7 @@ void TKafkaProduceActor::ProcessRequest(TPendingRequest::TPtr pendingRequest, co
321321
pendingRequest->StartTime = ctx.Now();
322322

323323
size_t position = 0;
324-
bool chargeExtraRU = Context->Config.GetChargeExtraRUOnRequest();
324+
bool ruPerRequest = Context->Config.GetMeteringV2Enabled();
325325
for(const auto& topicData : r->TopicData) {
326326
const TString& topicPath = NormalizePath(Context->DatabasePath, *topicData.Name);
327327
for(const auto& partitionData : topicData.PartitionData) {
@@ -338,8 +338,8 @@ void TKafkaProduceActor::ProcessRequest(TPendingRequest::TPtr pendingRequest, co
338338
pendingRequest->WaitAcceptingCookies.insert(ownCookie);
339339
pendingRequest->WaitResultCookies.insert(ownCookie);
340340

341-
auto ev = Convert(partitionData, *topicData.Name, ownCookie, ClientDC, chargeExtraRU);
342-
chargeExtraRU = false;
341+
auto ev = Convert(partitionData, *topicData.Name, ownCookie, ClientDC, ruPerRequest);
342+
ruPerRequest = false;
343343

344344
Send(writer.second, std::move(ev));
345345
} else {

ydb/core/persqueue/fetch_request_actor.cpp

+2-2
Original file line numberDiff line numberDiff line change
@@ -479,8 +479,8 @@ struct TEvPrivate {
479479
SetMeteringMode(it->second.PQInfo->Description.GetPQTabletConfig().GetMeteringMode());
480480

481481
if (IsQuotaRequired()) {
482-
PendingQuotaAmount = CalcRuConsumption(GetPayloadSize(record)) + (Settings.ChargeExtraRU ? 1 : 0);
483-
Settings.ChargeExtraRU = false;
482+
PendingQuotaAmount = CalcRuConsumption(GetPayloadSize(record)) + (Settings.RuPerRequest ? 1 : 0);
483+
Settings.RuPerRequest = false;
484484
RequestDataQuota(PendingQuotaAmount, ctx);
485485
} else {
486486
ProceedFetchRequest(ctx);

ydb/core/persqueue/fetch_request_actor.h

+3-3
Original file line numberDiff line numberDiff line change
@@ -35,20 +35,20 @@ struct TFetchRequestSettings {
3535
ui64 MaxWaitTimeMs;
3636
ui64 TotalMaxBytes;
3737
TRlContext RlCtx;
38-
bool ChargeExtraRU;
38+
bool RuPerRequest;
3939

4040
ui64 RequestId = 0;
4141
TFetchRequestSettings(
4242
const TString& database, const TVector<TPartitionFetchRequest>& partitions, ui64 maxWaitTimeMs, ui64 totalMaxBytes, TRlContext rlCtx,
43-
const TMaybe<NACLib::TUserToken>& user = {}, ui64 requestId = 0, bool chargeExtraRU = false
43+
const TMaybe<NACLib::TUserToken>& user = {}, ui64 requestId = 0, bool ruPerRequest = false
4444
)
4545
: Database(database)
4646
, Partitions(partitions)
4747
, User(user)
4848
, MaxWaitTimeMs(maxWaitTimeMs)
4949
, TotalMaxBytes(totalMaxBytes)
5050
, RlCtx(rlCtx)
51-
, ChargeExtraRU(chargeExtraRU)
51+
, RuPerRequest(ruPerRequest)
5252
, RequestId(requestId)
5353
{}
5454
};

ydb/core/persqueue/writer/writer.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -489,7 +489,7 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter>, private TRl
489489

490490
if (needToRequestQuota) {
491491
++processed;
492-
PendingQuotaAmount += CalcRuConsumption(it->second.ByteSize()) + (it->second.GetPartitionRequest().GetChargeExtraRU() ? 1 : 0);
492+
PendingQuotaAmount += CalcRuConsumption(it->second.ByteSize()) + (it->second.GetPartitionRequest().GetMeteringV2Enabled() ? 1 : 0);
493493
PendingQuota.emplace_back(it->first);
494494
}
495495

ydb/core/protos/config.proto

+1-1
Original file line numberDiff line numberDiff line change
@@ -1701,7 +1701,7 @@ message TKafkaProxyConfig {
17011701
}
17021702

17031703
optional TProxy Proxy = 7;
1704-
optional bool ChargeExtraRUOnRequest = 10 [default = false];
1704+
optional bool MeteringV2Enabled = 10 [default = false];
17051705
}
17061706

17071707
message TAwsCompatibilityConfig {

ydb/core/protos/msgbus_pq.proto

+1-1
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,7 @@ message TPersQueuePartitionRequest {
170170
optional bool IsDirectWrite = 18 [default = false];
171171
optional uint64 PutUnitsSize = 19;
172172
optional int64 InitialSeqNo = 26;
173-
optional bool ChargeExtraRU = 27 [default = false];
173+
optional bool MeteringV2Enabled = 27 [default = false];
174174
}
175175

176176
message TPersQueueMetaRequest {

0 commit comments

Comments
 (0)