Skip to content

remove transactions in queries #421

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Apr 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 1 addition & 6 deletions crates/connectors/ndc-postgres/src/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ use ndc_postgres_configuration as configuration;
use query_engine_sql::sql;
use query_engine_translation::translation;

use crate::configuration_mapping;
use crate::error::convert;
use crate::error::record;
use crate::state;
Expand Down Expand Up @@ -73,11 +72,7 @@ fn plan_query(
) -> Result<sql::execution_plan::ExecutionPlan<sql::execution_plan::Query>, translation::error::Error>
{
let timer = state.metrics.time_query_plan();
let result = translation::query::translate(
&configuration.metadata,
configuration_mapping::convert_isolation_level(configuration.isolation_level),
query_request,
);
let result = translation::query::translate(&configuration.metadata, query_request);
timer.complete_with(result)
}

Expand Down
1 change: 1 addition & 0 deletions crates/query-engine/sql/src/sql/ast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ pub struct CommonTableExpression {
/// The 'body' side of a Common Table Expression
#[derive(Debug, Clone, PartialEq)]
pub enum CTExpr {
Select(Select),
RawSql(Vec<RawSql>),
Delete(Delete),
Insert(Insert),
Expand Down
3 changes: 3 additions & 0 deletions crates/query-engine/sql/src/sql/convert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ impl CommonTableExpression {
impl CTExpr {
pub fn to_sql(&self, sql: &mut SQL) {
match self {
CTExpr::Select(select) => {
select.to_sql(sql);
}
CTExpr::RawSql(raw_vec) => {
for item in raw_vec {
item.to_sql(sql);
Expand Down
8 changes: 2 additions & 6 deletions crates/query-engine/sql/src/sql/execution_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,22 +50,18 @@ pub fn explain_to_sql(explain: &sql::ast::Explain) -> sql::string::SQL {

/// A simple query execution plan with only a root field and a query.
pub fn simple_query_execution_plan(
isolation_level: sql::ast::transaction::IsolationLevel,
variables: Option<Vec<BTreeMap<String, serde_json::Value>>>,
root_field: String,
query: sql::ast::Select,
) -> ExecutionPlan<Query> {
ExecutionPlan {
pre: sql::helpers::begin(
isolation_level,
sql::ast::transaction::TransactionMode::ReadOnly,
),
pre: vec![],
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No more begin and commit for queries.

query: Query {
variables,
root_field,
query,
},
post: sql::helpers::commit(),
post: vec![],
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ pub fn normalize_order_by_element(mut element: OrderByElement) -> OrderByElement
/// Normalize the expression in a common table expression.
pub fn normalize_cte(mut cte: CommonTableExpression) -> CommonTableExpression {
cte.select = match cte.select {
CTExpr::Select(select) => CTExpr::Select(normalize_select(select)),
CTExpr::RawSql(raw_sqls) => CTExpr::RawSql(
raw_sqls
.into_iter()
Expand Down
36 changes: 22 additions & 14 deletions crates/query-engine/translation/src/translation/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -315,26 +315,17 @@ impl State {
sql::ast::TableReference::AliasedTable(alias)
}

/// Fetch the tracked native queries used in the query plan and their table alias.
pub fn get_native_queries(self) -> Vec<NativeQueryInfo> {
self.native_queries.native_queries
}

/// increment the table index and return the current one.
fn next_global_table_index(&mut self) -> TableAliasIndex {
let TableAliasIndex(index) = self.global_table_index;
self.global_table_index = TableAliasIndex(index + 1);
TableAliasIndex(index)
/// Fetch the tracked native queries used in the query plan and their table alias,
/// and the global table index.
pub fn get_native_queries_and_global_index(self) -> (Vec<NativeQueryInfo>, TableAliasIndex) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We break down the state so we can consume the native queries when processing native queries, and use the global table alias index outside of it when inventing names for the wrapping ctes.

(self.native_queries.native_queries, self.global_table_index)
}

// aliases

/// Create table aliases using this function so they get a unique index.
pub fn make_table_alias(&mut self, name: String) -> sql::ast::TableAlias {
sql::ast::TableAlias {
unique_index: self.next_global_table_index().0,
name,
}
self.global_table_index.make_table_alias(name)
}

/// Create a table alias for left outer join lateral part.
Expand Down Expand Up @@ -373,6 +364,23 @@ impl State {
}
}

impl TableAliasIndex {
/// increment the table index and return the current one.
fn next_global_table_index(&mut self) -> TableAliasIndex {
let index = self.0;
*self = TableAliasIndex(index + 1);
TableAliasIndex(index)
}

/// Create table aliases using this function so they get a unique index.
pub fn make_table_alias(&mut self, name: String) -> sql::ast::TableAlias {
sql::ast::TableAlias {
unique_index: self.next_global_table_index().0,
name,
}
}
}

impl NativeQueries {
fn new() -> NativeQueries {
NativeQueries {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,8 @@ fn translate_native_query(

// add the procedure native query definition is a with clause.
select.with = sql::ast::With {
common_table_expressions: crate::translation::query::native_queries::translate(env, state)?,
common_table_expressions: crate::translation::query::native_queries::translate(env, state)?
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mutations do need to run native queries at the top-level, because they are mutations. So we don't wrap the ctes here.

.0,
};

// normalize ast
Expand Down
10 changes: 7 additions & 3 deletions crates/query-engine/translation/src/translation/query/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ use query_engine_sql::sql;
/// Translate the incoming QueryRequest to an ExecutionPlan (SQL) to be run against the database.
pub fn translate(
metadata: &metadata::Metadata,
isolation_level: sql::ast::transaction::IsolationLevel,
query_request: models::QueryRequest,
) -> Result<sql::execution_plan::ExecutionPlan<sql::execution_plan::Query>, Error> {
let mut state = State::new();
Expand Down Expand Up @@ -66,7 +65,13 @@ pub fn translate(
&state.make_table_alias("universe_agg".to_string()),
// native queries if there are any
sql::ast::With {
common_table_expressions: native_queries::translate(&env, state)?,
common_table_expressions: {
let (ctes, mut global_table_index) = native_queries::translate(&env, state)?;
// wrap ctes in another cte to guard against mutations in queries
ctes.into_iter()
.map(|cte| native_queries::wrap_cte_in_cte(&mut global_table_index, cte))
.collect()
},
Comment on lines +68 to +74
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

here we wrap.

},
select_set,
);
Expand All @@ -75,7 +80,6 @@ pub fn translate(
let json_select = sql::rewrites::constant_folding::normalize_select(json_select);

Ok(sql::execution_plan::simple_query_execution_plan(
isolation_level,
query_request.variables,
query_request.collection,
json_select,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,18 @@ use ndc_sdk::models;

use super::values;
use crate::translation::error::Error;
use crate::translation::helpers::{Env, State};
use crate::translation::helpers::{Env, State, TableAliasIndex};
use query_engine_metadata::metadata;
use query_engine_sql::sql;

/// Translate native queries collected in State by the translation proccess into CTEs.
pub fn translate(env: &Env, state: State) -> Result<Vec<sql::ast::CommonTableExpression>, Error> {
pub fn translate(
env: &Env,
state: State,
) -> Result<(Vec<sql::ast::CommonTableExpression>, TableAliasIndex), Error> {
let mut ctes = vec![];
let variables_table = env.get_variables_table();
let native_queries = state.get_native_queries();
let (native_queries, global_table_index) = state.get_native_queries_and_global_index();

// We need a 'State' value when translating variables in order
// to be able to generate fresh names for bound relational
Expand Down Expand Up @@ -72,5 +75,38 @@ pub fn translate(env: &Env, state: State) -> Result<Vec<sql::ast::CommonTableExp
});
}

Ok(ctes)
Ok((ctes, global_table_index))
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We return the global table index here as well so it can be used when wrapping ctes, but the native queries with the state is consumed.

}

/// Wrap a CTE in another CTE so we can guard against mutations in queries.
pub fn wrap_cte_in_cte(
table_alias_index: &mut TableAliasIndex,
mut cte: sql::ast::CommonTableExpression,
) -> sql::ast::CommonTableExpression {
// This is the name the rest of the query knows, so we keep it on the outer parts.
let outer_cte_alias = cte.alias;

// this is going to be internal, so we replace the CTE.
cte.alias = table_alias_index.make_table_alias(outer_cte_alias.name.clone());

// build the internal `WITH <CTE> SELECT * FROM CTE as <nested_cte_alias>`.
let nested_cte_alias = table_alias_index.make_table_alias(outer_cte_alias.name.clone());

let nested_cte_select = sql::ast::CTExpr::Select({
let mut select = sql::helpers::star_select(sql::ast::From::Table {
reference: sql::ast::TableReference::AliasedTable(cte.alias.clone()),
alias: nested_cte_alias.clone(),
});
select.with = sql::ast::With {
common_table_expressions: vec![cte],
};
select
});

// wrap in another CTE.
sql::ast::CommonTableExpression {
alias: outer_cte_alias,
column_names: None,
select: nested_cte_select,
}
Comment on lines +81 to +111
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how to wrap ctes.

}
9 changes: 3 additions & 6 deletions crates/query-engine/translation/tests/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,11 @@ use query_engine_sql::sql;
use query_engine_translation::translation;

pub fn test_translation(testname: &str) -> Result<String, translation::error::Error> {
test_query_translation(Default::default(), testname)
test_query_translation(testname)
}

/// Translate a query to SQL and compare against the snapshot.
pub fn test_query_translation(
isolation_level: sql::ast::transaction::IsolationLevel,
testname: &str,
) -> Result<String, translation::error::Error> {
pub fn test_query_translation(testname: &str) -> Result<String, translation::error::Error> {
let metadata_versioned = serde_json::from_str(
fs::read_to_string(format!("tests/goldenfiles/{}/tables.json", testname))
.unwrap()
Expand All @@ -29,7 +26,7 @@ pub fn test_query_translation(
)
.unwrap();

let plan = translation::query::translate(&metadata, isolation_level, request)?;
let plan = translation::query::translate(&metadata, request)?;

let mut sqls: Vec<String> = vec![];

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,6 @@
source: crates/query-engine/translation/tests/tests.rs
expression: result
---
BEGIN
ISOLATION LEVEL READ COMMITTED READ ONLY;

SELECT
coalesce(json_agg(row_to_json("%9_universe")), '[]') AS "universe"
FROM
Expand Down Expand Up @@ -74,6 +71,4 @@ FROM
) AS "%10_rows"
) AS "%9_universe";

COMMIT;

{}
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,6 @@
source: crates/query-engine/translation/tests/tests.rs
expression: result
---
BEGIN
ISOLATION LEVEL READ COMMITTED READ ONLY;

SELECT
coalesce(json_agg(row_to_json("%1_universe")), '[]') AS "universe"
FROM
Expand Down Expand Up @@ -38,6 +35,4 @@ FROM
) AS "%3_aggregates"
) AS "%1_universe";

COMMIT;

{}
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,6 @@
source: crates/query-engine/translation/tests/tests.rs
expression: result
---
BEGIN
ISOLATION LEVEL READ COMMITTED READ ONLY;

SELECT
coalesce(json_agg(row_to_json("%1_universe")), '[]') AS "universe"
FROM
Expand All @@ -25,6 +22,4 @@ FROM
) AS "%3_aggregates"
) AS "%1_universe";

COMMIT;

{}
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,6 @@
source: crates/query-engine/translation/tests/tests.rs
expression: result
---
BEGIN
ISOLATION LEVEL READ COMMITTED READ ONLY;

SELECT
coalesce(json_agg(row_to_json("%1_universe")), '[]') AS "universe"
FROM
Expand All @@ -25,6 +22,4 @@ FROM
) AS "%3_aggregates"
) AS "%1_universe";

COMMIT;

{}
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,6 @@
source: crates/query-engine/translation/tests/tests.rs
expression: result
---
BEGIN
ISOLATION LEVEL READ COMMITTED READ ONLY;

SELECT
coalesce(json_agg(row_to_json("%1_universe")), '[]') AS "universe"
FROM
Expand All @@ -27,6 +24,4 @@ FROM
) AS "%2_rows"
) AS "%1_universe";

COMMIT;

{}
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,6 @@
source: crates/query-engine/translation/tests/tests.rs
expression: result
---
BEGIN
ISOLATION LEVEL READ COMMITTED READ ONLY;

SELECT
coalesce(json_agg("%8_universe_agg"."universe"), '[]') AS "universe"
FROM
Expand All @@ -15,16 +12,22 @@ FROM
jsonb_to_recordset($1) AS "%0_%variables_table"("%variable_order" int, "%variables" jsonb)
CROSS JOIN LATERAL (
WITH "%2_NATIVE_QUERY_array_series" AS (
SELECT
3 as three,
array_agg(arr.series) AS series
FROM
(
SELECT
generate_series(
cast(
(
("%0_%variables_table"."%variables" -> $2) #>> cast(ARRAY [] as text[])) as int4),cast((("%0_%variables_table"."%variables" -> $3) #>> cast(ARRAY [] as text[])) as int4)) AS series) AS arr
WITH "%9_NATIVE_QUERY_array_series" AS (
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

an example of a wrapping.

SELECT
3 as three,
array_agg(arr.series) AS series
FROM
(
SELECT
generate_series(
cast(
(
("%0_%variables_table"."%variables" -> $2) #>> cast(ARRAY [] as text[])) as int4),cast((("%0_%variables_table"."%variables" -> $3) #>> cast(ARRAY [] as text[])) as int4)) AS series) AS arr
)
SELECT
*
FROM
"%9_NATIVE_QUERY_array_series" AS "%10_NATIVE_QUERY_array_series"
)
SELECT
*
Expand Down Expand Up @@ -52,8 +55,6 @@ FROM
(
"%3_array"."element" #>> cast(ARRAY [] as text[])) as int4)) AS "element" FROM jsonb_array_elements(("%0_%variables_table"."%variables" -> $4)) AS "%3_array"("element"))) AS "%4_in_subquery"("value")))) AS "%6_rows") AS "%6_rows") AS "%5_universe" ORDER BY "%0_%variables_table"."%variable_order" ASC ) AS "%8_universe_agg";

COMMIT;

{
1: Variable(
"%VARIABLES_OBJECT_PLACEHOLDER",
Expand Down
Loading