Skip to content

Commit 5c26443

Browse files
authored
Refactoring - optimize parameters of CreatePartitionWriter (#669)
1 parent 0e45751 commit 5c26443

13 files changed

+521
-525
lines changed

ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp

+8-13
Original file line numberDiff line numberDiff line change
@@ -132,9 +132,7 @@ void TKafkaProduceActor::HandleInit(TEvTxProxySchemeCache::TEvNavigateKeySetResu
132132
if (info.SecurityObject->CheckAccess(NACLib::EAccessRights::UpdateRow, *Context->UserToken)) {
133133
topic.Status = OK;
134134
topic.ExpirationTime = now + TOPIC_OK_EXPIRATION_INTERVAL;
135-
for(auto& p : info.PQGroupInfo->Description.GetPartitions()) {
136-
topic.partitions[p.GetPartitionId()] = p.GetTabletId();
137-
}
135+
topic.PartitionChooser = CreatePartitionChooser(info.PQGroupInfo->Description);
138136
} else {
139137
KAFKA_LOG_W("Produce actor: Unauthorized PRODUCE to topic '" << topicPath << "'");
140138
topic.Status = UNAUTHORIZED;
@@ -178,7 +176,7 @@ void TKafkaProduceActor::Handle(TEvTxProxySchemeCache::TEvWatchNotifyDeleted::TP
178176
auto& topicInfo = Topics[path];
179177
topicInfo.Status = NOT_FOUND;
180178
topicInfo.ExpirationTime = ctx.Now() + TOPIC_NOT_FOUND_EXPIRATION_INTERVAL;
181-
topicInfo.partitions.clear();
179+
topicInfo.PartitionChooser.reset();
182180
}
183181

184182
void TKafkaProduceActor::Handle(TEvTxProxySchemeCache::TEvWatchNotifyUpdated::TPtr& ev, const TActorContext& ctx) {
@@ -192,10 +190,7 @@ void TKafkaProduceActor::Handle(TEvTxProxySchemeCache::TEvWatchNotifyUpdated::TP
192190
}
193191
topic.Status = OK;
194192
topic.ExpirationTime = ctx.Now() + TOPIC_OK_EXPIRATION_INTERVAL;
195-
topic.partitions.clear();
196-
for (auto& p : e->Result->GetPathDescription().GetPersQueueGroup().GetPartitions()) {
197-
topic.partitions[p.GetPartitionId()] = p.GetTabletId();
198-
}
193+
topic.PartitionChooser = CreatePartitionChooser(e->Result->GetPathDescription().GetPersQueueGroup());
199194
}
200195

201196
void TKafkaProduceActor::Handle(TEvKafka::TEvProduceRequest::TPtr request, const TActorContext& ctx) {
@@ -578,17 +573,17 @@ std::pair<TKafkaProduceActor::ETopicStatus, TActorId> TKafkaProduceActor::Partit
578573
return { OK, writerInfo.ActorId };
579574
}
580575

581-
auto& partitions = topicInfo.partitions;
582-
auto pit = partitions.find(partitionId);
583-
if (pit == partitions.end()) {
576+
auto* partition = topicInfo.PartitionChooser->GetPartition(partitionId);
577+
if (!partition) {
584578
return { NOT_FOUND, TActorId{} };
585579
}
586580

587-
auto tabletId = pit->second;
588581
TPartitionWriterOpts opts;
589582
opts.WithDeduplication(false)
583+
.WithSourceId(SourceId)
584+
.WithTopicPath(topicPath)
590585
.WithCheckRequestUnits(topicInfo.MeteringMode, Context->RlContext);
591-
auto* writerActor = CreatePartitionWriter(SelfId(), topicPath, tabletId, partitionId, {/*expectedGeneration*/}, SourceId, opts);
586+
auto* writerActor = CreatePartitionWriter(SelfId(), partition->TabletId, partitionId, opts);
592587

593588
auto& writerInfo = partitionWriters[partitionId];
594589
writerInfo.ActorId = ctx.RegisterWithSameMailbox(writerActor);

ydb/core/kafka_proxy/actors/kafka_produce_actor.h

+1-3
Original file line numberDiff line numberDiff line change
@@ -181,9 +181,7 @@ class TKafkaProduceActor: public NActors::TActorBootstrapped<TKafkaProduceActor>
181181
TInstant ExpirationTime;
182182

183183
NKikimrPQ::TPQTabletConfig::EMeteringMode MeteringMode;
184-
185-
// partitioId -> tabletId
186-
std::unordered_map<ui32, ui64> partitions;
184+
std::shared_ptr<IPartitionChooser> PartitionChooser;
187185
};
188186
std::map<TString, TTopicInfo> Topics;
189187

ydb/core/persqueue/writer/partition_chooser.h

+19
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,25 @@ struct TEvPartitionChooser {
4848

4949
};
5050

51+
class IPartitionChooser {
52+
public:
53+
struct TPartitionInfo {
54+
TPartitionInfo(ui32 partitionId, ui64 tabletId)
55+
: PartitionId(partitionId)
56+
, TabletId(tabletId) {}
57+
58+
ui32 PartitionId;
59+
ui64 TabletId;
60+
};
61+
62+
virtual ~IPartitionChooser() = default;
63+
64+
virtual const TPartitionInfo* GetPartition(const TString& sourceId) const = 0;
65+
virtual const TPartitionInfo* GetPartition(ui32 partitionId) const = 0;
66+
};
67+
68+
std::shared_ptr<IPartitionChooser> CreatePartitionChooser(const NKikimrSchemeOp::TPersQueueGroupDescription& config, bool withoutHash = false);
69+
5170
NActors::IActor* CreatePartitionChooserActor(TActorId parentId,
5271
const NKikimrSchemeOp::TPersQueueGroupDescription& config,
5372
NPersQueue::TTopicConverterPtr& fullConverter,

0 commit comments

Comments
 (0)