Skip to content

Commit c73d194

Browse files
committed
fix
1 parent 65aaea5 commit c73d194

File tree

2 files changed

+20
-56
lines changed

2 files changed

+20
-56
lines changed

ydb/core/kafka_proxy/kafka_consumer_protocol.cpp

+18-54
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,6 @@
33

44
namespace NKafka {
55

6-
static constexpr TKafkaUint16 ASSIGNMENT_VERSION = 3;
7-
86
//
97
// TConsumerProtocolSubscription
108
//
@@ -115,67 +113,36 @@ i32 TConsumerProtocolSubscription::TopicPartition::Size(TKafkaVersion _version)
115113
//
116114
// TConsumerProtocolAssignment
117115
//
116+
117+
const TConsumerProtocolSubscription::TopicPartition::TopicPartition::TopicMeta::Type TConsumerProtocolAssignment::TopicPartition::TopicPartition::TopicMeta::Default = {""};
118118
TConsumerProtocolAssignment::TConsumerProtocolAssignment()
119119
{}
120120

121-
void TConsumerProtocolAssignment::Read(TKafkaReadable& r, TKafkaVersion v) {
122-
bool useVarintSize = (v > 3);
123-
124-
ui32 size;
125-
if (useVarintSize) {
126-
size = r.readUnsignedVarint<ui32>();
127-
} else {
128-
TKafkaInt32 s;
129-
r >> s;
130-
size = static_cast<ui32>(s);
131-
}
132-
Y_UNUSED(size);
133-
134-
TKafkaVersion assignmentVersion;
135-
r >> assignmentVersion;
136-
137-
if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min,
138-
MessageMeta::PresentVersions.Max>(assignmentVersion))
139-
{
140-
ythrow yexception() << "Can't read version " << assignmentVersion
141-
<< " of TConsumerProtocolAssignment";
121+
void TConsumerProtocolAssignment::Read(TKafkaReadable& _readable, TKafkaVersion _version) {
122+
if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) {
123+
ythrow yexception() << "Can't read version " << _version << " of TConsumerProtocolAssignment";
142124
}
125+
NPrivate::Read<AssignedPartitionsMeta>(_readable, _version, AssignedPartitions);
126+
NPrivate::Read<UserDataMeta>(_readable, _version, UserData);
143127

144-
NPrivate::Read<AssignedPartitionsMeta>(r, assignmentVersion, AssignedPartitions);
145-
NPrivate::Read<UserDataMeta>(r, assignmentVersion, UserData);
146-
147-
if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min,
148-
MessageMeta::FlexibleVersions.Max>(assignmentVersion))
149-
{
150-
ui32 numTaggedFields = r.readUnsignedVarint<ui32>();
151-
for (ui32 i = 0; i < numTaggedFields; ++i) {
152-
ui32 tag = r.readUnsignedVarint<ui32>();
153-
Y_UNUSED(tag);
154-
155-
ui32 tagSize = r.readUnsignedVarint<ui32>();
156-
r.skip(tagSize);
157-
158-
r.skip(tagSize);
128+
if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) {
129+
ui32 _numTaggedFields = _readable.readUnsignedVarint<ui32>();
130+
for (ui32 _i = 0; _i < _numTaggedFields; ++_i) {
131+
ui32 _tag = _readable.readUnsignedVarint<ui32>();
132+
ui32 _size = _readable.readUnsignedVarint<ui32>();
133+
switch (_tag) {
134+
default:
135+
_readable.skip(_size); // skip unknown tag
136+
break;
137+
}
159138
}
160139
}
161140
}
162141

163142
void TConsumerProtocolAssignment::Write(TKafkaWritable& _writable, TKafkaVersion _version) const {
164-
auto useVarintSize = _version > 3;
165-
_version = ASSIGNMENT_VERSION;
166-
167143
if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) {
168144
ythrow yexception() << "Can't write version " << _version << " of TConsumerProtocolAssignment";
169145
}
170-
171-
if (useVarintSize) {
172-
_writable.writeUnsignedVarint(Size(_version) + 1);
173-
} else {
174-
TKafkaInt32 size = Size(_version);
175-
_writable << size;
176-
}
177-
178-
_writable << _version;
179146
NPrivate::TWriteCollector _collector;
180147
NPrivate::Write<AssignedPartitionsMeta>(_collector, _writable, _version, AssignedPartitions);
181148
NPrivate::Write<UserDataMeta>(_collector, _writable, _version, UserData);
@@ -186,15 +153,14 @@ void TConsumerProtocolAssignment::Write(TKafkaWritable& _writable, TKafkaVersion
186153
}
187154

188155
i32 TConsumerProtocolAssignment::Size(TKafkaVersion _version) const {
189-
_version = ASSIGNMENT_VERSION;
190156
NPrivate::TSizeCollector _collector;
191157
NPrivate::Size<AssignedPartitionsMeta>(_collector, _version, AssignedPartitions);
192158
NPrivate::Size<UserDataMeta>(_collector, _version, UserData);
193159

194160
if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) {
195161
_collector.Size += NPrivate::SizeOfUnsignedVarint(_collector.NumTaggedFields);
196162
}
197-
return _collector.Size + sizeof(_version);
163+
return _collector.Size;
198164
}
199165

200166

@@ -205,8 +171,6 @@ i32 TConsumerProtocolAssignment::Size(TKafkaVersion _version) const {
205171
TConsumerProtocolAssignment::TopicPartition::TopicPartition()
206172
{}
207173

208-
const TConsumerProtocolAssignment::TopicPartition::TopicPartition::TopicMeta::Type TConsumerProtocolAssignment::TopicPartition::TopicPartition::TopicMeta::Default = {""};
209-
210174
void TConsumerProtocolAssignment::TopicPartition::Read(TKafkaReadable& _readable, TKafkaVersion _version) {
211175
if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) {
212176
ythrow yexception() << "Can't read version " << _version << " of TConsumerProtocolAssignment::TopicPartition";

ydb/core/kafka_proxy/kafka_consumer_protocol.h

+2-2
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,7 @@ class TConsumerProtocolAssignment : public TMessage {
157157
typedef std::shared_ptr<TConsumerProtocolAssignment> TPtr;
158158

159159
struct MessageMeta {
160-
static constexpr TKafkaVersions PresentVersions = {0, 7};
160+
static constexpr TKafkaVersions PresentVersions = {0, 3};
161161
static constexpr TKafkaVersions FlexibleVersions = VersionsNever;
162162
};
163163

@@ -167,7 +167,7 @@ class TConsumerProtocolAssignment : public TMessage {
167167
struct TopicPartition : public TMessage {
168168
public:
169169
struct MessageMeta {
170-
static constexpr TKafkaVersions PresentVersions = {0, 7};
170+
static constexpr TKafkaVersions PresentVersions = {0, 3};
171171
static constexpr TKafkaVersions FlexibleVersions = VersionsNever;
172172
};
173173

0 commit comments

Comments
 (0)