forked from supabase/pg_net
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpg_net.sql
357 lines (316 loc) · 9.09 KB
/
pg_net.sql
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
create schema if not exists net;
create domain net.http_method as text
check (
value ilike 'get'
or value ilike 'post'
or value ilike 'delete'
);
-- Store pending requests. The background worker reads from here
-- API: Private
create unlogged table net.http_request_queue(
id bigserial,
method net.http_method not null,
url text not null,
headers jsonb not null,
body bytea,
timeout_milliseconds int not null
);
create or replace function net.check_worker_is_up() returns void as $$
begin
if not exists (select pid from pg_stat_activity where backend_type ilike '%pg_net%') then
raise exception using
message = 'the pg_net background worker is not up'
, detail = 'the pg_net background worker is down due to an internal error and cannot process requests'
, hint = 'make sure that you didn''t modify any of pg_net internal tables';
end if;
end
$$ language plpgsql;
comment on function net.check_worker_is_up() is 'raises an exception if the pg_net background worker is not up, otherwise it doesn''t return anything';
-- Associates a response with a request
-- API: Private
create unlogged table net._http_response(
id bigint,
status_code integer,
content_type text,
headers jsonb,
content text,
timed_out bool,
error_msg text,
created timestamptz not null default now()
);
create index on net._http_response (created);
-- Blocks until an http_request is complete
-- API: Private
create or replace function net._await_response(
request_id bigint
)
returns bool
volatile
parallel safe
strict
language plpgsql
as $$
declare
rec net._http_response;
begin
while rec is null loop
select *
into rec
from net._http_response
where id = request_id;
if rec is null then
-- Wait 50 ms before checking again
perform pg_sleep(0.05);
end if;
end loop;
return true;
end;
$$;
-- url encode a string
-- API: Private
create or replace function net._urlencode_string(string varchar)
-- url encoded string
returns text
language 'c'
immutable
strict
as 'pg_net';
-- API: Private
create or replace function net._encode_url_with_params_array(url text, params_array text[])
-- url encoded string
returns text
strict
language 'c'
immutable
as 'pg_net';
create or replace function net.worker_restart()
returns bool
language 'c'
as 'pg_net';
-- Interface to make an async request
-- API: Public
create or replace function net.http_get(
-- url for the request
url text,
-- key/value pairs to be url encoded and appended to the `url`
params jsonb default '{}'::jsonb,
-- key/values to be included in request headers
headers jsonb default '{}'::jsonb,
-- the maximum number of milliseconds the request may take before being cancelled
timeout_milliseconds int default 5000
)
-- request_id reference
returns bigint
strict
volatile
parallel safe
language plpgsql
as $$
declare
request_id bigint;
params_array text[];
begin
select coalesce(array_agg(net._urlencode_string(key) || '=' || net._urlencode_string(value)), '{}')
into params_array
from jsonb_each_text(params);
-- Add to the request queue
insert into net.http_request_queue(method, url, headers, timeout_milliseconds)
values (
'GET',
net._encode_url_with_params_array(url, params_array),
headers,
timeout_milliseconds
)
returning id
into request_id;
return request_id;
end
$$;
-- Interface to make an async request
-- API: Public
create or replace function net.http_post(
-- url for the request
url text,
-- body of the POST request
body jsonb default '{}'::jsonb,
-- key/value pairs to be url encoded and appended to the `url`
params jsonb default '{}'::jsonb,
-- key/values to be included in request headers
headers jsonb default '{"Content-Type": "application/json"}'::jsonb,
-- the maximum number of milliseconds the request may take before being cancelled
timeout_milliseconds int DEFAULT 5000
)
-- request_id reference
returns bigint
volatile
parallel safe
language plpgsql
as $$
declare
request_id bigint;
params_array text[];
content_type text;
begin
-- Exctract the content_type from headers
select
header_value into content_type
from
jsonb_each_text(coalesce(headers, '{}'::jsonb)) r(header_name, header_value)
where
lower(header_name) = 'content-type'
limit
1;
-- If the user provided new headers and omitted the content type
-- add it back in automatically
if content_type is null then
select headers || '{"Content-Type": "application/json"}'::jsonb into headers;
end if;
-- Confirm that the content-type is set as "application/json"
if content_type <> 'application/json' then
raise exception 'Content-Type header must be "application/json"';
end if;
select
coalesce(array_agg(net._urlencode_string(key) || '=' || net._urlencode_string(value)), '{}')
into
params_array
from
jsonb_each_text(params);
-- Add to the request queue
insert into net.http_request_queue(method, url, headers, body, timeout_milliseconds)
values (
'POST',
net._encode_url_with_params_array(url, params_array),
headers,
convert_to(body::text, 'UTF8'),
timeout_milliseconds
)
returning id
into request_id;
return request_id;
end
$$;
-- Interface to make an async request
-- API: Public
create or replace function net.http_delete(
-- url for the request
url text,
-- key/value pairs to be url encoded and appended to the `url`
params jsonb default '{}'::jsonb,
-- key/values to be included in request headers
headers jsonb default '{}'::jsonb,
-- the maximum number of milliseconds the request may take before being cancelled
timeout_milliseconds int default 5000
)
-- request_id reference
returns bigint
strict
volatile
parallel safe
language plpgsql
as $$
declare
request_id bigint;
params_array text[];
begin
select coalesce(array_agg(net._urlencode_string(key) || '=' || net._urlencode_string(value)), '{}')
into params_array
from jsonb_each_text(params);
-- Add to the request queue
insert into net.http_request_queue(method, url, headers, timeout_milliseconds)
values (
'DELETE',
net._encode_url_with_params_array(url, params_array),
headers,
timeout_milliseconds
)
returning id
into request_id;
return request_id;
end
$$;
-- Lifecycle states of a request (all protocols)
-- API: Public
create type net.request_status as enum ('PENDING', 'SUCCESS', 'ERROR');
-- A response from an HTTP server
-- API: Public
create type net.http_response AS (
status_code integer,
headers jsonb,
body text
);
-- State wrapper around responses
-- API: Public
create type net.http_response_result as (
status net.request_status,
message text,
response net.http_response
);
-- Collect respones of an http request
-- API: Private
create or replace function net._http_collect_response(
-- request_id reference
request_id bigint,
-- when `true`, return immediately. when `false` wait for the request to complete before returning
async bool default true
)
-- http response composite wrapped in a result type
returns net.http_response_result
strict
volatile
parallel safe
language plpgsql
as $$
declare
rec net._http_response;
req_exists boolean;
begin
if not async then
perform net._await_response(request_id);
end if;
select *
into rec
from net._http_response
where id = request_id;
if rec is null or rec.error_msg is not null then
-- The request is either still processing or the request_id provided does not exist
-- TODO: request in progress is indistinguishable from request that doesn't exist
-- No request matching request_id found
return (
'ERROR',
coalesce(rec.error_msg, 'request matching request_id not found'),
null
)::net.http_response_result;
end if;
-- Return a valid, populated http_response_result
return (
'SUCCESS',
'ok',
(
rec.status_code,
rec.headers,
rec.content
)::net.http_response
)::net.http_response_result;
end;
$$;
create or replace function net.http_collect_response(
-- request_id reference
request_id bigint,
-- when `true`, return immediately. when `false` wait for the request to complete before returning
async bool default true
)
-- http response composite wrapped in a result type
returns net.http_response_result
strict
volatile
parallel safe
language plpgsql
as $$
begin
raise notice 'The net.http_collect_response function is deprecated.';
select net._http_collect_response(request_id, async);
end;
$$;
grant usage on schema net to PUBLIC;
grant all on all sequences in schema net to PUBLIC;
grant all on all tables in schema net to PUBLIC;