Skip to content

Commit a3c5afc

Browse files
authored
Merge 35d2479 into 1dd36c2
2 parents 1dd36c2 + 35d2479 commit a3c5afc

File tree

7 files changed

+16
-16
lines changed

7 files changed

+16
-16
lines changed

ydb/core/kafka_proxy/actors/kafka_fetch_actor.cpp

+2-2
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,8 @@ void TKafkaFetchActor::SendFetchRequests(const TActorContext& ctx) {
3434
for (size_t topicIndex = 0; topicIndex < Response->Responses.size(); topicIndex++) {
3535
TVector<NKikimr::NPQ::TPartitionFetchRequest> partPQRequests;
3636
PrepareFetchRequestData(topicIndex, partPQRequests);
37-
auto chargeExtraRU = topicIndex == 0 && Context->Config.GetChargeExtraRUOnRequest();
38-
NKikimr::NPQ::TFetchRequestSettings request(Context->DatabasePath, partPQRequests, FetchRequestData->MaxWaitMs, FetchRequestData->MaxBytes, Context->RlContext, *Context->UserToken, chargeExtraRU);
37+
auto ruPerRequest = topicIndex == 0 && Context->Config.GetMeteringV2Enabled();
38+
NKikimr::NPQ::TFetchRequestSettings request(Context->DatabasePath, partPQRequests, FetchRequestData->MaxWaitMs, FetchRequestData->MaxBytes, Context->RlContext, *Context->UserToken, ruPerRequest);
3939
auto fetchActor = NKikimr::NPQ::CreatePQFetchRequestActor(request, NKikimr::MakeSchemeCacheID(), ctx.SelfID);
4040
auto actorId = ctx.Register(fetchActor);
4141
PendingResponses++;

ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp

+6-6
Original file line numberDiff line numberDiff line change
@@ -243,7 +243,7 @@ THolder<TEvPartitionWriter::TEvWriteRequest> Convert(const TProduceRequestData::
243243
const TString& topicName,
244244
ui64 cookie,
245245
const TString& clientDC,
246-
bool chargeExtraRU) {
246+
bool ruPerRequest) {
247247
auto ev = MakeHolder<TEvPartitionWriter::TEvWriteRequest>();
248248
auto& request = ev->Record;
249249

@@ -255,8 +255,8 @@ THolder<TEvPartitionWriter::TEvWriteRequest> Convert(const TProduceRequestData::
255255
partitionRequest->SetPartition(data.Index);
256256
// partitionRequest->SetCmdWriteOffset();
257257
partitionRequest->SetCookie(cookie);
258-
if (chargeExtraRU) {
259-
partitionRequest->SetChargeExtraRU(true);
258+
if (ruPerRequest) {
259+
partitionRequest->SetMeteringV2Enabled(true);
260260
}
261261

262262
ui64 totalSize = 0;
@@ -323,7 +323,7 @@ void TKafkaProduceActor::ProcessRequest(TPendingRequest::TPtr pendingRequest, co
323323
pendingRequest->StartTime = ctx.Now();
324324

325325
size_t position = 0;
326-
bool chargeExtraRU = Context->Config.GetChargeExtraRUOnRequest();
326+
bool ruPerRequest = Context->Config.GetMeteringV2Enabled();
327327
for(const auto& topicData : r->TopicData) {
328328
const TString& topicPath = NormalizePath(Context->DatabasePath, *topicData.Name);
329329
for(const auto& partitionData : topicData.PartitionData) {
@@ -340,8 +340,8 @@ void TKafkaProduceActor::ProcessRequest(TPendingRequest::TPtr pendingRequest, co
340340
pendingRequest->WaitAcceptingCookies.insert(ownCookie);
341341
pendingRequest->WaitResultCookies.insert(ownCookie);
342342

343-
auto ev = Convert(partitionData, *topicData.Name, ownCookie, ClientDC, chargeExtraRU);
344-
chargeExtraRU = false;
343+
auto ev = Convert(partitionData, *topicData.Name, ownCookie, ClientDC, ruPerRequest);
344+
ruPerRequest = false;
345345

346346
Send(writer.second, std::move(ev));
347347
} else {

ydb/core/persqueue/fetch_request_actor.cpp

+2-2
Original file line numberDiff line numberDiff line change
@@ -480,8 +480,8 @@ struct TEvPrivate {
480480
SetMeteringMode(it->second.PQInfo->Description.GetPQTabletConfig().GetMeteringMode());
481481

482482
if (IsQuotaRequired()) {
483-
PendingQuotaAmount = CalcRuConsumption(GetPayloadSize(record)) + (Settings.ChargeExtraRU ? 1 : 0);
484-
Settings.ChargeExtraRU = false;
483+
PendingQuotaAmount = CalcRuConsumption(GetPayloadSize(record)) + (Settings.RuPerRequest ? 1 : 0);
484+
Settings.RuPerRequest = false;
485485
RequestDataQuota(PendingQuotaAmount, ctx);
486486
} else {
487487
ProceedFetchRequest(ctx);

ydb/core/persqueue/fetch_request_actor.h

+3-3
Original file line numberDiff line numberDiff line change
@@ -35,20 +35,20 @@ struct TFetchRequestSettings {
3535
ui64 MaxWaitTimeMs;
3636
ui64 TotalMaxBytes;
3737
TRlContext RlCtx;
38-
bool ChargeExtraRU;
38+
bool RuPerRequest;
3939

4040
ui64 RequestId = 0;
4141
TFetchRequestSettings(
4242
const TString& database, const TVector<TPartitionFetchRequest>& partitions, ui64 maxWaitTimeMs, ui64 totalMaxBytes, TRlContext rlCtx,
43-
const TMaybe<NACLib::TUserToken>& user = {}, ui64 requestId = 0, bool chargeExtraRU = false
43+
const TMaybe<NACLib::TUserToken>& user = {}, ui64 requestId = 0, bool ruPerRequest = false
4444
)
4545
: Database(database)
4646
, Partitions(partitions)
4747
, User(user)
4848
, MaxWaitTimeMs(maxWaitTimeMs)
4949
, TotalMaxBytes(totalMaxBytes)
5050
, RlCtx(rlCtx)
51-
, ChargeExtraRU(chargeExtraRU)
51+
, RuPerRequest(ruPerRequest)
5252
, RequestId(requestId)
5353
{}
5454
};

ydb/core/persqueue/writer/writer.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -526,7 +526,7 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter>, private TRl
526526

527527
if (needToRequestQuota) {
528528
++processed;
529-
PendingQuotaAmount += CalcRuConsumption(it->second.ByteSize()) + (it->second.GetPartitionRequest().GetChargeExtraRU() ? 1 : 0);
529+
PendingQuotaAmount += CalcRuConsumption(it->second.ByteSize()) + (it->second.GetPartitionRequest().GetMeteringV2Enabled() ? 1 : 0);
530530
PendingQuota.emplace_back(it->first);
531531
}
532532

ydb/core/protos/config.proto

+1-1
Original file line numberDiff line numberDiff line change
@@ -1715,7 +1715,7 @@ message TKafkaProxyConfig {
17151715
}
17161716

17171717
optional TProxy Proxy = 7;
1718-
optional bool ChargeExtraRUOnRequest = 10 [default = false];
1718+
optional bool MeteringV2Enabled = 10 [default = false];
17191719
}
17201720

17211721
message TAwsCompatibilityConfig {

ydb/core/protos/msgbus_pq.proto

+1-1
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,7 @@ message TPersQueuePartitionRequest {
170170
optional bool IsDirectWrite = 18 [default = false];
171171
optional uint64 PutUnitsSize = 19;
172172
optional int64 InitialSeqNo = 26;
173-
optional bool ChargeExtraRU = 27 [default = false];
173+
optional bool MeteringV2Enabled = 27 [default = false];
174174
}
175175

176176
message TPersQueueMetaRequest {

0 commit comments

Comments
 (0)