
Commit d655aaa

Kafka api charge extra RU on request (#3929) (#4472)
1 parent: da78771

File tree (7 files changed, +36 -28 lines):

  ydb/core/kafka_proxy/actors/kafka_fetch_actor.cpp   (+9 -11)
  ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp (+11 -6)
  ydb/core/persqueue/fetch_request_actor.cpp          (+9 -8)
  ydb/core/persqueue/fetch_request_actor.h            (+4 -2)
  ydb/core/persqueue/writer/writer.cpp                (+1 -1)
  ydb/core/protos/config.proto                        (+1 -0)
  ydb/core/protos/msgbus_pq.proto                     (+1 -0)

Note: several paired -/+ lines in the hunks below differ only in trailing whitespace and therefore render identically.
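In short: when the new MeteringV2Enabled flag is on, the Kafka proxy charges one extra request unit (RU) per Fetch or Produce request on top of the byte-based consumption, and the extra unit is attached to exactly one sub-request (the first topic of a Fetch, the first partition write of a Produce). Below is a minimal standalone sketch of that billing rule; it is not YDB code, and the byte-based formula is an assumed stand-in for the real CalcRuConsumption.

#include <cstdint>
#include <iostream>
#include <vector>

// Assumed stand-in formula: 1 RU per started MiB of payload. The real
// CalcRuConsumption lives in the YDB metering code and may differ.
std::uint64_t CalcRuConsumption(std::uint64_t payloadBytes) {
    constexpr std::uint64_t kUnitBytes = 1u << 20;
    return (payloadBytes + kUnitBytes - 1) / kUnitBytes;
}

int main() {
    // One Kafka request that fans out into three sub-requests.
    const std::vector<std::uint64_t> subRequestBytes{500'000, 2'000'000, 100'000};

    bool ruPerRequest = true; // MeteringV2Enabled: +1 RU once per request
    std::uint64_t totalRu = 0;
    for (const auto bytes : subRequestBytes) {
        totalRu += CalcRuConsumption(bytes) + (ruPerRequest ? 1 : 0);
        ruPerRequest = false; // the extra unit goes to the first sub-request only
    }
    std::cout << "total RU: " << totalRu << "\n"; // 2 + 2 + 1 = 5
}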

ydb/core/kafka_proxy/actors/kafka_fetch_actor.cpp

+9 -11
@@ -34,9 +34,8 @@ void TKafkaFetchActor::SendFetchRequests(const TActorContext& ctx) {
     for (size_t topicIndex = 0; topicIndex < Response->Responses.size(); topicIndex++) {
         TVector<NKikimr::NPQ::TPartitionFetchRequest> partPQRequests;
         PrepareFetchRequestData(topicIndex, partPQRequests);
-
-        NKikimr::NPQ::TFetchRequestSettings request(Context->DatabasePath, partPQRequests, FetchRequestData->MaxWaitMs, FetchRequestData->MaxBytes, Context->RlContext, *Context->UserToken);
-
+        auto ruPerRequest = topicIndex == 0 && Context->Config.GetMeteringV2Enabled();
+        NKikimr::NPQ::TFetchRequestSettings request(Context->DatabasePath, partPQRequests, FetchRequestData->MaxWaitMs, FetchRequestData->MaxBytes, Context->RlContext, *Context->UserToken, ruPerRequest);
         auto fetchActor = NKikimr::NPQ::CreatePQFetchRequestActor(request, NKikimr::MakeSchemeCacheID(), ctx.SelfID);
         auto actorId = ctx.Register(fetchActor);
         PendingResponses++;
@@ -55,7 +54,6 @@ void TKafkaFetchActor::PrepareFetchRequestData(const size_t topicIndex, TVector<
     for (size_t partIndex = 0; partIndex < topicKafkaRequest.Partitions.size(); partIndex++) {
         auto& partKafkaRequest = topicKafkaRequest.Partitions[partIndex];
         KAFKA_LOG_D(TStringBuilder() << "Fetch actor: New request. Topic: " << topicKafkaRequest.Topic.value() << " Partition: " << partKafkaRequest.Partition << " FetchOffset: " << partKafkaRequest.FetchOffset << " PartitionMaxBytes: " << partKafkaRequest.PartitionMaxBytes);
-
         auto& partPQRequest = partPQRequests[partIndex];
         partPQRequest.Topic = NormalizePath(Context->DatabasePath, topicKafkaRequest.Topic.value()); // FIXME(savnik): handle empty topic
         partPQRequest.Partition = partKafkaRequest.Partition;
@@ -113,9 +111,9 @@ void TKafkaFetchActor::HandleSuccessResponse(const NKikimr::TEvPQ::TEvFetchRespo
         partKafkaResponse.ErrorCode = ConvertErrorCode(partPQResponse.GetReadResult().GetErrorCode());

         if (partPQResponse.GetReadResult().GetErrorCode() != NPersQueue::NErrorCode::EErrorCode::OK) {
-            KAFKA_LOG_ERROR("Fetch actor: Failed to get responses for topic: " << topicResponse.Topic <<
-                ", partition: " << partPQResponse.GetPartition() <<
-                ". Code: " << static_cast<size_t>(partPQResponse.GetReadResult().GetErrorCode()) <<
+            KAFKA_LOG_ERROR("Fetch actor: Failed to get responses for topic: " << topicResponse.Topic <<
+                ", partition: " << partPQResponse.GetPartition() <<
+                ". Code: " << static_cast<size_t>(partPQResponse.GetReadResult().GetErrorCode()) <<
                 ". Reason: " + partPQResponse.GetReadResult().GetErrorReason());
         }

@@ -174,7 +172,7 @@ void TKafkaFetchActor::FillRecordsBatch(const NKikimrClient::TPersQueueFetchResp
         record.TimestampDelta = lastTimestamp - baseTimestamp;

         record.Length = record.Size(TKafkaRecord::MessageMeta::PresentVersions.Max) - SizeOfZeroVarint;
-        KAFKA_LOG_D("Fetch actor: Record info. OffsetDelta: " << record.OffsetDelta <<
+        KAFKA_LOG_D("Fetch actor: Record info. OffsetDelta: " << record.OffsetDelta <<
            ", TimestampDelta: " << record.TimestampDelta << ", Length: " << record.Length);
    }

@@ -187,11 +185,11 @@ void TKafkaFetchActor::FillRecordsBatch(const NKikimrClient::TPersQueueFetchResp
    //recordsBatch.Attributes https://kafka.apache.org/documentation/#recordbatch

    recordsBatch.BatchLength = recordsBatch.Size(TKafkaRecordBatch::MessageMeta::PresentVersions.Max) - BatchFirstTwoFieldsSize;
-    KAFKA_LOG_D("Fetch actor: RecordBatch info. BaseOffset: " << recordsBatch.BaseOffset << ", LastOffsetDelta: " << recordsBatch.LastOffsetDelta <<
-        ", BaseTimestamp: " << recordsBatch.BaseTimestamp << ", MaxTimestamp: " << recordsBatch.MaxTimestamp <<
+    KAFKA_LOG_D("Fetch actor: RecordBatch info. BaseOffset: " << recordsBatch.BaseOffset << ", LastOffsetDelta: " << recordsBatch.LastOffsetDelta <<
+        ", BaseTimestamp: " << recordsBatch.BaseTimestamp << ", MaxTimestamp: " << recordsBatch.MaxTimestamp <<
        ", BaseSequence: " << recordsBatch.BaseSequence << ", BatchLength: " << recordsBatch.BatchLength);
    auto topicWithoutDb = GetTopicNameWithoutDb(Context->DatabasePath, partPQResponse.GetTopic());
-    ctx.Send(MakeKafkaMetricsServiceID(), new TEvKafka::TEvUpdateCounter(recordsBatch.Records.size(), BuildLabels(Context, "", topicWithoutDb, "api.kafka.fetch.messages", "")));
+    ctx.Send(MakeKafkaMetricsServiceID(), new TEvKafka::TEvUpdateCounter(recordsBatch.Records.size(), BuildLabels(Context, "", topicWithoutDb, "api.kafka.fetch.messages", "")));
}

void TKafkaFetchActor::RespondIfRequired(const TActorContext& ctx) {
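The Fetch path spawns one PQ fetch-request actor per topic; ruPerRequest is true only for topicIndex == 0 and only when metering v2 is enabled, so the extra unit is requested at most once per Kafka Fetch request. A trivial standalone check of that gating predicate (the free function is illustrative, not from the codebase):

#include <cassert>
#include <cstddef>

// Mirrors the gate used in SendFetchRequests above.
bool RuPerRequest(std::size_t topicIndex, bool meteringV2Enabled) {
    return topicIndex == 0 && meteringV2Enabled;
}

int main() {
    assert(RuPerRequest(0, true));   // first topic carries the extra unit
    assert(!RuPerRequest(1, true));  // other topics are billed by bytes alone
    assert(!RuPerRequest(0, false)); // feature off: no extra unit at all
}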

ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp

+11 -6
@@ -43,7 +43,7 @@ void TKafkaProduceActor::LogEvent(IEventHandle& ev) {
 void TKafkaProduceActor::SendMetrics(const TString& topicName, size_t delta, const TString& name, const TActorContext& ctx) {
     auto topicWithoutDb = GetTopicNameWithoutDb(Context->DatabasePath, topicName);
     ctx.Send(MakeKafkaMetricsServiceID(), new TEvKafka::TEvUpdateCounter(delta, BuildLabels(Context, "", topicWithoutDb, TStringBuilder() << "api.kafka.produce." << name, "")));
-    ctx.Send(MakeKafkaMetricsServiceID(), new TEvKafka::TEvUpdateCounter(delta, BuildLabels(Context, "", topicWithoutDb, "api.kafka.produce.total_messages", "")));
+    ctx.Send(MakeKafkaMetricsServiceID(), new TEvKafka::TEvUpdateCounter(delta, BuildLabels(Context, "", topicWithoutDb, "api.kafka.produce.total_messages", "")));
 }

 void TKafkaProduceActor::Bootstrap(const NActors::TActorContext& /*ctx*/) {
@@ -82,7 +82,7 @@ void TKafkaProduceActor::PassAway() {
 void TKafkaProduceActor::CleanTopics(const TActorContext& ctx) {
     const auto now = ctx.Now();

-    std::map<TString, TTopicInfo> newTopics;
+    std::map<TString, TTopicInfo> newTopics;
     for(auto& [topicPath, topicInfo] : Topics) {
         if (topicInfo.ExpirationTime > now) {
             newTopics[topicPath] = std::move(topicInfo);
@@ -242,7 +242,8 @@ size_t TKafkaProduceActor::EnqueueInitialization() {
 THolder<TEvPartitionWriter::TEvWriteRequest> Convert(const TProduceRequestData::TTopicProduceData::TPartitionProduceData& data,
                                                      const TString& topicName,
                                                      ui64 cookie,
-                                                     const TString& clientDC) {
+                                                     const TString& clientDC,
+                                                     bool ruPerRequest) {
     auto ev = MakeHolder<TEvPartitionWriter::TEvWriteRequest>();
     auto& request = ev->Record;

@@ -254,6 +255,9 @@ THolder<TEvPartitionWriter::TEvWriteRequest> Convert(const TProduceRequestData::
     partitionRequest->SetPartition(data.Index);
     // partitionRequest->SetCmdWriteOffset();
     partitionRequest->SetCookie(cookie);
+    if (ruPerRequest) {
+        partitionRequest->SetMeteringV2Enabled(true);
+    }

     ui64 totalSize = 0;

@@ -317,11 +321,11 @@ void TKafkaProduceActor::ProcessRequest(TPendingRequest::TPtr pendingRequest, co
     pendingRequest->StartTime = ctx.Now();

     size_t position = 0;
+    bool ruPerRequest = Context->Config.GetMeteringV2Enabled();
     for(const auto& topicData : r->TopicData) {
         const TString& topicPath = NormalizePath(Context->DatabasePath, *topicData.Name);
         for(const auto& partitionData : topicData.PartitionData) {
             const auto partitionId = partitionData.Index;
-
             auto writer = PartitionWriter(topicPath, partitionId, ctx);
             if (OK == writer.first) {
                 auto ownCookie = ++Cookie;
@@ -334,7 +338,8 @@ void TKafkaProduceActor::ProcessRequest(TPendingRequest::TPtr pendingRequest, co
                 pendingRequest->WaitAcceptingCookies.insert(ownCookie);
                 pendingRequest->WaitResultCookies.insert(ownCookie);

-                auto ev = Convert(partitionData, *topicData.Name, ownCookie, ClientDC);
+                auto ev = Convert(partitionData, *topicData.Name, ownCookie, ClientDC, ruPerRequest);
+                ruPerRequest = false;

                 Send(writer.second, std::move(ev));
             } else {
@@ -441,7 +446,7 @@ void TKafkaProduceActor::SendResults(const TActorContext& ctx) {
     // We send the results in the order of receipt of the request
     while (!PendingRequests.empty()) {
         auto pendingRequest = PendingRequests.front();
-
+
         // We send the response by timeout. This is possible, for example, if the event was lost or the PartitionWrite died.
         bool expired = expireTime > pendingRequest->StartTime;
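On the Produce path the flag travels differently: ProcessRequest seeds ruPerRequest from the config once per request and clears it right after the first Convert call, so exactly one partition write request leaves with MeteringV2Enabled set; the partition writer (writer.cpp below) later turns that marker into the extra unit. A standalone sketch of the hand-off, with simplified stand-in types:

#include <iostream>
#include <vector>

struct TWriteRequest { bool MeteringV2Enabled = false; }; // stand-in type

TWriteRequest Convert(bool ruPerRequest) {
    TWriteRequest ev;
    if (ruPerRequest) {
        ev.MeteringV2Enabled = true; // marker consumed by the partition writer
    }
    return ev;
}

int main() {
    bool ruPerRequest = true; // Context->Config.GetMeteringV2Enabled()
    std::vector<TWriteRequest> sent;
    for (int partition = 0; partition < 3; ++partition) {
        sent.push_back(Convert(ruPerRequest));
        ruPerRequest = false; // only the first write carries the marker
    }
    for (const auto& ev : sent) {
        std::cout << ev.MeteringV2Enabled << ' '; // prints: 1 0 0
    }
    std::cout << '\n';
}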

ydb/core/persqueue/fetch_request_actor.cpp

+9 -8
@@ -75,7 +75,7 @@ struct TEvPrivate {

     bool CanProcessFetchRequest; //any partitions answered that it has data or WaitMs timeout occured
     ui32 FetchRequestReadsDone;
-    ui64 FetchRequestCurrentReadTablet;
+    ui64 FetchRequestCurrentReadTablet;
     ui64 CurrentCookie;
     ui32 FetchRequestBytesLeft;
     THolder<TEvPQ::TEvFetchResponse> Response;
@@ -145,10 +145,10 @@ struct TEvPrivate {
             break;

         case EWakeupTag::RlNoResource:
-            // Re-requesting the quota. We do this until we get a quota.
+            // Re-requesting the quota. We do this until we get a quota.
             RequestDataQuota(PendingQuotaAmount, ctx);
             break;
-
+
         default:
             Y_VERIFY_DEBUG_S(false, "Unsupported tag: " << static_cast<ui64>(tag));
         }
@@ -171,7 +171,7 @@ struct TEvPrivate {

     void SendSchemeCacheRequest(const TActorContext& ctx) {
         LOG_DEBUG_S(ctx, NKikimrServices::PQ_FETCH_REQUEST, "SendSchemeCacheRequest");
-
+
         auto schemeCacheRequest = std::make_unique<TSchemeCacheNavigate>(1);
         schemeCacheRequest->DatabaseName = Settings.Database;

@@ -250,7 +250,7 @@ struct TEvPrivate {
                 ), ctx
             );;
         }
-
+
         auto& description = entry.PQGroupInfo->Description;
         auto& topicInfo = TopicInfo[path];
         topicInfo.BalancerTabletId = description.GetBalancerTabletID();
@@ -345,7 +345,7 @@ struct TEvPrivate {
         if (HandlePipeError(tabletId, ctx))
            return;

-        auto reason = TStringBuilder() << "Client pipe to " << tabletId << " connection error, Status"
+        auto reason = TStringBuilder() << "Client pipe to " << tabletId << " connection error, Status"
            << NKikimrProto::EReplyStatus_Name(msg->Status).data()
            << ", Marker# PQ6";
         return SendReplyAndDie(CreateErrorReply(Ydb::StatusIds::INTERNAL_ERROR, reason), ctx);
@@ -423,7 +423,7 @@ struct TEvPrivate {
         preq->Record.SetRequestId(reqId);
         auto partReq = preq->Record.MutablePartitionRequest();
         partReq->SetCookie(CurrentCookie);
-
+
         partReq->SetTopic(topic);
         partReq->SetPartition(part);
         auto read = partReq->MutableCmdRead();
@@ -479,7 +479,8 @@ struct TEvPrivate {
         SetMeteringMode(it->second.PQInfo->Description.GetPQTabletConfig().GetMeteringMode());

         if (IsQuotaRequired()) {
-            PendingQuotaAmount = CalcRuConsumption(GetPayloadSize(record));
+            PendingQuotaAmount = CalcRuConsumption(GetPayloadSize(record)) + (Settings.RuPerRequest ? 1 : 0);
+            Settings.RuPerRequest = false;
             RequestDataQuota(PendingQuotaAmount, ctx);
         } else {
             ProceedFetchRequest(ctx);
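The last hunk is where the Fetch-side extra unit is actually billed: when quota is required, one unit is added to the byte-based amount and Settings.RuPerRequest is cleared immediately, so later per-partition reads of the same fetch (and quota retries via RlNoResource, which reuse the already-computed PendingQuotaAmount) cannot charge it twice. A standalone sketch of that consume-once behavior, using stand-in types:

#include <cassert>
#include <cstdint>

struct TSettings { bool RuPerRequest = true; }; // stand-in for the real settings

std::uint64_t QuotaAmount(TSettings& settings, std::uint64_t byteRu) {
    // Add the per-request unit once, then consume the flag.
    const std::uint64_t amount = byteRu + (settings.RuPerRequest ? 1 : 0);
    settings.RuPerRequest = false;
    return amount;
}

int main() {
    TSettings settings;
    assert(QuotaAmount(settings, 2) == 3); // first partition read: bytes + 1
    assert(QuotaAmount(settings, 2) == 2); // subsequent reads: bytes only
}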

ydb/core/persqueue/fetch_request_actor.h

+4 -2
@@ -15,7 +15,7 @@ struct TPartitionFetchRequest {
     ui64 Offset;
     ui64 MaxBytes;
     ui64 ReadTimestampMs;
-
+
     TPartitionFetchRequest(const TString& topic, ui32 partition, ui64 offset, ui64 maxBytes, ui64 readTimestampMs = 0, const TString& clientId = NKikimr::NPQ::CLIENTID_WITHOUT_CONSUMER)
         : Topic(topic)
         , ClientId(clientId)
@@ -35,18 +35,20 @@ struct TFetchRequestSettings {
     ui64 MaxWaitTimeMs;
     ui64 TotalMaxBytes;
     TRlContext RlCtx;
+    bool RuPerRequest;

     ui64 RequestId = 0;
     TFetchRequestSettings(
         const TString& database, const TVector<TPartitionFetchRequest>& partitions, ui64 maxWaitTimeMs, ui64 totalMaxBytes, TRlContext rlCtx,
-        const TMaybe<NACLib::TUserToken>& user = {}, ui64 requestId = 0
+        const TMaybe<NACLib::TUserToken>& user = {}, ui64 requestId = 0, bool ruPerRequest = false
     )
         : Database(database)
         , Partitions(partitions)
         , User(user)
         , MaxWaitTimeMs(maxWaitTimeMs)
         , TotalMaxBytes(totalMaxBytes)
         , RlCtx(rlCtx)
+        , RuPerRequest(ruPerRequest)
         , RequestId(requestId)
     {}
 };
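Both additions default to off: ruPerRequest = false in the constructor, and the RuPerRequest field simply mirrors it, so existing TFetchRequestSettings call sites compile and behave as before; only the Kafka proxy passes true. The field is also deliberately mutable, since the fetch actor clears it after charging, as in the sketch above.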

ydb/core/persqueue/writer/writer.cpp

+1 -1
@@ -489,7 +489,7 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter>, private TRl

         if (needToRequestQuota) {
             ++processed;
-            PendingQuotaAmount += CalcRuConsumption(it->second.ByteSize());
+            PendingQuotaAmount += CalcRuConsumption(it->second.ByteSize()) + (it->second.GetPartitionRequest().GetMeteringV2Enabled() ? 1 : 0);
             PendingQuota.emplace_back(it->first);
         }

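This is the Produce-side counterpart of the fetch hunk above: the writer adds one unit to PendingQuotaAmount for the single write request whose TPersQueuePartitionRequest carries MeteringV2Enabled (set in Convert in kafka_produce_actor.cpp), leaving all other writes billed purely by CalcRuConsumption of their byte size.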

ydb/core/protos/config.proto

+1 -0
@@ -1701,6 +1701,7 @@ message TKafkaProxyConfig {
     }

     optional TProxy Proxy = 7;
+    optional bool MeteringV2Enabled = 10 [default = false];
 }

 message TAwsCompatibilityConfig {
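The whole feature is gated here: MeteringV2Enabled defaults to false, so clusters that do not opt in see no billing change. The Kafka proxy reads the flag through the generated GetMeteringV2Enabled() accessor, as in the actor code above.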

ydb/core/protos/msgbus_pq.proto

+1 -0
@@ -170,6 +170,7 @@ message TPersQueuePartitionRequest {
     optional bool IsDirectWrite = 18 [default = false];
     optional uint64 PutUnitsSize = 19;
     optional int64 InitialSeqNo = 26;
+    optional bool MeteringV2Enabled = 27 [default = false];
 }

 message TPersQueueMetaRequest {
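Field 27 is the wire-level marker that carries the per-request flag from the Kafka produce actor to the partition writer; with default = false, requests from older clients and other write paths are unaffected.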
