From c78acab0aadf5a8e743fdbac3aa84e7e5e243f3f Mon Sep 17 00:00:00 2001 From: Oliver Rice Date: Mon, 25 Nov 2024 12:53:39 -0600 Subject: [PATCH 1/4] Supabase Queues integration test --- nix/tests/expected/pgmq_rest_wrapper.out | 567 +++++++++++++++++++++++ nix/tests/sql/pgmq_rest_wrapper.sql | 465 +++++++++++++++++++ 2 files changed, 1032 insertions(+) create mode 100644 nix/tests/expected/pgmq_rest_wrapper.out create mode 100644 nix/tests/sql/pgmq_rest_wrapper.sql diff --git a/nix/tests/expected/pgmq_rest_wrapper.out b/nix/tests/expected/pgmq_rest_wrapper.out new file mode 100644 index 000000000..e3f12142f --- /dev/null +++ b/nix/tests/expected/pgmq_rest_wrapper.out @@ -0,0 +1,567 @@ +/* + This test is to validate the SQL for the Supabase Queues integration that + will be triggered in the FE and documented for how to manually expose + queues over supabase client libs by wrapping `pgmq`'s functions into a + separate schema that we can add to PostgREST's 'exposed_schemas' setting +*/ +/* + Emulate Role Setup from migrations/db/init-scripts/00000000000000-initial-schema.sql#L5 +*/ +-- Supabase super admin +alter user supabase_admin with superuser createdb createrole replication bypassrls; +-- Supabase replication user +create user supabase_replication_admin with login replication; +-- Supabase read-only user +create role supabase_read_only_user with login bypassrls; +grant pg_read_all_data to supabase_read_only_user; +-- Extension namespacing +create schema if not exists extensions; +create extension if not exists "uuid-ossp" with schema extensions; +NOTICE: extension "uuid-ossp" already exists, skipping +create extension if not exists pgcrypto with schema extensions; +NOTICE: extension "pgcrypto" already exists, skipping +create extension if not exists pgjwt with schema extensions; +NOTICE: extension "pgjwt" already exists, skipping +-- Set up auth roles for the developer +create role anon nologin noinherit; +create role authenticated nologin noinherit; -- "logged in" user: web_user, app_user, etc +create role service_role nologin noinherit bypassrls; -- allow developers to create JWT's that bypass their policies +create user authenticator noinherit; +grant anon to authenticator; +grant authenticated to authenticator; +grant service_role to authenticator; +grant supabase_admin to authenticator; +grant usage on schema public to postgres, anon, authenticated, service_role; +alter default privileges in schema public grant all on tables to postgres, anon, authenticated, service_role; +alter default privileges in schema public grant all on functions to postgres, anon, authenticated, service_role; +alter default privileges in schema public grant all on sequences to postgres, anon, authenticated, service_role; +-- Allow Extensions to be used in the API +grant usage on schema extensions to postgres, anon, authenticated, service_role; +-- Set up namespacing +alter user supabase_admin SET search_path TO public, extensions; -- don't include the "auth" schema +-- These are required so that the users receive grants whenever "supabase_admin" creates tables/function +alter default privileges for user supabase_admin in schema public grant all + on sequences to postgres, anon, authenticated, service_role; +alter default privileges for user supabase_admin in schema public grant all + on tables to postgres, anon, authenticated, service_role; +alter default privileges for user supabase_admin in schema public grant all + on functions to postgres, anon, authenticated, service_role; +-- Set short statement/query timeouts for API roles +alter role anon set statement_timeout = '3s'; +alter role authenticated set statement_timeout = '8s'; +/* + WORKFLOW: Enable Data APIs for Queues + +*/ +create schema if not exists queues_public; +grant usage on schema queues_public to postgres, anon, authenticated, service_role; +create or replace function queues_public.pop( + queue_name text +) + returns setof pgmq.message_record + language plpgsql + set search_path = '' +as $$ +begin + return query + select * + from pgmq.pop( + queue_name := queue_name + ); +end; +$$; +comment on function queues_public.pop(queue_name text) is 'Retrieves and locks the next message from the specified queue.'; +create or replace function queues_public.send( + queue_name text, + message jsonb, + sleep_seconds integer default 0 -- renamed from 'delay' +) + returns setof bigint + language plpgsql + set search_path = '' +as $$ +begin + return query + select * + from pgmq.send( + queue_name := queue_name, + msg := message, + delay := sleep_seconds + ); +end; +$$; +comment on function queues_public.send(queue_name text, message jsonb, sleep_seconds integer) is 'Sends a message to the specified queue, optionally delaying its availability by a number of seconds.'; +create or replace function queues_public.send_batch( + queue_name text, + messages jsonb[], + sleep_seconds integer default 0 -- renamed from 'delay' +) + returns setof bigint + language plpgsql + set search_path = '' +as $$ +begin + return query + select * + from pgmq.send_batch( + queue_name := queue_name, + msgs := messages, + delay := sleep_seconds + ); +end; +$$; +comment on function queues_public.send_batch(queue_name text, messages jsonb[], sleep_seconds integer) is 'Sends a batch of messages to the specified queue, optionally delaying their availability by a number of seconds.'; +create or replace function queues_public.archive( + queue_name text, + message_id bigint +) + returns boolean + language plpgsql + set search_path = '' +as $$ +begin + return + pgmq.archive( + queue_name := queue_name, + msg_id := message_id + ); +end; +$$; +comment on function queues_public.archive(queue_name text, message_id bigint) is 'Archives a message by moving it from the queue to a permanent archive.'; +create or replace function queues_public.archive( + queue_name text, + message_id bigint +) + returns boolean + language plpgsql + set search_path = '' +as $$ +begin + return + pgmq.archive( + queue_name := queue_name, + msg_id := message_id + ); +end; +$$; +comment on function queues_public.archive(queue_name text, message_id bigint) is 'Archives a message by moving it from the queue to a permanent archive.'; +create or replace function queues_public.delete( + queue_name text, + message_id bigint +) + returns boolean + language plpgsql + set search_path = '' +as $$ +begin + return + pgmq.delete( + queue_name := queue_name, + msg_id := message_id + ); +end; +$$; +comment on function queues_public.delete(queue_name text, message_id bigint) is 'Permanently deletes a message from the specified queue.'; +create or replace function queues_public.read( + queue_name text, + sleep_seconds integer, + n integer +) + returns setof pgmq.message_record + language plpgsql + set search_path = '' +as $$ +begin + return query + select * + from pgmq.read( + queue_name := queue_name, + vt := sleep_seconds, + qty := n + ); +end; +$$; +comment on function queues_public.read(queue_name text, sleep_seconds integer, n integer) is 'Reads up to "n" messages from the specified queue with an optional "sleep_seconds" (visibility timeout).'; +-- Grant execute permissions on wrapper functions to roles +grant execute on function queues_public.pop(text) to postgres, service_role, anon, authenticated; +grant execute on function pgmq.pop(text) to postgres, service_role, anon, authenticated; +grant execute on function queues_public.send(text, jsonb, integer) to postgres, service_role, anon, authenticated; +grant execute on function pgmq.send(text, jsonb, integer) to postgres, service_role, anon, authenticated; +grant execute on function queues_public.send_batch(text, jsonb[], integer) to postgres, service_role, anon, authenticated; +grant execute on function pgmq.send_batch(text, jsonb[], integer) to postgres, service_role, anon, authenticated; +grant execute on function queues_public.archive(text, bigint) to postgres, service_role, anon, authenticated; +grant execute on function pgmq.archive(text, bigint) to postgres, service_role, anon, authenticated; +grant execute on function queues_public.delete(text, bigint) to postgres, service_role, anon, authenticated; +grant execute on function pgmq.delete(text, bigint) to postgres, service_role, anon, authenticated; +grant execute on function queues_public.read(text, integer, integer) to postgres, service_role, anon, authenticated; +grant execute on function pgmq.read(text, integer, integer, jsonb) to postgres, service_role, anon, authenticated; +ERROR: function pgmq.read(text, integer, integer, jsonb) does not exist +-- For the service role, we want full access +-- Grant permissions on existing tables +grant all privileges on all tables in schema pgmq to postgres, service_role; +-- Ensure `service_role` has permissions on future tables +alter default privileges in schema pgmq grant all privileges on tables to postgres, service_role; +grant usage on schema pgmq to postgres, anon, authenticated, service_role; +/* + Test Default Permissions +*/ +select pgmq.create('baz'); + create +-------- + +(1 row) + +-- FE should also automatically apply RLS, but we want to test default permissions and RLS separately +-- service_role can create a message, anon and authenticated can not +begin; + set local role service_role; + -- Should Succeed + select queues_public.send( + queue_name := 'baz', + message := '{}' + ); + send +------ + 1 +(1 row) + +rollback; +begin; + set local role anon; + -- Should Fail + select queues_public.send( + queue_name := 'baz', + message := '{}' + ); +ERROR: permission denied for table q_baz +CONTEXT: SQL statement " + INSERT INTO pgmq.q_baz (vt, message) + VALUES ((clock_timestamp() + '@ 0'), $1) + RETURNING msg_id; + " +PL/pgSQL function pgmq.send(text,jsonb,integer) line 14 at RETURN QUERY +SQL statement "select * + from pgmq.send( + queue_name := queue_name, + msg := message, + delay := sleep_seconds + )" +PL/pgSQL function queues_public.send(text,jsonb,integer) line 3 at RETURN QUERY +rollback; +begin; + set local role authenticated; + -- Should Fail + select queues_public.send( + queue_name := 'baz', + message := '{}' + ); +ERROR: permission denied for table q_baz +CONTEXT: SQL statement " + INSERT INTO pgmq.q_baz (vt, message) + VALUES ((clock_timestamp() + '@ 0'), $1) + RETURNING msg_id; + " +PL/pgSQL function pgmq.send(text,jsonb,integer) line 14 at RETURN QUERY +SQL statement "select * + from pgmq.send( + queue_name := queue_name, + msg := message, + delay := sleep_seconds + )" +PL/pgSQL function queues_public.send(text,jsonb,integer) line 3 at RETURN QUERY +rollback; +-- service_role can read mesages, anon and authenticated can not +begin; + set local role service_role; + -- Should Succeed + select queues_public.read( + queue_name := 'baz', + sleep_seconds := 0, + n := 1 + ); + read +------ +(0 rows) + +rollback; +begin; + set local role anon; + -- Should Fail + select queues_public.read( + queue_name := 'baz', + sleep_seconds := 0, + n := 1 + ); +ERROR: permission denied for table q_baz +CONTEXT: SQL statement " + WITH cte AS + ( + SELECT msg_id + FROM pgmq.q_baz + WHERE vt <= clock_timestamp() + ORDER BY msg_id ASC + LIMIT $1 + FOR UPDATE SKIP LOCKED + ) + UPDATE pgmq.q_baz m + SET + vt = clock_timestamp() + '@ 0', + read_ct = read_ct + 1 + FROM cte + WHERE m.msg_id = cte.msg_id + RETURNING m.msg_id, m.read_ct, m.enqueued_at, m.vt, m.message; + " +PL/pgSQL function pgmq.read(text,integer,integer) line 27 at RETURN QUERY +SQL statement "select * + from pgmq.read( + queue_name := queue_name, + vt := sleep_seconds, + qty := n + )" +PL/pgSQL function queues_public.read(text,integer,integer) line 3 at RETURN QUERY +rollback; +begin; + set local role authenticated; + -- Should Fail + select queues_public.read( + queue_name := 'baz', + sleep_seconds := 0, + n := 1 + ); +ERROR: permission denied for table q_baz +CONTEXT: SQL statement " + WITH cte AS + ( + SELECT msg_id + FROM pgmq.q_baz + WHERE vt <= clock_timestamp() + ORDER BY msg_id ASC + LIMIT $1 + FOR UPDATE SKIP LOCKED + ) + UPDATE pgmq.q_baz m + SET + vt = clock_timestamp() + '@ 0', + read_ct = read_ct + 1 + FROM cte + WHERE m.msg_id = cte.msg_id + RETURNING m.msg_id, m.read_ct, m.enqueued_at, m.vt, m.message; + " +PL/pgSQL function pgmq.read(text,integer,integer) line 27 at RETURN QUERY +SQL statement "select * + from pgmq.read( + queue_name := queue_name, + vt := sleep_seconds, + qty := n + )" +PL/pgSQL function queues_public.read(text,integer,integer) line 3 at RETURN QUERY +rollback; +/* + WORKFLOW: Create a Queue named "baz" + + In this example: + - authenticated users will have access + - anon users will not have access +*/ +create schema front_end; +-- THIS FUNCTION IS PURELY FOR TESTING PURPOSES. DO NOT CREATE IT +create or replace function front_end.create_queue( + queue_name text, + anon_has_access bool, + authenticated_has_access bool, + rls_policy text +) + returns void + language plpgsql + set search_path = '' +as $$ +begin + -- Create the queue + perform pgmq.create(queue_name); + + -- enable RLS (ALWAYS) + execute format('alter table pgmq.q_%s enable row level security;', queue_name); + + -- Note: in the FE we should have separate toggles for each of select/insert/update/delete + if anon_has_access then + -- Queue table + execute format('grant select, insert, update, delete on pgmq.q_%s to anon;', queue_name); + -- Archive table + execute format('grant select, insert, update, delete on pgmq.a_%s to anon;', queue_name); + end if; + + if authenticated_has_access then + -- Queue table + execute format('grant select, insert, update, delete on pgmq.q_%s to authenticated;', queue_name); + -- Archive table + execute format('grant select, insert, update, delete on pgmq.a_%s to authenticated;', queue_name); + end if; + + -- Note: basic implementation for testing. Use the usual RLS widgets for control + execute format( + 'create policy rls_q_%s on pgmq.q_%s for all using (%s) with check (%s)', + queue_name, + queue_name, + rls_policy, + rls_policy + ); + +end; +$$; +/* + Integration Test 1 + - authenticated has full access + - anon has no access + - RLS allows everything +*/ +select front_end.create_queue( + queue_name := 'qux', + anon_has_access := 'f', + authenticated_has_access := 't', + rls_policy := 'true' +); + create_queue +-------------- + +(1 row) + +begin; + set local role anon; + -- Should Fail + select queues_public.read( + queue_name := 'qux', + sleep_seconds := 0, + n := 1 + ); +ERROR: permission denied for table q_qux +CONTEXT: SQL statement " + WITH cte AS + ( + SELECT msg_id + FROM pgmq.q_qux + WHERE vt <= clock_timestamp() + ORDER BY msg_id ASC + LIMIT $1 + FOR UPDATE SKIP LOCKED + ) + UPDATE pgmq.q_qux m + SET + vt = clock_timestamp() + '@ 0', + read_ct = read_ct + 1 + FROM cte + WHERE m.msg_id = cte.msg_id + RETURNING m.msg_id, m.read_ct, m.enqueued_at, m.vt, m.message; + " +PL/pgSQL function pgmq.read(text,integer,integer) line 27 at RETURN QUERY +SQL statement "select * + from pgmq.read( + queue_name := queue_name, + vt := sleep_seconds, + qty := n + )" +PL/pgSQL function queues_public.read(text,integer,integer) line 3 at RETURN QUERY +rollback; +-- Should all work +begin; + set local role authenticated; + -- Should succeed + select queues_public.send( + queue_name := 'qux', + message := '{}' + ); + send +------ + 1 +(1 row) + + select queues_public.send_batch( + queue_name := 'qux', + messages := array['{"a": 1}', '{"b": 2}']::jsonb[] + ); + send_batch +------------ + 2 + 3 +(2 rows) + + select queues_public.pop( + queue_name := 'qux' + ); + pop +----- +(0 rows) + + select queues_public.delete( + queue_name := 'qux', + message_id := 2 + ); + delete +-------- + t +(1 row) + + select queues_public.archive( + queue_name := 'qux', + message_id := 3 + ); + archive +--------- + t +(1 row) + +rollback; +/* + Integration Test 2 + - authenticated has full access + - anon has no access + - RLS allows everything +*/ +select front_end.create_queue( + queue_name := 'qux', + anon_has_access := 'f', + authenticated_has_access := 't', + rls_policy := 'false' -- block all +); +NOTICE: relation "q_qux" already exists, skipping +NOTICE: relation "a_qux" already exists, skipping +NOTICE: relation "q_qux_vt_idx" already exists, skipping +NOTICE: relation "archived_at_idx_qux" already exists, skipping +ERROR: policy "rls_q_qux" for table "q_qux" already exists +CONTEXT: SQL statement "create policy rls_q_qux on pgmq.q_qux for all using (false) with check (false)" +PL/pgSQL function front_end.create_queue(text,boolean,boolean,text) line 25 at EXECUTE +begin; + set local role authenticated; + -- Should fail + select queues_public.send( + queue_name := 'waldo', + message := '{}' + ); +ERROR: relation "pgmq.q_waldo" does not exist +LINE 2: INSERT INTO pgmq.q_waldo (vt, message) + ^ +QUERY: + INSERT INTO pgmq.q_waldo (vt, message) + VALUES ((clock_timestamp() + '@ 0'), $1) + RETURNING msg_id; + +CONTEXT: PL/pgSQL function pgmq.send(text,jsonb,integer) line 14 at RETURN QUERY +SQL statement "select * + from pgmq.send( + queue_name := queue_name, + msg := message, + delay := sleep_seconds + )" +PL/pgSQL function queues_public.send(text,jsonb,integer) line 3 at RETURN QUERY +rollback; +/* + Cleanup +*/ +drop schema queues_public cascade; +NOTICE: drop cascades to 6 other objects +DETAIL: drop cascades to function queues_public.pop(text) +drop cascades to function queues_public.send(text,jsonb,integer) +drop cascades to function queues_public.send_batch(text,jsonb[],integer) +drop cascades to function queues_public.archive(text,bigint) +drop cascades to function queues_public.delete(text,bigint) +drop cascades to function queues_public.read(text,integer,integer) +drop extension pgmq cascade; create extension pgmq; diff --git a/nix/tests/sql/pgmq_rest_wrapper.sql b/nix/tests/sql/pgmq_rest_wrapper.sql new file mode 100644 index 000000000..df783b0ae --- /dev/null +++ b/nix/tests/sql/pgmq_rest_wrapper.sql @@ -0,0 +1,465 @@ +/* + This test is to validate the SQL for the Supabase Queues integration that + will be triggered in the FE and documented for how to manually expose + queues over supabase client libs by wrapping `pgmq`'s functions into a + separate schema that we can add to PostgREST's 'exposed_schemas' setting +*/ + +/* + Emulate Role Setup from migrations/db/init-scripts/00000000000000-initial-schema.sql#L5 +*/ + +-- Supabase super admin +alter user supabase_admin with superuser createdb createrole replication bypassrls; + +-- Supabase replication user +create user supabase_replication_admin with login replication; + +-- Supabase read-only user +create role supabase_read_only_user with login bypassrls; +grant pg_read_all_data to supabase_read_only_user; + +-- Extension namespacing +create schema if not exists extensions; +create extension if not exists "uuid-ossp" with schema extensions; +create extension if not exists pgcrypto with schema extensions; +create extension if not exists pgjwt with schema extensions; + +-- Set up auth roles for the developer +create role anon nologin noinherit; +create role authenticated nologin noinherit; -- "logged in" user: web_user, app_user, etc +create role service_role nologin noinherit bypassrls; -- allow developers to create JWT's that bypass their policies + +create user authenticator noinherit; +grant anon to authenticator; +grant authenticated to authenticator; +grant service_role to authenticator; +grant supabase_admin to authenticator; + +grant usage on schema public to postgres, anon, authenticated, service_role; +alter default privileges in schema public grant all on tables to postgres, anon, authenticated, service_role; +alter default privileges in schema public grant all on functions to postgres, anon, authenticated, service_role; +alter default privileges in schema public grant all on sequences to postgres, anon, authenticated, service_role; + +-- Allow Extensions to be used in the API +grant usage on schema extensions to postgres, anon, authenticated, service_role; + +-- Set up namespacing +alter user supabase_admin SET search_path TO public, extensions; -- don't include the "auth" schema + +-- These are required so that the users receive grants whenever "supabase_admin" creates tables/function +alter default privileges for user supabase_admin in schema public grant all + on sequences to postgres, anon, authenticated, service_role; +alter default privileges for user supabase_admin in schema public grant all + on tables to postgres, anon, authenticated, service_role; +alter default privileges for user supabase_admin in schema public grant all + on functions to postgres, anon, authenticated, service_role; + +-- Set short statement/query timeouts for API roles +alter role anon set statement_timeout = '3s'; +alter role authenticated set statement_timeout = '8s'; + + +/* + WORKFLOW: Enable Data APIs for Queues + +*/ +create schema if not exists queues_public; +grant usage on schema queues_public to postgres, anon, authenticated, service_role; + +create or replace function queues_public.pop( + queue_name text +) + returns setof pgmq.message_record + language plpgsql + set search_path = '' +as $$ +begin + return query + select * + from pgmq.pop( + queue_name := queue_name + ); +end; +$$; + +comment on function queues_public.pop(queue_name text) is 'Retrieves and locks the next message from the specified queue.'; + + +create or replace function queues_public.send( + queue_name text, + message jsonb, + sleep_seconds integer default 0 -- renamed from 'delay' +) + returns setof bigint + language plpgsql + set search_path = '' +as $$ +begin + return query + select * + from pgmq.send( + queue_name := queue_name, + msg := message, + delay := sleep_seconds + ); +end; +$$; + +comment on function queues_public.send(queue_name text, message jsonb, sleep_seconds integer) is 'Sends a message to the specified queue, optionally delaying its availability by a number of seconds.'; + + +create or replace function queues_public.send_batch( + queue_name text, + messages jsonb[], + sleep_seconds integer default 0 -- renamed from 'delay' +) + returns setof bigint + language plpgsql + set search_path = '' +as $$ +begin + return query + select * + from pgmq.send_batch( + queue_name := queue_name, + msgs := messages, + delay := sleep_seconds + ); +end; +$$; + +comment on function queues_public.send_batch(queue_name text, messages jsonb[], sleep_seconds integer) is 'Sends a batch of messages to the specified queue, optionally delaying their availability by a number of seconds.'; + + +create or replace function queues_public.archive( + queue_name text, + message_id bigint +) + returns boolean + language plpgsql + set search_path = '' +as $$ +begin + return + pgmq.archive( + queue_name := queue_name, + msg_id := message_id + ); +end; +$$; + +comment on function queues_public.archive(queue_name text, message_id bigint) is 'Archives a message by moving it from the queue to a permanent archive.'; + + +create or replace function queues_public.archive( + queue_name text, + message_id bigint +) + returns boolean + language plpgsql + set search_path = '' +as $$ +begin + return + pgmq.archive( + queue_name := queue_name, + msg_id := message_id + ); +end; +$$; + +comment on function queues_public.archive(queue_name text, message_id bigint) is 'Archives a message by moving it from the queue to a permanent archive.'; + + +create or replace function queues_public.delete( + queue_name text, + message_id bigint +) + returns boolean + language plpgsql + set search_path = '' +as $$ +begin + return + pgmq.delete( + queue_name := queue_name, + msg_id := message_id + ); +end; +$$; + +comment on function queues_public.delete(queue_name text, message_id bigint) is 'Permanently deletes a message from the specified queue.'; + +create or replace function queues_public.read( + queue_name text, + sleep_seconds integer, + n integer +) + returns setof pgmq.message_record + language plpgsql + set search_path = '' +as $$ +begin + return query + select * + from pgmq.read( + queue_name := queue_name, + vt := sleep_seconds, + qty := n + ); +end; +$$; + +comment on function queues_public.read(queue_name text, sleep_seconds integer, n integer) is 'Reads up to "n" messages from the specified queue with an optional "sleep_seconds" (visibility timeout).'; + +-- Grant execute permissions on wrapper functions to roles +grant execute on function queues_public.pop(text) to postgres, service_role, anon, authenticated; +grant execute on function pgmq.pop(text) to postgres, service_role, anon, authenticated; + + +grant execute on function queues_public.send(text, jsonb, integer) to postgres, service_role, anon, authenticated; +grant execute on function pgmq.send(text, jsonb, integer) to postgres, service_role, anon, authenticated; + +grant execute on function queues_public.send_batch(text, jsonb[], integer) to postgres, service_role, anon, authenticated; +grant execute on function pgmq.send_batch(text, jsonb[], integer) to postgres, service_role, anon, authenticated; + +grant execute on function queues_public.archive(text, bigint) to postgres, service_role, anon, authenticated; +grant execute on function pgmq.archive(text, bigint) to postgres, service_role, anon, authenticated; + +grant execute on function queues_public.delete(text, bigint) to postgres, service_role, anon, authenticated; +grant execute on function pgmq.delete(text, bigint) to postgres, service_role, anon, authenticated; + +grant execute on function queues_public.read(text, integer, integer) to postgres, service_role, anon, authenticated; +grant execute on function pgmq.read(text, integer, integer, jsonb) to postgres, service_role, anon, authenticated; + +-- For the service role, we want full access +-- Grant permissions on existing tables +grant all privileges on all tables in schema pgmq to postgres, service_role; + +-- Ensure `service_role` has permissions on future tables +alter default privileges in schema pgmq grant all privileges on tables to postgres, service_role; + +grant usage on schema pgmq to postgres, anon, authenticated, service_role; + +/* + Test Default Permissions +*/ + +select pgmq.create('baz'); +-- FE should also automatically apply RLS, but we want to test default permissions and RLS separately + + +-- service_role can create a message, anon and authenticated can not +begin; + set local role service_role; + + -- Should Succeed + select queues_public.send( + queue_name := 'baz', + message := '{}' + ); +rollback; + +begin; + set local role anon; + + -- Should Fail + select queues_public.send( + queue_name := 'baz', + message := '{}' + ); +rollback; + +begin; + set local role authenticated; + + -- Should Fail + select queues_public.send( + queue_name := 'baz', + message := '{}' + ); +rollback; + + +-- service_role can read mesages, anon and authenticated can not +begin; + set local role service_role; + + -- Should Succeed + select queues_public.read( + queue_name := 'baz', + sleep_seconds := 0, + n := 1 + ); +rollback; + +begin; + set local role anon; + + -- Should Fail + select queues_public.read( + queue_name := 'baz', + sleep_seconds := 0, + n := 1 + ); +rollback; + +begin; + set local role authenticated; + + -- Should Fail + select queues_public.read( + queue_name := 'baz', + sleep_seconds := 0, + n := 1 + ); +rollback; + + + + +/* + WORKFLOW: Create a Queue named "baz" + + In this example: + - authenticated users will have access + - anon users will not have access +*/ + +create schema front_end; + +-- THIS FUNCTION IS PURELY FOR TESTING PURPOSES. DO NOT CREATE IT +create or replace function front_end.create_queue( + queue_name text, + anon_has_access bool, + authenticated_has_access bool, + rls_policy text +) + returns void + language plpgsql + set search_path = '' +as $$ +begin + -- Create the queue + perform pgmq.create(queue_name); + + -- enable RLS (ALWAYS) + execute format('alter table pgmq.q_%s enable row level security;', queue_name); + + -- Note: in the FE we should have separate toggles for each of select/insert/update/delete + if anon_has_access then + -- Queue table + execute format('grant select, insert, update, delete on pgmq.q_%s to anon;', queue_name); + -- Archive table + execute format('grant select, insert, update, delete on pgmq.a_%s to anon;', queue_name); + end if; + + if authenticated_has_access then + -- Queue table + execute format('grant select, insert, update, delete on pgmq.q_%s to authenticated;', queue_name); + -- Archive table + execute format('grant select, insert, update, delete on pgmq.a_%s to authenticated;', queue_name); + end if; + + -- Note: basic implementation for testing. Use the usual RLS widgets for control + execute format( + 'create policy rls_q_%s on pgmq.q_%s for all using (%s) with check (%s)', + queue_name, + queue_name, + rls_policy, + rls_policy + ); + +end; +$$; + +/* + Integration Test 1 + - authenticated has full access + - anon has no access + - RLS allows everything +*/ + +select front_end.create_queue( + queue_name := 'qux', + anon_has_access := 'f', + authenticated_has_access := 't', + rls_policy := 'true' +); + +begin; + set local role anon; + + -- Should Fail + select queues_public.read( + queue_name := 'qux', + sleep_seconds := 0, + n := 1 + ); +rollback; + +-- Should all work +begin; + set local role authenticated; + + -- Should succeed + select queues_public.send( + queue_name := 'qux', + message := '{}' + ); + + select queues_public.send_batch( + queue_name := 'qux', + messages := array['{"a": 1}', '{"b": 2}']::jsonb[] + ); + + select queues_public.pop( + queue_name := 'qux' + ); + + select queues_public.delete( + queue_name := 'qux', + message_id := 2 + ); + + select queues_public.archive( + queue_name := 'qux', + message_id := 3 + ); + +rollback; + +/* + Integration Test 2 + - authenticated has full access + - anon has no access + - RLS allows everything +*/ + +select front_end.create_queue( + queue_name := 'qux', + anon_has_access := 'f', + authenticated_has_access := 't', + rls_policy := 'false' -- block all +); + + +begin; + set local role authenticated; + + -- Should fail + select queues_public.send( + queue_name := 'waldo', + message := '{}' + ); + +rollback; + + + +/* + Cleanup +*/ +drop schema queues_public cascade; +drop extension pgmq cascade; create extension pgmq; From c35983596b922cceabe34723a440f6e06c0d00a6 Mon Sep 17 00:00:00 2001 From: Oliver Rice Date: Mon, 25 Nov 2024 12:59:37 -0600 Subject: [PATCH 2/4] prefix names in in data apis with queue_* --- ..._wrapper.out => pgmq_data_api_wrapper.out} | 0 ..._wrapper.sql => pgmq_data_api_wrapper.sql} | 66 +++++++++---------- 2 files changed, 33 insertions(+), 33 deletions(-) rename nix/tests/expected/{pgmq_rest_wrapper.out => pgmq_data_api_wrapper.out} (100%) rename nix/tests/sql/{pgmq_rest_wrapper.sql => pgmq_data_api_wrapper.sql} (78%) diff --git a/nix/tests/expected/pgmq_rest_wrapper.out b/nix/tests/expected/pgmq_data_api_wrapper.out similarity index 100% rename from nix/tests/expected/pgmq_rest_wrapper.out rename to nix/tests/expected/pgmq_data_api_wrapper.out diff --git a/nix/tests/sql/pgmq_rest_wrapper.sql b/nix/tests/sql/pgmq_data_api_wrapper.sql similarity index 78% rename from nix/tests/sql/pgmq_rest_wrapper.sql rename to nix/tests/sql/pgmq_data_api_wrapper.sql index df783b0ae..af0ab220e 100644 --- a/nix/tests/sql/pgmq_rest_wrapper.sql +++ b/nix/tests/sql/pgmq_data_api_wrapper.sql @@ -67,7 +67,7 @@ alter role authenticated set statement_timeout = '8s'; create schema if not exists queues_public; grant usage on schema queues_public to postgres, anon, authenticated, service_role; -create or replace function queues_public.pop( +create or replace function queues_public.queue_pop( queue_name text ) returns setof pgmq.message_record @@ -83,10 +83,10 @@ begin end; $$; -comment on function queues_public.pop(queue_name text) is 'Retrieves and locks the next message from the specified queue.'; +comment on function queues_public.queue_pop(queue_name text) is 'Retrieves and locks the next message from the specified queue.'; -create or replace function queues_public.send( +create or replace function queues_public.queue_send( queue_name text, message jsonb, sleep_seconds integer default 0 -- renamed from 'delay' @@ -106,10 +106,10 @@ begin end; $$; -comment on function queues_public.send(queue_name text, message jsonb, sleep_seconds integer) is 'Sends a message to the specified queue, optionally delaying its availability by a number of seconds.'; +comment on function queues_public.queue_send(queue_name text, message jsonb, sleep_seconds integer) is 'Sends a message to the specified queue, optionally delaying its availability by a number of seconds.'; -create or replace function queues_public.send_batch( +create or replace function queues_public.queue_send_batch( queue_name text, messages jsonb[], sleep_seconds integer default 0 -- renamed from 'delay' @@ -129,10 +129,10 @@ begin end; $$; -comment on function queues_public.send_batch(queue_name text, messages jsonb[], sleep_seconds integer) is 'Sends a batch of messages to the specified queue, optionally delaying their availability by a number of seconds.'; +comment on function queues_public.queue_send_batch(queue_name text, messages jsonb[], sleep_seconds integer) is 'Sends a batch of messages to the specified queue, optionally delaying their availability by a number of seconds.'; -create or replace function queues_public.archive( +create or replace function queues_public.queue_archive( queue_name text, message_id bigint ) @@ -149,10 +149,10 @@ begin end; $$; -comment on function queues_public.archive(queue_name text, message_id bigint) is 'Archives a message by moving it from the queue to a permanent archive.'; +comment on function queues_public.queue_archive(queue_name text, message_id bigint) is 'Archives a message by moving it from the queue to a permanent archive.'; -create or replace function queues_public.archive( +create or replace function queues_public.queue_archive( queue_name text, message_id bigint ) @@ -169,10 +169,10 @@ begin end; $$; -comment on function queues_public.archive(queue_name text, message_id bigint) is 'Archives a message by moving it from the queue to a permanent archive.'; +comment on function queues_public.queue_archive(queue_name text, message_id bigint) is 'Archives a message by moving it from the queue to a permanent archive.'; -create or replace function queues_public.delete( +create or replace function queues_public.queue_delete( queue_name text, message_id bigint ) @@ -189,9 +189,9 @@ begin end; $$; -comment on function queues_public.delete(queue_name text, message_id bigint) is 'Permanently deletes a message from the specified queue.'; +comment on function queues_public.queue_delete(queue_name text, message_id bigint) is 'Permanently deletes a message from the specified queue.'; -create or replace function queues_public.read( +create or replace function queues_public.queue_read( queue_name text, sleep_seconds integer, n integer @@ -211,26 +211,26 @@ begin end; $$; -comment on function queues_public.read(queue_name text, sleep_seconds integer, n integer) is 'Reads up to "n" messages from the specified queue with an optional "sleep_seconds" (visibility timeout).'; +comment on function queues_public.queue_read(queue_name text, sleep_seconds integer, n integer) is 'Reads up to "n" messages from the specified queue with an optional "sleep_seconds" (visibility timeout).'; -- Grant execute permissions on wrapper functions to roles -grant execute on function queues_public.pop(text) to postgres, service_role, anon, authenticated; +grant execute on function queues_public.queue_pop(text) to postgres, service_role, anon, authenticated; grant execute on function pgmq.pop(text) to postgres, service_role, anon, authenticated; -grant execute on function queues_public.send(text, jsonb, integer) to postgres, service_role, anon, authenticated; +grant execute on function queues_public.queue_send(text, jsonb, integer) to postgres, service_role, anon, authenticated; grant execute on function pgmq.send(text, jsonb, integer) to postgres, service_role, anon, authenticated; -grant execute on function queues_public.send_batch(text, jsonb[], integer) to postgres, service_role, anon, authenticated; +grant execute on function queues_public.queue_send_batch(text, jsonb[], integer) to postgres, service_role, anon, authenticated; grant execute on function pgmq.send_batch(text, jsonb[], integer) to postgres, service_role, anon, authenticated; -grant execute on function queues_public.archive(text, bigint) to postgres, service_role, anon, authenticated; +grant execute on function queues_public.queue_archive(text, bigint) to postgres, service_role, anon, authenticated; grant execute on function pgmq.archive(text, bigint) to postgres, service_role, anon, authenticated; -grant execute on function queues_public.delete(text, bigint) to postgres, service_role, anon, authenticated; +grant execute on function queues_public.queue_delete(text, bigint) to postgres, service_role, anon, authenticated; grant execute on function pgmq.delete(text, bigint) to postgres, service_role, anon, authenticated; -grant execute on function queues_public.read(text, integer, integer) to postgres, service_role, anon, authenticated; +grant execute on function queues_public.queue_read(text, integer, integer) to postgres, service_role, anon, authenticated; grant execute on function pgmq.read(text, integer, integer, jsonb) to postgres, service_role, anon, authenticated; -- For the service role, we want full access @@ -255,7 +255,7 @@ begin; set local role service_role; -- Should Succeed - select queues_public.send( + select queues_public.queue_send( queue_name := 'baz', message := '{}' ); @@ -265,7 +265,7 @@ begin; set local role anon; -- Should Fail - select queues_public.send( + select queues_public.queue_send( queue_name := 'baz', message := '{}' ); @@ -275,7 +275,7 @@ begin; set local role authenticated; -- Should Fail - select queues_public.send( + select queues_public.queue_send( queue_name := 'baz', message := '{}' ); @@ -287,7 +287,7 @@ begin; set local role service_role; -- Should Succeed - select queues_public.read( + select queues_public.queue_read( queue_name := 'baz', sleep_seconds := 0, n := 1 @@ -298,7 +298,7 @@ begin; set local role anon; -- Should Fail - select queues_public.read( + select queues_public.queue_read( queue_name := 'baz', sleep_seconds := 0, n := 1 @@ -309,7 +309,7 @@ begin; set local role authenticated; -- Should Fail - select queues_public.read( + select queues_public.queue_read( queue_name := 'baz', sleep_seconds := 0, n := 1 @@ -392,7 +392,7 @@ begin; set local role anon; -- Should Fail - select queues_public.read( + select queues_public.queue_read( queue_name := 'qux', sleep_seconds := 0, n := 1 @@ -404,26 +404,26 @@ begin; set local role authenticated; -- Should succeed - select queues_public.send( + select queues_public.queue_send( queue_name := 'qux', message := '{}' ); - select queues_public.send_batch( + select queues_public.queue_send_batch( queue_name := 'qux', messages := array['{"a": 1}', '{"b": 2}']::jsonb[] ); - select queues_public.pop( + select queues_public.queue_pop( queue_name := 'qux' ); - select queues_public.delete( + select queues_public.queue_delete( queue_name := 'qux', message_id := 2 ); - select queues_public.archive( + select queues_public.queue_archive( queue_name := 'qux', message_id := 3 ); @@ -449,7 +449,7 @@ begin; set local role authenticated; -- Should fail - select queues_public.send( + select queues_public.queue_send( queue_name := 'waldo', message := '{}' ); From e9e13d5a026add95d410304dbe6af1058b7b1b97 Mon Sep 17 00:00:00 2001 From: Oliver Rice Date: Mon, 25 Nov 2024 13:04:59 -0600 Subject: [PATCH 3/4] update test output with queue_ prefix --- nix/tests/expected/pgmq_data_api_wrapper.out | 126 +++++++++---------- 1 file changed, 63 insertions(+), 63 deletions(-) diff --git a/nix/tests/expected/pgmq_data_api_wrapper.out b/nix/tests/expected/pgmq_data_api_wrapper.out index e3f12142f..e4169ac8f 100644 --- a/nix/tests/expected/pgmq_data_api_wrapper.out +++ b/nix/tests/expected/pgmq_data_api_wrapper.out @@ -55,7 +55,7 @@ alter role authenticated set statement_timeout = '8s'; */ create schema if not exists queues_public; grant usage on schema queues_public to postgres, anon, authenticated, service_role; -create or replace function queues_public.pop( +create or replace function queues_public.queue_pop( queue_name text ) returns setof pgmq.message_record @@ -70,8 +70,8 @@ begin ); end; $$; -comment on function queues_public.pop(queue_name text) is 'Retrieves and locks the next message from the specified queue.'; -create or replace function queues_public.send( +comment on function queues_public.queue_pop(queue_name text) is 'Retrieves and locks the next message from the specified queue.'; +create or replace function queues_public.queue_send( queue_name text, message jsonb, sleep_seconds integer default 0 -- renamed from 'delay' @@ -90,8 +90,8 @@ begin ); end; $$; -comment on function queues_public.send(queue_name text, message jsonb, sleep_seconds integer) is 'Sends a message to the specified queue, optionally delaying its availability by a number of seconds.'; -create or replace function queues_public.send_batch( +comment on function queues_public.queue_send(queue_name text, message jsonb, sleep_seconds integer) is 'Sends a message to the specified queue, optionally delaying its availability by a number of seconds.'; +create or replace function queues_public.queue_send_batch( queue_name text, messages jsonb[], sleep_seconds integer default 0 -- renamed from 'delay' @@ -110,8 +110,8 @@ begin ); end; $$; -comment on function queues_public.send_batch(queue_name text, messages jsonb[], sleep_seconds integer) is 'Sends a batch of messages to the specified queue, optionally delaying their availability by a number of seconds.'; -create or replace function queues_public.archive( +comment on function queues_public.queue_send_batch(queue_name text, messages jsonb[], sleep_seconds integer) is 'Sends a batch of messages to the specified queue, optionally delaying their availability by a number of seconds.'; +create or replace function queues_public.queue_archive( queue_name text, message_id bigint ) @@ -127,8 +127,8 @@ begin ); end; $$; -comment on function queues_public.archive(queue_name text, message_id bigint) is 'Archives a message by moving it from the queue to a permanent archive.'; -create or replace function queues_public.archive( +comment on function queues_public.queue_archive(queue_name text, message_id bigint) is 'Archives a message by moving it from the queue to a permanent archive.'; +create or replace function queues_public.queue_archive( queue_name text, message_id bigint ) @@ -144,8 +144,8 @@ begin ); end; $$; -comment on function queues_public.archive(queue_name text, message_id bigint) is 'Archives a message by moving it from the queue to a permanent archive.'; -create or replace function queues_public.delete( +comment on function queues_public.queue_archive(queue_name text, message_id bigint) is 'Archives a message by moving it from the queue to a permanent archive.'; +create or replace function queues_public.queue_delete( queue_name text, message_id bigint ) @@ -161,8 +161,8 @@ begin ); end; $$; -comment on function queues_public.delete(queue_name text, message_id bigint) is 'Permanently deletes a message from the specified queue.'; -create or replace function queues_public.read( +comment on function queues_public.queue_delete(queue_name text, message_id bigint) is 'Permanently deletes a message from the specified queue.'; +create or replace function queues_public.queue_read( queue_name text, sleep_seconds integer, n integer @@ -181,19 +181,19 @@ begin ); end; $$; -comment on function queues_public.read(queue_name text, sleep_seconds integer, n integer) is 'Reads up to "n" messages from the specified queue with an optional "sleep_seconds" (visibility timeout).'; +comment on function queues_public.queue_read(queue_name text, sleep_seconds integer, n integer) is 'Reads up to "n" messages from the specified queue with an optional "sleep_seconds" (visibility timeout).'; -- Grant execute permissions on wrapper functions to roles -grant execute on function queues_public.pop(text) to postgres, service_role, anon, authenticated; +grant execute on function queues_public.queue_pop(text) to postgres, service_role, anon, authenticated; grant execute on function pgmq.pop(text) to postgres, service_role, anon, authenticated; -grant execute on function queues_public.send(text, jsonb, integer) to postgres, service_role, anon, authenticated; +grant execute on function queues_public.queue_send(text, jsonb, integer) to postgres, service_role, anon, authenticated; grant execute on function pgmq.send(text, jsonb, integer) to postgres, service_role, anon, authenticated; -grant execute on function queues_public.send_batch(text, jsonb[], integer) to postgres, service_role, anon, authenticated; +grant execute on function queues_public.queue_send_batch(text, jsonb[], integer) to postgres, service_role, anon, authenticated; grant execute on function pgmq.send_batch(text, jsonb[], integer) to postgres, service_role, anon, authenticated; -grant execute on function queues_public.archive(text, bigint) to postgres, service_role, anon, authenticated; +grant execute on function queues_public.queue_archive(text, bigint) to postgres, service_role, anon, authenticated; grant execute on function pgmq.archive(text, bigint) to postgres, service_role, anon, authenticated; -grant execute on function queues_public.delete(text, bigint) to postgres, service_role, anon, authenticated; +grant execute on function queues_public.queue_delete(text, bigint) to postgres, service_role, anon, authenticated; grant execute on function pgmq.delete(text, bigint) to postgres, service_role, anon, authenticated; -grant execute on function queues_public.read(text, integer, integer) to postgres, service_role, anon, authenticated; +grant execute on function queues_public.queue_read(text, integer, integer) to postgres, service_role, anon, authenticated; grant execute on function pgmq.read(text, integer, integer, jsonb) to postgres, service_role, anon, authenticated; ERROR: function pgmq.read(text, integer, integer, jsonb) does not exist -- For the service role, we want full access @@ -216,20 +216,20 @@ select pgmq.create('baz'); begin; set local role service_role; -- Should Succeed - select queues_public.send( + select queues_public.queue_send( queue_name := 'baz', message := '{}' ); - send ------- - 1 + queue_send +------------ + 1 (1 row) rollback; begin; set local role anon; -- Should Fail - select queues_public.send( + select queues_public.queue_send( queue_name := 'baz', message := '{}' ); @@ -246,12 +246,12 @@ SQL statement "select * msg := message, delay := sleep_seconds )" -PL/pgSQL function queues_public.send(text,jsonb,integer) line 3 at RETURN QUERY +PL/pgSQL function queues_public.queue_send(text,jsonb,integer) line 3 at RETURN QUERY rollback; begin; set local role authenticated; -- Should Fail - select queues_public.send( + select queues_public.queue_send( queue_name := 'baz', message := '{}' ); @@ -268,26 +268,26 @@ SQL statement "select * msg := message, delay := sleep_seconds )" -PL/pgSQL function queues_public.send(text,jsonb,integer) line 3 at RETURN QUERY +PL/pgSQL function queues_public.queue_send(text,jsonb,integer) line 3 at RETURN QUERY rollback; -- service_role can read mesages, anon and authenticated can not begin; set local role service_role; -- Should Succeed - select queues_public.read( + select queues_public.queue_read( queue_name := 'baz', sleep_seconds := 0, n := 1 ); - read ------- + queue_read +------------ (0 rows) rollback; begin; set local role anon; -- Should Fail - select queues_public.read( + select queues_public.queue_read( queue_name := 'baz', sleep_seconds := 0, n := 1 @@ -318,12 +318,12 @@ SQL statement "select * vt := sleep_seconds, qty := n )" -PL/pgSQL function queues_public.read(text,integer,integer) line 3 at RETURN QUERY +PL/pgSQL function queues_public.queue_read(text,integer,integer) line 3 at RETURN QUERY rollback; begin; set local role authenticated; -- Should Fail - select queues_public.read( + select queues_public.queue_read( queue_name := 'baz', sleep_seconds := 0, n := 1 @@ -354,7 +354,7 @@ SQL statement "select * vt := sleep_seconds, qty := n )" -PL/pgSQL function queues_public.read(text,integer,integer) line 3 at RETURN QUERY +PL/pgSQL function queues_public.queue_read(text,integer,integer) line 3 at RETURN QUERY rollback; /* WORKFLOW: Create a Queue named "baz" @@ -428,7 +428,7 @@ select front_end.create_queue( begin; set local role anon; -- Should Fail - select queues_public.read( + select queues_public.queue_read( queue_name := 'qux', sleep_seconds := 0, n := 1 @@ -459,53 +459,53 @@ SQL statement "select * vt := sleep_seconds, qty := n )" -PL/pgSQL function queues_public.read(text,integer,integer) line 3 at RETURN QUERY +PL/pgSQL function queues_public.queue_read(text,integer,integer) line 3 at RETURN QUERY rollback; -- Should all work begin; set local role authenticated; -- Should succeed - select queues_public.send( + select queues_public.queue_send( queue_name := 'qux', message := '{}' ); - send ------- - 1 + queue_send +------------ + 1 (1 row) - select queues_public.send_batch( + select queues_public.queue_send_batch( queue_name := 'qux', messages := array['{"a": 1}', '{"b": 2}']::jsonb[] ); - send_batch ------------- - 2 - 3 + queue_send_batch +------------------ + 2 + 3 (2 rows) - select queues_public.pop( + select queues_public.queue_pop( queue_name := 'qux' ); - pop ------ + queue_pop +----------- (0 rows) - select queues_public.delete( + select queues_public.queue_delete( queue_name := 'qux', message_id := 2 ); - delete --------- + queue_delete +-------------- t (1 row) - select queues_public.archive( + select queues_public.queue_archive( queue_name := 'qux', message_id := 3 ); - archive ---------- + queue_archive +--------------- t (1 row) @@ -532,7 +532,7 @@ PL/pgSQL function front_end.create_queue(text,boolean,boolean,text) line 25 at E begin; set local role authenticated; -- Should fail - select queues_public.send( + select queues_public.queue_send( queue_name := 'waldo', message := '{}' ); @@ -551,17 +551,17 @@ SQL statement "select * msg := message, delay := sleep_seconds )" -PL/pgSQL function queues_public.send(text,jsonb,integer) line 3 at RETURN QUERY +PL/pgSQL function queues_public.queue_send(text,jsonb,integer) line 3 at RETURN QUERY rollback; /* Cleanup */ drop schema queues_public cascade; NOTICE: drop cascades to 6 other objects -DETAIL: drop cascades to function queues_public.pop(text) -drop cascades to function queues_public.send(text,jsonb,integer) -drop cascades to function queues_public.send_batch(text,jsonb[],integer) -drop cascades to function queues_public.archive(text,bigint) -drop cascades to function queues_public.delete(text,bigint) -drop cascades to function queues_public.read(text,integer,integer) +DETAIL: drop cascades to function queues_public.queue_pop(text) +drop cascades to function queues_public.queue_send(text,jsonb,integer) +drop cascades to function queues_public.queue_send_batch(text,jsonb[],integer) +drop cascades to function queues_public.queue_archive(text,bigint) +drop cascades to function queues_public.queue_delete(text,bigint) +drop cascades to function queues_public.queue_read(text,integer,integer) drop extension pgmq cascade; create extension pgmq; From 1ddd53ee5bc9227b70c9e444a100f21dc217967a Mon Sep 17 00:00:00 2001 From: Oliver Rice Date: Mon, 25 Nov 2024 14:19:05 -0600 Subject: [PATCH 4/4] correct read function perms --- nix/tests/expected/pgmq_data_api_wrapper.out | 7 +++---- nix/tests/sql/pgmq_data_api_wrapper.sql | 6 +++--- 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/nix/tests/expected/pgmq_data_api_wrapper.out b/nix/tests/expected/pgmq_data_api_wrapper.out index e4169ac8f..1e0e732c9 100644 --- a/nix/tests/expected/pgmq_data_api_wrapper.out +++ b/nix/tests/expected/pgmq_data_api_wrapper.out @@ -1,5 +1,5 @@ -/* - This test is to validate the SQL for the Supabase Queues integration that +/* + This test is to validate the SQL for the Supabase Queues integration that will be triggered in the FE and documented for how to manually expose queues over supabase client libs by wrapping `pgmq`'s functions into a separate schema that we can add to PostgREST's 'exposed_schemas' setting @@ -194,8 +194,7 @@ grant execute on function pgmq.archive(text, bigint) to postgres, service_role, grant execute on function queues_public.queue_delete(text, bigint) to postgres, service_role, anon, authenticated; grant execute on function pgmq.delete(text, bigint) to postgres, service_role, anon, authenticated; grant execute on function queues_public.queue_read(text, integer, integer) to postgres, service_role, anon, authenticated; -grant execute on function pgmq.read(text, integer, integer, jsonb) to postgres, service_role, anon, authenticated; -ERROR: function pgmq.read(text, integer, integer, jsonb) does not exist +grant execute on function pgmq.read(text, integer, integer) to postgres, service_role, anon, authenticated; -- For the service role, we want full access -- Grant permissions on existing tables grant all privileges on all tables in schema pgmq to postgres, service_role; diff --git a/nix/tests/sql/pgmq_data_api_wrapper.sql b/nix/tests/sql/pgmq_data_api_wrapper.sql index af0ab220e..bccad4442 100644 --- a/nix/tests/sql/pgmq_data_api_wrapper.sql +++ b/nix/tests/sql/pgmq_data_api_wrapper.sql @@ -1,5 +1,5 @@ -/* - This test is to validate the SQL for the Supabase Queues integration that +/* + This test is to validate the SQL for the Supabase Queues integration that will be triggered in the FE and documented for how to manually expose queues over supabase client libs by wrapping `pgmq`'s functions into a separate schema that we can add to PostgREST's 'exposed_schemas' setting @@ -231,7 +231,7 @@ grant execute on function queues_public.queue_delete(text, bigint) to postgres, grant execute on function pgmq.delete(text, bigint) to postgres, service_role, anon, authenticated; grant execute on function queues_public.queue_read(text, integer, integer) to postgres, service_role, anon, authenticated; -grant execute on function pgmq.read(text, integer, integer, jsonb) to postgres, service_role, anon, authenticated; +grant execute on function pgmq.read(text, integer, integer) to postgres, service_role, anon, authenticated; -- For the service role, we want full access -- Grant permissions on existing tables