Skip to content

Commit 604788d

Browse files
authored
24-3: Pre-serialized bootstrap config (#9342)
1 parent 993c21c commit 604788d

12 files changed

+88
-32
lines changed

ydb/core/client/server/msgbus_server_pq_metarequest_ut.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,7 @@ class TMessageBusServerPersQueueRequestTestBase: public TTestBase {
169169
static int version = 0;
170170
++version;
171171

172-
THolder<TEvPersQueue::TEvUpdateConfig> request(new TEvPersQueue::TEvUpdateConfig());
172+
auto request = MakeHolder<TEvPersQueue::TEvUpdateConfigBuilder>();
173173
for (size_t i : partitions) {
174174
request->Record.MutableTabletConfig()->AddPartitionIds(i);
175175
}

ydb/core/kqp/executer_actor/kqp_data_executer.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -2595,7 +2595,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
25952595
for (auto& [tabletId, t] : topicTxs) {
25962596
auto& transaction = t.tx;
25972597

2598-
auto ev = std::make_unique<TEvPersQueue::TEvProposeTransaction>();
2598+
auto ev = std::make_unique<TEvPersQueue::TEvProposeTransactionBuilder>();
25992599

26002600
if (t.hasWrite && writeId.Defined()) {
26012601
auto* w = transaction.MutableWriteId();

ydb/core/persqueue/events/global.h

+10-2
Original file line numberDiff line numberDiff line change
@@ -73,11 +73,15 @@ struct TEvPersQueue {
7373
TEvResponse() {}
7474
};
7575

76-
struct TEvUpdateConfig: public TEventPB<TEvUpdateConfig,
76+
struct TEvUpdateConfig: public TEventPreSerializedPB<TEvUpdateConfig,
7777
NKikimrPQ::TUpdateConfig, EvUpdateConfig> {
7878
TEvUpdateConfig() {}
7979
};
8080

81+
struct TEvUpdateConfigBuilder: public TEvUpdateConfig {
82+
using TBase::Record;
83+
};
84+
8185
struct TEvUpdateBalancerConfig: public TEventPB<TEvUpdateBalancerConfig,
8286
NKikimrPQ::TUpdateBalancerConfig, EvUpdateBalancerConfig> {
8387
TEvUpdateBalancerConfig() {}
@@ -245,7 +249,11 @@ struct TEvPersQueue {
245249
{}
246250
};
247251

248-
struct TEvProposeTransaction : public TEventPB<TEvProposeTransaction, NKikimrPQ::TEvProposeTransaction, EvProposeTransaction> {
252+
struct TEvProposeTransaction : public TEventPreSerializedPB<TEvProposeTransaction, NKikimrPQ::TEvProposeTransaction, EvProposeTransaction> {
253+
};
254+
255+
struct TEvProposeTransactionBuilder: public TEvProposeTransaction {
256+
using TBase::Record;
249257
};
250258

251259
struct TEvProposeTransactionResult : public TEventPB<TEvProposeTransactionResult, NKikimrPQ::TEvProposeTransactionResult, EvProposeTransactionResult> {

ydb/core/persqueue/partition.cpp

+4-4
Original file line numberDiff line numberDiff line change
@@ -899,7 +899,7 @@ void TPartition::Handle(TEvPQ::TEvUpdateWriteTimestamp::TPtr& ev, const TActorCo
899899

900900
void TPartition::Handle(TEvPersQueue::TEvProposeTransaction::TPtr& ev, const TActorContext& ctx)
901901
{
902-
const NKikimrPQ::TEvProposeTransaction& event = ev->Get()->Record;
902+
const NKikimrPQ::TEvProposeTransaction& event = ev->Get()->GetRecord();
903903
Y_ABORT_UNLESS(event.GetTxBodyCase() == NKikimrPQ::TEvProposeTransaction::kData);
904904
Y_ABORT_UNLESS(event.HasData());
905905
const NKikimrPQ::TDataTransaction& txBody = event.GetData();
@@ -1990,7 +1990,7 @@ TPartition::EProcessResult TPartition::PreProcessUserActionOrTransaction(TSimple
19901990
return EProcessResult::Continue;
19911991
}
19921992
t->Predicate.ConstructInPlace(true);
1993-
return PreProcessImmediateTx(t->ProposeTransaction->Record);
1993+
return PreProcessImmediateTx(t->ProposeTransaction->GetRecord());
19941994

19951995
} else if (t->Tx) { // Distributed TX
19961996
if (t->Predicate.Defined()) { // Predicate defined - either failed previously or Tx created with predicate defined.
@@ -2573,7 +2573,7 @@ TPartition::EProcessResult TPartition::PreProcessImmediateTx(const NKikimrPQ::TE
25732573
void TPartition::ExecImmediateTx(TTransaction& t)
25742574
{
25752575
--ImmediateTxCount;
2576-
auto& record = t.ProposeTransaction->Record;
2576+
const auto& record = t.ProposeTransaction->GetRecord();
25772577
Y_ABORT_UNLESS(record.GetTxBodyCase() == NKikimrPQ::TEvProposeTransaction::kData);
25782578
Y_ABORT_UNLESS(record.HasData());
25792579

@@ -2586,7 +2586,7 @@ void TPartition::ExecImmediateTx(TTransaction& t)
25862586
t.Message);
25872587
return;
25882588
}
2589-
for (auto& operation : record.GetData().GetOperations()) {
2589+
for (const auto& operation : record.GetData().GetOperations()) {
25902590
if (!operation.HasBegin() || !operation.HasEnd() || !operation.HasConsumer()) {
25912591
continue; //Write operation - handled separately via WriteInfo
25922592
}

ydb/core/persqueue/partition.h

+3-2
Original file line numberDiff line numberDiff line change
@@ -73,8 +73,9 @@ struct TTransaction {
7373
: ProposeTransaction(proposeTx)
7474
, State(ECommitState::Committed)
7575
{
76-
if (proposeTx->Record.HasSupportivePartitionActor()) {
77-
SupportivePartitionActor = ActorIdFromProto(proposeTx->Record.GetSupportivePartitionActor());
76+
const auto& record = proposeTx->GetRecord();
77+
if (record.HasSupportivePartitionActor()) {
78+
SupportivePartitionActor = ActorIdFromProto(record.GetSupportivePartitionActor());
7879
}
7980
Y_ABORT_UNLESS(ProposeTransaction);
8081
}

ydb/core/persqueue/pq_impl.cpp

+6-6
Original file line numberDiff line numberDiff line change
@@ -1631,7 +1631,7 @@ void TPersQueue::CreateTopicConverter(const NKikimrPQ::TPQTabletConfig& config,
16311631

16321632
void TPersQueue::ProcessUpdateConfigRequest(TAutoPtr<TEvPersQueue::TEvUpdateConfig> ev, const TActorId& sender, const TActorContext& ctx)
16331633
{
1634-
auto& record = ev->Record;
1634+
const auto& record = ev->GetRecord();
16351635

16361636
int oldConfigVersion = Config.HasVersion() ? Config.GetVersion() : -1;
16371637
int newConfigVersion = NewConfig.HasVersion() ? NewConfig.GetVersion() : oldConfigVersion;
@@ -3266,9 +3266,9 @@ void TPersQueue::Handle(TEvPersQueue::TEvCancelTransactionProposal::TPtr& ev, co
32663266

32673267
void TPersQueue::Handle(TEvPersQueue::TEvProposeTransaction::TPtr& ev, const TActorContext& ctx)
32683268
{
3269-
PQ_LOG_D("Handle TEvPersQueue::TEvProposeTransaction " << ev->Get()->Record.ShortDebugString());
3269+
const NKikimrPQ::TEvProposeTransaction& event = ev->Get()->GetRecord();
3270+
PQ_LOG_D("Handle TEvPersQueue::TEvProposeTransaction " << event.ShortDebugString());
32703271

3271-
NKikimrPQ::TEvProposeTransaction& event = ev->Get()->Record;
32723272
switch (event.GetTxBodyCase()) {
32733273
case NKikimrPQ::TEvProposeTransaction::kData:
32743274
HandleDataTransaction(ev->Release(), ctx);
@@ -3323,7 +3323,7 @@ bool TPersQueue::CheckTxWriteOperations(const NKikimrPQ::TDataTransaction& txBod
33233323
void TPersQueue::HandleDataTransaction(TAutoPtr<TEvPersQueue::TEvProposeTransaction> ev,
33243324
const TActorContext& ctx)
33253325
{
3326-
NKikimrPQ::TEvProposeTransaction& event = ev->Record;
3326+
NKikimrPQ::TEvProposeTransaction& event = *ev->MutableRecord();
33273327
Y_ABORT_UNLESS(event.GetTxBodyCase() == NKikimrPQ::TEvProposeTransaction::kData);
33283328
Y_ABORT_UNLESS(event.HasData());
33293329
const NKikimrPQ::TDataTransaction& txBody = event.GetData();
@@ -3434,7 +3434,7 @@ void TPersQueue::HandleDataTransaction(TAutoPtr<TEvPersQueue::TEvProposeTransact
34343434
void TPersQueue::HandleConfigTransaction(TAutoPtr<TEvPersQueue::TEvProposeTransaction> ev,
34353435
const TActorContext& ctx)
34363436
{
3437-
NKikimrPQ::TEvProposeTransaction& event = ev->Record;
3437+
const NKikimrPQ::TEvProposeTransaction& event = ev->GetRecord();
34383438
Y_ABORT_UNLESS(event.GetTxBodyCase() == NKikimrPQ::TEvProposeTransaction::kConfig);
34393439
Y_ABORT_UNLESS(event.HasConfig());
34403440

@@ -3712,7 +3712,7 @@ void TPersQueue::ProcessProposeTransactionQueue(const TActorContext& ctx)
37123712
const auto front = std::move(EvProposeTransactionQueue.front());
37133713
EvProposeTransactionQueue.pop_front();
37143714

3715-
const NKikimrPQ::TEvProposeTransaction& event = front->Record;
3715+
const NKikimrPQ::TEvProposeTransaction& event = front->GetRecord();
37163716
TDistributedTransaction& tx = Txs[event.GetTxId()];
37173717

37183718
switch (tx.State) {

ydb/core/persqueue/ut/common/pq_ut_common.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ void PQTabletPrepare(const TTabletPreparationParameters& parameters,
4444
try {
4545
runtime.ResetScheduledCount();
4646

47-
THolder<TEvPersQueue::TEvUpdateConfig> request(new TEvPersQueue::TEvUpdateConfig());
47+
auto request = MakeHolder<TEvPersQueue::TEvUpdateConfigBuilder>();
4848
for (ui32 i = 0; i < parameters.partitions; ++i) {
4949
request->Record.MutableTabletConfig()->AddPartitionIds(i);
5050
}

ydb/core/persqueue/ut/partition_ut.cpp

+2-2
Original file line numberDiff line numberDiff line change
@@ -882,7 +882,7 @@ void TPartitionFixture::SendProposeTransactionRequest(ui32 partition,
882882
bool immediate,
883883
ui64 txId)
884884
{
885-
auto event = MakeHolder<TEvPersQueue::TEvProposeTransaction>();
885+
auto event = MakeHolder<TEvPersQueue::TEvProposeTransactionBuilder>();
886886

887887
ActorIdToProto(Ctx->Edge, event->Record.MutableSourceActor());
888888
auto* body = event->Record.MutableData();
@@ -1606,7 +1606,7 @@ ui64 TPartitionTxTestHelper::MakeAndSendWriteTx(const TSrcIdMap& srcIdsAffected)
16061606
ui64 TPartitionTxTestHelper::MakeAndSendImmediateTx(const TSrcIdMap& srcIdsAffected) {
16071607
auto actIter = AddWriteTxImpl(srcIdsAffected, NextActId++, 0);
16081608

1609-
auto event = MakeHolder<TEvPersQueue::TEvProposeTransaction>();
1609+
auto event = MakeHolder<TEvPersQueue::TEvProposeTransactionBuilder>();
16101610

16111611
ActorIdToProto(Ctx->Edge, event->Record.MutableSourceActor());
16121612
auto* body = event->Record.MutableData();

ydb/core/persqueue/ut/pqtablet_ut.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -294,7 +294,7 @@ void TPQTabletFixture::SendToPipe(const TActorId& sender,
294294

295295
void TPQTabletFixture::SendProposeTransactionRequest(const TProposeTransactionParams& params)
296296
{
297-
auto event = MakeHolder<TEvPersQueue::TEvProposeTransaction>();
297+
auto event = MakeHolder<TEvPersQueue::TEvProposeTransactionBuilder>();
298298
THashSet<ui32> partitions;
299299

300300
ActorIdToProto(Ctx->Edge, event->Record.MutableSourceActor());

ydb/core/persqueue/ut/user_action_processor_ut.cpp

+2-2
Original file line numberDiff line numberDiff line change
@@ -647,7 +647,7 @@ void TUserActionProcessorFixture::SendProposeTransactionRequest(ui32 partition,
647647
bool immediate,
648648
ui64 txId)
649649
{
650-
auto event = MakeHolder<TEvPersQueue::TEvProposeTransaction>();
650+
auto event = MakeHolder<TEvPersQueue::TEvProposeTransactionBuilder>();
651651

652652
ActorIdToProto(Ctx->Edge, event->Record.MutableSource());
653653
auto* body = event->Record.MutableTxBody();
@@ -665,7 +665,7 @@ void TUserActionProcessorFixture::SendProposeTransactionRequest(ui32 partition,
665665

666666
void TUserActionProcessorFixture::SendProposeTransactionRequest(const TProposeTransactionParams& params)
667667
{
668-
auto event = MakeHolder<TEvPersQueue::TEvProposeTransaction>();
668+
auto event = MakeHolder<TEvPersQueue::TEvProposeTransactionBuilder>();
669669

670670
//
671671
// Source

ydb/core/tx/schemeshard/schemeshard__operation_common.cpp

+6-6
Original file line numberDiff line numberDiff line change
@@ -695,15 +695,15 @@ THolder<TEvPersQueue::TEvProposeTransaction> TConfigureParts::MakeEvProposeTrans
695695
const TTopicTabletInfo& pqShard,
696696
const TString& topicName,
697697
const TString& topicPath,
698-
const std::optional<NKikimrPQ::TBootstrapConfig>& bootstrapConfig,
698+
const std::optional<TBootstrapConfigWrapper>& bootstrapConfig,
699699
const TString& cloudId,
700700
const TString& folderId,
701701
const TString& databaseId,
702702
const TString& databasePath,
703703
TTxState::ETxType txType,
704704
const TOperationContext& context)
705705
{
706-
auto event = MakeHolder<TEvPersQueue::TEvProposeTransaction>();
706+
auto event = MakeHolder<TEvPersQueue::TEvProposeTransactionBuilder>();
707707
event->Record.SetTxId(ui64(txId));
708708
ActorIdToProto(context.SS->SelfId(), event->Record.MutableSourceActor());
709709

@@ -719,7 +719,7 @@ THolder<TEvPersQueue::TEvProposeTransaction> TConfigureParts::MakeEvProposeTrans
719719
databasePath);
720720
if (bootstrapConfig) {
721721
Y_ABORT_UNLESS(txType == TTxState::TxCreatePQGroup);
722-
event->Record.MutableConfig()->MutableBootstrapConfig()->CopyFrom(*bootstrapConfig);
722+
event->PreSerializedData += bootstrapConfig->GetPreSerializedProposeTransaction();
723723
}
724724

725725
LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
@@ -734,15 +734,15 @@ THolder<TEvPersQueue::TEvUpdateConfig> TConfigureParts::MakeEvUpdateConfig(TTxId
734734
const TTopicTabletInfo& pqShard,
735735
const TString& topicName,
736736
const TString& topicPath,
737-
const std::optional<NKikimrPQ::TBootstrapConfig>& bootstrapConfig,
737+
const std::optional<TBootstrapConfigWrapper>& bootstrapConfig,
738738
const TString& cloudId,
739739
const TString& folderId,
740740
const TString& databaseId,
741741
const TString& databasePath,
742742
TTxState::ETxType txType,
743743
const TOperationContext& context)
744744
{
745-
auto event = MakeHolder<TEvPersQueue::TEvUpdateConfig>();
745+
auto event = MakeHolder<TEvPersQueue::TEvUpdateConfigBuilder>();
746746
event->Record.SetTxId(ui64(txId));
747747

748748
MakePQTabletConfig(context,
@@ -757,7 +757,7 @@ THolder<TEvPersQueue::TEvUpdateConfig> TConfigureParts::MakeEvUpdateConfig(TTxId
757757
databasePath);
758758
if (bootstrapConfig) {
759759
Y_ABORT_UNLESS(txType == TTxState::TxCreatePQGroup);
760-
event->Record.MutableBootstrapConfig()->CopyFrom(*bootstrapConfig);
760+
event->PreSerializedData += bootstrapConfig->GetPreSerializedUpdateConfig();
761761
}
762762

763763
LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,

ydb/core/tx/schemeshard/schemeshard__operation_common.h

+51-4
Original file line numberDiff line numberDiff line change
@@ -552,6 +552,54 @@ class TDone: public TSubOperationState {
552552

553553
namespace NPQState {
554554

555+
class TBootstrapConfigWrapper: public NKikimrPQ::TBootstrapConfig {
556+
struct TSerializedProposeTransaction {
557+
TString Value;
558+
559+
static TSerializedProposeTransaction Serialize(const NKikimrPQ::TBootstrapConfig& value) {
560+
NKikimrPQ::TEvProposeTransaction record;
561+
record.MutableConfig()->MutableBootstrapConfig()->CopyFrom(value);
562+
return {record.SerializeAsString()};
563+
}
564+
};
565+
566+
struct TSerializedUpdateConfig {
567+
TString Value;
568+
569+
static TSerializedUpdateConfig Serialize(const NKikimrPQ::TBootstrapConfig& value) {
570+
NKikimrPQ::TUpdateConfig record;
571+
record.MutableBootstrapConfig()->CopyFrom(value);
572+
return {record.SerializeAsString()};
573+
}
574+
};
575+
576+
mutable std::optional<std::variant<
577+
TSerializedProposeTransaction,
578+
TSerializedUpdateConfig
579+
>> PreSerialized;
580+
581+
template <typename T>
582+
const TString& Get() const {
583+
if (!PreSerialized) {
584+
PreSerialized.emplace(T::Serialize(*this));
585+
}
586+
587+
const auto* value = std::get_if<T>(&PreSerialized.value());
588+
Y_ABORT_UNLESS(value);
589+
590+
return value->Value;
591+
}
592+
593+
public:
594+
const TString& GetPreSerializedProposeTransaction() const {
595+
return Get<TSerializedProposeTransaction>();
596+
}
597+
598+
const TString& GetPreSerializedUpdateConfig() const {
599+
return Get<TSerializedUpdateConfig>();
600+
}
601+
};
602+
555603
class TConfigureParts: public TSubOperationState {
556604
private:
557605
TOperationId OperationId;
@@ -627,7 +675,6 @@ class TConfigureParts: public TSubOperationState {
627675
return false;
628676
}
629677

630-
631678
bool ProgressState(TOperationContext& context) override {
632679
TTabletId ssId = context.SS->SelfTabletId();
633680

@@ -669,7 +716,7 @@ class TConfigureParts: public TSubOperationState {
669716
TString databasePath = TPath::Init(context.SS->RootPathId(), context.SS).PathString();
670717
auto topicPath = TPath::Init(txState->TargetPathId, context.SS);
671718

672-
std::optional<NKikimrPQ::TBootstrapConfig> bootstrapConfig;
719+
std::optional<TBootstrapConfigWrapper> bootstrapConfig;
673720
if (txState->TxType == TTxState::TxCreatePQGroup && topicPath.Parent().IsCdcStream()) {
674721
bootstrapConfig.emplace();
675722

@@ -918,7 +965,7 @@ class TConfigureParts: public TSubOperationState {
918965
const TTopicTabletInfo& pqShard,
919966
const TString& topicName,
920967
const TString& topicPath,
921-
const std::optional<NKikimrPQ::TBootstrapConfig>& bootstrapConfig,
968+
const std::optional<TBootstrapConfigWrapper>& bootstrapConfig,
922969
const TString& cloudId,
923970
const TString& folderId,
924971
const TString& databaseId,
@@ -931,7 +978,7 @@ class TConfigureParts: public TSubOperationState {
931978
const TTopicTabletInfo& pqShard,
932979
const TString& topicName,
933980
const TString& topicPath,
934-
const std::optional<NKikimrPQ::TBootstrapConfig>& bootstrapConfig,
981+
const std::optional<TBootstrapConfigWrapper>& bootstrapConfig,
935982
const TString& cloudId,
936983
const TString& folderId,
937984
const TString& databaseId,

0 commit comments

Comments
 (0)