Skip to content

Commit 1531272

Browse files
authored
Merge 2d79dc5 into 28859b5
2 parents 28859b5 + 2d79dc5 commit 1531272

17 files changed

+625
-150
lines changed

ydb/core/change_exchange/change_sender_common_ops.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -351,7 +351,7 @@ bool TBaseChangeSender::AddBroadcastPartition(ui64 order, ui64 partitionId) {
351351
Y_ABORT_UNLESS(it != Broadcasting.end());
352352

353353
auto& broadcast = it->second;
354-
if (broadcast.Partitions.contains(partitionId)) {
354+
if (broadcast.CompletedPartitions.contains(partitionId)) {
355355
return false;
356356
}
357357

ydb/core/persqueue/partition_sourcemanager.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,8 @@ void TPartitionSourceManager::TModificationBatch::Cancel() {
8181
}
8282

8383
bool TPartitionSourceManager::TModificationBatch::HasModifications() const {
84-
return !SourceIdWriter.GetSourceIdsToWrite().empty();
84+
return !SourceIdWriter.GetSourceIdsToWrite().empty()
85+
|| !SourceIdWriter.GetSourceIdsToDelete().empty();
8586
}
8687

8788
void TPartitionSourceManager::TModificationBatch::FillRequest(TEvKeyValue::TEvRequest* request) {

ydb/core/persqueue/sourceid.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,10 @@ class TSourceIdWriter {
122122
return Registrations;
123123
}
124124

125+
const THashSet<TString>& GetSourceIdsToDelete() const {
126+
return Deregistrations;
127+
}
128+
125129
template <typename... Args>
126130
void RegisterSourceId(const TString& sourceId, Args&&... args) {
127131
Registrations[sourceId] = TSourceIdInfo(std::forward<Args>(args)...);

ydb/core/protos/counters_datashard.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -486,4 +486,5 @@ enum ETxTypes {
486486
TXTYPE_CLEANUP_VOLATILE = 80 [(TxTypeOpts) = {Name: "TxCleanupVolatile"}];
487487
TXTYPE_PLAN_PREDICTED_TXS = 81 [(TxTypeOpts) = {Name: "TxPlanPredictedTxs"}];
488488
TXTYPE_WRITE = 82 [(TxTypeOpts) = {Name: "TxWrite"}];
489+
TXTYPE_REMOVE_SCHEMA_SNAPSHOTS = 83 [(TxTypeOpts) = {Name: "TxRemoveSchemaSnapshots"}];
489490
}

ydb/core/tx/datashard/datashard.cpp

Lines changed: 150 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -750,6 +750,39 @@ void TDataShard::PersistChangeRecord(NIceDb::TNiceDb& db, const TChangeRecord& r
750750
NIceDb::TUpdate<Schema::ChangeRecordDetails::Kind>(record.GetKind()),
751751
NIceDb::TUpdate<Schema::ChangeRecordDetails::Body>(record.GetBody()),
752752
NIceDb::TUpdate<Schema::ChangeRecordDetails::Source>(record.GetSource()));
753+
754+
auto res = ChangesQueue.emplace(record.GetOrder(), record);
755+
Y_VERIFY_S(res.second, "Duplicate change record: " << record.GetOrder());
756+
757+
if (res.first->second.SchemaVersion) {
758+
res.first->second.SchemaSnapshotAcquired = SchemaSnapshotManager.AcquireReference(
759+
TSchemaSnapshotKey(res.first->second.TableId, res.first->second.SchemaVersion));
760+
}
761+
762+
if (CommittingChangeRecords.empty()) {
763+
db.GetDatabase().OnCommit([this] {
764+
CommittingChangeRecords.clear();
765+
});
766+
db.GetDatabase().OnRollback([this] {
767+
for (const auto order : CommittingChangeRecords) {
768+
auto cIt = ChangesQueue.find(order);
769+
Y_VERIFY_S(cIt != ChangesQueue.end(), "Cannot find change record: " << order);
770+
771+
if (cIt->second.SchemaSnapshotAcquired) {
772+
const auto snapshotKey = TSchemaSnapshotKey(cIt->second.TableId, cIt->second.SchemaVersion);
773+
if (const auto last = SchemaSnapshotManager.ReleaseReference(snapshotKey)) {
774+
ScheduleRemoveSchemaSnapshot(snapshotKey);
775+
}
776+
}
777+
778+
ChangesQueue.erase(cIt);
779+
}
780+
781+
CommittingChangeRecords.clear();
782+
});
783+
}
784+
785+
CommittingChangeRecords.push_back(record.GetOrder());
753786
} else {
754787
auto& state = LockChangeRecords[lockId];
755788
Y_ABORT_UNLESS(state.Changes.empty() || state.Changes.back().LockOffset < record.GetLockOffset(),
@@ -829,6 +862,14 @@ void TDataShard::CommitLockChangeRecords(NIceDb::TNiceDb& db, ui64 lockId, ui64
829862
committed.Step = rowVersion.Step;
830863
committed.TxId = rowVersion.TxId;
831864
collected.push_back(committed);
865+
866+
auto res = ChangesQueue.emplace(committed.Order, committed);
867+
Y_VERIFY_S(res.second, "Duplicate change record: " << committed.Order);
868+
869+
if (res.first->second.SchemaVersion) {
870+
res.first->second.SchemaSnapshotAcquired = SchemaSnapshotManager.AcquireReference(
871+
TSchemaSnapshotKey(res.first->second.TableId, res.first->second.SchemaVersion));
872+
}
832873
}
833874

834875
Y_VERIFY_S(!CommittedLockChangeRecords.contains(lockId), "Cannot commit lock " << lockId << " more than once");
@@ -855,7 +896,26 @@ void TDataShard::CommitLockChangeRecords(NIceDb::TNiceDb& db, ui64 lockId, ui64
855896
LockChangeRecords.erase(it);
856897
});
857898
db.GetDatabase().OnRollback([this, lockId]() {
858-
CommittedLockChangeRecords.erase(lockId);
899+
auto it = CommittedLockChangeRecords.find(lockId);
900+
Y_VERIFY_S(it != CommittedLockChangeRecords.end(), "Unexpected failure to find lockId# " << lockId);
901+
902+
for (size_t i = 0; i < it->second.Count; ++i) {
903+
const ui64 order = it->second.Order + i;
904+
905+
auto cIt = ChangesQueue.find(order);
906+
Y_VERIFY_S(cIt != ChangesQueue.end(), "Cannot find change record: " << order);
907+
908+
if (cIt->second.SchemaSnapshotAcquired) {
909+
const auto snapshotKey = TSchemaSnapshotKey(cIt->second.TableId, cIt->second.SchemaVersion);
910+
if (const auto last = SchemaSnapshotManager.ReleaseReference(snapshotKey)) {
911+
ScheduleRemoveSchemaSnapshot(snapshotKey);
912+
}
913+
}
914+
915+
ChangesQueue.erase(cIt);
916+
}
917+
918+
CommittedLockChangeRecords.erase(it);
859919
});
860920
}
861921

@@ -889,7 +949,6 @@ void TDataShard::RemoveChangeRecord(NIceDb::TNiceDb& db, ui64 order) {
889949

890950
auto it = ChangesQueue.find(order);
891951
if (it == ChangesQueue.end()) {
892-
Y_VERIFY_DEBUG_S(false, "Trying to remove non-enqueud record: " << order);
893952
return;
894953
}
895954

@@ -917,23 +976,9 @@ void TDataShard::RemoveChangeRecord(NIceDb::TNiceDb& db, ui64 order) {
917976
ChangesQueueBytes -= record.BodySize;
918977

919978
if (record.SchemaSnapshotAcquired) {
920-
Y_ABORT_UNLESS(record.TableId);
921-
auto tableIt = TableInfos.find(record.TableId.LocalPathId);
922-
923-
if (tableIt != TableInfos.end()) {
924-
const auto snapshotKey = TSchemaSnapshotKey(record.TableId, record.SchemaVersion);
925-
const bool last = SchemaSnapshotManager.ReleaseReference(snapshotKey);
926-
927-
if (last) {
928-
const auto* snapshot = SchemaSnapshotManager.FindSnapshot(snapshotKey);
929-
Y_ABORT_UNLESS(snapshot);
930-
931-
if (snapshot->Schema->GetTableSchemaVersion() < tableIt->second->GetTableSchemaVersion()) {
932-
SchemaSnapshotManager.RemoveShapshot(db, snapshotKey);
933-
}
934-
}
935-
} else {
936-
Y_DEBUG_ABORT_UNLESS(State == TShardState::PreOffline);
979+
const auto snapshotKey = TSchemaSnapshotKey(record.TableId, record.SchemaVersion);
980+
if (const bool last = SchemaSnapshotManager.ReleaseReference(snapshotKey)) {
981+
ScheduleRemoveSchemaSnapshot(snapshotKey);
937982
}
938983
}
939984

@@ -954,7 +999,7 @@ void TDataShard::RemoveChangeRecord(NIceDb::TNiceDb& db, ui64 order) {
954999
CheckChangesQueueNoOverflow();
9551000
}
9561001

957-
void TDataShard::EnqueueChangeRecords(TVector<IDataShardChangeCollector::TChange>&& records, ui64 cookie) {
1002+
void TDataShard::EnqueueChangeRecords(TVector<IDataShardChangeCollector::TChange>&& records, ui64 cookie, bool afterMove) {
9581003
if (!records) {
9591004
return;
9601005
}
@@ -974,27 +1019,24 @@ void TDataShard::EnqueueChangeRecords(TVector<IDataShardChangeCollector::TChange
9741019
const auto now = AppData()->TimeProvider->Now();
9751020
TVector<NChangeExchange::TEvChangeExchange::TEvEnqueueRecords::TRecordInfo> forward(Reserve(records.size()));
9761021
for (const auto& record : records) {
977-
forward.emplace_back(record.Order, record.PathId, record.BodySize);
1022+
auto it = ChangesQueue.find(record.Order);
1023+
if (it == ChangesQueue.end()) {
1024+
Y_ABORT_UNLESS(afterMove);
1025+
continue;
1026+
}
9781027

979-
auto res = ChangesQueue.emplace(
980-
std::piecewise_construct,
981-
std::forward_as_tuple(record.Order),
982-
std::forward_as_tuple(record, now, cookie)
983-
);
984-
if (res.second) {
985-
ChangesList.PushBack(&res.first->second);
1028+
forward.emplace_back(record.Order, record.PathId, record.BodySize);
9861029

987-
Y_ABORT_UNLESS(ChangesQueueBytes <= (Max<ui64>() - record.BodySize));
988-
ChangesQueueBytes += record.BodySize;
1030+
it->second.EnqueuedAt = now;
1031+
it->second.ReservationCookie = cookie;
1032+
ChangesList.PushBack(&it->second);
9891033

990-
if (record.SchemaVersion) {
991-
res.first->second.SchemaSnapshotAcquired = SchemaSnapshotManager.AcquireReference(
992-
TSchemaSnapshotKey(record.TableId, record.SchemaVersion));
993-
}
994-
}
1034+
Y_ABORT_UNLESS(ChangesQueueBytes <= (Max<ui64>() - record.BodySize));
1035+
ChangesQueueBytes += record.BodySize;
9951036
}
996-
1037+
9971038
if (auto it = ChangeQueueReservations.find(cookie); it != ChangeQueueReservations.end()) {
1039+
Y_ABORT_UNLESS(!afterMove);
9981040
ChangeQueueReservedCapacity -= it->second;
9991041
ChangeQueueReservedCapacity += records.size();
10001042
}
@@ -1160,6 +1202,14 @@ bool TDataShard::LoadChangeRecords(NIceDb::TNiceDb& db, TVector<IDataShardChange
11601202
.SchemaVersion = schemaVersion,
11611203
});
11621204

1205+
auto res = ChangesQueue.emplace(records.back().Order, records.back());
1206+
Y_VERIFY_S(res.second, "Duplicate change record: " << records.back().Order);
1207+
1208+
if (res.first->second.SchemaVersion) {
1209+
res.first->second.SchemaSnapshotAcquired = SchemaSnapshotManager.AcquireReference(
1210+
TSchemaSnapshotKey(res.first->second.TableId, res.first->second.SchemaVersion));
1211+
}
1212+
11631213
if (!rowset.Next()) {
11641214
return false;
11651215
}
@@ -1258,6 +1308,14 @@ bool TDataShard::LoadChangeRecordCommits(NIceDb::TNiceDb& db, TVector<IDataShard
12581308
});
12591309
entry.Count++;
12601310
needSort = true;
1311+
1312+
auto res = ChangesQueue.emplace(records.back().Order, records.back());
1313+
Y_VERIFY_S(res.second, "Duplicate change record: " << records.back().Order);
1314+
1315+
if (res.first->second.SchemaVersion) {
1316+
res.first->second.SchemaSnapshotAcquired = SchemaSnapshotManager.AcquireReference(
1317+
TSchemaSnapshotKey(res.first->second.TableId, res.first->second.SchemaVersion));
1318+
}
12611319
}
12621320

12631321
LockChangeRecords.erase(lockId);
@@ -1316,6 +1374,51 @@ void TDataShard::ScheduleRemoveAbandonedLockChanges() {
13161374
}
13171375
}
13181376

1377+
void TDataShard::ScheduleRemoveSchemaSnapshot(const TSchemaSnapshotKey& key) {
1378+
Y_ABORT_UNLESS(!SchemaSnapshotManager.HasReference(key));
1379+
1380+
const auto* snapshot = SchemaSnapshotManager.FindSnapshot(key);
1381+
Y_ABORT_UNLESS(snapshot);
1382+
1383+
auto it = TableInfos.find(key.PathId);
1384+
if (it == TableInfos.end()) {
1385+
Y_DEBUG_ABORT_UNLESS(State == TShardState::PreOffline);
1386+
return;
1387+
}
1388+
1389+
if (snapshot->Schema->GetTableSchemaVersion() < it->second->GetTableSchemaVersion()) {
1390+
bool wasEmpty = PendingSchemaSnapshotsToGc.empty();
1391+
PendingSchemaSnapshotsToGc.push_back(key);
1392+
if (wasEmpty) {
1393+
Send(SelfId(), new TEvPrivate::TEvRemoveSchemaSnapshots);
1394+
}
1395+
}
1396+
}
1397+
1398+
void TDataShard::ScheduleRemoveAbandonedSchemaSnapshots() {
1399+
bool wasEmpty = PendingSchemaSnapshotsToGc.empty();
1400+
1401+
for (const auto& [key, snapshot] : SchemaSnapshotManager.GetSnapshots()) {
1402+
auto it = TableInfos.find(key.PathId);
1403+
if (it == TableInfos.end()) {
1404+
Y_DEBUG_ABORT_UNLESS(State == TShardState::PreOffline);
1405+
break;
1406+
}
1407+
if (SchemaSnapshotManager.HasReference(key)) {
1408+
continue;
1409+
}
1410+
if (snapshot.Schema->GetTableSchemaVersion() >= it->second->GetTableSchemaVersion()) {
1411+
continue;
1412+
}
1413+
1414+
PendingSchemaSnapshotsToGc.push_back(key);
1415+
}
1416+
1417+
if (wasEmpty && !PendingSchemaSnapshotsToGc.empty()) {
1418+
Send(SelfId(), new TEvPrivate::TEvRemoveSchemaSnapshots);
1419+
}
1420+
}
1421+
13191422
void TDataShard::PersistSchemeTxResult(NIceDb::TNiceDb &db, const TSchemaOperation &op) {
13201423
db.Table<Schema::SchemaOperations>().Key(op.TxId).Update(
13211424
NIceDb::TUpdate<Schema::SchemaOperations::Success>(op.Success),
@@ -1529,8 +1632,18 @@ void TDataShard::AddSchemaSnapshot(const TPathId& pathId, ui64 tableSchemaVersio
15291632
Y_ABORT_UNLESS(TableInfos.contains(pathId.LocalPathId));
15301633
auto tableInfo = TableInfos[pathId.LocalPathId];
15311634

1532-
const auto key = TSchemaSnapshotKey(pathId.OwnerId, pathId.LocalPathId, tableSchemaVersion);
1635+
const auto key = TSchemaSnapshotKey(pathId, tableSchemaVersion);
15331636
SchemaSnapshotManager.AddSnapshot(txc.DB, key, TSchemaSnapshot(tableInfo, step, txId));
1637+
1638+
const auto& snapshots = SchemaSnapshotManager.GetSnapshots();
1639+
for (auto it = snapshots.lower_bound(TSchemaSnapshotKey(pathId, 1)); it != snapshots.end(); ++it) {
1640+
if (it->first == key) {
1641+
break;
1642+
}
1643+
if (!SchemaSnapshotManager.HasReference(it->first)) {
1644+
ScheduleRemoveSchemaSnapshot(it->first);
1645+
}
1646+
}
15341647
}
15351648

15361649
void TDataShard::PersistLastLoanTableTid(NIceDb::TNiceDb& db, ui32 localTid) {

ydb/core/tx/datashard/datashard__init.cpp

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -425,6 +425,12 @@ bool TDataShard::TTxInit::ReadEverything(TTransactionContext &txc) {
425425
return false;
426426
}
427427

428+
if (Self->State != TShardState::Offline && txc.DB.GetScheme().GetTableInfo(Schema::SchemaSnapshots::TableId)) {
429+
if (!Self->SchemaSnapshotManager.Load(db)) {
430+
return false;
431+
}
432+
}
433+
428434
if (Self->State != TShardState::Offline && txc.DB.GetScheme().GetTableInfo(Schema::ChangeRecords::TableId)) {
429435
if (!Self->LoadChangeRecords(db, ChangeRecords)) {
430436
return false;
@@ -512,12 +518,6 @@ bool TDataShard::TTxInit::ReadEverything(TTransactionContext &txc) {
512518
}
513519
}
514520

515-
if (Self->State != TShardState::Offline && txc.DB.GetScheme().GetTableInfo(Schema::SchemaSnapshots::TableId)) {
516-
if (!Self->SchemaSnapshotManager.Load(db)) {
517-
return false;
518-
}
519-
}
520-
521521
if (Self->State != TShardState::Offline && txc.DB.GetScheme().GetTableInfo(Schema::Locks::TableId)) {
522522
TDataShardLocksDb locksDb(*Self, txc);
523523
if (!Self->SysLocks.Load(locksDb)) {
@@ -547,6 +547,7 @@ bool TDataShard::TTxInit::ReadEverything(TTransactionContext &txc) {
547547
Self->SubscribeNewLocks();
548548

549549
Self->ScheduleRemoveAbandonedLockChanges();
550+
Self->ScheduleRemoveAbandonedSchemaSnapshots();
550551

551552
return true;
552553
}

ydb/core/tx/datashard/datashard_change_sending.cpp

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -286,7 +286,7 @@ class TDataShard::TTxRemoveChangeRecords: public TTransactionBase<TDataShard> {
286286
ChangeExchangeSplit = true;
287287
} else {
288288
for (const auto dstTabletId : Self->ChangeSenderActivator.GetDstSet()) {
289-
if (Self->SplitSrcSnapshotSender.Acked(dstTabletId)) {
289+
if (Self->SplitSrcSnapshotSender.Acked(dstTabletId) && !Self->ChangeSenderActivator.Acked(dstTabletId)) {
290290
ActivationList.insert(dstTabletId);
291291
}
292292
}
@@ -346,9 +346,7 @@ class TDataShard::TTxRemoveChangeRecords: public TTransactionBase<TDataShard> {
346346
}
347347

348348
for (const auto dstTabletId : ActivationList) {
349-
if (!Self->ChangeSenderActivator.Acked(dstTabletId)) {
350-
Self->ChangeSenderActivator.DoSend(dstTabletId, ctx);
351-
}
349+
Self->ChangeSenderActivator.DoSend(dstTabletId, ctx);
352350
}
353351

354352
Self->CheckStateChange(ctx);
@@ -383,7 +381,7 @@ class TDataShard::TTxChangeExchangeSplitAck: public TTransactionBase<TDataShard>
383381
Y_ABORT_UNLESS(Self->ChangeExchangeSplitter.Done());
384382

385383
for (const auto dstTabletId : Self->ChangeSenderActivator.GetDstSet()) {
386-
if (Self->SplitSrcSnapshotSender.Acked(dstTabletId)) {
384+
if (Self->SplitSrcSnapshotSender.Acked(dstTabletId) && !Self->ChangeSenderActivator.Acked(dstTabletId)) {
387385
ActivationList.insert(dstTabletId);
388386
}
389387
}
@@ -396,9 +394,7 @@ class TDataShard::TTxChangeExchangeSplitAck: public TTransactionBase<TDataShard>
396394
<< ", at tablet# " << Self->TabletID());
397395

398396
for (const auto dstTabletId : ActivationList) {
399-
if (!Self->ChangeSenderActivator.Acked(dstTabletId)) {
400-
Self->ChangeSenderActivator.DoSend(dstTabletId, ctx);
401-
}
397+
Self->ChangeSenderActivator.DoSend(dstTabletId, ctx);
402398
}
403399
}
404400

0 commit comments

Comments
 (0)