Skip to content

Commit 86cc780

Browse files
author
Sean Loiselle
committed
sql: improve PG subsource name resolution errors
Include a few improvements to resolving table names when creating PG sources, including: Duplicate subsource names are easy to encounter, but the error they produced didn't provide any context around how to fix the issue. We can instead produce an error that gives users more guidance. FOR SCHEMAS (..) didn't error if a referenced schema didn't identify any tables in the publication; this seems like a potential source of confusion for users. Other misc. simplifications.
1 parent 681e78d commit 86cc780

8 files changed

+172
-47
lines changed

src/sql/src/plan/error.rs

+30-7
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ use mz_expr::EvalError;
2020
use mz_ore::error::ErrorExt;
2121
use mz_ore::stack::RecursionLimitError;
2222
use mz_ore::str::{separated, StrExt};
23+
use mz_postgres_util::PostgresError;
2324
use mz_repr::adt::char::InvalidCharLengthError;
2425
use mz_repr::adt::numeric::InvalidNumericMaxScaleError;
2526
use mz_repr::adt::system::Oid;
@@ -107,7 +108,7 @@ pub enum PlanError {
107108
schema_lookup: String,
108109
cause: Arc<dyn Error + Send + Sync>,
109110
},
110-
FetchingPostgresPublicationInfoFailed {
111+
PostgresConnectionErr {
111112
cause: Arc<mz_postgres_util::PostgresError>,
112113
},
113114
InvalidProtobufSchema {
@@ -154,6 +155,14 @@ pub enum PlanError {
154155
cluster_name: String,
155156
linked_object_name: String,
156157
},
158+
EmptyPublication(String),
159+
DuplicateSubsourceReference {
160+
name: UnresolvedItemName,
161+
upstream_references: Vec<UnresolvedItemName>,
162+
},
163+
PostgresDatabaseMissingFilteredSchemas {
164+
schemas: Vec<String>,
165+
},
157166
// TODO(benesch): eventually all errors should be structured.
158167
Unstructured(String),
159168
}
@@ -169,9 +178,7 @@ impl PlanError {
169178
pub fn detail(&self) -> Option<String> {
170179
match self {
171180
Self::FetchingCsrSchemaFailed { cause, .. } => Some(cause.to_string_with_causes()),
172-
Self::FetchingPostgresPublicationInfoFailed { cause } => {
173-
Some(cause.to_string_with_causes())
174-
}
181+
Self::PostgresConnectionErr { cause } => Some(cause.to_string_with_causes()),
175182
Self::InvalidProtobufSchema { cause } => Some(cause.to_string_with_causes()),
176183
Self::InvalidOptionValue { err, .. } => err.detail(),
177184
_ => None,
@@ -205,7 +212,7 @@ impl PlanError {
205212
as text."
206213
.into(),
207214
),
208-
Self::FetchingPostgresPublicationInfoFailed { cause } => {
215+
Self::PostgresConnectionErr { cause } => {
209216
if let Some(cause) = cause.source() {
210217
if let Some(cause) = cause.downcast_ref::<io::Error>() {
211218
if cause.kind() == io::ErrorKind::TimedOut {
@@ -239,6 +246,9 @@ impl PlanError {
239246
let supported_azs_str = supported_azs.iter().join("\n ");
240247
Some(format!("Did you supply an availability zone name instead of an ID? Known availability zone IDs:\n {}", supported_azs_str))
241248
}
249+
Self::DuplicateSubsourceReference { .. } => {
250+
Some("Specify target table names using FOR TABLES (foo AS bar), or limit the upstream tables using FOR SCHEMAS (foo)".into())
251+
}
242252
_ => None,
243253
}
244254
}
@@ -358,8 +368,8 @@ impl fmt::Display for PlanError {
358368
Self::FetchingCsrSchemaFailed { schema_lookup, .. } => {
359369
write!(f, "failed to fetch schema {schema_lookup} from schema registry")
360370
}
361-
Self::FetchingPostgresPublicationInfoFailed { .. } => {
362-
write!(f, "failed to fetch publication information from PostgreSQL database")
371+
Self::PostgresConnectionErr { .. } => {
372+
write!(f, "failed to connect to PostgreSQL database")
363373
}
364374
Self::InvalidProtobufSchema { .. } => {
365375
write!(f, "invalid protobuf schema")
@@ -401,6 +411,13 @@ impl fmt::Display for PlanError {
401411
Self::InvalidSchemaName => write!(f, "no schema has been selected to create in"),
402412
Self::ItemAlreadyExists { name, item_type } => write!(f, "{item_type} {} already exists", name.quoted()),
403413
Self::ModifyLinkedCluster {cluster_name, ..} => write!(f, "cannot modify linked cluster {}", cluster_name.quoted()),
414+
Self::EmptyPublication(publication) => write!(f, "PostgreSQL PUBLICATION {publication} is empty"),
415+
Self::DuplicateSubsourceReference { name, upstream_references } => {
416+
write!(f, "multiple tables with name {}: {}", name.to_ast_string_stable(), itertools::join(upstream_references.iter().map(|n| n.to_ast_string_stable()), ", "))
417+
},
418+
Self::PostgresDatabaseMissingFilteredSchemas { schemas} => {
419+
write!(f, "FOR SCHEMAS (..) included {}, but PostgreSQL database has no schema with that name", itertools::join(schemas.iter(), ", "))
420+
}
404421
}
405422
}
406423
}
@@ -474,6 +491,12 @@ impl From<ParserError> for PlanError {
474491
}
475492
}
476493

494+
impl From<PostgresError> for PlanError {
495+
fn from(e: PostgresError) -> PlanError {
496+
PlanError::PostgresConnectionErr { cause: Arc::new(e) }
497+
}
498+
}
499+
477500
struct ColumnDisplay<'a> {
478501
table: &'a Option<PartialItemName>,
479502
column: &'a ColumnName,

src/sql/src/pure.rs

+53-17
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ use std::path::Path;
1717
use std::sync::Arc;
1818

1919
use anyhow::anyhow;
20+
use itertools::Itertools;
2021
use mz_repr::adt::system::Oid;
2122
use prost::Message;
2223
use protobuf_native::compiler::{SourceTreeDescriptorDatabase, VirtualSourceTree};
@@ -291,11 +292,11 @@ pub async fn purify_create_source(
291292
.config(&*connection_context.secrets_reader)
292293
.await?;
293294
let publication_tables =
294-
mz_postgres_util::publication_info(&config, &publication, None)
295-
.await
296-
.map_err(|cause| PlanError::FetchingPostgresPublicationInfoFailed {
297-
cause: Arc::new(cause),
298-
})?;
295+
mz_postgres_util::publication_info(&config, &publication, None).await?;
296+
297+
if publication_tables.is_empty() {
298+
return Err(PlanError::EmptyPublication(publication));
299+
}
299300

300301
// An index from table name -> schema name -> database name -> PostgresTableDesc
301302
let mut tables_by_name = BTreeMap::new();
@@ -316,9 +317,6 @@ pub async fn purify_create_source(
316317
let mut validated_requested_subsources = vec![];
317318
match referenced_subsources {
318319
Some(ReferencedSubsources::All) => {
319-
if publication_tables.is_empty() {
320-
sql_bail!("FOR ALL TABLES is only valid for non-empty publications");
321-
}
322320
for table in &publication_tables {
323321
let upstream_name = UnresolvedItemName::qualified(&[
324322
&connection.database,
@@ -330,26 +328,41 @@ pub async fn purify_create_source(
330328
}
331329
}
332330
Some(ReferencedSubsources::SubsetSchemas(schemas)) => {
333-
let schemas: BTreeSet<_> = schemas.iter().map(|s| s.as_str()).collect();
331+
let available_schemas: BTreeSet<_> = mz_postgres_util::get_schemas(&config)
332+
.await?
333+
.into_iter()
334+
.map(|s| s.name)
335+
.collect();
336+
337+
let requested_schemas: BTreeSet<_> =
338+
schemas.iter().map(|s| s.as_str().to_string()).collect();
339+
340+
let missing_schemas: Vec<_> = requested_schemas
341+
.difference(&available_schemas)
342+
.map(|s| s.to_string())
343+
.collect();
344+
345+
if !missing_schemas.is_empty() {
346+
return Err(PlanError::PostgresDatabaseMissingFilteredSchemas {
347+
schemas: missing_schemas,
348+
});
349+
}
350+
334351
for table in &publication_tables {
335-
if !schemas.contains(table.namespace.as_str()) {
352+
if !requested_schemas.contains(table.namespace.as_str()) {
336353
continue;
337354
}
338355

339-
let upstream_name = UnresolvedObjectName::qualified(&[
356+
let upstream_name = UnresolvedItemName::qualified(&[
340357
&connection.database,
341358
&table.namespace,
342359
&table.name,
343360
]);
344-
let subsource_name = UnresolvedObjectName::unqualified(&table.name);
361+
let subsource_name = UnresolvedItemName::unqualified(&table.name);
345362
validated_requested_subsources.push((upstream_name, subsource_name, table));
346363
}
347364
}
348365
Some(ReferencedSubsources::SubsetTables(subsources)) => {
349-
// TODO: can subsources be empty?
350-
if publication_tables.is_empty() {
351-
sql_bail!("FOR TABLES (..) is only valid for non-empty publications");
352-
}
353366
// The user manually selected a subset of upstream tables so we need to
354367
// validate that the names actually exist and are not ambiguous
355368

@@ -366,11 +379,34 @@ pub async fn purify_create_source(
366379

367380
if validated_requested_subsources.is_empty() {
368381
sql_bail!(
369-
"Postgres source must ingest at least one table, but {} matched none",
382+
"[internal error]: Postgres source must ingest at least one table, but {} matched none",
370383
referenced_subsources.as_ref().unwrap().to_ast_string()
371384
);
372385
}
373386

387+
// This condition would get caught during the catalog transaction, but produces a
388+
// vague, non-contextual error. Instead, error here so we can suggest to the user
389+
// how to fix the problem.
390+
if let Some(name) = validated_requested_subsources
391+
.iter()
392+
.map(|(_, subsource_name, _)| subsource_name)
393+
.duplicates()
394+
.next()
395+
.cloned()
396+
{
397+
let mut upstream_references: Vec<_> = validated_requested_subsources
398+
.into_iter()
399+
.filter_map(|(u, t, _)| if t == name { Some(u) } else { None })
400+
.collect();
401+
402+
upstream_references.sort();
403+
404+
return Err(PlanError::DuplicateSubsourceReference {
405+
name,
406+
upstream_references,
407+
});
408+
}
409+
374410
let mut text_cols_dict: BTreeMap<u32, BTreeSet<String>> = BTreeMap::new();
375411

376412
for name in text_columns.iter_mut() {

test/pg-cdc/pg-cdc.td

+14-16
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,7 @@ $ postgres-verify-slot connection=postgres://postgres:postgres@postgres slot=mat
153153
! CREATE SOURCE "no_such_host"
154154
FROM POSTGRES CONNECTION no_such_host (PUBLICATION 'mz_source');
155155
# TODO: assert on `detail` here.
156-
exact:failed to fetch publication information from PostgreSQL database
156+
exact:failed to connect to PostgreSQL database
157157

158158
> CREATE CONNECTION no_such_port TO POSTGRES (
159159
HOST postgres,
@@ -165,7 +165,7 @@ exact:failed to fetch publication information from PostgreSQL database
165165
! CREATE SOURCE "no_such_port"
166166
FROM POSTGRES CONNECTION no_such_port (PUBLICATION 'mz_source');
167167
# TODO: assert on `detail` here.
168-
exact:failed to fetch publication information from PostgreSQL database
168+
exact:failed to connect to PostgreSQL database
169169

170170
> CREATE CONNECTION no_such_user TO POSTGRES (
171171
HOST postgres,
@@ -176,7 +176,7 @@ exact:failed to fetch publication information from PostgreSQL database
176176
! CREATE SOURCE "no_such_user"
177177
FROM POSTGRES CONNECTION no_such_user (PUBLICATION 'mz_source');
178178
# TODO: assert on `detail` here.
179-
exact:failed to fetch publication information from PostgreSQL database
179+
exact:failed to connect to PostgreSQL database
180180

181181
> CREATE SECRET badpass AS 'badpass'
182182
> CREATE CONNECTION no_such_password TO POSTGRES (
@@ -188,7 +188,7 @@ exact:failed to fetch publication information from PostgreSQL database
188188
! CREATE SOURCE "no_such_password"
189189
FROM POSTGRES CONNECTION no_such_password (PUBLICATION 'mz_source');
190190
# TODO: assert on `detail` here.
191-
exact:failed to fetch publication information from PostgreSQL database
191+
exact:failed to connect to PostgreSQL database
192192

193193
> CREATE CONNECTION no_such_dbname TO POSTGRES (
194194
HOST postgres,
@@ -199,12 +199,12 @@ exact:failed to fetch publication information from PostgreSQL database
199199
! CREATE SOURCE "no_such_dbname"
200200
FROM POSTGRES CONNECTION no_such_dbname (PUBLICATION 'mz_source');
201201
# TODO: assert on `detail` here.
202-
exact:failed to fetch publication information from PostgreSQL database
202+
exact:failed to connect to PostgreSQL database
203203

204204
! CREATE SOURCE "no_such_publication"
205205
FROM POSTGRES CONNECTION pg (PUBLICATION 'no_such_publication');
206206
# TODO: assert on `detail` here.
207-
exact:failed to fetch publication information from PostgreSQL database
207+
exact:failed to connect to PostgreSQL database
208208

209209
! CREATE SOURCE "mz_source"
210210
FROM POSTGRES CONNECTION pg (PUBLICATION 'mz_source')
@@ -221,11 +221,6 @@ regex: the following columns contain unsupported types:\npostgres.public.enum_ta
221221
);
222222
regex: the following columns contain unsupported types:\npostgres\.public\.another_enum_table\."колона" \(OID \d+\)\npostgres\.public\.enum_table\.a \(OID \d+\)
223223

224-
! CREATE SOURCE "mz_source"
225-
FROM POSTGRES CONNECTION pg (PUBLICATION 'mz_source')
226-
FOR SCHEMAS (dne);
227-
regex: Postgres source must ingest at least one table, but FOR SCHEMAS \(dne\) matched none
228-
229224
#
230225
# Establish direct replication
231226
#
@@ -573,31 +568,34 @@ contains: invalid TEXT COLUMNS option value: table table_dne not found in source
573568
PUBLICATION 'mz_source',
574569
TEXT COLUMNS [another_enum_table."колона"]
575570
)
576-
FOR ALL TABLES;
571+
FOR TABLES(
572+
conflict_schema.another_enum_table AS conflict_enum,
573+
public.another_enum_table AS public_enum
574+
);
577575
contains: invalid TEXT COLUMNS option value: table another_enum_table is ambiguous, consider specifying the schema
578576

579577
! CREATE SOURCE enum_source
580578
FROM POSTGRES CONNECTION pg (
581579
PUBLICATION 'mz_source',
582580
TEXT COLUMNS [foo]
583581
)
584-
FOR ALL TABLES;
582+
FOR SCHEMAS (public);
585583
contains: invalid TEXT COLUMNS option value: column name 'foo' must have at least a table qualification
586584

587585
! CREATE SOURCE enum_source
588586
FROM POSTGRES CONNECTION pg (
589587
PUBLICATION 'mz_source',
590588
TEXT COLUMNS [foo.bar.qux.qax.foo]
591589
)
592-
FOR ALL TABLES;
590+
FOR SCHEMAS (public);
593591
contains: invalid TEXT COLUMNS option value: qualified name did not have between 1 and 3 components: foo.bar.qux.qax
594592

595593
! CREATE SOURCE enum_source
596594
FROM POSTGRES CONNECTION pg (
597595
PUBLICATION 'mz_source',
598596
TEXT COLUMNS [enum_table.a, enum_table.a]
599597
)
600-
FOR ALL TABLES;
598+
FOR SCHEMAS (public);
601599
contains: invalid TEXT COLUMNS option value: unexpected multiple references to postgres.public.enum_table.a
602600

603601
# utf8_table is not part of mz_source_narrow publication
@@ -606,7 +604,7 @@ contains: invalid TEXT COLUMNS option value: unexpected multiple references to p
606604
PUBLICATION 'mz_source_narrow',
607605
TEXT COLUMNS [enum_table.a, utf8_table.f1]
608606
)
609-
FOR ALL TABLES;
607+
FOR SCHEMAS (public);
610608
contains: invalid TEXT COLUMNS option value: table utf8_table not found in source
611609

612610
# n.b includes a reference to pk_table, which is not a table that's part of the
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
# Copyright Materialize, Inc. and contributors. All rights reserved.
2+
#
3+
# Use of this software is governed by the Business Source License
4+
# included in the LICENSE file at the root of this repository.
5+
#
6+
# As of the Change Date specified in that file, in accordance with
7+
# the Business Source License, use of this software will be governed
8+
# by the Apache License, Version 2.0.
9+
10+
# IMPORTANT: The Postgres server has a custom pg_hba.conf that only
11+
# accepts connections from specific users. You will have to update
12+
# pg_hba.conf if you modify the existing user names or add new ones.
13+
14+
> CREATE SECRET pgpass AS 'postgres'
15+
> CREATE CONNECTION pg TO POSTGRES (
16+
HOST postgres,
17+
DATABASE postgres,
18+
USER postgres,
19+
PASSWORD SECRET pgpass
20+
)
21+
22+
$ postgres-execute connection=postgres://postgres:postgres@postgres
23+
ALTER USER postgres WITH replication;
24+
DROP SCHEMA IF EXISTS public CASCADE;
25+
CREATE SCHEMA public;
26+
DROP SCHEMA IF EXISTS other CASCADE;
27+
CREATE SCHEMA other;
28+
29+
DROP PUBLICATION IF EXISTS mz_source;
30+
CREATE PUBLICATION mz_source FOR ALL TABLES;
31+
32+
CREATE TABLE t (f1 INT);
33+
INSERT INTO t VALUES (1);
34+
ALTER TABLE t REPLICA IDENTITY FULL;
35+
36+
CREATE TABLE other.t (f1 INT);
37+
INSERT INTO other.t VALUES (1);
38+
ALTER TABLE other.t REPLICA IDENTITY FULL;
39+
40+
! CREATE SOURCE mz_source
41+
FROM POSTGRES CONNECTION pg (PUBLICATION 'mz_source')
42+
FOR ALL TABLES;
43+
contains:multiple tables with name "t": "postgres"."other"."t", "postgres"."public"."t"
44+
45+
> CREATE SOURCE mz_source
46+
FROM POSTGRES CONNECTION pg (PUBLICATION 'mz_source')
47+
FOR SCHEMAS (other);
48+
49+
> SHOW sources
50+
mz_source postgres 4
51+
mz_source_progress subsource <null>
52+
t subsource <null>
53+
54+
$ postgres-execute connection=postgres://postgres:postgres@postgres
55+
DROP SCHEMA other CASCADE;

0 commit comments

Comments
 (0)