Skip to content

Commit 16d8816

Browse files
authored
Merge 203d0a7 into 0e4b669
2 parents 0e4b669 + 203d0a7 commit 16d8816

23 files changed

+662
-150
lines changed

ydb/core/change_exchange/change_sender_common_ops.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -336,7 +336,7 @@ class TBaseChangeSender {
336336
Y_ABORT_UNLESS(it != Broadcasting.end());
337337

338338
auto& broadcast = it->second;
339-
if (broadcast.Partitions.contains(partitionId)) {
339+
if (broadcast.CompletedPartitions.contains(partitionId)) {
340340
return false;
341341
}
342342

ydb/core/persqueue/partition_sourcemanager.cpp

+2-1
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,8 @@ void TPartitionSourceManager::TModificationBatch::Cancel() {
8181
}
8282

8383
bool TPartitionSourceManager::TModificationBatch::HasModifications() const {
84-
return !SourceIdWriter.GetSourceIdsToWrite().empty();
84+
return !SourceIdWriter.GetSourceIdsToWrite().empty()
85+
|| !SourceIdWriter.GetSourceIdsToDelete().empty();
8586
}
8687

8788
void TPartitionSourceManager::TModificationBatch::FillRequest(TEvKeyValue::TEvRequest* request) {

ydb/core/persqueue/sourceid.h

+4
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,10 @@ class TSourceIdWriter {
8585
return Registrations;
8686
}
8787

88+
const THashSet<TString>& GetSourceIdsToDelete() const {
89+
return Deregistrations;
90+
}
91+
8892
template <typename... Args>
8993
void RegisterSourceId(const TString& sourceId, Args&&... args) {
9094
Registrations[sourceId] = TSourceIdInfo(std::forward<Args>(args)...);

ydb/core/protos/counters_datashard.proto

+1
Original file line numberDiff line numberDiff line change
@@ -490,4 +490,5 @@ enum ETxTypes {
490490
TXTYPE_CLEANUP_VOLATILE = 80 [(TxTypeOpts) = {Name: "TxCleanupVolatile"}];
491491
TXTYPE_PLAN_PREDICTED_TXS = 81 [(TxTypeOpts) = {Name: "TxPlanPredictedTxs"}];
492492
TXTYPE_WRITE = 82 [(TxTypeOpts) = {Name: "TxWrite"}];
493+
TXTYPE_REMOVE_SCHEMA_SNAPSHOTS = 83 [(TxTypeOpts) = {Name: "TxRemoveSchemaSnapshots"}];
493494
}

ydb/core/tx/datashard/datashard.cpp

+150-37
Original file line numberDiff line numberDiff line change
@@ -855,6 +855,39 @@ void TDataShard::PersistChangeRecord(NIceDb::TNiceDb& db, const TChangeRecord& r
855855
NIceDb::TUpdate<Schema::ChangeRecordDetails::Kind>(record.GetKind()),
856856
NIceDb::TUpdate<Schema::ChangeRecordDetails::Body>(record.GetBody()),
857857
NIceDb::TUpdate<Schema::ChangeRecordDetails::Source>(record.GetSource()));
858+
859+
auto res = ChangesQueue.emplace(record.GetOrder(), record);
860+
Y_VERIFY_S(res.second, "Duplicate change record: " << record.GetOrder());
861+
862+
if (res.first->second.SchemaVersion) {
863+
res.first->second.SchemaSnapshotAcquired = SchemaSnapshotManager.AcquireReference(
864+
TSchemaSnapshotKey(res.first->second.TableId, res.first->second.SchemaVersion));
865+
}
866+
867+
if (CommittingChangeRecords.empty()) {
868+
db.GetDatabase().OnCommit([this] {
869+
CommittingChangeRecords.clear();
870+
});
871+
db.GetDatabase().OnRollback([this] {
872+
for (const auto order : CommittingChangeRecords) {
873+
auto cIt = ChangesQueue.find(order);
874+
Y_VERIFY_S(cIt != ChangesQueue.end(), "Cannot find change record: " << order);
875+
876+
if (cIt->second.SchemaSnapshotAcquired) {
877+
const auto snapshotKey = TSchemaSnapshotKey(cIt->second.TableId, cIt->second.SchemaVersion);
878+
if (const auto last = SchemaSnapshotManager.ReleaseReference(snapshotKey)) {
879+
ScheduleRemoveSchemaSnapshot(snapshotKey);
880+
}
881+
}
882+
883+
ChangesQueue.erase(cIt);
884+
}
885+
886+
CommittingChangeRecords.clear();
887+
});
888+
}
889+
890+
CommittingChangeRecords.push_back(record.GetOrder());
858891
} else {
859892
auto& state = LockChangeRecords[lockId];
860893
Y_ABORT_UNLESS(state.Changes.empty() || state.Changes.back().LockOffset < record.GetLockOffset(),
@@ -934,6 +967,14 @@ void TDataShard::CommitLockChangeRecords(NIceDb::TNiceDb& db, ui64 lockId, ui64
934967
committed.Step = rowVersion.Step;
935968
committed.TxId = rowVersion.TxId;
936969
collected.push_back(committed);
970+
971+
auto res = ChangesQueue.emplace(committed.Order, committed);
972+
Y_VERIFY_S(res.second, "Duplicate change record: " << committed.Order);
973+
974+
if (res.first->second.SchemaVersion) {
975+
res.first->second.SchemaSnapshotAcquired = SchemaSnapshotManager.AcquireReference(
976+
TSchemaSnapshotKey(res.first->second.TableId, res.first->second.SchemaVersion));
977+
}
937978
}
938979

939980
Y_VERIFY_S(!CommittedLockChangeRecords.contains(lockId), "Cannot commit lock " << lockId << " more than once");
@@ -960,7 +1001,26 @@ void TDataShard::CommitLockChangeRecords(NIceDb::TNiceDb& db, ui64 lockId, ui64
9601001
LockChangeRecords.erase(it);
9611002
});
9621003
db.GetDatabase().OnRollback([this, lockId]() {
963-
CommittedLockChangeRecords.erase(lockId);
1004+
auto it = CommittedLockChangeRecords.find(lockId);
1005+
Y_VERIFY_S(it != CommittedLockChangeRecords.end(), "Unexpected failure to find lockId# " << lockId);
1006+
1007+
for (size_t i = 0; i < it->second.Count; ++i) {
1008+
const ui64 order = it->second.Order + i;
1009+
1010+
auto cIt = ChangesQueue.find(order);
1011+
Y_VERIFY_S(cIt != ChangesQueue.end(), "Cannot find change record: " << order);
1012+
1013+
if (cIt->second.SchemaSnapshotAcquired) {
1014+
const auto snapshotKey = TSchemaSnapshotKey(cIt->second.TableId, cIt->second.SchemaVersion);
1015+
if (const auto last = SchemaSnapshotManager.ReleaseReference(snapshotKey)) {
1016+
ScheduleRemoveSchemaSnapshot(snapshotKey);
1017+
}
1018+
}
1019+
1020+
ChangesQueue.erase(cIt);
1021+
}
1022+
1023+
CommittedLockChangeRecords.erase(it);
9641024
});
9651025
}
9661026

@@ -994,7 +1054,6 @@ void TDataShard::RemoveChangeRecord(NIceDb::TNiceDb& db, ui64 order) {
9941054

9951055
auto it = ChangesQueue.find(order);
9961056
if (it == ChangesQueue.end()) {
997-
Y_VERIFY_DEBUG_S(false, "Trying to remove non-enqueud record: " << order);
9981057
return;
9991058
}
10001059

@@ -1022,23 +1081,9 @@ void TDataShard::RemoveChangeRecord(NIceDb::TNiceDb& db, ui64 order) {
10221081
ChangesQueueBytes -= record.BodySize;
10231082

10241083
if (record.SchemaSnapshotAcquired) {
1025-
Y_ABORT_UNLESS(record.TableId);
1026-
auto tableIt = TableInfos.find(record.TableId.LocalPathId);
1027-
1028-
if (tableIt != TableInfos.end()) {
1029-
const auto snapshotKey = TSchemaSnapshotKey(record.TableId, record.SchemaVersion);
1030-
const bool last = SchemaSnapshotManager.ReleaseReference(snapshotKey);
1031-
1032-
if (last) {
1033-
const auto* snapshot = SchemaSnapshotManager.FindSnapshot(snapshotKey);
1034-
Y_ABORT_UNLESS(snapshot);
1035-
1036-
if (snapshot->Schema->GetTableSchemaVersion() < tableIt->second->GetTableSchemaVersion()) {
1037-
SchemaSnapshotManager.RemoveShapshot(db, snapshotKey);
1038-
}
1039-
}
1040-
} else {
1041-
Y_DEBUG_ABORT_UNLESS(State == TShardState::PreOffline);
1084+
const auto snapshotKey = TSchemaSnapshotKey(record.TableId, record.SchemaVersion);
1085+
if (const bool last = SchemaSnapshotManager.ReleaseReference(snapshotKey)) {
1086+
ScheduleRemoveSchemaSnapshot(snapshotKey);
10421087
}
10431088
}
10441089

@@ -1059,7 +1104,7 @@ void TDataShard::RemoveChangeRecord(NIceDb::TNiceDb& db, ui64 order) {
10591104
CheckChangesQueueNoOverflow();
10601105
}
10611106

1062-
void TDataShard::EnqueueChangeRecords(TVector<IDataShardChangeCollector::TChange>&& records, ui64 cookie) {
1107+
void TDataShard::EnqueueChangeRecords(TVector<IDataShardChangeCollector::TChange>&& records, ui64 cookie, bool afterMove) {
10631108
if (!records) {
10641109
return;
10651110
}
@@ -1079,27 +1124,24 @@ void TDataShard::EnqueueChangeRecords(TVector<IDataShardChangeCollector::TChange
10791124
const auto now = AppData()->TimeProvider->Now();
10801125
TVector<NChangeExchange::TEvChangeExchange::TEvEnqueueRecords::TRecordInfo> forward(Reserve(records.size()));
10811126
for (const auto& record : records) {
1082-
forward.emplace_back(record.Order, record.PathId, record.BodySize);
1127+
auto it = ChangesQueue.find(record.Order);
1128+
if (it == ChangesQueue.end()) {
1129+
Y_ABORT_UNLESS(afterMove);
1130+
continue;
1131+
}
10831132

1084-
auto res = ChangesQueue.emplace(
1085-
std::piecewise_construct,
1086-
std::forward_as_tuple(record.Order),
1087-
std::forward_as_tuple(record, now, cookie)
1088-
);
1089-
if (res.second) {
1090-
ChangesList.PushBack(&res.first->second);
1133+
forward.emplace_back(record.Order, record.PathId, record.BodySize);
10911134

1092-
Y_ABORT_UNLESS(ChangesQueueBytes <= (Max<ui64>() - record.BodySize));
1093-
ChangesQueueBytes += record.BodySize;
1135+
it->second.EnqueuedAt = now;
1136+
it->second.ReservationCookie = cookie;
1137+
ChangesList.PushBack(&it->second);
10941138

1095-
if (record.SchemaVersion) {
1096-
res.first->second.SchemaSnapshotAcquired = SchemaSnapshotManager.AcquireReference(
1097-
TSchemaSnapshotKey(record.TableId, record.SchemaVersion));
1098-
}
1099-
}
1139+
Y_ABORT_UNLESS(ChangesQueueBytes <= (Max<ui64>() - record.BodySize));
1140+
ChangesQueueBytes += record.BodySize;
11001141
}
1101-
1142+
11021143
if (auto it = ChangeQueueReservations.find(cookie); it != ChangeQueueReservations.end()) {
1144+
Y_ABORT_UNLESS(!afterMove);
11031145
ChangeQueueReservedCapacity -= it->second;
11041146
ChangeQueueReservedCapacity += records.size();
11051147
}
@@ -1265,6 +1307,14 @@ bool TDataShard::LoadChangeRecords(NIceDb::TNiceDb& db, TVector<IDataShardChange
12651307
.SchemaVersion = schemaVersion,
12661308
});
12671309

1310+
auto res = ChangesQueue.emplace(records.back().Order, records.back());
1311+
Y_VERIFY_S(res.second, "Duplicate change record: " << records.back().Order);
1312+
1313+
if (res.first->second.SchemaVersion) {
1314+
res.first->second.SchemaSnapshotAcquired = SchemaSnapshotManager.AcquireReference(
1315+
TSchemaSnapshotKey(res.first->second.TableId, res.first->second.SchemaVersion));
1316+
}
1317+
12681318
if (!rowset.Next()) {
12691319
return false;
12701320
}
@@ -1363,6 +1413,14 @@ bool TDataShard::LoadChangeRecordCommits(NIceDb::TNiceDb& db, TVector<IDataShard
13631413
});
13641414
entry.Count++;
13651415
needSort = true;
1416+
1417+
auto res = ChangesQueue.emplace(records.back().Order, records.back());
1418+
Y_VERIFY_S(res.second, "Duplicate change record: " << records.back().Order);
1419+
1420+
if (res.first->second.SchemaVersion) {
1421+
res.first->second.SchemaSnapshotAcquired = SchemaSnapshotManager.AcquireReference(
1422+
TSchemaSnapshotKey(res.first->second.TableId, res.first->second.SchemaVersion));
1423+
}
13661424
}
13671425

13681426
LockChangeRecords.erase(lockId);
@@ -1421,6 +1479,51 @@ void TDataShard::ScheduleRemoveAbandonedLockChanges() {
14211479
}
14221480
}
14231481

1482+
void TDataShard::ScheduleRemoveSchemaSnapshot(const TSchemaSnapshotKey& key) {
1483+
Y_ABORT_UNLESS(!SchemaSnapshotManager.HasReference(key));
1484+
1485+
const auto* snapshot = SchemaSnapshotManager.FindSnapshot(key);
1486+
Y_ABORT_UNLESS(snapshot);
1487+
1488+
auto it = TableInfos.find(key.PathId);
1489+
if (it == TableInfos.end()) {
1490+
Y_DEBUG_ABORT_UNLESS(State == TShardState::PreOffline);
1491+
return;
1492+
}
1493+
1494+
if (snapshot->Schema->GetTableSchemaVersion() < it->second->GetTableSchemaVersion()) {
1495+
bool wasEmpty = PendingSchemaSnapshotsToGc.empty();
1496+
PendingSchemaSnapshotsToGc.push_back(key);
1497+
if (wasEmpty) {
1498+
Send(SelfId(), new TEvPrivate::TEvRemoveSchemaSnapshots);
1499+
}
1500+
}
1501+
}
1502+
1503+
void TDataShard::ScheduleRemoveAbandonedSchemaSnapshots() {
1504+
bool wasEmpty = PendingSchemaSnapshotsToGc.empty();
1505+
1506+
for (const auto& [key, snapshot] : SchemaSnapshotManager.GetSnapshots()) {
1507+
auto it = TableInfos.find(key.PathId);
1508+
if (it == TableInfos.end()) {
1509+
Y_DEBUG_ABORT_UNLESS(State == TShardState::PreOffline);
1510+
break;
1511+
}
1512+
if (SchemaSnapshotManager.HasReference(key)) {
1513+
continue;
1514+
}
1515+
if (snapshot.Schema->GetTableSchemaVersion() >= it->second->GetTableSchemaVersion()) {
1516+
continue;
1517+
}
1518+
1519+
PendingSchemaSnapshotsToGc.push_back(key);
1520+
}
1521+
1522+
if (wasEmpty && !PendingSchemaSnapshotsToGc.empty()) {
1523+
Send(SelfId(), new TEvPrivate::TEvRemoveSchemaSnapshots);
1524+
}
1525+
}
1526+
14241527
void TDataShard::PersistSchemeTxResult(NIceDb::TNiceDb &db, const TSchemaOperation &op) {
14251528
db.Table<Schema::SchemaOperations>().Key(op.TxId).Update(
14261529
NIceDb::TUpdate<Schema::SchemaOperations::Success>(op.Success),
@@ -1649,8 +1752,18 @@ void TDataShard::AddSchemaSnapshot(const TPathId& pathId, ui64 tableSchemaVersio
16491752
Y_ABORT_UNLESS(TableInfos.contains(pathId.LocalPathId));
16501753
auto tableInfo = TableInfos[pathId.LocalPathId];
16511754

1652-
const auto key = TSchemaSnapshotKey(pathId.OwnerId, pathId.LocalPathId, tableSchemaVersion);
1755+
const auto key = TSchemaSnapshotKey(pathId, tableSchemaVersion);
16531756
SchemaSnapshotManager.AddSnapshot(txc.DB, key, TSchemaSnapshot(tableInfo, step, txId));
1757+
1758+
const auto& snapshots = SchemaSnapshotManager.GetSnapshots();
1759+
for (auto it = snapshots.lower_bound(TSchemaSnapshotKey(pathId, 1)); it != snapshots.end(); ++it) {
1760+
if (it->first == key) {
1761+
break;
1762+
}
1763+
if (!SchemaSnapshotManager.HasReference(it->first)) {
1764+
ScheduleRemoveSchemaSnapshot(it->first);
1765+
}
1766+
}
16541767
}
16551768

16561769
void TDataShard::PersistLastLoanTableTid(NIceDb::TNiceDb& db, ui32 localTid) {

ydb/core/tx/datashard/datashard__init.cpp

+7-6
Original file line numberDiff line numberDiff line change
@@ -425,6 +425,12 @@ bool TDataShard::TTxInit::ReadEverything(TTransactionContext &txc) {
425425
return false;
426426
}
427427

428+
if (Self->State != TShardState::Offline && txc.DB.GetScheme().GetTableInfo(Schema::SchemaSnapshots::TableId)) {
429+
if (!Self->SchemaSnapshotManager.Load(db)) {
430+
return false;
431+
}
432+
}
433+
428434
if (Self->State != TShardState::Offline && txc.DB.GetScheme().GetTableInfo(Schema::ChangeRecords::TableId)) {
429435
if (!Self->LoadChangeRecords(db, ChangeRecords)) {
430436
return false;
@@ -512,12 +518,6 @@ bool TDataShard::TTxInit::ReadEverything(TTransactionContext &txc) {
512518
}
513519
}
514520

515-
if (Self->State != TShardState::Offline && txc.DB.GetScheme().GetTableInfo(Schema::SchemaSnapshots::TableId)) {
516-
if (!Self->SchemaSnapshotManager.Load(db)) {
517-
return false;
518-
}
519-
}
520-
521521
if (Self->State != TShardState::Offline && txc.DB.GetScheme().GetTableInfo(Schema::Locks::TableId)) {
522522
TDataShardLocksDb locksDb(*Self, txc);
523523
if (!Self->SysLocks.Load(locksDb)) {
@@ -547,6 +547,7 @@ bool TDataShard::TTxInit::ReadEverything(TTransactionContext &txc) {
547547
Self->SubscribeNewLocks();
548548

549549
Self->ScheduleRemoveAbandonedLockChanges();
550+
Self->ScheduleRemoveAbandonedSchemaSnapshots();
550551

551552
return true;
552553
}

ydb/core/tx/datashard/datashard_change_sending.cpp

+4-8
Original file line numberDiff line numberDiff line change
@@ -286,7 +286,7 @@ class TDataShard::TTxRemoveChangeRecords: public TTransactionBase<TDataShard> {
286286
ChangeExchangeSplit = true;
287287
} else {
288288
for (const auto dstTabletId : Self->ChangeSenderActivator.GetDstSet()) {
289-
if (Self->SplitSrcSnapshotSender.Acked(dstTabletId)) {
289+
if (Self->SplitSrcSnapshotSender.Acked(dstTabletId) && !Self->ChangeSenderActivator.Acked(dstTabletId)) {
290290
ActivationList.insert(dstTabletId);
291291
}
292292
}
@@ -346,9 +346,7 @@ class TDataShard::TTxRemoveChangeRecords: public TTransactionBase<TDataShard> {
346346
}
347347

348348
for (const auto dstTabletId : ActivationList) {
349-
if (!Self->ChangeSenderActivator.Acked(dstTabletId)) {
350-
Self->ChangeSenderActivator.DoSend(dstTabletId, ctx);
351-
}
349+
Self->ChangeSenderActivator.DoSend(dstTabletId, ctx);
352350
}
353351

354352
Self->CheckStateChange(ctx);
@@ -383,7 +381,7 @@ class TDataShard::TTxChangeExchangeSplitAck: public TTransactionBase<TDataShard>
383381
Y_ABORT_UNLESS(Self->ChangeExchangeSplitter.Done());
384382

385383
for (const auto dstTabletId : Self->ChangeSenderActivator.GetDstSet()) {
386-
if (Self->SplitSrcSnapshotSender.Acked(dstTabletId)) {
384+
if (Self->SplitSrcSnapshotSender.Acked(dstTabletId) && !Self->ChangeSenderActivator.Acked(dstTabletId)) {
387385
ActivationList.insert(dstTabletId);
388386
}
389387
}
@@ -396,9 +394,7 @@ class TDataShard::TTxChangeExchangeSplitAck: public TTransactionBase<TDataShard>
396394
<< ", at tablet# " << Self->TabletID());
397395

398396
for (const auto dstTabletId : ActivationList) {
399-
if (!Self->ChangeSenderActivator.Acked(dstTabletId)) {
400-
Self->ChangeSenderActivator.DoSend(dstTabletId, ctx);
401-
}
397+
Self->ChangeSenderActivator.DoSend(dstTabletId, ctx);
402398
}
403399
}
404400

0 commit comments

Comments
 (0)