Skip to content

Commit 02b4c2d

Browse files
nikvas0uzhastik
andauthored
24-3: Sinks improvements (#6856)
Co-authored-by: uzhastik <[email protected]>
1 parent 0466744 commit 02b4c2d

16 files changed

+389
-247
lines changed

ydb/core/engine/mkql_keys.cpp

-8
Original file line numberDiff line numberDiff line change
@@ -51,14 +51,6 @@ NScheme::TTypeInfo UnpackTypeInfo(NKikimr::NMiniKQL::TType *type, bool &isOption
5151
}
5252
}
5353

54-
55-
template<typename T>
56-
TCell MakeCell(const NUdf::TUnboxedValuePod& value) {
57-
static_assert(TCell::CanInline(sizeof(T)), "Can't inline data in cell.");
58-
const auto v = value.Get<T>();
59-
return TCell(reinterpret_cast<const char*>(&v), sizeof(v));
60-
}
61-
6254
THolder<TKeyDesc> ExtractKeyTuple(const TTableId& tableId, TTupleLiteral* tuple,
6355
const TVector<TKeyDesc::TColumnOp>& columns,
6456
TKeyDesc::ERowOperation rowOperation, bool requireStaticKey, const TTypeEnvironment& env) {

ydb/core/engine/mkql_keys.h

+7
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,13 @@ THolder<TKeyDesc> ExtractTableKey(TCallable& callable, const TTableStrings& stri
4545
TVector<THolder<TKeyDesc>> ExtractTableKeys(TExploringNodeVisitor& explorer, const TTypeEnvironment& env);
4646
TTableId ExtractTableId(const TRuntimeNode& node);
4747

48+
template<typename T>
49+
TCell MakeCell(const NUdf::TUnboxedValuePod& value) {
50+
static_assert(TCell::CanInline(sizeof(T)), "Can't inline data in cell.");
51+
const auto v = value.Get<T>();
52+
return TCell(reinterpret_cast<const char*>(&v), sizeof(v));
53+
}
54+
4855
TCell MakeCell(NScheme::TTypeInfo type, const NUdf::TUnboxedValuePod& value,
4956
const TTypeEnvironment& env, bool copy = true,
5057
i32 typmod = -1, TMaybe<TString>* error = {});

ydb/core/kqp/opt/physical/effects/kqp_opt_phy_update.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ TExprBase KqpBuildUpdateStages(TExprBase node, TExprContext& ctx, const TKqpOpti
9191

9292
const auto& table = kqpCtx.Tables->ExistingTable(kqpCtx.Cluster, update.Table().Path());
9393

94-
const bool isSink = NeedSinks(table, kqpCtx) && table.Metadata->Kind == EKikimrTableKind::Olap;
94+
const bool isSink = NeedSinks(table, kqpCtx);
9595
const bool needPrecompute = !isSink;
9696

9797
if (needPrecompute) {

ydb/core/kqp/runtime/kqp_write_actor.cpp

+17-15
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,8 @@
2424

2525
namespace {
2626
constexpr i64 kInFlightMemoryLimitPerActor = 64_MB;
27-
constexpr i64 kMemoryLimitPerMessage = 48_MB;
28-
constexpr i64 kMaxBatchesPerMessage = 1;
27+
constexpr i64 kMemoryLimitPerMessage = 64_MB;
28+
constexpr i64 kMaxBatchesPerMessage = 8;
2929

3030
struct TWriteActorBackoffSettings {
3131
TDuration StartRetryDelay = TDuration::MilliSeconds(250);
@@ -81,12 +81,12 @@ namespace {
8181
namespace NKikimr {
8282
namespace NKqp {
8383

84-
class TKqpWriteActor : public TActorBootstrapped<TKqpWriteActor>, public NYql::NDq::IDqComputeActorAsyncOutput {
85-
using TBase = TActorBootstrapped<TKqpWriteActor>;
84+
class TKqpDirectWriteActor : public TActorBootstrapped<TKqpDirectWriteActor>, public NYql::NDq::IDqComputeActorAsyncOutput {
85+
using TBase = TActorBootstrapped<TKqpDirectWriteActor>;
8686

8787
class TResumeNotificationManager {
8888
public:
89-
TResumeNotificationManager(TKqpWriteActor& writer)
89+
TResumeNotificationManager(TKqpDirectWriteActor& writer)
9090
: Writer(writer) {
9191
CheckMemory();
9292
}
@@ -102,7 +102,7 @@ class TKqpWriteActor : public TActorBootstrapped<TKqpWriteActor>, public NYql::N
102102
}
103103

104104
private:
105-
TKqpWriteActor& Writer;
105+
TKqpDirectWriteActor& Writer;
106106
i64 LastFreeMemory = std::numeric_limits<i64>::max();
107107
};
108108

@@ -127,7 +127,7 @@ class TKqpWriteActor : public TActorBootstrapped<TKqpWriteActor>, public NYql::N
127127
};
128128

129129
public:
130-
TKqpWriteActor(
130+
TKqpDirectWriteActor(
131131
NKikimrKqp::TKqpTableSinkSettings&& settings,
132132
NYql::NDq::TDqAsyncIoFactory::TSinkArguments&& args,
133133
TIntrusivePtr<TKqpCounters> counters)
@@ -137,6 +137,7 @@ class TKqpWriteActor : public TActorBootstrapped<TKqpWriteActor>, public NYql::N
137137
, Callbacks(args.Callback)
138138
, Counters(counters)
139139
, TypeEnv(args.TypeEnv)
140+
, Alloc(args.Alloc)
140141
, TxId(args.TxId)
141142
, TableId(
142143
Settings.GetTable().GetOwnerId(),
@@ -157,13 +158,13 @@ class TKqpWriteActor : public TActorBootstrapped<TKqpWriteActor>, public NYql::N
157158
void Bootstrap() {
158159
LogPrefix = TStringBuilder() << "SelfId: " << this->SelfId() << ", " << LogPrefix;
159160
ResolveTable();
160-
Become(&TKqpWriteActor::StateFunc);
161+
Become(&TKqpDirectWriteActor::StateFunc);
161162
}
162163

163164
static constexpr char ActorName[] = "KQP_WRITE_ACTOR";
164165

165166
private:
166-
virtual ~TKqpWriteActor() {
167+
virtual ~TKqpDirectWriteActor() {
167168
}
168169

169170
void CommitState(const NYql::NDqProto::TCheckpoint&) final {};
@@ -491,7 +492,7 @@ class TKqpWriteActor : public TActorBootstrapped<TKqpWriteActor>, public NYql::N
491492
<< ", Cookie=" << ev->Cookie
492493
<< ", LocksCount=" << ev->Get()->Record.GetTxLocks().size());
493494

494-
PopShardBatch(ev->Get()->Record.GetOrigin(), ev->Cookie);
495+
OnMessageAcknowledged(ev->Get()->Record.GetOrigin(), ev->Cookie);
495496

496497
for (const auto& lock : ev->Get()->Record.GetTxLocks()) {
497498
LocksInfo[ev->Get()->Record.GetOrigin()].AddAndCheckLock(lock);
@@ -500,7 +501,7 @@ class TKqpWriteActor : public TActorBootstrapped<TKqpWriteActor>, public NYql::N
500501
ProcessBatches();
501502
}
502503

503-
void PopShardBatch(ui64 shardId, ui64 cookie) {
504+
void OnMessageAcknowledged(ui64 shardId, ui64 cookie) {
504505
TResumeNotificationManager resumeNotificator(*this);
505506
const auto removedDataSize = ShardedWriteController->OnMessageAcknowledged(shardId, cookie);
506507
if (removedDataSize) {
@@ -669,7 +670,7 @@ class TKqpWriteActor : public TActorBootstrapped<TKqpWriteActor>, public NYql::N
669670

670671
void PassAway() override {
671672
Send(PipeCacheId, new TEvPipeCache::TEvUnlink(0));
672-
TActorBootstrapped<TKqpWriteActor>::PassAway();
673+
TActorBootstrapped<TKqpDirectWriteActor>::PassAway();
673674
}
674675

675676
void Prepare() {
@@ -693,7 +694,8 @@ class TKqpWriteActor : public TActorBootstrapped<TKqpWriteActor>, public NYql::N
693694
: kMaxBatchesPerMessage),
694695
},
695696
std::move(columnsMetadata),
696-
TypeEnv);
697+
TypeEnv,
698+
Alloc);
697699
} catch (...) {
698700
RuntimeError(
699701
CurrentExceptionMessage(),
@@ -721,7 +723,6 @@ class TKqpWriteActor : public TActorBootstrapped<TKqpWriteActor>, public NYql::N
721723
Callbacks->ResumeExecution();
722724
}
723725

724-
NActors::TActorId TxProxyId = MakeTxProxyID();
725726
NActors::TActorId PipeCacheId = NKikimr::MakePipePerNodeCacheID(false);
726727

727728
TString LogPrefix;
@@ -731,6 +732,7 @@ class TKqpWriteActor : public TActorBootstrapped<TKqpWriteActor>, public NYql::N
731732
NYql::NDq::IDqComputeActorAsyncOutput::ICallbacks * Callbacks = nullptr;
732733
TIntrusivePtr<TKqpCounters> Counters;
733734
const NMiniKQL::TTypeEnvironment& TypeEnv;
735+
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> Alloc;
734736

735737
const NYql::NDq::TTxId TxId;
736738
const TTableId TableId;
@@ -754,7 +756,7 @@ void RegisterKqpWriteActor(NYql::NDq::TDqAsyncIoFactory& factory, TIntrusivePtr<
754756
factory.RegisterSink<NKikimrKqp::TKqpTableSinkSettings>(
755757
TString(NYql::KqpTableSinkName),
756758
[counters] (NKikimrKqp::TKqpTableSinkSettings&& settings, NYql::NDq::TDqAsyncIoFactory::TSinkArguments&& args) {
757-
auto* actor = new TKqpWriteActor(std::move(settings), std::move(args), counters);
759+
auto* actor = new TKqpDirectWriteActor(std::move(settings), std::move(args), counters);
758760
return std::make_pair<NYql::NDq::IDqComputeActorAsyncOutput*, NActors::IActor*>(actor, actor);
759761
});
760762
}

0 commit comments

Comments
 (0)