Skip to content

Commit 09731f3

Browse files
committed
YQ-3689 added kqp proxy database cache (ydb-platform#9644)
1 parent 7001722 commit 09731f3

20 files changed

+643
-89
lines changed

ydb/core/kqp/common/events/events.h

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,18 @@ struct TEvKqp {
111111
struct TEvScriptRequest : public TEventLocal<TEvScriptRequest, TKqpEvents::EvScriptRequest> {
112112
TEvScriptRequest() = default;
113113

114+
const TString& GetDatabase() const {
115+
return Record.GetRequest().GetDatabase();
116+
}
117+
118+
const TString& GetDatabaseId() const {
119+
return Record.GetRequest().GetDatabaseId();
120+
}
121+
122+
void SetDatabaseId(const TString& databaseId) {
123+
Record.MutableRequest()->SetDatabaseId(databaseId);
124+
}
125+
114126
mutable NKikimrKqp::TEvQueryRequest Record;
115127
TDuration ForgetAfter;
116128
TDuration ResultsTtl;
@@ -164,6 +176,40 @@ struct TEvKqp {
164176
return issues;
165177
}
166178
};
179+
180+
struct TEvUpdateDatabaseInfo : public TEventLocal<TEvUpdateDatabaseInfo, TKqpEvents::EvUpdateDatabaseInfo> {
181+
TEvUpdateDatabaseInfo(const TString& database, Ydb::StatusIds::StatusCode status, NYql::TIssues issues)
182+
: Status(status)
183+
, Database(database)
184+
, Issues(std::move(issues))
185+
{}
186+
187+
TEvUpdateDatabaseInfo(const TString& database, const TString& databaseId, bool serverless)
188+
: Status(Ydb::StatusIds::SUCCESS)
189+
, Database(database)
190+
, DatabaseId(databaseId)
191+
, Serverless(serverless)
192+
, Issues({})
193+
{}
194+
195+
Ydb::StatusIds::StatusCode Status;
196+
TString Database;
197+
TString DatabaseId;
198+
bool Serverless = false;
199+
NYql::TIssues Issues;
200+
};
201+
202+
struct TEvDelayedRequestError : public TEventLocal<TEvDelayedRequestError, TKqpEvents::EvDelayedRequestError> {
203+
TEvDelayedRequestError(THolder<IEventHandle> requestEvent, Ydb::StatusIds::StatusCode status, NYql::TIssues issues)
204+
: RequestEvent(std::move(requestEvent))
205+
, Status(status)
206+
, Issues(std::move(issues))
207+
{}
208+
209+
THolder<IEventHandle> RequestEvent;
210+
Ydb::StatusIds::StatusCode Status;
211+
NYql::TIssues Issues;
212+
};
167213
};
168214

169215
} // namespace NKikimr::NKqp

ydb/core/kqp/common/events/query.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -351,6 +351,14 @@ struct TEvQueryRequest: public NActors::TEventLocal<TEvQueryRequest, TKqpEvents:
351351
return PoolConfig;
352352
}
353353

354+
const TString& GetDatabaseId() const {
355+
return DatabaseId ? DatabaseId : Record.GetRequest().GetDatabaseId();
356+
}
357+
358+
void SetDatabaseId(const TString& databaseId) {
359+
DatabaseId = databaseId;
360+
}
361+
354362
mutable NKikimrKqp::TEvQueryRequest Record;
355363

356364
private:
@@ -363,6 +371,7 @@ struct TEvQueryRequest: public NActors::TEventLocal<TEvQueryRequest, TKqpEvents:
363371
mutable TIntrusiveConstPtr<NACLib::TUserToken> Token_;
364372
TActorId RequestActorId;
365373
TString Database;
374+
TString DatabaseId;
366375
TString SessionId;
367376
TString YqlText;
368377
TString QueryId;

ydb/core/kqp/common/events/script_executions.h

Lines changed: 34 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,34 @@ enum EFinalizationStatus : i32 {
2222
FS_ROLLBACK,
2323
};
2424

25-
struct TEvForgetScriptExecutionOperation : public NActors::TEventLocal<TEvForgetScriptExecutionOperation, TKqpScriptExecutionEvents::EvForgetScriptExecutionOperation> {
26-
TEvForgetScriptExecutionOperation(const TString& database, const NOperationId::TOperationId& id)
25+
template <typename TEv, ui32 TEventType>
26+
struct TEventWithDatabaseId : public NActors::TEventLocal<TEv, TEventType> {
27+
TEventWithDatabaseId(const TString& database)
2728
: Database(database)
28-
, OperationId(id)
2929
{}
3030

31+
const TString& GetDatabase() const {
32+
return Database;
33+
}
34+
35+
const TString& GetDatabaseId() const {
36+
return DatabaseId;
37+
}
38+
39+
void SetDatabaseId(const TString& databaseId) {
40+
DatabaseId = databaseId;
41+
}
42+
3143
const TString Database;
44+
TString DatabaseId;
45+
};
46+
47+
struct TEvForgetScriptExecutionOperation : public TEventWithDatabaseId<TEvForgetScriptExecutionOperation, TKqpScriptExecutionEvents::EvForgetScriptExecutionOperation> {
48+
TEvForgetScriptExecutionOperation(const TString& database, const NOperationId::TOperationId& id)
49+
: TEventWithDatabaseId(database)
50+
, OperationId(id)
51+
{}
52+
3253
const NOperationId::TOperationId OperationId;
3354
};
3455

@@ -43,14 +64,12 @@ struct TEvForgetScriptExecutionOperationResponse : public NActors::TEventLocal<T
4364
NYql::TIssues Issues;
4465
};
4566

46-
struct TEvGetScriptExecutionOperation : public NActors::TEventLocal<TEvGetScriptExecutionOperation, TKqpScriptExecutionEvents::EvGetScriptExecutionOperation> {
47-
explicit TEvGetScriptExecutionOperation(const TString& database, const NOperationId::TOperationId& id)
48-
: Database(database)
67+
struct TEvGetScriptExecutionOperation : public TEventWithDatabaseId<TEvGetScriptExecutionOperation, TKqpScriptExecutionEvents::EvGetScriptExecutionOperation> {
68+
TEvGetScriptExecutionOperation(const TString& database, const NOperationId::TOperationId& id)
69+
: TEventWithDatabaseId(database)
4970
, OperationId(id)
50-
{
51-
}
71+
{}
5272

53-
TString Database;
5473
NOperationId::TOperationId OperationId;
5574
};
5675

@@ -97,14 +116,13 @@ struct TEvGetScriptExecutionOperationResponse : public NActors::TEventLocal<TEvG
97116
TMaybe<google::protobuf::Any> Metadata;
98117
};
99118

100-
struct TEvListScriptExecutionOperations : public NActors::TEventLocal<TEvListScriptExecutionOperations, TKqpScriptExecutionEvents::EvListScriptExecutionOperations> {
119+
struct TEvListScriptExecutionOperations : public TEventWithDatabaseId<TEvListScriptExecutionOperations, TKqpScriptExecutionEvents::EvListScriptExecutionOperations> {
101120
TEvListScriptExecutionOperations(const TString& database, const ui64 pageSize, const TString& pageToken)
102-
: Database(database)
121+
: TEventWithDatabaseId(database)
103122
, PageSize(pageSize)
104123
, PageToken(pageToken)
105124
{}
106125

107-
TString Database;
108126
ui64 PageSize;
109127
TString PageToken;
110128
};
@@ -151,14 +169,12 @@ struct TEvCheckAliveRequest : public NActors::TEventPB<TEvCheckAliveRequest, NKi
151169
struct TEvCheckAliveResponse : public NActors::TEventPB<TEvCheckAliveResponse, NKikimrKqp::TEvCheckAliveResponse, TKqpScriptExecutionEvents::EvCheckAliveResponse> {
152170
};
153171

154-
struct TEvCancelScriptExecutionOperation : public NActors::TEventLocal<TEvCancelScriptExecutionOperation, TKqpScriptExecutionEvents::EvCancelScriptExecutionOperation> {
155-
explicit TEvCancelScriptExecutionOperation(const TString& database, const NOperationId::TOperationId& id)
156-
: Database(database)
172+
struct TEvCancelScriptExecutionOperation : public TEventWithDatabaseId<TEvCancelScriptExecutionOperation, TKqpScriptExecutionEvents::EvCancelScriptExecutionOperation> {
173+
TEvCancelScriptExecutionOperation(const TString& database, const NOperationId::TOperationId& id)
174+
: TEventWithDatabaseId(database)
157175
, OperationId(id)
158-
{
159-
}
176+
{}
160177

161-
TString Database;
162178
NOperationId::TOperationId OperationId;
163179
};
164180

ydb/core/kqp/common/events/workload_service.h

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
#include <ydb/core/kqp/common/simple/kqp_event_ids.h>
44
#include <ydb/core/resource_pools/resource_pool_settings.h>
5+
#include <ydb/core/scheme/scheme_pathid.h>
56

67
#include <ydb/library/aclib/aclib.h>
78
#include <ydb/library/actors/core/event_local.h>
@@ -90,14 +91,20 @@ struct TEvUpdatePoolInfo : public NActors::TEventLocal<TEvUpdatePoolInfo, TKqpWo
9091
const std::optional<NACLib::TSecurityObject> SecurityObject;
9192
};
9293

93-
struct TEvUpdateDatabaseInfo : public NActors::TEventLocal<TEvUpdateDatabaseInfo, TKqpWorkloadServiceEvents::EvUpdateDatabaseInfo> {
94-
TEvUpdateDatabaseInfo(const TString& database, bool serverless)
95-
: Database(database)
94+
struct TEvFetchDatabaseResponse : public NActors::TEventLocal<TEvFetchDatabaseResponse, TKqpWorkloadServiceEvents::EvFetchDatabaseResponse> {
95+
TEvFetchDatabaseResponse(Ydb::StatusIds::StatusCode status, const TString& database, bool serverless, TPathId pathId, NYql::TIssues issues)
96+
: Status(status)
97+
, Database(database)
9698
, Serverless(serverless)
99+
, PathId(pathId)
100+
, Issues(std::move(issues))
97101
{}
98102

103+
const Ydb::StatusIds::StatusCode Status;
99104
const TString Database;
100105
const bool Serverless;
106+
const TPathId PathId;
107+
const NYql::TIssues Issues;
101108
};
102109

103110
} // NKikimr::NKqp::NWorkload

ydb/core/kqp/common/events/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ PEERDIR(
1616
ydb/core/kqp/common/shutdown
1717
ydb/core/kqp/common/compilation
1818
ydb/core/resource_pools
19+
ydb/core/scheme
1920

2021
ydb/library/yql/dq/actors
2122
ydb/public/api/protos

ydb/core/kqp/common/kqp_event_impl.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,10 @@ void TEvKqp::TEvQueryRequest::PrepareRemote() const {
9090
Record.MutableRequest()->SetPoolId(PoolId);
9191
}
9292

93+
if (!DatabaseId.empty()) {
94+
Record.MutableRequest()->SetDatabaseId(DatabaseId);
95+
}
96+
9397
Record.MutableRequest()->SetSessionId(SessionId);
9498
Record.MutableRequest()->SetAction(QueryAction);
9599
Record.MutableRequest()->SetType(QueryType);

ydb/core/kqp/common/simple/kqp_event_ids.h

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,9 @@ struct TKqpEvents {
4444
EvListSessionsRequest,
4545
EvListSessionsResponse,
4646
EvListProxyNodesRequest,
47-
EvListProxyNodesResponse
47+
EvListProxyNodesResponse,
48+
EvUpdateDatabaseInfo,
49+
EvDelayedRequestError
4850
};
4951

5052
static_assert (EvCompileInvalidateRequest + 1 == EvAbortExecution);
@@ -175,8 +177,8 @@ struct TKqpWorkloadServiceEvents {
175177
EvCleanupRequest,
176178
EvCleanupResponse,
177179
EvUpdatePoolInfo,
178-
EvUpdateDatabaseInfo,
179180
EvSubscribeOnPoolChanges,
181+
EvFetchDatabaseResponse,
180182
};
181183
};
182184

ydb/core/kqp/gateway/behaviour/resource_pool_classifier/checker.cpp

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
#include <ydb/core/base/path.h>
55
#include <ydb/core/cms/console/configs_dispatcher.h>
66
#include <ydb/core/kqp/workload_service/actors/actors.h>
7-
#include <ydb/core/kqp/workload_service/common/events.h>
87
#include <ydb/core/protos/console_config.pb.h>
98
#include <ydb/core/resource_pools/resource_pool_classifier_settings.h>
109

@@ -20,6 +19,31 @@ using namespace NResourcePool;
2019
using namespace NWorkload;
2120

2221

22+
struct TEvPrivate {
23+
// Event ids
24+
enum EEv : ui32 {
25+
EvRanksCheckerResponse = EventSpaceBegin(TEvents::ES_PRIVATE),
26+
27+
EvEnd
28+
};
29+
30+
static_assert(EvEnd < EventSpaceEnd(TEvents::ES_PRIVATE), "expect EvEnd < EventSpaceEnd(TEvents::ES_PRIVATE)");
31+
32+
struct TEvRanksCheckerResponse : public TEventLocal<TEvRanksCheckerResponse, EvRanksCheckerResponse> {
33+
TEvRanksCheckerResponse(Ydb::StatusIds::StatusCode status, i64 maxRank, ui64 numberClassifiers, NYql::TIssues issues)
34+
: Status(status)
35+
, MaxRank(maxRank)
36+
, NumberClassifiers(numberClassifiers)
37+
, Issues(std::move(issues))
38+
{}
39+
40+
const Ydb::StatusIds::StatusCode Status;
41+
const i64 MaxRank;
42+
const ui64 NumberClassifiers;
43+
const NYql::TIssues Issues;
44+
};
45+
};
46+
2347
class TRanksCheckerActor : public NKikimr::TQueryBase {
2448
using TBase = NKikimr::TQueryBase;
2549

@@ -177,7 +201,7 @@ class TResourcePoolClassifierPreparationActor : public TActorBootstrapped<TResou
177201
TryFinish();
178202
}
179203

180-
void Handle(TEvPrivate::TEvFetchDatabaseResponse::TPtr& ev) {
204+
void Handle(TEvFetchDatabaseResponse::TPtr& ev) {
181205
if (ev->Get()->Status != Ydb::StatusIds::SUCCESS) {
182206
FailAndPassAway("Database check failed", ev->Get()->Status, ev->Get()->Issues);
183207
return;
@@ -223,7 +247,7 @@ class TResourcePoolClassifierPreparationActor : public TActorBootstrapped<TResou
223247

224248
STRICT_STFUNC(StateFunc,
225249
hFunc(TEvPrivate::TEvRanksCheckerResponse, Handle);
226-
hFunc(TEvPrivate::TEvFetchDatabaseResponse, Handle);
250+
hFunc(TEvFetchDatabaseResponse, Handle);
227251
hFunc(TEvents::TEvUndelivered, Handle);
228252
hFunc(NConsole::TEvConfigsDispatcher::TEvGetConfigResponse, Handle);
229253
hFunc(NMetadata::NProvider::TEvRefreshSubscriberData, Handle)

0 commit comments

Comments
 (0)