722 lines
24 KiB
PL/PgSQL
722 lines
24 KiB
PL/PgSQL
-- =============================================================================
|
|
-- AgenciaPsi — Functions — realtime schema
|
|
-- =============================================================================
|
|
|
|
CREATE FUNCTION realtime.apply_rls(wal jsonb, max_record_bytes integer DEFAULT (1024 * 1024)) RETURNS SETOF realtime.wal_rls
|
|
LANGUAGE plpgsql
|
|
AS $$
|
|
declare
|
|
-- Regclass of the table e.g. public.notes
|
|
entity_ regclass = (quote_ident(wal ->> 'schema') || '.' || quote_ident(wal ->> 'table'))::regclass;
|
|
|
|
-- I, U, D, T: insert, update ...
|
|
action realtime.action = (
|
|
case wal ->> 'action'
|
|
when 'I' then 'INSERT'
|
|
when 'U' then 'UPDATE'
|
|
when 'D' then 'DELETE'
|
|
else 'ERROR'
|
|
end
|
|
);
|
|
|
|
-- Is row level security enabled for the table
|
|
is_rls_enabled bool = relrowsecurity from pg_class where oid = entity_;
|
|
|
|
subscriptions realtime.subscription[] = array_agg(subs)
|
|
from
|
|
realtime.subscription subs
|
|
where
|
|
subs.entity = entity_;
|
|
|
|
-- Subscription vars
|
|
roles regrole[] = array_agg(distinct us.claims_role::text)
|
|
from
|
|
unnest(subscriptions) us;
|
|
|
|
working_role regrole;
|
|
claimed_role regrole;
|
|
claims jsonb;
|
|
|
|
subscription_id uuid;
|
|
subscription_has_access bool;
|
|
visible_to_subscription_ids uuid[] = '{}';
|
|
|
|
-- structured info for wal's columns
|
|
columns realtime.wal_column[];
|
|
-- previous identity values for update/delete
|
|
old_columns realtime.wal_column[];
|
|
|
|
error_record_exceeds_max_size boolean = octet_length(wal::text) > max_record_bytes;
|
|
|
|
-- Primary jsonb output for record
|
|
output jsonb;
|
|
|
|
begin
|
|
perform set_config('role', null, true);
|
|
|
|
columns =
|
|
array_agg(
|
|
(
|
|
x->>'name',
|
|
x->>'type',
|
|
x->>'typeoid',
|
|
realtime.cast(
|
|
(x->'value') #>> '{}',
|
|
coalesce(
|
|
(x->>'typeoid')::regtype, -- null when wal2json version <= 2.4
|
|
(x->>'type')::regtype
|
|
)
|
|
),
|
|
(pks ->> 'name') is not null,
|
|
true
|
|
)::realtime.wal_column
|
|
)
|
|
from
|
|
jsonb_array_elements(wal -> 'columns') x
|
|
left join jsonb_array_elements(wal -> 'pk') pks
|
|
on (x ->> 'name') = (pks ->> 'name');
|
|
|
|
old_columns =
|
|
array_agg(
|
|
(
|
|
x->>'name',
|
|
x->>'type',
|
|
x->>'typeoid',
|
|
realtime.cast(
|
|
(x->'value') #>> '{}',
|
|
coalesce(
|
|
(x->>'typeoid')::regtype, -- null when wal2json version <= 2.4
|
|
(x->>'type')::regtype
|
|
)
|
|
),
|
|
(pks ->> 'name') is not null,
|
|
true
|
|
)::realtime.wal_column
|
|
)
|
|
from
|
|
jsonb_array_elements(wal -> 'identity') x
|
|
left join jsonb_array_elements(wal -> 'pk') pks
|
|
on (x ->> 'name') = (pks ->> 'name');
|
|
|
|
for working_role in select * from unnest(roles) loop
|
|
|
|
-- Update `is_selectable` for columns and old_columns
|
|
columns =
|
|
array_agg(
|
|
(
|
|
c.name,
|
|
c.type_name,
|
|
c.type_oid,
|
|
c.value,
|
|
c.is_pkey,
|
|
pg_catalog.has_column_privilege(working_role, entity_, c.name, 'SELECT')
|
|
)::realtime.wal_column
|
|
)
|
|
from
|
|
unnest(columns) c;
|
|
|
|
old_columns =
|
|
array_agg(
|
|
(
|
|
c.name,
|
|
c.type_name,
|
|
c.type_oid,
|
|
c.value,
|
|
c.is_pkey,
|
|
pg_catalog.has_column_privilege(working_role, entity_, c.name, 'SELECT')
|
|
)::realtime.wal_column
|
|
)
|
|
from
|
|
unnest(old_columns) c;
|
|
|
|
if action <> 'DELETE' and count(1) = 0 from unnest(columns) c where c.is_pkey then
|
|
return next (
|
|
jsonb_build_object(
|
|
'schema', wal ->> 'schema',
|
|
'table', wal ->> 'table',
|
|
'type', action
|
|
),
|
|
is_rls_enabled,
|
|
-- subscriptions is already filtered by entity
|
|
(select array_agg(s.subscription_id) from unnest(subscriptions) as s where claims_role = working_role),
|
|
array['Error 400: Bad Request, no primary key']
|
|
)::realtime.wal_rls;
|
|
|
|
-- The claims role does not have SELECT permission to the primary key of entity
|
|
elsif action <> 'DELETE' and sum(c.is_selectable::int) <> count(1) from unnest(columns) c where c.is_pkey then
|
|
return next (
|
|
jsonb_build_object(
|
|
'schema', wal ->> 'schema',
|
|
'table', wal ->> 'table',
|
|
'type', action
|
|
),
|
|
is_rls_enabled,
|
|
(select array_agg(s.subscription_id) from unnest(subscriptions) as s where claims_role = working_role),
|
|
array['Error 401: Unauthorized']
|
|
)::realtime.wal_rls;
|
|
|
|
else
|
|
output = jsonb_build_object(
|
|
'schema', wal ->> 'schema',
|
|
'table', wal ->> 'table',
|
|
'type', action,
|
|
'commit_timestamp', to_char(
|
|
((wal ->> 'timestamp')::timestamptz at time zone 'utc'),
|
|
'YYYY-MM-DD"T"HH24:MI:SS.MS"Z"'
|
|
),
|
|
'columns', (
|
|
select
|
|
jsonb_agg(
|
|
jsonb_build_object(
|
|
'name', pa.attname,
|
|
'type', pt.typname
|
|
)
|
|
order by pa.attnum asc
|
|
)
|
|
from
|
|
pg_attribute pa
|
|
join pg_type pt
|
|
on pa.atttypid = pt.oid
|
|
where
|
|
attrelid = entity_
|
|
and attnum > 0
|
|
and pg_catalog.has_column_privilege(working_role, entity_, pa.attname, 'SELECT')
|
|
)
|
|
)
|
|
-- Add "record" key for insert and update
|
|
|| case
|
|
when action in ('INSERT', 'UPDATE') then
|
|
jsonb_build_object(
|
|
'record',
|
|
(
|
|
select
|
|
jsonb_object_agg(
|
|
-- if unchanged toast, get column name and value from old record
|
|
coalesce((c).name, (oc).name),
|
|
case
|
|
when (c).name is null then (oc).value
|
|
else (c).value
|
|
end
|
|
)
|
|
from
|
|
unnest(columns) c
|
|
full outer join unnest(old_columns) oc
|
|
on (c).name = (oc).name
|
|
where
|
|
coalesce((c).is_selectable, (oc).is_selectable)
|
|
and ( not error_record_exceeds_max_size or (octet_length((c).value::text) <= 64))
|
|
)
|
|
)
|
|
else '{}'::jsonb
|
|
end
|
|
-- Add "old_record" key for update and delete
|
|
|| case
|
|
when action = 'UPDATE' then
|
|
jsonb_build_object(
|
|
'old_record',
|
|
(
|
|
select jsonb_object_agg((c).name, (c).value)
|
|
from unnest(old_columns) c
|
|
where
|
|
(c).is_selectable
|
|
and ( not error_record_exceeds_max_size or (octet_length((c).value::text) <= 64))
|
|
)
|
|
)
|
|
when action = 'DELETE' then
|
|
jsonb_build_object(
|
|
'old_record',
|
|
(
|
|
select jsonb_object_agg((c).name, (c).value)
|
|
from unnest(old_columns) c
|
|
where
|
|
(c).is_selectable
|
|
and ( not error_record_exceeds_max_size or (octet_length((c).value::text) <= 64))
|
|
and ( not is_rls_enabled or (c).is_pkey ) -- if RLS enabled, we can't secure deletes so filter to pkey
|
|
)
|
|
)
|
|
else '{}'::jsonb
|
|
end;
|
|
|
|
-- Create the prepared statement
|
|
if is_rls_enabled and action <> 'DELETE' then
|
|
if (select 1 from pg_prepared_statements where name = 'walrus_rls_stmt' limit 1) > 0 then
|
|
deallocate walrus_rls_stmt;
|
|
end if;
|
|
execute realtime.build_prepared_statement_sql('walrus_rls_stmt', entity_, columns);
|
|
end if;
|
|
|
|
visible_to_subscription_ids = '{}';
|
|
|
|
for subscription_id, claims in (
|
|
select
|
|
subs.subscription_id,
|
|
subs.claims
|
|
from
|
|
unnest(subscriptions) subs
|
|
where
|
|
subs.entity = entity_
|
|
and subs.claims_role = working_role
|
|
and (
|
|
realtime.is_visible_through_filters(columns, subs.filters)
|
|
or (
|
|
action = 'DELETE'
|
|
and realtime.is_visible_through_filters(old_columns, subs.filters)
|
|
)
|
|
)
|
|
) loop
|
|
|
|
if not is_rls_enabled or action = 'DELETE' then
|
|
visible_to_subscription_ids = visible_to_subscription_ids || subscription_id;
|
|
else
|
|
-- Check if RLS allows the role to see the record
|
|
perform
|
|
-- Trim leading and trailing quotes from working_role because set_config
|
|
-- doesn't recognize the role as valid if they are included
|
|
set_config('role', trim(both '"' from working_role::text), true),
|
|
set_config('request.jwt.claims', claims::text, true);
|
|
|
|
execute 'execute walrus_rls_stmt' into subscription_has_access;
|
|
|
|
if subscription_has_access then
|
|
visible_to_subscription_ids = visible_to_subscription_ids || subscription_id;
|
|
end if;
|
|
end if;
|
|
end loop;
|
|
|
|
perform set_config('role', null, true);
|
|
|
|
return next (
|
|
output,
|
|
is_rls_enabled,
|
|
visible_to_subscription_ids,
|
|
case
|
|
when error_record_exceeds_max_size then array['Error 413: Payload Too Large']
|
|
else '{}'
|
|
end
|
|
)::realtime.wal_rls;
|
|
|
|
end if;
|
|
end loop;
|
|
|
|
perform set_config('role', null, true);
|
|
end;
|
|
$$;
|
|
|
|
|
|
ALTER FUNCTION realtime.apply_rls(wal jsonb, max_record_bytes integer) OWNER TO supabase_admin;
|
|
|
|
--
|
|
-- Name: broadcast_changes(text, text, text, text, text, record, record, text); Type: FUNCTION; Schema: realtime; Owner: supabase_admin
|
|
--
|
|
|
|
CREATE FUNCTION realtime.broadcast_changes(topic_name text, event_name text, operation text, table_name text, table_schema text, new record, old record, level text DEFAULT 'ROW'::text) RETURNS void
|
|
LANGUAGE plpgsql
|
|
AS $$
|
|
DECLARE
|
|
-- Declare a variable to hold the JSONB representation of the row
|
|
row_data jsonb := '{}'::jsonb;
|
|
BEGIN
|
|
IF level = 'STATEMENT' THEN
|
|
RAISE EXCEPTION 'function can only be triggered for each row, not for each statement';
|
|
END IF;
|
|
-- Check the operation type and handle accordingly
|
|
IF operation = 'INSERT' OR operation = 'UPDATE' OR operation = 'DELETE' THEN
|
|
row_data := jsonb_build_object('old_record', OLD, 'record', NEW, 'operation', operation, 'table', table_name, 'schema', table_schema);
|
|
PERFORM realtime.send (row_data, event_name, topic_name);
|
|
ELSE
|
|
RAISE EXCEPTION 'Unexpected operation type: %', operation;
|
|
END IF;
|
|
EXCEPTION
|
|
WHEN OTHERS THEN
|
|
RAISE EXCEPTION 'Failed to process the row: %', SQLERRM;
|
|
END;
|
|
|
|
$$;
|
|
|
|
|
|
ALTER FUNCTION realtime.broadcast_changes(topic_name text, event_name text, operation text, table_name text, table_schema text, new record, old record, level text) OWNER TO supabase_admin;
|
|
|
|
--
|
|
-- Name: build_prepared_statement_sql(text, regclass, realtime.wal_column[]); Type: FUNCTION; Schema: realtime; Owner: supabase_admin
|
|
--
|
|
|
|
CREATE FUNCTION realtime.build_prepared_statement_sql(prepared_statement_name text, entity regclass, columns realtime.wal_column[]) RETURNS text
|
|
LANGUAGE sql
|
|
AS $$
|
|
/*
|
|
Builds a sql string that, if executed, creates a prepared statement to
|
|
tests retrive a row from *entity* by its primary key columns.
|
|
Example
|
|
select realtime.build_prepared_statement_sql('public.notes', '{"id"}'::text[], '{"bigint"}'::text[])
|
|
*/
|
|
select
|
|
'prepare ' || prepared_statement_name || ' as
|
|
select
|
|
exists(
|
|
select
|
|
1
|
|
from
|
|
' || entity || '
|
|
where
|
|
' || string_agg(quote_ident(pkc.name) || '=' || quote_nullable(pkc.value #>> '{}') , ' and ') || '
|
|
)'
|
|
from
|
|
unnest(columns) pkc
|
|
where
|
|
pkc.is_pkey
|
|
group by
|
|
entity
|
|
$$;
|
|
|
|
|
|
ALTER FUNCTION realtime.build_prepared_statement_sql(prepared_statement_name text, entity regclass, columns realtime.wal_column[]) OWNER TO supabase_admin;
|
|
|
|
--
|
|
-- Name: cast(text, regtype); Type: FUNCTION; Schema: realtime; Owner: supabase_admin
|
|
--
|
|
|
|
CREATE FUNCTION realtime."cast"(val text, type_ regtype) RETURNS jsonb
|
|
LANGUAGE plpgsql IMMUTABLE
|
|
AS $$
|
|
declare
|
|
res jsonb;
|
|
begin
|
|
execute format('select to_jsonb(%L::'|| type_::text || ')', val) into res;
|
|
return res;
|
|
end
|
|
$$;
|
|
|
|
|
|
ALTER FUNCTION realtime."cast"(val text, type_ regtype) OWNER TO supabase_admin;
|
|
|
|
--
|
|
-- Name: check_equality_op(realtime.equality_op, regtype, text, text); Type: FUNCTION; Schema: realtime; Owner: supabase_admin
|
|
--
|
|
|
|
CREATE FUNCTION realtime.check_equality_op(op realtime.equality_op, type_ regtype, val_1 text, val_2 text) RETURNS boolean
|
|
LANGUAGE plpgsql IMMUTABLE
|
|
AS $$
|
|
/*
|
|
Casts *val_1* and *val_2* as type *type_* and check the *op* condition for truthiness
|
|
*/
|
|
declare
|
|
op_symbol text = (
|
|
case
|
|
when op = 'eq' then '='
|
|
when op = 'neq' then '!='
|
|
when op = 'lt' then '<'
|
|
when op = 'lte' then '<='
|
|
when op = 'gt' then '>'
|
|
when op = 'gte' then '>='
|
|
when op = 'in' then '= any'
|
|
else 'UNKNOWN OP'
|
|
end
|
|
);
|
|
res boolean;
|
|
begin
|
|
execute format(
|
|
'select %L::'|| type_::text || ' ' || op_symbol
|
|
|| ' ( %L::'
|
|
|| (
|
|
case
|
|
when op = 'in' then type_::text || '[]'
|
|
else type_::text end
|
|
)
|
|
|| ')', val_1, val_2) into res;
|
|
return res;
|
|
end;
|
|
$$;
|
|
|
|
|
|
ALTER FUNCTION realtime.check_equality_op(op realtime.equality_op, type_ regtype, val_1 text, val_2 text) OWNER TO supabase_admin;
|
|
|
|
--
|
|
-- Name: is_visible_through_filters(realtime.wal_column[], realtime.user_defined_filter[]); Type: FUNCTION; Schema: realtime; Owner: supabase_admin
|
|
--
|
|
|
|
CREATE FUNCTION realtime.is_visible_through_filters(columns realtime.wal_column[], filters realtime.user_defined_filter[]) RETURNS boolean
|
|
LANGUAGE sql IMMUTABLE
|
|
AS $_$
|
|
/*
|
|
Should the record be visible (true) or filtered out (false) after *filters* are applied
|
|
*/
|
|
select
|
|
-- Default to allowed when no filters present
|
|
$2 is null -- no filters. this should not happen because subscriptions has a default
|
|
or array_length($2, 1) is null -- array length of an empty array is null
|
|
or bool_and(
|
|
coalesce(
|
|
realtime.check_equality_op(
|
|
op:=f.op,
|
|
type_:=coalesce(
|
|
col.type_oid::regtype, -- null when wal2json version <= 2.4
|
|
col.type_name::regtype
|
|
),
|
|
-- cast jsonb to text
|
|
val_1:=col.value #>> '{}',
|
|
val_2:=f.value
|
|
),
|
|
false -- if null, filter does not match
|
|
)
|
|
)
|
|
from
|
|
unnest(filters) f
|
|
join unnest(columns) col
|
|
on f.column_name = col.name;
|
|
$_$;
|
|
|
|
|
|
ALTER FUNCTION realtime.is_visible_through_filters(columns realtime.wal_column[], filters realtime.user_defined_filter[]) OWNER TO supabase_admin;
|
|
|
|
--
|
|
-- Name: list_changes(name, name, integer, integer); Type: FUNCTION; Schema: realtime; Owner: supabase_admin
|
|
--
|
|
|
|
CREATE FUNCTION realtime.list_changes(publication name, slot_name name, max_changes integer, max_record_bytes integer) RETURNS SETOF realtime.wal_rls
|
|
LANGUAGE sql
|
|
SET log_min_messages TO 'fatal'
|
|
AS $$
|
|
with pub as (
|
|
select
|
|
concat_ws(
|
|
',',
|
|
case when bool_or(pubinsert) then 'insert' else null end,
|
|
case when bool_or(pubupdate) then 'update' else null end,
|
|
case when bool_or(pubdelete) then 'delete' else null end
|
|
) as w2j_actions,
|
|
coalesce(
|
|
string_agg(
|
|
realtime.quote_wal2json(format('%I.%I', schemaname, tablename)::regclass),
|
|
','
|
|
) filter (where ppt.tablename is not null and ppt.tablename not like '% %'),
|
|
''
|
|
) w2j_add_tables
|
|
from
|
|
pg_publication pp
|
|
left join pg_publication_tables ppt
|
|
on pp.pubname = ppt.pubname
|
|
where
|
|
pp.pubname = publication
|
|
group by
|
|
pp.pubname
|
|
limit 1
|
|
),
|
|
w2j as (
|
|
select
|
|
x.*, pub.w2j_add_tables
|
|
from
|
|
pub,
|
|
pg_logical_slot_get_changes(
|
|
slot_name, null, max_changes,
|
|
'include-pk', 'true',
|
|
'include-transaction', 'false',
|
|
'include-timestamp', 'true',
|
|
'include-type-oids', 'true',
|
|
'format-version', '2',
|
|
'actions', pub.w2j_actions,
|
|
'add-tables', pub.w2j_add_tables
|
|
) x
|
|
)
|
|
select
|
|
xyz.wal,
|
|
xyz.is_rls_enabled,
|
|
xyz.subscription_ids,
|
|
xyz.errors
|
|
from
|
|
w2j,
|
|
realtime.apply_rls(
|
|
wal := w2j.data::jsonb,
|
|
max_record_bytes := max_record_bytes
|
|
) xyz(wal, is_rls_enabled, subscription_ids, errors)
|
|
where
|
|
w2j.w2j_add_tables <> ''
|
|
and xyz.subscription_ids[1] is not null
|
|
$$;
|
|
|
|
|
|
ALTER FUNCTION realtime.list_changes(publication name, slot_name name, max_changes integer, max_record_bytes integer) OWNER TO supabase_admin;
|
|
|
|
--
|
|
-- Name: quote_wal2json(regclass); Type: FUNCTION; Schema: realtime; Owner: supabase_admin
|
|
--
|
|
|
|
CREATE FUNCTION realtime.quote_wal2json(entity regclass) RETURNS text
|
|
LANGUAGE sql IMMUTABLE STRICT
|
|
AS $$
|
|
select
|
|
(
|
|
select string_agg('' || ch,'')
|
|
from unnest(string_to_array(nsp.nspname::text, null)) with ordinality x(ch, idx)
|
|
where
|
|
not (x.idx = 1 and x.ch = '"')
|
|
and not (
|
|
x.idx = array_length(string_to_array(nsp.nspname::text, null), 1)
|
|
and x.ch = '"'
|
|
)
|
|
)
|
|
|| '.'
|
|
|| (
|
|
select string_agg('' || ch,'')
|
|
from unnest(string_to_array(pc.relname::text, null)) with ordinality x(ch, idx)
|
|
where
|
|
not (x.idx = 1 and x.ch = '"')
|
|
and not (
|
|
x.idx = array_length(string_to_array(nsp.nspname::text, null), 1)
|
|
and x.ch = '"'
|
|
)
|
|
)
|
|
from
|
|
pg_class pc
|
|
join pg_namespace nsp
|
|
on pc.relnamespace = nsp.oid
|
|
where
|
|
pc.oid = entity
|
|
$$;
|
|
|
|
|
|
ALTER FUNCTION realtime.quote_wal2json(entity regclass) OWNER TO supabase_admin;
|
|
|
|
--
|
|
-- Name: send(jsonb, text, text, boolean); Type: FUNCTION; Schema: realtime; Owner: supabase_admin
|
|
--
|
|
|
|
CREATE FUNCTION realtime.send(payload jsonb, event text, topic text, private boolean DEFAULT true) RETURNS void
|
|
LANGUAGE plpgsql
|
|
AS $$
|
|
DECLARE
|
|
generated_id uuid;
|
|
final_payload jsonb;
|
|
BEGIN
|
|
BEGIN
|
|
-- Generate a new UUID for the id
|
|
generated_id := gen_random_uuid();
|
|
|
|
-- Check if payload has an 'id' key, if not, add the generated UUID
|
|
IF payload ? 'id' THEN
|
|
final_payload := payload;
|
|
ELSE
|
|
final_payload := jsonb_set(payload, '{id}', to_jsonb(generated_id));
|
|
END IF;
|
|
|
|
-- Set the topic configuration
|
|
EXECUTE format('SET LOCAL realtime.topic TO %L', topic);
|
|
|
|
-- Attempt to insert the message
|
|
INSERT INTO realtime.messages (id, payload, event, topic, private, extension)
|
|
VALUES (generated_id, final_payload, event, topic, private, 'broadcast');
|
|
EXCEPTION
|
|
WHEN OTHERS THEN
|
|
-- Capture and notify the error
|
|
RAISE WARNING 'ErrorSendingBroadcastMessage: %', SQLERRM;
|
|
END;
|
|
END;
|
|
$$;
|
|
|
|
|
|
ALTER FUNCTION realtime.send(payload jsonb, event text, topic text, private boolean) OWNER TO supabase_admin;
|
|
|
|
--
|
|
-- Name: subscription_check_filters(); Type: FUNCTION; Schema: realtime; Owner: supabase_admin
|
|
--
|
|
|
|
CREATE FUNCTION realtime.subscription_check_filters() RETURNS trigger
|
|
LANGUAGE plpgsql
|
|
AS $$
|
|
/*
|
|
Validates that the user defined filters for a subscription:
|
|
- refer to valid columns that the claimed role may access
|
|
- values are coercable to the correct column type
|
|
*/
|
|
declare
|
|
col_names text[] = coalesce(
|
|
array_agg(c.column_name order by c.ordinal_position),
|
|
'{}'::text[]
|
|
)
|
|
from
|
|
information_schema.columns c
|
|
where
|
|
format('%I.%I', c.table_schema, c.table_name)::regclass = new.entity
|
|
and pg_catalog.has_column_privilege(
|
|
(new.claims ->> 'role'),
|
|
format('%I.%I', c.table_schema, c.table_name)::regclass,
|
|
c.column_name,
|
|
'SELECT'
|
|
);
|
|
filter realtime.user_defined_filter;
|
|
col_type regtype;
|
|
|
|
in_val jsonb;
|
|
begin
|
|
for filter in select * from unnest(new.filters) loop
|
|
-- Filtered column is valid
|
|
if not filter.column_name = any(col_names) then
|
|
raise exception 'invalid column for filter %', filter.column_name;
|
|
end if;
|
|
|
|
-- Type is sanitized and safe for string interpolation
|
|
col_type = (
|
|
select atttypid::regtype
|
|
from pg_catalog.pg_attribute
|
|
where attrelid = new.entity
|
|
and attname = filter.column_name
|
|
);
|
|
if col_type is null then
|
|
raise exception 'failed to lookup type for column %', filter.column_name;
|
|
end if;
|
|
|
|
-- Set maximum number of entries for in filter
|
|
if filter.op = 'in'::realtime.equality_op then
|
|
in_val = realtime.cast(filter.value, (col_type::text || '[]')::regtype);
|
|
if coalesce(jsonb_array_length(in_val), 0) > 100 then
|
|
raise exception 'too many values for `in` filter. Maximum 100';
|
|
end if;
|
|
else
|
|
-- raises an exception if value is not coercable to type
|
|
perform realtime.cast(filter.value, col_type);
|
|
end if;
|
|
|
|
end loop;
|
|
|
|
-- Apply consistent order to filters so the unique constraint on
|
|
-- (subscription_id, entity, filters) can't be tricked by a different filter order
|
|
new.filters = coalesce(
|
|
array_agg(f order by f.column_name, f.op, f.value),
|
|
'{}'
|
|
) from unnest(new.filters) f;
|
|
|
|
return new;
|
|
end;
|
|
$$;
|
|
|
|
|
|
ALTER FUNCTION realtime.subscription_check_filters() OWNER TO supabase_admin;
|
|
|
|
--
|
|
-- Name: to_regrole(text); Type: FUNCTION; Schema: realtime; Owner: supabase_admin
|
|
--
|
|
|
|
CREATE FUNCTION realtime.to_regrole(role_name text) RETURNS regrole
|
|
LANGUAGE sql IMMUTABLE
|
|
AS $$ select role_name::regrole $$;
|
|
|
|
|
|
ALTER FUNCTION realtime.to_regrole(role_name text) OWNER TO supabase_admin;
|
|
|
|
--
|
|
-- Name: topic(); Type: FUNCTION; Schema: realtime; Owner: supabase_realtime_admin
|
|
--
|
|
|
|
CREATE FUNCTION realtime.topic() RETURNS text
|
|
LANGUAGE sql STABLE
|
|
AS $$
|
|
select nullif(current_setting('realtime.topic', true), '')::text;
|
|
$$;
|
|
|
|
|
|
ALTER FUNCTION realtime.topic() OWNER TO supabase_realtime_admin;
|
|
|
|
--
|
|
-- Name: can_insert_object(text, text, uuid, jsonb); Type: FUNCTION; Schema: storage; Owner: supabase_storage_admin
|
|
--
|
|
|