@@ -216,6 +216,15 @@ void TRawPartitionStreamEventQueue<UseMigrationProtocol>::DeleteNotReadyTail(TDe
216
216
swap (ready, NotReady);
217
217
}
218
218
219
+ // //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
220
+ // TDecompressionQueueItem
221
+
222
+ template <bool UseMigrationProtocol>
223
+ void TSingleClusterReadSessionImpl<UseMigrationProtocol>::TDecompressionQueueItem::OnDestroyReadSession()
224
+ {
225
+ BatchInfo->OnDestroyReadSession ();
226
+ }
227
+
219
228
// //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
220
229
// TSingleClusterReadSessionImpl
221
230
@@ -224,6 +233,10 @@ TSingleClusterReadSessionImpl<UseMigrationProtocol>::~TSingleClusterReadSessionI
224
233
for (auto && [_, partitionStream] : PartitionStreams) {
225
234
partitionStream->ClearQueue ();
226
235
}
236
+
237
+ for (auto & e : DecompressionQueue) {
238
+ e.OnDestroyReadSession ();
239
+ }
227
240
}
228
241
229
242
@@ -1565,6 +1578,7 @@ void TSingleClusterReadSessionImpl<UseMigrationProtocol>::OnDecompressionInfoDes
1565
1578
1566
1579
template <bool UseMigrationProtocol>
1567
1580
void TSingleClusterReadSessionImpl<UseMigrationProtocol>::OnDataDecompressed (i64 sourceSize, i64 estimatedDecompressedSize, i64 decompressedSize, size_t messagesCount, i64 serverBytesSize) {
1581
+
1568
1582
TDeferredActions<UseMigrationProtocol> deferred;
1569
1583
1570
1584
Y_ABORT_UNLESS (DecompressionTasksInflight > 0 );
@@ -2524,6 +2538,14 @@ void TDataDecompressionInfo<UseMigrationProtocol>::PlanDecompressionTasks(double
2524
2538
}
2525
2539
}
2526
2540
2541
+ template <bool UseMigrationProtocol>
2542
+ void TDataDecompressionInfo<UseMigrationProtocol>::OnDestroyReadSession ()
2543
+ {
2544
+ for (auto & task : Tasks) {
2545
+ task.ClearParent ();
2546
+ }
2547
+ }
2548
+
2527
2549
template <bool UseMigrationProtocol>
2528
2550
void TDataDecompressionEvent<UseMigrationProtocol>::TakeData (TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> partitionStream,
2529
2551
TVector<typename TADataReceivedEvent<UseMigrationProtocol>::TMessage>& messages,
@@ -2673,19 +2695,23 @@ TDataDecompressionInfo<UseMigrationProtocol>::TDecompressionTask::TDecompression
2673
2695
2674
2696
template <bool UseMigrationProtocol>
2675
2697
void TDataDecompressionInfo<UseMigrationProtocol>::TDecompressionTask::operator ()() {
2698
+ auto parent = Parent;
2699
+ if (!parent) {
2700
+ return ;
2701
+ }
2676
2702
i64 minOffset = Max<i64>();
2677
2703
i64 maxOffset = 0 ;
2678
- const i64 partition_id = [this ](){
2704
+ const i64 partition_id = [parent ](){
2679
2705
if constexpr (UseMigrationProtocol) {
2680
- return Parent ->ServerMessage .partition ();
2706
+ return parent ->ServerMessage .partition ();
2681
2707
} else {
2682
- return Parent ->ServerMessage .partition_session_id ();
2708
+ return parent ->ServerMessage .partition_session_id ();
2683
2709
}
2684
2710
}();
2685
2711
i64 dataProcessed = 0 ;
2686
2712
size_t messagesProcessed = 0 ;
2687
2713
for (const TMessageRange& messages : Messages) {
2688
- auto & batch = *Parent ->ServerMessage .mutable_batches (messages.Batch );
2714
+ auto & batch = *parent ->ServerMessage .mutable_batches (messages.Batch );
2689
2715
for (size_t i = messages.MessageRange .first ; i < messages.MessageRange .second ; ++i) {
2690
2716
auto & data = *batch.mutable_message_data (i);
2691
2717
@@ -2696,7 +2722,7 @@ void TDataDecompressionInfo<UseMigrationProtocol>::TDecompressionTask::operator(
2696
2722
2697
2723
try {
2698
2724
if constexpr (UseMigrationProtocol) {
2699
- if (Parent ->DoDecompress
2725
+ if (parent ->DoDecompress
2700
2726
&& data.codec () != Ydb::PersQueue::V1::CODEC_RAW
2701
2727
&& data.codec () != Ydb::PersQueue::V1::CODEC_UNSPECIFIED
2702
2728
) {
@@ -2706,7 +2732,7 @@ void TDataDecompressionInfo<UseMigrationProtocol>::TDecompressionTask::operator(
2706
2732
data.set_codec (Ydb::PersQueue::V1::CODEC_RAW);
2707
2733
}
2708
2734
} else {
2709
- if (Parent ->DoDecompress
2735
+ if (parent ->DoDecompress
2710
2736
&& static_cast <Ydb::Topic::Codec>(batch.codec ()) != Ydb::Topic::CODEC_RAW
2711
2737
&& static_cast <Ydb::Topic::Codec>(batch.codec ()) != Ydb::Topic::CODEC_UNSPECIFIED
2712
2738
) {
@@ -2718,32 +2744,38 @@ void TDataDecompressionInfo<UseMigrationProtocol>::TDecompressionTask::operator(
2718
2744
2719
2745
DecompressedSize += data.data ().size ();
2720
2746
} catch (...) {
2721
- Parent ->PutDecompressionError (std::current_exception (), messages.Batch , i);
2747
+ parent ->PutDecompressionError (std::current_exception (), messages.Batch , i);
2722
2748
data.clear_data (); // Free memory, because we don't count it.
2723
2749
2724
- if (auto session = Parent ->CbContext ->LockShared ()) {
2750
+ if (auto session = parent ->CbContext ->LockShared ()) {
2725
2751
session->GetLog () << TLOG_INFO << " Error decompressing data: " << CurrentExceptionMessage ();
2726
2752
}
2727
2753
}
2728
2754
}
2729
2755
}
2730
- if (auto session = Parent ->CbContext ->LockShared ()) {
2756
+ if (auto session = parent ->CbContext ->LockShared ()) {
2731
2757
LOG_LAZY (session->GetLog (), TLOG_DEBUG, TStringBuilder () << " Decompression task done. Partition/PartitionSessionId: "
2732
2758
<< partition_id << " (" << minOffset << " -"
2733
2759
<< maxOffset << " )" );
2734
2760
}
2735
2761
Y_ASSERT (dataProcessed == SourceDataSize);
2736
2762
2737
- Parent ->OnDataDecompressed (SourceDataSize, EstimatedDecompressedSize, DecompressedSize, messagesProcessed);
2763
+ parent ->OnDataDecompressed (SourceDataSize, EstimatedDecompressedSize, DecompressedSize, messagesProcessed);
2738
2764
2739
- Parent ->SourceDataNotProcessed -= dataProcessed;
2765
+ parent ->SourceDataNotProcessed -= dataProcessed;
2740
2766
Ready->Ready = true ;
2741
2767
2742
- if (auto session = Parent ->CbContext ->LockShared ()) {
2768
+ if (auto session = parent ->CbContext ->LockShared ()) {
2743
2769
session->GetEventsQueue ()->SignalReadyEvents (PartitionStream);
2744
2770
}
2745
2771
}
2746
2772
2773
+ template <bool UseMigrationProtocol>
2774
+ void TDataDecompressionInfo<UseMigrationProtocol>::TDecompressionTask::ClearParent ()
2775
+ {
2776
+ Parent = nullptr ;
2777
+ }
2778
+
2747
2779
// //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
2748
2780
// TUserRetrievedEventsInfoAccumulator
2749
2781
@@ -2781,7 +2813,7 @@ void TDeferredActions<UseMigrationProtocol>::DeferReadFromProcessor(const typena
2781
2813
}
2782
2814
2783
2815
template <bool UseMigrationProtocol>
2784
- void TDeferredActions<UseMigrationProtocol>::DeferStartExecutorTask (const typename IAExecutor<UseMigrationProtocol>::TPtr& executor, typename IAExecutor<UseMigrationProtocol>::TFunction task) {
2816
+ void TDeferredActions<UseMigrationProtocol>::DeferStartExecutorTask (const typename IAExecutor<UseMigrationProtocol>::TPtr& executor, typename IAExecutor<UseMigrationProtocol>::TFunction&& task) {
2785
2817
ExecutorsTasks.emplace_back (executor, std::move (task));
2786
2818
}
2787
2819
0 commit comments