Skip to content

Commit 97c9e87

Browse files
authored
24-1: Mark reenqueued records & forcibly request them (#4597)
1 parent 920ac4a commit 97c9e87

File tree

2 files changed

+31
-19
lines changed

2 files changed

+31
-19
lines changed

ydb/core/change_exchange/change_sender_common_ops.cpp

+9-13
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ void TBaseChangeSender::LazyCreateSender(THashMap<ui64, TSender>& senders, ui64
1818
for (const auto& [order, broadcast] : Broadcasting) {
1919
if (AddBroadcastPartition(order, partitionId)) {
2020
// re-enqueue record to send it in the correct order
21-
Enqueued.insert(broadcast.Record);
21+
Enqueued.insert(ReEnqueue(broadcast.Record));
2222
}
2323
}
2424
}
@@ -95,21 +95,22 @@ void TBaseChangeSender::EnqueueRecords(TVector<TEvChangeExchange::TEvEnqueueReco
9595
RequestRecords();
9696
}
9797

98-
bool TBaseChangeSender::RequestRecords(bool forceAtLeastOne) {
98+
bool TBaseChangeSender::RequestRecords() {
9999
if (!Enqueued) {
100100
return false;
101101
}
102102

103103
auto it = Enqueued.begin();
104-
TVector<TRequestedRecord> records;
104+
TVector<TIncompleteRecord> records;
105105

106+
bool exceeded = false;
106107
while (it != Enqueued.end()) {
107108
if (MemUsage && (MemUsage + it->BodySize) > MemLimit) {
108-
if (!forceAtLeastOne) {
109+
if (!it->ReEnqueued || exceeded) {
109110
break;
110111
}
111112

112-
forceAtLeastOne = false;
113+
exceeded = true;
113114
}
114115

115116
MemUsage += it->BodySize;
@@ -165,16 +166,11 @@ void TBaseChangeSender::SendRecords() {
165166
THashSet<ui64> registrations;
166167
bool needToResolve = false;
167168

168-
// used to avoid deadlock between RequestRecords & SendRecords
169-
bool processedAtLeastOne = false;
170-
171169
while (it != PendingSent.end()) {
172170
if (Enqueued && Enqueued.begin()->Order <= it->first) {
173171
break;
174172
}
175173

176-
processedAtLeastOne = true;
177-
178174
if (PendingBody && PendingBody.begin()->Order <= it->first) {
179175
break;
180176
}
@@ -232,7 +228,7 @@ void TBaseChangeSender::SendRecords() {
232228
Resolver->Resolve();
233229
}
234230

235-
RequestRecords(!processedAtLeastOne);
231+
RequestRecords();
236232
}
237233

238234
void TBaseChangeSender::ForgetRecords(TVector<ui64>&& records) {
@@ -314,12 +310,12 @@ void TBaseChangeSender::SendPreparedRecords(ui64 partitionId) {
314310

315311
void TBaseChangeSender::ReEnqueueRecords(const TSender& sender) {
316312
for (const auto& record : sender.Pending) {
317-
Enqueued.insert(record);
313+
Enqueued.insert(ReEnqueue(record));
318314
}
319315

320316
for (const auto& record : sender.Prepared) {
321317
if (!record->IsBroadcast()) {
322-
Enqueued.emplace(record->GetOrder(), record->GetBody().size());
318+
Enqueued.insert(ReEnqueue(record->GetOrder(), record->GetBody().size()));
323319
MemUsage -= record->GetBody().size();
324320
}
325321
}

ydb/core/change_exchange/change_sender_common_ops.h

+22-6
Original file line numberDiff line numberDiff line change
@@ -76,19 +76,35 @@ class IChangeSenderResolver {
7676
};
7777

7878
class TBaseChangeSender: public IChangeSender {
79-
using TEnqueuedRecord = TEvChangeExchange::TEvRequestRecords::TRecordInfo;
80-
using TRequestedRecord = TEvChangeExchange::TEvRequestRecords::TRecordInfo;
79+
using TIncompleteRecord = TEvChangeExchange::TEvRequestRecords::TRecordInfo;
80+
81+
struct TEnqueuedRecord: TIncompleteRecord {
82+
bool ReEnqueued = false;
83+
84+
using TIncompleteRecord::TIncompleteRecord;
85+
explicit TEnqueuedRecord(const TIncompleteRecord& record)
86+
: TIncompleteRecord(record)
87+
{
88+
}
89+
};
90+
91+
template <typename... Args>
92+
static TEnqueuedRecord ReEnqueue(Args&&... args) {
93+
TEnqueuedRecord record(std::forward<Args>(args)...);
94+
record.ReEnqueued = true;
95+
return record;
96+
}
8197

8298
struct TSender {
8399
TActorId ActorId;
84100
bool Ready = false;
85-
TVector<TEnqueuedRecord> Pending;
101+
TVector<TIncompleteRecord> Pending;
86102
TVector<IChangeRecord::TPtr> Prepared;
87103
TVector<ui64> Broadcasting;
88104
};
89105

90106
struct TBroadcast {
91-
const TEnqueuedRecord Record;
107+
const TIncompleteRecord Record;
92108
THashSet<ui64> Partitions;
93109
THashSet<ui64> PendingPartitions;
94110
THashSet<ui64> CompletedPartitions;
@@ -99,7 +115,7 @@ class TBaseChangeSender: public IChangeSender {
99115
void CreateMissingSenders(const TVector<ui64>& partitionIds);
100116
void RecreateSenders(const TVector<ui64>& partitionIds);
101117

102-
bool RequestRecords(bool forceAtLeastOne = false);
118+
bool RequestRecords();
103119
void SendRecords();
104120

105121
void SendPreparedRecords(ui64 partitionId);
@@ -156,7 +172,7 @@ class TBaseChangeSender: public IChangeSender {
156172

157173
THashMap<ui64, TSender> Senders; // ui64 is partition id
158174
TSet<TEnqueuedRecord> Enqueued;
159-
TSet<TRequestedRecord> PendingBody;
175+
TSet<TIncompleteRecord> PendingBody;
160176
TMap<ui64, IChangeRecord::TPtr> PendingSent; // ui64 is order
161177
THashMap<ui64, TBroadcast> Broadcasting; // ui64 is order
162178

0 commit comments

Comments
 (0)