6
6
#include < ydb/core/tx/tx_proxy/proxy.h>
7
7
#include < ydb/core/kqp/session_actor/kqp_worker_common.h>
8
8
#include < ydb/core/tx/schemeshard/schemeshard_build_index.h>
9
+ #include < ydb/services/metadata/abstract/kqp_common.h>
9
10
10
11
namespace NKikimr ::NKqp {
11
12
@@ -49,7 +50,7 @@ class TKqpSchemeExecuter : public TActorBootstrapped<TKqpSchemeExecuter> {
49
50
return NKikimrServices::TActivity::KQP_EXECUTER_ACTOR;
50
51
}
51
52
52
- TKqpSchemeExecuter (TKqpPhyTxHolder::TConstPtr phyTx, NKikimrKqp::EQueryType queryType, const TActorId& target, const TMaybe<TString>& requestType,
53
+ TKqpSchemeExecuter (TKqpPhyTxHolder::TConstPtr phyTx, NKikimrKqp::EQueryType queryType, const TActorId& target, const TMaybe<TString>& requestType,
53
54
const TString& database, TIntrusiveConstPtr<NACLib::TUserToken> userToken,
54
55
bool temporary, TString sessionId, TIntrusivePtr<TUserRequestContext> ctx)
55
56
: PhyTx(phyTx)
@@ -69,13 +70,11 @@ class TKqpSchemeExecuter : public TActorBootstrapped<TKqpSchemeExecuter> {
69
70
}
70
71
71
72
void StartBuildOperation () {
72
- const auto & schemeOp = PhyTx->GetSchemeOperation ();
73
- auto buildOp = schemeOp.GetBuildOperation ();
74
73
Send (MakeTxProxyID (), new TEvTxUserProxy::TEvAllocateTxId);
75
- Become (&TKqpSchemeExecuter::ExecuteState);
74
+ Become (&TKqpSchemeExecuter::ExecuteState);
76
75
}
77
76
78
- void Bootstrap () {
77
+ void MakeSchemeOperationRequest () {
79
78
using TRequest = TEvTxUserProxy::TEvProposeTransaction;
80
79
81
80
auto ev = MakeHolder<TRequest>();
@@ -124,30 +123,44 @@ class TKqpSchemeExecuter : public TActorBootstrapped<TKqpSchemeExecuter> {
124
123
}
125
124
126
125
case NKqpProto::TKqpSchemeOperation::kAlterTable : {
127
- auto modifyScheme = schemeOp.GetAlterTable ();
126
+ const auto & modifyScheme = schemeOp.GetAlterTable ();
128
127
ev->Record .MutableTransaction ()->MutableModifyScheme ()->CopyFrom (modifyScheme);
129
128
break ;
130
129
}
131
130
132
131
case NKqpProto::TKqpSchemeOperation::kBuildOperation : {
133
- auto buildOp = schemeOp.GetBuildOperation ();
134
132
return StartBuildOperation ();
135
133
}
136
134
137
135
case NKqpProto::TKqpSchemeOperation::kCreateUser : {
138
- auto modifyScheme = schemeOp.GetCreateUser ();
136
+ const auto & modifyScheme = schemeOp.GetCreateUser ();
139
137
ev->Record .MutableTransaction ()->MutableModifyScheme ()->CopyFrom (modifyScheme);
140
138
break ;
141
139
}
142
140
143
141
case NKqpProto::TKqpSchemeOperation::kAlterUser : {
144
- auto modifyScheme = schemeOp.GetAlterUser ();
142
+ const auto & modifyScheme = schemeOp.GetAlterUser ();
145
143
ev->Record .MutableTransaction ()->MutableModifyScheme ()->CopyFrom (modifyScheme);
146
144
break ;
147
145
}
148
146
149
147
case NKqpProto::TKqpSchemeOperation::kDropUser : {
150
- auto modifyScheme = schemeOp.GetDropUser ();
148
+ const auto & modifyScheme = schemeOp.GetDropUser ();
149
+ ev->Record .MutableTransaction ()->MutableModifyScheme ()->CopyFrom (modifyScheme);
150
+ break ;
151
+ }
152
+ case NKqpProto::TKqpSchemeOperation::kCreateExternalTable : {
153
+ const auto & modifyScheme = schemeOp.GetCreateExternalTable ();
154
+ ev->Record .MutableTransaction ()->MutableModifyScheme ()->CopyFrom (modifyScheme);
155
+ break ;
156
+ }
157
+ case NKqpProto::TKqpSchemeOperation::kAlterExternalTable : {
158
+ const auto & modifyScheme = schemeOp.GetAlterExternalTable ();
159
+ ev->Record .MutableTransaction ()->MutableModifyScheme ()->CopyFrom (modifyScheme);
160
+ break ;
161
+ }
162
+ case NKqpProto::TKqpSchemeOperation::kDropExternalTable : {
163
+ const auto & modifyScheme = schemeOp.GetDropExternalTable ();
151
164
ev->Record .MutableTransaction ()->MutableModifyScheme ()->CopyFrom (modifyScheme);
152
165
break ;
153
166
}
@@ -191,7 +204,7 @@ class TKqpSchemeExecuter : public TActorBootstrapped<TKqpSchemeExecuter> {
191
204
auto promise = NewPromise<IKqpGateway::TGenericResult>();
192
205
193
206
bool successOnNotExist = false ;
194
- bool failedOnAlreadyExists = false ;
207
+ bool failedOnAlreadyExists = false ;
195
208
// exists/not exists semantics supported only in the query service.
196
209
if (IsQueryService ()) {
197
210
successOnNotExist = ev->Record .GetTransaction ().GetModifyScheme ().GetSuccessOnNotExist ();
@@ -218,6 +231,57 @@ class TKqpSchemeExecuter : public TActorBootstrapped<TKqpSchemeExecuter> {
218
231
Become (&TKqpSchemeExecuter::ExecuteState);
219
232
}
220
233
234
+ void MakeObjectRequest () {
235
+ const auto & schemeOp = PhyTx->GetSchemeOperation ();
236
+ NMetadata::IClassBehaviour::TPtr cBehaviour (NMetadata::IClassBehaviour::TFactory::Construct (schemeOp.GetObjectType ()));
237
+ if (!cBehaviour) {
238
+ InternalError (TStringBuilder () << " Unsupported object type: \" " << schemeOp.GetObjectType () << " \" " );
239
+ return ;
240
+ }
241
+
242
+ if (!cBehaviour->GetOperationsManager ()) {
243
+ InternalError (TStringBuilder () << " Object type \" " << schemeOp.GetObjectType () << " \" does not have manager for operations" );
244
+ }
245
+
246
+ auto * actorSystem = TActivationContext::ActorSystem ();
247
+ auto selfId = SelfId ();
248
+
249
+ NMetadata::NModifications::IOperationsManager::TExternalModificationContext context;
250
+ context.SetDatabase (Database);
251
+ context.SetActorSystem (actorSystem);
252
+ if (UserToken) {
253
+ context.SetUserToken (*UserToken);
254
+ }
255
+
256
+ auto resultFuture = cBehaviour->GetOperationsManager ()->ExecutePrepared (schemeOp, cBehaviour, context);
257
+
258
+ using TResultFuture = NThreading::TFuture<NMetadata::NModifications::IOperationsManager::TYqlConclusionStatus>;
259
+ resultFuture.Subscribe ([actorSystem, selfId](const TResultFuture& f) {
260
+ const auto & status = f.GetValue ();
261
+ auto ev = MakeHolder<TEvPrivate::TEvResult>();
262
+ if (status.Ok ()) {
263
+ ev->Result .SetSuccess ();
264
+ } else {
265
+ ev->Result .SetStatus (status.GetStatus ());
266
+ if (TString message = status.GetErrorMessage ()) {
267
+ ev->Result .AddIssue (NYql::TIssue{message});
268
+ }
269
+ }
270
+ actorSystem->Send (selfId, ev.Release ());
271
+ });
272
+
273
+ Become (&TKqpSchemeExecuter::ObjectExecuteState);
274
+ }
275
+
276
+ void Bootstrap () {
277
+ const auto & schemeOp = PhyTx->GetSchemeOperation ();
278
+ if (schemeOp.GetObjectType ()) {
279
+ MakeObjectRequest ();
280
+ } else {
281
+ MakeSchemeOperationRequest ();
282
+ }
283
+ }
284
+
221
285
public:
222
286
STATEFN (ExecuteState) {
223
287
try {
@@ -240,6 +304,19 @@ class TKqpSchemeExecuter : public TActorBootstrapped<TKqpSchemeExecuter> {
240
304
}
241
305
}
242
306
307
+ STATEFN (ObjectExecuteState) {
308
+ try {
309
+ switch (ev->GetTypeRewrite ()) {
310
+ hFunc (TEvPrivate::TEvResult, HandleExecute);
311
+ hFunc (TEvKqp::TEvAbortExecution, HandleAbortExecution);
312
+ default :
313
+ UnexpectedEvent (" ObjectExecuteState" , ev->GetTypeRewrite ());
314
+ }
315
+ } catch (const yexception& e) {
316
+ InternalError (e.what ());
317
+ }
318
+ }
319
+
243
320
244
321
void Handle (TEvTxUserProxy::TEvAllocateTxIdResult::TPtr& ev) {
245
322
const auto * msg = ev->Get ();
@@ -250,18 +327,18 @@ class TKqpSchemeExecuter : public TActorBootstrapped<TKqpSchemeExecuter> {
250
327
251
328
void Navigate (const TActorId& schemeCache) {
252
329
const auto & schemeOp = PhyTx->GetSchemeOperation ();
253
- auto buildOp = schemeOp.GetBuildOperation ();
330
+ const auto & buildOp = schemeOp.GetBuildOperation ();
254
331
const auto & path = buildOp.source_path ();
255
332
256
333
const auto paths = NKikimr::SplitPath (path);
257
334
if (paths.empty ()) {
258
335
TString error = TStringBuilder () << " Failed to split table path " << path;
259
336
return ReplyErrorAndDie (Ydb::StatusIds::BAD_REQUEST, NYql::TIssue (error));
260
337
}
261
-
338
+
262
339
auto request = std::make_unique<NSchemeCache::TSchemeCacheNavigate>();
263
340
264
- request->DatabaseName = Database;
341
+ request->DatabaseName = Database;
265
342
auto & entry = request->ResultSet .emplace_back ();
266
343
entry.Operation = NSchemeCache::TSchemeCacheNavigate::OpPath;
267
344
entry.Path = ::NKikimr::SplitPath (path);
@@ -312,7 +389,7 @@ class TKqpSchemeExecuter : public TActorBootstrapped<TKqpSchemeExecuter> {
312
389
}
313
390
314
391
const auto & schemeOp = PhyTx->GetSchemeOperation ();
315
- auto buildOp = schemeOp.GetBuildOperation ();
392
+ const auto & buildOp = schemeOp.GetBuildOperation ();
316
393
SetSchemeShardId (domainInfo->ExtractSchemeShard ());
317
394
auto req = std::make_unique<NSchemeShard::TEvIndexBuilder::TEvCreateRequest>(TxId, Database, buildOp);
318
395
ForwardToSchemeShard (std::move (req));
0 commit comments