Skip to content

Commit 7021134

Browse files
committed
kafka offset commit
1 parent 30e9736 commit 7021134

8 files changed

+481
-107
lines changed

ydb/core/kafka_proxy/actors/actors.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,9 @@ inline EKafkaErrors ConvertErrorCode(Ydb::PersQueue::ErrorCode::ErrorCode code)
128128
return EKafkaErrors::UNKNOWN_TOPIC_OR_PARTITION;
129129
case Ydb::PersQueue::ErrorCode::ErrorCode::ACCESS_DENIED:
130130
return EKafkaErrors::TOPIC_AUTHORIZATION_FAILED;
131+
case Ydb::PersQueue::ErrorCode::ErrorCode::SET_OFFSET_ERROR_COMMIT_TO_FUTURE:
132+
case Ydb::PersQueue::ErrorCode::ErrorCode::SET_OFFSET_ERROR_COMMIT_TO_PAST:
133+
return EKafkaErrors::OFFSET_OUT_OF_RANGE;
131134
default:
132135
return EKafkaErrors::UNKNOWN_SERVER_ERROR;
133136
}

ydb/core/kafka_proxy/actors/kafka_api_versions_actor.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ TApiVersionsResponseData::TPtr GetApiVersions() {
4343
AddApiKey<TLeaveGroupRequestData>(response->ApiKeys, LEAVE_GROUP);
4444
AddApiKey<THeartbeatRequestData>(response->ApiKeys, HEARTBEAT);
4545
AddApiKey<TFindCoordinatorRequestData>(response->ApiKeys, FIND_COORDINATOR);
46-
AddApiKey<TOffsetCommitRequestData>(response->ApiKeys, OFFSET_COMMIT);
46+
AddApiKey<TOffsetCommitRequestData>(response->ApiKeys, OFFSET_COMMIT, {.MaxVersion=1});
4747
AddApiKey<TOffsetFetchRequestData>(response->ApiKeys, OFFSET_FETCH);
4848

4949
return response;
Lines changed: 177 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,38 +1,204 @@
11
#include "kafka_offset_commit_actor.h"
22

3-
#include <ydb/core/kafka_proxy/kafka_events.h>
4-
53
namespace NKafka {
64

75

86
// Factory for the actor that serves one Kafka OffsetCommit request.
// All three arguments are forwarded unchanged to the TKafkaOffsetCommitActor
// constructor. The actor is heap-allocated and returned as a raw pointer;
// presumably the caller registers it with the actor system, which then owns it
// (verify against the call site).
NActors::IActor* CreateKafkaOffsetCommitActor(const TContext::TPtr context, const ui64 correlationId, const TMessagePtr<TOffsetCommitRequestData>& message) {
    NActors::IActor* const actor = new TKafkaOffsetCommitActor(context, correlationId, message);
    return actor;
}
119

12-
TOffsetCommitResponseData::TPtr TKafkaOffsetCommitActor::GetOffsetCommitResponse() {
13-
TOffsetCommitResponseData::TPtr response = std::make_shared<TOffsetCommitResponseData>();
10+
// Returns the constant tag identifying this actor in log output.
// NOTE(review): presumably picked up by the KAFKA_LOG_* macros - confirm in their definition.
TString TKafkaOffsetCommitActor::LogPrefix() {
    return TString("TKafkaOffsetCommitActor");
}
13+
14+
void TKafkaOffsetCommitActor::Die(const TActorContext& ctx) {
15+
KAFKA_LOG_D("PassAway");
16+
ctx.Send(AuthInitActor, new TEvents::TEvPoisonPill());
17+
for (const auto& tabletToPipePair: TabletIdToPipe) {
18+
NTabletPipe::CloseClient(ctx, tabletToPipePair.second);
19+
}
20+
TBase::Die(ctx);
21+
}
22+
23+
// Handles TEvCloseSession. The event's error code is converted to a Kafka
// error, stored in Error, and reported for every requested partition via
// SendFailedForAllPartitions (which also terminates the actor).
void TKafkaOffsetCommitActor::Handle(NKikimr::NGRpcProxy::V1::TEvPQProxy::TEvCloseSession::TPtr& ev, const TActorContext& ctx) {
    const auto* closeSession = ev->Get();
    KAFKA_LOG_CRIT("Auth failed. reason# " << closeSession->Reason);

    Error = ConvertErrorCode(closeSession->ErrorCode);
    SendFailedForAllPartitions(Error, ctx);
}
1428

29+
// Answers the whole request with one error and terminates the actor.
//
// For every topic/partition present in the request a response entry carrying
// `error` is appended to Response; the response is then sent to the connection
// and Die() is called.
//
// @param error  Kafka error code reported for each partition and as the
//               top-level error of the response.
// @param ctx    Actor context used for Die().
void TKafkaOffsetCommitActor::SendFailedForAllPartitions(EKafkaErrors error, const TActorContext& ctx) {
    // Keep the member in sync with the argument so the top-level error of the
    // response always matches the per-partition errors, whatever the caller did.
    Error = error;

    // Iterate by const reference: a by-value loop variable would copy each
    // topic together with its whole partition vector.
    for (const auto& topicReq: Message->Topics) {
        TOffsetCommitResponseData::TOffsetCommitResponseTopic topic;
        topic.Name = topicReq.Name;
        for (const auto& partitionRequest: topicReq.Partitions) {
            TOffsetCommitResponseData::TOffsetCommitResponseTopic::TOffsetCommitResponsePartition partition;
            partition.PartitionIndex = partitionRequest.PartitionIndex;
            partition.ErrorCode = error;
            topic.Partitions.push_back(partition);
        }
        Response->Topics.push_back(topic);
    }

    Send(Context->ConnectionId, new TEvKafka::TEvResponse(CorrelationId, Response, error));
    Die(ctx);
}
44+
45+
// Handles the pipe-connected notification. A status of NKikimrProto::OK needs
// no action; any other status is logged and the tablet's outstanding requests
// are failed through ProcessPipeProblem.
void TKafkaOffsetCommitActor::Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev, const TActorContext& ctx) {
    TEvTabletPipe::TEvClientConnected* connected = ev->Get();
    if (connected->Status == NKikimrProto::OK) {
        return;
    }

    KAFKA_LOG_CRIT("Pipe to tablet is dead. status# " << connected->Status);
    ProcessPipeProblem(connected->TabletId, ctx);
}
53+
54+
// Handles destruction of a tablet pipe: logs it and fails every request of
// that tablet that has not been answered yet.
void TKafkaOffsetCommitActor::Handle(TEvTabletPipe::TEvClientDestroyed::TPtr& ev, const TActorContext& ctx) {
    KAFKA_LOG_CRIT("Pipe to tablet is destroyed");

    const ui64 tabletId = ev->Get()->TabletId;
    ProcessPipeProblem(tabletId, ctx);
}
58+
59+
// Fails every still-pending commit request that was sent to `tabletId`.
//
// Each cookie registered for the tablet is looked up in CookieToRequestInfo;
// requests not yet marked Done are marked Done and answered with
// UNKNOWN_SERVER_ERROR. Aborts if the tablet or one of its cookies is unknown.
void TKafkaOffsetCommitActor::ProcessPipeProblem(ui64 tabletId, const TActorContext& ctx) {
    const auto tabletCookies = TabletIdToCookies.find(tabletId);
    Y_ABORT_UNLESS(tabletCookies != TabletIdToCookies.end());

    for (const ui64 cookie : tabletCookies->second) {
        const auto found = CookieToRequestInfo.find(cookie);
        Y_ABORT_UNLESS(found != CookieToRequestInfo.end());

        TRequestInfo& request = found->second;
        if (request.Done) {
            // Already answered earlier; nothing left to report for it.
            continue;
        }

        request.Done = true;
        AddPartitionResponse(EKafkaErrors::UNKNOWN_SERVER_ERROR, request.TopicName, request.PartitionId, ctx);
    }
}
73+
74+
// Handles successful authorization: sends one "set client offset" command per
// requested partition to the tablet that hosts it.
//
// Partitions whose topic or tablet cannot be found in the auth result are
// answered immediately with UNKNOWN_TOPIC_OR_PARTITION. For the others a pipe
// to the tablet is created on first use, the request is remembered under a
// fresh cookie (CookieToRequestInfo / TabletIdToCookies) and the command is
// sent through the pipe.
//
// PendingResponses is set up front to the total number of requested
// partitions, because AddPartitionResponse decrements it once per partition
// regardless of how the partition was answered. Counting only the partitions
// that were actually sent to a tablet would let an unknown partition underflow
// the counter (no reply at all) or drive it to zero while tablet replies are
// still outstanding (premature, incomplete reply).
void TKafkaOffsetCommitActor::Handle(NGRpcProxy::V1::TEvPQProxy::TEvAuthResultOk::TPtr& ev, const TActorContext& ctx) {
    KAFKA_LOG_D("Auth success. Topics count: " << ev->Get()->TopicAndTablets.size());
    TopicAndTablets = std::move(ev->Get()->TopicAndTablets);

    for (const auto& topicReq: Message->Topics) {
        PendingResponses += topicReq.Partitions.size();
    }

    if (PendingResponses == 0) {
        // Nothing to commit: no AddPartitionResponse call will ever happen, so
        // reply right away instead of leaving the client without an answer.
        Send(Context->ConnectionId, new TEvKafka::TEvResponse(CorrelationId, Response, Error));
        Die(ctx);
        return;
    }

    for (const auto& topicReq: Message->Topics) {
        const auto& topicName = topicReq.Name.value();
        const auto topicIt = TopicAndTablets.find(NormalizePath(Context->DatabasePath, topicName));

        for (const auto& partitionRequest: topicReq.Partitions) {
            if (topicIt == TopicAndTablets.end()) {
                AddPartitionResponse(UNKNOWN_TOPIC_OR_PARTITION, topicName, partitionRequest.PartitionIndex, ctx);
                continue;
            }

            const auto tabletIdIt = topicIt->second.PartitionIdToTabletId.find(partitionRequest.PartitionIndex);
            if (tabletIdIt == topicIt->second.PartitionIdToTabletId.end()) {
                AddPartitionResponse(UNKNOWN_TOPIC_OR_PARTITION, topicName, partitionRequest.PartitionIndex, ctx);
                continue;
            }

            const ui64 tabletId = tabletIdIt->second;

            // One pipe per tablet, created lazily and shared by all its partitions.
            auto pipeIt = TabletIdToPipe.find(tabletId);
            if (pipeIt == TabletIdToPipe.end()) {
                NTabletPipe::TClientConfig clientConfig;
                clientConfig.RetryPolicy = RetryPolicyForPipes;
                pipeIt = TabletIdToPipe.emplace(
                    tabletId,
                    ctx.Register(NTabletPipe::CreateClient(ctx.SelfID, tabletId, clientConfig))).first;
            }

            // The cookie ties the tablet's reply back to this topic/partition.
            const ui64 cookie = NextCookie++;

            NKikimrClient::TPersQueueRequest request;
            request.MutablePartitionRequest()->SetTopic(topicIt->second.TopicNameConverter->GetPrimaryPath());
            request.MutablePartitionRequest()->SetPartition(partitionRequest.PartitionIndex);
            request.MutablePartitionRequest()->SetCookie(cookie);

            CookieToRequestInfo.emplace(cookie, TRequestInfo(topicName, partitionRequest.PartitionIndex));
            TabletIdToCookies[tabletId].push_back(cookie);

            auto commit = request.MutablePartitionRequest()->MutableCmdSetClientOffset();
            commit->SetClientId(Message->GroupId.value());
            commit->SetOffset(partitionRequest.CommittedOffset);
            commit->SetStrict(true);

            KAFKA_LOG_D("Send commit request for group# " << Message->GroupId.value() <<
                ", topic# " << topicIt->second.TopicNameConverter->GetPrimaryPath() <<
                ", partition# " << partitionRequest.PartitionIndex <<
                ", offset# " << partitionRequest.CommittedOffset);

            TAutoPtr<TEvPersQueue::TEvRequest> req(new TEvPersQueue::TEvRequest);
            req->Record.Swap(&request);

            NTabletPipe::SendData(ctx, pipeIt->second, req.Release());
        }
    }
}
129+
130+
// Handles a tablet's reply to one commit command.
//
// The reply's cookie identifies the request in CookieToRequestInfo (aborts if
// it is unknown). The request is marked Done and its result, converted to a
// Kafka error code, is recorded through AddPartitionResponse.
void TKafkaOffsetCommitActor::Handle(TEvPersQueue::TEvResponse::TPtr& ev, const TActorContext& ctx) {
    const auto& record = ev->Get()->Record;
    const auto& partitionResult = record.GetPartitionResponse();

    // Validate the lookup BEFORE touching the iterator: dereferencing end() is
    // undefined behaviour.
    const auto requestInfo = CookieToRequestInfo.find(partitionResult.GetCookie());
    Y_ABORT_UNLESS(requestInfo != CookieToRequestInfo.end());

    if (requestInfo->second.Done) {
        // Already answered (ProcessPipeProblem marks requests Done when the
        // pipe fails); answering again would add a duplicate partition entry
        // and decrement PendingResponses twice.
        return;
    }
    requestInfo->second.Done = true;

    if (record.GetErrorCode() != NPersQueue::NErrorCode::OK) {
        KAFKA_LOG_CRIT("Commit offset error. status# " << EErrorCode_Name(record.GetErrorCode()) << ", reason# " << record.GetErrorReason());
    }

    AddPartitionResponse(ConvertErrorCode(NGRpcProxy::V1::ConvertOldCode(record.GetErrorCode())), requestInfo->second.TopicName, requestInfo->second.PartitionId, ctx);
}
142+
143+
// Records the outcome for one partition and, once no responses are pending,
// sends the accumulated response and terminates the actor.
//
// @param error        Result for the partition; any value other than
//                     NONE_ERROR also overwrites the request-level Error.
// @param topicName    Topic the partition belongs to; partitions of the same
//                     topic are grouped under a single topic entry.
// @param partitionId  Partition index written to the response entry.
// @param ctx          Actor context used for Die().
void TKafkaOffsetCommitActor::AddPartitionResponse(EKafkaErrors error, const TString& topicName, ui64 partitionId, const TActorContext& ctx) {
    if (error != NONE_ERROR) {
        Error = error;
    }

    --PendingResponses;

    TOffsetCommitResponseData::TOffsetCommitResponseTopic::TOffsetCommitResponsePartition partitionResponse;
    partitionResponse.PartitionIndex = partitionId;
    partitionResponse.ErrorCode = error;

    // ResponseTopicIds maps a topic name to its index inside Response->Topics.
    // The first partition seen for a topic creates the topic entry.
    const auto [topicIdIt, isNewTopic] = ResponseTopicIds.try_emplace(topicName, Response->Topics.size());
    if (isNewTopic) {
        TOffsetCommitResponseData::TOffsetCommitResponseTopic topicResponse;
        topicResponse.Name = topicName;
        Response->Topics.push_back(topicResponse);
    }
    Response->Topics[topicIdIt->second].Partitions.push_back(partitionResponse);

    if (PendingResponses != 0) {
        return;
    }

    Send(Context->ConnectionId, new TEvKafka::TEvResponse(CorrelationId, Response, Error));
    Die(ctx);
}
29172

30173
// Entry point of the actor.
//
// Collects the normalized paths of all topics named in the request, asks a
// TTopicsListController to build the read-topics list for them and, if that
// list is valid, registers a TReadInitAndAuthActor and switches to StateWork.
// An invalid list fails the whole request with INVALID_REQUEST.
void TKafkaOffsetCommitActor::Bootstrap(const NActors::TActorContext& ctx) {
    THashSet<TString> topicsToResolve;
    for (const auto& topicReq: Message->Topics) {
        topicsToResolve.insert(NormalizePath(Context->DatabasePath, topicReq.Name.value()));
    }

    auto converterFactory = std::make_shared<NPersQueue::TTopicNamesConverterFactory>(
        NKikimr::AppData(ctx)->PQConfig, "");
    auto topicsController = std::make_unique<NPersQueue::TTopicsListController>(converterFactory);

    auto readTopics = topicsController->GetReadTopicsList(topicsToResolve, false, Context->DatabasePath);
    if (!readTopics.IsValid) {
        KAFKA_LOG_CRIT("Commit offsets failed. reason# topicsToConverter is not valid");
        Error = INVALID_REQUEST;
        SendFailedForAllPartitions(Error, ctx);
        return;
    }

    AuthInitActor = ctx.Register(new NKikimr::NGRpcProxy::V1::TReadInitAndAuthActor(
        ctx,
        ctx.SelfID,
        Message->GroupId.value(),
        0,
        "",
        NKikimr::NMsgBusProxy::CreatePersQueueMetaCacheV2Id(),
        NKikimr::MakeSchemeCacheID(),
        nullptr,
        Context->UserToken,
        readTopics,
        topicsController->GetLocalCluster(),
        false));

    Become(&TKafkaOffsetCommitActor::StateWork);
}
37203

38204
} // NKafka
Lines changed: 70 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,92 @@
11
#include "actors.h"
22

3+
#include "ydb/core/base/tablet_pipe.h"
4+
#include "ydb/core/grpc_services/local_rpc/local_rpc.h"
5+
#include <ydb/core/kafka_proxy/kafka_events.h>
6+
#include <ydb/core/persqueue/events/internal.h>
7+
#include <ydb/library/aclib/aclib.h>
8+
#include <ydb/library/actors/core/actor.h>
39
#include <ydb/library/actors/core/actor_bootstrapped.h>
10+
#include <ydb/public/api/protos/draft/persqueue_error_codes.pb.h>
11+
#include "ydb/public/lib/base/msgbus_status.h"
12+
#include <ydb/services/persqueue_v1/actors/events.h>
13+
#include "ydb/services/persqueue_v1/actors/persqueue_utils.h"
14+
#include <ydb/services/persqueue_v1/actors/read_init_auth_actor.h>
15+
416

517
namespace NKafka {
18+
using namespace NKikimr;
619

720
// Actor serving a single Kafka OffsetCommit request.
//
// After Bootstrap() it works in StateWork, reacting to the auth result, to
// tablet replies and to tablet-pipe notifications. Per-partition results are
// collected in Response, which is sent to Context->ConnectionId.
class TKafkaOffsetCommitActor: public NActors::TActorBootstrapped<TKafkaOffsetCommitActor> {

    // Bookkeeping for one commit command, stored in CookieToRequestInfo under
    // the cookie attached to that command.
    struct TRequestInfo {
        TString TopicName;      // topic name as given in the Kafka request
        ui64 PartitionId = 0;   // partition index of the request
        bool Done = false;      // set once a response for it has been recorded

        TRequestInfo(const TString& topicName, ui64 partitionId)
            : TopicName(topicName)
            , PartitionId(partitionId)
        {}
    };

public:
    using TBase = NActors::TActorBootstrapped<TKafkaOffsetCommitActor>;

    // Stores the request parameters and allocates the (initially empty) response.
    TKafkaOffsetCommitActor(const TContext::TPtr context, const ui64 correlationId, const TMessagePtr<TOffsetCommitRequestData>& message)
        : Context(context)
        , CorrelationId(correlationId)
        , Message(message)
        , Response(new TOffsetCommitResponseData())
    {
    }

    void Bootstrap(const NActors::TActorContext& ctx);

private:
    TString LogPrefix();
    void Die(const TActorContext& ctx) override;

    // The only state of the actor: dispatches each supported event type to the
    // matching Handle() overload.
    STATEFN(StateWork) {
        KAFKA_LOG_T("Received event: " << (*ev.Get()).GetTypeName());
        switch (ev->GetTypeRewrite()) {
            HFunc(NGRpcProxy::V1::TEvPQProxy::TEvAuthResultOk, Handle);
            HFunc(TEvPersQueue::TEvResponse, Handle);
            HFunc(NKikimr::NGRpcProxy::V1::TEvPQProxy::TEvCloseSession, Handle);
            HFunc(TEvTabletPipe::TEvClientConnected, Handle);
            HFunc(TEvTabletPipe::TEvClientDestroyed, Handle);
        }
    }

    // Event handlers.
    void Handle(NGRpcProxy::V1::TEvPQProxy::TEvAuthResultOk::TPtr& ev, const TActorContext& ctx);
    void Handle(TEvPersQueue::TEvResponse::TPtr& ev, const TActorContext& ctx);
    void Handle(NKikimr::NGRpcProxy::V1::TEvPQProxy::TEvCloseSession::TPtr& ev, const TActorContext& ctx);
    void Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev, const TActorContext& ctx);
    void Handle(TEvTabletPipe::TEvClientDestroyed::TPtr& ev, const TActorContext& ctx);

    // Response assembly helpers.
    void AddPartitionResponse(EKafkaErrors error, const TString& topicName, ui64 partitionId, const TActorContext& ctx);
    void ProcessPipeProblem(ui64 tabletId, const TActorContext& ctx);
    void SendFailedForAllPartitions(EKafkaErrors error, const TActorContext& ctx);

private:
    // Fixed for the lifetime of the actor.
    const TContext::TPtr Context;
    const ui64 CorrelationId;
    const TMessagePtr<TOffsetCommitRequestData> Message;
    const TOffsetCommitResponseData::TPtr Response;

    ui64 PendingResponses = 0;                                   // partitions still awaiting a result
    ui64 NextCookie = 0;                                         // next cookie to attach to a tablet request
    std::unordered_map<ui64, TVector<ui64>> TabletIdToCookies;   // tablet id -> cookies of requests sent to it
    std::unordered_map<ui64, TRequestInfo> CookieToRequestInfo;  // cookie -> request bookkeeping
    std::unordered_map<TString, ui64> ResponseTopicIds;          // topic name -> index in Response->Topics
    NKikimr::NGRpcProxy::TTopicInitInfoMap TopicAndTablets;      // taken from TEvAuthResultOk
    std::unordered_map<ui64, TActorId> TabletIdToPipe;           // tablet id -> pipe client actor
    TActorId AuthInitActor;                                      // helper actor registered in Bootstrap()
    EKafkaErrors Error = NONE_ERROR;                             // top-level error sent with the response

    // Retry policy applied to every tablet pipe this actor opens.
    static constexpr NTabletPipe::TClientRetryPolicy RetryPolicyForPipes = {
        .RetryLimitCount = 6,
        .MinRetryTime = TDuration::MilliSeconds(10),
        .MaxRetryTime = TDuration::MilliSeconds(100),
        .BackoffMultiplier = 2,
        .DoFirstRetryInstantly = true
    };
};
2391

2492
} // NKafka

0 commit comments

Comments
 (0)