Skip to content

24-1: Mark reenqueued records & forcibly request them #4597

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 9 additions & 13 deletions ydb/core/change_exchange/change_sender_common_ops.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ void TBaseChangeSender::LazyCreateSender(THashMap<ui64, TSender>& senders, ui64
for (const auto& [order, broadcast] : Broadcasting) {
if (AddBroadcastPartition(order, partitionId)) {
// re-enqueue record to send it in the correct order
Enqueued.insert(broadcast.Record);
Enqueued.insert(ReEnqueue(broadcast.Record));
}
}
}
Expand Down Expand Up @@ -95,21 +95,22 @@ void TBaseChangeSender::EnqueueRecords(TVector<TEvChangeExchange::TEvEnqueueReco
RequestRecords();
}

bool TBaseChangeSender::RequestRecords(bool forceAtLeastOne) {
bool TBaseChangeSender::RequestRecords() {
if (!Enqueued) {
return false;
}

auto it = Enqueued.begin();
TVector<TRequestedRecord> records;
TVector<TIncompleteRecord> records;

bool exceeded = false;
while (it != Enqueued.end()) {
if (MemUsage && (MemUsage + it->BodySize) > MemLimit) {
if (!forceAtLeastOne) {
if (!it->ReEnqueued || exceeded) {
break;
}

forceAtLeastOne = false;
exceeded = true;
}

MemUsage += it->BodySize;
Expand Down Expand Up @@ -165,16 +166,11 @@ void TBaseChangeSender::SendRecords() {
THashSet<ui64> registrations;
bool needToResolve = false;

// used to avoid deadlock between RequestRecords & SendRecords
bool processedAtLeastOne = false;

while (it != PendingSent.end()) {
if (Enqueued && Enqueued.begin()->Order <= it->first) {
break;
}

processedAtLeastOne = true;

if (PendingBody && PendingBody.begin()->Order <= it->first) {
break;
}
Expand Down Expand Up @@ -232,7 +228,7 @@ void TBaseChangeSender::SendRecords() {
Resolver->Resolve();
}

RequestRecords(!processedAtLeastOne);
RequestRecords();
}

void TBaseChangeSender::ForgetRecords(TVector<ui64>&& records) {
Expand Down Expand Up @@ -314,12 +310,12 @@ void TBaseChangeSender::SendPreparedRecords(ui64 partitionId) {

void TBaseChangeSender::ReEnqueueRecords(const TSender& sender) {
for (const auto& record : sender.Pending) {
Enqueued.insert(record);
Enqueued.insert(ReEnqueue(record));
}

for (const auto& record : sender.Prepared) {
if (!record->IsBroadcast()) {
Enqueued.emplace(record->GetOrder(), record->GetBody().size());
Enqueued.insert(ReEnqueue(record->GetOrder(), record->GetBody().size()));
MemUsage -= record->GetBody().size();
}
}
Expand Down
28 changes: 22 additions & 6 deletions ydb/core/change_exchange/change_sender_common_ops.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,19 +76,35 @@ class IChangeSenderResolver {
};

class TBaseChangeSender: public IChangeSender {
using TEnqueuedRecord = TEvChangeExchange::TEvRequestRecords::TRecordInfo;
using TRequestedRecord = TEvChangeExchange::TEvRequestRecords::TRecordInfo;
using TIncompleteRecord = TEvChangeExchange::TEvRequestRecords::TRecordInfo;

struct TEnqueuedRecord: TIncompleteRecord {
bool ReEnqueued = false;

using TIncompleteRecord::TIncompleteRecord;
explicit TEnqueuedRecord(const TIncompleteRecord& record)
: TIncompleteRecord(record)
{
}
};

template <typename... Args>
static TEnqueuedRecord ReEnqueue(Args&&... args) {
TEnqueuedRecord record(std::forward<Args>(args)...);
record.ReEnqueued = true;
return record;
}

struct TSender {
TActorId ActorId;
bool Ready = false;
TVector<TEnqueuedRecord> Pending;
TVector<TIncompleteRecord> Pending;
TVector<IChangeRecord::TPtr> Prepared;
TVector<ui64> Broadcasting;
};

struct TBroadcast {
const TEnqueuedRecord Record;
const TIncompleteRecord Record;
THashSet<ui64> Partitions;
THashSet<ui64> PendingPartitions;
THashSet<ui64> CompletedPartitions;
Expand All @@ -99,7 +115,7 @@ class TBaseChangeSender: public IChangeSender {
void CreateMissingSenders(const TVector<ui64>& partitionIds);
void RecreateSenders(const TVector<ui64>& partitionIds);

bool RequestRecords(bool forceAtLeastOne = false);
bool RequestRecords();
void SendRecords();

void SendPreparedRecords(ui64 partitionId);
Expand Down Expand Up @@ -156,7 +172,7 @@ class TBaseChangeSender: public IChangeSender {

THashMap<ui64, TSender> Senders; // ui64 is partition id
TSet<TEnqueuedRecord> Enqueued;
TSet<TRequestedRecord> PendingBody;
TSet<TIncompleteRecord> PendingBody;
TMap<ui64, IChangeRecord::TPtr> PendingSent; // ui64 is order
THashMap<ui64, TBroadcast> Broadcasting; // ui64 is order

Expand Down
Loading