@@ -243,7 +243,7 @@ THolder<TEvPartitionWriter::TEvWriteRequest> Convert(const TProduceRequestData::
243
243
const TString& topicName,
244
244
ui64 cookie,
245
245
const TString& clientDC,
246
- bool chargeExtraRU ) {
246
+ bool ruPerRequest ) {
247
247
auto ev = MakeHolder<TEvPartitionWriter::TEvWriteRequest>();
248
248
auto & request = ev->Record ;
249
249
@@ -255,8 +255,8 @@ THolder<TEvPartitionWriter::TEvWriteRequest> Convert(const TProduceRequestData::
255
255
partitionRequest->SetPartition (data.Index );
256
256
// partitionRequest->SetCmdWriteOffset();
257
257
partitionRequest->SetCookie (cookie);
258
- if (chargeExtraRU ) {
259
- partitionRequest->SetChargeExtraRU (true );
258
+ if (ruPerRequest ) {
259
+ partitionRequest->SetMeteringV2Enabled (true );
260
260
}
261
261
262
262
ui64 totalSize = 0 ;
@@ -323,7 +323,7 @@ void TKafkaProduceActor::ProcessRequest(TPendingRequest::TPtr pendingRequest, co
323
323
pendingRequest->StartTime = ctx.Now ();
324
324
325
325
size_t position = 0 ;
326
- bool chargeExtraRU = Context->Config .GetChargeExtraRUOnRequest ();
326
+ bool ruPerRequest = Context->Config .KafkaMeteringV2Enabled ();
327
327
for (const auto & topicData : r->TopicData ) {
328
328
const TString& topicPath = NormalizePath (Context->DatabasePath , *topicData.Name );
329
329
for (const auto & partitionData : topicData.PartitionData ) {
@@ -340,8 +340,8 @@ void TKafkaProduceActor::ProcessRequest(TPendingRequest::TPtr pendingRequest, co
340
340
pendingRequest->WaitAcceptingCookies .insert (ownCookie);
341
341
pendingRequest->WaitResultCookies .insert (ownCookie);
342
342
343
- auto ev = Convert (partitionData, *topicData.Name , ownCookie, ClientDC, chargeExtraRU );
344
- chargeExtraRU = false ;
343
+ auto ev = Convert (partitionData, *topicData.Name , ownCookie, ClientDC, ruPerRequest );
344
+ ruPerRequest = false ;
345
345
346
346
Send (writer.second , std::move (ev));
347
347
} else {
0 commit comments