Skip to content

Commit 245dfa3

Browse files
authored
YQ-3684 passed database id into workload manager and resource pool classifiers tables (#9610)
1 parent e73d806 commit 245dfa3

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

59 files changed

+1213
-411
lines changed

.github/config/muted_ya.txt

-2
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,6 @@ ydb/core/kqp/ut/service KqpQueryService.TableSink_OlapRWQueries
2727
ydb/core/kqp/ut/service KqpQueryService.TableSink_OltpReplace+HasSecondaryIndex
2828
ydb/core/kqp/ut/query KqpQuery.OlapCreateAsSelect_Complex
2929
ydb/core/kqp/ut/query KqpQuery.OlapCreateAsSelect_Simple
30-
ydb/core/kqp/ut/federated_query/s3 KqpFederatedQuery.CreateTableAsSelectFromExternalDataSource
31-
ydb/core/kqp/ut/federated_query/s3 KqpFederatedQuery.CreateTableAsSelectFromExternalTable
3230
ydb/core/kqp/ut/scan KqpRequestContext.TraceIdInErrorMessage
3331
ydb/core/kqp/ut/scheme KqpOlapScheme.TenThousandColumns
3432
ydb/core/kqp/ut/scheme KqpScheme.AlterAsyncReplication

ydb/core/kqp/common/events/events.h

+46
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,18 @@ struct TEvKqp {
111111
struct TEvScriptRequest : public TEventLocal<TEvScriptRequest, TKqpEvents::EvScriptRequest> {
112112
TEvScriptRequest() = default;
113113

114+
const TString& GetDatabase() const {
115+
return Record.GetRequest().GetDatabase();
116+
}
117+
118+
const TString& GetDatabaseId() const {
119+
return Record.GetRequest().GetDatabaseId();
120+
}
121+
122+
void SetDatabaseId(const TString& databaseId) {
123+
Record.MutableRequest()->SetDatabaseId(databaseId);
124+
}
125+
114126
mutable NKikimrKqp::TEvQueryRequest Record;
115127
TDuration ForgetAfter;
116128
TDuration ResultsTtl;
@@ -164,6 +176,40 @@ struct TEvKqp {
164176
return issues;
165177
}
166178
};
179+
180+
struct TEvUpdateDatabaseInfo : public TEventLocal<TEvUpdateDatabaseInfo, TKqpEvents::EvUpdateDatabaseInfo> {
181+
TEvUpdateDatabaseInfo(const TString& database, Ydb::StatusIds::StatusCode status, NYql::TIssues issues)
182+
: Status(status)
183+
, Database(database)
184+
, Issues(std::move(issues))
185+
{}
186+
187+
TEvUpdateDatabaseInfo(const TString& database, const TString& databaseId, bool serverless)
188+
: Status(Ydb::StatusIds::SUCCESS)
189+
, Database(database)
190+
, DatabaseId(databaseId)
191+
, Serverless(serverless)
192+
, Issues({})
193+
{}
194+
195+
Ydb::StatusIds::StatusCode Status;
196+
TString Database;
197+
TString DatabaseId;
198+
bool Serverless = false;
199+
NYql::TIssues Issues;
200+
};
201+
202+
struct TEvDelayedRequestError : public TEventLocal<TEvDelayedRequestError, TKqpEvents::EvDelayedRequestError> {
203+
TEvDelayedRequestError(THolder<IEventHandle> requestEvent, Ydb::StatusIds::StatusCode status, NYql::TIssues issues)
204+
: RequestEvent(std::move(requestEvent))
205+
, Status(status)
206+
, Issues(std::move(issues))
207+
{}
208+
209+
THolder<IEventHandle> RequestEvent;
210+
Ydb::StatusIds::StatusCode Status;
211+
NYql::TIssues Issues;
212+
};
167213
};
168214

169215
} // namespace NKikimr::NKqp

ydb/core/kqp/common/events/query.h

+9
Original file line numberDiff line numberDiff line change
@@ -351,6 +351,14 @@ struct TEvQueryRequest: public NActors::TEventLocal<TEvQueryRequest, TKqpEvents:
351351
return PoolConfig;
352352
}
353353

354+
const TString& GetDatabaseId() const {
355+
return DatabaseId ? DatabaseId : Record.GetRequest().GetDatabaseId();
356+
}
357+
358+
void SetDatabaseId(const TString& databaseId) {
359+
DatabaseId = databaseId;
360+
}
361+
354362
mutable NKikimrKqp::TEvQueryRequest Record;
355363

356364
private:
@@ -363,6 +371,7 @@ struct TEvQueryRequest: public NActors::TEventLocal<TEvQueryRequest, TKqpEvents:
363371
mutable TIntrusiveConstPtr<NACLib::TUserToken> Token_;
364372
TActorId RequestActorId;
365373
TString Database;
374+
TString DatabaseId;
366375
TString SessionId;
367376
TString YqlText;
368377
TString QueryId;

ydb/core/kqp/common/events/script_executions.h

+34-18
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,34 @@ enum EFinalizationStatus : i32 {
2222
FS_ROLLBACK,
2323
};
2424

25-
struct TEvForgetScriptExecutionOperation : public NActors::TEventLocal<TEvForgetScriptExecutionOperation, TKqpScriptExecutionEvents::EvForgetScriptExecutionOperation> {
26-
TEvForgetScriptExecutionOperation(const TString& database, const NOperationId::TOperationId& id)
25+
template <typename TEv, ui32 TEventType>
26+
struct TEventWithDatabaseId : public NActors::TEventLocal<TEv, TEventType> {
27+
TEventWithDatabaseId(const TString& database)
2728
: Database(database)
28-
, OperationId(id)
2929
{}
3030

31+
const TString& GetDatabase() const {
32+
return Database;
33+
}
34+
35+
const TString& GetDatabaseId() const {
36+
return DatabaseId;
37+
}
38+
39+
void SetDatabaseId(const TString& databaseId) {
40+
DatabaseId = databaseId;
41+
}
42+
3143
const TString Database;
44+
TString DatabaseId;
45+
};
46+
47+
struct TEvForgetScriptExecutionOperation : public TEventWithDatabaseId<TEvForgetScriptExecutionOperation, TKqpScriptExecutionEvents::EvForgetScriptExecutionOperation> {
48+
TEvForgetScriptExecutionOperation(const TString& database, const NOperationId::TOperationId& id)
49+
: TEventWithDatabaseId(database)
50+
, OperationId(id)
51+
{}
52+
3253
const NOperationId::TOperationId OperationId;
3354
};
3455

@@ -43,14 +64,12 @@ struct TEvForgetScriptExecutionOperationResponse : public NActors::TEventLocal<T
4364
NYql::TIssues Issues;
4465
};
4566

46-
struct TEvGetScriptExecutionOperation : public NActors::TEventLocal<TEvGetScriptExecutionOperation, TKqpScriptExecutionEvents::EvGetScriptExecutionOperation> {
47-
explicit TEvGetScriptExecutionOperation(const TString& database, const NOperationId::TOperationId& id)
48-
: Database(database)
67+
struct TEvGetScriptExecutionOperation : public TEventWithDatabaseId<TEvGetScriptExecutionOperation, TKqpScriptExecutionEvents::EvGetScriptExecutionOperation> {
68+
TEvGetScriptExecutionOperation(const TString& database, const NOperationId::TOperationId& id)
69+
: TEventWithDatabaseId(database)
4970
, OperationId(id)
50-
{
51-
}
71+
{}
5272

53-
TString Database;
5473
NOperationId::TOperationId OperationId;
5574
};
5675

@@ -97,14 +116,13 @@ struct TEvGetScriptExecutionOperationResponse : public NActors::TEventLocal<TEvG
97116
TMaybe<google::protobuf::Any> Metadata;
98117
};
99118

100-
struct TEvListScriptExecutionOperations : public NActors::TEventLocal<TEvListScriptExecutionOperations, TKqpScriptExecutionEvents::EvListScriptExecutionOperations> {
119+
struct TEvListScriptExecutionOperations : public TEventWithDatabaseId<TEvListScriptExecutionOperations, TKqpScriptExecutionEvents::EvListScriptExecutionOperations> {
101120
TEvListScriptExecutionOperations(const TString& database, const ui64 pageSize, const TString& pageToken)
102-
: Database(database)
121+
: TEventWithDatabaseId(database)
103122
, PageSize(pageSize)
104123
, PageToken(pageToken)
105124
{}
106125

107-
TString Database;
108126
ui64 PageSize;
109127
TString PageToken;
110128
};
@@ -151,14 +169,12 @@ struct TEvCheckAliveRequest : public NActors::TEventPB<TEvCheckAliveRequest, NKi
151169
struct TEvCheckAliveResponse : public NActors::TEventPB<TEvCheckAliveResponse, NKikimrKqp::TEvCheckAliveResponse, TKqpScriptExecutionEvents::EvCheckAliveResponse> {
152170
};
153171

154-
struct TEvCancelScriptExecutionOperation : public NActors::TEventLocal<TEvCancelScriptExecutionOperation, TKqpScriptExecutionEvents::EvCancelScriptExecutionOperation> {
155-
explicit TEvCancelScriptExecutionOperation(const TString& database, const NOperationId::TOperationId& id)
156-
: Database(database)
172+
struct TEvCancelScriptExecutionOperation : public TEventWithDatabaseId<TEvCancelScriptExecutionOperation, TKqpScriptExecutionEvents::EvCancelScriptExecutionOperation> {
173+
TEvCancelScriptExecutionOperation(const TString& database, const NOperationId::TOperationId& id)
174+
: TEventWithDatabaseId(database)
157175
, OperationId(id)
158-
{
159-
}
176+
{}
160177

161-
TString Database;
162178
NOperationId::TOperationId OperationId;
163179
};
164180

ydb/core/kqp/common/events/workload_service.h

+24-15
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
#include <ydb/core/kqp/common/simple/kqp_event_ids.h>
44
#include <ydb/core/resource_pools/resource_pool_settings.h>
5+
#include <ydb/core/scheme/scheme_pathid.h>
56

67
#include <ydb/library/aclib/aclib.h>
78
#include <ydb/library/actors/core/event_local.h>
@@ -13,24 +14,24 @@
1314
namespace NKikimr::NKqp::NWorkload {
1415

1516
struct TEvSubscribeOnPoolChanges : public NActors::TEventLocal<TEvSubscribeOnPoolChanges, TKqpWorkloadServiceEvents::EvSubscribeOnPoolChanges> {
16-
TEvSubscribeOnPoolChanges(const TString& database, const TString& poolId)
17-
: Database(database)
17+
TEvSubscribeOnPoolChanges(const TString& databaseId, const TString& poolId)
18+
: DatabaseId(databaseId)
1819
, PoolId(poolId)
1920
{}
2021

21-
const TString Database;
22+
const TString DatabaseId;
2223
const TString PoolId;
2324
};
2425

2526
struct TEvPlaceRequestIntoPool : public NActors::TEventLocal<TEvPlaceRequestIntoPool, TKqpWorkloadServiceEvents::EvPlaceRequestIntoPool> {
26-
TEvPlaceRequestIntoPool(const TString& database, const TString& sessionId, const TString& poolId, TIntrusiveConstPtr<NACLib::TUserToken> userToken)
27-
: Database(database)
27+
TEvPlaceRequestIntoPool(const TString& databaseId, const TString& sessionId, const TString& poolId, TIntrusiveConstPtr<NACLib::TUserToken> userToken)
28+
: DatabaseId(databaseId)
2829
, SessionId(sessionId)
2930
, PoolId(poolId)
3031
, UserToken(userToken)
3132
{}
3233

33-
const TString Database;
34+
const TString DatabaseId;
3435
const TString SessionId;
3536
TString PoolId; // Can be changed to default pool id
3637
TIntrusiveConstPtr<NACLib::TUserToken> UserToken;
@@ -51,15 +52,15 @@ struct TEvContinueRequest : public NActors::TEventLocal<TEvContinueRequest, TKqp
5152
};
5253

5354
struct TEvCleanupRequest : public NActors::TEventLocal<TEvCleanupRequest, TKqpWorkloadServiceEvents::EvCleanupRequest> {
54-
TEvCleanupRequest(const TString& database, const TString& sessionId, const TString& poolId, TDuration duration, TDuration cpuConsumed)
55-
: Database(database)
55+
TEvCleanupRequest(const TString& databaseId, const TString& sessionId, const TString& poolId, TDuration duration, TDuration cpuConsumed)
56+
: DatabaseId(databaseId)
5657
, SessionId(sessionId)
5758
, PoolId(poolId)
5859
, Duration(duration)
5960
, CpuConsumed(cpuConsumed)
6061
{}
6162

62-
const TString Database;
63+
const TString DatabaseId;
6364
const TString SessionId;
6465
const TString PoolId;
6566
const TDuration Duration;
@@ -77,27 +78,35 @@ struct TEvCleanupResponse : public NActors::TEventLocal<TEvCleanupResponse, TKqp
7778
};
7879

7980
struct TEvUpdatePoolInfo : public NActors::TEventLocal<TEvUpdatePoolInfo, TKqpWorkloadServiceEvents::EvUpdatePoolInfo> {
80-
TEvUpdatePoolInfo(const TString& database, const TString& poolId, const std::optional<NResourcePool::TPoolSettings>& config, const std::optional<NACLib::TSecurityObject>& securityObject)
81-
: Database(database)
81+
TEvUpdatePoolInfo(const TString& databaseId, const TString& poolId, const std::optional<NResourcePool::TPoolSettings>& config, const std::optional<NACLib::TSecurityObject>& securityObject)
82+
: DatabaseId(databaseId)
8283
, PoolId(poolId)
8384
, Config(config)
8485
, SecurityObject(securityObject)
8586
{}
8687

87-
const TString Database;
88+
const TString DatabaseId;
8889
const TString PoolId;
8990
const std::optional<NResourcePool::TPoolSettings> Config;
9091
const std::optional<NACLib::TSecurityObject> SecurityObject;
9192
};
9293

93-
struct TEvUpdateDatabaseInfo : public NActors::TEventLocal<TEvUpdateDatabaseInfo, TKqpWorkloadServiceEvents::EvUpdateDatabaseInfo> {
94-
TEvUpdateDatabaseInfo(const TString& database, bool serverless)
95-
: Database(database)
94+
struct TEvFetchDatabaseResponse : public NActors::TEventLocal<TEvFetchDatabaseResponse, TKqpWorkloadServiceEvents::EvFetchDatabaseResponse> {
95+
TEvFetchDatabaseResponse(Ydb::StatusIds::StatusCode status, const TString& database, const TString& databaseId, bool serverless, TPathId pathId, NYql::TIssues issues)
96+
: Status(status)
97+
, Database(database)
98+
, DatabaseId(databaseId)
9699
, Serverless(serverless)
100+
, PathId(pathId)
101+
, Issues(std::move(issues))
97102
{}
98103

104+
const Ydb::StatusIds::StatusCode Status;
99105
const TString Database;
106+
const TString DatabaseId;
100107
const bool Serverless;
108+
const TPathId PathId;
109+
const NYql::TIssues Issues;
101110
};
102111

103112
} // NKikimr::NKqp::NWorkload

ydb/core/kqp/common/events/ya.make

+1
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ PEERDIR(
1616
ydb/core/kqp/common/shutdown
1717
ydb/core/kqp/common/compilation
1818
ydb/core/resource_pools
19+
ydb/core/scheme
1920

2021
ydb/library/yql/dq/actors
2122
ydb/public/api/protos

ydb/core/kqp/common/kqp_event_impl.cpp

+4
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,10 @@ void TEvKqp::TEvQueryRequest::PrepareRemote() const {
9090
Record.MutableRequest()->SetPoolId(PoolId);
9191
}
9292

93+
if (!DatabaseId.empty()) {
94+
Record.MutableRequest()->SetDatabaseId(DatabaseId);
95+
}
96+
9397
Record.MutableRequest()->SetSessionId(SessionId);
9498
Record.MutableRequest()->SetAction(QueryAction);
9599
Record.MutableRequest()->SetType(QueryType);

ydb/core/kqp/common/kqp_user_request_context.cpp

+2-1
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,13 @@
33
namespace NKikimr::NKqp {
44

55
void TUserRequestContext::Out(IOutputStream& o) const {
6-
o << "{" << " TraceId: " << TraceId << ", Database: " << Database << ", SessionId: " << SessionId << ", CurrentExecutionId: " << CurrentExecutionId << ", CustomerSuppliedId: " << CustomerSuppliedId << ", PoolId: " << PoolId << "}";
6+
o << "{" << " TraceId: " << TraceId << ", Database: " << Database << ", DatabaseId: " << DatabaseId << ", SessionId: " << SessionId << ", CurrentExecutionId: " << CurrentExecutionId << ", CustomerSuppliedId: " << CustomerSuppliedId << ", PoolId: " << PoolId << "}";
77
}
88

99
void SerializeCtxToMap(const TUserRequestContext& ctx, google::protobuf::Map<TString, TString>& resultMap) {
1010
resultMap["TraceId"] = ctx.TraceId;
1111
resultMap["Database"] = ctx.Database;
12+
resultMap["DatabaseId"] = ctx.DatabaseId;
1213
resultMap["SessionId"] = ctx.SessionId;
1314
resultMap["CurrentExecutionId"] = ctx.CurrentExecutionId;
1415
resultMap["CustomerSuppliedId"] = ctx.CustomerSuppliedId;

ydb/core/kqp/common/kqp_user_request_context.h

+1
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ namespace NKikimr::NKqp {
1111
struct TUserRequestContext : public TAtomicRefCount<TUserRequestContext> {
1212
TString TraceId;
1313
TString Database;
14+
TString DatabaseId;
1415
TString SessionId;
1516
TString CurrentExecutionId;
1617
TString CustomerSuppliedId;

ydb/core/kqp/common/simple/kqp_event_ids.h

+4-2
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,9 @@ struct TKqpEvents {
4444
EvListSessionsRequest,
4545
EvListSessionsResponse,
4646
EvListProxyNodesRequest,
47-
EvListProxyNodesResponse
47+
EvListProxyNodesResponse,
48+
EvUpdateDatabaseInfo,
49+
EvDelayedRequestError
4850
};
4951

5052
static_assert (EvCompileInvalidateRequest + 1 == EvAbortExecution);
@@ -175,8 +177,8 @@ struct TKqpWorkloadServiceEvents {
175177
EvCleanupRequest,
176178
EvCleanupResponse,
177179
EvUpdatePoolInfo,
178-
EvUpdateDatabaseInfo,
179180
EvSubscribeOnPoolChanges,
181+
EvFetchDatabaseResponse,
180182
};
181183
};
182184

ydb/core/kqp/common/simple/query_id.cpp

+3-1
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,12 @@
99

1010
namespace NKikimr::NKqp {
1111

12-
TKqpQueryId::TKqpQueryId(const TString& cluster, const TString& database, const TString& text,
12+
TKqpQueryId::TKqpQueryId(const TString& cluster, const TString& database, const TString& databaseId, const TString& text,
1313
const TKqpQuerySettings& settings, std::shared_ptr<std::map<TString, Ydb::Type>> queryParameterTypes,
1414
const TGUCSettings& gUCSettings)
1515
: Cluster(cluster)
1616
, Database(database)
17+
, DatabaseId(databaseId)
1718
, Text(text)
1819
, Settings(settings)
1920
, QueryParameterTypes(queryParameterTypes)
@@ -41,6 +42,7 @@ bool TKqpQueryId::IsSql() const {
4142
bool TKqpQueryId::operator==(const TKqpQueryId& other) const {
4243
if (!(Cluster == other.Cluster &&
4344
Database == other.Database &&
45+
DatabaseId == other.DatabaseId &&
4446
UserSid == other.UserSid &&
4547
Text == other.Text &&
4648
Settings == other.Settings &&

ydb/core/kqp/common/simple/query_id.h

+2-1
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ namespace NKikimr::NKqp {
1313
struct TKqpQueryId {
1414
TString Cluster;
1515
TString Database;
16+
TString DatabaseId;
1617
TString UserSid;
1718
TString Text;
1819
TKqpQuerySettings Settings;
@@ -21,7 +22,7 @@ struct TKqpQueryId {
2122
TGUCSettings GUCSettings;
2223

2324
public:
24-
TKqpQueryId(const TString& cluster, const TString& database, const TString& text,
25+
TKqpQueryId(const TString& cluster, const TString& database, const TString& databaseId, const TString& text,
2526
const TKqpQuerySettings& settings, std::shared_ptr<std::map<TString, Ydb::Type>> queryParameterTypes,
2627
const TGUCSettings& gUCSettings);
2728

ydb/core/kqp/compile_service/kqp_compile_actor.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -265,7 +265,7 @@ class TKqpCompileActor : public TActorBootstrapped<TKqpCompileActor> {
265265
std::shared_ptr<NYql::IKikimrGateway::IKqpTableMetadataLoader> loader =
266266
std::make_shared<TKqpTableMetadataLoader>(
267267
QueryId.Cluster, TlsActivationContext->ActorSystem(), Config, true, TempTablesState);
268-
Gateway = CreateKikimrIcGateway(QueryId.Cluster, QueryId.Settings.QueryType, QueryId.Database, std::move(loader),
268+
Gateway = CreateKikimrIcGateway(QueryId.Cluster, QueryId.Settings.QueryType, QueryId.Database, QueryId.DatabaseId, std::move(loader),
269269
ctx.ExecutorThread.ActorSystem, ctx.SelfID.NodeId(), counters, QueryServiceConfig);
270270
Gateway->SetToken(QueryId.Cluster, UserToken);
271271

0 commit comments

Comments
 (0)