Skip to content

Fixes for kcat #1586

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Feb 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 14 additions & 14 deletions ydb/core/kafka_proxy/actors/kafka_api_versions_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,21 +30,21 @@ TApiVersionsResponseData::TPtr GetApiVersions() {
TApiVersionsResponseData::TPtr response = std::make_shared<TApiVersionsResponseData>();
response->ErrorCode = EKafkaErrors::NONE_ERROR;

AddApiKey<TProduceRequestData>(response->ApiKeys, PRODUCE, {.MinVersion=3});
AddApiKey<TApiVersionsRequestData>(response->ApiKeys, API_VERSIONS);
AddApiKey<TMetadataRequestData>(response->ApiKeys, METADATA);
AddApiKey<TInitProducerIdRequestData>(response->ApiKeys, INIT_PRODUCER_ID);
AddApiKey<TSaslHandshakeRequestData>(response->ApiKeys, SASL_HANDSHAKE);
AddApiKey<TSaslAuthenticateRequestData>(response->ApiKeys, SASL_AUTHENTICATE);
AddApiKey<TListOffsetsRequestData>(response->ApiKeys, LIST_OFFSETS);
AddApiKey<TProduceRequestData>(response->ApiKeys, PRODUCE, {.MinVersion=3, .MaxVersion=9});
AddApiKey<TApiVersionsRequestData>(response->ApiKeys, API_VERSIONS, {.MaxVersion=2});
AddApiKey<TMetadataRequestData>(response->ApiKeys, METADATA, {.MaxVersion=9});
AddApiKey<TInitProducerIdRequestData>(response->ApiKeys, INIT_PRODUCER_ID, {.MaxVersion=4});
AddApiKey<TSaslHandshakeRequestData>(response->ApiKeys, SASL_HANDSHAKE, {.MaxVersion=1});
AddApiKey<TSaslAuthenticateRequestData>(response->ApiKeys, SASL_AUTHENTICATE, {.MaxVersion=2});
AddApiKey<TListOffsetsRequestData>(response->ApiKeys, LIST_OFFSETS, {.MinVersion=1, .MaxVersion=1});
AddApiKey<TFetchRequestData>(response->ApiKeys, FETCH, {.MaxVersion=3});
AddApiKey<TJoinGroupRequestData>(response->ApiKeys, JOIN_GROUP);
AddApiKey<TSyncGroupRequestData>(response->ApiKeys, SYNC_GROUP);
AddApiKey<TLeaveGroupRequestData>(response->ApiKeys, LEAVE_GROUP);
AddApiKey<THeartbeatRequestData>(response->ApiKeys, HEARTBEAT);
AddApiKey<TFindCoordinatorRequestData>(response->ApiKeys, FIND_COORDINATOR);
AddApiKey<TOffsetCommitRequestData>(response->ApiKeys, OFFSET_COMMIT, {.MaxVersion=1});
AddApiKey<TOffsetFetchRequestData>(response->ApiKeys, OFFSET_FETCH);
AddApiKey<TJoinGroupRequestData>(response->ApiKeys, JOIN_GROUP, {.MaxVersion=9});
AddApiKey<TSyncGroupRequestData>(response->ApiKeys, SYNC_GROUP, {.MaxVersion=3});
AddApiKey<TLeaveGroupRequestData>(response->ApiKeys, LEAVE_GROUP, {.MaxVersion=5});
AddApiKey<THeartbeatRequestData>(response->ApiKeys, HEARTBEAT, {.MaxVersion=4});
AddApiKey<TFindCoordinatorRequestData>(response->ApiKeys, FIND_COORDINATOR, {.MaxVersion=0});
AddApiKey<TOffsetCommitRequestData>(response->ApiKeys, OFFSET_COMMIT, {.MaxVersion=0});
AddApiKey<TOffsetFetchRequestData>(response->ApiKeys, OFFSET_FETCH, {.MaxVersion=8});

return response;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@ void TKafkaFindCoordinatorActor::SendResponseOkAndDie(const TString& host, i32 p
response->Coordinators.push_back(coordinator);
}

response->ErrorCode = NONE_ERROR;
response->Host = host;
response->Port = port;
response->NodeId = nodeId;

Send(Context->ConnectionId, new TEvKafka::TEvResponse(CorrelationId, response, static_cast<EKafkaErrors>(response->ErrorCode)));
Die(ctx);
}
Expand Down
1 change: 1 addition & 0 deletions ydb/core/kafka_proxy/actors/kafka_offset_fetch_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ TOffsetFetchResponseData::TPtr TKafkaOffsetFetchActor::GetOffsetFetchResponse()
partition.CommittedOffset = sourcePartition.CommittedOffset;
partition.PartitionIndex = sourcePartition.PartitionIndex;
partition.ErrorCode = sourcePartition.ErrorCode;
topic.Partitions.push_back(partition);
}
response->Topics.push_back(topic);
}
Expand Down
10 changes: 5 additions & 5 deletions ydb/core/kafka_proxy/actors/kafka_read_session_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,8 @@ void TKafkaReadSessionActor::HandleJoinGroup(TEvKafka::TEvJoinGroupRequest::TPtr

switch (ReadStep) {
case WAIT_JOIN_GROUP: { // join first time
if (joinGroupRequest->ProtocolType != SUPPORTED_JOIN_GROUP_PROTOCOL) {
SendJoinGroupResponseFail(ctx, ev->Get()->CorrelationId, INVALID_REQUEST, TStringBuilder() << "unknown protocolType# " << joinGroupRequest->ProtocolType);
if (joinGroupRequest->ProtocolType.has_value() && !joinGroupRequest->ProtocolType.value().empty() && joinGroupRequest->ProtocolType.value() != SUPPORTED_JOIN_GROUP_PROTOCOL) {
SendJoinGroupResponseFail(ctx, ev->Get()->CorrelationId, INVALID_REQUEST, TStringBuilder() << "unknown protocolType# " << joinGroupRequest->ProtocolType.value());
CloseReadSession(ctx);
return;
}
Expand Down Expand Up @@ -156,8 +156,8 @@ void TKafkaReadSessionActor::HandleSyncGroup(TEvKafka::TEvSyncGroupRequest::TPtr
return;
}

if (syncGroupRequest->ProtocolType != SUPPORTED_JOIN_GROUP_PROTOCOL) {
SendJoinGroupResponseFail(ctx, ev->Get()->CorrelationId, INVALID_REQUEST, TStringBuilder() << "unknown protocolType# " << syncGroupRequest->ProtocolType);
if (syncGroupRequest->ProtocolType.has_value() && !syncGroupRequest->ProtocolType.value().empty() && syncGroupRequest->ProtocolType.value() != SUPPORTED_JOIN_GROUP_PROTOCOL) {
SendSyncGroupResponseFail(ctx, ev->Get()->CorrelationId, INVALID_REQUEST, TStringBuilder() << "unknown protocolType# " << syncGroupRequest->ProtocolType.value());
CloseReadSession(ctx);
return;
}
Expand Down Expand Up @@ -361,9 +361,9 @@ TConsumerProtocolAssignment TKafkaReadSessionActor::BuildAssignmentAndInformBala
for (auto part: finalPartitionsToRead) {
KAFKA_LOG_D("SYNC_GROUP assigned partition number: " << part);
topicPartition.Partitions.push_back(part);
assignment.AssignedPartitions.push_back(topicPartition);
partitions.ReadingNow.emplace(part);
}
assignment.AssignedPartitions.push_back(topicPartition);
}

return assignment;
Expand Down
10 changes: 9 additions & 1 deletion ydb/core/kafka_proxy/kafka_consumer_protocol.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -143,12 +143,20 @@ void TConsumerProtocolAssignment::Read(TKafkaReadable& _readable, TKafkaVersion
}

void TConsumerProtocolAssignment::Write(TKafkaWritable& _writable, TKafkaVersion _version) const {
auto useVarintSize = _version > 3;
_version = ASSIGNMENT_VERSION;

if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) {
ythrow yexception() << "Can't write version " << _version << " of TConsumerProtocolAssignment";
}

_writable.writeUnsignedVarint(Size(ASSIGNMENT_VERSION) + 1);
if (useVarintSize) {
_writable.writeUnsignedVarint(Size(_version) + 1);
} else {
TKafkaInt32 size = Size(_version);
_writable << size;
}

_writable << _version;
NPrivate::TWriteCollector _collector;
NPrivate::Write<AssignedPartitionsMeta>(_collector, _writable, _version, AssignedPartitions);
Expand Down
7 changes: 6 additions & 1 deletion ydb/core/kafka_proxy/kafka_messages.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4517,7 +4517,12 @@ i32 TSyncGroupResponseData::Size(TKafkaVersion _version) const {
if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) {
_collector.Size += NPrivate::SizeOfUnsignedVarint(_collector.NumTaggedFields);
}
return _collector.Size + NPrivate::SizeOfUnsignedVarint(_assignmentCollector.Size + 1);
auto useVarintSize = _version > 3;
if (useVarintSize) {
return _collector.Size + NPrivate::SizeOfUnsignedVarint(_assignmentCollector.Size + 1);
} else {
return _collector.Size + sizeof(TKafkaInt32);
}
}


Expand Down