Skip to content

Commit 993c21c

Browse files
fix(kqp): pass lockNodeId to stream lookup actor (#9311)
1 parent 19fbcf4 commit 993c21c

File tree

5 files changed

+23
-9
lines changed

5 files changed

+23
-9
lines changed

ydb/core/kqp/executer_actor/kqp_executer_impl.h

+1
Original file line numberDiff line numberDiff line change
@@ -512,6 +512,7 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
512512
}
513513

514514
TasksGraph.GetMeta().SetLockTxId(lockTxId);
515+
TasksGraph.GetMeta().SetLockNodeId(SelfId().NodeId());
515516

516517
LWTRACK(KqpBaseExecuterHandleReady, ResponseEv->Orbit, TxId);
517518
if (IsDebugLogEnabled()) {

ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp

+1
Original file line numberDiff line numberDiff line change
@@ -1128,6 +1128,7 @@ void FillInputDesc(const TKqpTasksGraph& tasksGraph, NYql::NDqProto::TTaskInput&
11281128

11291129
if (lockTxId) {
11301130
input.Meta.StreamLookupSettings->SetLockTxId(*lockTxId);
1131+
input.Meta.StreamLookupSettings->SetLockNodeId(tasksGraph.GetMeta().LockNodeId);
11311132
}
11321133
transformProto->MutableSettings()->PackFrom(*input.Meta.StreamLookupSettings);
11331134
} else if (input.Meta.SequencerSettings) {

ydb/core/kqp/executer_actor/kqp_tasks_graph.h

+5
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ struct TStageInfoMeta {
9191
struct TGraphMeta {
9292
IKqpGateway::TKqpSnapshot Snapshot;
9393
TMaybe<ui64> LockTxId;
94+
ui32 LockNodeId;
9495
std::unordered_map<ui64, TActorId> ResultChannelProxies;
9596
TActorId ExecuterId;
9697
bool UseFollowers = false;
@@ -117,6 +118,10 @@ struct TGraphMeta {
117118
void SetLockTxId(TMaybe<ui64> lockTxId) {
118119
LockTxId = lockTxId;
119120
}
121+
122+
void SetLockNodeId(ui32 lockNodeId) {
123+
LockNodeId = lockNodeId;
124+
}
120125
};
121126

122127
struct TTaskInputMeta {

ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp

+6
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
3838
, Snapshot(settings.GetSnapshot().GetStep(), settings.GetSnapshot().GetTxId())
3939
, AllowInconsistentReads(settings.GetAllowInconsistentReads())
4040
, LockTxId(settings.HasLockTxId() ? settings.GetLockTxId() : TMaybe<ui64>())
41+
, NodeLockId(settings.HasLockNodeId() ? settings.GetLockNodeId() : TMaybe<ui32>())
4142
, SchemeCacheRequestTimeout(SCHEME_CACHE_REQUEST_TIMEOUT)
4243
, StreamLookupWorker(CreateStreamLookupWorker(std::move(settings), args.TypeEnv, args.HolderFactory, args.InputDesc))
4344
, Counters(counters)
@@ -456,6 +457,10 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
456457
record.SetLockTxId(*LockTxId);
457458
}
458459

460+
if (NodeLockId) {
461+
record.SetLockNodeId(*NodeLockId);
462+
}
463+
459464
auto defaultSettings = GetDefaultReadSettings()->Record;
460465
record.SetMaxRows(defaultSettings.GetMaxRows());
461466
record.SetMaxBytes(defaultSettings.GetMaxBytes());
@@ -586,6 +591,7 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
586591
IKqpGateway::TKqpSnapshot Snapshot;
587592
const bool AllowInconsistentReads;
588593
const TMaybe<ui64> LockTxId;
594+
const TMaybe<ui32> NodeLockId;
589595
std::unordered_map<ui64, TReadState> Reads;
590596
std::unordered_map<ui64, TShardState> ReadsPerShard;
591597
std::shared_ptr<const TVector<TKeyDesc::TPartitionInfo>> Partitioning;

ydb/core/protos/kqp.proto

+10-9
Original file line numberDiff line numberDiff line change
@@ -755,15 +755,16 @@ message TKqpTableSinkSettings {
755755
}
756756

757757
message TKqpStreamLookupSettings {
758-
optional NKqpProto.TKqpPhyTableId Table = 1;
759-
repeated TKqpColumnMetadataProto KeyColumns = 2;
760-
repeated TKqpColumnMetadataProto Columns = 3;
761-
optional TKqpSnapshot Snapshot = 4;
762-
optional uint64 LockTxId = 5;
763-
optional bool ImmediateTx = 6;
764-
repeated string LookupKeyColumns = 7;
765-
optional NKqpProto.EStreamLookupStrategy LookupStrategy = 8;
766-
optional bool AllowInconsistentReads = 9 [default = false];
758+
optional NKqpProto.TKqpPhyTableId Table = 1;
759+
repeated TKqpColumnMetadataProto KeyColumns = 2;
760+
repeated TKqpColumnMetadataProto Columns = 3;
761+
optional TKqpSnapshot Snapshot = 4;
762+
optional uint64 LockTxId = 5;
763+
optional bool ImmediateTx = 6;
764+
repeated string LookupKeyColumns = 7;
765+
optional NKqpProto.EStreamLookupStrategy LookupStrategy = 8;
766+
optional bool AllowInconsistentReads = 9 [default = false];
767+
optional uint32 LockNodeId = 10;
767768
}
768769

769770
message TKqpSequencerSettings {

0 commit comments

Comments
 (0)