Skip to content

Commit 9c31292

Browse files
authored
Enable DDL in ExecuteScript. Allow not to specify TxControl in QueryService queries (#1603)
1 parent b75da8b commit 9c31292

File tree

6 files changed

+96
-9
lines changed

6 files changed

+96
-9
lines changed

ydb/core/grpc_services/query/rpc_execute_script.cpp

-4
Original file line numberDiff line numberDiff line change
@@ -59,10 +59,6 @@ std::tuple<Ydb::StatusIds::StatusCode, NYql::TIssues> FillKqpRequest(
5959
kqpRequest.MutableRequest()->SetType(NKikimrKqp::QUERY_TYPE_SQL_GENERIC_SCRIPT);
6060
kqpRequest.MutableRequest()->SetKeepSession(false);
6161

62-
// TODO: Avoid explicit tx_control for script queries.
63-
kqpRequest.MutableRequest()->MutableTxControl()->mutable_begin_tx()->mutable_serializable_read_write();
64-
kqpRequest.MutableRequest()->MutableTxControl()->set_commit_tx(true);
65-
6662
kqpRequest.MutableRequest()->SetCancelAfterMs(GetDuration(req.operation_params().cancel_after()).MilliSeconds());
6763
kqpRequest.MutableRequest()->SetTimeoutMs(GetDuration(req.operation_params().operation_timeout()).MilliSeconds());
6864

ydb/core/kqp/session_actor/kqp_query_state.cpp

+38
Original file line numberDiff line numberDiff line change
@@ -306,4 +306,42 @@ bool TKqpQueryState::HasErrors(const NSchemeCache::TSchemeCacheNavigate& respons
306306
return true;
307307
}
308308

309+
bool TKqpQueryState::HasImpliedTx() const {
310+
if (HasTxControl()) {
311+
return false;
312+
}
313+
314+
const NKikimrKqp::EQueryAction action = RequestEv->GetAction();
315+
if (action != NKikimrKqp::QUERY_ACTION_EXECUTE &&
316+
action != NKikimrKqp::QUERY_ACTION_EXECUTE_PREPARED)
317+
{
318+
return false;
319+
}
320+
321+
const NKikimrKqp::EQueryType queryType = RequestEv->GetType();
322+
if (queryType != NKikimrKqp::QUERY_TYPE_SQL_GENERIC_QUERY &&
323+
queryType != NKikimrKqp::QUERY_TYPE_SQL_GENERIC_SCRIPT &&
324+
queryType != NKikimrKqp::QUERY_TYPE_SQL_GENERIC_CONCURRENT_QUERY)
325+
{
326+
return false;
327+
}
328+
329+
for (const auto& transactionPtr : PreparedQuery->GetTransactions()) {
330+
switch (transactionPtr->GetType()) {
331+
case NKqpProto::TKqpPhyTx::TYPE_GENERIC: // data transaction
332+
return true;
333+
case NKqpProto::TKqpPhyTx::TYPE_UNSPECIFIED:
334+
case NKqpProto::TKqpPhyTx::TYPE_COMPUTE:
335+
case NKqpProto::TKqpPhyTx::TYPE_DATA: // data transaction, but not in QueryService API
336+
case NKqpProto::TKqpPhyTx::TYPE_SCAN:
337+
case NKqpProto::TKqpPhyTx::TYPE_SCHEME:
338+
case NKqpProto::TKqpPhyTx_EType_TKqpPhyTx_EType_INT_MIN_SENTINEL_DO_NOT_USE_:
339+
case NKqpProto::TKqpPhyTx_EType_TKqpPhyTx_EType_INT_MAX_SENTINEL_DO_NOT_USE_:
340+
break;
341+
}
342+
}
343+
344+
return false;
345+
}
346+
309347
}

ydb/core/kqp/session_actor/kqp_query_state.h

+2
Original file line numberDiff line numberDiff line change
@@ -362,6 +362,8 @@ class TKqpQueryState : public TNonCopyable {
362362
return RequestEv->HasTxControl();
363363
}
364364

365+
bool HasImpliedTx() const; // (only for QueryService API) user has not specified TxControl in the request. In this case we behave like Begin/Commit was specified.
366+
365367
const ::Ydb::Table::TransactionControl& GetTxControl() const {
366368
return RequestEv->GetTxControl();
367369
}

ydb/core/kqp/session_actor/kqp_session_actor.cpp

+15-3
Original file line numberDiff line numberDiff line change
@@ -636,9 +636,21 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
636636
Counters->ReportBeginTransaction(Settings.DbCounters, Transactions.EvictedTx, Transactions.Size(), Transactions.ToBeAbortedSize());
637637
}
638638

639+
static const Ydb::Table::TransactionControl& GetImpliedTxControl() {
640+
auto create = []() -> Ydb::Table::TransactionControl {
641+
Ydb::Table::TransactionControl control;
642+
control.mutable_begin_tx()->mutable_serializable_read_write();
643+
control.set_commit_tx(true);
644+
return control;
645+
};
646+
static const Ydb::Table::TransactionControl control = create();
647+
return control;
648+
}
649+
639650
bool PrepareQueryTransaction() {
640-
if (QueryState->HasTxControl()) {
641-
const auto& txControl = QueryState->GetTxControl();
651+
const bool hasTxControl = QueryState->HasTxControl();
652+
if (hasTxControl || QueryState->HasImpliedTx()) {
653+
const auto& txControl = hasTxControl ? QueryState->GetTxControl() : GetImpliedTxControl();
642654

643655
QueryState->Commit = txControl.commit_tx();
644656
switch (txControl.tx_selector_case()) {
@@ -941,7 +953,7 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
941953
case NKqpProto::TKqpPhyTx::TYPE_SCHEME:
942954
YQL_ENSURE(tx->StagesSize() == 0);
943955

944-
if (QueryState->TxCtx->EffectiveIsolationLevel != NKikimrKqp::ISOLATION_LEVEL_UNDEFINED) {
956+
if (QueryState->HasTxControl() && QueryState->TxCtx->EffectiveIsolationLevel != NKikimrKqp::ISOLATION_LEVEL_UNDEFINED) {
945957
ReplyQueryError(Ydb::StatusIds::PRECONDITION_FAILED,
946958
"Scheme operations cannot be executed inside transaction");
947959
return true;

ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp

+40-1
Original file line numberDiff line numberDiff line change
@@ -1594,6 +1594,45 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
15941594
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::PRECONDITION_FAILED, result.GetIssues().ToString());
15951595
}
15961596

1597+
Y_UNIT_TEST(DdlExecuteScript) {
1598+
NKikimrConfig::TAppConfig appConfig;
1599+
appConfig.MutableTableServiceConfig()->SetEnablePreparedDdl(true);
1600+
auto setting = NKikimrKqp::TKqpSetting();
1601+
auto serverSettings = TKikimrSettings()
1602+
.SetAppConfig(appConfig)
1603+
.SetKqpSettings({setting})
1604+
.SetEnableScriptExecutionOperations(true);
1605+
1606+
TKikimrRunner kikimr(serverSettings);
1607+
auto db = kikimr.GetQueryClient();
1608+
1609+
const TString sql = R"sql(
1610+
CREATE TABLE TestDdlExecuteScript (
1611+
Key Uint64,
1612+
Value String,
1613+
PRIMARY KEY (Key)
1614+
);
1615+
)sql";
1616+
1617+
auto scriptExecutionOperation = db.ExecuteScript(sql).ExtractValueSync();
1618+
UNIT_ASSERT_VALUES_EQUAL_C(scriptExecutionOperation.Status().GetStatus(), EStatus::SUCCESS, scriptExecutionOperation.Status().GetIssues().ToString());
1619+
UNIT_ASSERT(scriptExecutionOperation.Metadata().ExecutionId);
1620+
1621+
NYdb::NOperation::TOperationClient client(kikimr.GetDriver());
1622+
TMaybe<NYdb::NQuery::TScriptExecutionOperation> readyOp;
1623+
while (true) {
1624+
auto op = client.Get<NYdb::NQuery::TScriptExecutionOperation>(scriptExecutionOperation.Id()).GetValueSync();
1625+
if (op.Ready()) {
1626+
readyOp = std::move(op);
1627+
break;
1628+
}
1629+
UNIT_ASSERT_C(op.Status().IsSuccess(), TStringBuilder() << op.Status().GetStatus() << ":" << op.Status().GetIssues().ToString());
1630+
Sleep(TDuration::MilliSeconds(10));
1631+
}
1632+
UNIT_ASSERT_C(readyOp->Status().IsSuccess(), readyOp->Status().GetIssues().ToString());
1633+
UNIT_ASSERT_EQUAL_C(readyOp->Metadata().ExecStatus, EExecStatus::Completed, readyOp->Status().GetIssues().ToString());
1634+
}
1635+
15971636
Y_UNIT_TEST(DdlMixedDml) {
15981637
NKikimrConfig::TAppConfig appConfig;
15991638
appConfig.MutableTableServiceConfig()->SetEnablePreparedDdl(true);
@@ -1626,7 +1665,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
16261665
UPSERT INTO KeyValue (Key, Value) VALUES (3, "Three");
16271666
SELECT * FROM KeyValue;
16281667
)", TTxControl::NoTx()).ExtractValueSync();
1629-
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::PRECONDITION_FAILED, result.GetIssues().ToString());
1668+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
16301669
}
16311670

16321671
Y_UNIT_TEST(Tcl) {

ydb/core/kqp/ut/service/kqp_qs_scripts_ut.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -203,7 +203,7 @@ Y_UNIT_TEST_SUITE(KqpQueryServiceScripts) {
203203
}
204204

205205

206-
void ExecuteScriptWithStatsMode (Ydb::Query::StatsMode statsMode) {
206+
void ExecuteScriptWithStatsMode(Ydb::Query::StatsMode statsMode) {
207207
auto kikimr = DefaultKikimrRunner();
208208
auto db = kikimr.GetQueryClient();
209209

0 commit comments

Comments
 (0)