Skip to content

Commit f5332ff

Browse files
authored
Merge 1d6d29f into a3bc9a0
2 parents a3bc9a0 + 1d6d29f commit f5332ff

11 files changed

+108
-15
lines changed

ydb/core/protos/config.proto

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1519,6 +1519,9 @@ message TSchemeShardConfig {
15191519
optional uint32 StatsMaxExecuteMs = 3 [default = 10];
15201520

15211521
repeated TInFlightCounterConfig InFlightCounterConfig = 4;
1522+
1523+
// number of shards per table
1524+
optional uint32 MaxCdcInitialScanShardsInFlight = 5 [default = 10];
15221525
}
15231526

15241527
message TCompactionConfig {

ydb/core/protos/counters_datashard.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ enum ESimpleCounters {
2727
COUNTER_CHANGE_RECORDS_REQUESTED = 17 [(CounterOpts) = {Name: "ChangeRecordsRequested"}];
2828
COUNTER_CHANGE_DELIVERY_LAG = 18 [(CounterOpts) = {Name: "ChangeDeliveryLag"}];
2929
COUNTER_CHANGE_DATA_LAG = 19 [(CounterOpts) = {Name: "ChangeDataLag"}];
30+
COUNTER_CHANGE_QUEUE_RESERVED_CAPACITY = 20 [(CounterOpts) = {Name: "ChangeQueueReservedCapacity"}];
3031
}
3132

3233
enum ECumulativeCounters {

ydb/core/tx/datashard/cdc_stream_scan.cpp

Lines changed: 37 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -3,20 +3,26 @@
33
#include "datashard_impl.h"
44

55
#include <ydb/core/protos/datashard_config.pb.h>
6+
#include <ydb/core/protos/tx_datashard.pb.h>
67

78
#include <util/generic/maybe.h>
89
#include <util/string/builder.h>
910

10-
#define LOG_D(stream) LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, "[CdcStreamScan] " << stream)
11-
#define LOG_I(stream) LOG_INFO_S(ctx, NKikimrServices::TX_DATASHARD, "[CdcStreamScan] " << stream)
12-
#define LOG_W(stream) LOG_WARN_S(ctx, NKikimrServices::TX_DATASHARD, "[CdcStreamScan] " << stream)
11+
#define LOG_D(stream) LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, "[CdcStreamScan][" << TabletID() << "] " << stream)
12+
#define LOG_I(stream) LOG_INFO_S(ctx, NKikimrServices::TX_DATASHARD, "[CdcStreamScan][" << TabletID() << "] " << stream)
13+
#define LOG_W(stream) LOG_WARN_S(ctx, NKikimrServices::TX_DATASHARD, "[CdcStreamScan][" << TabletID() << "] " << stream)
1314

1415
namespace NKikimr::NDataShard {
1516

1617
using namespace NActors;
1718
using namespace NTable;
1819
using namespace NTabletFlatExecutor;
1920

21+
void TCdcStreamScanManager::TStats::Serialize(NKikimrTxDataShard::TEvCdcStreamScanResponse_TStats& proto) const {
22+
proto.SetRowsProcessed(RowsProcessed);
23+
proto.SetBytesProcessed(BytesProcessed);
24+
}
25+
2026
void TCdcStreamScanManager::Reset() {
2127
Scans.clear();
2228
TxIdToPathId.clear();
@@ -95,6 +101,7 @@ void TCdcStreamScanManager::Complete(const TPathId& streamPathId) {
95101
return;
96102
}
97103

104+
CompletedScans[streamPathId] = it->second.Stats;
98105
TxIdToPathId.erase(it->second.TxId);
99106
Scans.erase(it);
100107
}
@@ -104,6 +111,15 @@ void TCdcStreamScanManager::Complete(ui64 txId) {
104111
Complete(TxIdToPathId.at(txId));
105112
}
106113

114+
bool TCdcStreamScanManager::IsCompleted(const TPathId& streamPathId) const {
115+
return CompletedScans.contains(streamPathId);
116+
}
117+
118+
const TCdcStreamScanManager::TStats& TCdcStreamScanManager::GetCompletedStats(const TPathId& streamPathId) const {
119+
Y_ABORT_UNLESS(CompletedScans.contains(streamPathId));
120+
return CompletedScans.at(streamPathId);
121+
}
122+
107123
TCdcStreamScanManager::TScanInfo* TCdcStreamScanManager::Get(const TPathId& streamPathId) {
108124
return Scans.FindPtr(streamPathId);
109125
}
@@ -203,6 +219,10 @@ class TDataShard::TTxCdcStreamScanProgress
203219
return row;
204220
}
205221

222+
ui64 TabletID() const {
223+
return Self->TabletID();
224+
}
225+
206226
public:
207227
explicit TTxCdcStreamScanProgress(TDataShard* self, TDataShard::TEvPrivate::TEvCdcStreamScanProgress::TPtr ev)
208228
: TBase(self)
@@ -452,8 +472,7 @@ class TCdcStreamScan: public IActorCallback, public IScan {
452472
PathIdFromPathId(StreamPathId, response->Record.MutableStreamPathId());
453473
response->Record.SetStatus(status);
454474
response->Record.SetErrorDescription(error);
455-
response->Record.MutableStats()->SetRowsProcessed(Stats.RowsProcessed);
456-
response->Record.MutableStats()->SetBytesProcessed(Stats.BytesProcessed);
475+
Stats.Serialize(*response->Record.MutableStats());
457476

458477
Send(ReplyTo, std::move(response));
459478
}
@@ -568,14 +587,17 @@ class TDataShard::TTxCdcStreamScanRun: public TTransactionBase<TDataShard> {
568587
TEvDataShard::TEvCdcStreamScanRequest::TPtr Request;
569588
THolder<IEventHandle> Response; // response to sender or forward to scanner
570589

571-
THolder<IEventHandle> MakeResponse(const TActorContext& ctx,
572-
NKikimrTxDataShard::TEvCdcStreamScanResponse::EStatus status, const TString& error = {}) const
573-
{
590+
template <typename... Args>
591+
THolder<IEventHandle> MakeResponse(const TActorContext& ctx, Args&&... args) const {
574592
return MakeHolder<IEventHandle>(Request->Sender, ctx.SelfID, new TEvDataShard::TEvCdcStreamScanResponse(
575-
Request->Get()->Record, Self->TabletID(), status, error
593+
Request->Get()->Record, Self->TabletID(), std::forward<Args>(args)...
576594
));
577595
}
578596

597+
ui64 TabletID() const {
598+
return Self->TabletID();
599+
}
600+
579601
public:
580602
explicit TTxCdcStreamScanRun(TDataShard* self, TEvDataShard::TEvCdcStreamScanRequest::TPtr ev)
581603
: TBase(self)
@@ -635,6 +657,11 @@ class TDataShard::TTxCdcStreamScanRun: public TTransactionBase<TDataShard> {
635657
} else if (info->ScanId) {
636658
return true; // nop, scan actor will report state when it starts
637659
}
660+
} else if (Self->CdcStreamScanManager.IsCompleted(streamPathId)) {
661+
Response = MakeResponse(ctx, NKikimrTxDataShard::TEvCdcStreamScanResponse::DONE);
662+
Self->CdcStreamScanManager.GetCompletedStats(streamPathId).Serialize(
663+
*Response->Get<TEvDataShard::TEvCdcStreamScanResponse>()->Record.MutableStats());
664+
return true;
638665
} else if (Self->CdcStreamScanManager.Size()) {
639666
Response = MakeResponse(ctx, NKikimrTxDataShard::TEvCdcStreamScanResponse::OVERLOADED);
640667
return true;
@@ -714,8 +741,7 @@ void TDataShard::Handle(TEvDataShard::TEvCdcStreamScanRequest::TPtr& ev, const T
714741

715742
void TDataShard::Handle(TEvPrivate::TEvCdcStreamScanRegistered::TPtr& ev, const TActorContext& ctx) {
716743
if (!CdcStreamScanManager.Has(ev->Get()->TxId)) {
717-
LOG_W("Unknown cdc stream scan actor registered"
718-
<< ": at: " << TabletID());
744+
LOG_W("Unknown cdc stream scan actor registered");
719745
return;
720746
}
721747

ydb/core/tx/datashard/cdc_stream_scan.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,19 @@
99
#include <util/generic/hash.h>
1010
#include <util/generic/maybe.h>
1111

12+
namespace NKikimrTxDataShard {
13+
class TEvCdcStreamScanResponse_TStats;
14+
}
15+
1216
namespace NKikimr::NDataShard {
1317

1418
class TCdcStreamScanManager {
1519
public:
1620
struct TStats {
1721
ui64 RowsProcessed = 0;
1822
ui64 BytesProcessed = 0;
23+
24+
void Serialize(NKikimrTxDataShard::TEvCdcStreamScanResponse_TStats& proto) const;
1925
};
2026

2127
private:
@@ -39,6 +45,8 @@ class TCdcStreamScanManager {
3945

4046
void Complete(const TPathId& streamPathId);
4147
void Complete(ui64 txId);
48+
bool IsCompleted(const TPathId& streamPathId) const;
49+
const TStats& GetCompletedStats(const TPathId& streamPathId) const;
4250

4351
TScanInfo* Get(const TPathId& streamPathId);
4452
const TScanInfo* Get(const TPathId& streamPathId) const;
@@ -57,6 +65,7 @@ class TCdcStreamScanManager {
5765

5866
private:
5967
THashMap<TPathId, TScanInfo> Scans;
68+
THashMap<TPathId, TStats> CompletedScans;
6069
THashMap<ui64, TPathId> TxIdToPathId;
6170
};
6271

ydb/core/tx/datashard/datashard.cpp

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -949,6 +949,7 @@ void TDataShard::RemoveChangeRecord(NIceDb::TNiceDb& db, ui64 order) {
949949

950950
IncCounter(COUNTER_CHANGE_RECORDS_REMOVED);
951951
SetCounter(COUNTER_CHANGE_QUEUE_SIZE, ChangesQueue.size());
952+
SetCounter(COUNTER_CHANGE_QUEUE_RESERVED_CAPACITY, ChangeQueueReservedCapacity);
952953

953954
CheckChangesQueueNoOverflow();
954955
}
@@ -992,10 +993,16 @@ void TDataShard::EnqueueChangeRecords(TVector<IDataShardChangeCollector::TChange
992993
}
993994
}
994995
}
996+
997+
if (auto it = ChangeQueueReservations.find(cookie); it != ChangeQueueReservations.end()) {
998+
ChangeQueueReservedCapacity -= it->second;
999+
ChangeQueueReservedCapacity += records.size();
1000+
}
9951001

9961002
UpdateChangeExchangeLag(now);
9971003
IncCounter(COUNTER_CHANGE_RECORDS_ENQUEUED, forward.size());
9981004
SetCounter(COUNTER_CHANGE_QUEUE_SIZE, ChangesQueue.size());
1005+
SetCounter(COUNTER_CHANGE_QUEUE_RESERVED_CAPACITY, ChangeQueueReservedCapacity);
9991006

10001007
Y_ABORT_UNLESS(OutChangeSender);
10011008
Send(OutChangeSender, new NChangeExchange::TEvChangeExchange::TEvEnqueueRecords(std::move(forward)));
@@ -1030,6 +1037,8 @@ ui64 TDataShard::ReserveChangeQueueCapacity(ui32 capacity) {
10301037
const auto cookie = NextChangeQueueReservationCookie++;
10311038
ChangeQueueReservations.emplace(cookie, capacity);
10321039
ChangeQueueReservedCapacity += capacity;
1040+
SetCounter(COUNTER_CHANGE_QUEUE_RESERVED_CAPACITY, ChangeQueueReservedCapacity);
1041+
10331042
return cookie;
10341043
}
10351044

ydb/core/tx/datashard/datashard_ut_change_exchange.cpp

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2634,6 +2634,49 @@ Y_UNIT_TEST_SUITE(Cdc) {
26342634
});
26352635
}
26362636

2637+
Y_UNIT_TEST(InitialScanRacyCompleteAndRequest) {
2638+
TPortManager portManager;
2639+
TServer::TPtr server = new TServer(TServerSettings(portManager.GetPort(2134), {}, DefaultPQConfig())
2640+
.SetUseRealThreads(false)
2641+
.SetDomainName("Root")
2642+
.SetEnableChangefeedInitialScan(true)
2643+
);
2644+
2645+
auto& runtime = *server->GetRuntime();
2646+
const auto edgeActor = runtime.AllocateEdgeActor();
2647+
2648+
SetupLogging(runtime);
2649+
InitRoot(server, edgeActor);
2650+
CreateShardedTable(server, edgeActor, "/Root", "Table", SimpleTable());
2651+
2652+
std::unique_ptr<IEventHandle> doneResponse;
2653+
auto blockDone = runtime.AddObserver<TEvDataShard::TEvCdcStreamScanResponse>(
2654+
[&](TEvDataShard::TEvCdcStreamScanResponse::TPtr& ev) {
2655+
if (ev->Get()->Record.GetStatus() == NKikimrTxDataShard::TEvCdcStreamScanResponse::DONE) {
2656+
doneResponse.reset(ev.Release());
2657+
}
2658+
}
2659+
);
2660+
2661+
WaitTxNotification(server, edgeActor, AsyncAlterAddStream(server, "/Root", "Table",
2662+
WithInitialScan(Updates(NKikimrSchemeOp::ECdcStreamFormatJson))));
2663+
WaitFor(runtime, [&]{ return bool(doneResponse); }, "doneResponse");
2664+
blockDone.Remove();
2665+
2666+
bool done = false;
2667+
auto waitDone = runtime.AddObserver<TEvDataShard::TEvCdcStreamScanResponse>(
2668+
[&](TEvDataShard::TEvCdcStreamScanResponse::TPtr& ev) {
2669+
if (ev->Get()->Record.GetStatus() == NKikimrTxDataShard::TEvCdcStreamScanResponse::DONE) {
2670+
done = true;
2671+
}
2672+
}
2673+
);
2674+
2675+
const auto& record = doneResponse->Get<TEvDataShard::TEvCdcStreamScanResponse>()->Record;
2676+
RebootTablet(runtime, record.GetTablePathId().GetOwnerId(), edgeActor);
2677+
WaitFor(runtime, [&]{ return done; }, "done");
2678+
}
2679+
26372680
Y_UNIT_TEST(InitialScanUpdatedRows) {
26382681
TPortManager portManager;
26392682
TServer::TPtr server = new TServer(TServerSettings(portManager.GetPort(2134), {}, DefaultPQConfig())

ydb/core/tx/datashard/execute_distributed_erase_tx_unit.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ class TExecuteDistributedEraseTxUnit : public TExecutionUnit {
1818

1919
public:
2020
TExecuteDistributedEraseTxUnit(TDataShard& self, TPipeline& pipeline)
21-
: TExecutionUnit(EExecutionUnitKind::ExecuteDistributedEraseTx, false, self, pipeline)
21+
: TExecutionUnit(EExecutionUnitKind::ExecuteDistributedEraseTx, true, self, pipeline)
2222
{
2323
}
2424

ydb/core/tx/schemeshard/schemeshard_cdc_stream_scan.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,7 @@ struct TSchemeShard::TCdcStreamScan::TTxProgress: public TTransactionBase<TSchem
172172
}
173173

174174
while (!streamInfo->PendingShards.empty()) {
175-
if (streamInfo->InProgressShards.size() >= streamInfo->MaxInProgressShards) {
175+
if (streamInfo->InProgressShards.size() >= Self->MaxCdcInitialScanShardsInFlight) {
176176
break;
177177
}
178178

ydb/core/tx/schemeshard/schemeshard_impl.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4347,6 +4347,7 @@ void TSchemeShard::OnActivateExecutor(const TActorContext &ctx) {
43474347
ConfigureCompactionQueues(appData->CompactionConfig, ctx);
43484348
ConfigureStatsBatching(appData->SchemeShardConfig, ctx);
43494349
ConfigureStatsOperations(appData->SchemeShardConfig, ctx);
4350+
MaxCdcInitialScanShardsInFlight = appData->SchemeShardConfig.GetMaxCdcInitialScanShardsInFlight();
43504351

43514352
ConfigureBackgroundCleaningQueue(appData->BackgroundCleaningConfig, ctx);
43524353

@@ -6836,6 +6837,7 @@ void TSchemeShard::ApplyConsoleConfigs(const NKikimrConfig::TAppConfig& appConfi
68366837
const auto& schemeShardConfig = appConfig.GetSchemeShardConfig();
68376838
ConfigureStatsBatching(schemeShardConfig, ctx);
68386839
ConfigureStatsOperations(schemeShardConfig, ctx);
6840+
MaxCdcInitialScanShardsInFlight = schemeShardConfig.GetMaxCdcInitialScanShardsInFlight();
68396841
}
68406842

68416843
if (appConfig.HasTableProfilesConfig()) {

ydb/core/tx/schemeshard/schemeshard_impl.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -318,7 +318,9 @@ class TSchemeShard
318318
TActorId SysPartitionStatsCollector;
319319

320320
TActorId TabletMigrator;
321+
321322
TActorId CdcStreamScanFinalizer;
323+
ui32 MaxCdcInitialScanShardsInFlight = 10;
322324

323325
TDuration StatsMaxExecuteTime;
324326
TDuration StatsBatchTimeout;

ydb/core/tx/schemeshard/schemeshard_info_types.h

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2547,8 +2547,6 @@ struct TCdcStreamInfo : public TSimpleRefCount<TCdcStreamInfo> {
25472547
{}
25482548
};
25492549

2550-
static constexpr ui32 MaxInProgressShards = 10;
2551-
25522550
TCdcStreamInfo(ui64 version, EMode mode, EFormat format, bool vt, const TDuration& rt, const TString& awsRegion, EState state)
25532551
: AlterVersion(version)
25542552
, Mode(mode)

0 commit comments

Comments
 (0)