Skip to content

Commit 60db425

Browse files
authored
Merge 9393b6f into 1064ddc
2 parents 1064ddc + 9393b6f commit 60db425

8 files changed

+313
-196
lines changed

ydb/core/engine/mkql_keys.cpp

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -51,14 +51,6 @@ NScheme::TTypeInfo UnpackTypeInfo(NKikimr::NMiniKQL::TType *type, bool &isOption
5151
}
5252
}
5353

54-
55-
template<typename T>
56-
TCell MakeCell(const NUdf::TUnboxedValuePod& value) {
57-
static_assert(TCell::CanInline(sizeof(T)), "Can't inline data in cell.");
58-
const auto v = value.Get<T>();
59-
return TCell(reinterpret_cast<const char*>(&v), sizeof(v));
60-
}
61-
6254
THolder<TKeyDesc> ExtractKeyTuple(const TTableId& tableId, TTupleLiteral* tuple,
6355
const TVector<TKeyDesc::TColumnOp>& columns,
6456
TKeyDesc::ERowOperation rowOperation, bool requireStaticKey, const TTypeEnvironment& env) {

ydb/core/engine/mkql_keys.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,13 @@ THolder<TKeyDesc> ExtractTableKey(TCallable& callable, const TTableStrings& stri
4545
TVector<THolder<TKeyDesc>> ExtractTableKeys(TExploringNodeVisitor& explorer, const TTypeEnvironment& env);
4646
TTableId ExtractTableId(const TRuntimeNode& node);
4747

48+
template<typename T>
49+
TCell MakeCell(const NUdf::TUnboxedValuePod& value) {
50+
static_assert(TCell::CanInline(sizeof(T)), "Can't inline data in cell.");
51+
const auto v = value.Get<T>();
52+
return TCell(reinterpret_cast<const char*>(&v), sizeof(v));
53+
}
54+
4855
TCell MakeCell(NScheme::TTypeInfo type, const NUdf::TUnboxedValuePod& value,
4956
const TTypeEnvironment& env, bool copy = true,
5057
i32 typmod = -1, TMaybe<TString>* error = {});

ydb/core/kqp/host/kqp_statement_rewrite.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -187,7 +187,8 @@ namespace {
187187
const TString createTableName = !isAtomicOperation
188188
? tableName
189189
: (TStringBuilder()
190-
<< "/Root/.tmp/sessions/"
190+
<< sessionCtx->GetDatabase()
191+
<< "/.tmp/sessions/"
191192
<< sessionCtx->GetSessionId()
192193
<< tmpTableName);
193194

ydb/core/kqp/runtime/kqp_write_actor.cpp

Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,8 @@
2424

2525
namespace {
2626
constexpr i64 kInFlightMemoryLimitPerActor = 64_MB;
27-
constexpr i64 kMemoryLimitPerMessage = 48_MB;
28-
constexpr i64 kMaxBatchesPerMessage = 1;
27+
constexpr i64 kMemoryLimitPerMessage = 64_MB;
28+
constexpr i64 kMaxBatchesPerMessage = 8;
2929

3030
struct TWriteActorBackoffSettings {
3131
TDuration StartRetryDelay = TDuration::MilliSeconds(250);
@@ -81,12 +81,12 @@ namespace {
8181
namespace NKikimr {
8282
namespace NKqp {
8383

84-
class TKqpWriteActor : public TActorBootstrapped<TKqpWriteActor>, public NYql::NDq::IDqComputeActorAsyncOutput {
85-
using TBase = TActorBootstrapped<TKqpWriteActor>;
84+
class TKqpDirectWriteActor : public TActorBootstrapped<TKqpDirectWriteActor>, public NYql::NDq::IDqComputeActorAsyncOutput {
85+
using TBase = TActorBootstrapped<TKqpDirectWriteActor>;
8686

8787
class TResumeNotificationManager {
8888
public:
89-
TResumeNotificationManager(TKqpWriteActor& writer)
89+
TResumeNotificationManager(TKqpDirectWriteActor& writer)
9090
: Writer(writer) {
9191
CheckMemory();
9292
}
@@ -102,7 +102,7 @@ class TKqpWriteActor : public TActorBootstrapped<TKqpWriteActor>, public NYql::N
102102
}
103103

104104
private:
105-
TKqpWriteActor& Writer;
105+
TKqpDirectWriteActor& Writer;
106106
i64 LastFreeMemory = std::numeric_limits<i64>::max();
107107
};
108108

@@ -127,7 +127,7 @@ class TKqpWriteActor : public TActorBootstrapped<TKqpWriteActor>, public NYql::N
127127
};
128128

129129
public:
130-
TKqpWriteActor(
130+
TKqpDirectWriteActor(
131131
NKikimrKqp::TKqpTableSinkSettings&& settings,
132132
NYql::NDq::TDqAsyncIoFactory::TSinkArguments&& args,
133133
TIntrusivePtr<TKqpCounters> counters)
@@ -157,13 +157,13 @@ class TKqpWriteActor : public TActorBootstrapped<TKqpWriteActor>, public NYql::N
157157
void Bootstrap() {
158158
LogPrefix = TStringBuilder() << "SelfId: " << this->SelfId() << ", " << LogPrefix;
159159
ResolveTable();
160-
Become(&TKqpWriteActor::StateFunc);
160+
Become(&TKqpDirectWriteActor::StateFunc);
161161
}
162162

163163
static constexpr char ActorName[] = "KQP_WRITE_ACTOR";
164164

165165
private:
166-
virtual ~TKqpWriteActor() {
166+
virtual ~TKqpDirectWriteActor() {
167167
}
168168

169169
void CommitState(const NYql::NDqProto::TCheckpoint&) final {};
@@ -491,7 +491,7 @@ class TKqpWriteActor : public TActorBootstrapped<TKqpWriteActor>, public NYql::N
491491
<< ", Cookie=" << ev->Cookie
492492
<< ", LocksCount=" << ev->Get()->Record.GetTxLocks().size());
493493

494-
PopShardBatch(ev->Get()->Record.GetOrigin(), ev->Cookie);
494+
OnMessageAcknowledged(ev->Get()->Record.GetOrigin(), ev->Cookie);
495495

496496
for (const auto& lock : ev->Get()->Record.GetTxLocks()) {
497497
LocksInfo[ev->Get()->Record.GetOrigin()].AddAndCheckLock(lock);
@@ -500,7 +500,7 @@ class TKqpWriteActor : public TActorBootstrapped<TKqpWriteActor>, public NYql::N
500500
ProcessBatches();
501501
}
502502

503-
void PopShardBatch(ui64 shardId, ui64 cookie) {
503+
void OnMessageAcknowledged(ui64 shardId, ui64 cookie) {
504504
TResumeNotificationManager resumeNotificator(*this);
505505
const auto removedDataSize = ShardedWriteController->OnMessageAcknowledged(shardId, cookie);
506506
if (removedDataSize) {
@@ -669,7 +669,7 @@ class TKqpWriteActor : public TActorBootstrapped<TKqpWriteActor>, public NYql::N
669669

670670
void PassAway() override {
671671
Send(PipeCacheId, new TEvPipeCache::TEvUnlink(0));
672-
TActorBootstrapped<TKqpWriteActor>::PassAway();
672+
TActorBootstrapped<TKqpDirectWriteActor>::PassAway();
673673
}
674674

675675
void Prepare() {
@@ -721,7 +721,6 @@ class TKqpWriteActor : public TActorBootstrapped<TKqpWriteActor>, public NYql::N
721721
Callbacks->ResumeExecution();
722722
}
723723

724-
NActors::TActorId TxProxyId = MakeTxProxyID();
725724
NActors::TActorId PipeCacheId = NKikimr::MakePipePerNodeCacheID(false);
726725

727726
TString LogPrefix;
@@ -754,7 +753,7 @@ void RegisterKqpWriteActor(NYql::NDq::TDqAsyncIoFactory& factory, TIntrusivePtr<
754753
factory.RegisterSink<NKikimrKqp::TKqpTableSinkSettings>(
755754
TString(NYql::KqpTableSinkName),
756755
[counters] (NKikimrKqp::TKqpTableSinkSettings&& settings, NYql::NDq::TDqAsyncIoFactory::TSinkArguments&& args) {
757-
auto* actor = new TKqpWriteActor(std::move(settings), std::move(args), counters);
756+
auto* actor = new TKqpDirectWriteActor(std::move(settings), std::move(args), counters);
758757
return std::make_pair<NYql::NDq::IDqComputeActorAsyncOutput*, NActors::IActor*>(actor, actor);
759758
});
760759
}

0 commit comments

Comments
 (0)