Skip to content

Improvements in Write Actor #6251

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Jul 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 0 additions & 8 deletions ydb/core/engine/mkql_keys.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,6 @@ NScheme::TTypeInfo UnpackTypeInfo(NKikimr::NMiniKQL::TType *type, bool &isOption
}
}


template<typename T>
TCell MakeCell(const NUdf::TUnboxedValuePod& value) {
static_assert(TCell::CanInline(sizeof(T)), "Can't inline data in cell.");
const auto v = value.Get<T>();
return TCell(reinterpret_cast<const char*>(&v), sizeof(v));
}

THolder<TKeyDesc> ExtractKeyTuple(const TTableId& tableId, TTupleLiteral* tuple,
const TVector<TKeyDesc::TColumnOp>& columns,
TKeyDesc::ERowOperation rowOperation, bool requireStaticKey, const TTypeEnvironment& env) {
Expand Down
7 changes: 7 additions & 0 deletions ydb/core/engine/mkql_keys.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,13 @@ THolder<TKeyDesc> ExtractTableKey(TCallable& callable, const TTableStrings& stri
TVector<THolder<TKeyDesc>> ExtractTableKeys(TExploringNodeVisitor& explorer, const TTypeEnvironment& env);
TTableId ExtractTableId(const TRuntimeNode& node);

template<typename T>
TCell MakeCell(const NUdf::TUnboxedValuePod& value) {
static_assert(TCell::CanInline(sizeof(T)), "Can't inline data in cell.");
const auto v = value.Get<T>();
return TCell(reinterpret_cast<const char*>(&v), sizeof(v));
}

TCell MakeCell(NScheme::TTypeInfo type, const NUdf::TUnboxedValuePod& value,
const TTypeEnvironment& env, bool copy = true,
i32 typmod = -1, TMaybe<TString>* error = {});
Expand Down
32 changes: 17 additions & 15 deletions ydb/core/kqp/runtime/kqp_write_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@

namespace {
constexpr i64 kInFlightMemoryLimitPerActor = 64_MB;
constexpr i64 kMemoryLimitPerMessage = 48_MB;
constexpr i64 kMaxBatchesPerMessage = 1;
constexpr i64 kMemoryLimitPerMessage = 64_MB;
constexpr i64 kMaxBatchesPerMessage = 8;

struct TWriteActorBackoffSettings {
TDuration StartRetryDelay = TDuration::MilliSeconds(250);
Expand Down Expand Up @@ -81,12 +81,12 @@ namespace {
namespace NKikimr {
namespace NKqp {

class TKqpWriteActor : public TActorBootstrapped<TKqpWriteActor>, public NYql::NDq::IDqComputeActorAsyncOutput {
using TBase = TActorBootstrapped<TKqpWriteActor>;
class TKqpDirectWriteActor : public TActorBootstrapped<TKqpDirectWriteActor>, public NYql::NDq::IDqComputeActorAsyncOutput {
using TBase = TActorBootstrapped<TKqpDirectWriteActor>;

class TResumeNotificationManager {
public:
TResumeNotificationManager(TKqpWriteActor& writer)
TResumeNotificationManager(TKqpDirectWriteActor& writer)
: Writer(writer) {
CheckMemory();
}
Expand All @@ -102,7 +102,7 @@ class TKqpWriteActor : public TActorBootstrapped<TKqpWriteActor>, public NYql::N
}

private:
TKqpWriteActor& Writer;
TKqpDirectWriteActor& Writer;
i64 LastFreeMemory = std::numeric_limits<i64>::max();
};

Expand All @@ -127,7 +127,7 @@ class TKqpWriteActor : public TActorBootstrapped<TKqpWriteActor>, public NYql::N
};

public:
TKqpWriteActor(
TKqpDirectWriteActor(
NKikimrKqp::TKqpTableSinkSettings&& settings,
NYql::NDq::TDqAsyncIoFactory::TSinkArguments&& args,
TIntrusivePtr<TKqpCounters> counters)
Expand All @@ -137,6 +137,7 @@ class TKqpWriteActor : public TActorBootstrapped<TKqpWriteActor>, public NYql::N
, Callbacks(args.Callback)
, Counters(counters)
, TypeEnv(args.TypeEnv)
, Alloc(args.Alloc)
, TxId(args.TxId)
, TableId(
Settings.GetTable().GetOwnerId(),
Expand All @@ -157,13 +158,13 @@ class TKqpWriteActor : public TActorBootstrapped<TKqpWriteActor>, public NYql::N
void Bootstrap() {
LogPrefix = TStringBuilder() << "SelfId: " << this->SelfId() << ", " << LogPrefix;
ResolveTable();
Become(&TKqpWriteActor::StateFunc);
Become(&TKqpDirectWriteActor::StateFunc);
}

static constexpr char ActorName[] = "KQP_WRITE_ACTOR";

private:
virtual ~TKqpWriteActor() {
virtual ~TKqpDirectWriteActor() {
}

void CommitState(const NYql::NDqProto::TCheckpoint&) final {};
Expand Down Expand Up @@ -491,7 +492,7 @@ class TKqpWriteActor : public TActorBootstrapped<TKqpWriteActor>, public NYql::N
<< ", Cookie=" << ev->Cookie
<< ", LocksCount=" << ev->Get()->Record.GetTxLocks().size());

PopShardBatch(ev->Get()->Record.GetOrigin(), ev->Cookie);
OnMessageAcknowledged(ev->Get()->Record.GetOrigin(), ev->Cookie);

for (const auto& lock : ev->Get()->Record.GetTxLocks()) {
LocksInfo[ev->Get()->Record.GetOrigin()].AddAndCheckLock(lock);
Expand All @@ -500,7 +501,7 @@ class TKqpWriteActor : public TActorBootstrapped<TKqpWriteActor>, public NYql::N
ProcessBatches();
}

void PopShardBatch(ui64 shardId, ui64 cookie) {
void OnMessageAcknowledged(ui64 shardId, ui64 cookie) {
TResumeNotificationManager resumeNotificator(*this);
const auto removedDataSize = ShardedWriteController->OnMessageAcknowledged(shardId, cookie);
if (removedDataSize) {
Expand Down Expand Up @@ -669,7 +670,7 @@ class TKqpWriteActor : public TActorBootstrapped<TKqpWriteActor>, public NYql::N

void PassAway() override {
Send(PipeCacheId, new TEvPipeCache::TEvUnlink(0));
TActorBootstrapped<TKqpWriteActor>::PassAway();
TActorBootstrapped<TKqpDirectWriteActor>::PassAway();
}

void Prepare() {
Expand All @@ -693,7 +694,8 @@ class TKqpWriteActor : public TActorBootstrapped<TKqpWriteActor>, public NYql::N
: kMaxBatchesPerMessage),
},
std::move(columnsMetadata),
TypeEnv);
TypeEnv,
Alloc);
} catch (...) {
RuntimeError(
CurrentExceptionMessage(),
Expand Down Expand Up @@ -721,7 +723,6 @@ class TKqpWriteActor : public TActorBootstrapped<TKqpWriteActor>, public NYql::N
Callbacks->ResumeExecution();
}

NActors::TActorId TxProxyId = MakeTxProxyID();
NActors::TActorId PipeCacheId = NKikimr::MakePipePerNodeCacheID(false);

TString LogPrefix;
Expand All @@ -731,6 +732,7 @@ class TKqpWriteActor : public TActorBootstrapped<TKqpWriteActor>, public NYql::N
NYql::NDq::IDqComputeActorAsyncOutput::ICallbacks * Callbacks = nullptr;
TIntrusivePtr<TKqpCounters> Counters;
const NMiniKQL::TTypeEnvironment& TypeEnv;
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> Alloc;

const NYql::NDq::TTxId TxId;
const TTableId TableId;
Expand All @@ -754,7 +756,7 @@ void RegisterKqpWriteActor(NYql::NDq::TDqAsyncIoFactory& factory, TIntrusivePtr<
factory.RegisterSink<NKikimrKqp::TKqpTableSinkSettings>(
TString(NYql::KqpTableSinkName),
[counters] (NKikimrKqp::TKqpTableSinkSettings&& settings, NYql::NDq::TDqAsyncIoFactory::TSinkArguments&& args) {
auto* actor = new TKqpWriteActor(std::move(settings), std::move(args), counters);
auto* actor = new TKqpDirectWriteActor(std::move(settings), std::move(args), counters);
return std::make_pair<NYql::NDq::IDqComputeActorAsyncOutput*, NActors::IActor*>(actor, actor);
});
}
Expand Down
Loading
Loading