Skip to content

Commit 91f8a44

Browse files
authored
Better handling of connection loss (#9993)
1 parent ea37d52 commit 91f8a44

File tree

8 files changed

+71
-51
lines changed

8 files changed

+71
-51
lines changed

ydb/core/change_exchange/change_sender.cpp

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -61,20 +61,27 @@ void TChangeSender::RecreateSenders(const TVector<ui64>& partitionIds) {
6161
}
6262
}
6363

64-
void TChangeSender::CreateSenders(const TVector<ui64>& partitionIds, bool partitioningChanged) {
65-
if (partitioningChanged) {
64+
void TChangeSender::CreateSendersImpl(const TVector<ui64>& partitionIds) {
65+
if (partitionIds) {
6666
CreateMissingSenders(partitionIds);
6767
} else {
68-
RecreateSenders(GonePartitions);
68+
RecreateSenders(std::exchange(GonePartitions, {}));
6969
}
7070

71-
GonePartitions.clear();
72-
7371
if (!Enqueued || !RequestRecords()) {
7472
SendRecords();
7573
}
7674
}
7775

76+
void TChangeSender::CreateSenders(const TVector<ui64>& partitionIds) {
77+
Y_ABORT_UNLESS(partitionIds);
78+
CreateSendersImpl(partitionIds);
79+
}
80+
81+
void TChangeSender::CreateSenders() {
82+
CreateSendersImpl({});
83+
}
84+
7885
void TChangeSender::KillSenders() {
7986
for (const auto& [_, sender] : std::exchange(Senders, {})) {
8087
if (sender.ActorId) {
@@ -303,6 +310,7 @@ void TChangeSender::OnGone(ui64 partitionId) {
303310
if (it->second.Ready) {
304311
--ReadySenders;
305312
}
313+
306314
Senders.erase(it);
307315
GonePartitions.push_back(partitionId);
308316

ydb/core/change_exchange/change_sender.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,7 @@ class TChangeSender {
113113
THashSet<ui64> CompletedPartitions;
114114
};
115115

116+
void CreateSendersImpl(const TVector<ui64>& partitionIds);
116117
void LazyCreateSender(THashMap<ui64, TSender>& senders, ui64 partitionId);
117118
void RegisterSender(ui64 partitionId);
118119
void CreateMissingSenders(const TVector<ui64>& partitionIds);
@@ -150,7 +151,8 @@ class TChangeSender {
150151
return ChangeServer;
151152
}
152153

153-
void CreateSenders(const TVector<ui64>& partitionIds, bool partitioningChanged = true);
154+
void CreateSenders(const TVector<ui64>& partitionIds); // creates senders after partitioning changes
155+
void CreateSenders(); // creates senders after connection loss
154156
void KillSenders();
155157

156158
void EnqueueRecords(TVector<TEvChangeExchange::TEvEnqueueRecords::TRecordInfo>&& records);

ydb/core/change_exchange/util.cpp

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
#include "util.h"
2+
3+
namespace NKikimr::NChangeExchange {
4+
5+
TVector<ui64> MakePartitionIds(const TVector<TKeyDesc::TPartitionInfo>& partitions) {
6+
TVector<ui64> result(::Reserve(partitions.size()));
7+
8+
for (const auto& partition : partitions) {
9+
result.push_back(partition.ShardId);
10+
}
11+
12+
return result;
13+
}
14+
15+
}

ydb/core/change_exchange/util.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
#pragma once
2+
3+
#include <ydb/core/scheme/scheme_tabledefs.h>
4+
5+
namespace NKikimr::NChangeExchange {
6+
7+
TVector<ui64> MakePartitionIds(const TVector<TKeyDesc::TPartitionInfo>& partitions);
8+
9+
}

ydb/core/change_exchange/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ SRCS(
66
change_sender.cpp
77
change_sender_monitoring.cpp
88
resolve_partition.cpp
9+
util.cpp
910
)
1011

1112
GENERATE_ENUM_SERIALIZATION(change_record.h)

ydb/core/tx/datashard/change_sender_cdc_stream.cpp

Lines changed: 15 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
#include <ydb/core/change_exchange/change_sender.h>
88
#include <ydb/core/change_exchange/change_sender_monitoring.h>
9+
#include <ydb/core/change_exchange/util.h>
910
#include <ydb/core/persqueue/writer/source_id_encoding.h>
1011
#include <ydb/core/persqueue/writer/writer.h>
1112
#include <ydb/core/tx/scheme_cache/helpers.h>
@@ -440,16 +441,6 @@ class TCdcChangeSenderMain
440441
return false;
441442
}
442443

443-
static TVector<ui64> MakePartitionIds(const TVector<NKikimr::TKeyDesc::TPartitionInfo>& partitions) {
444-
TVector<ui64> result(Reserve(partitions.size()));
445-
446-
for (const auto& partition : partitions) {
447-
result.push_back(partition.ShardId);
448-
}
449-
450-
return result;
451-
}
452-
453444
/// ResolveCdcStream
454445

455446
void ResolveCdcStream() {
@@ -571,6 +562,14 @@ class TCdcChangeSenderMain
571562
return;
572563
}
573564

565+
const auto topicVersion = entry.Self->Info.GetVersion().GetGeneralVersion();
566+
if (TopicVersion && TopicVersion == topicVersion) {
567+
CreateSenders();
568+
return Become(&TThis::StateMain);
569+
}
570+
571+
TopicVersion = topicVersion;
572+
574573
const auto& pqDesc = entry.PQGroupInfo->Description;
575574
const auto& pqConfig = pqDesc.GetPQTabletConfig();
576575

@@ -579,12 +578,7 @@ class TCdcChangeSenderMain
579578
PartitionToShard.emplace(partition.GetPartitionId(), partition.GetTabletId());
580579
}
581580

582-
const auto topicVersion = entry.Self->Info.GetVersion().GetGeneralVersion();
583-
const bool versionChanged = !TopicVersion || TopicVersion != topicVersion;
584-
TopicVersion = topicVersion;
585-
586-
auto topicAutoPartitioning = ::NKikimrPQ::TPQTabletConfig::TPartitionStrategyType::TPQTabletConfig_TPartitionStrategyType_DISABLED != pqConfig.GetPartitionStrategy().GetPartitionStrategyType();
587-
581+
const bool topicAutoPartitioning = IsTopicAutoPartitioningEnabled(pqConfig.GetPartitionStrategy().GetPartitionStrategyType());
588582
Y_ABORT_UNLESS(topicAutoPartitioning || entry.PQGroupInfo->Schema);
589583
KeyDesc = NKikimr::TKeyDesc::CreateMiniKeyDesc(entry.PQGroupInfo->Schema);
590584
Y_ABORT_UNLESS(entry.PQGroupInfo->Partitioning);
@@ -598,10 +592,14 @@ class TCdcChangeSenderMain
598592
SetPartitionResolver(new TMd5PartitionResolver(KeyDesc->GetPartitions().size()));
599593
}
600594

601-
CreateSenders(MakePartitionIds(*KeyDesc->Partitioning), versionChanged);
595+
CreateSenders(NChangeExchange::MakePartitionIds(*KeyDesc->Partitioning));
602596
Become(&TThis::StateMain);
603597
}
604598

599+
static bool IsTopicAutoPartitioningEnabled(NKikimrPQ::TPQTabletConfig::TPartitionStrategyType strategy) {
600+
return strategy != NKikimrPQ::TPQTabletConfig::TPartitionStrategyType::TPQTabletConfig_TPartitionStrategyType_DISABLED;
601+
}
602+
605603
/// Main
606604

607605
STATEFN(StateMain) {

ydb/core/tx/datashard/change_sender_table_base.h

Lines changed: 7 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
#include "change_exchange_helpers.h"
44

5+
#include <ydb/core/change_exchange/util.h>
56
#include <ydb/core/tablet_flat/flat_row_state.h>
67
#include <ydb/core/tx/scheme_cache/helpers.h>
78

@@ -151,6 +152,11 @@ class TResolveTargetTableState
151152
return;
152153
}
153154

155+
if (AsDerived()->TargetTableVersion && AsDerived()->TargetTableVersion == entry.Self->Info.GetVersion().GetGeneralVersion()) {
156+
AsDerived()->CreateSenders();
157+
return AsDerived()->Serve();
158+
}
159+
154160
AsDerived()->TagMap.clear();
155161
TVector<NScheme::TTypeInfo> keyColumnTypes;
156162

@@ -181,7 +187,6 @@ class TResolveTargetTableState
181187
);
182188

183189
AsDerived()->SetPartitionResolver(CreateDefaultPartitionResolver(*AsDerived()->KeyDesc.Get()));
184-
185190
AsDerived()->NextState(TStateTag{});
186191
}
187192
};
@@ -245,24 +250,12 @@ class TResolveKeysState
245250
return AsDerived()->Retry();
246251
}
247252

248-
const bool versionChanged = !AsDerived()->TargetTableVersion || AsDerived()->TargetTableVersion != entry.GeneralVersion;
249253
AsDerived()->TargetTableVersion = entry.GeneralVersion;
250-
251254
AsDerived()->KeyDesc = std::move(entry.KeyDescription);
252-
AsDerived()->CreateSenders(MakePartitionIds(AsDerived()->KeyDesc->GetPartitions()), versionChanged);
255+
AsDerived()->CreateSenders(NChangeExchange::MakePartitionIds(AsDerived()->KeyDesc->GetPartitions()));
253256

254257
AsDerived()->NextState(TStateTag{});
255258
}
256-
257-
static TVector<ui64> MakePartitionIds(const TVector<TKeyDesc::TPartitionInfo>& partitions) {
258-
TVector<ui64> result(Reserve(partitions.size()));
259-
260-
for (const auto& partition : partitions) {
261-
result.push_back(partition.ShardId); // partition = shard
262-
}
263-
264-
return result;
265-
}
266259
};
267260

268261
template <typename TDerived>

ydb/core/tx/replication/service/base_table_writer.cpp

Lines changed: 8 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
#include <ydb/core/base/tablet_pipecache.h>
66
#include <ydb/core/change_exchange/change_sender.h>
7+
#include <ydb/core/change_exchange/util.h>
78
#include <ydb/core/tablet_flat/flat_row_eggs.h>
89
#include <ydb/core/tx/datashard/datashard.h>
910
#include <ydb/core/tx/scheme_cache/helpers.h>
@@ -268,16 +269,6 @@ class TLocalTableWriter
268269
return Check(&TSchemeCacheHelpers::CheckEntryKind<T>, &TThis::LogCritAndLeave, entry, expected);
269270
}
270271

271-
static TVector<ui64> MakePartitionIds(const TVector<TKeyDesc::TPartitionInfo>& partitions) {
272-
TVector<ui64> result(::Reserve(partitions.size()));
273-
274-
for (const auto& partition : partitions) {
275-
result.push_back(partition.ShardId);
276-
}
277-
278-
return result;
279-
}
280-
281272
void Registered(TActorSystem*, const TActorId&) override {
282273
ChangeServer = SelfId();
283274
}
@@ -338,6 +329,12 @@ class TLocalTableWriter
338329
return;
339330
}
340331

332+
if (TableVersion && TableVersion == entry.Self->Info.GetVersion().GetGeneralVersion()) {
333+
Y_ABORT_UNLESS(Initialized);
334+
Resolving = false;
335+
return CreateSenders();
336+
}
337+
341338
auto schema = MakeIntrusive<TLightweightSchema>();
342339
if (entry.Self && entry.Self->Info.HasVersion()) {
343340
schema->Version = entry.Self->Info.GetVersion().GetTableSchemaVersion();
@@ -370,7 +367,6 @@ class TLocalTableWriter
370367
);
371368

372369
TChangeSender::SetPartitionResolver(CreateResolverFn(*KeyDesc.Get()));
373-
374370
ResolveKeys();
375371
}
376372

@@ -408,11 +404,9 @@ class TLocalTableWriter
408404
return LogWarnAndRetry("Empty partitions");
409405
}
410406

411-
const bool versionChanged = !TableVersion || TableVersion != entry.GeneralVersion;
412407
TableVersion = entry.GeneralVersion;
413-
414408
KeyDesc = std::move(entry.KeyDescription);
415-
CreateSenders(MakePartitionIds(KeyDesc->GetPartitions()), versionChanged);
409+
CreateSenders(NChangeExchange::MakePartitionIds(KeyDesc->GetPartitions()));
416410

417411
if (!Initialized) {
418412
Send(Worker, new TEvWorker::TEvHandshake());

0 commit comments

Comments
 (0)