Skip to content

Commit 71c4756

Browse files
authored
HTAP: Query Processor (#8917)
1 parent 7d436f8 commit 71c4756

17 files changed

+317
-119
lines changed

.github/config/muted_ya.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ ydb/core/kqp/ut/service KqpQueryService.ExecuteQueryPgTableSelect
2525
ydb/core/kqp/ut/service KqpQueryService.QueryOnClosedSession
2626
ydb/core/kqp/ut/service KqpService.CloseSessionsWithLoad
2727
ydb/core/kqp/ut/service KqpQueryService.TableSink_OlapRWQueries
28+
ydb/core/kqp/ut/service KqpQueryService.TableSink_Htap
2829
ydb/core/kqp/ut/tx KqpSnapshotRead.ReadOnlyTxWithIndexCommitsOnConcurrentWrite+withSink
2930
ydb/core/persqueue/ut [*/*]*
3031
ydb/core/persqueue/ut TPQTest.*DirectRead*

ydb/core/kqp/common/kqp_tx.cpp

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,17 +14,17 @@ NYql::TIssue GetLocksInvalidatedIssue(const TKqpTransactionContext& txCtx, const
1414
if (pathId.OwnerId() != 0) {
1515
auto table = txCtx.TableByIdMap.FindPtr(pathId);
1616
if (!table) {
17-
return YqlIssue(TPosition(), TIssuesIds::KIKIMR_LOCKS_INVALIDATED, message << " Unknown table.");
17+
return YqlIssue(TPosition(), TIssuesIds::KIKIMR_LOCKS_INVALIDATED, message << " Unknown table.");
1818
}
19-
return YqlIssue(TPosition(), TIssuesIds::KIKIMR_LOCKS_INVALIDATED, message << " Table: " << *table);
19+
return YqlIssue(TPosition(), TIssuesIds::KIKIMR_LOCKS_INVALIDATED, message << " Table: " << "`" << *table << "`");
2020
} else {
2121
// Olap tables don't return SchemeShard in locks, thus we use tableId here.
2222
for (const auto& [pathId, table] : txCtx.TableByIdMap) {
2323
if (pathId.TableId() == pathId.TableId()) {
24-
return YqlIssue(TPosition(), TIssuesIds::KIKIMR_LOCKS_INVALIDATED, message << " Table: " << table);
24+
return YqlIssue(TPosition(), TIssuesIds::KIKIMR_LOCKS_INVALIDATED, message << " Table: " << "`" << table << "`");
2525
}
2626
}
27-
return YqlIssue(TPosition(), TIssuesIds::KIKIMR_LOCKS_INVALIDATED, message << " Unknown table.");
27+
return YqlIssue(TPosition(), TIssuesIds::KIKIMR_LOCKS_INVALIDATED, message << " Unknown table.");
2828
}
2929
}
3030

@@ -36,6 +36,27 @@ TIssue GetLocksInvalidatedIssue(const TKqpTransactionContext& txCtx, const TKqpT
3636
invalidatedLock.GetPathId()));
3737
}
3838

39+
NYql::TIssue GetLocksInvalidatedIssue(const TShardIdToTableInfo& shardIdToTableInfo, const ui64& shardId) {
40+
TStringBuilder message;
41+
message << "Transaction locks invalidated.";
42+
43+
if (auto it = shardIdToTableInfo.find(shardId); it != std::end(shardIdToTableInfo)) {
44+
message << " Tables: ";
45+
bool first = true;
46+
for (const auto& path : it->second.Pathes) {
47+
if (!first) {
48+
message << ", ";
49+
first = false;
50+
}
51+
message << "`" << path << "`";
52+
}
53+
return YqlIssue(TPosition(), TIssuesIds::KIKIMR_LOCKS_INVALIDATED, message);
54+
} else {
55+
message << " Unknown table.";
56+
}
57+
return YqlIssue(TPosition(), TIssuesIds::KIKIMR_LOCKS_INVALIDATED, message);
58+
}
59+
3960
std::pair<bool, std::vector<TIssue>> MergeLocks(const NKikimrMiniKQL::TType& type, const NKikimrMiniKQL::TValue& value,
4061
TKqpTransactionContext& txCtx)
4162
{

ydb/core/kqp/common/kqp_tx.h

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,14 @@ struct TDeferredEffects {
121121
friend class TKqpTransactionContext;
122122
};
123123

124+
struct TTableInfo {
125+
bool IsOlap = false;
126+
THashSet<TString> Pathes;
127+
};
128+
129+
using TShardIdToTableInfo = THashMap<ui64, TTableInfo>;
130+
using TShardIdToTableInfoPtr = std::shared_ptr<TShardIdToTableInfo>;
131+
124132
class TKqpTransactionContext : public NYql::TKikimrTransactionContextBase {
125133
public:
126134
explicit TKqpTransactionContext(bool implicit, const NMiniKQL::IFunctionRegistry* funcRegistry,
@@ -285,6 +293,12 @@ class TKqpTransactionContext : public NYql::TKikimrTransactionContextBase {
285293
TTxAllocatorState::TPtr TxAlloc;
286294

287295
IKqpGateway::TKqpSnapshotHandle SnapshotHandle;
296+
297+
bool HasOlapTable = false;
298+
bool HasOltpTable = false;
299+
bool HasTableWrite = false;
300+
301+
TShardIdToTableInfoPtr ShardIdToTableInfo = std::make_shared<TShardIdToTableInfo>();
288302
};
289303

290304
struct TTxId {
@@ -433,6 +447,7 @@ class TTransactionsCache {
433447
};
434448

435449
NYql::TIssue GetLocksInvalidatedIssue(const TKqpTransactionContext& txCtx, const NYql::TKikimrPathId& pathId);
450+
NYql::TIssue GetLocksInvalidatedIssue(const TShardIdToTableInfo& shardIdToTableInfo, const ui64& shardId);
436451
std::pair<bool, std::vector<NYql::TIssue>> MergeLocks(const NKikimrMiniKQL::TType& type,
437452
const NKikimrMiniKQL::TValue& value, TKqpTransactionContext& txCtx);
438453

ydb/core/kqp/compile_service/kqp_compile_actor.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -601,6 +601,7 @@ void ApplyServiceConfig(TKikimrConfiguration& kqpConfig, const TTableServiceConf
601601
kqpConfig.EnableCreateTableAs = serviceConfig.GetEnableCreateTableAs();
602602
kqpConfig.EnableOlapSink = serviceConfig.GetEnableOlapSink();
603603
kqpConfig.EnableOltpSink = serviceConfig.GetEnableOltpSink();
604+
kqpConfig.EnableHtapTx = serviceConfig.GetEnableHtapTx();
604605
kqpConfig.BlockChannelsMode = serviceConfig.GetBlockChannelsMode();
605606
kqpConfig.IdxLookupJoinsPrefixPointLimit = serviceConfig.GetIdxLookupJoinPointsLimit();
606607
kqpConfig.EnableSpillingGenericQuery = serviceConfig.GetEnableQueryServiceSpilling();

ydb/core/kqp/compile_service/kqp_compile_service.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -523,6 +523,7 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
523523
bool enableColumnsWithDefault = TableServiceConfig.GetEnableColumnsWithDefault();
524524
bool enableOlapSink = TableServiceConfig.GetEnableOlapSink();
525525
bool enableOltpSink = TableServiceConfig.GetEnableOltpSink();
526+
bool enableHtapTx = TableServiceConfig.GetEnableHtapTx();
526527
bool enableCreateTableAs = TableServiceConfig.GetEnableCreateTableAs();
527528
auto blockChannelsMode = TableServiceConfig.GetBlockChannelsMode();
528529

@@ -556,6 +557,7 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
556557
TableServiceConfig.GetEnableColumnsWithDefault() != enableColumnsWithDefault ||
557558
TableServiceConfig.GetEnableOlapSink() != enableOlapSink ||
558559
TableServiceConfig.GetEnableOltpSink() != enableOltpSink ||
560+
TableServiceConfig.GetEnableHtapTx() != enableHtapTx ||
559561
TableServiceConfig.GetEnableCreateTableAs() != enableCreateTableAs ||
560562
TableServiceConfig.GetBlockChannelsMode() != blockChannelsMode ||
561563
TableServiceConfig.GetExtractPredicateRangesLimit() != rangesLimit ||

0 commit comments

Comments
 (0)