diff --git a/ydb/core/grpc_services/query/rpc_execute_script.cpp b/ydb/core/grpc_services/query/rpc_execute_script.cpp index 86cd942d3290..a1de6537e7d2 100644 --- a/ydb/core/grpc_services/query/rpc_execute_script.cpp +++ b/ydb/core/grpc_services/query/rpc_execute_script.cpp @@ -59,10 +59,6 @@ std::tuple FillKqpRequest( kqpRequest.MutableRequest()->SetType(NKikimrKqp::QUERY_TYPE_SQL_GENERIC_SCRIPT); kqpRequest.MutableRequest()->SetKeepSession(false); - // TODO: Avoid explicit tx_control for script queries. - kqpRequest.MutableRequest()->MutableTxControl()->mutable_begin_tx()->mutable_serializable_read_write(); - kqpRequest.MutableRequest()->MutableTxControl()->set_commit_tx(true); - kqpRequest.MutableRequest()->SetCancelAfterMs(GetDuration(req.operation_params().cancel_after()).MilliSeconds()); kqpRequest.MutableRequest()->SetTimeoutMs(GetDuration(req.operation_params().operation_timeout()).MilliSeconds()); diff --git a/ydb/core/kqp/session_actor/kqp_query_state.cpp b/ydb/core/kqp/session_actor/kqp_query_state.cpp index 74571ae2f8cc..7ea7dfa77bfe 100644 --- a/ydb/core/kqp/session_actor/kqp_query_state.cpp +++ b/ydb/core/kqp/session_actor/kqp_query_state.cpp @@ -306,4 +306,42 @@ bool TKqpQueryState::HasErrors(const NSchemeCache::TSchemeCacheNavigate& respons return true; } +bool TKqpQueryState::HasImpliedTx() const { + if (HasTxControl()) { + return false; + } + + const NKikimrKqp::EQueryAction action = RequestEv->GetAction(); + if (action != NKikimrKqp::QUERY_ACTION_EXECUTE && + action != NKikimrKqp::QUERY_ACTION_EXECUTE_PREPARED) + { + return false; + } + + const NKikimrKqp::EQueryType queryType = RequestEv->GetType(); + if (queryType != NKikimrKqp::QUERY_TYPE_SQL_GENERIC_QUERY && + queryType != NKikimrKqp::QUERY_TYPE_SQL_GENERIC_SCRIPT && + queryType != NKikimrKqp::QUERY_TYPE_SQL_GENERIC_CONCURRENT_QUERY) + { + return false; + } + + for (const auto& transactionPtr : PreparedQuery->GetTransactions()) { + switch (transactionPtr->GetType()) { + case NKqpProto::TKqpPhyTx::TYPE_GENERIC: // data transaction + return true; + case NKqpProto::TKqpPhyTx::TYPE_UNSPECIFIED: + case NKqpProto::TKqpPhyTx::TYPE_COMPUTE: + case NKqpProto::TKqpPhyTx::TYPE_DATA: // data transaction, but not in QueryService API + case NKqpProto::TKqpPhyTx::TYPE_SCAN: + case NKqpProto::TKqpPhyTx::TYPE_SCHEME: + case NKqpProto::TKqpPhyTx_EType_TKqpPhyTx_EType_INT_MIN_SENTINEL_DO_NOT_USE_: + case NKqpProto::TKqpPhyTx_EType_TKqpPhyTx_EType_INT_MAX_SENTINEL_DO_NOT_USE_: + break; + } + } + + return false; +} + } diff --git a/ydb/core/kqp/session_actor/kqp_query_state.h b/ydb/core/kqp/session_actor/kqp_query_state.h index 4bcc4cc47fee..7766ddd70116 100644 --- a/ydb/core/kqp/session_actor/kqp_query_state.h +++ b/ydb/core/kqp/session_actor/kqp_query_state.h @@ -362,6 +362,8 @@ class TKqpQueryState : public TNonCopyable { return RequestEv->HasTxControl(); } + bool HasImpliedTx() const; // (only for QueryService API) user has not specified TxControl in the request. In this case we behave like Begin/Commit was specified. + const ::Ydb::Table::TransactionControl& GetTxControl() const { return RequestEv->GetTxControl(); } diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp index 00bb19061486..e9735f3bc723 100644 --- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp +++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp @@ -636,9 +636,21 @@ class TKqpSessionActor : public TActorBootstrapped { Counters->ReportBeginTransaction(Settings.DbCounters, Transactions.EvictedTx, Transactions.Size(), Transactions.ToBeAbortedSize()); } + static const Ydb::Table::TransactionControl& GetImpliedTxControl() { + auto create = []() -> Ydb::Table::TransactionControl { + Ydb::Table::TransactionControl control; + control.mutable_begin_tx()->mutable_serializable_read_write(); + control.set_commit_tx(true); + return control; + }; + static const Ydb::Table::TransactionControl control = create(); + return control; + } + bool PrepareQueryTransaction() { - if (QueryState->HasTxControl()) { - const auto& txControl = QueryState->GetTxControl(); + const bool hasTxControl = QueryState->HasTxControl(); + if (hasTxControl || QueryState->HasImpliedTx()) { + const auto& txControl = hasTxControl ? QueryState->GetTxControl() : GetImpliedTxControl(); QueryState->Commit = txControl.commit_tx(); switch (txControl.tx_selector_case()) { @@ -941,7 +953,7 @@ class TKqpSessionActor : public TActorBootstrapped { case NKqpProto::TKqpPhyTx::TYPE_SCHEME: YQL_ENSURE(tx->StagesSize() == 0); - if (QueryState->TxCtx->EffectiveIsolationLevel != NKikimrKqp::ISOLATION_LEVEL_UNDEFINED) { + if (QueryState->HasTxControl() && QueryState->TxCtx->EffectiveIsolationLevel != NKikimrKqp::ISOLATION_LEVEL_UNDEFINED) { ReplyQueryError(Ydb::StatusIds::PRECONDITION_FAILED, "Scheme operations cannot be executed inside transaction"); return true; diff --git a/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp b/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp index 460fa094bef0..e6665f5947c7 100644 --- a/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp +++ b/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp @@ -1321,6 +1321,45 @@ Y_UNIT_TEST_SUITE(KqpQueryService) { UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::PRECONDITION_FAILED, result.GetIssues().ToString()); } + Y_UNIT_TEST(DdlExecuteScript) { + NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetEnablePreparedDdl(true); + auto setting = NKikimrKqp::TKqpSetting(); + auto serverSettings = TKikimrSettings() + .SetAppConfig(appConfig) + .SetKqpSettings({setting}) + .SetEnableScriptExecutionOperations(true); + + TKikimrRunner kikimr(serverSettings); + auto db = kikimr.GetQueryClient(); + + const TString sql = R"sql( + CREATE TABLE TestDdlExecuteScript ( + Key Uint64, + Value String, + PRIMARY KEY (Key) + ); + )sql"; + + auto scriptExecutionOperation = db.ExecuteScript(sql).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(scriptExecutionOperation.Status().GetStatus(), EStatus::SUCCESS, scriptExecutionOperation.Status().GetIssues().ToString()); + UNIT_ASSERT(scriptExecutionOperation.Metadata().ExecutionId); + + NYdb::NOperation::TOperationClient client(kikimr.GetDriver()); + TMaybe readyOp; + while (true) { + auto op = client.Get(scriptExecutionOperation.Id()).GetValueSync(); + if (op.Ready()) { + readyOp = std::move(op); + break; + } + UNIT_ASSERT_C(op.Status().IsSuccess(), TStringBuilder() << op.Status().GetStatus() << ":" << op.Status().GetIssues().ToString()); + Sleep(TDuration::MilliSeconds(10)); + } + UNIT_ASSERT_C(readyOp->Status().IsSuccess(), readyOp->Status().GetIssues().ToString()); + UNIT_ASSERT_EQUAL_C(readyOp->Metadata().ExecStatus, EExecStatus::Completed, readyOp->Status().GetIssues().ToString()); + } + Y_UNIT_TEST(DdlMixedDml) { NKikimrConfig::TAppConfig appConfig; appConfig.MutableTableServiceConfig()->SetEnablePreparedDdl(true); @@ -1353,7 +1392,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) { UPSERT INTO KeyValue (Key, Value) VALUES (3, "Three"); SELECT * FROM KeyValue; )", TTxControl::NoTx()).ExtractValueSync(); - UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::PRECONDITION_FAILED, result.GetIssues().ToString()); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); } Y_UNIT_TEST(Tcl) { diff --git a/ydb/core/kqp/ut/service/kqp_qs_scripts_ut.cpp b/ydb/core/kqp/ut/service/kqp_qs_scripts_ut.cpp index a91c4e4d3504..5fdb3ed11809 100644 --- a/ydb/core/kqp/ut/service/kqp_qs_scripts_ut.cpp +++ b/ydb/core/kqp/ut/service/kqp_qs_scripts_ut.cpp @@ -203,7 +203,7 @@ Y_UNIT_TEST_SUITE(KqpQueryServiceScripts) { } - void ExecuteScriptWithStatsMode (Ydb::Query::StatsMode statsMode) { + void ExecuteScriptWithStatsMode(Ydb::Query::StatsMode statsMode) { auto kikimr = DefaultKikimrRunner(); auto db = kikimr.GetQueryClient();