Skip to content

24-3: Fix bugs in: change exchange split, removing schema snapshots & emitting of resolved timestamps #6616

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Jul 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ydb/core/change_exchange/change_sender_common_ops.h
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ class TBaseChangeSender {
Y_ABORT_UNLESS(it != Broadcasting.end());

auto& broadcast = it->second;
if (broadcast.Partitions.contains(partitionId)) {
if (broadcast.CompletedPartitions.contains(partitionId)) {
return false;
}

Expand Down
3 changes: 2 additions & 1 deletion ydb/core/persqueue/partition_sourcemanager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,8 @@ void TPartitionSourceManager::TModificationBatch::Cancel() {
}

bool TPartitionSourceManager::TModificationBatch::HasModifications() const {
return !SourceIdWriter.GetSourceIdsToWrite().empty();
return !SourceIdWriter.GetSourceIdsToWrite().empty()
|| !SourceIdWriter.GetSourceIdsToDelete().empty();
}

void TPartitionSourceManager::TModificationBatch::FillRequest(TEvKeyValue::TEvRequest* request) {
Expand Down
4 changes: 4 additions & 0 deletions ydb/core/persqueue/sourceid.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,10 @@ class TSourceIdWriter {
return Registrations;
}

const THashSet<TString>& GetSourceIdsToDelete() const {
return Deregistrations;
}

template <typename... Args>
void RegisterSourceId(const TString& sourceId, Args&&... args) {
Registrations[sourceId] = TSourceIdInfo(std::forward<Args>(args)...);
Expand Down
1 change: 1 addition & 0 deletions ydb/core/protos/counters_datashard.proto
Original file line number Diff line number Diff line change
Expand Up @@ -490,4 +490,5 @@ enum ETxTypes {
TXTYPE_CLEANUP_VOLATILE = 80 [(TxTypeOpts) = {Name: "TxCleanupVolatile"}];
TXTYPE_PLAN_PREDICTED_TXS = 81 [(TxTypeOpts) = {Name: "TxPlanPredictedTxs"}];
TXTYPE_WRITE = 82 [(TxTypeOpts) = {Name: "TxWrite"}];
TXTYPE_REMOVE_SCHEMA_SNAPSHOTS = 83 [(TxTypeOpts) = {Name: "TxRemoveSchemaSnapshots"}];
}
187 changes: 150 additions & 37 deletions ydb/core/tx/datashard/datashard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -855,6 +855,39 @@ void TDataShard::PersistChangeRecord(NIceDb::TNiceDb& db, const TChangeRecord& r
NIceDb::TUpdate<Schema::ChangeRecordDetails::Kind>(record.GetKind()),
NIceDb::TUpdate<Schema::ChangeRecordDetails::Body>(record.GetBody()),
NIceDb::TUpdate<Schema::ChangeRecordDetails::Source>(record.GetSource()));

auto res = ChangesQueue.emplace(record.GetOrder(), record);
Y_VERIFY_S(res.second, "Duplicate change record: " << record.GetOrder());

if (res.first->second.SchemaVersion) {
res.first->second.SchemaSnapshotAcquired = SchemaSnapshotManager.AcquireReference(
TSchemaSnapshotKey(res.first->second.TableId, res.first->second.SchemaVersion));
}

if (CommittingChangeRecords.empty()) {
db.GetDatabase().OnCommit([this] {
CommittingChangeRecords.clear();
});
db.GetDatabase().OnRollback([this] {
for (const auto order : CommittingChangeRecords) {
auto cIt = ChangesQueue.find(order);
Y_VERIFY_S(cIt != ChangesQueue.end(), "Cannot find change record: " << order);

if (cIt->second.SchemaSnapshotAcquired) {
const auto snapshotKey = TSchemaSnapshotKey(cIt->second.TableId, cIt->second.SchemaVersion);
if (const auto last = SchemaSnapshotManager.ReleaseReference(snapshotKey)) {
ScheduleRemoveSchemaSnapshot(snapshotKey);
}
}

ChangesQueue.erase(cIt);
}

CommittingChangeRecords.clear();
});
}

CommittingChangeRecords.push_back(record.GetOrder());
} else {
auto& state = LockChangeRecords[lockId];
Y_ABORT_UNLESS(state.Changes.empty() || state.Changes.back().LockOffset < record.GetLockOffset(),
Expand Down Expand Up @@ -934,6 +967,14 @@ void TDataShard::CommitLockChangeRecords(NIceDb::TNiceDb& db, ui64 lockId, ui64
committed.Step = rowVersion.Step;
committed.TxId = rowVersion.TxId;
collected.push_back(committed);

auto res = ChangesQueue.emplace(committed.Order, committed);
Y_VERIFY_S(res.second, "Duplicate change record: " << committed.Order);

if (res.first->second.SchemaVersion) {
res.first->second.SchemaSnapshotAcquired = SchemaSnapshotManager.AcquireReference(
TSchemaSnapshotKey(res.first->second.TableId, res.first->second.SchemaVersion));
}
}

Y_VERIFY_S(!CommittedLockChangeRecords.contains(lockId), "Cannot commit lock " << lockId << " more than once");
Expand All @@ -960,7 +1001,26 @@ void TDataShard::CommitLockChangeRecords(NIceDb::TNiceDb& db, ui64 lockId, ui64
LockChangeRecords.erase(it);
});
db.GetDatabase().OnRollback([this, lockId]() {
CommittedLockChangeRecords.erase(lockId);
auto it = CommittedLockChangeRecords.find(lockId);
Y_VERIFY_S(it != CommittedLockChangeRecords.end(), "Unexpected failure to find lockId# " << lockId);

for (size_t i = 0; i < it->second.Count; ++i) {
const ui64 order = it->second.Order + i;

auto cIt = ChangesQueue.find(order);
Y_VERIFY_S(cIt != ChangesQueue.end(), "Cannot find change record: " << order);

if (cIt->second.SchemaSnapshotAcquired) {
const auto snapshotKey = TSchemaSnapshotKey(cIt->second.TableId, cIt->second.SchemaVersion);
if (const auto last = SchemaSnapshotManager.ReleaseReference(snapshotKey)) {
ScheduleRemoveSchemaSnapshot(snapshotKey);
}
}

ChangesQueue.erase(cIt);
}

CommittedLockChangeRecords.erase(it);
});
}

Expand Down Expand Up @@ -994,7 +1054,6 @@ void TDataShard::RemoveChangeRecord(NIceDb::TNiceDb& db, ui64 order) {

auto it = ChangesQueue.find(order);
if (it == ChangesQueue.end()) {
Y_VERIFY_DEBUG_S(false, "Trying to remove non-enqueud record: " << order);
return;
}

Expand Down Expand Up @@ -1022,23 +1081,9 @@ void TDataShard::RemoveChangeRecord(NIceDb::TNiceDb& db, ui64 order) {
ChangesQueueBytes -= record.BodySize;

if (record.SchemaSnapshotAcquired) {
Y_ABORT_UNLESS(record.TableId);
auto tableIt = TableInfos.find(record.TableId.LocalPathId);

if (tableIt != TableInfos.end()) {
const auto snapshotKey = TSchemaSnapshotKey(record.TableId, record.SchemaVersion);
const bool last = SchemaSnapshotManager.ReleaseReference(snapshotKey);

if (last) {
const auto* snapshot = SchemaSnapshotManager.FindSnapshot(snapshotKey);
Y_ABORT_UNLESS(snapshot);

if (snapshot->Schema->GetTableSchemaVersion() < tableIt->second->GetTableSchemaVersion()) {
SchemaSnapshotManager.RemoveShapshot(db, snapshotKey);
}
}
} else {
Y_DEBUG_ABORT_UNLESS(State == TShardState::PreOffline);
const auto snapshotKey = TSchemaSnapshotKey(record.TableId, record.SchemaVersion);
if (const bool last = SchemaSnapshotManager.ReleaseReference(snapshotKey)) {
ScheduleRemoveSchemaSnapshot(snapshotKey);
}
}

Expand All @@ -1059,7 +1104,7 @@ void TDataShard::RemoveChangeRecord(NIceDb::TNiceDb& db, ui64 order) {
CheckChangesQueueNoOverflow();
}

void TDataShard::EnqueueChangeRecords(TVector<IDataShardChangeCollector::TChange>&& records, ui64 cookie) {
void TDataShard::EnqueueChangeRecords(TVector<IDataShardChangeCollector::TChange>&& records, ui64 cookie, bool afterMove) {
if (!records) {
return;
}
Expand All @@ -1079,27 +1124,24 @@ void TDataShard::EnqueueChangeRecords(TVector<IDataShardChangeCollector::TChange
const auto now = AppData()->TimeProvider->Now();
TVector<NChangeExchange::TEvChangeExchange::TEvEnqueueRecords::TRecordInfo> forward(Reserve(records.size()));
for (const auto& record : records) {
forward.emplace_back(record.Order, record.PathId, record.BodySize);
auto it = ChangesQueue.find(record.Order);
if (it == ChangesQueue.end()) {
Y_ABORT_UNLESS(afterMove);
continue;
}

auto res = ChangesQueue.emplace(
std::piecewise_construct,
std::forward_as_tuple(record.Order),
std::forward_as_tuple(record, now, cookie)
);
if (res.second) {
ChangesList.PushBack(&res.first->second);
forward.emplace_back(record.Order, record.PathId, record.BodySize);

Y_ABORT_UNLESS(ChangesQueueBytes <= (Max<ui64>() - record.BodySize));
ChangesQueueBytes += record.BodySize;
it->second.EnqueuedAt = now;
it->second.ReservationCookie = cookie;
ChangesList.PushBack(&it->second);

if (record.SchemaVersion) {
res.first->second.SchemaSnapshotAcquired = SchemaSnapshotManager.AcquireReference(
TSchemaSnapshotKey(record.TableId, record.SchemaVersion));
}
}
Y_ABORT_UNLESS(ChangesQueueBytes <= (Max<ui64>() - record.BodySize));
ChangesQueueBytes += record.BodySize;
}

if (auto it = ChangeQueueReservations.find(cookie); it != ChangeQueueReservations.end()) {
Y_ABORT_UNLESS(!afterMove);
ChangeQueueReservedCapacity -= it->second;
ChangeQueueReservedCapacity += records.size();
}
Expand Down Expand Up @@ -1265,6 +1307,14 @@ bool TDataShard::LoadChangeRecords(NIceDb::TNiceDb& db, TVector<IDataShardChange
.SchemaVersion = schemaVersion,
});

auto res = ChangesQueue.emplace(records.back().Order, records.back());
Y_VERIFY_S(res.second, "Duplicate change record: " << records.back().Order);

if (res.first->second.SchemaVersion) {
res.first->second.SchemaSnapshotAcquired = SchemaSnapshotManager.AcquireReference(
TSchemaSnapshotKey(res.first->second.TableId, res.first->second.SchemaVersion));
}

if (!rowset.Next()) {
return false;
}
Expand Down Expand Up @@ -1363,6 +1413,14 @@ bool TDataShard::LoadChangeRecordCommits(NIceDb::TNiceDb& db, TVector<IDataShard
});
entry.Count++;
needSort = true;

auto res = ChangesQueue.emplace(records.back().Order, records.back());
Y_VERIFY_S(res.second, "Duplicate change record: " << records.back().Order);

if (res.first->second.SchemaVersion) {
res.first->second.SchemaSnapshotAcquired = SchemaSnapshotManager.AcquireReference(
TSchemaSnapshotKey(res.first->second.TableId, res.first->second.SchemaVersion));
}
}

LockChangeRecords.erase(lockId);
Expand Down Expand Up @@ -1421,6 +1479,51 @@ void TDataShard::ScheduleRemoveAbandonedLockChanges() {
}
}

void TDataShard::ScheduleRemoveSchemaSnapshot(const TSchemaSnapshotKey& key) {
Y_ABORT_UNLESS(!SchemaSnapshotManager.HasReference(key));

const auto* snapshot = SchemaSnapshotManager.FindSnapshot(key);
Y_ABORT_UNLESS(snapshot);

auto it = TableInfos.find(key.PathId);
if (it == TableInfos.end()) {
Y_DEBUG_ABORT_UNLESS(State == TShardState::PreOffline);
return;
}

if (snapshot->Schema->GetTableSchemaVersion() < it->second->GetTableSchemaVersion()) {
bool wasEmpty = PendingSchemaSnapshotsToGc.empty();
PendingSchemaSnapshotsToGc.push_back(key);
if (wasEmpty) {
Send(SelfId(), new TEvPrivate::TEvRemoveSchemaSnapshots);
}
}
}

void TDataShard::ScheduleRemoveAbandonedSchemaSnapshots() {
bool wasEmpty = PendingSchemaSnapshotsToGc.empty();

for (const auto& [key, snapshot] : SchemaSnapshotManager.GetSnapshots()) {
auto it = TableInfos.find(key.PathId);
if (it == TableInfos.end()) {
Y_DEBUG_ABORT_UNLESS(State == TShardState::PreOffline);
break;
}
if (SchemaSnapshotManager.HasReference(key)) {
continue;
}
if (snapshot.Schema->GetTableSchemaVersion() >= it->second->GetTableSchemaVersion()) {
continue;
}

PendingSchemaSnapshotsToGc.push_back(key);
}

if (wasEmpty && !PendingSchemaSnapshotsToGc.empty()) {
Send(SelfId(), new TEvPrivate::TEvRemoveSchemaSnapshots);
}
}

void TDataShard::PersistSchemeTxResult(NIceDb::TNiceDb &db, const TSchemaOperation &op) {
db.Table<Schema::SchemaOperations>().Key(op.TxId).Update(
NIceDb::TUpdate<Schema::SchemaOperations::Success>(op.Success),
Expand Down Expand Up @@ -1649,8 +1752,18 @@ void TDataShard::AddSchemaSnapshot(const TPathId& pathId, ui64 tableSchemaVersio
Y_ABORT_UNLESS(TableInfos.contains(pathId.LocalPathId));
auto tableInfo = TableInfos[pathId.LocalPathId];

const auto key = TSchemaSnapshotKey(pathId.OwnerId, pathId.LocalPathId, tableSchemaVersion);
const auto key = TSchemaSnapshotKey(pathId, tableSchemaVersion);
SchemaSnapshotManager.AddSnapshot(txc.DB, key, TSchemaSnapshot(tableInfo, step, txId));

const auto& snapshots = SchemaSnapshotManager.GetSnapshots();
for (auto it = snapshots.lower_bound(TSchemaSnapshotKey(pathId, 1)); it != snapshots.end(); ++it) {
if (it->first == key) {
break;
}
if (!SchemaSnapshotManager.HasReference(it->first)) {
ScheduleRemoveSchemaSnapshot(it->first);
}
}
}

void TDataShard::PersistLastLoanTableTid(NIceDb::TNiceDb& db, ui32 localTid) {
Expand Down
13 changes: 7 additions & 6 deletions ydb/core/tx/datashard/datashard__init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,12 @@ bool TDataShard::TTxInit::ReadEverything(TTransactionContext &txc) {
return false;
}

if (Self->State != TShardState::Offline && txc.DB.GetScheme().GetTableInfo(Schema::SchemaSnapshots::TableId)) {
if (!Self->SchemaSnapshotManager.Load(db)) {
return false;
}
}

if (Self->State != TShardState::Offline && txc.DB.GetScheme().GetTableInfo(Schema::ChangeRecords::TableId)) {
if (!Self->LoadChangeRecords(db, ChangeRecords)) {
return false;
Expand Down Expand Up @@ -512,12 +518,6 @@ bool TDataShard::TTxInit::ReadEverything(TTransactionContext &txc) {
}
}

if (Self->State != TShardState::Offline && txc.DB.GetScheme().GetTableInfo(Schema::SchemaSnapshots::TableId)) {
if (!Self->SchemaSnapshotManager.Load(db)) {
return false;
}
}

if (Self->State != TShardState::Offline && txc.DB.GetScheme().GetTableInfo(Schema::Locks::TableId)) {
TDataShardLocksDb locksDb(*Self, txc);
if (!Self->SysLocks.Load(locksDb)) {
Expand Down Expand Up @@ -547,6 +547,7 @@ bool TDataShard::TTxInit::ReadEverything(TTransactionContext &txc) {
Self->SubscribeNewLocks();

Self->ScheduleRemoveAbandonedLockChanges();
Self->ScheduleRemoveAbandonedSchemaSnapshots();

return true;
}
Expand Down
12 changes: 4 additions & 8 deletions ydb/core/tx/datashard/datashard_change_sending.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ class TDataShard::TTxRemoveChangeRecords: public TTransactionBase<TDataShard> {
ChangeExchangeSplit = true;
} else {
for (const auto dstTabletId : Self->ChangeSenderActivator.GetDstSet()) {
if (Self->SplitSrcSnapshotSender.Acked(dstTabletId)) {
if (Self->SplitSrcSnapshotSender.Acked(dstTabletId) && !Self->ChangeSenderActivator.Acked(dstTabletId)) {
ActivationList.insert(dstTabletId);
}
}
Expand Down Expand Up @@ -346,9 +346,7 @@ class TDataShard::TTxRemoveChangeRecords: public TTransactionBase<TDataShard> {
}

for (const auto dstTabletId : ActivationList) {
if (!Self->ChangeSenderActivator.Acked(dstTabletId)) {
Self->ChangeSenderActivator.DoSend(dstTabletId, ctx);
}
Self->ChangeSenderActivator.DoSend(dstTabletId, ctx);
}

Self->CheckStateChange(ctx);
Expand Down Expand Up @@ -383,7 +381,7 @@ class TDataShard::TTxChangeExchangeSplitAck: public TTransactionBase<TDataShard>
Y_ABORT_UNLESS(Self->ChangeExchangeSplitter.Done());

for (const auto dstTabletId : Self->ChangeSenderActivator.GetDstSet()) {
if (Self->SplitSrcSnapshotSender.Acked(dstTabletId)) {
if (Self->SplitSrcSnapshotSender.Acked(dstTabletId) && !Self->ChangeSenderActivator.Acked(dstTabletId)) {
ActivationList.insert(dstTabletId);
}
}
Expand All @@ -396,9 +394,7 @@ class TDataShard::TTxChangeExchangeSplitAck: public TTransactionBase<TDataShard>
<< ", at tablet# " << Self->TabletID());

for (const auto dstTabletId : ActivationList) {
if (!Self->ChangeSenderActivator.Acked(dstTabletId)) {
Self->ChangeSenderActivator.DoSend(dstTabletId, ctx);
}
Self->ChangeSenderActivator.DoSend(dstTabletId, ctx);
}
}

Expand Down
Loading
Loading