Skip to content

Commit 4e84252

Browse files
conveyor logging and remove ownerId - internal processes only (#6442)
1 parent 35573d3 commit 4e84252

28 files changed

+183
-173
lines changed

ydb/core/tx/columnshard/columnshard_impl.cpp

+2-3
Original file line numberDiff line numberDiff line change
@@ -510,7 +510,6 @@ void TColumnShard::EnqueueBackgroundActivities(const bool periodic) {
510510
StoragesManager->GetOperatorVerified(NOlap::IStoragesManager::DefaultStorageId);
511511
StoragesManager->GetSharedBlobsManager()->GetStorageManagerVerified(NOlap::IStoragesManager::DefaultStorageId);
512512
CSCounters.OnStartBackground();
513-
SendPeriodicStats();
514513

515514
if (!TablesManager.HasPrimaryIndex()) {
516515
AFL_NOTICE(NKikimrServices::TX_COLUMNSHARD)("problem", "Background activities cannot be started: no index at tablet");
@@ -537,7 +536,7 @@ class TChangesTask: public NConveyor::ITask {
537536
TString ClassId;
538537
NOlap::TSnapshot LastCompletedTx;
539538
protected:
540-
virtual bool DoExecute() override {
539+
virtual TConclusionStatus DoExecute(const std::shared_ptr<NConveyor::ITask>& /*taskPtr*/) override {
541540
NActors::TLogContextGuard g(NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", TabletId)("parent_id", ParentActorId));
542541
{
543542
NOlap::TConstructionContext context(*TxEvent->IndexInfo, Counters, LastCompletedTx);
@@ -547,7 +546,7 @@ class TChangesTask: public NConveyor::ITask {
547546
}
548547
}
549548
TActorContext::AsActorContext().Send(ParentActorId, std::move(TxEvent));
550-
return true;
549+
return TConclusionStatus::Success();
551550
}
552551
public:
553552
virtual TString GetTaskClassIdentifier() const override {

ydb/core/tx/columnshard/columnshard_private_events.h

+20
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,10 @@
1111
#include <ydb/core/tx/data_events/write_data.h>
1212
#include <ydb/core/formats/arrow/special_keys.h>
1313

14+
namespace NKikimr::NOlap::NReader {
15+
class IApplyAction;
16+
}
17+
1418
namespace NKikimr::NColumnShard {
1519

1620
struct TEvPrivate {
@@ -41,11 +45,27 @@ struct TEvPrivate {
4145
EvExportCursorSaved,
4246
EvExportSaveCursor,
4347

48+
EvTaskProcessedResult,
49+
4450
EvEnd
4551
};
4652

4753
static_assert(EvEnd < EventSpaceEnd(TEvents::ES_PRIVATE), "expect EvEnd < EventSpaceEnd(TEvents::ES_PRIVATE)");
4854

55+
// Local (in-process) event, tagged with EvTaskProcessedResult, that delivers the outcome of a
// processed task to an actor. The payload is a TConclusion wrapping a shared pointer to an
// NOlap::NReader::IApplyAction.
class TEvTaskProcessedResult: public NActors::TEventLocal<TEvTaskProcessedResult, EvTaskProcessedResult> {
56+
private:
57+
// Outcome of the task; the receiver in this diff checks IsFail() and reads either
// GetErrorMessage() or GetResult() from it.
TConclusion<std::shared_ptr<NOlap::NReader::IApplyAction>> Result;
58+
59+
public:
60+
// Moves the stored result out of the event. After this call the Result member is in a
// moved-from state, so this is meant to be called once per event.
TConclusion<std::shared_ptr<NOlap::NReader::IApplyAction>> ExtractResult() {
61+
return std::move(Result);
62+
}
63+
64+
// Stores a copy of `result`. The constructor is not explicit; senders in this diff pass a
// TConclusionStatus or a shared_ptr directly and rely on conversion to TConclusion.
TEvTaskProcessedResult(const TConclusion<std::shared_ptr<NOlap::NReader::IApplyAction>>& result)
65+
: Result(result) {
66+
}
67+
};
68+
4969
struct TEvTieringModified: public TEventLocal<TEvTieringModified, EvTieringModified> {
5070
};
5171

ydb/core/tx/columnshard/engines/reader/abstract/abstract.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ class TScanIteratorBase {
1313
public:
1414
virtual ~TScanIteratorBase() = default;
1515

16-
virtual void Apply(IDataTasksProcessor::ITask::TPtr /*processor*/) {
16+
virtual void Apply(const std::shared_ptr<IApplyAction>& /*task*/) {
1717

1818
}
1919

ydb/core/tx/columnshard/engines/reader/actor/actor.cpp

+7-6
Original file line numberDiff line numberDiff line change
@@ -98,17 +98,18 @@ void TColumnShardScan::Bootstrap(const TActorContext& ctx) {
9898
}
9999
}
100100

101-
void TColumnShardScan::HandleScan(NConveyor::TEvExecution::TEvTaskProcessedResult::TPtr& ev) {
101+
void TColumnShardScan::HandleScan(NColumnShard::TEvPrivate::TEvTaskProcessedResult::TPtr& ev) {
102102
--InFlightReads;
103103
auto g = Stats->MakeGuard("task_result");
104-
if (ev->Get()->GetErrorMessage()) {
105-
ACFL_ERROR("event", "TEvTaskProcessedResult")("error", ev->Get()->GetErrorMessage());
106-
SendScanError("task_error:" + ev->Get()->GetErrorMessage());
104+
auto result = ev->Get()->ExtractResult();
105+
if (result.IsFail()) {
106+
ACFL_ERROR("event", "TEvTaskProcessedResult")("error", result.GetErrorMessage());
107+
SendScanError("task_error:" + result.GetErrorMessage());
107108
Finish(NColumnShard::TScanCounters::EStatusFinish::ConveyorInternalError);
108109
} else {
109110
ACFL_DEBUG("event", "TEvTaskProcessedResult");
110-
auto t = static_pointer_cast<IDataTasksProcessor::ITask>(ev->Get()->GetResult());
111-
Y_DEBUG_ABORT_UNLESS(dynamic_pointer_cast<IDataTasksProcessor::ITask>(ev->Get()->GetResult()));
111+
auto t = static_pointer_cast<IApplyAction>(result.GetResult());
112+
Y_DEBUG_ABORT_UNLESS(dynamic_pointer_cast<IDataTasksProcessor::ITask>(result.GetResult()));
112113
if (!ScanIterator->Finished()) {
113114
ScanIterator->Apply(t);
114115
}

ydb/core/tx/columnshard/engines/reader/actor/actor.h

+16-17
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,17 @@
11
#pragma once
2+
#include <ydb/core/formats/arrow/converter.h>
3+
#include <ydb/core/kqp/compute_actor/kqp_compute_events.h>
24
#include <ydb/core/tx/columnshard/blobs_action/abstract/storages_manager.h>
5+
#include <ydb/core/tx/columnshard/columnshard_private_events.h>
6+
#include <ydb/core/tx/columnshard/counters/scan.h>
37
#include <ydb/core/tx/columnshard/engines/reader/abstract/abstract.h>
4-
#include <ydb/core/tx/columnshard/engines/reader/abstract/read_metadata.h>
58
#include <ydb/core/tx/columnshard/engines/reader/abstract/read_context.h>
6-
#include <ydb/core/tx/columnshard/counters/scan.h>
9+
#include <ydb/core/tx/columnshard/engines/reader/abstract/read_metadata.h>
710
#include <ydb/core/tx/conveyor/usage/events.h>
811
#include <ydb/core/tx/tracing/usage/tracing.h>
9-
#include <ydb/core/kqp/compute_actor/kqp_compute_events.h>
10-
11-
#include <ydb/core/formats/arrow/converter.h>
1212

13-
#include <ydb/library/actors/core/log.h>
1413
#include <ydb/library/actors/core/actor_bootstrapped.h>
14+
#include <ydb/library/actors/core/log.h>
1515
#include <ydb/library/chunks_limiter/chunks_limiter.h>
1616

1717
namespace NKikimr::NOlap::NReader {
@@ -22,6 +22,7 @@ class TColumnShardScan: public TActorBootstrapped<TColumnShardScan>, NArrow::IRo
2222
TActorId ReadCoordinatorActorId;
2323
const std::shared_ptr<IStoragesManager> StoragesManager;
2424
std::optional<TMonotonic> StartInstant;
25+
2526
public:
2627
static constexpr auto ActorActivityType() {
2728
return NKikimrServices::TActivity::KQP_OLAP_SCAN;
@@ -31,31 +32,29 @@ class TColumnShardScan: public TActorBootstrapped<TColumnShardScan>, NArrow::IRo
3132
virtual void PassAway() override;
3233

3334
TColumnShardScan(const TActorId& columnShardActorId, const TActorId& scanComputeActorId,
34-
const std::shared_ptr<IStoragesManager>& storagesManager, const TComputeShardingPolicy& computeShardingPolicy,
35-
ui32 scanId, ui64 txId, ui32 scanGen, ui64 requestCookie,
36-
ui64 tabletId, TDuration timeout, const TReadMetadataBase::TConstPtr& readMetadataRange,
35+
const std::shared_ptr<IStoragesManager>& storagesManager, const TComputeShardingPolicy& computeShardingPolicy, ui32 scanId, ui64 txId,
36+
ui32 scanGen, ui64 requestCookie, ui64 tabletId, TDuration timeout, const TReadMetadataBase::TConstPtr& readMetadataRange,
3737
NKikimrDataEvents::EDataFormat dataFormat, const NColumnShard::TScanCounters& scanCountersPool);
3838

3939
void Bootstrap(const TActorContext& ctx);
4040

4141
private:
4242
STATEFN(StateScan) {
4343
auto g = Stats->MakeGuard("processing");
44-
TLogContextGuard gLogging(NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD_SCAN)
45-
("SelfId", SelfId())("TabletId", TabletId)("ScanId", ScanId)("TxId", TxId)("ScanGen", ScanGen)
46-
);
44+
TLogContextGuard gLogging(NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD_SCAN) ("SelfId", SelfId())(
45+
"TabletId", TabletId)("ScanId", ScanId)("TxId", TxId)("ScanGen", ScanGen));
4746
switch (ev->GetTypeRewrite()) {
4847
hFunc(NKqp::TEvKqpCompute::TEvScanDataAck, HandleScan);
4948
hFunc(NKqp::TEvKqp::TEvAbortExecution, HandleScan);
5049
hFunc(TEvents::TEvUndelivered, HandleScan);
5150
hFunc(TEvents::TEvWakeup, HandleScan);
52-
hFunc(NConveyor::TEvExecution::TEvTaskProcessedResult, HandleScan);
51+
hFunc(NColumnShard::TEvPrivate::TEvTaskProcessedResult, HandleScan);
5352
default:
5453
AFL_VERIFY(false)("unexpected_event", ev->GetTypeName());
5554
}
5655
}
5756

58-
void HandleScan(NConveyor::TEvExecution::TEvTaskProcessedResult::TPtr& ev);
57+
void HandleScan(NColumnShard::TEvPrivate::TEvTaskProcessedResult::TPtr& ev);
5958

6059
void HandleScan(NKqp::TEvKqpCompute::TEvScanDataAck::TPtr& ev);
6160

@@ -80,10 +79,10 @@ class TColumnShardScan: public TActorBootstrapped<TColumnShardScan>, NArrow::IRo
8079
class TScanStatsOwner: public NKqp::TEvKqpCompute::IShardScanStats {
8180
private:
8281
YDB_READONLY_DEF(TReadStats, Stats);
82+
8383
public:
8484
TScanStatsOwner(const TReadStats& stats)
8585
: Stats(stats) {
86-
8786
}
8887

8988
virtual THashMap<TString, ui64> GetMetrics() const override {
@@ -142,11 +141,11 @@ class TColumnShardScan: public TActorBootstrapped<TColumnShardScan>, NArrow::IRo
142141
TDuration ReadingDurationMax;
143142
NMonitoring::THistogramPtr BlobDurationsCounter;
144143
NMonitoring::THistogramPtr ByteDurationsCounter;
144+
145145
public:
146146
TBlobStats(const NMonitoring::THistogramPtr blobDurationsCounter, const NMonitoring::THistogramPtr byteDurationsCounter)
147147
: BlobDurationsCounter(blobDurationsCounter)
148148
, ByteDurationsCounter(byteDurationsCounter) {
149-
150149
}
151150
void Received(const TBlobRange& br, const TDuration d) {
152151
ReadingDurationSum += d;
@@ -181,4 +180,4 @@ class TColumnShardScan: public TActorBootstrapped<TColumnShardScan>, NArrow::IRo
181180
TDuration LastReportedElapsedTime;
182181
};
183182

184-
}
183+
} // namespace NKikimr::NOlap::NReader
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,23 @@
11
#include "conveyor_task.h"
2+
#include <ydb/library/actors/core/actor.h>
3+
#include <ydb/core/tx/columnshard/columnshard_private_events.h>
24

35
namespace NKikimr::NOlap::NReader {
46

5-
bool IDataTasksProcessor::ITask::Apply(IDataReader& indexedDataRead) const {
6-
return DoApply(indexedDataRead);
7+
// Runs the task body (DoExecuteImpl) and reports the outcome to OwnerId as a
// TEvPrivate::TEvTaskProcessedResult: the failure status on error, otherwise the task itself.
// Returns the status produced by DoExecuteImpl in both cases.
NKikimr::TConclusionStatus IDataTasksProcessor::ITask::DoExecute(const std::shared_ptr<NConveyor::ITask>& taskPtr) {
8+
auto result = DoExecuteImpl();
9+
if (result.IsFail()) {
10+
// Failure path: forward the failed status so the owner sees the error message.
NActors::TActivationContext::AsActorContext().Send(OwnerId, new NColumnShard::TEvPrivate::TEvTaskProcessedResult(result));
11+
} else {
12+
// Success path: hand the task object to the owner as the result payload.
// NOTE(review): the static_pointer_cast is unchecked; it presumably relies on taskPtr
// pointing at this very IDataTasksProcessor::ITask - confirm against the conveyor caller.
NActors::TActivationContext::AsActorContext().Send(
13+
OwnerId, new NColumnShard::TEvPrivate::TEvTaskProcessedResult(static_pointer_cast<IDataTasksProcessor::ITask>(taskPtr)));
14+
}
15+
return result;
16+
}
17+
18+
// Notifies OwnerId that the task could not be executed by sending a
// TEvPrivate::TEvTaskProcessedResult built from TConclusionStatus::Fail(reason).
void IDataTasksProcessor::ITask::DoOnCannotExecute(const TString& reason) {
19+
NActors::TActivationContext::AsActorContext().Send(
20+
OwnerId, new NColumnShard::TEvPrivate::TEvTaskProcessedResult(TConclusionStatus::Fail(reason)));
721
}
822

923
}
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,46 @@
11
#pragma once
2+
23
#include <ydb/core/tx/conveyor/usage/abstract.h>
4+
35
#include <ydb/library/accessor/accessor.h>
6+
#include <ydb/library/conclusion/result.h>
47

58
namespace NKikimr::NOlap::NReader {
69

710
class IDataReader;
811

12+
// Interface for an action that can be applied to an IDataReader. Implementations override
// DoApply; callers use the public non-virtual Apply wrapper.
// NOTE(review): this polymorphic interface declares no virtual destructor, yet it is held via
// std::shared_ptr<IApplyAction> elsewhere in this change. Destruction is only safe while every
// such pointer is created from a shared_ptr of the concrete type - consider adding
// `virtual ~IApplyAction() = default;`.
class IApplyAction {
13+
protected:
14+
// Implementation hook; the bool result is passed through unchanged by Apply.
virtual bool DoApply(IDataReader& indexedDataRead) const = 0;
15+
16+
public:
17+
// Forwards to DoApply and returns its result.
bool Apply(IDataReader& indexedDataRead) const {
18+
return DoApply(indexedDataRead);
19+
}
20+
};
21+
922
class IDataTasksProcessor {
1023
public:
11-
class ITask: public NConveyor::ITask {
24+
class ITask: public NConveyor::ITask, public IApplyAction {
1225
private:
1326
using TBase = NConveyor::ITask;
14-
protected:
15-
virtual bool DoApply(IDataReader& indexedDataRead) const = 0;
16-
public:
17-
ITask(const std::optional<NActors::TActorId> ownerId = {})
18-
: TBase(ownerId) {
27+
const NActors::TActorId OwnerId;
28+
virtual TConclusionStatus DoExecuteImpl() = 0;
1929

20-
}
30+
protected:
31+
virtual TConclusionStatus DoExecute(const std::shared_ptr<NConveyor::ITask>& taskPtr) override final;
32+
virtual void DoOnCannotExecute(const TString& reason) override;
2133

34+
public:
2235
using TPtr = std::shared_ptr<ITask>;
2336
virtual ~ITask() = default;
24-
bool Apply(IDataReader& indexedDataRead) const;
37+
38+
ITask(const NActors::TActorId& ownerId)
39+
: OwnerId(ownerId)
40+
{
41+
42+
}
2543
};
2644
};
2745

28-
}
46+
} // namespace NKikimr::NOlap::NReader

ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/constructor.cpp

+2-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
#include "constructor.h"
22
#include <ydb/core/tx/conveyor/usage/service.h>
3+
#include <ydb/core/tx/columnshard/columnshard_private_events.h>
34

45
namespace NKikimr::NOlap::NReader::NPlain {
56

@@ -14,7 +15,7 @@ bool TBlobsFetcherTask::DoOnError(const TString& storageId, const TBlobRange& ra
1415
AFL_ERROR(NKikimrServices::TX_COLUMNSHARD_SCAN)("error_on_blob_reading", range.ToString())("scan_actor_id", Context->GetCommonContext()->GetScanActorId())
1516
("status", status.GetErrorMessage())("status_code", status.GetStatus())("storage_id", storageId);
1617
NActors::TActorContext::AsActorContext().Send(Context->GetCommonContext()->GetScanActorId(),
17-
std::make_unique<NConveyor::TEvExecution::TEvTaskProcessedResult>(TConclusionStatus::Fail("cannot read blob range " + range.ToString())));
18+
std::make_unique<NColumnShard::TEvPrivate::TEvTaskProcessedResult>(TConclusionStatus::Fail("cannot read blob range " + range.ToString())));
1819
return false;
1920
}
2021

ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/fetching.cpp

+4-5
Original file line numberDiff line numberDiff line change
@@ -15,20 +15,19 @@ bool TStepAction::DoApply(IDataReader& /*owner*/) const {
1515
return true;
1616
}
1717

18-
bool TStepAction::DoExecute() {
18+
TConclusionStatus TStepAction::DoExecuteImpl() {
1919
if (Source->IsAborted()) {
20-
return true;
20+
return TConclusionStatus::Success();
2121
}
2222
auto executeResult = Cursor.Execute(Source);
2323
if (!executeResult) {
24-
SetErrorMessage(executeResult.GetErrorMessage());
25-
return false;
24+
return executeResult;
2625
}
2726
if (*executeResult) {
2827
Source->Finalize();
2928
FinishedFlag = true;
3029
}
31-
return true;
30+
return TConclusionStatus::Success();
3231
}
3332

3433
TConclusion<bool> TColumnBlobsFetchingStep::DoExecuteInplace(const std::shared_ptr<IDataSource>& source, const TFetchingScriptCursor& step) const {

ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/fetching.h

+2-1
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,8 @@ class TStepAction: public IDataTasksProcessor::ITask {
131131
bool FinishedFlag = false;
132132
protected:
133133
virtual bool DoApply(IDataReader& owner) const override;
134-
virtual bool DoExecute() override;
134+
virtual TConclusionStatus DoExecuteImpl() override;
135+
135136
public:
136137
virtual TString GetTaskClassIdentifier() const override {
137138
return "STEP_ACTION";

ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/iterator.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ TColumnShardScanIterator::~TColumnShardScanIterator() {
5353
ReadMetadata->ReadStats->PrintToLog();
5454
}
5555

56-
void TColumnShardScanIterator::Apply(IDataTasksProcessor::ITask::TPtr task) {
56+
void TColumnShardScanIterator::Apply(const std::shared_ptr<IApplyAction>& task) {
5757
if (!IndexedData->IsFinished()) {
5858
Y_ABORT_UNLESS(task->Apply(*IndexedData));
5959
}

ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/iterator.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ class TColumnShardScanIterator: public TScanIteratorBase {
8484
;
8585
}
8686

87-
virtual void Apply(IDataTasksProcessor::ITask::TPtr task) override;
87+
virtual void Apply(const std::shared_ptr<IApplyAction>& task) override;
8888

8989
bool Finished() const override {
9090
return IndexedData->IsFinished() && ReadyResults.empty();

0 commit comments

Comments
 (0)